# Colab 4: Stream Processing using Spark (20 points)

In this lab, you will learn how to process stream data using Spark's [Structured Streaming API](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html).

## Setup

Let's setup Spark on your Colab environment.  Run the cells below!

---



In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u392-ga-1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 33 not upgraded.


In [None]:
#Import libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import random
import sys
import time

spark = SparkSession.builder.getOrCreate()

## Data Loading

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Structured Streaming supports several [input sources](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources) for reading in a streaming fashion.

Please create a new folder "activity-data" under your current working folder (i.e., "content" folder). Download the zip file from Canvas, extract all the files, and then upload these files to "activity-data" folder.

Let’s first read in the **static** version of one file in the dataset as a DataFrame. We are going to work with the Heterogeneity Human Activity Recognition Dataset. The data consists of smartphone and smartwatch sensor readings from a variety of devices—specifically, the accelerometer and gyroscope, sampled at the highest possible frequency supported by the devices. Readings from these sensors were recorded while users performed activities like biking, sitting, standing, walking, and so on.

Run the cell below to verify you can access the files properly.

In [None]:
# dont forget to change file path back to original here
static = spark.read.json("/content/drive/MyDrive/CSCD/cscd 430/activity-data/part-00000-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")

static.show(2)


+-------------+-------------------+--------+-----+------+----+-----+-------------+-----------+-------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|            x|          y|            z|
+-------------+-------------------+--------+-----+------+----+-----+-------------+-----------+-------------+
|1424686735175|1424686733176178965|nexus4_1|   35|nexus4|   g|stand| 0.0014038086|  5.0354E-4|-0.0124053955|
|1424686735378|1424686733382813486|nexus4_1|   76|nexus4|   g|stand|-0.0039367676|0.026138306|  -0.01133728|
+-------------+-------------------+--------+-----+------+----+-----+-------------+-----------+-------------+
only showing top 2 rows



##**Q1: Query the static DataFrame (4 points)**

In this query, find the count of each type of activities collected from each device. Sort the result based on the device ID and the activity count.

In [None]:
# your code goes here

static.createOrReplaceTempView("stat")

q1 = spark.sql(
    '''SELECT Device, gt, count(gt) as activity_count
       FROM stat
       GROUP BY Device, gt
       ORDER BY Device, activity_count
    ''')

q1.show()

+--------+----------+--------------+
|  Device|        gt|activity_count|
+--------+----------+--------------+
|nexus4_1|stairsdown|          4788|
|nexus4_1|      bike|          5013|
|nexus4_1|      null|          5181|
|nexus4_1|  stairsup|          5278|
|nexus4_1|     stand|          5395|
|nexus4_1|       sit|          6294|
|nexus4_1|      walk|          6705|
|nexus4_2|stairsdown|          4577|
|nexus4_2|  stairsup|          5174|
|nexus4_2|      null|          5268|
|nexus4_2|      bike|          5783|
|nexus4_2|     stand|          5989|
|nexus4_2|       sit|          6015|
|nexus4_2|      walk|          6551|
+--------+----------+--------------+



## Creating Streaming DataFrame

Next, we want to create a **streaming** DataFrame. By default, Structured Streaming from file-based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. Now let's create a streaming DataFrame using the data schema from the above static DataFrame.


In [None]:
dataSchema = static.schema

# dont forget to change file path back to original
streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
    .json("/content/drive/MyDrive/CSCD/cscd 430/activity-data")

Here `maxFilesPerTrigger` allows us to control how quickly Spark will read all of the files in the folder. By specifying this value lower, we’re partificially limiting the flow of the stream to one file per trigger. This helps us demonstrate how Structured Streaming runs incrementally in this exercise. Note that this streaming DataFrame is going to use all the files inside `activity-data` folder.


## **Q2: Query the streaming DataFrame (4 points)**

Streaming DataFrames are largely the same as static DataFrames. Basically, all of the transformations that are available in the static Structured APIs apply to Streaming DataFrames.

In this query, you define the exact same query as Q1, **but use the streaming DataFrame instead**.

In [None]:
# your code goes here

treaming.createOrReplaceTempView("stream")

q2 = spark.sql(
    '''SELECT Device, gt, count(gt) as activity_count
       FROM stream
       GROUP BY Device, gt
       ORDER BY Device, activity_count
    ''')



Let's set the shuffle partitions to a small value to avoid creating too many shuffle partitions.

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", 5)

## **Q3: Start the streaming query (4 points)**

Now, you want to start this streaming query. You need to write the output to an in-momory table called `activity_counts` (later you will query this table to see the query results). Please use `complete` as the [output mode](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes) and use `memory` as the [output sink](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks) for this query.

In [None]:
# your code goes here

query = q2.writeStream.outputMode("complete").format("memory").queryName("activity_counts").start()

Now this stream is running. We can experiment with the results by querying the in-memory table `activity_counts` you have created in the previous step. We’ll do this in a simple loop that will print the results of the streaming query every three seconds. Your results might be different from mine, but overall, you should be able to see the incremental activity counts as the query loops through.

In [None]:
from time import sleep
for x in range(4):
      spark.sql("SELECT * FROM activity_counts").show()
      sleep(3)

+------+---+--------------+
|Device| gt|activity_count|
+------+---+--------------+
+------+---+--------------+

+--------+----------+--------------+
|  Device|        gt|activity_count|
+--------+----------+--------------+
|nexus4_1|stairsdown|          4815|
|nexus4_1|      bike|          4892|
|nexus4_1|      null|          5247|
|nexus4_1|  stairsup|          5273|
|nexus4_1|     stand|          5357|
|nexus4_1|       sit|          6217|
|nexus4_1|      walk|          6665|
|nexus4_2|stairsdown|          4547|
|nexus4_2|  stairsup|          5183|
|nexus4_2|      null|          5200|
|nexus4_2|      bike|          5905|
|nexus4_2|     stand|          6030|
|nexus4_2|       sit|          6090|
|nexus4_2|      walk|          6591|
+--------+----------+--------------+

+--------+----------+--------------+
|  Device|        gt|activity_count|
+--------+----------+--------------+
|nexus4_1|stairsdown|          9526|
|nexus4_1|      bike|          9806|
|nexus4_1|      null|         10538

## Understand Event Time

In stream processing systems, there are two relevant times for each event: the time at which it actually occurred (***event time***), and the time that it was processed or reached the stream processing system (***processing time***).

In the activity dataset, there are two time-based columns. The `Creation_Time` column defines when an event was created, whereas the `Arrival_Time` defines when an event hit the servers. We will use `Creation_Time` for our experiments.

The first step in event-time analysis is to convert the timestamp column into the proper Spark SQL timestamp type. The current `Creation_Time` column is unixtime nanoseconds (represented as a long), therefore we need to do a little manipulation to get it into the proper timestamp format.

In [None]:
withEventTime = streaming.selectExpr("*", "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")

You can see that a new column `event_time` is added to the streaming DataFrame `withEventTime`:

In [None]:
withEventTime.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)
 |-- event_time: timestamp (nullable = true)



## **Q4: Query Tumbling Windows (4 points)**

Spark has two types of [Windows](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#types-of-time-windows) - Tumbling Window and Sliding Window. The main difference between these windows is that, tumbling windows are **non-overlapping** whereas sliding windows can be **overlapping**.

Let's first query non-overlapping **tumbling windows**. The query is to count the number of activities in **each window of 20 minutes** based on the `event_time` in the `withEventTime` DataFrame. We want to show the streaming query results incrementally by setting the output mode to `complete` and sending the output to an in-memory table called `tumbling_windows`. Please sort the query results based on the time ordering of these windows.

In [None]:
from pyspark.sql.functions import window, col

# your code goes here



Now let's query the in-memory table `tumbling_windows` 4 times to see the incremental results.

In [None]:
from time import sleep
for x in range(4):
      spark.sql("SELECT * FROM tumbling_windows").show(truncate=False)
      sleep(3)

+------+-----+
|window|count|
+------+-----+
+------+-----+

+------+-----+
|window|count|
+------+-----+
+------+-----+

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2015-02-22 00:40:00, 2015-02-22 01:00:00}|1    |
|{2015-02-23 10:00:00, 2015-02-23 10:20:00}|115  |
|{2015-02-23 10:20:00, 2015-02-23 10:40:00}|2493 |
|{2015-02-23 10:40:00, 2015-02-23 11:00:00}|3211 |
|{2015-02-23 11:00:00, 2015-02-23 11:20:00}|2451 |
|{2015-02-23 11:20:00, 2015-02-23 11:40:00}|898  |
|{2015-02-23 12:00:00, 2015-02-23 12:20:00}|708  |
|{2015-02-23 12:20:00, 2015-02-23 12:40:00}|2566 |
|{2015-02-23 12:40:00, 2015-02-23 13:00:00}|2545 |
|{2015-02-23 13:00:00, 2015-02-23 13:20:00}|4172 |
|{2015-02-23 13:20:00, 2015-02-23 13:40:00}|2457 |
|{2015-02-23 13:40:00, 2015-02-23 14:00:00}|4511 |
|{2015-02-23 14:00:00, 2015-02-23 14:20:00}|3364 |
|{2015-02-23 14:20:00, 2015-02-23 14:40:00}|2269 |
|{2015-02-2

## **Q5: Query Sliding Windows (4 points)**

Lastly, we want to develop a similar query to the streaming DataFrame `withEventTime`, but this time we want to define our query using **sliding windows**. We want to use the same **window width of 20 minutes**, but we want to slide the window through the timeline **every 10 minutes**. This time send the query results to an in-memory table called `sliding_windows`. Please make sure to sort the query results based on the time ordering of these windows.

In [None]:
from pyspark.sql.functions import window, col

# your code goes here

Finally, let's query the in-memory table `sliding_windows` 4 times to see the incremental results.

In [None]:
from time import sleep
for x in range(4):
      spark.sql("SELECT * FROM sliding_windows").show(truncate=False)
      sleep(3)

+------+-----+
|window|count|
+------+-----+
+------+-----+

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2015-02-22 00:30:00, 2015-02-22 00:50:00}|1    |
|{2015-02-22 00:40:00, 2015-02-22 01:00:00}|1    |
|{2015-02-23 10:00:00, 2015-02-23 10:20:00}|115  |
|{2015-02-23 10:10:00, 2015-02-23 10:30:00}|1316 |
|{2015-02-23 10:20:00, 2015-02-23 10:40:00}|2493 |
|{2015-02-23 10:30:00, 2015-02-23 10:50:00}|2404 |
|{2015-02-23 10:40:00, 2015-02-23 11:00:00}|3211 |
|{2015-02-23 10:50:00, 2015-02-23 11:10:00}|3406 |
|{2015-02-23 11:00:00, 2015-02-23 11:20:00}|2451 |
|{2015-02-23 11:10:00, 2015-02-23 11:30:00}|2042 |
|{2015-02-23 11:20:00, 2015-02-23 11:40:00}|898  |
|{2015-02-23 12:00:00, 2015-02-23 12:20:00}|708  |
|{2015-02-23 12:10:00, 2015-02-23 12:30:00}|2067 |
|{2015-02-23 12:20:00, 2015-02-23 12:40:00}|2566 |
|{2015-02-23 12:30:00, 2015-02-23 12:50:00}|2458 |
|{2015-02-23 12:40:00

Congratulations on the completion of another colab assignment! I hope you learned the basic concepts related to Spark Structured Streaming through this assignment. I would recommend you to read the rest of [this document](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#structured-streaming-programming-guide) and further explore other features such as [Handling Late Data and Watermarking](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking), [Join Operations](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations), and [Streaming Deduplication](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication), etc. Thank you!

##Reference
*Spark: The Definitive Guide* (2018), by Bill Chambers and Matei Zaharia, ISBN: 9781491912218