# Spark Structured Streaming - Demo
## Robotic Arm

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
import io
from pyspark.sql.functions import *
import time
import json
import struct
import requests 

# Maven packages handed to spark-submit. PYSPARK_SUBMIT_ARGS is read when pyspark launches
# the JVM, so it must be set before the first SparkSession is created; re-running this cell
# in a kernel whose JVM is already up does not change the loaded packages.
KAFKA_PACKAGES = [
    # Structured Streaming Kafka source/sink. Assumes the local pyspark is 3.0.x built for
    # Scala 2.12 -- TODO confirm the artifact suffix/version match the installed pyspark.
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1",
    "org.apache.kafka:kafka-clients:2.6.0",
]
# The DStream connector org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.5 was removed:
# it targets Spark 2.4 / Scala 2.11, clashes with the Scala 2.12 artifacts above, and is not
# needed by the Structured Streaming (readStream/writeStream) API used in this notebook.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' + ','.join(KAFKA_PACKAGES) + ' pyspark-shell'

# Local session using all cores; getOrCreate reuses an existing session if one is running.
spark = (SparkSession.builder
         .master("local[*]")
         .appName("test")
         .getOrCreate())

spark

Set up the Kafka connection parameters: the topic to subscribe to and the bootstrap servers.

In [2]:
# Kafka topic carrying the robotic-arm events, and the broker address used to reach it.
topic, servers = 'RoboticArm', "kafka:9092"

### Let's create the streaming DataFrames from the data in the Kafka topic

In [3]:
# Unbounded streaming source over the Kafka topic, replayed from the earliest offset
# still retained by the broker.
raw_RoboticArm_df = (
    spark.readStream
    .options(**{
        "kafka.bootstrap.servers": servers,
        "startingOffsets": "earliest",
        "subscribe": topic,
    })
    .format("kafka")
    .load()
)

In [4]:
# Show the fixed schema of the Kafka source; the message payload arrives in the binary `value` column.
raw_RoboticArm_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
from pyspark.sql.types import *

# Expected structure of the JSON payload of each Kafka message (one event of one arm).
# All four fields are declared nullable.
RoboticArm_schema = (StructType()
                     .add("id", StringType(), True)
                     .add("status", StringType(), True)
                     .add("stressLevel", IntegerType(), True)
                     .add("ts", TimestampType(), True))


In [6]:
# Decode the binary Kafka payload to a string, parse it as JSON with RoboticArm_schema
# into a struct column named "value", then flatten that struct into top-level columns.
RoboticArm_sdf = (
    raw_RoboticArm_df
    .select(from_json(raw_RoboticArm_df["value"].cast("string"), RoboticArm_schema).alias("value"))
    .select(col("value.*"))
)

In [7]:
# Verify that parsing produced the typed columns declared in RoboticArm_schema.
RoboticArm_sdf.printSchema()

root
 |-- id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevel: integer (nullable = true)
 |-- ts: timestamp (nullable = true)



### To gain some confidence, let's first inspect the content of the stream of robotic-arm events

In [8]:
# Debug query: the memory sink collects the parsed events into an in-memory table called
# "sinkTable" that can be queried with spark.sql (default append mode, so rows accumulate).
basic_query = (
    RoboticArm_sdf.writeStream
    .queryName("sinkTable")
    .format("memory")  # this is for debug purpose only! DO NOT USE IN PRODUCTION
    .start()
)

In [10]:
# Peek at the 5 most recent events collected so far by the debug query.
spark.sql("SELECT * FROM sinkTable ORDER BY TS DESC").show(5)

+---+-----------+-----------+-------------------+
| id|     status|stressLevel|                 ts|
+---+-----------+-----------+-------------------+
|  1|placingGood|          3|2021-10-18 14:50:30|
|  2|placingGood|          3|2021-10-18 14:50:30|
|  2| movingGood|          9|2021-10-18 14:50:20|
|  2|goodGrasped|          5|2021-10-18 14:50:19|
|  2|      ready|          0|2021-10-18 14:50:17|
+---+-----------+-----------+-------------------+
only showing top 5 rows



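Do not forget to stop queries that you are not using: they keep running in the background, and a new query cannot start under the name `sinkTable` while another query with that name is still active.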

In [11]:
# Stop the debug query so that the "sinkTable" query name can be reused by the next exercises.
basic_query.stop()

## E2

Write a continuous query that emits the max stress for each arm.

### The SQL style

In [None]:
# Create a logical table (temporary view) on top of the streaming DataFrame so that the
# next queries can be written in SQL. createOrReplaceTempView keeps this cell re-runnable:
# createTempView raises an error when the view "RoboticArm" already exists.
# This time we will not clean it up, because we use it in the next queries.
RoboticArm_sdf.createOrReplaceTempView("RoboticArm")

**NOTE**: the following query *intentionally* gives an error

In [None]:
query_string = """
SELECT id, MAX(stressLevel) 
FROM RoboticArm
GROUP BY id
"""

# Register the SQL aggregation as a streaming query and start it. "complete" output mode
# rewrites the whole result table (one row per arm id) on every trigger.
e2 = (spark.sql(query_string)
      .writeStream
      .queryName("sinkTable")
      .outputMode("complete")
      .format("memory")
      .start())

In [None]:
# look up the most recent results
spark.sql("SELECT * FROM sinkTable").show(5) # no ORDER BY TS DESC needed: in complete mode the table is rewritten on every trigger, so it only holds the latest result (one row per arm)

In [None]:
# clean up: stop the query so the next one can reuse the "sinkTable" query name
e2.stop()

### The DataFrame style

In [None]:
# Same aggregation as the SQL version, written with the DataFrame API.
# max("stressLevel") names the aggregated column explicitly: a bare .max() aggregates
# *every* numeric column, which only coincides with the intended result while stressLevel
# is the sole numeric field of RoboticArm_schema. The output column is still
# "max(stressLevel)", the same name the SQL query produces.
e2bis = (RoboticArm_sdf
         .groupBy("id")
         .max("stressLevel")
         .writeStream
         .format("memory")
         .outputMode("complete")  # streaming aggregation without a watermark: append mode is not allowed
         .queryName("sinkTable")
         .start())

In [None]:
# look up the most recent results
spark.sql("SELECT * FROM sinkTable").show(5) # without ORDER BY TS DESC: in complete mode the table is rewritten on every trigger, so it already holds only the latest result

In [None]:
# clean up: stop the query so the next one can reuse the "sinkTable" query name
e2bis.stop()

# E3
A continuous query that emits the average stress level between a pick (status==goodGrasped) and a place (status==placingGood).
Let's start with something simpler.
## E3.1
For now, try to find an arm in status moving (`movingGood`) followed by the same arm in status placing (`placingGood`), and return the stress level in the moving status and the stress level in the placing status.

In [62]:
# E3.1 inputs: one watermarked stream per arm state. Every column gets the state name as a
# suffix (id -> idMoving, ts -> tsMoving, ...) so both sides of the self-join have distinct
# column names. The 30-second watermark, together with the time-range condition of the
# join, lets Spark bound the join state it keeps.
moving_sdf = (RoboticArm_sdf
              .withWatermark("ts", "30 seconds")
              .where("status='movingGood'")
              .toDF(*[name + "Moving" for name in RoboticArm_sdf.columns]))

placing_sdf = (RoboticArm_sdf
               .withWatermark("ts", "30 seconds")
               .where("status='placingGood'")
               .toDF(*[name + "Placing" for name in RoboticArm_sdf.columns]))

In [63]:
# E3.1: pair each movingGood event with a placingGood event of the same arm that occurs
# strictly after it and at most 14 seconds later.
joins_sdf = moving_sdf.join(
    placing_sdf,
    (col("idPlacing") == col("idMoving"))
    & (col("tsPlacing") > col("tsMoving"))
    & (col("tsPlacing") <= col("tsMoving") + expr("interval 14 seconds")),
)

In [64]:
# Stream-stream joins only support append output mode, which is also the default; it is
# spelled out explicitly instead of the former commented-out "complete" mode.
e31 = (joins_sdf.writeStream
       .outputMode("append")
       .format("memory")
       .queryName("sinkTable")
       .start())

In [67]:
# look up the most recent results (append mode: the memory table accumulates every joined pair)
spark.sql("SELECT * FROM sinkTable").show(5)

+--------+------------+-----------------+-------------------+---------+-------------+------------------+-------------------+
|idMoving|statusMoving|stressLevelMoving|           tsMoving|idPlacing|statusPlacing|stressLevelPlacing|          tsPlacing|
+--------+------------+-----------------+-------------------+---------+-------------+------------------+-------------------+
|       1|  movingGood|                7|2021-10-18 14:22:07|        1|  placingGood|                 3|2021-10-18 14:22:20|
|       1|  movingGood|                7|2021-10-18 14:22:40|        1|  placingGood|                 3|2021-10-18 14:22:53|
|       1|  movingGood|                7|2021-10-18 14:23:13|        1|  placingGood|                 3|2021-10-18 14:23:26|
|       1|  movingGood|                7|2021-10-18 14:23:46|        1|  placingGood|                 3|2021-10-18 14:23:59|
|       1|  movingGood|                7|2021-10-18 14:24:19|        1|  placingGood|                 3|2021-10-18 14:24:32|


In [68]:
# Duplicate-match check: how many movingGood events were paired with each placingGood event
# (newest placing events first).
spark.sql("SELECT idPlacing, tsPlacing, count(*) FROM sinkTable GROUP BY idPlacing, tsPlacing ORDER BY tsPlacing DESC").show(5)

+---------+-------------------+--------+
|idPlacing|          tsPlacing|count(1)|
+---------+-------------------+--------+
|        2|2021-10-18 15:15:21|       1|
|        1|2021-10-18 15:15:21|       1|
|        1|2021-10-18 15:14:48|       1|
|        2|2021-10-18 15:14:48|       1|
|        2|2021-10-18 15:14:15|       1|
+---------+-------------------+--------+
only showing top 5 rows



In [78]:
# Stop the E3.1 join query so that the "sinkTable" query name can be reused by E3.2.
e31.stop()

In [77]:
# Progress reports of the most recent micro-batches: input rows, durations, event-time
# statistics including the current watermark, and state-operator sizes.
e31.recentProgress

[{'id': 'd4b166a1-3bfd-4b96-b8b1-acfcb4f62d9e',
  'runId': '63843925-e455-473e-8881-4486dfa2d89a',
  'name': 'sinkTable',
  'timestamp': '2021-10-18T15:36:45.329Z',
  'batchId': 201,
  'numInputRows': 10,
  'inputRowsPerSecond': 1.3598041881968996,
  'processedRowsPerSecond': 1.530690341343946,
  'durationMs': {'addBatch': 6354,
   'getBatch': 0,
   'latestOffset': 1,
   'queryPlanning': 125,
   'triggerExecution': 6533,
   'walCommit': 32},
  'eventTime': {'avg': '2021-10-18T15:36:41.500Z',
   'max': '2021-10-18T15:36:43.000Z',
   'min': '2021-10-18T15:36:40.000Z',
   'watermark': '2021-10-18T15:35:40.000Z'},
  'stateOperators': [{'numRowsTotal': 10,
    'numRowsUpdated': 2,
    'memoryUsedBytes': 330336,
    'customMetrics': {'loadedMapCacheHitCount': 80400,
     'loadedMapCacheMissCount': 0,
     'stateOnCurrentVersionSizeBytes': 37776}}],
  'sources': [{'description': 'KafkaV2[Subscribe[RoboticArm]]',
    'startOffset': {'RoboticArm': {'0': 1621}},
    'endOffset': {'RoboticArm': {

## E3.2

Now find an arm in status `goodGrasped`, followed by the same arm in status moving (`movingGood`), followed by the same arm in status placing (`placingGood`), and return the arm id and `(stressLevelGoodGrasped + stressLevelMoving + stressLevelPlacing) / 3`, i.e. the average of the stress levels in the three states.

In [86]:
# E3.2 inputs: one watermarked stream per arm state (grasp -> move -> place). Every column
# gets the state name as a suffix (id -> idGrasped, ts -> tsGrasped, ...) so the chained
# self-joins see distinct column names. The watermark here is 1 minute.
grasped_sdf = (RoboticArm_sdf
               .withWatermark("ts", "1 minute")
               .where("status='goodGrasped'")
               .toDF(*[name + "Grasped" for name in RoboticArm_sdf.columns]))

moving_sdf = (RoboticArm_sdf
              .withWatermark("ts", "1 minute")
              .where("status='movingGood'")
              .toDF(*[name + "Moving" for name in RoboticArm_sdf.columns]))

placing_sdf = (RoboticArm_sdf
               .withWatermark("ts", "1 minute")
               .where("status='placingGood'")
               .toDF(*[name + "Placing" for name in RoboticArm_sdf.columns]))


In [87]:
# Step 1: a goodGrasped event followed, within 5 seconds, by a movingGood event of the same arm.
join1_sdf = grasped_sdf.join(
    moving_sdf,
    (col("idMoving") == col("idGrasped"))
    & (col("tsMoving") > col("tsGrasped"))
    & (col("tsMoving") <= col("tsGrasped") + expr("interval 5 seconds")),
)

# Step 2: extend each match with a placingGood event of the same arm within 14 seconds of the move.
join2_sdf = join1_sdf.join(
    placing_sdf,
    (col("idPlacing") == col("idMoving"))
    & (col("tsPlacing") > col("tsMoving"))
    & (col("tsPlacing") <= col("tsMoving") + expr("interval 14 seconds")),
)

In [88]:
# Final E3.2 projection: the arm id and the mean of its stress level over the three matched
# states. Spark SQL "/" is fractional division, so the average is not truncated.
prj_sdf = join2_sdf.select(
    col("idPlacing").alias("ID"),
    ((col("stressLevelGrasped") + col("stressLevelMoving") + col("stressLevelPlacing")) / 3)
    .alias("avgStressLevel"),
)

In [89]:
# Stream-stream joins only support append output mode, which is also the default; it is
# spelled out explicitly instead of the former commented-out "complete" mode.
e32 = (prj_sdf.writeStream
       .outputMode("append")
       .format("memory")
       .queryName("sinkTable")
       .start())

In [95]:
# look up the most recent results: number of grasp -> move -> place matches found so far per arm
spark.sql("SELECT ID, COUNT(*) FROM sinkTable GROUP BY ID").show(10)

+---+--------+
| ID|count(1)|
+---+--------+
|  1|     164|
|  2|     164|
+---+--------+



In [85]:
# Stop the E3.2 query; it must be stopped before another query named "sinkTable" can start.
e32.stop()