# Writing Continuous Applications with Structured Streaming Python APIs in Apache Spark

Tutorial for <img src="https://databricks.com/wp-content/uploads/2018/12/pydata-logo-4.png" alt="PyData" width="6%"/> Miami


At first glance, building a distributed streaming engine might seem as simple as launching a set of servers and pushing data between them. Unfortunately, distributed stream processing runs into multiple complications that don’t affect simpler computations like batch jobs. Fortunately, PySpark 2.4 and Databricks make this simple!

**Original Author**: Michael John

Modified and Ported to PySpark by Jules S. Damji for the Tutorial

<div style="line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/12/PySparkStructuredStreaming_FINAL.jpg" alt="Structured Streaming" width="40%">
</div>

## Setup Data

In [4]:
%run "./setup/setup_data"

### PySpark Documentation: [Click here](https://spark.apache.org/docs/latest/api/python/index.html)
 * [DataFrame](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame)
 * [Spark SQL](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types)
 * [Spark SQL Functions](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)

# Processing Streaming Sensor Data

Structured Streaming is a powerful capability for building end-to-end continuous applications. At a high level, it offers the following features:

1. __Output tables are always consistent__ with all the records in a prefix (partition) of the data, which is processed and counted in order.
1. __Fault tolerance__ is handled holistically by Structured Streaming, including in interactions with output sinks.
1. Ability to handle __late and out-of-order event-time data__. 

<img src="https://demo.cloud.databricks.com/files/mjohns/streaming/continuous-apps-1024x366.png" alt="" width="40%"/>

<sub>reference [Structured Streaming Blog](https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html)</sub>

## 1-Sources

Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.

<img src="https://demo.cloud.databricks.com/files/mjohns/streaming/cloudtrail-unbounded-tables.png" alt="" width="40%"/>

## 2-Continuous Processing & Queries

The developer then defines a query on this source, or input table, _as if it were a static table_ to compute a final result table that will be written to an output sink. Spark automatically converts this batch-like query to a streaming execution plan. This is called incrementalization: Spark figures out what state needs to be maintained to update the result each time a record arrives. Finally, developers specify triggers to control when to update the results. Each time a trigger fires, Spark checks for new data (new rows in the input table) and incrementally updates the result.
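
To make incrementalization a bit more concrete, here is a minimal plain-Python sketch. It is not Spark code and not how Spark implements it internally; it only shows the idea of keeping a small piece of state (a running count per key) and folding in the new rows each time a trigger fires. The names `IncrementalCounter` and `on_trigger` are hypothetical and chosen only for illustration.

```python
from collections import Counter


class IncrementalCounter:
    """Toy model of an incrementalized `groupBy(key).count()` query (illustrative only)."""

    def __init__(self, key):
        self.key = key
        self.state = Counter()  # the only state kept between triggers

    def on_trigger(self, new_rows):
        """Fold the rows that arrived since the last trigger into the state."""
        for row in new_rows:
            self.state[row[self.key]] += 1
        return dict(self.state)  # snapshot of the current result table


counter = IncrementalCounter(key="deviceType")

# First trigger: two new rows in the input table.
first = counter.on_trigger([
    {"deviceType": "SensorTypeA"},
    {"deviceType": "SensorTypeD"},
])

# Second trigger: one more row; earlier rows are never re-read.
second = counter.on_trigger([{"deviceType": "SensorTypeA"}])

print(first)
print(second)
```

The point of the sketch is that the second trigger only touches the one new row, yet the result table still reflects everything seen so far.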

<img src="https://demo.cloud.databricks.com/files/mjohns/streaming/cloudtrail-structured-streaming-model.png" alt="" width="40%"/>

## 3-Sinks

The last part of the model is output modes. Each time the result table is updated, the developer wants to write the changes to an external system, such as S3, HDFS, or a database. We usually want to write output incrementally. For this purpose, Structured Streaming provides three output modes:

* __Append__: Only the new rows appended to the result table since the last trigger will be written to the external storage. 
* __Complete__: The entire updated result table will be written to external storage, e.g. for aggregates.
* __Update__: Only the rows that were updated in the result table since the last trigger will be changed in the external storage. 
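
The difference between the three modes is easiest to see on a tiny result table. The following plain-Python sketch is a simplification under stated assumptions, not Spark's implementation: the result table is modeled as a dict from key to aggregate value, and `rows_to_write` is a hypothetical helper. In particular, "append" here just means keys that were not present before, whereas Spark only appends rows that will never change again.

```python
def rows_to_write(previous, current, mode):
    """Return the rows a sink would receive for one trigger (toy model).

    `previous` and `current` map a key to its aggregate value
    before and after the trigger fired.
    """
    if mode == "complete":
        # The entire updated result table.
        return dict(current)
    if mode == "update":
        # Only rows whose value is new or has changed.
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        # Only rows that did not exist before this trigger.
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError("unknown output mode: " + mode)


previous = {"SensorTypeA": 2, "SensorTypeB": 1}
current = {"SensorTypeA": 3, "SensorTypeB": 1, "SensorTypeC": 1}

for mode in ("append", "complete", "update"):
    print(mode, rows_to_write(previous, current, mode))
```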

<img src="https://demo.cloud.databricks.com/files/mjohns/streaming/stream-example1-phone-updated.png" width="40%"/>

## A Continuous Application Example using PySpark Structured Streaming APIs

Set up some file paths in our S3 bucket for output, checkpoint, and bad records.

In [12]:
output_path = "/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/"
checkpoint_path = "/tmp/pydata/Streaming/continuous_streaming/out/iot-stream-checkpoint"
# recreate the checkpoint directory so each run starts fresh
dbutils.fs.rm(checkpoint_path, True)  # True = recursive delete of any old checkpoint
dbutils.fs.mkdirs(checkpoint_path)
# recreate the directory that collects bad records
bad_records_path = "/tmp/pydata/Streaming/continuous_streaming/badRecordsPath/streaming-sensor/"
dbutils.fs.rm(bad_records_path, True)  # empty the directory
dbutils.fs.mkdirs(bad_records_path)

### What does the data from the sensors look like?

In [14]:
sensor_path = "/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/"
sensor_file_name = sensor_path + "streaming-sensor_file-1.json"
dbutils.fs.head(sensor_file_name, 233)

### Define schemas for incoming stream and outgoing stream

It is good practice to define a schema rather than have Spark infer it, for performance reasons.
Without a schema, Spark will launch a couple of jobs: one to read the header, and another to read
a good chunk of the partition to validate the schema, ensuring it matches the data.

There are also options that you can specify so that the read fails fast, is tolerant, or substitutes
missing values or mismatched datatypes with NaN or null.
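
To illustrate what "fail fast" versus "tolerant" means when a record does not match the schema, here is a small plain-Python sketch. It does not use Spark and is not how Spark's reader works internally; `SCHEMA`, `parse_record`, and the mode names used here are hypothetical stand-ins for the behavior described above.

```python
import json

# Field name -> expected Python type; a hypothetical stand-in for a Spark schema.
SCHEMA = {"deviceId": int, "deviceType": str, "signalStrength": float}


def parse_record(line, mode="permissive"):
    """Parse one JSON line against SCHEMA (toy model).

    mode="failfast"   raises on the first missing or mistyped field.
    mode="permissive" substitutes None for such fields and keeps the record.
    """
    raw = json.loads(line)
    parsed = {}
    for field, expected in SCHEMA.items():
        value = raw.get(field)
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, expected) and not isinstance(value, bool):
            parsed[field] = value
        elif mode == "failfast":
            raise ValueError("bad value for {}: {!r}".format(field, value))
        else:
            parsed[field] = None
    return parsed


good_line = '{"deviceId": 7, "deviceType": "SensorTypeA", "signalStrength": 0.9}'
bad_line = '{"deviceId": "seven", "deviceType": "SensorTypeA"}'

good = parse_record(good_line)
tolerant = parse_record(bad_line)  # mistyped and missing fields become None
print(good)
print(tolerant)
```

Calling `parse_record(bad_line, mode="failfast")` raises a `ValueError` instead of returning a partially filled record.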

In [16]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

#original input schema
jsonSchema = (
  StructType()
  .add("timestamp", TimestampType()) #event time at the source
  .add("deviceId", LongType())
  .add("deviceType", StringType())
  .add("signalStrength", DoubleType())
)
# modified schema with added columns since we are 
# doing some ETL (transforming and adding extra columns)
# this transformed data will be stored into parquet files
# from which an SQL table can be created for consumption or
# report generation
parquetSchema = (
  StructType()
  .add("timestamp", TimestampType()) #event time at the source
  .add("deviceId", LongType())
  .add("deviceType", StringType())
  .add("signalStrength", DoubleType())
  .add("INPUT_FILE_NAME", StringType()) #file name from which this data item was read
  .add("PROCESSED_TIME", TimestampType())) #time at the executor while processing

### Read Stream from Object Store Source

In this case we're simulating a live Kafka stream by reading in one file at a time. The source could just as well be Apache Kafka topics.

__Notice: Intentionally slowing this down for the tutorial.__
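
As a rough picture of how limiting the number of files per trigger paces the stream, here is a plain-Python sketch. It is only an illustration, not Spark's file source: `plan_micro_batches` is a hypothetical helper, and the file names beyond the first one are assumed for the example.

```python
def plan_micro_batches(files, max_files_per_trigger=1):
    """Split an ordered list of files into the batches successive triggers would read (toy model)."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]


# Hypothetical file names, assumed to follow the naming of the first sensor file.
files = [
    "streaming-sensor_file-1.json",
    "streaming-sensor_file-2.json",
    "streaming-sensor_file-3.json",
]

one_at_a_time = plan_micro_batches(files, max_files_per_trigger=1)
two_at_a_time = plan_micro_batches(files, max_files_per_trigger=2)
print(len(one_at_a_time), "triggers when reading one file per trigger")
print(len(two_at_a_time), "triggers when reading two files per trigger")
```

With one file per trigger, each file becomes its own micro-batch, which is what makes the stream slow enough to watch.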

In [18]:
inputDF = ( spark 
          .readStream 
          .schema(jsonSchema) 
          .option("maxFilesPerTrigger", 1)  #slow it down for tutorial
          .option("badRecordsPath", bad_records_path) #any bad records will go here
          .json(sensor_path) #the source
          .withColumn("INPUT_FILE_NAME", input_file_name()) #maintain file path
          .withColumn("PROCESSED_TIME", current_timestamp()) #add a processing timestamp at the time of processing
          .withWatermark("PROCESSED_TIME", "1 minute") #optional: window for out of order data
         )

### Write Stream to Parquet File Sink

In [20]:
query = (inputDF
         .writeStream
         .format("parquet") #our sink to save it for posterity or batch queries if needed
         .option("path", output_path)
         .option("checkpointLocation", checkpoint_path) # add checkpointing for resiliency
         .outputMode("append")
         .queryName("devices") #optional: a name that identifies this streaming query
         .trigger(processingTime='5 seconds')
         .start() 
        )

In [21]:
%fs ls /tmp/pydata/Streaming/continuous_streaming/out/iot-stream/

#### Create a temporary table from the input stream so that you can quickly issue SQL queries against it.

In [23]:
inputDF.createOrReplaceTempView("parquet_sensors")

### Run queries against the temp table created off the input stream

In [25]:
%sql select * from parquet_sensors where deviceType = 'SensorTypeD' or deviceType = 'SensorTypeA'

(Click Run All Above Here)

(Then Individually run below)

### Run queries and additional processing against Parquet file stored from the input stream

In [28]:
spark.conf.set("spark.sql.shuffle.partitions", "1") #keep the size of shuffles small for better query performance
devices = (spark.readStream
           .schema(parquetSchema)
           .format("parquet")
           .option("maxFilesPerTrigger", 1) #slow it down to demo
           .load(output_path)
           .withWatermark("PROCESSED_TIME", "1 minute") #window for out of order data
          )

 # generate temp table for more complex aggregation queries
devices.createOrReplaceTempView("sensors")

#### What files are being processed?

In [30]:
display(
  devices
  .select("INPUT_FILE_NAME", "PROCESSED_TIME")
  .groupBy("INPUT_FILE_NAME", "PROCESSED_TIME")
  .count()
  .orderBy("PROCESSED_TIME", ascending=False)
)

#### How much data is coming through?

In [32]:
%sql select count(*) from sensors

#### What do the min, max, and average signal strength look like for each sensor type?

Use Spark SQL functions min(), max(), and avg()

Note: I can use SQL within my Python notebook via the `%sql` magic command.

In [34]:
%sql 

select count(*), deviceType, min(signalStrength), max(signalStrength), avg(signalStrength) 
  from sensors 
    group by deviceType 
    order by deviceType asc

#### Let's create a stream to aggregate signal counts by device and timestamp over 5 second windows.

Note: this is a tumbling window, not a sliding window; its size is 5 seconds.

For example, ignoring the day, hour, and minute, tumbling windows of size 5 seconds look as follows:

*[(00:00 - 00:05), (00:05 - 00:10), (00:10 - 00:15)]*

Each window includes its start and excludes its end, so every event falls into exactly one of these tumbling windows.
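
To see how an event time maps to one of these windows, here is a small plain-Python sketch. It assumes windows are aligned to the Unix epoch and treats each window as start-inclusive and end-exclusive; `tumbling_window` is a hypothetical helper for illustration, not a Spark function.

```python
from datetime import datetime, timedelta, timezone


def tumbling_window(event_time, size_seconds=5):
    """Return the (start, end) of the tumbling window containing `event_time`.

    Assumes a timezone-aware `event_time` and windows aligned to the Unix epoch.
    The start is inclusive and the end is exclusive.
    """
    seconds = int(event_time.timestamp())
    start_seconds = (seconds // size_seconds) * size_seconds
    start = datetime.fromtimestamp(start_seconds, tz=timezone.utc)
    return start, start + timedelta(seconds=size_seconds)


utc = timezone.utc
inside = tumbling_window(datetime(2018, 12, 18, 0, 0, 7, tzinfo=utc))
on_boundary = tumbling_window(datetime(2018, 12, 18, 0, 0, 5, tzinfo=utc))

print(inside[0].time(), "-", inside[1].time())
print(on_boundary[0].time(), "-", on_boundary[1].time())
```

An event at second 07 and an event exactly at second 05 both land in the 00:05 - 00:10 window, because the start of a window is inclusive.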

In [36]:
(devices
 .groupBy(
   window("timestamp", "5 seconds"),
   "deviceId"
 )
 .count()
 .createOrReplaceTempView("sensor_counts")) #create a temporary view atop DataFrame

#### Which devices are experiencing signal loss over those 5 second windows?

In [38]:
%sql select * from sensor_counts where count < 5 order by window.start desc

In [39]:
# --- SHOW DASHBOARD ---

### Send Alerts for potentially down sensors that have not sent signals

Let's create a DataFrame from our temporary table `sensor_counts`

In [41]:
lost_sensor_signals = (spark.table("sensor_counts")
         .filter(col("count") < 5)
         .select("window.start", "window.end", "deviceId", "count")
         )

#display our DataFrame
display(lost_sensor_signals)

##### Using the `foreach` mechanism to write to workers' logs.

This could be used for monitoring purposes. For example, another job could scan the logs for ALERTS
and publish them to a Kafka topic or post them to Ganglia. This is a good exercise to try
if you have a Kafka cluster available, or a Ganglia service available via a REST API.
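
Before wiring a row handler into a stream, it can help to check the alert text on its own. The sketch below is a plain-Python illustration that needs no cluster: `format_alert` and `FakeRow` are hypothetical names, and the `namedtuple` merely stands in for a row with `start`, `end`, `deviceId`, and `count` fields.

```python
from collections import namedtuple


def format_alert(row):
    """Build the alert message for one row; the count is read by position (index 3)."""
    return "ALERT from Sensors: Between {} and {}, device {} reported only {} times".format(
        row.start, row.end, row.deviceId, row[3]
    )


# Hypothetical stand-in for a streaming row, used only to exercise the formatter.
FakeRow = namedtuple("FakeRow", ["start", "end", "deviceId", "count"])

sample = FakeRow(start="00:00:05", end="00:00:10", deviceId=7, count=2)
message = format_alert(sample)
print(message)
```

A row handler passed to `foreach` could then simply print or publish the string returned by such a formatter.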

In [43]:
def processRow(row):
  # For now write the alerts to the workers' logs; this logic can easily be extended to publish
  # alerts to a Kafka topic or to a monitoring/paging service such as Ganglia or PagerDuty.
  # row[3] is the count column; row.count would resolve to the built-in tuple method instead.
  print("ALERT from Sensors: Between {} and {}, device {} reported only {} times".format(row.start, row.end, row.deviceId, row[3]))

(lost_sensor_signals
 .writeStream
 .outputMode("complete") #emit the full aggregated result table on every trigger
 .foreach(processRow) #the sink; could instead be a Kafka topic "alerts" for monitoring
 .start()
)

_Here is an example of the results being written out._

<img src="https://databricks.com/wp-content/uploads/2018/12/Screen-Shot-2018-12-18-at-5.53.39-PM.png" alt="" width="50%"/>

##### CHALLENGE-1: 

Try using `foreachBatch` to write each micro-batch using the standard DataFrame API.

__See [docs](https://docs.databricks.com/spark/latest/structured-streaming/foreach.html).__

##### CHALLENGE-2: 

Try adding additional monitoring conditions on devices. For example, add another streaming query that
filters on a monitoring condition (signal strength < 0.2) from any of the above streams or tables: `sensor_counts`, `devices`, `inputDF`, or the Parquet files.

__See [docs](https://docs.databricks.com/spark/latest/structured-streaming/foreach.html).__

## Cleanup Data

In [48]:
%run "./setup/cleanup_data"