# ***Spark Structured Streaming***

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine.

- Input data are represented by means of (streaming) DataFrames
- Structured Streaming uses the existing Spark SQL APIs to query data streams
    - The same methods we used for analyzing "static" DataFrames
- A set of specific methods is used to define
    - Input and output streams
    - Windows
    
Each input data stream is modeled as a table that is continuously appended to. The queries are executed incrementally on the unbounded input tables, so that all the data received so far is taken into account.

- The arrival of new data triggers the execution of the incremental queries
- The result of a query at a specific timestamp is the one obtained by running the query on all the data that arrived up to that timestamp
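
The model above can be sketched in plain Python, with no Spark involved. The class name `RunningCount` and the sample records are hypothetical. The sketch only illustrates that a result maintained incrementally matches what a batch query over all the data received so far would return.

```python
from collections import Counter


class RunningCount:
    """Toy incremental query: number of records per key over an unbounded input."""

    def __init__(self):
        self.result = Counter()  # state kept between triggers
        self.seen = []           # all data arrived so far (kept only for the check below)

    def on_new_data(self, batch):
        """Update the result using only the newly arrived records."""
        self.result.update(batch)
        self.seen.extend(batch)
        return dict(self.result)


query = RunningCount()
first = query.on_new_data(["s1", "s2", "s1"])
second = query.on_new_data(["s2", "s3"])

# The incremental result equals the batch query run on all the data arrived so far
batch_result = dict(Counter(query.seen))
print(second == batch_result)
```

A real engine would not keep `seen`; it is stored here only to compare the incremental result with the batch one.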
 

The queries can be executed:

- As micro-batch queries with a fixed batch interval
    
    - Standard behavior
    
    - Exactly-once fault-tolerance guarantees
- As continuous queries
    
    - Experimental
    
    - At-least-once fault-tolerance guarantees
    
    
## **Input Sources**

- File source
    
    - Reads files written in a directory as a stream of data
    
    - Each line of the input file is an input record
    
    - Supported file formats are text, csv, json, orc, parquet, ...

- Kafka source
    
    - Reads data from Kafka
    
    - Each Kafka message is one input record
    
    
The **readStream** property of the SparkSession class is used to create DataStreamReaders.
- The methods format() and option() of the DataStreamReader class are used to specify the input stream
    - Type, location, ...
- The method load() of the DataStreamReader class returns the DataFrame associated with the input data stream

In [None]:
# recordsDF is a streaming DataFrame with one record
# for each line of text received from the input stream
recordsDF = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
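
For comparison, a file source could be configured as sketched below. The helper name `read_station_files`, the directory argument, and the column names (borrowed from the bike-sharing examples later in this document) are illustrative assumptions. Unlike the socket source, a streaming file source normally needs an explicit schema, given here as a DDL string.

```python
def read_station_files(spark, input_dir):
    """Return a streaming DataFrame over the CSV files that appear in input_dir.

    spark is an existing SparkSession; input_dir is a hypothetical directory path.
    """
    # Assumed record layout: stationId,freeslots,usedslots,timestamp
    schema = "stationId STRING, freeslots INT, usedslots INT, timestamp TIMESTAMP"
    return (
        spark.readStream
        .format("csv")
        .schema(schema)
        .option("header", "false")
        .load(input_dir)
    )
```

With this source, each file added to the directory contributes its lines as new rows of the streaming DataFrame.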

## **Transformations**
Transformations applied on static DataFrames can also be applied to streaming DataFrames. However, some types of queries/transformations cannot be executed incrementally and are therefore restricted.

Unsupported or restricted operations:

- Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DataFrame)

- Limit and take first N rows

- Distinct operations

- Sorting operations are supported on streaming DataFrames only after an aggregation and in complete output mode

- Some types of outer joins on streaming DataFrames are not supported


## **Outputs**

**Sinks**
- Sinks are defined by means of instances of the DataStreamWriter class, which specify the external destinations where the results are stored

**File sink**

- Stores the output to a directory

- Supported file formats are text, csv, json, orc, parquet, ...

**Kafka sink**

- Stores the output to one or more topics in Kafka

**Foreach sink**

- Runs arbitrary computation on the output records

**Console sink (for debugging purposes)**
- Prints the computed output to the console every time a new batch of records has been analyzed


We must define how we want Spark to write the output data to the external destinations.

Supported output modes:

- Append

- Complete

- Update

The supported output modes depend on the query type.

### Append mode

Default mode. Only the new rows added to the computed result since the last trigger (computation) are written to the sink. Queries made only of select, filter, map, flatMap, join, etc. support append mode, i.e., queries whose operations never need to revise rows that have already been emitted.

**N.B.**: in append mode each output row is emitted once and never changes; with the other modes, the engine must keep the result computed so far in order to rewrite or update it.

### Complete mode
The whole computed result is written to the sink after every trigger (computation). This mode is supported for aggregation queries.

### Update mode
Only the rows of the computed result that were updated since the last trigger (computation) are written to the sink.
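
The difference between the three modes can be illustrated with a plain-Python simulation (no Spark involved). The function `emit` and the sample result tables are hypothetical; they model a per-station count before and after one trigger. Spark would reject append mode for this kind of aggregation unless a watermark is defined, so the append branch only shows what "new rows" means.

```python
def emit(previous, current, mode):
    """Return the rows of `current` that would be written to the sink in the given mode."""
    if mode == "complete":
        # the whole result, every time
        return dict(current)
    if mode == "update":
        # rows that are new or whose value changed since the last trigger
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        # only rows that did not exist at the last trigger
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode}")


previous = {"s1": 2, "s2": 1}           # result after the previous trigger
current = {"s1": 3, "s2": 1, "s3": 1}   # result after the current trigger

for mode in ("append", "complete", "update"):
    print(mode, emit(previous, current, mode))
```

In this toy case, station s2 is written only in complete mode, because its count did not change between the two triggers.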

## **Output write**
The writeStream property of the DataFrame class is used to create DataStreamWriters.

- The methods outputMode(), format() and option() of the DataStreamWriter class are used to specify the output destination

    - Data format, location, output mode, etc.

In [None]:
# stationIdTimestampDF is a streaming DataFrame (it is defined in Example 1)
streamWriterRes = stationIdTimestampDF \
    .writeStream \
    .outputMode("append") \
    .format("console")

To start executing the defined queries/structured streaming applications you must explicitly invoke the **start()** action on the defined sinks.

You can start several queries in the same application. Structured streaming queries run forever. You must explicitly stop/kill them.
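
One possible way to start a query, let it run for a bounded time, and then stop it is sketched below. The helper name `run_for` and the timeout value are hypothetical; `writer` is assumed to be a configured DataStreamWriter such as `streamWriterRes` above.

```python
def run_for(writer, seconds):
    """Start the streaming query, let it run for `seconds`, then stop it."""
    query = writer.start()  # returns a StreamingQuery handle
    try:
        # Blocks until the query terminates or the timeout (in seconds) expires
        query.awaitTermination(seconds)
    finally:
        # Explicitly stop the query
        query.stop()
    return query
```

For instance, `run_for(streamWriterRes, 60)` would print results on the console for about one minute.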


## **Triggers**

For each Spark structured streaming query we can specify when new input data must be processed and whether the query is going to be executed:

- as a micro-batch query with a fixed batch interval

- or as a continuous processing query (experimental)

The trigger type for each query is specified by means of the **trigger()** method of the DataStreamWriter class.

If **no trigger** type is explicitly specified, the system processes new data as soon as it is available:

- Default trigger setting

- The query will be executed in micro-batch mode

- Each micro-batch is generated and processed as soon as the previous micro-batch has been processed



**Fixed interval micro-batches**:

- The query will be executed in micro-batch mode

- Micro-batches will be processed at the user-specified intervals

    - The parameter processingTime of the trigger() method is used to specify the micro-batch interval.

    - If the previous micro-batch completes within its interval, then the engine will wait until the interval is over before processing the next micro-batch.

    - If the previous micro-batch takes longer than the interval to complete (i.e. if an interval boundary is missed), then the next micro-batch will start as soon as the previous one completes.
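
The two scheduling rules can be checked with a small plain-Python model. This is a simplified sketch of the rule as stated here, not Spark's scheduler: the function name `batch_start_times` and the sample durations are hypothetical, times are in seconds, and the first batch is assumed to start at time 0.

```python
def batch_start_times(durations, interval):
    """Start time of each micro-batch, given how long each one takes to process."""
    starts = []
    start = 0.0
    for duration in durations:
        starts.append(start)
        end = start + duration
        # Wait for the end of the interval, unless the batch already overran it
        start = max(start + interval, end)
    return starts


# Four micro-batches taking 3, 12, 4 and 2 seconds, with a 10-second interval
print(batch_start_times([3, 12, 4, 2], interval=10))
```

Here the second micro-batch overruns its interval, so the third one starts at second 22, as soon as the second one completes.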
    

**One-time micro-batch**:

- The query will be executed in micro-batch mode

- But the query will be executed only one time on one single micro-batch containing all the available data of the input stream:
    - After the single execution the query stops on its own

- This trigger type is useful when you want to periodically spin up a cluster, process everything that has become available since the last run, and then shut down the cluster.
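
The trigger types described in this section map onto keyword arguments of trigger(). The helper below is a sketch: the name `with_trigger`, the `kind` labels, and the interval values are hypothetical, and `writer` is assumed to be a DataStreamWriter.

```python
def with_trigger(writer, kind):
    """Return `writer` configured with one of the trigger types described above."""
    if kind == "default":
        # No trigger() call: micro-batches run one right after the other
        return writer
    if kind == "fixed":
        # Fixed interval micro-batches
        return writer.trigger(processingTime="10 seconds")
    if kind == "once":
        # One-time micro-batch: the query stops on its own afterwards
        return writer.trigger(once=True)
    if kind == "continuous":
        # Continuous processing (experimental)
        return writer.trigger(continuous="1 second")
    raise ValueError(f"unknown trigger kind: {kind}")
```

The returned writer is then started with start(), exactly as for a writer without an explicit trigger.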


## **Example 1**

Problem specification

Input

- A stream of records retrieved from localhost:9999

- Each input record is a reading about the status of a station of a bike sharing system in a specific timestamp

- Each input reading has the format

    - stationId,#free slots,#used slots,timestamp

Output

- For each input reading with a number of free slots equal to 0, print on the standard output the value of stationId and timestamp

- Use the standard micro-batch processing mode

In [1]:
from pyspark.sql.functions import split

# Create a "receiver" DataFrame that will connect to localhost:9999
# The input is one single stream and the DataFrame has one single column
recordsDF = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

In [None]:
# The input records are characterized by one single column called value
# of type string
# Example of an input record: s1,0,3,2016-03-11 09:00:04
# Define four more columns by splitting the input column value
# New columns:
# - stationId
# - freeslots
# - usedslots
# - timestamp

# Split the input line once and reuse the resulting array column
fields = split(recordsDF.value, ',')

readingsDF = recordsDF \
    .withColumn("stationId", fields[0].cast("string")) \
    .withColumn("freeslots", fields[1].cast("integer")) \
    .withColumn("usedslots", fields[2].cast("integer")) \
    .withColumn("timestamp", fields[3].cast("timestamp"))

In [None]:
# Filter data
# Use the standard filter transformation
fullReadingsDF = readingsDF.filter("freeslots=0")

# Select stationid and timestamp
# Use the standard select transformation
stationIdTimestampDF = fullReadingsDF.select("stationId", "timestamp")

In [None]:
# The result of the structured streaming query will be stored/printed on
# the console "sink".
# append output mode
queryFilterStreamWriter = stationIdTimestampDF \
    .writeStream \
    .outputMode("append") \
    .format("console")

In [None]:
# Start the execution of the query (it will be executed until it is explicitly stopped)
queryFilter = queryFilterStreamWriter.start()

## **Example 2**

Input

- A stream of records retrieved from localhost:9999

- Each input record is a reading about the status of a station of a bike sharing system in a specific timestamp

- Each input reading has the format

    - stationId,#free slots,#used slots,timestamp

Output

- For each stationId, print on the standard output the total number of received input readings with a number of free slots equal to 0

- Print the requested information when new data are received by using the standard micro-batch processing mode

In [None]:
from pyspark.sql.functions import split

# Create a "receiver" DataFrame that will connect to localhost:9999
recordsDF = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

In [None]:
# The input records are characterized by one single column called value
# of type string
# Example of an input record: s1,0,3,2016-03-11 09:00:04
# Define four more columns by splitting the input column value
# New columns:
# - stationId
# - freeslots
# - usedslots
# - timestamp
# Split the input line once and reuse the resulting array column
fields = split(recordsDF.value, ',')

readingsDF = recordsDF \
    .withColumn("stationId", fields[0].cast("string")) \
    .withColumn("freeslots", fields[1].cast("integer")) \
    .withColumn("usedslots", fields[2].cast("integer")) \
    .withColumn("timestamp", fields[3].cast("timestamp"))

In [None]:
# Filter data
# Use the standard filter transformation
fullReadingsDF = readingsDF.filter("freeslots=0")

# Count the number of readings with a number of free slots equal to 0
# for each stationId
# The standard groupBy method is used
countsDF = fullReadingsDF \
    .groupBy("stationId") \
    .agg({"*": "count"})

In [None]:
# The result of the structured streaming query will be stored/printed on
# the console "sink"
# complete output mode
# (append mode cannot be used for aggregation queries without a watermark)
queryCountStreamWriter = countsDF \
    .writeStream \
    .outputMode("complete") \
    .format("console")

# Start the execution of the query (it will be executed until it is explicitly stopped)
queryCount = queryCountStreamWriter.start()