## Practical 3: Spark Streaming

This notebook provides a Structured Streaming example using Spark. 

Source: https://github.com/databricks/Spark-The-Definitive-Guide

In [0]:
# Run shuffles with 5 partitions for this session (Spark's documented default
# is 200).
shufflePartitions = 5
spark.conf.set("spark.sql.shuffle.partitions", shufflePartitions)

In [0]:
# Batch (non-streaming) read of the activity JSON files; Spark infers the
# schema from the data.
static = (
    spark.read
    .json("/databricks-datasets/definitive-guide/data/activity-data/")
)

# Keep the inferred schema so it can be handed to the streaming reader, which
# is given an explicit schema via .schema(dataSchema).
dataSchema = static.schema


In [0]:
# Streaming read over the same activity data directory, using the schema taken
# from the static DataFrame. maxFilesPerTrigger=1 caps each micro-batch at a
# single input file.
streaming = (
    spark.readStream
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .json("/databricks-datasets/definitive-guide/data/activity-data")
)


In [0]:
# Row count per distinct value of the `gt` column. This only defines the
# aggregation; nothing runs until a query is started on it.
activityCounts = (
    streaming
    .groupBy("gt")
    .count()
)


In [0]:
# Start the streaming query. The memory sink exposes the results as an
# in-memory table named after the query ("activity_counts"), which is queried
# with spark.sql further down. "complete" output mode writes the whole result
# table on every trigger.
activityQuery = (
    activityCounts.writeStream
    .queryName("activity_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)


In [0]:
# Most recent progress report of the query (batch id, input/processing rates,
# state-store metrics, source offsets). Per the PySpark docs this is None when
# no progress update has been produced yet.
activityQuery.lastProgress

Out[32]: {'id': '3aa468d2-26b3-4a65-af03-da352d4299f0',
 'runId': '2ea9a799-e04c-4974-9279-bafa6685ec0f',
 'name': 'activity_counts',
 'timestamp': '2023-02-13T08:36:53.500Z',
 'batchId': 80,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 109, 'triggerExecution': 109},
 'stateOperators': [{'operatorName': 'stateStoreSave',
   'numRowsTotal': 7,
   'numRowsUpdated': 0,
   'allUpdatesTimeMs': 180,
   'numRowsRemoved': 0,
   'allRemovalsTimeMs': 0,
   'commitTimeMs': 1649,
   'memoryUsedBytes': 4280,
   'numRowsDroppedByWatermark': 0,
   'numShufflePartitions': 5,
   'numStateStoreInstances': 5,
   'customMetrics': {'loadedMapCacheHitCount': 790,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 1864}}],
 'sources': [{'description': 'FileStreamSource[dbfs:/databricks-datasets/definitive-guide/data/activity-data]',
   'startOffset': {'logOffset': 79},
   'endOffset': {'logOffset': 79},
   'latestOffset':

In [0]:
# Current status of the query: a status message plus the isDataAvailable and
# isTriggerActive flags.
activityQuery.status

Out[33]: {'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

In [0]:
from time import sleep

# Print the in-memory "activity_counts" table five times, pausing one second
# after each print, to watch the counts while the streaming query runs.
pollsRemaining = 5
while pollsRemaining > 0:
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(1)
    pollsRemaining -= 1


+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       s

In [0]:
from pyspark.sql.functions import expr

# Selection/filter example on the stream:
#   1. add a boolean column `stairs` that is true when `gt` contains "stairs",
#   2. keep only rows where `stairs` holds and `gt` is not null,
#   3. project four columns,
#   4. append the matching rows to the in-memory table "simple_transform".
simpleTransform = (
    streaming
    .withColumn("stairs", expr("gt like '%stairs%'"))
    .where("stairs")
    .where("gt is not null")
    .select("gt", "model", "arrival_time", "creation_time")
    .writeStream
    .queryName("simple_transform")
    .format("memory")
    .outputMode("append")
    .start()
)


In [0]:
# Aggregation example: average every numeric column over the cube of
# (gt, model), then discard the averages of the arrival time, creation time
# and index columns. The full result is written to the in-memory table
# "device_counts" on each trigger ("complete" output mode).
deviceModelStats = (
    streaming
    .cube("gt", "model")
    .avg()
    .drop("avg(Arrival_time)")
    .drop("avg(Creation_Time)")
    .drop("avg(Index)")
    .writeStream
    .queryName("device_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)


In [0]:
# Baseline: per-(gt, model) averages computed once over the static (batch)
# DataFrame.
historicalAgg = static.groupBy("gt", "model").avg()

# Query names must be unique among the active queries of a SparkSession, and
# the preceding cell already started one called "device_counts". Starting a
# second query with that name while the first is still running raises an
# error, so stop any active query with the same name first. This also makes
# the cell safe to re-run.
for runningQuery in spark.streams.active:
    if runningQuery.name == "device_counts":
        runningQuery.stop()

# Join example: streaming cube averages (with the arrival time, creation time
# and index columns dropped up front) joined to the static baseline on
# (gt, model). The full result is written to the in-memory table
# "device_counts" on each trigger.
deviceModelStats = (
    streaming
    .drop("Arrival_Time", "Creation_Time", "Index")
    .cube("gt", "model")
    .avg()
    .join(historicalAgg, ["gt", "model"])
    .writeStream
    .queryName("device_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)
