# Spark Structured Streaming - Demo
## Robotic Arm

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
import io
from pyspark.sql.functions import *
import time
import json
import struct
import requests 

# Extra JVM packages for the PySpark shell. This is set before the session is
# created below so that the packages are available to it.
# Only the Structured Streaming Kafka connector (and the Kafka client) is listed.
# The DStream connector spark-streaming-kafka-0-10_2.11:2.4.5 was removed: it is
# built for Scala 2.11 / Spark 2.4 and does not match the _2.12:3.0.1 artifact,
# and nothing in this notebook uses it.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages '
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,'
    'org.apache.kafka:kafka-clients:2.6.0 '
    'pyspark-shell'
)

# Local Spark session (master "local[*]") shared by every cell of the notebook.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

# Last expression of the cell: display the session summary.
spark

Set up the Kafka connection parameters (topic name and bootstrap servers).

In [3]:
# Kafka bootstrap server(s) and the topic that carries the robotic-arm events.
servers, topic = "kafka:9092", "RoboticArm"

## Answers in Spark Structured Streaming 

Please refer to [epl_robotic-arm/readme.md](https://github.com/emanueledellavalle/streaming-data-analytics/tree/main/codes/epl_robotic-arm/readme.md) for the EPL version of the following queries.

Let's first try with the model proposed for EPL and see what happens. To get the data run [datagen1.ipynb](datagen1.ipynb)

### Let's create the streaming Data Frames using the data in the kafka topic

In [4]:
from pyspark.sql.types import *

# Schema of the JSON document carried in the value of each Kafka message.
roboticArm_schema = (
    StructType()
    .add("id", StringType(), True)
    .add("status", StringType(), True)
    .add("stressLevel", IntegerType(), True)
    .add("ts", TimestampType(), True)
)

# Streaming source: subscribe to the topic and read it from the earliest offset.
raw_roboticArm_df = (
    spark.readStream
    .format("kafka")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .option("kafka.bootstrap.servers", servers)
    .load()
)

# Cast the Kafka value to a string, parse it as JSON with the schema above and
# flatten the resulting struct into top-level columns.
roboticArm_sdf = (
    raw_roboticArm_df
    .select(from_json(col("value").cast("string"), roboticArm_schema).alias("value"))
    .select("value.*")
)

In [5]:
# Print the schema of the parsed stream to check that the JSON fields became columns.
roboticArm_sdf.printSchema()

root
 |-- id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevel: integer (nullable = true)
 |-- ts: timestamp (nullable = true)



### to make sure that it works, let's first inspect the content of the stream 

In [6]:
# Debug-only sink: the stream is written to an in-memory table named "sinkTable"
# that can be inspected with spark.sql. DO NOT USE IN PRODUCTION.
basic_query = (
    roboticArm_sdf.writeStream
    .queryName("sinkTable")
    .format("memory")
    .start()
)

run the following cell to see the most recent content of the sinkTable

In [8]:
# Show the 5 most recent events (by ts) collected so far in the in-memory sink table.
spark.sql("SELECT * FROM sinkTable ORDER BY TS DESC").show(5)

+---+-----------+-----------+-------------------+
| id|     status|stressLevel|                 ts|
+---+-----------+-----------+-------------------+
|  2|placingGood|          3|2022-11-01 14:59:01|
|  1|placingGood|          3|2022-11-01 14:59:01|
|  2| movingGood|          9|2022-11-01 14:58:51|
|  2|goodGrasped|          5|2022-11-01 14:58:50|
|  1| movingGood|          7|2022-11-01 14:58:48|
+---+-----------+-----------+-------------------+
only showing top 5 rows



do not forget to stop queries that you are not using

In [9]:
# Stop the debug query; the query name "sinkTable" is reused by the following queries.
basic_query.stop()

# E1

> Propose how to model the streaming data generated by the robotic arms.

Let's first try with the model proposed for EPL and see what happens. 

# E2

> Write a continuous query that emits the max stress for each arm.

### The SQL style

In [10]:
# Create a logical table on top of the streaming DataFrame so it can be queried in SQL.
# createOrReplaceTempView is used instead of createTempView so that the cell can be
# re-run: createTempView fails when a view named "RoboticArm" already exists.
# The view is deliberately not cleaned up, because the next queries use it.
roboticArm_sdf.createOrReplaceTempView("RoboticArm")

In [11]:
# Maximum stress level seen so far for each arm, over the RoboticArm view.
query_string = """
SELECT id, max(stressLevel) 
FROM RoboticArm 
GROUP BY id;
"""

# Run the SQL text as a streaming query. With "complete" output mode the whole
# aggregate result is written to the in-memory sink table.
e2 = (
    spark.sql(query_string)
    .writeStream
    .queryName("sinkTable")
    .outputMode("complete")
    .format("memory")
    .start()
)

In [15]:
# Look up the most recent results. No "ORDER BY ts DESC" here: the query runs in
# complete output mode, so the table already holds only the most recent result.
spark.sql("SELECT * FROM sinkTable").show(5)

+---+----------------+
| id|max(stressLevel)|
+---+----------------+
|  1|               7|
|  2|               9|
+---+----------------+



In [16]:
# Clean up: stop the SQL-style query before the sink name "sinkTable" is reused.
e2.stop()

### The DataFrame style

In [17]:
# Same query as the SQL version, written with the DataFrame API.
# The aggregated column is named explicitly: a bare .max() aggregates every
# numeric column, so its result would change if the schema gained another one.
# The output column is still called "max(stressLevel)".
e2bis = (
    roboticArm_sdf
    .groupBy("id")
    .max("stressLevel")
    .writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("sinkTable")
    .start()
)

In [18]:
# Look up the most recent results. No "ORDER BY ts DESC" here: the query runs in
# complete output mode, so the table already holds only the most recent result.
spark.sql("SELECT * FROM sinkTable").show(5)

+---+----------------+
| id|max(stressLevel)|
+---+----------------+
|  1|               7|
|  2|               9|
+---+----------------+



In [19]:
# Clean up: stop the DataFrame-style query before the sink name "sinkTable" is reused.
e2bis.stop()

# E3

> A continuous query that emits the average stress level between a pick (status==goodGrasped) and a place (status==placingGood).

Spark Structured Streaming does not support the EPL operator `->` (which reads as *followed by*). We need to use a stream-to-stream join instead.

This is a hard task, so let's simplify it.

### E3.1

> A continuous query that emits the events between a moving (status==movingGood) and a place (status==placingGood)

NOTE: no request to average and only two consecutive events

In [20]:
# Streaming DataFrame holding only the events where status='movingGood'.
# id and ts are renamed so they stay distinguishable after the join.
moving_sdf = (
    roboticArm_sdf
    .filter("status='movingGood'")
    .withColumnRenamed("id", "idMoving")
    .withColumnRenamed("ts", "tsMoving")
)

In [21]:
# Streaming DataFrame holding only the events where status='placingGood'.
# id and ts are renamed so they stay distinguishable after the join.
placing_sdf = (
    roboticArm_sdf
    .filter("status='placingGood'")
    .withColumnRenamed("id", "idPlacing")
    .withColumnRenamed("ts", "tsPlacing")
)

Join without the event-time constraint

In [22]:
# Stream-to-stream inner join: each moving event is paired with every later
# placing event of the same arm. The condition sets no upper bound on how far
# apart the two events may be.
join_sdf = moving_sdf.join(
    placing_sdf,
    on=expr("""
    (idMoving == idPlacing) AND
    (tsMoving < tsPlacing )
    """),
    how="inner",
)

In [23]:
# Start the unconstrained join and collect its output in the in-memory sink table.
e3 = (
    join_sdf.writeStream
    .queryName("sinkTable")
    .format("memory")
    .start()
)

In [29]:
# Most recent joined pairs. Ordered by tsPlacing because the ts column was
# renamed in both join inputs. Shows 5 rows without truncating the values.
spark.sql("SELECT * FROM sinkTable ORDER BY tsPlacing DESC").show(5,False)

+--------+----------+-----------+-------------------+---------+-----------+-----------+-------------------+
|idMoving|status    |stressLevel|tsMoving           |idPlacing|status     |stressLevel|tsPlacing          |
+--------+----------+-----------+-------------------+---------+-----------+-----------+-------------------+
|1       |movingGood|7          |2022-11-01 14:59:21|1        |placingGood|3          |2022-11-01 15:00:07|
|1       |movingGood|7          |2022-11-01 14:58:15|1        |placingGood|3          |2022-11-01 15:00:07|
|1       |movingGood|7          |2022-11-01 14:58:48|1        |placingGood|3          |2022-11-01 15:00:07|
|1       |movingGood|7          |2022-11-01 14:57:08|1        |placingGood|3          |2022-11-01 15:00:07|
|1       |movingGood|7          |2022-11-01 14:57:41|1        |placingGood|3          |2022-11-01 15:00:07|
+--------+----------+-----------+-------------------+---------+-----------+-----------+-------------------+
only showing top 5 rows



#### Discussion

Is this what we want?

Let's try to count how many joins we have here ...


In [30]:
# Count how many joined rows were produced for each placing event (arm id + timestamp).
spark.sql("SELECT idPlacing, tsPlacing, count(*) FROM sinkTable group by idPlacing, tsPlacing ORDER BY tsPlacing DESC").show(20,False)

+---------+-------------------+--------+
|idPlacing|tsPlacing          |count(1)|
+---------+-------------------+--------+
|1        |2022-11-01 15:00:40|7       |
|2        |2022-11-01 15:00:40|7       |
|2        |2022-11-01 15:00:07|6       |
|1        |2022-11-01 15:00:07|6       |
|1        |2022-11-01 14:59:34|5       |
|2        |2022-11-01 14:59:34|5       |
|2        |2022-11-01 14:59:01|4       |
|1        |2022-11-01 14:59:01|4       |
|2        |2022-11-01 14:58:28|3       |
|1        |2022-11-01 14:58:28|3       |
|1        |2022-11-01 14:57:54|2       |
|2        |2022-11-01 14:57:54|2       |
|1        |2022-11-01 14:57:21|1       |
|2        |2022-11-01 14:57:21|1       |
+---------+-------------------+--------+



**Far too many!** ... and growing :-(

O-: !!! ... and also **the state is growing** !!! :-O

In [31]:
from IPython.display import clear_output
import json

# Print the latest progress report and status of e3 once per second.
# The loop is bounded (about one minute) and ends early if the query is no
# longer active, so the cell terminates on its own instead of requiring a
# manual interrupt; this keeps "Restart & Run All" working.
for _tick in range(60):
    if not e3.isActive:
        break
    print(json.dumps(e3.lastProgress, indent=4))
    print(e3.status)
    time.sleep(1)
    # wait=True defers the clearing until the next output is produced.
    clear_output(wait=True)

{
    "id": "b532ef1d-18f1-486a-90c4-6c9a73a8ac8c",
    "runId": "bbd5167e-d221-459a-903d-18be9c1bee5c",
    "name": "sinkTable",
    "timestamp": "2022-11-01T15:00:55.385Z",
    "batchId": 2,
    "numInputRows": 8,
    "inputRowsPerSecond": 0.6814890535820768,
    "processedRowsPerSecond": 0.6848728704734184,
    "durationMs": {
        "addBatch": 11493,
        "getBatch": 0,
        "latestOffset": 2,
        "queryPlanning": 96,
        "triggerExecution": 11681,
        "walCommit": 21
    },
    "stateOperators": [
        {
            "numRowsTotal": 28,
            "numRowsUpdated": 0,
            "memoryUsedBytes": 329664,
            "customMetrics": {
                "loadedMapCacheHitCount": 800,
                "loadedMapCacheMissCount": 0,
                "stateOnCurrentVersionSizeBytes": 42624
            }
        }
    ],
    "sources": [
        {
            "description": "KafkaV2[Subscribe[RoboticArm]]",
            "startOffset": {
                "RoboticArm": 

KeyboardInterrupt: 

Monitor for a minute the field `"numRowsTotal"` in `"stateOperators"`.

**We need to add watermarks and a time constraint for state cleanup!**

In [32]:
# Stop the unconstrained join query before starting the watermarked version.
e3.stop()

In [39]:
# Moving events, this time with a 1-minute watermark declared on the event time.
# The watermark is declared on "ts" before that column is renamed.
movingW_sdf = (
    roboticArm_sdf
    .withWatermark("ts", "1 minute")  # WATERMARK ADDED HERE
    .filter("status='movingGood'")
    .withColumnRenamed("id", "idMoving")
    .withColumnRenamed("ts", "tsMoving")
)

# Placing events, with the same 1-minute watermark.
placingW_sdf = (
    roboticArm_sdf
    .withWatermark("ts", "1 minute")  # WATERMARK ADDED HERE
    .filter("status='placingGood'")
    .withColumnRenamed("id", "idPlacing")
    .withColumnRenamed("ts", "tsPlacing")
)

In [40]:
# Join with a time constraint: a placing event matches only when it follows the
# moving event of the same arm by less than 14 seconds.
# TIME CONSTRAINT ADDED HERE (considering also that the time flows at half of the speed) !!!
joinTC_sdf = movingW_sdf.join(
    placingW_sdf,
    on=expr("""
    (idMoving == idPlacing) AND
    (tsPlacing > tsMoving ) AND
    (tsPlacing < tsMoving + interval 14 seconds )"""),
    how="inner",
)

In [43]:
# Start the time-constrained join and collect its output in the in-memory sink table.
e3TC = (
    joinTC_sdf.writeStream
    .queryName("sinkTable")
    .format("memory")
    .start()
)

In [45]:
# Count the joined rows per placing event again, now for the time-constrained join.
spark.sql("SELECT idPlacing, tsPlacing, count(*) FROM sinkTable group by idPlacing, tsPlacing ORDER BY tsPlacing DESC").show(20,False)

+---------+-------------------+--------+
|idPlacing|tsPlacing          |count(1)|
+---------+-------------------+--------+
|2        |2022-11-01 15:03:59|1       |
|1        |2022-11-01 15:03:59|1       |
|1        |2022-11-01 15:03:26|1       |
|2        |2022-11-01 15:03:26|1       |
|1        |2022-11-01 15:02:53|1       |
|2        |2022-11-01 15:02:53|1       |
|1        |2022-11-01 15:02:20|1       |
|2        |2022-11-01 15:02:20|1       |
|1        |2022-11-01 15:01:46|1       |
|2        |2022-11-01 15:01:46|1       |
|2        |2022-11-01 15:01:13|1       |
|1        |2022-11-01 15:01:13|1       |
|2        |2022-11-01 15:00:40|1       |
|1        |2022-11-01 15:00:40|1       |
|2        |2022-11-01 15:00:07|1       |
|1        |2022-11-01 15:00:07|1       |
|2        |2022-11-01 14:59:34|1       |
|1        |2022-11-01 14:59:34|1       |
|2        |2022-11-01 14:59:01|1       |
|1        |2022-11-01 14:59:01|1       |
+---------+-------------------+--------+
only showing top

also notice that the state no longer grows

In [46]:
from IPython.display import clear_output
import json

# Print the latest progress report and status of e3TC once per second.
# The loop is bounded (about one minute) and ends early if the query is no
# longer active, so the cell terminates on its own instead of requiring a
# manual interrupt; this keeps "Restart & Run All" working.
for _tick in range(60):
    if not e3TC.isActive:
        break
    print(json.dumps(e3TC.lastProgress, indent=4))
    print(e3TC.status)
    time.sleep(1)
    # wait=True defers the clearing until the next output is produced.
    clear_output(wait=True)

{
    "id": "803e7cd9-c57d-4cca-98ab-515cd3c6caf0",
    "runId": "f97bff0f-20ac-4823-8d90-312fc55d83fb",
    "name": "sinkTable",
    "timestamp": "2022-11-01T15:05:21.022Z",
    "batchId": 4,
    "numInputRows": 8,
    "inputRowsPerSecond": 0.6559527714004592,
    "processedRowsPerSecond": 0.6537015852263443,
    "durationMs": {
        "addBatch": 12071,
        "getBatch": 0,
        "latestOffset": 3,
        "queryPlanning": 121,
        "triggerExecution": 12238,
        "walCommit": 20
    },
    "eventTime": {
        "watermark": "2022-11-01T15:03:55.000Z"
    },
    "stateOperators": [
        {
            "numRowsTotal": 12,
            "numRowsUpdated": 0,
            "memoryUsedBytes": 331072,
            "customMetrics": {
                "loadedMapCacheHitCount": 1600,
                "loadedMapCacheMissCount": 0,
                "stateOnCurrentVersionSizeBytes": 38400
            }
        }
    ],
    "sources": [
        {
            "description": "KafkaV2[Subscrib

KeyboardInterrupt: 

In [53]:
# Stop the time-constrained join query; "sinkTable" is reused by the next section.
e3TC.stop()

**How much should the watermark and the time constraint be?**

|watermark | time constraint | number of results per arm |reason                                         |
|----------|-----------------|-------------------|-----------------------------------------------|
|  any     |           <=10  |         0         | for each arm, there is no placing within less than 10 sec to a moving |   
|  any     |    >10 & < 14   |         1 or 0    | for one of the arms, there is 1 placing within 10-14 sec to a moving        |    
|  any     |    >=14 & < 44  |         1         | for each arm, there is 1 placing within 10-43 sec to a moving                 |
|  any     |    >=44 & <48   |         1 or 2    | for one of the arms, there are 2 placing within 44-47 sec to a moving                |   
|  any     |          >=48   |         2+        | for each of the arm, there are more than 2 placing within 48 or more sec to a moving                     |   

The watermark does not influence the answer because, in this case, the data arrive in order and without any delay. **However, it is still important in order to clean up the state**.

### E3.2

> A continuous query that emits the events between a **pick (status==goodGrasped)** and a place (status==placingGood)

NOTE: I'm adding one more type of event, good grasped that should appear before any moving

In [47]:
# Grasp events (status='goodGrasped') with a 1-minute watermark on the event time.
grapsedW_sdf = (
    roboticArm_sdf
    .withWatermark("ts", "1 minute")  # WATERMARK ADDED HERE
    .filter("status='goodGrasped'")
    .withColumnRenamed("id", "idGrasped")
    .withColumnRenamed("ts", "tsGrasped")
)

# Moving events; same definition as in E3.1, repeated in this section.
movingW_sdf = (
    roboticArm_sdf
    .withWatermark("ts", "1 minute")  # WATERMARK ADDED HERE
    .filter("status='movingGood'")
    .withColumnRenamed("id", "idMoving")
    .withColumnRenamed("ts", "tsMoving")
)

# Placing events; same definition as in E3.1, repeated in this section.
placingW_sdf = (
    roboticArm_sdf
    .withWatermark("ts", "1 minute")  # WATERMARK ADDED HERE
    .filter("status='placingGood'")
    .withColumnRenamed("id", "idPlacing")
    .withColumnRenamed("ts", "tsPlacing")
)

In [48]:
# First step of the chain: pair each grasp with the moving event of the same
# arm that follows it by less than 3 seconds.
# TIME CONSTRAINT ADDED HERE (considering also that the time flows at half of the speed) !!!
joinGM_sdf = grapsedW_sdf.join(
    movingW_sdf,
    on=expr("""
    (idGrasped == idMoving) AND
    (tsMoving > tsGrasped ) AND
    (tsMoving < tsGrasped + interval 3 seconds )"""),
    how="inner",
)

In [49]:
# Inspect the columns of the grasp/moving join before joining it with the placing events.
joinGM_sdf.printSchema()

root
 |-- idGrasped: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevel: integer (nullable = true)
 |-- tsGrasped: timestamp (nullable = true)
 |-- idMoving: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevel: integer (nullable = true)
 |-- tsMoving: timestamp (nullable = true)



In [50]:
# Inspect the columns of the placing stream that is joined next.
placingW_sdf.printSchema()

root
 |-- idPlacing: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevel: integer (nullable = true)
 |-- tsPlacing: timestamp (nullable = true)



In [51]:
# Second step of the chain: attach the placing event of the same arm that
# follows the moving event by less than 14 seconds.
# TIME CONSTRAINT ADDED HERE (considering also that the time flows at half of the speed) !!!
joinGMP_sdf = joinGM_sdf.join(
    placingW_sdf,
    on=expr("""
    (idMoving == idPlacing) AND
    (tsPlacing > tsMoving ) AND
    (tsPlacing < tsMoving + interval 14 seconds )"""),
    how="inner",
)

In [54]:
# Start the grasp -> moving -> placing join and collect its output in the sink table.
e3_2 = (
    joinGMP_sdf.writeStream
    .queryName("sinkTable")
    .format("memory")
    .start()
)

In [63]:
# Most recent grasp/moving/placing triples, ordered by the grasp timestamp.
spark.sql("SELECT * FROM sinkTable ORDER BY tsGrasped DESC").show(20,False)

+---------+-----------+-----------+-------------------+--------+----------+-----------+-------------------+---------+-----------+-----------+-------------------+
|idGrasped|status     |stressLevel|tsGrasped          |idMoving|status    |stressLevel|tsMoving           |idPlacing|status     |stressLevel|tsPlacing          |
+---------+-----------+-----------+-------------------+--------+----------+-----------+-------------------+---------+-----------+-----------+-------------------+
|2        |goodGrasped|5          |2022-11-01 15:06:01|2       |movingGood|9          |2022-11-01 15:06:02|2        |placingGood|3          |2022-11-01 15:06:12|
|1        |goodGrasped|1          |2022-11-01 15:05:57|1       |movingGood|7          |2022-11-01 15:05:59|1        |placingGood|3          |2022-11-01 15:06:12|
|2        |goodGrasped|5          |2022-11-01 15:05:27|2       |movingGood|9          |2022-11-01 15:05:29|2        |placingGood|3          |2022-11-01 15:05:39|
|2        |goodGrasped|5    

In [64]:
# Stop the E3.2 query; "sinkTable" is reused by the next section.
e3_2.stop()

### E3.3

> A continuous query that emits **the average stress level** between a pick (status==goodGrasped) and a place (status==placingGood)

NOTE: the original question

In [65]:
# Same three inputs as in E3.2, but stressLevel is renamed too, so that the
# three stress values can be told apart after the joins.
grapsedFinal_sdf = (
    roboticArm_sdf
    .withWatermark("ts", "1 minute")  # WATERMARK ADDED HERE
    .filter("status='goodGrasped'")
    .withColumnRenamed("id", "idGrasped")
    .withColumnRenamed("ts", "tsGrasped")
    .withColumnRenamed("stressLevel", "stressLevelGrasped")
)

movingFinal_sdf = (
    roboticArm_sdf
    .withWatermark("ts", "1 minute")  # WATERMARK ADDED HERE
    .filter("status='movingGood'")
    .withColumnRenamed("id", "idMoving")
    .withColumnRenamed("ts", "tsMoving")
    .withColumnRenamed("stressLevel", "stressLevelMoving")
)

placingFinal_sdf = (
    roboticArm_sdf
    .withWatermark("ts", "1 minute")  # WATERMARK ADDED HERE
    .filter("status='placingGood'")
    .withColumnRenamed("id", "idPlacing")
    .withColumnRenamed("ts", "tsPlacing")
    .withColumnRenamed("stressLevel", "stressLevelPlacing")
)

In [66]:
# Check that id, stressLevel and ts were renamed as intended in the placing stream.
placingFinal_sdf.printSchema()

root
 |-- idPlacing: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevelPlacing: integer (nullable = true)
 |-- tsPlacing: timestamp (nullable = true)



In [67]:
# Chain of two time-constrained joins on the same arm:
#   grasp  -> moving  (moving less than 3 seconds after the grasp)
#   moving -> placing (placing less than 14 seconds after the moving)
joinFinal_sdf = (
    grapsedFinal_sdf
    .join(
        movingFinal_sdf,
        on=expr("""
    (idGrasped == idMoving) AND
    (tsMoving > tsGrasped ) AND
    (tsMoving < tsGrasped + interval 3 seconds )"""),
    )
    .join(
        placingFinal_sdf,
        on=expr("""
    (idMoving == idPlacing) AND
    (tsPlacing > tsMoving ) AND
    (tsPlacing < tsMoving + interval 14 seconds )"""),
    )
)

In [68]:
# Inspect the joined columns; the three stressLevel* columns are averaged in the next cell.
joinFinal_sdf.printSchema()

root
 |-- idGrasped: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevelGrasped: integer (nullable = true)
 |-- tsGrasped: timestamp (nullable = true)
 |-- idMoving: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevelMoving: integer (nullable = true)
 |-- tsMoving: timestamp (nullable = true)
 |-- idPlacing: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevelPlacing: integer (nullable = true)
 |-- tsPlacing: timestamp (nullable = true)



In [69]:
# Keep the arm id and the mean of the three stress levels (grasp, moving, placing).
final_sdf = joinFinal_sdf.select(
    col("idGrasped").alias("id"),
    expr("(stressLevelGrasped+stressLevelMoving+stressLevelPlacing)/3 AS AVG_stressLevel"),
)

In [70]:
# Start the average-stress query and collect its output in the in-memory sink table.
e3_3 = (
    final_sdf.writeStream
    .queryName("sinkTable")
    .format("memory")
    .start()
)

In [78]:
# Show the averages emitted so far (one row per matched grasp/moving/placing triple).
spark.sql("SELECT * FROM sinkTable").show(20,False)

+---+------------------+
|id |AVG_stressLevel   |
+---+------------------+
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|2  |5.666666666666667 |
|2  |5.666666666666667 |
|2  |5.666666666666667 |
+---+------------------+
only showing top 20 rows



In [79]:
# Stop the E3.3 query; "sinkTable" is reused by the next section.
e3_3.stop()

## Think out of the box!!!

**Is the complexity of temporal joins acceptable? Is there any other solution that does not require them?**

In many cases, query answering is hard because the datamodel is **over simplified**.

We may go back to the E1 problem and propose changing the model so as to eliminate the need for a temporal join. A sequential number for the cycles of each arm would be enough to make the join deterministic. See [datagen2.ipynb](datagen2.ipynb) for the changes in the data generator.

In [95]:
# Version 2 of the model: same fields as before plus a per-arm cycle number.
roboticArmV2_schema = (                        ## <-- CHANGE HERE new name for the schema
    StructType()
    .add("id", StringType(), True)
    .add("cycle", IntegerType(), True)         ## <-- CHANGE HERE new field
    .add("status", StringType(), True)
    .add("stressLevel", IntegerType(), True)
    .add("ts", TimestampType(), True)
)

# Streaming source for the V2 events, read from their own topic.
raw_roboticArmV2_df = (                        ## <-- CHANGE HERE new name for the df
    spark.readStream
    .format("kafka")
    .option("subscribe", "RoboticArmV2")       ## <-- CHANGE HERE different topic
    .option("startingOffsets", "earliest")
    .option("kafka.bootstrap.servers", servers)
    .load()
)

# Parse the JSON value with the V2 schema and flatten it into columns.
roboticArmV2_sdf = (                           ## <-- CHANGE HERE new name sdf
    raw_roboticArmV2_df
    .select(from_json(col("value").cast("string"), roboticArmV2_schema).alias("value"))  ## <-- CHANGE HERE new schema
    .select("value.*")
)

roboticArmV2_sdf.printSchema()

root
 |-- id: string (nullable = true)
 |-- cycle: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevel: integer (nullable = true)
 |-- ts: timestamp (nullable = true)



In [98]:
# One filtered stream per phase of the cycle, built on the V2 model.
# id, cycle and stressLevel are renamed per phase; ts keeps its name here.
grapsedFinalCyc_sdf = (
    roboticArmV2_sdf
    .withWatermark("ts", "1 minute")
    .filter("status='goodGrasped'")
    .withColumnRenamed("id", "idGrasped")
    .withColumnRenamed("cycle", "cycleGrasped")
    .withColumnRenamed("stressLevel", "stressLevelGrasped")
)

movingFinalCyc_sdf = (
    roboticArmV2_sdf
    .withWatermark("ts", "1 minute")
    .filter("status='movingGood'")
    .withColumnRenamed("id", "idMoving")
    .withColumnRenamed("cycle", "cycleMoving")
    .withColumnRenamed("stressLevel", "stressLevelMoving")
)

placingFinalCyc_sdf = (
    roboticArmV2_sdf
    .withWatermark("ts", "1 minute")
    .filter("status='placingGood'")
    .withColumnRenamed("id", "idPlacing")
    .withColumnRenamed("cycle", "cyclePlacing")
    .withColumnRenamed("stressLevel", "stressLevelPlacing")
)

In [99]:
# Join the three phases on arm id and cycle number instead of on time ranges.
# NOTE(review): these conditions contain no event-time bound, so the watermark
# alone may not let Spark drop old join state; check numRowsTotal to confirm.
joinFinalCyc_sdf = (
    grapsedFinalCyc_sdf
    .join(
        movingFinalCyc_sdf,
        on=expr("""
    (idGrasped == idMoving) AND
    (cycleGrasped == cycleMoving )"""),  ## <- CHANGE HERE
    )
    .join(
        placingFinalCyc_sdf,
        on=expr("""
    (idMoving == idPlacing) AND
    (cycleMoving == cyclePlacing)"""),  ## <- CHANGE HERE
    )
)

In [None]:
# Two-way variant of the cycle-based join (moving -> placing only).
# The original cell referenced movingV2_sdf / placingV2_sdf, which are not
# defined anywhere in this notebook; the cycle-based streams defined above
# expose the same idMoving/cycleMoving and idPlacing/cyclePlacing columns.
# joinV2_sdf is not used by the cells that follow.
joinV2_sdf = (movingFinalCyc_sdf.join(
  placingFinalCyc_sdf, expr("""
    (idMoving == idPlacing) AND
    (cyclePlacing == cycleMoving )""" ## <- CHANGE HERE 
    )))

In [100]:
# Keep the arm id and the mean of the three stress levels (grasp, moving, placing).
finalCyc_sdf = joinFinalCyc_sdf.select(
    col("idGrasped").alias("id"),
    expr("(stressLevelGrasped+stressLevelMoving+stressLevelPlacing)/3 AS AVG_stressLevel"),
)

In [104]:
# Start the cycle-based average-stress query and collect its output in the sink table.
e3V2 = (
    finalCyc_sdf.writeStream
    .queryName("sinkTable")
    .format("memory")
    .start()
)

In [106]:
# Show the averages emitted so far by the cycle-based query.
spark.sql("SELECT * FROM sinkTable").show(20,False)

+---+------------------+
|id |AVG_stressLevel   |
+---+------------------+
|2  |5.666666666666667 |
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|2  |5.666666666666667 |
|1  |3.6666666666666665|
|2  |5.666666666666667 |
|1  |3.6666666666666665|
|2  |5.666666666666667 |
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|1  |3.6666666666666665|
|2  |5.666666666666667 |
|1  |3.6666666666666665|
|2  |5.666666666666667 |
|1  |3.6666666666666665|
|2  |5.666666666666667 |
|2  |5.666666666666667 |
|2  |5.666666666666667 |
|1  |3.6666666666666665|
|2  |5.666666666666667 |
+---+------------------+
only showing top 20 rows



**Much easier** !!

**REMEMBER**: modeling and querying are *two sides of the same coin*

We are not in a traditional RDBMS where modeling is done once and for all by the DB administrator and queries must conform to "the model". We are in a setting where performance matters more than governance, and changing the model is often the only way to keep good performance.

In [107]:
# Stop the cycle-based query; "sinkTable" is reused by the next exercise.
e3V2.stop()

# E4

>A continuous query that returns the robotic arms that,
>
> * in less than 20 second (was 10 in EPL, but here the time passes at half of the speed),
> * picked a good while safely operating,
> * moved it while the controller was raising a warning, and
> * placed it while safely operating again.


In [108]:
# --- Inputs: one filtered stream per phase, on the V2 (cycle-numbered) model ---
# Thresholds used by the filters below: stressLevel < 7 counts as safe,
# stressLevel 7 or 8 (> 6 and < 9) counts as a warning.

# Goods picked while operating safely.
goodGraspedSafely_sdf = (
    roboticArmV2_sdf
    .withWatermark("ts", "1 minute")
    .filter("status='goodGrasped' AND stressLevel < 7")
    .withColumnRenamed("id", "idGrasped")
    .withColumnRenamed("ts", "tsGrasped")
    .withColumnRenamed("cycle", "cycleGrasped")
    .withColumnRenamed("stressLevel", "stressLevelGrasped")
)

# Goods moved while the controller was raising a warning.
movingWarning_sdf = (
    roboticArmV2_sdf
    .withWatermark("ts", "1 minute")
    .filter("status='movingGood' AND stressLevel > 6 AND stressLevel < 9")
    .withColumnRenamed("id", "idMoving")
    .withColumnRenamed("ts", "tsMoving")
    .withColumnRenamed("cycle", "cycleMoving")
    .withColumnRenamed("stressLevel", "stressLevelMoving")
)

# Goods placed while operating safely again.
placingSafely_sdf = (
    roboticArmV2_sdf
    .withWatermark("ts", "1 minute")
    .filter("status='placingGood' AND stressLevel < 7")
    .withColumnRenamed("id", "idPlacing")
    .withColumnRenamed("ts", "tsPlacing")
    .withColumnRenamed("cycle", "cyclePlacing")
    .withColumnRenamed("stressLevel", "stressLevelPlacing")
)

# --- Joins: same arm and same cycle across the three phases ---
join1_sdf = goodGraspedSafely_sdf.join(
    movingWarning_sdf,
    on=expr("""
    (idGrasped == idMoving) AND
    (cycleGrasped == cycleMoving)"""),
)

join2_sdf = join1_sdf.join(
    placingSafely_sdf,
    on=expr("""
    (idMoving == idPlacing) AND
    (cyclePlacing == cycleMoving )"""),
)

# Keep only the cycles completed within 20 seconds from grasp to place.
# NOTE(review): the exercise text says "in less than 20 seconds" while this
# predicate uses <=, so a cycle of exactly 20 seconds is kept; confirm intent.
within20sec = join2_sdf.filter("tsPlacing <= tsGrasped + interval 20 seconds")

# Start the query and collect its output in the in-memory sink table.
e4 = (
    within20sec.writeStream
    .queryName("sinkTable")
    .format("memory")
    .start()
)

In [113]:
# Show the arm id and cycle number of the 5 most recent matching cycles.
spark.sql("SELECT idGrasped AS ID, cyclePlacing AS cycle FROM sinkTable ORDER BY cyclePlacing DESC").show(5,False)

+---+-----+
|ID |cycle|
+---+-----+
|1  |24   |
|1  |23   |
|1  |22   |
|1  |21   |
|1  |20   |
+---+-----+
only showing top 5 rows



indeed only the arm whose ID is 1

In [114]:
# Stop the E4 query; its DataFrame (within20sec) is reused by E5 below.
e4.stop()

# E5

> A continuous query that monitors the results of the previous one (i.e., E4) and counts how many times each robotic arm is present in the stream over a window of 20 seconds updating the counting every 4 seconds.

In [139]:
# Inspect the columns of the E4 result to pick the event-time and grouping columns for E5.
within20sec.printSchema()

root
 |-- idGrasped: string (nullable = true)
 |-- cycleGrasped: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevelGrasped: integer (nullable = true)
 |-- tsGrasped: timestamp (nullable = true)
 |-- idMoving: string (nullable = true)
 |-- cycleMoving: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevelMoving: integer (nullable = true)
 |-- tsMoving: timestamp (nullable = true)
 |-- idPlacing: string (nullable = true)
 |-- cyclePlacing: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevelPlacing: integer (nullable = true)
 |-- tsPlacing: timestamp (nullable = true)



In [132]:
# Count, per arm, how many E4 results fall in each sliding window over tsPlacing.
# NOTE(review): the exercise asks for a 20 s window updated every 4 s; the
# 40 s / 8 s used here presumably compensates for the generator's time flowing
# at half speed (as noted for E4) - confirm.
e5 = (
    within20sec
    .withWatermark("tsPlacing", "1 minutes")
    .groupBy(
        window(timeColumn="tsPlacing", windowDuration="40 seconds", slideDuration="8 seconds"),
        "idGrasped",
    )
    .count()
    .writeStream
    .queryName("sinkTable")
    .format("memory")
    .start()
)

In [141]:
# Show the 10 most recent windows with their per-arm counts, without truncating the window column.
spark.sql("SELECT * FROM sinkTable ORDER BY window DESC").show(10,False)

+------------------------------------------+---------+-----+
|window                                    |idGrasped|count|
+------------------------------------------+---------+-----+
|[2021-10-28 14:00:40, 2021-10-28 14:01:20]|1        |1    |
|[2021-10-28 14:00:32, 2021-10-28 14:01:12]|1        |1    |
|[2021-10-28 14:00:24, 2021-10-28 14:01:04]|1        |2    |
|[2021-10-28 14:00:16, 2021-10-28 14:00:56]|1        |1    |
|[2021-10-28 14:00:08, 2021-10-28 14:00:48]|1        |1    |
|[2021-10-28 14:00:00, 2021-10-28 14:00:40]|1        |1    |
|[2021-10-28 13:59:52, 2021-10-28 14:00:32]|1        |2    |
|[2021-10-28 13:59:44, 2021-10-28 14:00:24]|1        |1    |
|[2021-10-28 13:59:36, 2021-10-28 14:00:16]|1        |1    |
|[2021-10-28 13:59:28, 2021-10-28 14:00:08]|1        |1    |
+------------------------------------------+---------+-----+
only showing top 10 rows



In [142]:
# Stop the E5 query, the last streaming query started in this notebook.
e5.stop()