# Chapter 21. Structured Streaming Basics

In [1]:
static = spark.read.json("/Users/sumitagrawal/PSpace/Projects/spark/Spark-The-Definitive-Guide-master/data/activity-data-small/")
dataSchema = static.schema
static.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



Structured Streaming does not infer schemas unless you explicitly enable it. To turn schema inference on, set the configuration spark.sql.streaming.schemaInference to true.

Because of this, we take the schema from the static DataFrame read above (data that we know has a valid schema) and pass its dataSchema object to our streaming DataFrame. Avoid doing this in a production scenario, where your data may (accidentally) change out from under you:

In [2]:
streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
  .json("/Users/sumitagrawal/PSpace/Projects/spark/Spark-The-Definitive-Guide-master/data/activity-data-small/")
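One way to avoid depending on a sample read is to write the schema out by hand. The sketch below builds a DDL-formatted schema string from the field names and types printed by static.printSchema() above; DataStreamReader.schema accepts either a StructType or a string of this form. The names `fields` and `ddl_schema` are illustrative and not part of the original notebook, and the final readStream call is left as a comment because it assumes the notebook's live `spark` session and a hypothetical `path` variable.

```python
# Field names and types copied from the printSchema() output above.
# `fields` and `ddl_schema` are illustrative names, not part of the notebook.
fields = [
    ("Arrival_Time", "BIGINT"),   # printed as `long`
    ("Creation_Time", "BIGINT"),
    ("Device", "STRING"),
    ("Index", "BIGINT"),
    ("Model", "STRING"),
    ("User", "STRING"),
    ("gt", "STRING"),
    ("x", "DOUBLE"),
    ("y", "DOUBLE"),
    ("z", "DOUBLE"),
]

# Backticks quote each column name so names like `User` or `Index`
# cannot be mistaken for SQL keywords.
ddl_schema = ", ".join(f"`{name}` {dtype}" for name, dtype in fields)
print(ddl_schema)

# With a live SparkSession named `spark`, and `path` standing in for the
# data directory (both assumptions here), usage would look like:
# streaming = spark.readStream.schema(ddl_schema).option("maxFilesPerTrigger", 1).json(path)
```

A hand-written schema makes the stream fail or produce nulls in a predictable way when the input drifts, rather than silently picking up whatever a sample file happened to contain.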

Just like with other Spark APIs, streaming DataFrame creation and execution is lazy. In particular, we can now specify transformations on our streaming DataFrame before finally calling an action to start the stream. In this case, we’ll show one simple transformation: we group and count data by the gt column, which is the activity being performed by the user at that point in time:

In [22]:
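# Group by activity type; this only defines the query, nothing runs yet
activityCounts = streaming.groupBy("gt").count()
# Keep the number of shuffle partitions small for a local run
spark.conf.set("spark.sql.shuffle.partitions", 5)
# Start the stream, writing the full result table to an in-memory table
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
  .format("memory").outputMode("complete")\
  .start()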

In [23]:
activityQuery.stop()

In [24]:
from time import sleep
for x in range(5):
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(1)

+-----+-----+
|   gt|count|
+-----+-----+
|stand| 1000|
+-----+-----+

+-----+-----+
|   gt|count|
+-----+-----+
|stand| 1000|
+-----+-----+

+-----+-----+
|   gt|count|
+-----+-----+
|stand| 1000|
+-----+-----+

+-----+-----+
|   gt|count|
+-----+-----+
|stand| 1000|
+-----+-----+

+-----+-----+
|   gt|count|
+-----+-----+
|stand| 1000|
+-----+-----+
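To make the behaviour of the complete output mode concrete, here is a minimal plain-Python simulation, not Spark code. It assumes one input file is consumed per trigger (as with maxFilesPerTrigger set to 1) and that the whole result table is rewritten after each trigger. The function name `run_complete_mode` and the tiny input are made up for illustration.

```python
from collections import Counter

def run_complete_mode(files):
    """Yield the full result table after each trigger (one file per trigger)."""
    running = Counter()
    for records in files:
        # Fold the new file's rows into the running per-activity counts.
        running.update(r["gt"] for r in records)
        # Complete mode: emit every key seen so far, not only the changed ones.
        yield dict(running)

# Hypothetical miniature input: two "files" with a handful of rows each.
files = [
    [{"gt": "stand"}, {"gt": "stand"}, {"gt": "sit"}],
    [{"gt": "walk"}, {"gt": "stand"}],
]

snapshots = list(run_complete_mode(files))
for snapshot in snapshots:
    print(snapshot)
```

Each snapshot corresponds to what a query against the in-memory table would return if it ran right after that trigger finished.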




## Transformations on Streams

All select and filter transformations are supported in Structured Streaming, as are all DataFrame functions and individual column manipulations. We show a simple example using selections and filtering below. In this case, because we are not updating any keys over time, we will use the Append output mode, so that new results are appended to the output table:



In [13]:
# in Python
from pyspark.sql.functions import expr
simpleTransform = streaming.withColumn("stand", expr("gt like '%stand%'"))\
  .where("stand")\
  .where("gt is not null")\
  .select("gt", "model", "arrival_time", "creation_time")\
  .writeStream\
  .queryName("simple_transform")\
  .format("memory")\
  .outputMode("append")\
  .start()

In [14]:
simpleTransform.stop()
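For contrast with complete mode, the following plain-Python sketch (again a simulation, not Spark code) mimics what the append output mode does for a filter-and-select query like the one above: each trigger contributes only its own new rows, and earlier rows are never revised. The function name `run_append_mode` and the sample rows, including the model values, are invented for illustration.

```python
def run_append_mode(files):
    """Yield only the rows that each trigger adds to the result table."""
    for records in files:
        new_rows = [
            {"gt": r["gt"], "model": r["model"]}
            for r in records
            # In SQL, `gt like '%stand%'` evaluates to NULL for a NULL gt,
            # so rows with a missing gt never pass the filter.
            if r["gt"] is not None and "stand" in r["gt"]
        ]
        yield new_rows

# Hypothetical miniature input: one list of rows per trigger.
files = [
    [{"gt": "stand", "model": "model_a"}, {"gt": "sit", "model": "model_a"}],
    [{"gt": None, "model": "model_b"}, {"gt": "stand", "model": "model_b"}],
]

batches = list(run_append_mode(files))
for batch in batches:
    print(batch)
```

Because no key is ever updated after it is written, appending is sufficient here, which is the reasoning behind choosing this mode for selections and filters.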

## Aggregations

In [6]:
deviceModelStats = streaming.cube("gt", "model").avg()\
  .drop("avg(Arrival_time)")\
  .drop("avg(Creation_Time)")\
  .drop("avg(Index)")\
  .writeStream.queryName("device_counts").format("memory")\
  .outputMode("complete")\
  .start()

# The query has only just started, so the table can still be empty at this point
spark.sql("SELECT * FROM device_counts").show()

+---+-----+------+------+------+
| gt|model|avg(x)|avg(y)|avg(z)|
+---+-----+------+------+------+
+---+-----+------+------+------+

