# Spark Streaming

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.


# Importing Libraries


In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

# Start Spark Session


In [24]:
spark = SparkSession.builder.config(
    "spark.sql.repl.eagerEval.enabled", True
).getOrCreate()

# Read from a socket on port 9999 (start a server on the configured host with `nc -lk 9999`)


## Create DataFrame representing the stream of input lines


In [25]:
lines = (
    spark.readStream.format("socket")
    .option("host", "192.168.1.19")
    .option("port", 9999)
    .load()
)

## Split the lines into words


In [26]:
words = lines.select(explode(split(lines.value, " ")).alias("word"))

# Generate running word count


In [27]:
wordCounts = words.groupBy("word").count()

## Key Takeaways

1. This lines DataFrame represents an unbounded table containing the streaming text data.
2. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that no data is being received yet: we have only defined the transformation and have not started the query.
3. Next, we have used two built-in SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we use the function alias to name the new column as “word”.
4. Finally, we have defined the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.


## Start running the query that prints the running counts to the console


In [28]:
# query = (
#     wordCounts.writeStream.outputMode("complete")
#     .format("console")
#     .trigger(processingTime="2 seconds")
#     .start()
# )


# query.awaitTermination()

After this code is executed, the streaming computation will have started in the background. The query object is a handle to that active streaming query, and we have decided to wait for the termination of the query using awaitTermination() to prevent the process from exiting while the query is active.


# Working of Spark Structured Streaming

<img src="https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png">

A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.

<img src="https://spark.apache.org/docs/latest/img/structured-streaming-model.png">


The “Output” is defined as what gets written out to the external storage. The output can be defined in one of the following modes:

1. Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.
2. Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
3. Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
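
The difference between the three modes can be illustrated with a plain-Python sketch (this is not Spark code). `emit` is a hypothetical helper that compares the Result Table before and after one trigger and returns the rows each mode would hand to the sink; the sample counts are made up.

```python
def emit(previous, current, mode):
    """Return the rows a sink would receive for one trigger.

    `previous` and `current` map a key to its aggregate value
    before and after the trigger.
    """
    if mode == "complete":
        # The whole Result Table is written every time.
        return dict(current)
    if mode == "append":
        # Only keys that did not exist before the trigger.
        return {k: v for k, v in current.items() if k not in previous}
    if mode == "update":
        # New keys plus keys whose value changed.
        return {k: v for k, v in current.items() if previous.get(k) != v}
    raise ValueError(f"unknown output mode: {mode}")


before = {"cat": 1, "dog": 3}
after = {"cat": 2, "dog": 3, "owl": 1}

complete_rows = emit(before, after, "complete")
append_rows = emit(before, after, "append")
update_rows = emit(before, after, "update")
print(complete_rows)  # {'cat': 2, 'dog': 3, 'owl': 1}
print(append_rows)    # {'owl': 1}
print(update_rows)    # {'cat': 2, 'owl': 1}
```

Note that the changed `cat` row is invisible in the append output, which is why Append Mode only applies to queries whose existing rows are not expected to change.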


To illustrate the use of this model, let’s understand the model in the context of the Quick Example above. The first lines DataFrame is the input table, and the final wordCounts DataFrame is the result table. Note that the query on the streaming lines DataFrame to generate wordCounts is exactly the same as it would be for a static DataFrame. However, when this query is started, Spark will continuously check for new data from the socket connection. If there is new data, Spark will run an “incremental” query that combines the previous running counts with the new data to compute updated counts, as shown below.


<img src = "https://spark.apache.org/docs/latest/img/structured-streaming-example-model.png">


Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).
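
As a rough illustration of that idea, here is a plain-Python sketch (not Spark's implementation) in which the only thing kept between triggers is a `Counter` of running counts. `process_batch` and the sample lines are hypothetical.

```python
from collections import Counter


def process_batch(state, lines):
    """Fold one micro-batch of text lines into the running counts."""
    for line in lines:
        # Roughly what explode(split(value, " ")) followed by a count does
        state.update(line.split(" "))
    return state


running_counts = Counter()  # the only state kept between triggers
batches = [["cat dog", "dog dog"], ["owl cat"]]

for batch in batches:
    process_batch(running_counts, batch)
    # The raw lines in `batch` are not retained after this point.
    print(dict(running_counts))
```

Each trigger touches only the new lines and the small dictionary of counts; the earlier input never has to be re-read.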

This model is significantly different from many other stream processing engines. Many streaming systems require the user to maintain running aggregations themselves, thus having to reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-once). In this model, Spark is responsible for updating the Result Table when there is new data, thus relieving the users from reasoning about it. As an example, let’s see how this model handles event-time based processing and late arriving data.


# Handling Event-time and Late Data

Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column – each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.
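
To make "each row can belong to multiple windows" concrete, the following plain-Python sketch assigns event times to sliding windows and counts events per window. It only illustrates the grouping idea and is not how Spark implements it; `windows_for`, the 10-minute window, the 5-minute slide, and the timestamps are all assumptions chosen for the example.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def windows_for(event_time, size, slide):
    """Return every [start, end) window of length `size`, starting on a
    multiple of `slide`, that contains `event_time`."""
    # Latest window start that is <= event_time
    start = event_time - (event_time - EPOCH) % slide
    result = []
    while start > event_time - size:
        result.append((start, start + size))
        start -= slide
    return sorted(result)


events = [
    datetime(2024, 1, 1, 12, 7),
    datetime(2024, 1, 1, 12, 11),
    datetime(2024, 1, 1, 12, 2),
]

counts = Counter()
for event in events:
    for start, _end in windows_for(event, timedelta(minutes=10), timedelta(minutes=5)):
        counts[start.strftime("%H:%M")] += 1  # group by window start

print(dict(counts))
```

The event at 12:07 is counted in both the 12:00 and the 12:05 window, so the window is just another grouping key.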

Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data. Since Spark 2.1, there is support for watermarking, which allows the user to specify the threshold of late data and allows the engine to clean up old state accordingly. These are explained in more detail in the Window Operations section of the Structured Streaming Programming Guide.
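
The interaction between late data, the threshold, and state cleanup can be sketched in plain Python. This is a deliberately simplified model, not Spark's exact semantics: event times are integers, windows are tumbling, and the watermark used for a batch is derived from the events seen in earlier batches. `run_with_watermark` and its inputs are hypothetical.

```python
def run_with_watermark(batches, window, threshold):
    """Count integer event times per tumbling window across micro-batches.

    The watermark trails the largest event time seen so far by `threshold`.
    A window is finalized (evicted from state) once its end is at or below
    the watermark; events for finalized windows are dropped.
    """
    state, finalized, dropped = {}, {}, []
    max_seen = 0
    for batch in batches:
        watermark = max_seen - threshold  # computed from earlier batches
        for t in batch:
            start = t - t % window
            if start + window <= watermark:
                dropped.append(t)  # too late: its window is already closed
                continue
            state[start] = state.get(start, 0) + 1
        max_seen = max([max_seen, *batch])
        watermark = max_seen - threshold
        for start in sorted(state):
            if start + window <= watermark:
                finalized[start] = state.pop(start)  # free the old state
    return state, finalized, dropped


state, finalized, dropped = run_with_watermark(
    batches=[[1, 4, 12], [3, 21], [2, 25]], window=10, threshold=5
)
print(state)      # {20: 2}
print(finalized)  # {0: 3, 10: 1}
print(dropped)    # [2]
```

The event `3` arrives late but within the threshold and still updates window 0, while the event `2` arrives after that window has been cleaned up and is ignored.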


# Fault Tolerance Semantics

Delivering end-to-end exactly-once semantics was one of the key goals behind the design of Structured Streaming. To achieve that, the Structured Streaming sources, the sinks and the execution engine are designed to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.
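
A minimal plain-Python sketch of how these three pieces fit together is shown below. It is an illustration under simplifying assumptions, not Spark's implementation: the source is a list addressed by offset, the logs are in-memory lists, and `IdempotentSink`, `run`, and `crash_before_commit` are hypothetical names.

```python
class IdempotentSink:
    """Stores output keyed by batch id, so rewriting a batch replaces it."""

    def __init__(self):
        self.batches = {}

    def write(self, batch_id, rows):
        self.batches[batch_id] = list(rows)


def run(source, sink, offset_log, commit_log, batch_size, crash_before_commit=None):
    """Process `source` in offset ranges, resuming from the two logs."""
    batch_id = len(commit_log)  # first batch that was never committed
    while True:
        if batch_id < len(offset_log):
            start, end = offset_log[batch_id]  # replay exactly the logged range
        else:
            start = offset_log[-1][1] if offset_log else 0
            end = min(start + batch_size, len(source))
            if start == end:
                return  # no new data
            offset_log.append((start, end))  # write-ahead: log before processing
        sink.write(batch_id, source[start:end])
        if batch_id == crash_before_commit:
            raise RuntimeError("simulated crash after write, before commit")
        commit_log.append(batch_id)
        batch_id += 1


source = list("abcdefg")
sink, offset_log, commit_log = IdempotentSink(), [], []

try:
    run(source, sink, offset_log, commit_log, batch_size=3, crash_before_commit=1)
except RuntimeError:
    pass  # batch 1 was written but never committed

run(source, sink, offset_log, commit_log, batch_size=3)  # restart
delivered = [row for batch_id in sorted(sink.batches) for row in sink.batches[batch_id]]
print(delivered == source)  # True
```

After the restart, batch 1 is replayed over the same offset range and overwrites its earlier output, so every record appears exactly once in the sink.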


# Creating streaming DataFrames and streaming Datasets

Streaming DataFrames can be created through the `DataStreamReader` interface returned by `SparkSession.readStream` (in R, through the `read.stream()` method). Similar to the read interface for creating a static DataFrame, you can specify the details of the source: data format, schema, options, etc.

## Input Sources

There are a few built-in sources.

1. File source - Reads files written in a directory as a stream of data. Files will be processed in the order of file modification time. If latestFirst is set, the order will be reversed. Supported file formats are text, CSV, JSON, ORC, Parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems can be achieved by file move operations.

2. Kafka source - Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher. See the Kafka Integration Guide for more details.

3. Socket source (for testing) - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.

4. Rate source (for testing) - Generates data at the specified number of rows per second. Each output row contains a timestamp and a value, where timestamp is a Timestamp type containing the time of message dispatch, and value is of Long type containing the message count, starting from 0 as the first row. This source is intended for testing and benchmarking.

5. Rate Per Micro-Batch source (for testing) - Generates data at the specified number of rows per micro-batch. Each output row contains a timestamp and a value, where timestamp is a Timestamp type containing the time of message dispatch, and value is of Long type containing the message count, starting from 0 as the first row. Unlike the rate data source, this data source provides a consistent set of input rows per micro-batch regardless of query execution (configuration of trigger, query lagging, etc.): for example, batch 0 will produce 0~999, batch 1 will produce 1000~1999, and so on. The same applies to the generated time. This source is intended for testing and benchmarking.


# Reading a Stream in Spark

The method `SparkSession.readStream` returns a `DataStreamReader` used to configure the stream.

There are a number of key points to the configuration of a `DataStreamReader`:

- The schema
- The type of stream: Files, Kafka, TCP/IP, etc
- Configuration specific to the type of stream
  - For files, the file type, the path to the files, max files, etc...
  - For TCP/IP the server's address, port number, etc...
  - For Kafka the server's address, port, topics, partitions, etc...


## The Schema

Every streaming DataFrame must have a schema - the definition of column names and data types.

Some sources, such as the pub/sub systems Kafka and Event Hubs, define the schema for you.

For file-based streaming sources, the schema must be user-defined.


### Why must a schema be specified for a streaming DataFrame?

To say that another way...

### Why are streaming DataFrames unable to infer/read a schema?

If you have enough data, you can infer the schema.
<br><br>
If you don't have enough data, you run the risk of mis-inferring the schema.
<br><br>
For example, you think you have all integers but the last value contains "1.123" (a float) or "snoopy" (a string).
<br><br>
With a stream, we have to assume we don't have enough data because we are starting with zero records.
<br><br>
And unlike reading from a table or parquet file, there is nowhere from which to "read" the stream's schema.
<br><br>
For this reason, we must specify the schema manually.
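
The risk can be demonstrated with a toy inference routine in plain Python. `infer_type` is a hypothetical helper, far simpler than Spark's schema inference, that guesses the narrowest type fitting every sampled value.

```python
def infer_type(values):
    """Guess the narrowest type that fits every sampled string."""

    def fits(cast):
        try:
            for value in values:
                cast(value)
            return True
        except ValueError:
            return False

    if fits(int):
        return "integer"
    if fits(float):
        return "double"
    return "string"


sample = ["1", "2", "3"]
from_sample = infer_type(sample)                      # looks like integers
with_float = infer_type(sample + ["1.123"])           # one later value changes the answer
with_text = infer_type(sample + ["1.123", "snoopy"])  # and another changes it again
print(from_sample, with_float, with_text)  # integer double string
```

A stream that starts with zero records gives the inference nothing reliable to work with, so any guessed type could be contradicted by the next record.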


In [29]:
dataSchema = "Arrival_Time timestamp, Creation_Time timestamp, Device string, Index integer, Model string, User String, gt string, x double, y double, z double"

### Configuring a File Stream

In our example below, we will be consuming files written continuously to a pre-defined directory.

To control how much data is pulled into Spark at once, we can specify the option `maxFilesPerTrigger`.

In our example below, we will be reading in only one file for every trigger interval:

`.option("maxFilesPerTrigger", 1)`

Both the location and file type are specified with the following call, which itself returns a `DataFrame`:

`.json(dataPath)`
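
The effect of `maxFilesPerTrigger` can be pictured with a small plain-Python sketch. `plan_triggers` is a hypothetical helper that orders files by modification time and groups them into per-trigger batches; it ignores files that arrive between triggers, and the file names and timestamps are made up.

```python
def plan_triggers(files, max_files_per_trigger, latest_first=False):
    """Group (name, mtime) pairs into per-trigger batches, ordered by mtime."""
    ordered = sorted(files, key=lambda f: f[1], reverse=latest_first)
    names = [name for name, _mtime in ordered]
    return [
        names[i:i + max_files_per_trigger]
        for i in range(0, len(names), max_files_per_trigger)
    ]


files = [("b.json", 20), ("a.json", 10), ("c.json", 30)]

one_per_trigger = plan_triggers(files, 1)
two_latest_first = plan_triggers(files, 2, latest_first=True)
print(one_per_trigger)   # [['a.json'], ['b.json'], ['c.json']]
print(two_latest_first)  # [['c.json', 'b.json'], ['a.json']]
```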

> `DataStreamReader` methods

1. csv(path[, schema, sep, encoding, quote, …]): Loads a CSV file stream and returns the result as a DataFrame.
2. format(source): Specifies the input data source format.
3. json(path[, schema, primitivesAsString, …]): Loads a JSON file stream and returns the results as a DataFrame.
4. load([path, format, schema]): Loads a data stream from a data source and returns it as a DataFrame.
5. option(key, value): Adds an input option for the underlying data source.
6. options(\*\*options): Adds input options for the underlying data source.
7. orc(path[, mergeSchema, pathGlobFilter, …]): Loads an ORC file stream, returning the result as a DataFrame.
8. parquet(path[, mergeSchema, pathGlobFilter, …]): Loads a Parquet file stream, returning the result as a DataFrame.
9. schema(schema): Specifies the input schema.
10. table(tableName): Define a Streaming DataFrame on a Table.
11. text(path[, wholetext, lineSep, …]): Loads a text file stream and returns a DataFrame whose schema starts with a string column named “value”, and followed by partitioned columns if there are any.


In [30]:
initialDF = (
    spark.readStream  # Returns a DataStreamReader
    .option("maxFilesPerTrigger", 1)  # Force processing of only 1 file per trigger
    .schema(dataSchema)  # Required for file-based streaming sources
    .json("./datasets/activity-data")  # The stream's source directory and file type
)

And with the initial `DataFrame`, we can apply some transformations:


In [31]:
streamingDF = initialDF.withColumnRenamed("Index", "User_ID")

### Differentiate between a static and a streaming DataFrame


In [32]:
streamingDF.isStreaming

True

### Unsupported Operations

Most operations on a "streaming" DataFrame are identical to a "static" DataFrame.

There are some exceptions to this.

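One such example would be to sort our never-ending stream by `Arrival_Time`.


In [33]:
from pyspark.sql.utils import AnalysisException

try:
    sortedDF = streamingDF.orderBy(col("Arrival_Time").desc())
    # Unsupported operations are detected when the query is started
    sortedDF.writeStream.format("console").start()
except AnalysisException:
    print("Sorting is not supported on an unaggregated stream")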

Sorting is not supported on an unaggregated stream


Sorting is one of a handful of operations that are either too complex or logically not possible to do with a stream.

#### Unsupported Operations

There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

1. Limit and take the first N rows are not supported on streaming Datasets.
2. Distinct operations on streaming Datasets are not supported.
3. Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
4. A few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section of the Structured Streaming Programming Guide for more details.
5. Chaining multiple stateful operations on streaming Datasets is not supported with Update and Complete mode. In addition, a mapGroupsWithState/flatMapGroupsWithState operation followed by another stateful operation is not supported in Append mode. A known workaround is to split your streaming query into multiple queries, each with a single stateful operation, and ensure end-to-end exactly-once per query. Ensuring end-to-end exactly-once for the last query is optional.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that immediately run queries and return results, which does not make sense on a streaming Dataset. Instead, the same functionality can be achieved by explicitly starting a streaming query (see the next section).

1. count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.
2. foreach() - Instead use ds.writeStream.foreach(...) (see next section).
3. show() - Instead use the console sink (see next section).

If you try any of these operations, you will see an AnalysisException like “operation XYZ is not supported with streaming DataFrames/Datasets”. While some of them may be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting on the input stream is not supported, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently.

> We will see in the following module how we can sort an **aggregated** stream.


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Writing a Stream</h2>

The method `DataFrame.writeStream` returns a `DataStreamWriter` used to configure the output of the stream.

There are a number of parameters to the `DataStreamWriter` configuration:

- Query's name (optional) - This name must be unique among all the currently active queries in the associated SparkSession.
- Trigger (optional) - Default value is `ProcessingTime(0)` and it will run the query as fast as possible.
- Checkpointing directory (optional for pub/sub sinks)
- Output mode
- Output sink
- Configuration specific to the output sink, such as:
  - The host, port and topic of the receiving Kafka server
  - The file format and final destination of files
  - A <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=foreach#pyspark.sql.streaming.DataStreamWriter.foreach" target="_blank">custom sink</a> via `writeStream.foreach(...)`

Once the configuration is completed, we can trigger the job with a call to `.start()`

### Triggers

The trigger specifies when the system should process the next set of data.

| Trigger Type                           | Example                              | Notes                                                                                                                                                                                                                                  |
| -------------------------------------- | ------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Unspecified                            |                                      | _DEFAULT_ - The query will be executed as soon as the system has completed processing the previous micro-batch                                                                                                                         |
| Fixed interval micro-batches           | `.trigger(processingTime="6 hours")` | The query will be executed in micro-batches and kicked off at the user-specified intervals                                                                                                                                             |
| One-time micro-batch                   | `.trigger(once=True)`                | The query will execute _only one_ micro-batch to process all the available data and then stop on its own                                                                                                                               |
| Continuous w/fixed checkpoint interval | `.trigger(continuous="1 second")`    | The query will be executed in a low-latency, <a href="http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing" target = "_blank">continuous processing mode</a>. _EXPERIMENTAL_ in 2.3.2 |

In the example below, you will be using a fixed interval of 1 second:

`.trigger(processingTime="1 second")`

### Checkpointing

A <b>checkpoint</b> stores the current state of your streaming job to a reliable storage system such as Azure Blob Storage or HDFS. It does not store the state of your streaming job to the local file system of any node in your cluster.

Together with write-ahead logs, a checkpoint allows a terminated stream to be restarted and to continue from where it left off.

To enable this feature, you only need to specify the location of a checkpoint directory:

`.option("checkpointLocation", checkpointPath)`

Points to consider:

- If you do not have a checkpoint directory, when the streaming job stops, you lose all state around your streaming job and upon restart, you start from scratch.
- For some sinks, you will get an error if you do not specify a checkpoint directory:<br/>
  `analysisException: 'checkpointLocation must be specified either through option("checkpointLocation", ...)..`
- Also note that every streaming job should have its own checkpoint directory: no sharing.

### Output Modes

| Mode         | Example                   | Notes                                                                                                                                      |
| ------------ | ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ |
| **Complete** | `.outputMode("complete")` | The entire updated Result Table is written to the sink. The individual sink implementation decides how to handle writing the entire table. |
| **Append**   | `.outputMode("append")`   | Only the new rows appended to the Result Table since the last trigger are written to the sink.                                             |
| **Update**   | `.outputMode("update")`   | Only the rows in the Result Table that were updated since the last trigger are written to the sink. Available since Spark 2.1.1.           |

In the example below, we are writing to a file sink (JSON files in a directory), which supports only the `append` mode:

`dsw.outputMode("append")`

### Output Sinks

`DataStreamWriter.format` accepts the following values, among others:

| Output Sink | Example                                         | Notes                                                                                   |
| ----------- | ----------------------------------------------- | --------------------------------------------------------------------------------------- |
| **File**    | `dsw.format("parquet")`, `dsw.format("csv")`... | Dumps the Result Table to a file. Supports Parquet, json, csv, etc.                     |
| **Kafka**   | `dsw.format("kafka")`                           | Writes the output to one or more topics in Kafka                                        |
| **Console** | `dsw.format("console")`                         | Prints data to the console (useful for debugging)                                       |
| **Memory**  | `dsw.format("memory")`                          | Updates an in-memory table, which can be queried through Spark SQL or the DataFrame API |
| **foreach** | `dsw.foreach(writer: ForeachWriter)`            | This is your "escape hatch", allowing you to write your own type of sink.               |
| **Delta**   | `dsw.format("delta")`                           | Writes to a Delta Lake table (requires the Delta Lake library)                          |

In the example below, we will be appending JSON files to a directory and specifying its location with this call:

`.format("json").start(outputPathDir)`


# Working file streaming example


In [34]:
outputPathDir = "./streaming/outputdir/streamop.json"
checkpointPath = (
    "./streaming/checkpointdir"  # A subdirectory for our checkpoint & W-A logs
)

streamingQuery = (
    streamingDF.writeStream  # Start with our "streaming" DataFrame and get the DataStreamWriter
    .queryName("stream_1p")  # Name the query
    .trigger(processingTime="1 second")  # Configure for a 1-second micro-batch
    .format("json")  # Specify the sink type, a JSON file sink
    .option("checkpointLocation", checkpointPath)  # Location of checkpoint files & W-A logs
    .outputMode("append")  # Write only new data to the "file"
    .start(outputPathDir)  # Start the job, writing to the specified directory
)

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Managing Streaming Queries</h2>

When a query is started, the `StreamingQuery` object can be used to monitor and manage the query.

| Method               | Description                                                                                   |
| -------------------- | --------------------------------------------------------------------------------------------- |
| `id`                 | get unique identifier of the running query that persists across restarts from checkpoint data |
| `runId`              | get unique id of this run of the query, which will be generated at every start/restart        |
| `name`               | get the auto-generated or user-specified name of the query                                    |
| `explain()`          | print detailed explanations of the query                                                      |
| `stop()`             | stop query                                                                                    |
| `awaitTermination()` | block until query is terminated, with stop() or with error                                    |
| `exception`          | exception if query terminated with error                                                      |
| `recentProgress`     | array of most recent progress updates for this query                                          |
| `lastProgress`       | most recent progress update of this streaming query                                           |
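
The difference between `id` and `runId` can be pictured with a plain-Python sketch. It assumes the query id is persisted in a file under the checkpoint directory; the file name, its layout, and the `start_query` helper are illustrative assumptions, not Spark's API.

```python
import json
import os
import tempfile
import uuid


def start_query(checkpoint_dir):
    """Return the identifiers of a (simulated) query start."""
    metadata_path = os.path.join(checkpoint_dir, "metadata")
    if os.path.exists(metadata_path):
        with open(metadata_path) as f:
            query_id = json.load(f)["id"]  # restart: reuse the stored id
    else:
        query_id = str(uuid.uuid4())  # first start: create and persist an id
        with open(metadata_path, "w") as f:
            json.dump({"id": query_id}, f)
    # A fresh runId is generated on every start or restart.
    return {"id": query_id, "runId": str(uuid.uuid4())}


with tempfile.TemporaryDirectory() as checkpoint_dir:
    first = start_query(checkpoint_dir)
    second = start_query(checkpoint_dir)  # same checkpoint directory

print(first["id"] == second["id"])        # True
print(first["runId"] == second["runId"])  # False
```

This is also one way to see why two streaming jobs should not share a checkpoint directory: they would pick up each other's identity and progress.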


In [35]:
streamingQuery.recentProgress

[]

In [36]:
for s in spark.streams.active:  # Iterate over all streams
    print("{}: {}".format(s.id, s.name))  # Print the stream's id and name

6f8d599e-0a71-45c4-94e5-925dcb089c96: stream_1p


The code below stops the `streamingQuery` defined above and introduces `awaitTermination()`.

`awaitTermination()` will block the current thread:

- Until the stream stops or
- Until the specified timeout elapses
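
The blocking behaviour can be mimicked without Spark using a background thread. `FakeQuery` below is a hypothetical stand-in for a streaming query, written only to show the two ways `awaitTermination(timeout)` can return; as with the PySpark method, the return value says whether the query had terminated by the time the call returned.

```python
import threading
import time


class FakeQuery:
    """Hypothetical stand-in for a streaming query on a background thread."""

    def __init__(self):
        self._stopped = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while not self._stopped.is_set():
            time.sleep(0.01)  # pretend to process a micro-batch

    def awaitTermination(self, timeout=None):
        # Block until the worker ends or the timeout (in seconds) elapses,
        # then report whether it has terminated.
        self._thread.join(timeout)
        return not self._thread.is_alive()

    def stop(self):
        self._stopped.set()
        self._thread.join()


query = FakeQuery()
terminated_before_stop = query.awaitTermination(0.05)  # times out; still running
query.stop()
terminated_after_stop = query.awaitTermination(0.05)   # returns immediately
print(terminated_before_stop, terminated_after_stop)  # False True
```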


In [37]:
streamingQuery.awaitTermination(
    5
)  # Stream for another 5 seconds while the current thread blocks
streamingQuery.stop()  # Stop the stream

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> The Display function</h2>

Within Databricks notebooks, we can use the `display()` function to render a live plot.

When you pass a "streaming" `DataFrame` to `display()`:

- A "memory" sink is being used
- The output mode is complete
- The query name is specified with the `streamName` parameter
- The trigger is specified with the `trigger` parameter
- The checkpointing location is specified with the `checkpointLocation`

`display(myDF, streamName = "myQuery")`

> We just programmatically stopped our only streaming query in the previous cell. In the cell below, `display` will automatically start our streaming DataFrame, `streamingDF`. We are passing `stream_2p` as the name for this newly started stream.


In [41]:
# myStream = "stream_2p"
# display(streamingDF, streamName=myStream) only for databricks

Using the value passed to `streamName` in the call to `display`, we can programmatically access this specific stream:


In [43]:
print("Looking for {}".format("stream_2p"))

for stream in spark.streams.active:  # Loop over all active streams
    if stream.name == "stream_2p":  # Single out the stream named "stream_2p"
        print("Found {} ({})".format(stream.name, stream.id))

Looking for stream_2p


Since the `streamName` gets registered as a temporary table pointing to the memory sink, we can use SQL to query the sink.


In [44]:
spark.catalog.listTables()

[]

Stop all remaining streams.


In [45]:
for s in spark.streams.active:
    s.stop()