# Structured Streaming using the Python DataFrames API

Apache Spark includes a high-level stream processing API, [Structured Streaming](http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html). In this notebook we take a quick look at how to use the DataFrame API to build Structured Streaming applications. We want to compute real-time metrics like running counts and windowed counts on a stream of timestamped actions (e.g. Open, Close).

To run this notebook, import it and attach it to a Spark cluster.

## Sample Data
We have some sample action data as files in `/databricks-datasets/structured-streaming/events/`, which we are going to use to build this application. Let's take a look at the contents of this directory.

In [0]:
# Look at the content of the following folder: /databricks-datasets/structured-streaming/events/
# What do you see?
display(dbutils.fs.ls('/databricks-datasets/structured-streaming/events'))

path,name,size
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008


There are about 50 JSON files in the directory. Let's see what each JSON file contains.

In [0]:
# Open one file
dbutils.fs.head('/databricks-datasets/structured-streaming/events/file-2.json').split()

Each line in the file contains a JSON record with two fields: `time` and `action`. Let's try to analyze these files interactively.
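
As a rough illustration of this one-record-per-line layout, the sketch below parses a few JSON lines in plain Python. The sample records are made up for illustration, and the real files may encode `time` differently; only the two field names come from the text above.

```python
import json
from collections import Counter

# Hypothetical sample lines, shaped like the records described above.
sample_lines = [
    '{"time": "2016-07-28T04:19:28", "action": "Close"}',
    '{"time": "2016-07-28T04:19:29", "action": "Open"}',
    '{"time": "2016-07-28T04:19:31", "action": "Open"}',
]

def parse_events(lines):
    """Parse one JSON object per line into (time, action) tuples."""
    events = []
    for line in lines:
        record = json.loads(line)
        events.append((record["time"], record["action"]))
    return events

events = parse_events(sample_lines)

# Count how often each action occurs in the parsed sample.
action_counts = Counter(action for _, action in events)
print(action_counts)
```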

## Batch/Interactive Processing
The usual first step in attempting to process the data is to interactively query the data. Let's define a static DataFrame on the files, and give it a table name.

In [0]:
from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

# Since we know the data format already, let's define the schema to speed up processing (no need for Spark to infer schema)
jsonSchema = StructType(
  [ StructField("time", TimestampType(), True),
   StructField("action", StringType(), True) ]
)



In [0]:
# Read all JSON files under inputPath using the schema defined above, and display the content
staticInputDF = (
  spark
    .read
    .schema(jsonSchema)
    .json(inputPath)
)

display(staticInputDF)

time,action
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:29.000+0000,Open
2016-07-28T04:19:31.000+0000,Close
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:32.000+0000,Close
2016-07-28T04:19:33.000+0000,Close
2016-07-28T04:19:35.000+0000,Close
2016-07-28T04:19:36.000+0000,Open


In [0]:
testSchema = StructType(
  [ StructField("Time", TimestampType(), True),
   StructField("action", StringType(), True) ]
)


df = (spark.read.format('json')
            .option("header", "true")  # "header" is a CSV option; the JSON reader ignores it
            #.option("InferSchema", "true")
            .schema(testSchema)
            .load(inputPath))
df.display()

Time,action
,Close
,Close
,Open
,Close
,Open
,Open
,Close
,Close
,Close
,Open


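- Compare the time column in the two outputs above. The second schema names the field `Time` instead of `time`, and that column comes back empty.
- Did you notice that `inputPath` is a folder, not a single file?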

In [0]:
# Calculate the total number of 'Open' and 'Close' actions 
(
  staticInputDF
    .groupBy(staticInputDF.action)    
    .count()
).display()

action,count
Open,50000
Close,50000


In [0]:
# Determine min time
from pyspark.sql.functions import min, to_date

(
  staticInputDF
  .select(min("time").alias('min_time'))
  .display()
)

min_time
2016-07-26T02:45:07.000+0000


In [0]:
staticInputDF.explain()

In [0]:
# Determine max time
from pyspark.sql.functions import max

(
  staticInputDF
  .select(max("time").alias('max_time'))
  .display()
)

max_time
2016-07-28T06:48:19.000+0000


In [0]:
# Calculate the number of "open" and "close" actions with one hour windows: staticCountsDF
# Look at groupBy(..., window()) function
from pyspark.sql.functions import *

staticCountsDF = (
  staticInputDF
    .groupBy(staticInputDF.action, window(staticInputDF.time, "1 hour"))    
    .count()
    .sort('window')
)
staticCountsDF.display()

action,window,count
Close,"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",11
Open,"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",179
Close,"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",344
Open,"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",1001
Open,"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",999
Close,"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",815
Close,"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",1003
Open,"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",1000
Close,"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",1011
Open,"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",993
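
To make the window boundaries in this output less mysterious, here is a plain-Python sketch of how a timestamp can be assigned to a one-hour tumbling window. It assumes windows are aligned to the Unix epoch, which matches the on-the-hour boundaries shown above. The helper name `tumbling_window` and the third sample event are made up for illustration; this is not Spark's implementation.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumbling_window(ts, size=timedelta(hours=1)):
    """Return the [start, end) window of length `size` that contains `ts`."""
    offset = (ts - EPOCH) % size      # time elapsed since the window started
    start = ts - offset
    return start, start + size

# Two timestamps taken from the data above, plus one made-up "Close" event.
events = [
    (datetime(2016, 7, 26, 2, 45, 7), "Open"),
    (datetime(2016, 7, 26, 2, 45, 47), "Open"),
    (datetime(2016, 7, 26, 3, 10, 0), "Close"),
]

# Equivalent in spirit to groupBy(action, window(time, "1 hour")).count()
window_counts = Counter((action, tumbling_window(ts)) for ts, action in events)
for (action, (start, end)), count in sorted(window_counts.items()):
    print(action, start, end, count)
```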


In [0]:
# Make this window a sliding window (30 minutes overlap): staticCountsSW
staticCountsSW = (
    staticInputDF
      .groupBy(staticInputDF.action, window(staticInputDF.time, "1 hour", "30 minutes"))    
      .count()
)
staticCountsSW.display()

action,window,count
Close,"List(2016-07-26T13:00:00.000+0000, 2016-07-26T14:00:00.000+0000)",1028
Open,"List(2016-07-27T04:00:00.000+0000, 2016-07-27T05:00:00.000+0000)",995
Close,"List(2016-07-26T10:30:00.000+0000, 2016-07-26T11:30:00.000+0000)",1040
Close,"List(2016-07-28T05:30:00.000+0000, 2016-07-28T06:30:00.000+0000)",422
Open,"List(2016-07-26T11:00:00.000+0000, 2016-07-26T12:00:00.000+0000)",991
Open,"List(2016-07-26T10:00:00.000+0000, 2016-07-26T11:00:00.000+0000)",1007
Close,"List(2016-07-27T20:30:00.000+0000, 2016-07-27T21:30:00.000+0000)",1001
Close,"List(2016-07-27T03:00:00.000+0000, 2016-07-27T04:00:00.000+0000)",1025
Close,"List(2016-07-27T13:00:00.000+0000, 2016-07-27T14:00:00.000+0000)",986
Open,"List(2016-07-27T22:30:00.000+0000, 2016-07-27T23:30:00.000+0000)",999
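
With a one-hour window sliding every 30 minutes, each event falls into two overlapping windows, which is why the counts above no longer sum to the number of records. The sketch below shows this in plain Python, again assuming epoch-aligned windows. The helper `sliding_windows` is hypothetical and only mirrors the idea, not Spark's internals.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def sliding_windows(ts, size=timedelta(hours=1), slide=timedelta(minutes=30)):
    """Return every [start, end) window that contains `ts`, earliest first."""
    # Latest window start that is <= ts, aligned to multiples of `slide`.
    start = ts - ((ts - EPOCH) % slide)
    windows = []
    # Walk backwards one slide at a time while the window still covers ts.
    while start + size > ts:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

event_time = datetime(2016, 7, 26, 2, 45, 7)
containing = sliding_windows(event_time)
for start, end in containing:
    print(start, "->", end)
```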


In [0]:
# Register staticCountsDF (createOrReplaceTempView) as table 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")

Now we can directly use SQL to query the table. For example, here are the total counts across all the hours.

In [0]:
%sql
select action, sum(count) as total_count 
from static_counts
group by action

action,total_count
Open,50000
Close,50000


In [0]:
%sql
-- How many actions (Close and Open separately) are within each time window (in the table static_counts)?
-- select action, date_format(window.end, "MMM-dd HH:mm") as time, count
select action, window.start as time, count
from static_counts
order by time, action

action,time,count
Close,2016-07-26T02:00:00.000+0000,11
Open,2016-07-26T02:00:00.000+0000,179
Close,2016-07-26T03:00:00.000+0000,344
Open,2016-07-26T03:00:00.000+0000,1001
Close,2016-07-26T04:00:00.000+0000,815
Open,2016-07-26T04:00:00.000+0000,999
Close,2016-07-26T05:00:00.000+0000,1003
Open,2016-07-26T05:00:00.000+0000,1000
Close,2016-07-26T06:00:00.000+0000,1011
Open,2016-07-26T06:00:00.000+0000,993


Note the two ends of the graph. The close actions are generated so that each one comes after its corresponding open action, so there are more "opens" at the beginning and more "closes" at the end.

## Stream Processing 
Now that we have analyzed the data interactively, let's convert this to a streaming query that continuously updates as data arrives. Since we just have a static set of files, we are going to emulate a stream from them by reading one file at a time, in the chronological order in which they were created. The query we have to write is pretty much the same as the interactive query above.

In [0]:
from pyspark.sql.functions import *

# Read data from a file
# Similar to definition of staticInputDF above, just using `readStream` instead of `read`
streamingInputDF = (
  spark
    .readStream                       
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

# Do some transformations
# Same query as staticCountsDF
streamingCountsDF = (                 
  streamingInputDF
    .groupBy(
      streamingInputDF.action, 
      window(streamingInputDF.time, "1 hour"))
    .count()
)

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

In [0]:
streamingInputDF.display()

time,action
2016-07-26T02:45:07.000+0000,Open
2016-07-26T02:45:47.000+0000,Open
2016-07-26T02:46:42.000+0000,Open
2016-07-26T02:46:59.000+0000,Open
2016-07-26T02:47:05.000+0000,Open
2016-07-26T02:47:14.000+0000,Open
2016-07-26T02:47:25.000+0000,Open
2016-07-26T02:47:26.000+0000,Open
2016-07-26T02:47:28.000+0000,Open
2016-07-26T02:47:36.000+0000,Open


In [0]:
streamingCountsDF.display()

action,window,count
Open,"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",1000
Close,"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",1011
Close,"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",815
Close,"List(2016-07-26T08:00:00.000+0000, 2016-07-26T09:00:00.000+0000)",985
Open,"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",999
Open,"List(2016-07-26T07:00:00.000+0000, 2016-07-26T08:00:00.000+0000)",1008
Close,"List(2016-07-26T07:00:00.000+0000, 2016-07-26T08:00:00.000+0000)",989
Close,"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",344
Open,"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",1001
Close,"List(2016-07-26T09:00:00.000+0000, 2016-07-26T10:00:00.000+0000)",332


As you can see, `streamingCountsDF` is a streaming DataFrame (`streamingCountsDF.isStreaming` was `True`). You can start the streaming computation by defining the sink and starting it.
In our case, we want to interactively query the counts (same queries as above), so we will keep the complete set of 1 hour counts in an in-memory table.

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table 
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    #.trigger(processingTime='15 seconds')
    .start()
)

`query` is a handle to the streaming query that is running in the background. This query is continuously picking up files and updating the windowed counts. 

Note the status of query in the above cell. The progress bar shows that the query is active. 
Furthermore, if you expand the `> counts` above, you will find the number of files it has already processed.
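
To get a feel for what the `complete` output mode means here, the following plain-Python sketch folds made-up micro-batches (one per input file) into a running state and re-emits the whole result table after each batch. It is only a mental model under those assumptions, not how Spark manages state.

```python
from collections import Counter

def run_complete_mode(micro_batches):
    """Yield the full table of running counts after each micro-batch."""
    state = Counter()            # aggregation state kept across batches
    for batch in micro_batches:
        state.update(batch)      # fold the new records into the state
        yield dict(state)        # "complete": emit every row, not only changed ones

# Hypothetical micro-batches, one list of actions per input file.
micro_batches = [["Open", "Open"], ["Open", "Close"], ["Close"]]

snapshots = list(run_complete_mode(micro_batches))
for batch_id, table in enumerate(snapshots):
    print(batch_id, table)
```

Each snapshot plays the role of the in-memory `counts` table at one point in time, which is why repeated queries against it can return different results.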

Let's wait a bit for a few files to be processed and then interactively query the in-memory `counts` table.

In [0]:
%sql
SELECT *
FROM counts

action,window,count
Open,"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",179
Close,"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",815
Open,"List(2016-07-26T09:00:00.000+0000, 2016-07-26T10:00:00.000+0000)",1000
Close,"List(2016-07-26T08:00:00.000+0000, 2016-07-26T09:00:00.000+0000)",985
Close,"List(2016-07-26T07:00:00.000+0000, 2016-07-26T08:00:00.000+0000)",989
Close,"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",1003
Close,"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",1011
Open,"List(2016-07-26T07:00:00.000+0000, 2016-07-26T08:00:00.000+0000)",1008
Open,"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",993
Close,"List(2016-07-26T10:00:00.000+0000, 2016-07-26T11:00:00.000+0000)",347


In [0]:
from time import sleep
sleep(5)  # wait a bit for computation to start
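
A fixed `sleep` is simple but guesses how long the first batches take. As an alternative, here is a sketch of a polling helper built on the `lastProgress` and `isActive` attributes of a PySpark streaming query. The helper name `wait_for_batches` is hypothetical, and it assumes `lastProgress` is `None` before the first batch and otherwise exposes a zero-based `batchId`. The `_DemoQuery` class is only a stand-in so the sketch runs without a cluster.

```python
import time

def wait_for_batches(query, min_batches=1, timeout_s=60.0, poll_s=0.5):
    """Poll `query` until it reports at least `min_batches` micro-batches.

    Returns True on success, False if the query stops or the timeout expires.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        progress = query.lastProgress   # assumed None until a batch completes
        if progress is not None and progress["batchId"] + 1 >= min_batches:
            return True
        if not query.isActive or time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)

class _DemoQuery:
    """Stand-in for a streaming query: reports progress on the third poll."""
    isActive = True

    def __init__(self):
        self._polls = 0

    @property
    def lastProgress(self):
        self._polls += 1
        return None if self._polls < 3 else {"batchId": 1}

demo_ready = wait_for_batches(_DemoQuery(), min_batches=2, timeout_s=5.0, poll_s=0.01)
print(demo_ready)
```

In the notebook, the call would look like `wait_for_batches(query, min_batches=3)` in place of `sleep(5)`.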

In [0]:
%sql
select action, date_format(window.end, "MMM-dd HH:mm") as time, count
from counts
order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


We see the timeline of windowed counts (similar to the static one earlier) building up. If we keep running this interactive query repeatedly, we will see the latest updated counts which the streaming query is updating in the background.

In [0]:
sleep(5)  # wait a bit more for more data to be computed

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


In [0]:
sleep(5)  # wait a bit more for more data to be computed

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

Also, let's see the total number of "opens" and "closes".

In [0]:
%sql 
select action, sum(count) as total_count 
from counts 
group by action 
order by action

If you keep running the above query repeatedly, you will always find that the number of "opens" is more than the number of "closes", as expected in a data stream where a "close" always appears after the corresponding "open". This shows that Structured Streaming ensures **prefix integrity**. Read the blog posts linked below if you want to know more.
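
The counting argument behind this observation can be checked in a few lines of plain Python: if every "close" follows its "open", then no prefix of the stream has more closes than opens. The stream below is made up, and the sketch illustrates only the arithmetic, not how Structured Streaming enforces prefix integrity.

```python
from itertools import accumulate

def opens_minus_closes(actions):
    """Running (opens - closes) after each record in the stream."""
    return list(accumulate(1 if a == "Open" else -1 for a in actions))

# Hypothetical stream in which every "Close" follows its "Open".
stream = ["Open", "Open", "Close", "Open", "Close", "Close"]

balance = opens_minus_closes(stream)
print(balance)  # [1, 2, 1, 2, 1, 0]

# No prefix of the stream has more closes than opens.
prefix_ok = all(b >= 0 for b in balance)
print(prefix_ok)
```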

Note that there are only a few files, so once all of them have been consumed there will be no further updates to the counts. Rerun the query if you want to interact with the streaming query again.

Finally, you can stop the query running in the background, either by clicking on the 'Cancel' link in the cell of the query, or by executing `query.stop()`. Either way, when the query is stopped, the status of the corresponding cell above will automatically update to `TERMINATED`.

In [0]:
query.stop()
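
Other streaming queries may still be running in the session, for example those started by calling `display()` on a streaming DataFrame. The sketch below stops every active query through `spark.streams.active`. The helper name `stop_all_streams` is hypothetical, and the stand-in objects exist only so the example runs without a cluster.

```python
from types import SimpleNamespace

def stop_all_streams(spark):
    """Stop every active streaming query on `spark` and return their names."""
    stopped = []
    for q in spark.streams.active:
        stopped.append(q.name)
        q.stop()
    return stopped

class _DemoQuery:
    """Stand-in for a streaming query with a name and a stop() method."""
    def __init__(self, name):
        self.name = name
        self.isActive = True

    def stop(self):
        self.isActive = False

# Stand-in session exposing the same attribute path as spark.streams.active.
demo_queries = [_DemoQuery("counts"), _DemoQuery("other")]
demo_spark = SimpleNamespace(streams=SimpleNamespace(active=demo_queries))

stopped_names = stop_all_streams(demo_spark)
print(stopped_names)
```

In the notebook itself, the call would be `stop_all_streams(spark)`.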

### Airlines

To test your streaming skills, develop an example on the `airlines` dataset.

In [0]:
display(dbutils.fs.ls('/databricks-datasets/airlines/'))

path,name,size
dbfs:/databricks-datasets/airlines/README.md,README.md,1089
dbfs:/databricks-datasets/airlines/_SUCCESS,_SUCCESS,0
dbfs:/databricks-datasets/airlines/part-00000,part-00000,67108879
dbfs:/databricks-datasets/airlines/part-00001,part-00001,67108862
dbfs:/databricks-datasets/airlines/part-00002,part-00002,67108930
dbfs:/databricks-datasets/airlines/part-00003,part-00003,67108804
dbfs:/databricks-datasets/airlines/part-00004,part-00004,67108908
dbfs:/databricks-datasets/airlines/part-00005,part-00005,67108890
dbfs:/databricks-datasets/airlines/part-00006,part-00006,67108825
dbfs:/databricks-datasets/airlines/part-00007,part-00007,67108880


In [0]:
spark.read.text('dbfs:/databricks-datasets/airlines/README.md').display()

value
================================================
Airline On-Time Statistics and Delay Causes
================================================
## Background
"The U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics (BTS) tracks the on-time performance of domestic flights operated by large air carriers. Summary information on the number of on-time, delayed, canceled and diverted flights appears in DOT's monthly Air Travel Consumer Report, published about 30 days after the month's end, as well as in summary tables posted on this website. BTS began collecting details on the causes of flight delays in June 2003. Summary statistics and raw data are made available to the public at the time the Air Travel Consumer Report is released."
FAQ Information is available at http://www.rita.dot.gov/bts/help_with_data/aviation/index.html
## Data Source
http://www.transtats.bts.gov/OT_Delay/OT_DelayCause1.asp
## Usage Restrictions
The data is released under the Freedom of Information act. More information can be found at http://www.fas.org/sgp/foia/citizen.html


In [0]:
dbutils.fs.head('/databricks-datasets/airlines/part-00000').split()

In [0]:
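# Read one CSV file, inferring the schema, and display the content
# Note: the output below uses a data row as column names, which suggests this part file has no header row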
airline_in = (
  spark
    .read
    .option("header", "true")
    .option("InferSchema", "true")
    .csv('/databricks-datasets/airlines/part-00001')
)

display(airline_in)

1987,11,29,73,1239,1224,1430,1423,PI,542,NA10,111,119,NA13,714,15,CLT,BOS,728,NA19,NA20,021,NA22,023,NA24,NA25,NA26,NA27,NA28,YES29,YES30
1987,11,30,1,1300.0,1224,1507.0,1423,PI,542,,127.0,119,,44.0,36.0,CLT,BOS,728,,,0,,0,,,,,,YES,YES
1987,11,1,7,1012.0,1010,1136.0,1134,PI,542,,84.0,84,,2.0,2.0,MCO,CLT,468,,,0,,0,,,,,,YES,YES
1987,11,2,1,1007.0,1010,1134.0,1134,PI,542,,87.0,84,,0.0,-3.0,MCO,CLT,468,,,0,,0,,,,,,NO,NO
1987,11,3,2,1020.0,1010,1144.0,1134,PI,542,,84.0,84,,10.0,10.0,MCO,CLT,468,,,0,,0,,,,,,YES,YES
1987,11,4,3,1008.0,1010,1135.0,1134,PI,542,,87.0,84,,1.0,-2.0,MCO,CLT,468,,,0,,0,,,,,,YES,NO
1987,11,5,4,1012.0,1010,1144.0,1134,PI,542,,92.0,84,,10.0,2.0,MCO,CLT,468,,,0,,0,,,,,,YES,YES
1987,11,6,5,1010.0,1010,1138.0,1134,PI,542,,88.0,84,,4.0,0.0,MCO,CLT,468,,,0,,0,,,,,,YES,NO
1987,11,7,6,1008.0,1010,1136.0,1134,PI,542,,88.0,84,,2.0,-2.0,MCO,CLT,468,,,0,,0,,,,,,YES,NO
1987,11,8,7,1012.0,1010,1139.0,1134,PI,542,,87.0,84,,5.0,2.0,MCO,CLT,468,,,0,,0,,,,,,YES,YES
1987,11,9,1,1011.0,1010,1139.0,1134,PI,542,,88.0,84,,5.0,1.0,MCO,CLT,468,,,0,,0,,,,,,YES,YES


In [0]:
import databricks.koalas as ks

In [0]:
kdf = airline_in.to_koalas()
display(kdf)

1987,11,29,73,1239,1224,1430,1423,PI,542,NA10,111,119,NA13,714,15,CLT,BOS,728,NA19,NA20,021,NA22,023,NA24,NA25,NA26,NA27,NA28,YES29,YES30
1987,11,30,1,1300.0,1224,1507.0,1423,PI,542,,127.0,119,,44.0,36.0,CLT,BOS,728,,,0,,0,,,,,,YES,YES
1987,11,1,7,1012.0,1010,1136.0,1134,PI,542,,84.0,84,,2.0,2.0,MCO,CLT,468,,,0,,0,,,,,,YES,YES
1987,11,2,1,1007.0,1010,1134.0,1134,PI,542,,87.0,84,,0.0,-3.0,MCO,CLT,468,,,0,,0,,,,,,NO,NO
1987,11,3,2,1020.0,1010,1144.0,1134,PI,542,,84.0,84,,10.0,10.0,MCO,CLT,468,,,0,,0,,,,,,YES,YES
1987,11,4,3,1008.0,1010,1135.0,1134,PI,542,,87.0,84,,1.0,-2.0,MCO,CLT,468,,,0,,0,,,,,,YES,NO
1987,11,5,4,1012.0,1010,1144.0,1134,PI,542,,92.0,84,,10.0,2.0,MCO,CLT,468,,,0,,0,,,,,,YES,YES
1987,11,6,5,1010.0,1010,1138.0,1134,PI,542,,88.0,84,,4.0,0.0,MCO,CLT,468,,,0,,0,,,,,,YES,NO
1987,11,7,6,1008.0,1010,1136.0,1134,PI,542,,88.0,84,,2.0,-2.0,MCO,CLT,468,,,0,,0,,,,,,YES,NO
1987,11,8,7,1012.0,1010,1139.0,1134,PI,542,,87.0,84,,5.0,2.0,MCO,CLT,468,,,0,,0,,,,,,YES,YES
1987,11,9,1,1011.0,1010,1139.0,1134,PI,542,,88.0,84,,5.0,1.0,MCO,CLT,468,,,0,,0,,,,,,YES,YES
