# 4. Structured Streaming with the Retail Data-Set #
Use the by-day retail data-set to simulate a daily feed of data to be read by a Spark Structured Streaming job.   

The stream is read with `spark.readStream` and the option *maxFilesPerTrigger=1*, so each trigger picks up one new file and updates the contents of the streaming dataframe.  
  
A transformation called `purchaseByCustomerPerHour` is applied to the streaming dataframe, and its output is queried to show the top 5 customers and their spend so far. 

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.\
        builder.\
        appName("pyspark-nb-4-streaming").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        config("spark.eventLog.enabled", "true").\
        config("spark.eventLog.dir", "file:///opt/workspace/events").\
        getOrCreate()      

24/02/12 11:28:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# window() defines the time window for the aggregation; col() references the column the window is calculated over
from pyspark.sql.functions import window, col

First, get a sample of the data to be processed in a continuous stream.  
+ We have to assume that future data will have the same schema.  

In [3]:
data_sample = spark.read.option("inferSchema", True).option("header", True).csv("/opt/workspace/datain/retail-data/by-day/2010-12-01.csv")

                                                                                

In [4]:
data_sample.take(5)

[Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=2.55, CustomerID=17850.0, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=3.39, CustomerID=17850.0, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84406B', Description='CREAM CUPID HEARTS COAT HANGER', Quantity=8, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=2.75, CustomerID=17850.0, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84029G', Description='KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=3.39, CustomerID=17850.0, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84029E', Description='RED WOOLLY HOTTIE WHITE HEART.', Quantity=6, InvoiceDate=datetime.datetime(20

#### Create a Streaming Dataframe ####
The Streaming Dataframe is created with a Schema based on the data sample we took earlier.

In [5]:
# Get a Schema from the data_sample
staticSchema = data_sample.schema

Create a streaming dataframe using `maxFilesPerTrigger` to trigger an update to the dataframe on a file-by-file basis.  The `header` option assumes that each file that is loaded starts with a header row.
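To make the file-by-file behaviour concrete, here is a minimal plain-Python sketch of how a file list is split into micro-batches. It is an illustration only, not Spark's implementation: the function `micro_batches` is hypothetical, and ordering the files by name is an assumption of the sketch (Spark chooses the actual order itself).

```python
from typing import Iterator, List


def micro_batches(files: List[str], max_files_per_trigger: int = 1) -> Iterator[List[str]]:
    """Yield the files that each simulated trigger would pick up."""
    pending = sorted(files)  # assumption: file names sort chronologically
    for i in range(0, len(pending), max_files_per_trigger):
        # Each slice stands for one trigger, i.e. one update of the streaming dataframe
        yield pending[i:i + max_files_per_trigger]


day_files = ["2010-12-03.csv", "2010-12-01.csv", "2010-12-02.csv"]
batches = list(micro_batches(day_files, max_files_per_trigger=1))
for batch_id, batch in enumerate(batches):
    print(batch_id, batch)
```

With `max_files_per_trigger=1`, every day-file becomes its own micro-batch, which is what lets the notebook simulate a daily feed.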

In [6]:
# create a Streaming DataFrame based on a Static Schema from the data_sample
streamingDataFrame = spark.readStream.schema(staticSchema).option("maxFilesPerTrigger",1).format("csv").option("header", "true")\
       .load("/opt/workspace/datain/retail-data/by-day/*.csv")

                                                                                

In [7]:
# check the stream status
streamingDataFrame.isStreaming

True

A streaming transformation `purchaseByCustomerPerHour` is created below. It groups the data by *CustomerID* and by a time window over *InvoiceDate*, and sums "total_cost", which is calculated as *UnitPrice* x *Quantity*.  

The `window` function is used to define the size of the window over which this summary is provided - in this case a 1-day window of time, despite the "PerHour" in the transformation's name.  This means we get to see which customers have bought the most *in a single day*.
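The grouping described above can be sketched in plain Python to show what a 1-day tumbling window does to the rows. This is only an illustration of the idea, not how Spark executes it: the helpers `day_window` and `sum_by_customer_and_day` are hypothetical, and aligning windows to midnight is an assumption of the sketch. The first two rows are taken from the data sample; the third is made up to show a second window.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def day_window(ts: datetime):
    """Return the (start, end) of the 1-day window that contains ts."""
    start = ts.replace(hour=0, minute=0, second=0, microsecond=0)  # assumed midnight alignment
    return start, start + timedelta(days=1)


def sum_by_customer_and_day(rows):
    """rows: iterable of (customer_id, invoice_date, unit_price, quantity)."""
    totals = defaultdict(float)
    for customer_id, invoice_date, unit_price, quantity in rows:
        # Same key as the groupBy: the customer plus the window the invoice falls into
        totals[(customer_id, day_window(invoice_date))] += unit_price * quantity
    return dict(totals)


rows = [
    (17850, datetime(2010, 12, 1, 8, 26), 2.55, 6),
    (17850, datetime(2010, 12, 1, 8, 26), 3.39, 6),
    (17850, datetime(2010, 12, 2, 9, 0), 2.75, 8),  # hypothetical row on the next day
]
totals = sum_by_customer_and_day(rows)
for (customer_id, (start, end)), total in sorted(totals.items()):
    print(customer_id, start.date(), round(total, 2))
```

Rows from the same customer on the same day collapse into one total, while the row on the following day lands in a separate window.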

In [8]:
# Create a transformation that sums customer purchases per 1-day window (despite the PerHour in its name)
purchaseByCustomerPerHour = streamingDataFrame.selectExpr("CustomerID", "(UnitPrice * Quantity) as total_cost", "InvoiceDate") \
                                              .groupBy(col("CustomerID"), window(col("InvoiceDate"), "1 day")) \
                                              .sum("total_cost")

Create an in-memory table called `customer_purchases` that receives the output of the streaming transformation. It can be queried using Spark SQL to see the latest result-set.

In [9]:
# Generate updates to an in-memory table after each trigger
purchaseByCustomerPerHour.writeStream.format("memory").queryName("customer_purchases").outputMode("complete").start()

<pyspark.sql.streaming.StreamingQuery at 0x7f5239e22208>

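Query `customer_purchases` to see the latest view of which customers have spent the most in a single day.  The results change as new retail data day-files are processed. Immediately after the query starts, the table can still be empty because the first micro-batch has not completed yet, as in the output below.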

In [10]:
spark.sql("""
SELECT * FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""").show(5)

                                                                                

+----------+------+---------------+
|CustomerID|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



In [11]:
spark.sql("""
SELECT * FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""").show(5)

+----------+------+---------------+
|CustomerID|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



In [12]:
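# Stops the session while the query from In [9] is still running, which likely triggers the error logged below
spark.stop()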

24/02/12 11:29:16 ERROR MicroBatchExecution: Query customer_purchases [id = d15934ba-04b8-4b93-b08a-5bca9ba52e0a, runId = d58e30c8-8395-46a9-b8d6-ad45e1042cca] terminated with error
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(CustomerID#110, window#60, 200)
+- *(1) HashAggregate(keys=[CustomerID#110, window#60], functions=[partial_sum(total_cost#50)], output=[CustomerID#110, window#60, sum#125])
   +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(InvoiceDate#108, TimestampType, LongType) - 0) as double) / 8.64E10)) as double) = (cast((precisetimestampconversion(InvoiceDate#108, TimestampType, LongType) - 0) as double) / 8.64E10)) THEN (CEIL((cast((precisetimestampconversion(InvoiceDate#108, TimestampType, LongType) - 0) as double) / 8.64E10)) + 1) ELSE CEIL((cast((precisetimestampconversion(InvoiceDate#108, TimestampType, LongType) - 0) as double) / 8.6