# NOTEBOOK 6.1 Spark Structured Streaming
Adapted from: Spark - The Definitive Guide by Bill Chambers and Matei Zaharia (2018)

This demo requires the **activity-data** folder to be placed inside a folder named **data** in HDFS. The folder contains the Heterogeneity Human Activity Recognition Dataset, which consists of smartphone and smartwatch sensor readings (specifically, from the accelerometer and gyroscope) sampled at the highest frequency each device supports. The readings were recorded while users performed activities such as biking, sitting, standing, and walking. Several different smartphones and smartwatches were used, with a total of 9 users.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("StructuredStreamingDemo")\
        .getOrCreate()

25/06/12 16:22:24 WARN Utils: Your hostname, PC25. resolves to a loopback address: 127.0.1.1; using 192.168.76.195 instead (on interface eth0)
25/06/12 16:22:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/12 16:22:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 1. Simulating Streaming

### 1.1 Read the static version of the dataset as a DataFrame:

In [2]:
activityStaticDf = spark.read.json("data/activity-data")
dataSchema = activityStaticDf.schema
dataSchema

                                                                                

StructType([StructField('Arrival_Time', LongType(), True), StructField('Creation_Time', LongType(), True), StructField('Device', StringType(), True), StructField('Index', LongType(), True), StructField('Model', StringType(), True), StructField('User', StringType(), True), StructField('gt', StringType(), True), StructField('x', DoubleType(), True), StructField('y', DoubleType(), True), StructField('z', DoubleType(), True)])

In [3]:
activityStaticDf.printSchema()
activityStaticDf.show(5)

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)

+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|           x|           y|           z|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
|1424686735090|1424686733090638193|nexus4_1|   18|nexus4|   g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|
|1424686735292|1424688581345918092|nexus4_2|   66|nexus4|   g|stand|-0.005722046| 0.029083252| 0.005569458|
|1424686735500|1424686733498505625|nexus4_1|   99|nexus4|   g|stand|   0.0078125|-0.017654

### 1.2 What information is stored in the DataFrame?

### 1.3 Create a streaming version of the same dataset
We read each input file in the dataset one by one, as if it were a stream.

**Streaming DataFrames** are similar to static DataFrames. We create them within Spark applications and then apply transformations to get the data into the correct format. One small difference is that Structured Streaming does not allow schema inference unless it is explicitly enabled, which is done by setting the configuration **spark.sql.streaming.schemaInference** to **true**.

Instead of enabling schema inference, we reuse the schema that was read from the static files (which have a valid schema) and pass the **dataSchema** object from our static DataFrame to our streaming DataFrame:

In [4]:
streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
  .json("data/activity-data")

Note: **maxFilesPerTrigger** allows us to control how quickly Spark will read all the files in the folder. In this demo, we set a lower value to limit the flow of the stream to one file per trigger. This is just to demonstrate how Structured Streaming runs incrementally.
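
To make the incremental behaviour concrete, here is a minimal pure-Python sketch (no Spark involved) of how a limit on files per trigger turns a folder of files into a sequence of micro-batches, with a running count per activity carried from one trigger to the next. The helper `micro_batches`, the variable names, and the tiny in-memory "files" are all hypothetical stand-ins for illustration; they are not part of the Spark API.

```python
from collections import Counter

def micro_batches(files, max_files_per_trigger=1):
    """Yield successive groups of at most max_files_per_trigger files."""
    for start in range(0, len(files), max_files_per_trigger):
        yield files[start:start + max_files_per_trigger]

# Hypothetical stand-ins for JSON part files: each "file" is a list of gt labels.
files = [
    ["sit", "walk", "walk"],
    ["walk", "bike"],
    ["sit", "stand", "bike"],
]

running_counts = Counter()  # state carried across triggers
snapshots = []              # the full table of counts after each trigger

for batch in micro_batches(files, max_files_per_trigger=1):
    for records in batch:
        running_counts.update(records)
    snapshots.append(dict(running_counts))

for trigger, table in enumerate(snapshots):
    print(trigger, table)
```

With one file per trigger there are three snapshots, and each one extends the previous counts rather than recomputing them from scratch. Raising the limit to 2 would produce two triggers instead of three.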

### 1.4 Group and count data by the **gt** column
The **gt** column is the activity being performed by the user at that point in time.

In [5]:
activityCounts = streaming.groupBy("gt").count()

#### To avoid too many shuffle partitions

In [6]:
# Set the shuffle partitions to a small value
spark.conf.set("spark.sql.shuffle.partitions", 5)

### 1.5 Specify action to start the query.
- Output destination for the result of this query: write to a _memory sink_ which keeps an in-memory table of the results.
- How Spark will output the data: _complete_ output mode. (This mode rewrites all the keys along with their counts after every trigger.)

Once the following code is executed, the streaming computation starts in the background. The query object **activityQuery** is a handle to that active streaming query.

In [7]:
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
  .format("memory").outputMode("complete")\
  .start()

25/06/12 16:22:33 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ef32c3e4-a01d-4f31-b3e2-798e04d338e4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/06/12 16:22:33 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


### 1.6 Query the in-memory table of the current output of the streaming aggregation
Note: the in-memory table has the same name as the query, i.e. **activity_counts**

In [8]:
from time import sleep
for i in range(5):
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(3)

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+----------+------+
|        gt| count|
+----------+------+
|       sit| 98475|
|     stand| 91084|
|stairsdown| 74913|
|      walk|106039|
|  stairsup| 83654|
|      null| 83566|
|      bike| 86368|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|258492|
|     stand|239087|
|stairsdown|196627|
|      walk|278353|
|  stairsup|219602|
|      null|219375|
|      bike|226720|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|443119|
|     stand|409851|
|stairsdown|337074|
|      walk|477183|
|  stairsup|376463|
|      null|376079|
|      bike|388672|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|627756|
|     stand|580625|
|stairsdown|477537|
|      walk|676007|
|  stairsup|533326|
|      null|532772|
|      bike|550608|
+----------+------+
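
The first table above is empty, most likely because the query had not yet finished a micro-batch when the table was first read. Instead of sleeping for a fixed time, one option is to poll the query handle until it reports progress. The sketch below is one possible helper, not part of the original notebook: `wait_for_first_batch` is a hypothetical name, and it relies only on the `isActive` and `lastProgress` properties of PySpark's `StreamingQuery`. Treating "progress has been reported" as "a first micro-batch has completed" is an assumption. `FakeQuery` is a stand-in so the sketch runs without a Spark session.

```python
import time

def wait_for_first_batch(query, timeout_s=30.0, poll_s=0.5):
    """Return True once the query has reported progress, False on stop or timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if not query.isActive:
            return False  # the query was stopped or has failed
        if query.lastProgress is not None:
            return True   # at least one progress update has been reported
        time.sleep(poll_s)
    return False

class FakeQuery:
    """Hypothetical stand-in for a StreamingQuery, used only for illustration."""

    def __init__(self, polls_until_progress):
        self.isActive = True
        self._remaining = polls_until_progress

    @property
    def lastProgress(self):
        # Report no progress for the first few polls, then a dummy progress record.
        if self._remaining > 0:
            self._remaining -= 1
            return None
        return {"batchId": 0}

ready = wait_for_first_batch(FakeQuery(polls_until_progress=2), timeout_s=5.0, poll_s=0.01)
print(ready)
```

In the notebook, the same helper could be called as `wait_for_first_batch(activityQuery)` before the first `spark.sql(...)` call.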



In [9]:
spark.streams.active

[<pyspark.sql.streaming.query.StreamingQuery at 0x7f4ef93cc640>]

In [10]:
# Optionally, wait for the query to terminate. This prevents the driver process from exiting while the query is active.
# activityQuery.awaitTermination()

In [11]:
activityQuery.stop()

## 2. Transformations on Streams

### 2.1 Selections & Filtering

In this demo, we are not updating any keys over time. Therefore, we will use the _Append_ output mode so that new results are appended to the output table.

In [12]:
from pyspark.sql.functions import expr
simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'"))\
  .where("stairs")\
  .where("gt is not null")\
  .select("gt", "model", "arrival_time", "creation_time")\
  .writeStream\
  .queryName("simple_transform")\
  .format("memory")\
  .outputMode("append")\
  .start()

# simpleTransform.awaitTermination()

25/06/12 16:22:48 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c1c7a04a-c028-46cb-b945-9b329418fc98. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/06/12 16:22:48 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [13]:
# Note: this may take a while
for i in range(3):
    spark.sql("SELECT * FROM simple_transform").show()
    sleep(3)

+---+-----+------------+-------------+
| gt|model|arrival_time|creation_time|
+---+-----+------------+-------------+
+---+-----+------------+-------------+



25/06/12 16:22:52 WARN TaskSetManager: Stage 168 contains a task of very large size (1400 KiB). The maximum recommended task size is 1000 KiB.


+--------+------+-------------+-------------------+
|      gt| model| arrival_time|      creation_time|
+--------+------+-------------+-------------------+
|stairsup|nexus4|1424687983758|1424689829813792398|
|stairsup|nexus4|1424687984076|1424687982079402816|
|stairsup|nexus4|1424687984475|1424687982484364704|
|stairsup|nexus4|1424687984878|1424687982887562947|
|stairsup|nexus4|1424687985286|1424687983295717413|
|stairsup|nexus4|1424687985682|1424687983685793097|
|stairsup|nexus4|1424687986090|1424687984093630011|
|stairsup|nexus4|1424687986488|1424687984497347052|
|stairsup|nexus4|1424687986893|1424687984899471108|
|stairsup|nexus4|1424687987295|1424687985302272622|
|stairsup|nexus4|1424687987695|1424687985705056175|
|stairsup|nexus4|1424687988024|1424689834071137344|
|stairsup|nexus4|1424687988225|1424687986233925804|
|stairsup|nexus4|1424687988428|1424689834473938858|
|stairsup|nexus4|1424687988628|1424689834675293838|
|stairsup|nexus4|1424687988830|1424689834876892959|
|stairsup|ne

25/06/12 16:22:56 WARN TaskSetManager: Stage 192 contains a task of very large size (3352 KiB). The maximum recommended task size is 1000 KiB.


+--------+------+-------------+-------------------+
|      gt| model| arrival_time|      creation_time|
+--------+------+-------------+-------------------+
|stairsup|nexus4|1424687983758|1424689829813792398|
|stairsup|nexus4|1424687984076|1424687982079402816|
|stairsup|nexus4|1424687984475|1424687982484364704|
|stairsup|nexus4|1424687984878|1424687982887562947|
|stairsup|nexus4|1424687985286|1424687983295717413|
|stairsup|nexus4|1424687985682|1424687983685793097|
|stairsup|nexus4|1424687986090|1424687984093630011|
|stairsup|nexus4|1424687986488|1424687984497347052|
|stairsup|nexus4|1424687986893|1424687984899471108|
|stairsup|nexus4|1424687987295|1424687985302272622|
|stairsup|nexus4|1424687987695|1424687985705056175|
|stairsup|nexus4|1424687988024|1424689834071137344|
|stairsup|nexus4|1424687988225|1424687986233925804|
|stairsup|nexus4|1424687988428|1424689834473938858|
|stairsup|nexus4|1424687988628|1424689834675293838|
|stairsup|nexus4|1424687988830|1424689834876892959|
|stairsup|ne

3/01/26 11:12:05 WARN TaskSetManager: Stage 272 contains a task of very large size (19820 KiB). The maximum recommended task size is 1000 KiB.
+--------+------+-------------+-------------------+
|      gt| model| arrival_time|      creation_time|
+--------+------+-------------+-------------------+
|stairsup|nexus4|1424687983719|1424687981726802718|
|stairsup|nexus4|1424687984000|1424687982009853255|
|stairsup|nexus4|1424687984404|1424687982411977009|
|stairsup|nexus4|1424687984805|1424687982814351277|
|stairsup|nexus4|1424687985210|1424687983217500861|
|stairsup|nexus4|1424687985620|1424687983620332892|
|stairsup|nexus4|1424687986016|1424687984023164923|
|stairsup|nexus4|1424687986420|1424687984425874884|
|stairsup|nexus4|1424687986820|1424687984828822915|
|stairsup|nexus4|1424687987225|1424687985231654946|
|stairsup|nexus4|1424687987625|1424687985634469017|
|stairsup|nexus4|1424687987992|1424687986002114280|
|stairsup|nexus4|1424687988191|1424689834237427627|
|stairsup|nexus4|1424687988392|1424689834438660537|
|stairsup|nexus4|1424687988592|1424689834640076553|
|stairsup|nexus4|1424687988794|1424689834841675674|
|stairsup|nexus4|1424687988999|1424689835047943984|
|stairsup|nexus4|1424687989200|1424687987205721701|
|stairsup|nexus4|1424687989409|1424689835458070221|
|stairsup|nexus4|1424687989606|1424687987613772238|
+--------+------+-------------+-------------------+

In [14]:
simpleTransform.stop()
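
The selection and filter in section 2.1 keep no state between triggers, which is why the append output mode fits: each trigger only contributes new rows, and the memory sink accumulates them. The following pure-Python sketch mimics that behaviour under simplifying assumptions. The two hand-written batches and the names `is_stairs`, `memory_table`, and `emitted_per_trigger` are hypothetical and only illustrate the idea.

```python
# Two hypothetical micro-batches of records (None plays the role of SQL NULL).
batches = [
    [{"gt": "stairsup", "model": "nexus4"}, {"gt": "walk", "model": "nexus4"}],
    [{"gt": "stairsdown", "model": "nexus4"}, {"gt": None, "model": "nexus4"}],
]

def is_stairs(row):
    """Mimic the filter gt LIKE '%stairs%': a NULL gt never passes."""
    gt = row["gt"]
    return gt is not None and "stairs" in gt

memory_table = []          # what the in-memory sink holds
emitted_per_trigger = []   # rows written by each trigger

for batch in batches:
    new_rows = [row for row in batch if is_stairs(row)]
    emitted_per_trigger.append(new_rows)  # append mode: only the new rows are written
    memory_table.extend(new_rows)         # rows written earlier are never revised

print([row["gt"] for row in memory_table])
```

Each trigger writes one new row here, and the table grows from one row to two. Rows already in the table are left untouched, in contrast to the complete mode used for the counts in section 1.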

### 2.2 Aggregations
The following example applies the **cube** aggregation to the activity (**gt**) and the phone model, and computes the average x, y, and z accelerations of the sensor for each combination:

In [15]:
deviceModelStats = streaming.cube("gt", "model").avg()\
  .drop("avg(Arrival_time)")\
  .drop("avg(Creation_Time)")\
  .drop("avg(Index)")\
  .writeStream.queryName("device_counts").format("memory")\
  .outputMode("complete")\
  .start()

25/06/12 16:22:59 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-82434947-5988-491e-8a36-7d4d9bb909ce. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/06/12 16:22:59 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [16]:
for x in range(2):
    spark.sql("SELECT * FROM device_counts").show()
    sleep(3)

+---+-----+------+------+------+
| gt|model|avg(x)|avg(y)|avg(z)|
+---+-----+------+------+------+
+---+-----+------+------+------+

+----------+------+--------------------+--------------------+--------------------+
|        gt| model|              avg(x)|              avg(y)|              avg(z)|
+----------+------+--------------------+--------------------+--------------------+
|       sit|  NULL|-5.00171701972525...| 3.29206100777474E-4|-1.91068888778222...|
|      walk|nexus4|-0.00457872511456...|0.001398124138588...|-0.00166899850844...|
|      walk|  NULL|-0.00457872511456...|0.001398124138588...|-0.00166899850844...|
|  stairsup|  NULL|-0.02528056535479...|-0.01004925366732...|-0.10050482947278248|
|     stand|  NULL|-3.24754958945150...|2.709525612972639...| 3.83496236784506E-4|
|      bike|  NULL|0.023420025203896726|-0.00878518609104...|-0.08219914587308406|
|  stairsup|nexus4|-0.02528056535479...|-0.01004925366732...|-0.10050482947278248|
|      NULL|nexus4|5.367647483176713E

In [17]:
deviceModelStats.stop()

25/06/12 16:23:05 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 23, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@67474b9b] is aborting.
25/06/12 16:23:05 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 23, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@67474b9b] aborted.
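
To clarify which rows a cube aggregation produces, here is a small pure-Python sketch of the idea behind section 2.2. It averages one value column over every combination of the grouping keys, using `None` where a key has been rolled up into a subtotal. The function `cube_avg` and the three sample rows are hypothetical; this is an illustration of the grouping logic, not Spark's implementation.

```python
from collections import defaultdict
from itertools import product

# Hypothetical sample rows with a single value column.
rows = [
    {"gt": "walk", "model": "nexus4", "x": 1.0},
    {"gt": "walk", "model": "nexus4", "x": 3.0},
    {"gt": "sit", "model": "nexus4", "x": 5.0},
]

def cube_avg(rows, keys, value):
    """Average `value` over every subset of `keys`; None marks a rolled-up key."""
    sums = defaultdict(lambda: [0.0, 0])  # group -> [running total, row count]
    for row in rows:
        # Each row contributes to 2**len(keys) groups: keep or roll up each key.
        for mask in product([True, False], repeat=len(keys)):
            group = tuple(row[k] if keep else None for k, keep in zip(keys, mask))
            acc = sums[group]
            acc[0] += row[value]
            acc[1] += 1
    return {group: total / count for group, (total, count) in sums.items()}

result = cube_avg(rows, ["gt", "model"], "x")
for group, avg in sorted(result.items(), key=str):
    print(group, avg)
```

Because the sample contains a single model, the group `("walk", "nexus4")` and the subtotal `("walk", None)` have the same average. The notebook output shows the same pattern for the `walk` rows with model `nexus4` and `NULL`.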


### 2.3 Joins
Example of joining a streaming DataFrame to a static DataFrame:

In [18]:
historicalAgg = activityStaticDf.groupBy("gt", "model").avg()

deviceModelStats = streaming.drop("Arrival_Time", "Creation_Time", "Index")\
  .cube("gt", "model").avg()\
  .join(historicalAgg, ["gt", "model"])\
  .writeStream.queryName("device_counts").format("memory")\
  .outputMode("complete")\
  .start()

25/06/12 16:23:06 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-629e051d-3a07-40af-8ce6-f5a1f5be5804. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/06/12 16:23:06 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [19]:
for x in range(2):
    spark.sql("SELECT * FROM device_counts").show()
    sleep(2)

+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+
| gt|model|avg(x)|avg(y)|avg(z)|avg(Arrival_Time)|avg(Creation_Time)|avg(Index)|avg(x)|avg(y)|avg(z)|
+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+
+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+




+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+
| gt|model|avg(x)|avg(y)|avg(z)|avg(Arrival_Time)|avg(Creation_Time)|avg(Index)|avg(x)|avg(y)|avg(z)|
+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+
+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+



                                                                                

In [20]:
deviceModelStats.stop()

25/06/12 16:23:10 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 1, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@22854a3d] is aborting.
25/06/12 16:23:10 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 1, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@22854a3d] aborted.
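
The join in section 2.3 matches each streaming aggregate row with the historical row that has the same **gt** and **model**. The pure-Python sketch below illustrates that matching as an inner equi-join, including the SQL rule that a NULL key never equals anything, so the subtotal rows that cube marks with NULL find no partner. The dictionaries, their values, and the function `inner_join` are hypothetical illustrations, not data from the notebook.

```python
# Hypothetical aggregates keyed by (gt, model); None plays the role of SQL NULL.
stream_agg = {
    ("walk", "nexus4"): 0.5,
    ("walk", None): 0.5,   # subtotal row produced by cube
    (None, None): 0.2,     # grand-total row produced by cube
}
historical_agg = {
    ("walk", "nexus4"): 0.4,
    ("sit", "nexus4"): 0.1,
}

def inner_join(left, right):
    """Inner equi-join on the key tuple, with SQL-style NULL handling."""
    joined = {}
    for key, left_value in left.items():
        if any(part is None for part in key):
            continue  # in SQL, NULL = NULL is not true, so such keys never match
        if key in right:
            joined[key] = (left_value, right[key])
    return joined

joined = inner_join(stream_agg, historical_agg)
print(joined)
```

Only the fully specified key `("walk", "nexus4")` survives the join, paired with both the streaming and the historical average.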


In [21]:
spark.stop()

25/06/12 16:23:10 WARN TaskSetManager: Lost task 11.0 in stage 269.0 (TID 1256) (192.168.76.195 executor driver): TaskKilled (Stage cancelled: Job 174 cancelled part of cancelled job group 35957687-cb64-481f-b532-ea5977e4aef1)
25/06/12 16:23:10 WARN TaskSetManager: Lost task 3.0 in stage 269.0 (TID 1248) (192.168.76.195 executor driver): TaskKilled (Stage cancelled: Job 174 cancelled part of cancelled job group 35957687-cb64-481f-b532-ea5977e4aef1)
25/06/12 16:23:10 WARN TaskSetManager: Lost task 9.0 in stage 269.0 (TID 1254) (192.168.76.195 executor driver): TaskKilled (Stage cancelled: Job 174 cancelled part of cancelled job group 35957687-cb64-481f-b532-ea5977e4aef1)
25/06/12 16:23:10 WARN TaskSetManager: Lost task 1.0 in stage 269.0 (TID 1246) (192.168.76.195 executor driver): TaskKilled (Stage cancelled: Job 174 cancelled part of cancelled job group 35957687-cb64-481f-b532-ea5977e4aef1)
25/06/12 16:23:10 WARN TaskSetManager: Lost task 5.0 in stage 269.0 (TID 1250) (192.168.76.195 