Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [1]:
NAME = "Roope Niemi"

---

For this problem set, we'll be using the Jupyter notebook:

![](jupyter.png)

## Structured Streaming exercises

In this problem set you will use structured streaming to analyze made-up trail camera data. We will simulate real-time streaming by having multiple data files and loading them one by one.

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html



In [2]:
from pyspark.sql import SparkSession, Row
import pyspark.sql
import json
from pyspark.sql.types import *
import time
from IPython.display import display, clear_output
import pandas as pd

spark = SparkSession \
    .builder \
    .appName("StructuredStreaming") \
    .getOrCreate()

path = "data/"

## Load data

First we'll start with normal dataframe exercises. Create a method that loads the trail camera data into a dataframe. The data is in JSON format. You might have to specify the schema with `StructType`. The dataframe will have null values called 'null'; you can either remove them or leave them be. This dataframe simulates the input dataframe that we will use for streaming.

param `path`: path to the JSON dataset.

`return`: dataframe containing trail camera information.

schema:

Name | Type
------| :-----
time  | Timestamp (nullable = true)
animal_name | String (nullable = true)
weather | String (nullable = true)
battery | Double (nullable = true)
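
As a plain-Python sketch of the cleaning step (not Spark code), the following parses JSON-lines records into the four typed fields of the schema and drops any record with a missing or literal 'null' value. The one-object-per-line layout and the `%Y-%m-%d %H:%M:%S` timestamp format are assumptions, and the helper name `parse_record` is hypothetical.

```python
import json
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed timestamp layout
FIELDS = ("time", "animal_name", "weather", "battery")


def parse_record(line):
    """Return (time, animal_name, weather, battery), or None if the record is unusable."""
    try:
        raw = json.loads(line)
    except json.JSONDecodeError:
        return None  # malformed JSON line
    if not isinstance(raw, dict):
        return None
    values = [raw.get(field) for field in FIELDS]
    # Treat both a JSON null and the literal string 'null' as missing.
    if any(v is None or v == "null" for v in values):
        return None
    time_str, animal, weather, battery = values
    try:
        return (datetime.strptime(time_str, TIME_FORMAT), str(animal), str(weather), float(battery))
    except (TypeError, ValueError):
        return None  # a field could not be converted to its target type


sample_lines = [
    '{"time": "2019-08-13 19:13:18", "animal_name": "Rabbit", "weather": "Clear", "battery": 66}',
    '{"time": "2020-01-18 10:27:10", "animal_name": "null", "weather": "Clear", "battery": 60}',
    "not json",
]
records = [r for r in map(parse_record, sample_lines) if r is not None]
print(records)
```

Only the first sample line survives; the second contains a 'null' animal name and the third is not valid JSON.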

In [3]:
# Create DataFrame representing data in the JSON files
sc = pyspark.SparkContext.getOrCreate()
def loadData(path):    
    # YOUR CODE HERE
    df = spark.read.json(sc.textFile(path)).drop("_corrupt_record")
    # Cast to the required types, then keep only rows where every field is present
    typed = df.select(df.time.cast("timestamp"), df.animal_name, df.weather, df.battery.cast("double"))
    return typed.where(df.time.isNotNull() & df.animal_name.isNotNull()
                       & df.weather.isNotNull() & df.battery.isNotNull())

In [4]:
#example print
loadData(path).show()

+-------------------+-----------+-------+-------+
|               time|animal_name|weather|battery|
+-------------------+-----------+-------+-------+
|2019-08-13 19:13:18|     Rabbit|  Clear|   66.0|
|2020-01-18 10:27:10|   Squirrel|  Clear|   60.0|
|2020-01-08 23:08:22|   Squirrel| Cloudy|   42.0|
|2020-05-03 02:35:34|     Rabbit|  Clear|   33.0|
|2019-06-09 20:07:34|     Rabbit| Cloudy|   68.0|
|2020-04-30 22:39:53|   Squirrel|  Clear|   20.0|
|2019-07-12 15:10:56|   Squirrel|  Clear|   51.0|
|2020-01-01 22:47:46|       Deer|  Rainy|   65.0|
|2019-09-11 13:45:03|     Rabbit|  Clear|   69.0|
|2020-01-19 11:45:41|   Squirrel|  Storm|   83.0|
|2019-12-16 22:15:37|       Wolf|  Clear|   42.0|
|2019-11-18 07:01:59|     Rabbit|  Clear|   17.0|
|2019-06-18 10:10:17|     Rabbit| Cloudy|   11.0|
|2020-05-31 18:43:44|   Squirrel|  Clear|   85.0|
|2020-06-03 19:08:20|   Squirrel|  Clear|   29.0|
|2020-01-22 07:20:50|   Squirrel|  Rainy|   81.0|
|2020-07-30 09:19:45|     Rabbit| Cloudy|   22.0|


In [5]:
'''loadData tests'''

cols = StructType([ StructField("time", TimestampType(), True),
                    StructField("animal_name", StringType(), True),
                    StructField("weather", StringType(), True),
                    StructField("battery", DoubleType(), True)])


from datetime import datetime
testTs = datetime(2020, 1, 1)

fakeData = [(testTs, "dog", "cloudy", 100.0)]

fakeDf = spark.createDataFrame(fakeData, cols)

df = loadData(path)

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)


## Animal count

Next we will simulate the output dataframe that we will use for streaming. Create a method that counts the number of appearances for each animal. The dataframe should be sorted by count in descending order. You should remove the null rows now if you didn't do it in the last method.

param `df`: trail camera dataframe created using `loadData`.

`return`: dataframe containing the number of appearances per animal. The dataframe should include the columns "animal_name" and "count". "count" should be of type Long, which should happen automatically with Spark functions. The dataframe must not include a count for null values.

example output:

animal_name|count
-------:|-----
Dog| 1234|
Cat| 1111|
Mouse| 999|
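
To make the expected result concrete, here is a small plain-Python analogue of the aggregation (not Spark code): it skips missing names, counts the rest, and returns the pairs sorted by count in descending order. The function name `animal_count` and the sample names are hypothetical.

```python
from collections import Counter


def animal_count(names):
    """Count appearances per animal, ignoring missing names, sorted by count descending."""
    counts = Counter(name for name in names if name is not None)
    # most_common() returns (name, count) pairs ordered from highest to lowest count.
    return counts.most_common()


sightings = ["Rabbit", "Deer", None, "Rabbit", "Wolf", "Deer", "Rabbit"]
result = animal_count(sightings)
print(result)
```

A hand-checked result like this on a few rows can be compared against what the Spark version returns for the same rows.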

In [6]:
from pyspark.sql.functions import *
def animalCount(df):    
    # YOUR CODE HERE
    return df.select(df.animal_name).groupBy(df.animal_name).count().sort(desc("count"))

In [7]:
#example print
animalCount(loadData(path)).show()

+-----------+-----+
|animal_name|count|
+-----------+-----+
|     Rabbit| 1187|
|   Squirrel| 1147|
|       Deer|  351|
|       Bear|  119|
|       Wolf|  111|
+-----------+-----+



In [8]:
'''animalCount tests'''

cols = StructType([ StructField("animal_name", StringType(), True),
                    StructField("count", LongType(), False)])


fakeData = [("dog", 1)]

fakeDf = spark.createDataFrame(fakeData, cols)

df = animalCount(loadData(path))

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)

assert df.count() == 5, "the number of rows was expected to be 5 but it was %s" % df.count()

df = df.toPandas()

assert df.iloc[0, 1] >= df.iloc[1, 1], "the first item was expected to have higher count than the second"
assert df.iloc[3, 1] >= df.iloc[4, 1], "the fourth item was expected to have higher count than the last"
assert df.iloc[0, 0] == "Rabbit", "the first item was expected to be Rabbit but it was %s" % df.iloc[0, 0]
assert df.iloc[4, 0] == "Wolf", "the last item was expected to be Wolf but it was %s" % df.iloc[4, 0]


## inputDf

Now we will finally do the streaming. First you should specify the schema for the input dataframe. The schema is the same as in the Load Data exercise. Then you should create the input dataframe with the `spark.readStream` method. Remember to include the schema and the path. You will also have to include `.option("maxFilesPerTrigger", 1)` so that we can simulate real-time streaming by loading one file at a time.

param `path`: path to the JSON dataset.

`return`: input dataframe containing trail camera information.
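
The idea of loading one file per trigger can be sketched in plain Python (not Spark code) as a generator that yields the records of a single file at a time. The helper name `micro_batches`, the `part-*.json` file names, and the ordering of files by name are assumptions of this sketch, not a description of how Spark selects files.

```python
import json
import tempfile
from pathlib import Path


def micro_batches(directory):
    """Yield the parsed records of one JSON-lines file per simulated trigger."""
    for file_path in sorted(Path(directory).glob("*.json")):
        with file_path.open() as handle:
            yield [json.loads(line) for line in handle if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    # Write two small hypothetical input files.
    for i, names in enumerate((["Rabbit", "Deer"], ["Wolf"])):
        lines = [json.dumps({"animal_name": name}) for name in names]
        (Path(tmp) / f"part-{i}.json").write_text("\n".join(lines))
    # Consume the batches while the temporary files still exist.
    batch_sizes = [len(batch) for batch in micro_batches(tmp)]

print(batch_sizes)
```

Each iteration of the generator corresponds to one trigger, so downstream results change step by step instead of all at once.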

In [9]:
def inputDf(path):    
    # YOUR CODE HERE
    userSchema = StructType([ StructField("time", TimestampType(), True),
                    StructField("animal_name", StringType(), True),
                    StructField("weather", StringType(), True),
                    StructField("battery", DoubleType(), True)])
    # Build the streaming dataframe once, then print and return the same object
    streamDf = spark.readStream.option("maxFilesPerTrigger", 1).schema(userSchema).json(path)
    print(streamDf)
    return streamDf

## outputDf

Next you should create the output dataframe, similar to the Animal Count exercise. You will have to exclude the null values and sort the dataframe by count in descending order.

param `inputDF`: input dataframe created by `inputDf()`.

`return`: filtered and sorted dataframe containing the number of appearances per animal.

In [10]:
def outputDf(inputDF):    
    # YOUR CODE HERE
    return inputDF.select(inputDF.animal_name).where(inputDF.animal_name.isNotNull()).groupBy(inputDF.animal_name).count().sort(desc("count"))
    

## createQuery

Finally, you should start streaming the output dataframe with the `writeStream` method. You will have to include the options `format`="memory", `queryName`="counts" and `outputMode`="complete".

param `outputDF`: output dataframe created by `outputDf()`.

`return`: a query on the output dataframe
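
With `outputMode`="complete", the whole result table is written out again after every trigger, with counts accumulated over all data seen so far. The plain-Python sketch below (not Spark code) imitates that behaviour on in-memory batches; the function name `complete_mode_snapshots` and the sample batches are hypothetical.

```python
from collections import Counter


def complete_mode_snapshots(batches):
    """Yield the full sorted count table after each batch, as complete mode would."""
    totals = Counter()
    for batch in batches:
        # Add the new batch to the running totals, skipping missing names.
        totals.update(name for name in batch if name is not None)
        # Emit the entire table, not just the rows that changed.
        yield totals.most_common()


batches = [["Rabbit", "Deer", "Rabbit"], [None, "Deer", "Wolf"], ["Deer"]]
snapshots = list(complete_mode_snapshots(batches))
for snapshot in snapshots:
    print(snapshot)
```

Note that the leading animal changes from Rabbit to Deer between the first and last snapshot, so the ordering of a streaming result can depend on how many files have been processed when it is read.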

In [11]:
def createQuery(outputDF):    
    # YOUR CODE HERE
    return outputDF.writeStream.format("memory").queryName("counts").outputMode("complete").start()

In [12]:
'''streaming tests'''
inputStreamDf = inputDf(path)
outputStreamDf= outputDf(inputStreamDf)
query = createQuery(outputStreamDf)

assert outputStreamDf.isStreaming, "the outputDF was expected to be streaming"

df = spark.sql("select * from counts")

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)

status = {'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

assert query.status == status, "the status was expected to be %s but it was %s" % (status, query.status)

assert df.count() == 0, "the number of rows was expected to be 0 when the streaming just started but it was %s" % df.count()
time.sleep(20)
assert df.count() == 5, "the number of rows was expected to be 5 when the streaming has been going for a while but it was %s" % df.count()

df = df.toPandas()

assert df.iloc[0, 1] >= df.iloc[1, 1], "the first item was expected to have higher count than the second"
assert df.iloc[3, 1] >= df.iloc[4, 1], "the fourth item was expected to have higher count than the last"
assert df.iloc[0, 0] == "Rabbit" or df.iloc[0, 0] == "Squirrel", "the first item was expected to be Rabbit or Squirrel but it was %s" % df.iloc[0, 0]
assert df.iloc[4, 0] == "Wolf" or df.iloc[4, 0] == "Bear", "the last item was expected to be Wolf or Bear but it was %s" % df.iloc[4, 0]


DataFrame[time: timestamp, animal_name: string, weather: string, battery: double]


AssertionError: the number of rows was expected to be 5 when the streaming has been going for a while but it was 5
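
The message above reports a count of 5 even though the assertion failed. One plausible explanation, offered as an assumption rather than a diagnosis, is that `df.count()` is evaluated twice, once in the condition and once when building the message, while the in-memory table is still being updated. The plain-Python sketch below reproduces the effect with a hypothetical `ChangingTable` class and shows that reading the value once keeps the check and the message consistent.

```python
class ChangingTable:
    """Hypothetical stand-in for a table whose row count grows between reads."""

    def __init__(self, counts):
        self._counts = iter(counts)

    def count(self):
        return next(self._counts)


# Two reads: the condition sees 4, the message is built from the later value 5.
table = ChangingTable([4, 5])
try:
    assert table.count() == 5, "expected 5 but it was %s" % table.count()
except AssertionError as err:
    message = str(err)
print(message)

# One read: the value that is checked is the value that is reported.
table = ChangingTable([4, 5])
n_rows = table.count()
consistent = "expected 5 but it was %s" % n_rows
print(consistent)
```

Storing the count in a variable before asserting avoids the mismatch; it does not change whether the stream has finished processing by the time of the check.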

In [None]:
# stream printing in real-time
# you can increase the total streaming simulation duration by increasing "nIter"
# However, when finally submitting the assignment make sure to change "nIter" back to a smaller value. 
nIter = 5
for i in range(nIter):
    clear_output(wait=True)
    display(query.status)
    display()
    time.sleep(3)

In [None]:
spark.stop()