# Structured Streaming Lab

In this lab, you will explore Spark Structured Streaming and how it is used.

In [None]:
sc

In [None]:
spark

First, you will load a set of JSON files from the Heterogeneity Human Activity Recognition Dataset. The data consists of smartphone and smartwatch sensor readings from a variety of devices, specifically from the accelerometer and gyroscope, sampled at the highest frequency the devices support. Readings from these sensors were recorded while users performed activities such as biking, sitting, standing, and walking.

The path to the files is `s3://bigdatateaching/spark-streaming-sample-data/`. In the following cell, create a static DataFrame called `static` by reading the JSON files with Spark's JSON read function:

In [None]:
static = [[fill this part]]

In [None]:
dataSchema = static.schema

Look at the schema:

In [None]:
dataSchema

Next, create a streaming version of the same dataset, which reads the input files one at a time as if they were arriving as a stream. Streaming file sources need an explicit schema, which is why `dataSchema` was saved above. Look up the `spark.readStream.schema` function and use the appropriate parameters. Fill in the missing parts below:

In [None]:
streaming = [[fill this part]]\
    .option("maxFilesPerTrigger", 1)\
    .json([[fill this part]])
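
To build intuition for `maxFilesPerTrigger`, here is a plain-Python sketch (not Spark code) of how a file source limited to one file per trigger might hand data to a query. The function name `micro_batches` and the file names are hypothetical and exist only for illustration. Spark's real file source also tracks which files it has already processed, which this sketch ignores.

```python
from typing import Iterator, List


def micro_batches(files: List[str], max_files_per_trigger: int = 1) -> Iterator[List[str]]:
    """Yield successive groups of files, one group per simulated trigger."""
    for start in range(0, len(files), max_files_per_trigger):
        yield files[start:start + max_files_per_trigger]


# Hypothetical file names; the real dataset uses different ones.
example_files = ["part-0.json", "part-1.json", "part-2.json"]

for trigger, batch in enumerate(micro_batches(example_files)):
    print(f"trigger {trigger}: {batch}")
```

With a limit of 1, each simulated trigger sees exactly one new file, which is what makes the static dataset behave like a slow stream in this lab.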

In the following cell, you will perform one simple transformation: group the data by the `gt` column and count the rows in each group.

In [None]:
activityCounts = [[fill this part]].groupBy("gt").count()

Now that you've set up your transformation, you need to specify an action to start the query.

In [None]:
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
  .format("memory").outputMode("complete")\
  .start()
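
The `complete` output mode used above rewrites the whole result table after every trigger. The following plain-Python sketch (again, not Spark code) illustrates that idea with running counts. The function name `complete_mode_results` and the activity labels are assumptions made for this example.

```python
from collections import Counter
from typing import Dict, Iterable, List


def complete_mode_results(batches: Iterable[List[str]]) -> List[Dict[str, int]]:
    """Return the full result table as it would look after each micro-batch."""
    running = Counter()
    snapshots = []
    for batch in batches:
        running.update(batch)            # fold the new rows into the running counts
        snapshots.append(dict(running))  # complete mode: emit every group, not only changed ones
    return snapshots


# Hypothetical activity labels standing in for the `gt` column.
snapshots = complete_mode_results([["walk", "sit"], ["walk"], ["bike", "sit"]])
for i, table in enumerate(snapshots):
    print(f"after batch {i}: {table}")
```

Each snapshot contains every group seen so far, including groups that did not change in the latest batch.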

You can list the streaming queries that are active at a given point in time:

In [None]:
spark.streams.active

To see streaming in action, in the next cell you will run a SQL query against the `activity_counts` in-memory table created by the query above. The loop runs the SQL query five times, waiting one second between runs, so you can see how the counts change as more data is read in.

In [None]:
from time import sleep
for x in range(5):
    spark.sql([[fill in this part]]).show()
    sleep(1)

In [None]:
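from pyspark.sql.functions import expr
# Flag rows whose activity label contains "stairs", keep only those rows,
# and append the selected columns to the in-memory table `simple_transform`.
simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'"))\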
  .where("stairs")\
  .where("gt is not null")\
  .select("gt", "model", "arrival_time", "creation_time")\
  .writeStream\
  .queryName("simple_transform")\
  .format("memory")\
  .outputMode("append")\
  .start()


In [None]:
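# Average every numeric column for each combination of gt and model (including subtotals),
# then drop the averages of columns that are not sensor readings.
deviceModelStats = streaming.cube("gt", "model").avg()\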
  .drop("avg(Arrival_time)")\
  .drop("avg(Creation_Time)")\
  .drop("avg(Index)")\
  .writeStream.queryName("device_counts").format("memory")\
  .outputMode("complete")\
  .start()
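
As a rough mental model of what `cube("gt", "model").avg()` computes, the plain-Python sketch below averages one measure for every subset of the grouping columns, marking a rolled-up column with `None` (Spark shows `null`). The helper `cube_avg` and the sample rows are hypothetical, and the sketch handles a single measure column only.

```python
from collections import defaultdict
from itertools import combinations
from typing import Dict, List, Tuple


def cube_avg(rows: List[dict], dims: Tuple[str, ...], measure: str) -> Dict[tuple, float]:
    """Average `measure` for every subset of `dims`; rolled-up dimensions become None."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    # Enumerate all subsets of the grouping columns: (), (a,), (b,), (a, b).
    subsets = [set(c) for r in range(len(dims) + 1) for c in combinations(dims, r)]
    for row in rows:
        for kept in subsets:
            key = tuple(row[d] if d in kept else None for d in dims)
            sums[key] += row[measure]
            counts[key] += 1
    return {key: sums[key] / counts[key] for key in sums}


# Hypothetical readings; the values are made up for illustration.
rows = [
    {"gt": "walk", "model": "nexus4", "x": 1.0},
    {"gt": "walk", "model": "s3", "x": 3.0},
    {"gt": "sit", "model": "nexus4", "x": 5.0},
]
result = cube_avg(rows, ("gt", "model"), "x")
for key, avg in sorted(result.items(), key=lambda kv: str(kv[0])):
    print(key, avg)
```

The key `(None, None)` holds the grand average, while keys such as `("walk", None)` hold per-activity subtotals across all models.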

In [None]:
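# Static per-(gt, model) averages, computed once over the full dataset,
# joined below with the streaming averages on the shared keys.
historicalAgg = static.groupBy("gt", "model").avg()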
deviceModelStats = streaming.drop("Arrival_Time", "Creation_Time", "Index")\
  .cube("gt", "model").avg()\
  .join(historicalAgg, ["gt", "model"])\
  .writeStream.queryName("device_counts_2").format("memory")\
  .outputMode("complete")\
  .start()
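
The cell above joins a streaming aggregate with a static one. As a simplified mental model only, the plain-Python sketch below treats both sides as dictionaries keyed by `(gt, model)`: the static side stays fixed, while the streaming side is recomputed after each batch and joined again. The function `join_with_static` and all numbers are made up for illustration.

```python
from typing import Dict, Tuple

Key = Tuple[str, str]


def join_with_static(streaming_avg: Dict[Key, float],
                     static_avg: Dict[Key, float]) -> Dict[Key, Tuple[float, float]]:
    """Inner join on the key: keep only keys present on both sides."""
    return {
        key: (value, static_avg[key])
        for key, value in streaming_avg.items()
        if key in static_avg
    }


# Made-up numbers; keys are (gt, model) pairs.
static_avg = {("walk", "nexus4"): 0.5, ("sit", "nexus4"): 0.1}
after_batch_1 = join_with_static({("walk", "nexus4"): 0.7}, static_avg)
after_batch_2 = join_with_static({("walk", "nexus4"): 0.6, ("bike", "s3"): 0.9}, static_avg)
print(after_batch_1)
print(after_batch_2)
```

Keys that appear on only one side, such as `("bike", "s3")` here, are absent from the joined result.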


In [None]:
spark.sql("select * from device_counts_2").show()