# Spark Structured Streaming - Demo
## Fire alarm

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
import io
from pyspark.sql.functions import *
import time
import json
import struct
import requests 

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.5,org.apache.kafka:kafka-clients:2.6.0 pyspark-shell'
                                    
spark = (SparkSession.builder 
    .master("local[*]")
    .appName("test")
    .getOrCreate()
        )

spark

set up the environment variables

In [2]:
smoke_topic = 'SmokeSensorEvent'
temperature_topic = 'TemperatureSensorEvent'
servers = "kafka:9092"

## Understanding spark-kafka integration
Let's treat first kafka as a bulk source

In [3]:
smoke_df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", smoke_topic)
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load())

In [4]:
smoke_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
smoke_df.show(5)

+-------+--------------------+----------------+---------+------+--------------------+-------------+
|    key|               value|           topic|partition|offset|           timestamp|timestampType|
+-------+--------------------+----------------+---------+------+--------------------+-------------+
|[53 31]|[7B 22 73 65 6E 7...|SmokeSensorEvent|        0|     0|2021-10-18 10:32:...|            0|
|[53 31]|[7B 22 73 65 6E 7...|SmokeSensorEvent|        0|     1|2021-10-18 10:32:...|            0|
|[53 31]|[7B 22 73 65 6E 7...|SmokeSensorEvent|        0|     2|2021-10-18 10:32:...|            0|
|[53 31]|[7B 22 73 65 6E 7...|SmokeSensorEvent|        0|     3|2021-10-18 10:32:...|            0|
|[53 31]|[7B 22 73 65 6E 7...|SmokeSensorEvent|        0|     4|2021-10-18 10:33:...|            0|
+-------+--------------------+----------------+---------+------+--------------------+-------------+
only showing top 5 rows



In [5]:
stringified_smoke_df = smoke_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
stringified_smoke_df.show(5,False)

+---+--------------------------------------------------+
|key|value                                             |
+---+--------------------------------------------------+
|S1 |{"sensor": "S1", "smoke": false, "ts": 1666243707}|
|S1 |{"sensor": "S1", "smoke": false, "ts": 1666243718}|
|S1 |{"sensor": "S1", "smoke": false, "ts": 1666243728}|
|S1 |{"sensor": "S1", "smoke": false, "ts": 1666243738}|
|S1 |{"sensor": "S1", "smoke": false, "ts": 1666243748}|
+---+--------------------------------------------------+
only showing top 5 rows



In [6]:
stringified_smoke_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [7]:
from pyspark.sql.types import *

smoke_schema = StructType([
    StructField("sensor", StringType(), True),
    StructField("smoke", BooleanType(), True),
    StructField("ts", TimestampType(), True)])

In [8]:
smoke_df = stringified_smoke_df.select(col("key").cast("string"),from_json(col("value"), smoke_schema).alias("value"))

In [9]:
smoke_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- sensor: string (nullable = true)
 |    |-- smoke: boolean (nullable = true)
 |    |-- ts: timestamp (nullable = true)



In [10]:
smoke_df.select("value.*").show(5)

+------+-----+-------------------+
|sensor|smoke|                 ts|
+------+-----+-------------------+
|    S1|false|2022-10-20 05:28:27|
|    S1|false|2022-10-20 05:28:38|
|    S1|false|2022-10-20 05:28:48|
|    S1|false|2022-10-20 05:28:58|
|    S1|false|2022-10-20 05:29:08|
+------+-----+-------------------+
only showing top 5 rows



## Let's explore Spark Structured Streaming by example
Please refer to [EPL fire allarm](https://github.com/emanueledellavalle/streaming-data-analytics/tree/main/codes/epl_firealarm) for the EPL version of the following queries.

### Let's create the streaming Data Frames using the data in the kafka smoke topic

In [11]:
raw_streaming_smoke_df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("startingOffsets", "earliest")
  .option("subscribe", smoke_topic)
  .load())

In [12]:
raw_streaming_smoke_df.isStreaming

True

In [13]:
raw_streaming_smoke_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [14]:
smoke_sdf=(raw_streaming_smoke_df
                      .select(from_json(col("value").cast("string"), smoke_schema).alias("value"))
                      .select("value.*"))

In [15]:
smoke_sdf.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- smoke: boolean (nullable = true)
 |-- ts: timestamp (nullable = true)



**NOTE**: it is not a DataFrame, you cannot directly execute an action on it. 

**The following cell *intetionally* gives an error**.

In [16]:
smoke_sdf.count()

AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka

Queries with streaming sources must be registred and started with `writeStream.start()`

### Let's register and start a simple query

In [17]:
basic_query = (smoke_sdf
    .writeStream
    .format("memory") # this is for debug purpose only! DO NOT USE IN PRODUCTION
    .queryName("sinkTable")
    .start())

In [18]:
basic_query.lastProgress

{'id': '18d3f6e7-e158-425e-b47f-3a58efe35160',
 'runId': '6c02958d-8dd8-47a7-a87c-2bf5ebf96aee',
 'name': 'sinkTable',
 'timestamp': '2022-10-20T05:49:51.620Z',
 'batchId': 0,
 'numInputRows': 109,
 'processedRowsPerSecond': 118.73638344226579,
 'durationMs': {'addBatch': 332,
  'getBatch': 27,
  'latestOffset': 285,
  'queryPlanning': 98,
  'triggerExecution': 916,
  'walCommit': 90},
 'stateOperators': [],
 'sources': [{'description': 'KafkaV2[Subscribe[SmokeSensorEvent]]',
   'startOffset': None,
   'endOffset': {'SmokeSensorEvent': {'0': 109}},
   'numInputRows': 109,
   'processedRowsPerSecond': 118.73638344226579}],
 'sink': {'description': 'MemorySink', 'numOutputRows': 109}}

run the following cell to see the most recent content of the sinkTable

In [19]:
spark.sql("SELECT * FROM sinkTable ORDER BY TS DESC").show(5)

+------+-----+-------------------+
|sensor|smoke|                 ts|
+------+-----+-------------------+
|    S1|false|2022-10-20 05:49:58|
|    S1|false|2022-10-20 05:49:48|
|    S1|false|2022-10-20 05:49:38|
|    S1|false|2022-10-20 05:49:28|
|    S1|false|2022-10-20 05:49:18|
+------+-----+-------------------+
only showing top 5 rows



do not forget to stop queries that you are not using

In [20]:
basic_query.stop()

### Let's create the streaming Data Frames for the kafka temperature topic

In [21]:
temperarture_schema = StructType([
    StructField("sensor", StringType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("ts", TimestampType(), True)])

raw_streaming_temperature_df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("startingOffsets", "earliest")
  .option("subscribe", temperature_topic)
  .load())

temperature_sdf = (raw_streaming_temperature_df
                      .select(from_json(col("value").cast("string"), temperarture_schema).alias("value"))
                      .select("value.*"))

In [22]:
temperature_sdf.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- temperature: double (nullable = true)
 |-- ts: timestamp (nullable = true)



## Q0 - Filter

The temperature events whose temperature is greater than 50 °C 

### the SQL style

In [23]:
# create a logic table on top of the streaming data frame
temperature_sdf.createTempView("TemperatureSensorEvent")

# write your query in SQL, register it and start it
q0 = (spark.sql("select * from TemperatureSensorEvent where temperature > 50")
                     .writeStream
                     .format("memory")
                     .queryName("sinkTable")
                     .start())

let's ask for the execution plan, we will compare it with cells down with the one of the query in Data Frame style

In [24]:
q0.explain()

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@69e2fe66
+- Project [from_json(StructField(sensor,StringType,true), StructField(temperature,DoubleType,true), StructField(ts,TimestampType,true), cast(value#209 as string), Some(Etc/UTC)).sensor AS sensor#224, from_json(StructField(sensor,StringType,true), StructField(temperature,DoubleType,true), StructField(ts,TimestampType,true), cast(value#209 as string), Some(Etc/UTC)).temperature AS temperature#225, from_json(StructField(sensor,StringType,true), StructField(temperature,DoubleType,true), StructField(ts,TimestampType,true), cast(value#209 as string), Some(Etc/UTC)).ts AS ts#226]
   +- Filter (from_json(StructField(sensor,StringType,true), StructField(temperature,DoubleType,true), StructField(ts,TimestampType,true), cast(value#209 as string), Some(Etc/UTC)).temperature > 50.0)
      +- *(1) Project [key#208, value#209, topic#210, partition#211, offset#212L, timestamp#213, timesta

In [25]:
# look up the most recent results
spark.sql("SELECT * FROM sinkTable ORDER BY TS DESC").show(5)

+------+-----------+---+
|sensor|temperature| ts|
+------+-----------+---+
+------+-----------+---+



if you are following carefully the instruction it should be empty because we are sending temperature aropund 20 °C.

Go back to the `temperature_sensor_simulator` notebook, stop the cell that is sending temperature around 20°C and run the one that sends temperature around 55°C

In [26]:
# look up the most recent results
spark.sql("SELECT * FROM sinkTable ORDER BY TS DESC").show(5)

+------+------------------+-------------------+
|sensor|       temperature|                 ts|
+------+------------------+-------------------+
|    S1| 55.99165591049667|2022-10-20 05:57:47|
|    S1|57.079930631493596|2022-10-20 05:57:37|
|    S1| 55.30969832755271|2022-10-20 05:57:27|
+------+------------------+-------------------+



Now you should see results.

In [27]:
# clean up
q0.stop()
spark.catalog.dropTempView("TemperatureSensorEvent")

### The DataFrame style

In [28]:
q0bis = (temperature_sdf
                     .where("temperature > 20") # you can add anything that fits in a SQL where statemente 
                     .writeStream
                     .format("memory")
                     .queryName("sinkTable")
                     .start())

let's ask for the explanation of the plan. Comparing with the one of the SQL style, you can see that there is no difference. This is expected because the [catalyst optimizer](https://databricks.com/glossary/catalyst-optimizer) created it out of our declarations (which are semantically equivalent)

In [29]:
q0bis.explain()

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@35f37606
+- Project [from_json(StructField(sensor,StringType,true), StructField(temperature,DoubleType,true), StructField(ts,TimestampType,true), cast(value#209 as string), Some(Etc/UTC)).sensor AS sensor#224, from_json(StructField(sensor,StringType,true), StructField(temperature,DoubleType,true), StructField(ts,TimestampType,true), cast(value#209 as string), Some(Etc/UTC)).temperature AS temperature#225, from_json(StructField(sensor,StringType,true), StructField(temperature,DoubleType,true), StructField(ts,TimestampType,true), cast(value#209 as string), Some(Etc/UTC)).ts AS ts#226]
   +- Filter (from_json(StructField(sensor,StringType,true), StructField(temperature,DoubleType,true), StructField(ts,TimestampType,true), cast(value#209 as string), Some(Etc/UTC)).temperature > 20.0)
      +- *(1) Project [key#208, value#209, topic#210, partition#211, offset#212L, timestamp#213, timesta

In [30]:
spark.sql("SELECT * FROM sinkTable ORDER BY TS DESC").show(5)

+------+------------------+-------------------+
|sensor|       temperature|                 ts|
+------+------------------+-------------------+
|    S1| 56.17634911609217|2022-10-20 05:58:27|
|    S1|56.453865090623694|2022-10-20 05:58:17|
|    S1|57.018290678811624|2022-10-20 05:58:07|
|    S1| 54.45974640232827|2022-10-20 05:57:57|
|    S1| 55.99165591049667|2022-10-20 05:57:47|
+------+------------------+-------------------+
only showing top 5 rows



In [31]:
q0bis.stop()

> NOTE: there was no need to
> * create a logic table on top of the streaming data frame with `temperature_sdf.createTempView("TemperatureSensorEvent")`
> * drop such a logic table with `spark.catalog.dropTempView("TemperatureSensorEvent")`

## Q1 - Avg

the average of all the temperature observation for each sensor up to the last event received

### the SQL sytyle

In [32]:
# create a logic table on top of the streaming data frame
temperature_sdf.createTempView("TemperatureSensorEvent") # this time we will not clean it up, because we use it in the next queries

**NOTE**: the following query gives *intentionally* an error

In [33]:
query_string = """
SELECT SENSOR, AVG(temperature) 
FROM TemperatureSensorEvent
GROUP BY SENSOR
"""

# write your query in SQL, register it and start it
q1 = (spark.sql(query_string)
                     .writeStream
                     .format("memory")
                     .queryName("sinkTable")
                     .start())

AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [SENSOR#224], [SENSOR#224, avg(temperature#225) AS avg(temperature)#411]
+- SubqueryAlias temperaturesensorevent
   +- Project [value#222.sensor AS sensor#224, value#222.temperature AS temperature#225, value#222.ts AS ts#226]
      +- Project [from_json(StructField(sensor,StringType,true), StructField(temperature,DoubleType,true), StructField(ts,TimestampType,true), cast(value#209 as string), Some(Etc/UTC)) AS value#222]
         +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@339ae263, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@4f7f3119, org.apache.spark.sql.util.CaseInsensitiveStringMap@24f684c4, [key#208, value#209, topic#210, partition#211, offset#212L, timestamp#213, timestampType#214], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3076ecbf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> TemperatureSensorEvent, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#201, value#202, topic#203, partition#204, offset#205L, timestamp#206, timestampType#207]


The **append output mode** (i.e., the default one) is not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark, we need to use the **complete output mode**.

In [34]:
query_string = """
SELECT SENSOR, AVG(temperature) 
FROM TemperatureSensorEvent
GROUP BY SENSOR
"""

# write your query in SQL, register it and start it
q1 = (spark.sql(query_string)
                     .writeStream
                     .format("memory")
                     .outputMode("complete") # <-- CHANGE HERE
                     .queryName("sinkTable")
                     .start())

In [42]:
# look up the most recent results
spark.sql("SELECT * FROM sinkTable").show() # without ORDER BY TS DESC because the result in the table is already only the most recent

+------+------------------+
|SENSOR|  avg(temperature)|
+------+------------------+
|    S1|28.591171554323132|
+------+------------------+



**NOTE**: if the cell above gives an empty result, wait 10 seconds and run it again. The very first excution may take time, expecially if you have already ingested many temperature events in kafka. Here we are querying the sink table and it may be empty because the first execution is still running.

In [43]:
# clean up
q1.stop()

### The DataFrame style

In [49]:
# write your query in SQL, register it and start it
q1bis = (temperature_sdf 
                     .groupBy("sensor")
                     .avg()
                     .writeStream
                     .format("memory")
                     .outputMode("complete") 
                     .queryName("sinkTable")
                     .start())

In [51]:
# look up the most recent results
spark.sql("SELECT * FROM sinkTable").show() # woithout ORDER BY TS DESC because the result in the table is already only the most recent

+------+-----------------+
|sensor| avg(temperature)|
+------+-----------------+
|    S1|31.26328820179674|
+------+-----------------+



In [52]:
# clean up
q1bis.stop()

## Q2 - Logical Sliding Window

The average temperature observed by each sensor in the last 4 seconds

MEMO: the average should change as soon as the receive a new event

**Not supported**

## Q3 - Logical Tumbling Window

The average temperature of the last 30 seconds every 30 seconds (was 4 seconds in EPL)

NOTE: this query is not possibile in the SQL style

In [53]:
q3 = (temperature_sdf
                  .groupBy(window("TS", "30 seconds"),"SENSOR")
                  .avg("TEMPERATURE")
                  .writeStream
                  .outputMode("complete")
                  .format("memory")
                  .queryName("sinkTable")
                  .start())

In [54]:
spark.sql("SELECT * FROM sinkTable ORDER BY window DESC").show(5,False) # NOTE: here we order by window instead of ordering by timestamp# window instead of timestamp, again

+------------------------------------------+------+-----------------+
|window                                    |SENSOR|avg(TEMPERATURE) |
+------------------------------------------+------+-----------------+
|[2022-10-20 06:18:30, 2022-10-20 06:19:00]|S1    |54.09425983476128|
|[2022-10-20 06:18:00, 2022-10-20 06:18:30]|S1    |53.56637931573874|
|[2022-10-20 06:17:30, 2022-10-20 06:18:00]|S1    |55.77247620392745|
|[2022-10-20 06:17:00, 2022-10-20 06:17:30]|S1    |55.35344589668589|
|[2022-10-20 06:16:30, 2022-10-20 06:17:00]|S1    |55.11347815270373|
+------------------------------------------+------+-----------------+
only showing top 5 rows



In [55]:
q3.stop()

## Q4 - Physical Sliding Window

The moving average of the last 4 temperature events

**Not supported**

## Q5 - Physical Tumbling Window

The moving average of the last 4 temperature events every 4 events 

**Not supported**

## Q6 - Logical Hopping Window

The average temperature of the last 1 minute (was 4 seconds in EPL) every 30 seconds (was 2 seconds in EPL)

In [62]:
q6 = (temperature_sdf
      .groupBy(window("TS", "1 minutes", "30 seconds"),"SENSOR")
      .avg("TEMPERATURE")
      .writeStream
      .outputMode("complete")
      .format("memory")
      .queryName("sinkTable")
      .start())

In [64]:
spark.sql("SELECT * FROM sinkTable ORDER BY window DESC").show(6,False) # NOTE: here we order by window instead of ordering by timestamp

+------------------------------------------+------+------------------+
|window                                    |SENSOR|avg(TEMPERATURE)  |
+------------------------------------------+------+------------------+
|[2022-10-20 06:20:30, 2022-10-20 06:21:30]|S1    |55.44172805944555 |
|[2022-10-20 06:20:00, 2022-10-20 06:21:00]|S1    |55.02271758350366 |
|[2022-10-20 06:19:30, 2022-10-20 06:20:30]|S1    |54.9370860066254  |
|[2022-10-20 06:19:00, 2022-10-20 06:20:00]|S1    |55.23212543098216 |
|[2022-10-20 06:18:30, 2022-10-20 06:19:30]|S1    |55.08145722444231 |
|[2022-10-20 06:18:00, 2022-10-20 06:19:00]|S1    |54.267753904174036|
+------------------------------------------+------+------------------+
only showing top 6 rows



In [65]:
q6.stop()

## Q7 - Stream-to-Stream Join

In EPL, at this point we moved on to the pattern matching part required to satisfy the information need, i.e., "find every smoke event followed by a temperature event whose temperature is above 50 °C within 2 minutes."

Spark Structured Streaming does not support the EPL's operator `->` (that reads as *followed by*. We need to use a stream-to-stream join.

Let's apply the watermarks on event-time columns and the other two filters.

In [66]:
last_minute_smoke_events = (smoke_sdf
                #.withWatermark("ts", "1 minute")
                .where("smoke = True")
                .withColumnRenamed("sensor","sensorSmoke")
                .withColumnRenamed("ts","tsSmoke")
               )

last_minute_high_temperature_events = (temperature_sdf
                #.withWatermark("ts", "1 minute")
                .where("temperature > 50")
                .withColumnRenamed("sensor","sensorTemp")
                .withColumnRenamed("ts","tsTemp")
               )

Join with event-time constraints

In [67]:
join_sdf = (last_minute_smoke_events.join(
  last_minute_high_temperature_events, expr("""
    (sensorTemp == sensorSmoke) AND
    (tsTemp > tsSmoke ) AND
    (tsTemp < tsSmoke + interval 2 minute )
    """
    )))

In [68]:
q7 = (join_sdf
                     .writeStream
                     .format("memory")
                     .queryName("sinkTable")
                     .start())

**IMPORTANT** To detect fire, run the appropriate cells in the data generators.

In [76]:
spark.sql("SELECT * FROM sinkTable ORDER BY tsTemp DESC").show(20,False) # note, I change ts in tsTemp

+-----------+-----+-------+----------+-----------+------+
|sensorSmoke|smoke|tsSmoke|sensorTemp|temperature|tsTemp|
+-----------+-----+-------+----------+-----------+------+
+-----------+-----+-------+----------+-----------+------+



If you are following carefully the instructions the answer should be empty because we are sending temperature around 55°C, but there is no `smoke==true`, yet.

Go to the `smoke_sensor_simulator` notebook and start sending `smoke==true`.

In [77]:
spark.sql("SELECT * FROM sinkTable ORDER BY tsTemp DESC").show(20,False) # note, I change ts in tsTemp

+-----------+-----+-------------------+----------+------------------+-------------------+
|sensorSmoke|smoke|tsSmoke            |sensorTemp|temperature       |tsTemp             |
+-----------+-----+-------------------+----------+------------------+-------------------+
|S1         |true |2022-10-20 06:26:17|S1        |54.083253545588214|2022-10-20 06:26:18|
+-----------+-----+-------------------+----------+------------------+-------------------+



let's have a look to the progresses

In [78]:
from IPython.display import clear_output
import json
while True:
    print(json.dumps(q7.lastProgress, indent=4))
    print(q7.status)
    time.sleep(1)
    clear_output(wait=True)
    

{
    "id": "8b7622fe-4435-4cd7-bb0a-1a9610bb5fc1",
    "runId": "ca0d7618-d082-436d-acf1-17dcda27aab8",
    "name": "sinkTable",
    "timestamp": "2022-10-20T06:28:38.003Z",
    "batchId": 34,
    "numInputRows": 3,
    "inputRowsPerSecond": 0.2812675792237015,
    "processedRowsPerSecond": 0.27644673792849245,
    "durationMs": {
        "addBatch": 10731,
        "getBatch": 1,
        "latestOffset": 2,
        "queryPlanning": 76,
        "triggerExecution": 10852,
        "walCommit": 18
    },
    "stateOperators": [
        {
            "numRowsTotal": 148,
            "numRowsUpdated": 3,
            "memoryUsedBytes": 359136,
            "customMetrics": {
                "loadedMapCacheHitCount": 13600,
                "loadedMapCacheMissCount": 0,
                "stateOnCurrentVersionSizeBytes": 67280
            }
        }
    ],
    "sources": [
        {
            "description": "KafkaV2[Subscribe[SmokeSensorEvent]]",
            "startOffset": {
                "Sm

KeyboardInterrupt: 

to interrupt the execution of the cell, prese the square icon in the bar or choose *interrupt kernel* from the *kernel* dropdown menu

#### Discussion

> This query is equivalent to the EPL pattern `every a = SmokeSensorEvent(smoke=true) -> every TemperatureSensorEvent(temperature > 50, sensor=a.sensor) where timer:within(1 min)`. 
>
> Do not expect the same performances! It is evaluated as a relational join. Spark Structured Streaming lacks the specilized data structure of Esper.
>
> **It does not tame the torrent effect**, but this is expected! 
>
> Spark Structured Streaming is a Data Stream Management System meant to tame *flow that you cannot stop*

Even id Q8 consumes Q7 results, we can stop Q7 because we only need the streaming Data Frame `join_sdf`. We do not need Q7 to write its result in the in memory table.

In [79]:
q7.stop()

## Q8 - Count FireEvent

we are very close to the solution of the running example, we "just" need to count the number of events generated by the previous query over an hopping window of 1 minutes that slides every 30 seconds (was a sliding window of 10 secondsin EPL). 

So let's count the results of Q7. 

**NOTE**: the following queries give *intentionally* errors

In [82]:
q8 = (join_sdf
            .groupBy(window("tsTemp", "1 minutes", "30 seconds"),"sensorTemp")
            .count()
            .writeStream
            .outputMode("complete")
            .format("memory")
            .queryName("sinkTable") 
            .start())

AnalysisException: Join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode;;
Join Inner, (((sensorTemp#77397 = sensorSmoke#77389) AND (tsTemp#77401 > tsSmoke#77393)) AND (tsTemp#77401 < cast(tsSmoke#77393 + 2 minutes as timestamp)))
:- Project [sensorSmoke#77389, smoke#118, ts#119 AS tsSmoke#77393]
:  +- Project [sensor#117 AS sensorSmoke#77389, smoke#118, ts#119]
:     +- Filter (smoke#118 = true)
:        +- Project [value#115.sensor AS sensor#117, value#115.smoke AS smoke#118, value#115.ts AS ts#119]
:           +- Project [from_json(StructField(sensor,StringType,true), StructField(smoke,BooleanType,true), StructField(ts,TimestampType,true), cast(value#102 as string), Some(Etc/UTC)) AS value#115]
:              +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@49b55271, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@39763e6e, org.apache.spark.sql.util.CaseInsensitiveStringMap@7098eff9, [key#101, value#102, topic#103, partition#104, offset#105L, timestamp#106, timestampType#107], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3076ecbf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> SmokeSensorEvent, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#94, value#95, topic#96, partition#97, offset#98L, timestamp#99, timestampType#100]
+- Project [sensorTemp#77397, temperature#225, ts#226 AS tsTemp#77401]
   +- Project [sensor#224 AS sensorTemp#77397, temperature#225, ts#226]
      +- Filter (temperature#225 > cast(50 as double))
         +- Project [value#222.sensor AS sensor#224, value#222.temperature AS temperature#225, value#222.ts AS ts#226]
            +- Project [from_json(StructField(sensor,StringType,true), StructField(temperature,DoubleType,true), StructField(ts,TimestampType,true), cast(value#209 as string), Some(Etc/UTC)) AS value#222]
               +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@339ae263, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@4f7f3119, org.apache.spark.sql.util.CaseInsensitiveStringMap@24f684c4, [key#208, value#209, topic#210, partition#211, offset#212L, timestamp#213, timestampType#214], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3076ecbf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> TemperatureSensorEvent, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#201, value#202, topic#203, partition#204, offset#205L, timestamp#206, timestampType#207]


Indeed, join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode. So let's try to use append mode.

In [84]:
q8 = (join_sdf
            .groupBy(window("tsTemp", "1 minutes", "30 seconds"),"sensorTemp")
            .count()
            .writeStream
            .outputMode("append") # <-- CHANGE HERE
            .format("memory")
            .queryName("sinkTable") 
            .start())

AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [window#308484, sensorTemp#77397], [window#308484 AS window#308475, sensorTemp#77397, count(1) AS count#308483L]
+- Filter ((tsTemp#77401 >= window#308484.start) AND (tsTemp#77401 < window#308484.end))
   +- Expand [ArrayBuffer(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) as double) = (cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) THEN (CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) END + cast(0 as bigint)) - cast(2 as bigint)) * 30000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) as double) = (cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) THEN (CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) END + cast(0 as bigint)) - cast(2 as bigint)) * 30000000) + 0) + 60000000), LongType, TimestampType)), sensorSmoke#77389, smoke#118, tsSmoke#77393, sensorTemp#77397, temperature#225, tsTemp#77401), ArrayBuffer(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) as double) = (cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) THEN (CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) END + cast(1 as bigint)) - cast(2 as bigint)) * 30000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) as double) = (cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) THEN (CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(tsTemp#77401, TimestampType, LongType) - 0) as double) / cast(30000000 as double))) END + cast(1 as bigint)) - cast(2 as bigint)) * 30000000) + 0) + 60000000), LongType, TimestampType)), sensorSmoke#77389, smoke#118, tsSmoke#77393, sensorTemp#77397, temperature#225, tsTemp#77401)], [window#308484, sensorSmoke#77389, smoke#118, tsSmoke#77393, sensorTemp#77397, temperature#225, tsTemp#77401]
      +- Join Inner, (((sensorTemp#77397 = sensorSmoke#77389) AND (tsTemp#77401 > tsSmoke#77393)) AND (tsTemp#77401 < cast(tsSmoke#77393 + 2 minutes as timestamp)))
         :- Project [sensorSmoke#77389, smoke#118, ts#119 AS tsSmoke#77393]
         :  +- Project [sensor#117 AS sensorSmoke#77389, smoke#118, ts#119]
         :     +- Filter (smoke#118 = true)
         :        +- Project [value#115.sensor AS sensor#117, value#115.smoke AS smoke#118, value#115.ts AS ts#119]
         :           +- Project [from_json(StructField(sensor,StringType,true), StructField(smoke,BooleanType,true), StructField(ts,TimestampType,true), cast(value#102 as string), Some(Etc/UTC)) AS value#115]
         :              +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@49b55271, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@39763e6e, org.apache.spark.sql.util.CaseInsensitiveStringMap@7098eff9, [key#101, value#102, topic#103, partition#104, offset#105L, timestamp#106, timestampType#107], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3076ecbf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> SmokeSensorEvent, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#94, value#95, topic#96, partition#97, offset#98L, timestamp#99, timestampType#100]
         +- Project [sensorTemp#77397, temperature#225, ts#226 AS tsTemp#77401]
            +- Project [sensor#224 AS sensorTemp#77397, temperature#225, ts#226]
               +- Filter (temperature#225 > cast(50 as double))
                  +- Project [value#222.sensor AS sensor#224, value#222.temperature AS temperature#225, value#222.ts AS ts#226]
                     +- Project [from_json(StructField(sensor,StringType,true), StructField(temperature,DoubleType,true), StructField(ts,TimestampType,true), cast(value#209 as string), Some(Etc/UTC)) AS value#222]
                        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@339ae263, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@4f7f3119, org.apache.spark.sql.util.CaseInsensitiveStringMap@24f684c4, [key#208, value#209, topic#210, partition#211, offset#212L, timestamp#213, timestampType#214], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3076ecbf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> TemperatureSensorEvent, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#201, value#202, topic#203, partition#204, offset#205L, timestamp#206, timestampType#207]


Append output mode is not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark. Indeed, **the streaming join can create out of orders**.

Let's add a watermark, then.

In [87]:
q8 = (join_sdf
            .withWatermark(delayThreshold="2 minutes",eventTime="tsTemp") # <-- CHANGE HERE
            .groupBy(window("tsTemp", "1 minutes", "30 seconds"),"sensorTemp")
            .count()
            .writeStream
            .outputMode("append") 
            .format("memory")
            .queryName("sinkTable") 
            .start())

NOTE: 2 minutes is maximum delay that the join can cause given the way we declared it. The temperature and the smoke event cannot be more than 2 minutes apart.

In [89]:
spark.sql("SELECT * FROM sinkTable ORDER BY window DESC").show(5,False)

+------------------------------------------+----------+-----+
|window                                    |sensorTemp|count|
+------------------------------------------+----------+-----+
|[2022-10-20 07:13:00, 2022-10-20 07:14:00]|S1        |66   |
|[2022-10-20 07:12:30, 2022-10-20 07:13:30]|S1        |66   |
|[2022-10-20 07:12:00, 2022-10-20 07:13:00]|S1        |66   |
|[2022-10-20 07:11:30, 2022-10-20 07:12:30]|S1        |66   |
|[2022-10-20 07:11:00, 2022-10-20 07:12:00]|S1        |66   |
+------------------------------------------+----------+-----+
only showing top 5 rows



In [90]:
q8.stop()