# Structured Streaming Basics

Structured Streaming is a stream processing framework built on the Spark SQL engine. Rather than introducing a separate API, Structured Streaming uses the existing structured APIs in Spark (DataFrames, Datasets, and SQL), meaning that all the operations you are familiar with there are supported. Users express a streaming computation in the same way they’d write a batch computation on static data. Upon specifying this, and specifying a streaming destination, the Structured Streaming engine will take care of running your query incrementally and continuously as new data arrives into the system. These logical instructions for the computation are then executed using the same Catalyst engine, including query optimization, code generation, etc. Beyond the core structured processing engine, Structured Streaming includes a number of features specifically for streaming. For instance, Structured Streaming ensures end-to-end, exactly-once processing as well as fault-tolerance through checkpointing and write-ahead logs.

The main idea behind Structured Streaming is to treat a stream of data as a table to which data is continuously appended. The job then periodically checks for new input data, processes it, updates some internal state located in a state store if needed, and updates its result. A cornerstone of the API is that you should not have to change your query’s code when doing batch or stream processing; you should have to specify only whether to run that query in a batch or streaming fashion. Internally, Structured Streaming will automatically figure out how to “incrementalize” your query, i.e., update its result efficiently whenever new data arrives, and will run it in a fault-tolerant fashion.
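To make the idea of incrementalizing a query concrete, here is a minimal pure-Python sketch (it is not Spark code and does not reflect how the engine is implemented). It assumes a running count per `gt` value; the `micro_batches` data and the helper names are made up for illustration. The only point is that folding each new batch into saved state gives the same answer as recomputing over the whole, ever-growing input table.

```python
from collections import Counter

def batch_counts(rows):
    """Batch view: recompute the counts from every row seen so far."""
    return Counter(row["gt"] for row in rows)

def incremental_counts(state, new_rows):
    """Streaming view: fold only the newly arrived rows into the saved state."""
    state.update(row["gt"] for row in new_rows)
    return state

# Hypothetical micro-batches of input rows (made-up data for illustration).
micro_batches = [
    [{"gt": "walk"}, {"gt": "sit"}],
    [{"gt": "walk"}],
    [{"gt": "bike"}, {"gt": "sit"}, {"gt": "walk"}],
]

state = Counter()  # plays the role of the state store
seen = []          # the conceptual input table that keeps growing
for batch in micro_batches:
    seen.extend(batch)
    state = incremental_counts(state, batch)
    # The incremental result always matches a full recomputation.
    assert state == batch_counts(seen)

print(dict(state))  # {'walk': 3, 'sit': 2, 'bike': 1}
```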

<img src="https://github.com/soltaniehha/Big-Data-Analytics-for-Business/blob/master/figs/13-02-streaming.png?raw=true" width="800" align="center"/>

In simplest terms, Structured Streaming is “your DataFrame, but streaming.” This makes it very easy to get started using streaming applications. You probably already have the code for them! There are some limits to the types of queries Structured Streaming will be able to run, however, as well as some new concepts you have to think about that are specific to streaming, such as event-time and out-of-order data.

You can use Structured Streaming to continuously update a table that users query interactively with Spark SQL, serve a machine learning model trained by MLlib, or join streams with offline data in any of Spark’s data sources—applications that would be much more complex to build using a mix of different tools.

## Core Concepts

Now that we have introduced the high-level idea, let’s cover some of the important concepts in a Structured Streaming job. One thing you will hopefully find is that there aren’t many. That’s because Structured Streaming is designed to be simple. Read some other big data streaming books and you’ll notice that they begin by introducing terminology like distributed stream processing topologies for skewed data reducers (a caricature, but accurate) and other complex verbiage. Spark’s goal is to handle these concerns automatically and give users a simple way to run any Spark computation on a stream.

### Transformations and Actions

Structured Streaming maintains the same concept of transformations and actions that we have seen throughout this book. The transformations available in Structured Streaming are, with a few restrictions, the exact same transformations that we saw before. The restrictions usually involve some types of queries that the engine cannot incrementalize yet, although some of the limitations are being lifted in new versions of Spark. There is generally only one action available in Structured Streaming: that of starting a stream, which will then run continuously and output results.

### Input Sources

Structured Streaming supports several input sources for reading in a streaming fashion. As of Spark 3.1, the built-in input sources are as follows:

* Apache Kafka

* Files on a distributed file system like HDFS, Google Cloud Storage, or AWS S3 (Spark will continuously read new files in a directory)

* A socket source for testing

* A rate source for testing

Flume, Kinesis, and Twitter are sources associated with the older DStream-based Spark Streaming API; they are not built-in Structured Streaming sources.
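As a rough sketch, a built-in source is selected with `spark.readStream.format(...)`. The helper names, host, port, servers, and topic below are placeholders, and each function assumes an existing SparkSession is passed in as `spark`. The rate source, which generates rows at a fixed rate, is another built-in option for testing.

```python
def read_rate(spark, rows_per_second=5):
    """Rate source: generates (timestamp, value) rows; handy for testing."""
    return (
        spark.readStream.format("rate")
        .option("rowsPerSecond", rows_per_second)
        .load()
    )

def read_socket(spark, host="localhost", port=9999):
    """Socket source: reads text lines from a TCP socket (testing only)."""
    return (
        spark.readStream.format("socket")
        .option("host", host)
        .option("port", port)
        .load()
    )

def read_kafka(spark, servers, topic):
    """Kafka source: needs the spark-sql-kafka package on the classpath."""
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("subscribe", topic)
        .load()
    )
```

Each function only describes the stream; nothing is read until a query is started on the returned DataFrame.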

<img src="https://github.com/soltaniehha/Big-Data-Analytics-for-Business/blob/master/figs/streaming-input-source.png?raw=true" width="800" align="center"/>

Internally, it works as follows. By default, Structured Streaming receives live input data streams and divides the data into small batches (micro-batches), which are then processed by the Spark SQL engine to generate the final stream of results in batches.

<img src="https://github.com/soltaniehha/Big-Data-Analytics-for-Business/blob/master/figs/streaming-batches.png?raw=true" width="800" align="center"/>


### Sinks

Just as sources allow you to get data into Structured Streaming, sinks specify the destination for the result set of that stream. Sinks and the execution engine are also responsible for reliably tracking the exact progress of data processing. Here are the supported output sinks as of Spark 3.1:

* Apache Kafka

* Almost any file format

* A foreach sink for running arbitrary computation on the output records

* A console sink for testing

* A memory sink for debugging
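The sinks listed above are chosen on `writeStream`. The following is a minimal sketch, assuming `streaming_df` is a streaming DataFrame; the function names, `path`, and `checkpoint_dir` are placeholders introduced for illustration.

```python
def write_to_console(streaming_df):
    """Console sink: print each micro-batch to stdout (testing only)."""
    return streaming_df.writeStream.format("console").outputMode("append").start()

def write_to_parquet(streaming_df, path, checkpoint_dir):
    """File sink: append Parquet files under `path`, tracking progress
    in `checkpoint_dir`."""
    return (
        streaming_df.writeStream
        .format("parquet")
        .option("path", path)
        .option("checkpointLocation", checkpoint_dir)
        .outputMode("append")
        .start()
    )

def write_with_foreach(streaming_df, handle_row):
    """foreach sink: call handle_row(row) for every output record."""
    return streaming_df.writeStream.foreach(handle_row).start()
```

Each helper returns the `StreamingQuery` handle produced by `start()`, so the caller can monitor or stop the query later.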

### Output Modes

Defining a sink for our Structured Streaming job is only half of the story. We also need to define how we want Spark to write data to that sink. For instance, do we only want to append new information? Do we want to update rows as we receive more information about them over time (e.g., updating the click count for a given web page)? Do we want to completely overwrite the result set every single time (i.e., always write a file with the complete click counts for all pages)? To do this, we define an output mode, similar to how we define output modes in the static Structured APIs.

The supported output modes are as follows:

* Append (only add new records to the output sink)

* Update (update changed records in place)

* Complete (rewrite the full output)

One important detail is that certain queries, and certain sinks, only support certain output modes. For example, suppose that your job is just performing a map on a stream. The output data will grow indefinitely as new records arrive, so it would not make sense to use Complete mode, which requires writing all the data to a new file at once. In contrast, if you are doing an aggregation into a limited number of keys, Complete and Update modes would make sense, but Append would not, because the values of some keys need to be updated over time.
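The difference between Update and Complete can be illustrated with a toy pure-Python model of an aggregation's result table. This is not how Spark implements output modes; the page names and the `rows_for_sink` helper are made up, and Append is deliberately left out because, as noted above, it does not fit an aggregation whose rows keep changing.

```python
def rows_for_sink(mode, result_table, changed_keys):
    """Return what a sink would receive after one trigger, for a given mode."""
    if mode == "complete":
        return dict(result_table)                                  # full table
    if mode == "update":
        return {k: result_table[k] for k in sorted(changed_keys)}  # changed rows
    raise ValueError(f"mode {mode!r} is not modeled in this sketch")

result_table = {}
for batch in (["home", "cart"], ["home"]):  # made-up page clicks
    for page in batch:
        result_table[page] = result_table.get(page, 0) + 1
    print("update:  ", rows_for_sink("update", result_table, set(batch)))
    print("complete:", rows_for_sink("complete", result_table, set(batch)))
```

After the second batch, Update emits only the `home` row, while Complete emits both rows again.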

### Triggers

Whereas output modes define how data is output, triggers define when data is output, that is, when Structured Streaming should check for new input data and update its result. By default, Structured Streaming will look for new input records as soon as it has finished processing the last group of input data, giving the lowest latency possible for new results. However, this behavior can lead to writing many small output files when the sink is a set of files. Thus, Spark also supports triggers based on processing time (only look for new data at a fixed interval). Spark also provides a one-time trigger, which processes the available data once and then stops, and an experimental continuous trigger.
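In PySpark, a trigger is set on the writer before the query is started. The helper below is a small sketch; `with_trigger` and its `kind` argument are names invented here, while `trigger(processingTime=...)` and `trigger(once=True)` are the underlying `DataStreamWriter` calls (the latter runs a single batch and then stops).

```python
def with_trigger(writer, kind="processing_time", interval="5 seconds"):
    """Attach a trigger to a DataStreamWriter and return the writer."""
    if kind == "processing_time":
        # Check for new data at a fixed interval.
        return writer.trigger(processingTime=interval)
    if kind == "once":
        # Process the available data in one batch, then stop the query.
        return writer.trigger(once=True)
    raise ValueError(f"unknown trigger kind: {kind}")

# Possible usage, assuming a streaming DataFrame named `counts` exists:
# writer = counts.writeStream.format("console").outputMode("complete")
# query = with_trigger(writer, interval="10 seconds").start()
```

If no trigger is set, the default behavior described above applies.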

## Structured Streaming in Action

Let’s get to an applied example of how you might use Structured Streaming. For our examples, we’re going to be working with the Heterogeneity Human Activity Recognition Dataset. The data consists of smartphone and smartwatch sensor readings from a variety of devices—specifically, the accelerometer and gyroscope, sampled at the highest possible frequency supported by the devices. Readings from these sensors were recorded while users performed activities like biking, sitting, standing, walking, and so on. There are several different smartphones and smartwatches used, and nine total users.

Let’s read in the **static** version of the dataset as a DataFrame:

In [1]:
bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
data = "gs://is843-public/data/"

In [2]:
static = spark.read.json(data + "activity-data/")
static.printSchema()
static.show(2)
print("There are {} rows.".format(static.count()))

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)

+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|           x|           y|           z|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
|1424686735090|1424686733090638193|nexus4_1|   18|nexus4|   g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|
|1424686735292|1424688581345918092|nexus4_2|   66|nexus4|   g|stand|-0.005722046| 0.029083252| 0.005569458|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+

As the preceding output shows, the data includes a number of timestamp columns, as well as model, user, and device information. The `gt` field specifies what activity the user was doing at that time.

Next, let’s create a streaming version of the same dataset, which will read each input file in the dataset one by one as if it were a stream.

Streaming DataFrames are largely the same as static DataFrames. We create them within Spark applications and then perform transformations on them to get our data into the correct format. Basically, all of the transformations that are available in the static Structured APIs apply to streaming DataFrames. However, one small difference is that Structured Streaming does not let you perform schema inference without explicitly enabling it. You can enable schema inference by setting the configuration `spark.sql.streaming.schemaInference` to `true`. Instead of enabling it, we will reuse the schema that Spark inferred for our static DataFrame (whose files we know to be valid) and pass that `dataSchema` object to our streaming DataFrame. You should avoid relying on an inferred schema in a production scenario where your data may (accidentally) change out from under you:

In [3]:
dataSchema = static.schema

In [4]:
streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
  .json(data + "activity-data/")

`maxFilesPerTrigger` controls how quickly Spark will read all of the files in the folder. By setting this value to 1, we’re artificially limiting the flow of the stream to one file per trigger. This helps us demonstrate how Structured Streaming runs incrementally in our example, but probably isn’t something you’d use in production.
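As an alternative to copying an inferred schema, the schema can be written out by hand, which is closer to what a production job would do. The sketch below passes a DDL-formatted string to `readStream.schema(...)`; the column names and types are copied from the `printSchema()` output above, while `FIELDS`, `DATA_SCHEMA_DDL`, and `read_activity_stream` are names introduced here for illustration.

```python
# Column names and Spark SQL types, taken from the printSchema() output above.
FIELDS = [
    ("Arrival_Time", "BIGINT"),
    ("Creation_Time", "BIGINT"),
    ("Device", "STRING"),
    ("Index", "BIGINT"),
    ("Model", "STRING"),
    ("User", "STRING"),
    ("gt", "STRING"),
    ("x", "DOUBLE"),
    ("y", "DOUBLE"),
    ("z", "DOUBLE"),
]

# Backticks quote each column name so that names such as `User` or `Index`
# cannot be mistaken for SQL keywords.
DATA_SCHEMA_DDL = ", ".join(f"`{name}` {dtype}" for name, dtype in FIELDS)

def read_activity_stream(spark, path):
    """Read the JSON files under `path` as a stream, one file per trigger."""
    return (
        spark.readStream.schema(DATA_SCHEMA_DDL)
        .option("maxFilesPerTrigger", 1)
        .json(path)
    )
```

With an explicit schema, a file whose structure has drifted no longer changes the schema of the stream silently.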

Just like with other Spark APIs, streaming DataFrame creation and execution are lazy. In particular, we can now specify transformations on our streaming DataFrame before finally calling an action to start the stream. In this case, we’ll show one simple transformation, grouping and counting the data by the `gt` column, which is the activity being performed by the user at that point in time:

In [5]:
activityCounts = streaming.groupBy("gt").count()

Because this code is being run on a small cluster, we are going to set the number of shuffle partitions to a small value (the default is 200) to avoid creating too many shuffle partitions:

In [6]:
spark.conf.set("spark.sql.shuffle.partitions", 5)

Now that we have set up our transformation, we need only to specify our action to start the query. We will specify an output destination, or output sink, for the result of this query. For this basic example, we are going to write to a memory sink, which keeps an in-memory table of the results.

In the process of specifying this sink, we’re going to need to define how Spark will output that data. In this example, we use the complete output mode. This mode rewrites all of the keys along with their counts after every trigger:

In [7]:
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
  .format("memory").outputMode("complete")\
  .start()

We are now writing out our stream! You’ll notice that we set a unique query name to represent this stream, in this case `activity_counts`. We specified our format as an in-memory table and we set the output mode.

In a standalone application, we would also include the following line after the preceding code:

```python
activityQuery.awaitTermination()
```

After this code is executed, the streaming computation will have started in the background. The query object is a handle to that active streaming query. In a standalone application, we must wait for the termination of the query using `activityQuery.awaitTermination()` to prevent the driver process from exiting while the query is active. We will omit this for now, since the notebook keeps the driver alive, but it must be included in your production applications; otherwise, your stream won’t be able to run.
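For experiments where blocking forever is inconvenient, `awaitTermination` also accepts a timeout in seconds and then returns whether the query terminated within that time. The helper below is a small sketch built on that behavior; `run_for` is a name introduced here, and `query` is assumed to be a `StreamingQuery` handle such as `activityQuery`.

```python
def run_for(query, seconds):
    """Block for up to `seconds`, then stop the query if it is still running.

    Returns True if the query terminated on its own within the timeout.
    """
    try:
        finished = query.awaitTermination(seconds)
    finally:
        # Make sure a still-running query does not outlive this call.
        if query.isActive:
            query.stop()
    return finished
```

This pattern is meant for demos and tests; a long-running production job would normally call `awaitTermination()` without a timeout.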

Spark lists this stream, and other active ones, under the active streams in our SparkSession. We can see a list of those streams by running the following:

In [8]:
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x7ffb4ace0490>]
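Each entry in that list is a `StreamingQuery` handle that can be inspected. As a minimal sketch, the hypothetical helper below collects a few of its properties (`name`, `id`, `isActive`, `status`, and `lastProgress`) into a dictionary.

```python
def describe_query(query):
    """Summarize a StreamingQuery handle as a plain dictionary."""
    return {
        "name": query.name,                   # set via queryName(...)
        "id": query.id,                       # unique id of the query
        "is_active": query.isActive,          # False once stopped or failed
        "status": query.status,               # what the query is doing now
        "last_progress": query.lastProgress,  # metrics of the latest batch
    }

# Possible usage, assuming `spark` is the active SparkSession:
# for q in spark.streams.active:
#     print(describe_query(q))
```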

Now that this stream is running, we can experiment with the results by querying the in-memory table it is maintaining of the current output of our streaming aggregation. This table will be called `activity_counts`, the same as the stream. To see the current data in this output table, we simply need to query it! We’ll do this in a simple loop that prints the results of the streaming query five times, two seconds apart:

In [11]:
from time import sleep
for x in range(5):
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(2)

+----------+------+
|        gt| count|
+----------+------+
|       sit| 98471|
|     stand| 91079|
|stairsdown| 74922|
|      walk|106048|
|  stairsup| 83614|
|      null| 83584|
|      bike| 86377|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|110778|
|     stand|102464|
|stairsdown| 84286|
|      walk|119304|
|  stairsup| 94067|
|      null| 94033|
|      bike| 97175|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|147699|
|     stand|136619|
|stairsdown|112371|
|      walk|159072|
|  stairsup|125431|
|      null|125379|
|      bike|129572|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|172313|
|     stand|159391|
|stairsdown|131094|
|      walk|185583|
|  stairsup|146344|
|      null|146274|
|      bike|151168|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|196927|
|     stand|182165|
|stairsdown|1498

With this simple example, the power of Structured Streaming should become clear. You can take the same operations that you use in batch and run them on a stream of data with very few code changes (essentially just specifying that it’s a stream).
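One way to see how little changes between the two modes is to keep the query logic in a single function and vary only how the data is read and written. This is a sketch under stated assumptions: `spark` is an existing SparkSession, `path` and `schema` are supplied by the caller, and the function names and the `activity_counts_demo` query name are invented here.

```python
def activity_counts(df):
    """The query itself: identical for static and streaming DataFrames."""
    return df.groupBy("gt").count()

def run_batch(spark, path):
    """Batch: read everything once and return the counts as a list of rows."""
    static_df = spark.read.json(path)
    return activity_counts(static_df).collect()

def run_streaming(spark, path, schema, query_name="activity_counts_demo"):
    """Streaming: same query, but read with readStream and start a query."""
    streaming_df = spark.readStream.schema(schema).json(path)
    return (
        activity_counts(streaming_df)
        .writeStream.queryName(query_name)
        .format("memory")
        .outputMode("complete")
        .start()
    )
```

Only the read and write calls differ; `activity_counts` is shared unchanged.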