# Spark Lab 11 - Structured Streaming using Python DataFrames API

This lab is adapted from Databricks' examples.

Apache Spark 2.0 added the first version of a new higher-level stream processing API, Structured Streaming. In this lab we take a quick look at how to use the DataFrame API to build Structured Streaming applications. We want to compute real-time metrics such as running counts and windowed counts on a stream of timestamped actions (e.g. Open, Close, etc.).

## Step 1: Inspect Data
We have some sample action data as files in `/databricks-datasets/structured-streaming/events/`, which we are going to use to build this application. Let's take a look at the contents of this directory.

1\. Use `%fs ls /path/to/dir` to view the directory

In [3]:
%fs ls /databricks-datasets/structured-streaming/events/

path,name,size
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008


There are about 50 JSON files in the directory. Let's see what each JSON file contains.

2\. Use `%fs head /path/to/file` to view the beginning of a file

- Note that Databricks' support for command-line operations is extremely limited; see (https://docs.databricks.com/_static/notebooks/dbutils.html) and (https://docs.databricks.com/data/databricks-file-system.html)

In [5]:
%fs head /databricks-datasets/structured-streaming/events/file-0.json

Each line in the file contains a JSON record with two fields, `time` and `action`. Let's try to analyze these files interactively first.
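
To make the record layout concrete, here is a minimal sketch that parses a few JSON lines with Python's standard library, without Spark. The sample lines are made up for illustration, and storing `time` as epoch seconds is an assumption; check the `%fs head` output for the actual representation.

```python
import json
from datetime import datetime, timezone

# Hypothetical sample lines in the same two-field shape as the lab's files.
# Assumption: `time` is an epoch timestamp in seconds.
sample_lines = [
    '{"time": 1469501107, "action": "Open"}',
    '{"time": 1469501147, "action": "Open"}',
    '{"time": 1469501202, "action": "Close"}',
]


def parse_record(line):
    """Parse one JSON line into a (UTC datetime, action) tuple."""
    record = json.loads(line)
    event_time = datetime.fromtimestamp(record["time"], tz=timezone.utc)
    return event_time, record["action"]


records = [parse_record(line) for line in sample_lines]
for event_time, action in records:
    print(event_time.isoformat(), action)
```

In the lab itself Spark does this parsing for us once we supply a schema, so this sketch is only meant to show what one record looks like after decoding.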

3\. Before we do our analysis, because our cluster is a single-node cluster, it is a good idea to use just one partition for shuffles (e.g. at the time of a join or aggregation)

- The `spark.sql.shuffle.partitions` setting configures the number of partitions that are used when shuffling data for joins or aggregations

In [8]:
spark.conf.set("spark.sql.shuffle.partitions", "1")  # keep the size of shuffles small

## Step 2: Interactive Processing
The usual first step in attempting to process the data is to interactively query the data. Let's define a static DataFrame on the files, and give it a table name.

Since we know the data format already, let's define the schema to speed up processing (no need for Spark to infer schema)

- `time`: `timestamp`
- `action`: `string`

4\. In the following, load the data into a DataFrame `staticInputDF`, then verify the DataFrame by showing the first 10 rows

In [10]:
inputPath = "/databricks-datasets/structured-streaming/events/"

# Since we know the data format already, let's define the schema to speed up processing (no need for Spark to infer schema)
jsonSchema = "time timestamp, action string"

# Static DataFrame representing data in the JSON files
staticInputDF = spark.read.schema(jsonSchema).json(inputPath)

# show the first 10 rows
display(staticInputDF.limit(10))


time,action
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:29.000+0000,Open
2016-07-28T04:19:31.000+0000,Close
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:32.000+0000,Close
2016-07-28T04:19:33.000+0000,Close
2016-07-28T04:19:35.000+0000,Close
2016-07-28T04:19:36.000+0000,Open


Now we can compute the number of "open" and "close" actions with one hour windows. 

In Structured Streaming, aggregations over a **sliding event-time window** are straightforward and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In the case of window-based aggregations, aggregate values are maintained for each window that the event-time of a row falls into.

To specify the sliding event-time window, we need the `window` function from `pyspark.sql.functions` (documentation: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#window).

`window(timeColumn, windowDuration, slideDuration=None, startTime=None)` where

- the time column must be of pyspark.sql.types.TimestampType
- durations can be specified in the form of "5 seconds", "1 week", "1 day 2 hours", and so on.
- If the slideDuration is not provided, the windows will be tumbling (i.e. non-overlapping) windows.

Note that:
- The `window` function returns a struct with `start` and `end` fields, e.g., `Row(start=datetime.datetime(2016, 7, 26, 13, 0), end=datetime.datetime(2016, 7, 26, 14, 0))`.
- The same window function can also be used in static queries
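
To illustrate what it means for a row to "fall into" a window, here is a plain-Python sketch of window assignment. It is not Spark's implementation; the function name `windows_for` is hypothetical, and the sketch assumes windows are aligned to the Unix epoch and are half-open (start inclusive, end exclusive), with no `startTime` offset.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def windows_for(event_time, window, slide=None):
    """Return the (start, end) windows that contain event_time.

    Each window covers [start, end). With slide=None the windows are
    tumbling, so exactly one window is returned.
    """
    slide = slide or window
    # Start of the latest window that begins at or before event_time.
    start = event_time - ((event_time - EPOCH) % slide)
    result = []
    # Walk backwards over earlier window starts that still cover event_time.
    while start + window > event_time:
        result.append((start, start + window))
        start -= slide
    return sorted(result)


event_time = datetime(2016, 7, 28, 4, 19, 28, tzinfo=timezone.utc)

# Tumbling 1-hour window: the row belongs to exactly one window.
tumbling = windows_for(event_time, timedelta(hours=1))

# Sliding 1-hour window that advances every 30 minutes: two windows overlap the row.
sliding = windows_for(event_time, timedelta(hours=1), timedelta(minutes=30))

for start, end in tumbling + sliding:
    print(start.isoformat(), "->", end.isoformat())
```

With a sliding window, one input row contributes to every overlapping window, which is why sliding-window counts can add up to more than the number of rows.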

5\. Create a `staticCountsDF` DataFrame consisting of three columns `action`, `window`, and `count`, where `count` is the count of each action within the 1-hour tumbling window.

- Also cache the DataFrame

In [12]:
import pyspark.sql.functions as f      # for window() function

staticCountsDF = staticInputDF.groupBy(staticInputDF.action, f.window(staticInputDF.time, "1 hour")).count()
                                       
staticCountsDF.cache()


6\. Register `staticCountsDF` as a Temporary View called "static_counts" and display the first 10 rows.

In [14]:
# Register the DataFrame as table 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")

In [15]:
display(staticCountsDF.limit(10))

action,window,count
Close,"List(2016-07-28T04:00:00.000+0000, 2016-07-28T05:00:00.000+0000)",960
Open,"List(2016-07-28T04:00:00.000+0000, 2016-07-28T05:00:00.000+0000)",825
Close,"List(2016-07-28T05:00:00.000+0000, 2016-07-28T06:00:00.000+0000)",671
Close,"List(2016-07-28T06:00:00.000+0000, 2016-07-28T07:00:00.000+0000)",191
Close,"List(2016-07-27T12:00:00.000+0000, 2016-07-27T13:00:00.000+0000)",1035
Open,"List(2016-07-27T12:00:00.000+0000, 2016-07-27T13:00:00.000+0000)",994
Open,"List(2016-07-27T13:00:00.000+0000, 2016-07-27T14:00:00.000+0000)",1008
Close,"List(2016-07-27T13:00:00.000+0000, 2016-07-27T14:00:00.000+0000)",986
Open,"List(2016-07-26T13:00:00.000+0000, 2016-07-26T14:00:00.000+0000)",1006
Close,"List(2016-07-26T13:00:00.000+0000, 2016-07-26T14:00:00.000+0000)",1028


Now we can directly use SQL to query the table. 

7\. Use SQL to obtain a timeline of windowed counts of "open" and "close" requests. Then visualize the result using a bar chart (please use Databricks' built-in function to visualize a query).

In [17]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from static_counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


Note the two ends of the graph. The close actions are generated such that they come after the corresponding open actions, so there are more "opens" at the beginning and more "closes" at the end.

## Step 3: Stream Processing 
Now that we have analyzed the data interactively, let's convert this to a streaming query that continuously updates as data arrives.

Since we just have a static set of files, we are going to emulate a stream from them by reading one file at a time, in the chronological order they were created. The query we have to write is pretty much the same as the interactive query above.

We use `spark.readStream` to access the Structured Streaming readers (the file source in particular).
See here for [some guidelines](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#creating-streaming-dataframes-and-streaming-datasets).

The file source has a few notable options:
- `maxFilesPerTrigger`: (default: no max) maximum number of new files to be considered at every trigger
- `latestFirst`: (default: false) whether to process the latest new files first. By default, the earliest files are processed first.
- Supported file formats include csv, parquet (default), json, etc.; each has format-specific options.
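
The effect of the first two options can be pictured with a small plain-Python emulation. This is only a sketch of the semantics described above, not how Spark's file source is implemented; the function `plan_micro_batches`, the file list, and the modification times are all hypothetical, and the sketch assumes files are ordered by modification time.

```python
def plan_micro_batches(files, max_files_per_trigger=None, latest_first=False):
    """Group (name, modified_time) pairs into per-trigger batches of file names."""
    # Assumption: files are ordered by modification time, oldest first by default.
    ordered = sorted(files, key=lambda item: item[1], reverse=latest_first)
    names = [name for name, _ in ordered]
    if max_files_per_trigger is None:
        # No cap: every file that is currently visible goes into a single batch.
        return [names] if names else []
    return [
        names[i:i + max_files_per_trigger]
        for i in range(0, len(names), max_files_per_trigger)
    ]


# Hypothetical files with made-up modification times (in seconds).
files = [
    ("file-0.json", 100),
    ("file-1.json", 200),
    ("file-10.json", 1100),
    ("file-2.json", 300),
]

one_per_trigger = plan_micro_batches(files, max_files_per_trigger=1)
newest_first = plan_micro_batches(files, max_files_per_trigger=2, latest_first=True)
print(one_per_trigger)
print(newest_first)
```

Capping each trigger at one file is what lets a static directory behave like a slow stream in this lab.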

8\. Define a new Structured Streaming source `streamingInputDF` using the same directory, obtaining 1 file per trigger.

In [20]:
import pyspark.sql.functions as f  # provides f.window(), used by the streaming query below

# Similar to definition of staticInputDF above, just using `readStream` instead of `read`
streamingInputDF = (
  spark
    .readStream                       
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)


9\. Create a Structured Streaming version of the hourly windowed action count from the example above. Then verify that the result is a streaming DataFrame using `isStreaming`

In [22]:

# Same query as staticCountsDF, applied to the streaming input
streamingCountsDF = (                 
  streamingInputDF
    .groupBy(streamingInputDF.action, f.window(streamingInputDF.time, "1 hour"))
    .count()
)

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

As you can see, `streamingCountsDF` is a streaming DataFrame (`streamingCountsDF.isStreaming` was `True`). You can start the streaming computation by defining the sink and starting it.
In our case, we want to interactively query the counts (same queries as above), so we will store the complete set of 1-hour counts in an in-memory table (note that this is for testing purposes only in Spark 2.0).

10\. Use `writeStream.format("memory")` to store the results in an in-memory table

- name this table "counts" using `.queryName(tbl_name)`
- set the output mode to `complete`
- don't forget to call `start()` to start the query processing.

In [24]:
# spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table (for testing only in Spark 2.0)
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)

`query` is a handle to the streaming query that is running in the background. This query is continuously picking up files and updating the windowed counts. 
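
As a rough mental model of what the running query does in `complete` output mode, the plain-Python sketch below folds each micro-batch into a running state and then emits the whole result table. It is an illustration only, not Spark's engine; `run_complete_mode` and the sample rows are hypothetical.

```python
from collections import Counter


def run_complete_mode(batches):
    """Yield a full snapshot of the running counts after each micro-batch."""
    counts = Counter()
    for batch in batches:
        counts.update(batch)  # fold the new rows into the running state
        yield dict(counts)    # "complete": emit every key, not only the changed ones


# Hypothetical micro-batches of (action, window end) keys, one batch per input file.
batches = [
    [("Open", "03:00"), ("Open", "03:00")],
    [("Close", "03:00"), ("Open", "04:00")],
]

snapshots = list(run_complete_mode(batches))
for number, snapshot in enumerate(snapshots, start=1):
    print("after batch", number, snapshot)
```

Each snapshot corresponds to what a query against the in-memory table would return at that moment, which is why repeated queries show counts that only grow.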

Note the status of the query in the above cell. The progress bar shows that the query is active.
Furthermore, if you expand the `> counts` above, you will find the number of files it has already processed.

Let's wait a bit for a few files to be processed and then interactively query the in-memory `counts` table.

11\. Run the same query several times, waiting between runs, to see the results change as the stream processing continues

In a real application, we are likely to use `sleep` to regulate how often we refresh the data, e.g.

```
from time import sleep
sleep(5)  # wait a bit for computation to start
```
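
Building on the `sleep` idea, a small polling helper could look like the sketch below. The name `poll` and its parameters are hypothetical, and `fetch` stands for any zero-argument function that returns the current results.

```python
from time import sleep


def poll(fetch, rounds=3, interval_seconds=5.0):
    """Call fetch() `rounds` times, sleeping between calls, and collect the results."""
    results = []
    for i in range(rounds):
        results.append(fetch())
        if i < rounds - 1:
            sleep(interval_seconds)  # no need to wait after the final call
    return results


# Stand-in for a real query: returns an increasing number on each call.
state = {"calls": 0}


def fake_fetch():
    state["calls"] += 1
    return state["calls"]


print(poll(fake_fetch, rounds=3, interval_seconds=0.01))
```

In the notebook, `fetch` could be something like `lambda: spark.sql("select action, sum(count) from counts group by action").collect()`, assuming the `counts` table from the streaming query above exists.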

In [27]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,328
Open,Jul-26 07:00,320


Wait some time before running the next query.

In [29]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


We see the timeline of windowed counts (similar to the static one earlier) building up. If we keep running this interactive query repeatedly, we will see the latest updated counts which the streaming query is updating in the background.

Wait some time, then rerun the query.

In [32]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


Also, let's see the total number of "opens" and "closes".

In [34]:
%sql select action, sum(count) as total_count from counts group by action order by action

action,total_count
Close,16492
Open,17508


If you keep running the above query repeatedly, you will always find that the number of "opens" is greater than the number of "closes", as expected in a data stream where a "close" always appears after the corresponding "open". This shows that Structured Streaming ensures **prefix integrity**. Read the blog posts linked below if you want to know more.
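
The invariant described here can be stated as a small check on an ordered list of actions. This is only an illustration of the property, not how Spark enforces it; `has_prefix_integrity` and the sample streams are hypothetical.

```python
def has_prefix_integrity(actions):
    """Return True if no prefix of the stream has more closes than opens."""
    open_count = 0
    close_count = 0
    for action in actions:
        if action == "Open":
            open_count += 1
        elif action == "Close":
            close_count += 1
        if close_count > open_count:
            return False  # a close was observed before its matching open
    return True


# Every close follows an earlier open, so every prefix is consistent.
in_order = ["Open", "Open", "Close", "Open", "Close", "Close"]

# The same events with a close seen first: the first prefix already breaks the invariant.
out_of_order = ["Close", "Open", "Open", "Close", "Open", "Close"]

print(has_prefix_integrity(in_order))
print(has_prefix_integrity(out_of_order))
```

Processing files in the order they were created means each snapshot of the counts corresponds to a prefix of the stream, so a check like this one should hold at every point in time.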

12\. Note that there are only a few files, so once all of them have been consumed there will be no further updates to the counts. Restart the streaming query if you want to interact with it again.

- You can stop the query running in the background, either by clicking on the 'Cancel' link in the cell of the query, or by executing `query.stop()`. Either way, when the query is stopped, the status of the corresponding cell above will automatically update to `TERMINATED`.

In [36]:
query.stop()

## What's next?
If you want to learn more about Structured Streaming, here are a few pointers.

- Databricks blog posts on Structured Streaming and Continuous Applications
  - Blog post 1: [Continuous Applications: Evolving Streaming in Apache Spark 2.0](https://databricks.com/blog/2016/07/28/continuous-applications-evolving-streaming-in-apache-spark-2-0.html)
  - Blog post 2: [Structured Streaming in Apache Spark](https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html)

- [Structured Streaming Programming Guide](http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)

- Spark Summit 2016 Talks
  - [Structuring Spark: Dataframes, Datasets And Streaming](https://spark-summit.org/2016/events/structuring-spark-dataframes-datasets-and-streaming/)
  - [A Deep Dive Into Structured Streaming](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)