## The streaming loop

[Ref. _Learning Spark v2_ book, _Chapter 8_.]

❓ How does the streaming loop work?

<img src="https://raw.githubusercontent.com/Marco-Santoni/databricks-from-scratch/main/training-spark/img/streaming_loop.png" width="500">

How does Spark Streaming work under the hood? Spark SQL starts a background thread that continuously executes the following loop.

1. Based on the configured trigger interval, the thread checks the streaming sources for the availability of **new data**. 
2. If new data is available, it is processed by running a **micro-batch**. From the optimized logical plan, an optimized Spark execution plan is generated that reads the new data from the source, incrementally computes the updated result, and writes the output to the sink according to the configured **output mode**.
3. For every micro-batch, the exact range of data processed (e.g., the set of files or the range of Apache Kafka offsets) and any associated state are saved in the configured **checkpoint** location so that the query can deterministically reprocess the exact range if needed.
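
The three settings named in the loop above (trigger interval, output mode, checkpoint location) are all configured on the stream writer. A minimal sketch, assuming `result_df` is a streaming DataFrame that supports append mode; the function name, the 10-second interval, and the paths in the usage comment are illustrative assumptions:

```python
def start_file_query(result_df, checkpoint_dir, output_path):
    """Start a streaming query that writes Parquet files in append mode."""
    return (
        result_df.writeStream
        .format("parquet")
        .outputMode("append")                          # step 2: how results reach the sink
        .trigger(processingTime="10 seconds")          # step 1: how often to look for new data
        .option("checkpointLocation", checkpoint_dir)  # step 3: where progress and state are saved
        .option("path", output_path)
        .start()
    )

# Hypothetical usage in a notebook where `some_streaming_df` is already defined:
# query = start_file_query(some_streaming_df, "/checkpoints/demo/", "/output/demo/")
```

If no trigger is specified, Spark starts the next micro-batch as soon as the previous one has finished.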

### End of loop?

❓ How will a streaming job end?

This loop continues until the query is terminated, which can occur for one of the following reasons:

- A failure has occurred in the query (either a processing error or a failure in the cluster).
- The query is explicitly stopped using `streamingQuery.stop()`.
- If the trigger is set to `Once`, then the query will stop on its own after executing a single micro-batch containing all the available data.
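
The three cases above can be told apart from the `StreamingQuery` handle returned by `start()`. The helper below is only a sketch: the function name and the returned strings are made up for illustration, while `isActive`, `stop()`, and `exception()` are members of PySpark's `StreamingQuery`.

```python
def stop_or_report(query):
    """Stop a StreamingQuery if it is still running, otherwise say how it ended."""
    if query.isActive:
        query.stop()                       # explicit stop
        return "stopped explicitly"
    error = query.exception()              # None unless the query failed
    if error is not None:
        return f"terminated by a failure: {error}"
    return "finished on its own (e.g. a one-time trigger)"
```

In PySpark, the `Once` trigger is requested on the writer with `.trigger(once=True)`.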

### Recovering from failures

To restart a terminated query in a completely new process, you have to create a new SparkSession, redefine all the DataFrames, and start the streaming query on the final result using the **same checkpoint** location as the one used when the query was started the first time.

The checkpoint location must be the same across restarts because this directory contains the unique identity of a streaming query and determines the life cycle of the query. If the checkpoint directory is deleted or the same query is started with a different checkpoint directory, it is like starting a new query from scratch. Specifically, checkpoints have record-level information (e.g., Apache Kafka offsets) to track the data range the last incomplete micro-batch was processing. The restarted query will use this information to start processing records precisely after the last successfully completed micro-batch.
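
To see why the checkpoint must be reused, here is a toy model in plain Python (not Spark code). A dict stands in for the checkpoint directory, and `run_micro_batches` is a hypothetical helper that commits its progress after every batch:

```python
def run_micro_batches(records, checkpoint, batch_size=2):
    """Process `records` in micro-batches, resuming from the offset in `checkpoint`."""
    processed = []
    offset = checkpoint.get("next_offset", 0)   # where the previous run left off
    while offset < len(records):
        batch = records[offset:offset + batch_size]
        processed.append(batch)                  # "process" the batch
        offset += len(batch)
        checkpoint["next_offset"] = offset       # commit progress after each batch
    return processed

records = ["r0", "r1", "r2", "r3", "r4"]
checkpoint = {}

first_run = run_micro_batches(records[:3], checkpoint)  # only three records had arrived
resumed = run_micro_batches(records, checkpoint)        # restart with the same checkpoint
fresh = run_micro_batches(records, {})                  # restart with a new checkpoint

print(resumed)  # [['r3', 'r4']]
print(fresh)    # [['r0', 'r1'], ['r2', 'r3'], ['r4']]
```

With the same checkpoint the restarted run continues after the last committed batch; with an empty one it reprocesses everything, like a brand-new query.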

### Details on files as data sources

Some remarks on what we've seen so far.

**Reading.** Structured Streaming can treat files written into a directory as a data stream.

- All the files must be of the same format and are expected to have the same schema. Violation of these assumptions can lead to incorrect parsing (e.g., unexpected null values) or query failures.
- The whole file must be available at once for reading, and once it is available, the file cannot be updated or modified. Any changes to that file will not be processed.
- When there are multiple new files to process but it can only pick some of them in the next micro-batch (e.g., because of rate limits), it will select the files with the earliest timestamps. Within the micro-batch, however, there is no predefined order of reading of the selected files; all of them will be read in parallel.
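
The file-selection rule in the last bullet can be pictured with a few lines of plain Python. This is a toy model, not Spark's implementation; the file names are made up, and `max_files` stands in for a rate limit such as `maxFilesPerTrigger`:

```python
def pick_files_for_batch(new_files, max_files):
    """Pick the files for the next micro-batch, earliest modification time first.

    `new_files` is a list of (name, modification_time) pairs not yet processed.
    """
    oldest_first = sorted(new_files, key=lambda f: f[1])
    return [name for name, _ in oldest_first[:max_files]]

new_files = [
    ("c.json", 1469673880000),
    ("a.json", 1469673865000),
    ("b.json", 1469673866000),
]
print(pick_files_for_batch(new_files, max_files=2))  # ['a.json', 'b.json']
```
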

**Writing.** When writing to files, Structured Streaming supports only **append** mode, because while it is easy to write new files in the output directory (i.e., append data to a directory), it is hard to modify existing data files (as would be expected with update and complete modes).

### Streaming state

Each execution is considered a micro-batch, and the partial intermediate result that is communicated between the executions is called the streaming **state.** DataFrame operations can be broadly classified into stateless and stateful operations based on whether executing the operation incrementally requires maintaining a state.

- **stateless**. Process each input record individually without needing any information from previous rows, e.g., `select`, `where`, `withColumn`. They support the append and update output modes, but **not** complete mode.
- **stateful**. E.g., `df.groupBy().count()`. In every micro-batch, the incremental plan adds the count of new records to the previous count generated by the previous micro-batch. This partial count communicated between plans is the state. This state is maintained in the memory of the Spark executors and is checkpointed to the configured location in order to tolerate failures.
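
The difference between the two kinds of operation can be shown with a toy model in plain Python (not Spark code; the function names are illustrative). The stateless step only looks at the current batch, while the stateful count needs the value left behind by the previous micro-batch:

```python
def uppercase_batch(batch):
    """Stateless: each record is transformed on its own."""
    return [record.upper() for record in batch]

def count_batch(batch, state):
    """Stateful: add this batch's record count to the running count in `state`."""
    state["count"] = state.get("count", 0) + len(batch)
    return state["count"]

state = {}  # survives between micro-batches
batches = (["a", "b"], ["c"], ["d", "e", "f"])
running_counts = [count_batch(batch, state) for batch in batches]
print(running_counts)  # [2, 3, 6]
```
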

We'll now focus on the two types of stateful aggregations:

- aggregations not based on time
- aggregations with event-time windows

**Aggregations not based on time.** Any aggregation that does not involve time. They can be _global_ and aggregate over all records of the stream. For example, consider a streaming DataFrame named `streaming_df`:

```py
streaming_df.groupBy().count()
```

The `count` is applied to all the rows of the DataFrame. _Grouped_ aggregations are instead applied to groups of the data stream. If our `streaming_df` has a column `sensor_type`, we may count the occurrences of each value of that column.

```py
streaming_df.groupBy('sensor_type').count()
```

**Aggregations with event-time windows.** Before starting, let's look again at the example from our previous lesson.

In [0]:
%fs ls /databricks-datasets/structured-streaming/events/

path,name,size,modificationTime
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530,1469673865000
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961,1469673866000
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025,1469673878000
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999,1469673879000
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987,1469673880000
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006,1469673881000
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003,1469673882000
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007,1469673883000
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978,1469673885000
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008,1469673886000


In [0]:
%fs head /databricks-datasets/structured-streaming/events/file-0.json

In [0]:
from pyspark.sql.types import StructType, StructField, TimestampType, StringType

inputPath = "/databricks-datasets/structured-streaming/events/"

# Since we know the data format already, let's define the schema to speed up processing (no need for Spark to infer schema)
jsonSchema = StructType(
    [
        StructField("time", TimestampType()),
        StructField("action", StringType())
    ]
)

In [0]:
# Static DataFrame representing data in the JSON files
staticInputDF = (
  spark
    .read
    .schema(jsonSchema)
    .json(inputPath)
)

display(staticInputDF)

time,action
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:29.000+0000,Open
2016-07-28T04:19:31.000+0000,Close
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:32.000+0000,Close
2016-07-28T04:19:33.000+0000,Close
2016-07-28T04:19:35.000+0000,Close
2016-07-28T04:19:36.000+0000,Open


In [0]:
from pyspark.sql.functions import window

staticCountsDF = (
  staticInputDF
    .groupBy(
       staticInputDF.action, 
       window(staticInputDF.time, "1 hour"))    
    .count()
)

In [0]:
display(staticCountsDF.orderBy(staticCountsDF.window.start.asc()))

action,window,count
Close,"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",11
Open,"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",179
Close,"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",344
Open,"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",1001
Open,"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",999
Close,"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",815
Close,"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",1003
Open,"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",1000
Close,"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",1011
Open,"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",993


In [0]:
streamingInputDF = (
  spark
    .readStream                       
    .schema(jsonSchema)
    .option("maxFilesPerTrigger", 1)
    .json(inputPath)
)

streamingCountsDF = (                 
  streamingInputDF
    .groupBy(
      streamingInputDF.action, 
      window(streamingInputDF.time, "1 hour"))
    .count()
)

streamingCountsDF\
    .writeStream\
    .format("memory")\
    .queryName("counts")\
    .outputMode("complete")\
    .start()

Out[9]: <pyspark.sql.streaming.StreamingQuery at 0x7f5319611910>

❓ What do you think is happening under the hood?

.

.

.

What if events do not arrive in the exact sequential order? What if an event from the beginning of the series arrives in one of the latest files? How should it be handled?

.

.

.

<img src="https://raw.githubusercontent.com/Marco-Santoni/databricks-from-scratch/main/training-spark/img/chapter_8_event_window.png" width="700">


❓ So, what could the problem be here?

.

.

.

From the point of view of resource usage, this poses a different problem: indefinitely **growing state** size. As new groups are created corresponding to the latest time windows, the older groups continue to occupy the state memory, waiting for any late data to update them. Even if in practice there is a bound on how late the input data can be (e.g., data cannot be more than seven days late), the query does not know that information. Hence, it does not know when to consider a window as “too old to receive updates” and drop it from the state.

**Watermarks**.

<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/0/07/Water_Level_Measure_on_Mundaring_Weir_Wall.jpg/800px-Water_Level_Measure_on_Mundaring_Weir_Wall.jpg" width="700">

A watermark is defined as a moving threshold in event time that trails behind the maximum event time seen by the query in the processed data. The trailing gap, known as the watermark delay, defines how long the engine will wait for late data to arrive. By knowing the point at which no more data will arrive for a given group, the engine can automatically finalize the aggregates of certain groups and drop them from the state. This limits the total amount of state that the engine has to maintain to compute the results of the query.
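
The definition can be made concrete with a toy model in plain Python. It is a simplification (Spark's real bookkeeping is more involved): the watermark is recomputed after each micro-batch as the maximum event time seen so far minus the delay, and records older than it are dropped from the next batch. The function name and the sample timestamps are made up for illustration.

```python
from datetime import datetime, timedelta

def split_late_records(batches, delay):
    """Return (accepted, dropped) event times for a list of non-empty micro-batches."""
    max_event_time = None
    accepted, dropped = [], []
    for batch in batches:
        # Watermark trails the maximum event time seen in earlier batches.
        watermark = None if max_event_time is None else max_event_time - delay
        for event_time in batch:
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)    # older than the watermark
            else:
                accepted.append(event_time)
        batch_max = max(batch)
        if max_event_time is None or batch_max > max_event_time:
            max_event_time = batch_max
    return accepted, dropped

def t(hour, minute):
    return datetime(2016, 7, 28, hour, minute)

batches = [
    [t(4, 19), t(4, 25)],  # no watermark yet
    [t(4, 18), t(4, 40)],  # watermark 04:15, so 04:18 is late but still accepted
    [t(4, 20), t(4, 35)],  # watermark 04:30, so 04:20 is dropped
]
accepted, dropped = split_late_records(batches, timedelta(minutes=10))
print(dropped)  # [datetime.datetime(2016, 7, 28, 4, 20)]
```
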

Going back to our example, we can add the following command:

```py
.withWatermark("time", "10 minutes")
```

If we know that, by design, our events should not arrive with a delay larger than 10 minutes, we can define a _watermark_ with a delay of 10 minutes on the column `time`. Spark will then keep track of the maximum value of `time` seen so far and subtract 10 minutes from it to obtain the watermark. Events whose `time` is older than the watermark will be ignored. What should our query look like?

In [0]:
streamingInputDF = (
  spark
    .readStream                       
    .schema(jsonSchema)
    .option("maxFilesPerTrigger", 1)
    .json(inputPath)
)

streamingCountsDF = (                 
  streamingInputDF
    # watermark on the same timestamp of the window grouping below
    .withWatermark("time", "10 minutes") # before groupBy!
    .groupBy(
      streamingInputDF.action, 
      window(streamingInputDF.time, "1 hour"))
    .count()
)

streamingCountsDF\
    .writeStream\
    .format("memory")\
    .queryName("counts")\
    .outputMode("complete")\
    .start()

Out[14]: <pyspark.sql.streaming.StreamingQuery at 0x7f5319603b50>

### Time aggregations: output modes

Unlike streaming aggregations not involving time, aggregations with time windows can use all three output modes. However, there are other implications regarding state cleanup that you need to be aware of, depending on the mode:

- **Update**. In this mode, every micro-batch will output only the rows where the aggregate got updated. This mode can be used with all types of aggregations. Specifically for time window aggregations, watermarking will ensure that the state will get **cleaned** up regularly. This is the most useful and efficient mode to run queries with streaming aggregations. However, you can**not** use this mode to write aggregates to append-only streaming sinks, such as any file-based formats like Parquet.
- **Complete**. In this mode, every micro-batch will output all the updated aggregates, irrespective of their age or whether they contain changes. While this mode can be used on all types of aggregations, for time window aggregations, using complete mode means state will **not be cleaned** up even if a watermark is specified. Outputting all aggregates requires all past state, and hence aggregation data must be preserved even if a watermark has been defined. Use this mode on time window aggregations with caution, as this can lead to an indefinite increase in state size and memory usage.
- **Append**. This mode can be used only with aggregations on event-time windows and with **watermarking** enabled. Hence, instead of outputting the updated rows, append mode outputs each key and its final aggregate value only when the watermark ensures that the aggregate is not going to be updated again. The advantage of this mode is that it allows you to write aggregates to append-only streaming sinks (e.g., files). The disadvantage is that the output will be **delayed** by the watermark duration—the query has to wait for the trailing watermark to exceed the time window of a key before its aggregate can be finalized.
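
What each mode emits for a windowed count can be sketched with a toy model in plain Python. This is not Spark's implementation: times are plain integers (minutes), `emit` is a hypothetical helper, and state cleanup is modelled only for append mode.

```python
def emit(mode, state, updated_keys, watermark):
    """Return the rows one micro-batch would output.

    `state` maps (window_start, window_end) -> count; `updated_keys` are the
    windows touched by this micro-batch.
    """
    if mode == "complete":
        return dict(state)                            # every window, changed or not
    if mode == "update":
        return {k: state[k] for k in updated_keys}    # only windows that changed
    if mode == "append":
        final = {k: v for k, v in state.items() if k[1] <= watermark}
        for k in final:
            del state[k]                              # finalized windows leave the state
        return final
    raise ValueError(f"unknown output mode: {mode}")

state = {(0, 60): 5, (60, 120): 3}
updated = [(60, 120)]

print(emit("complete", state, updated, watermark=70))  # {(0, 60): 5, (60, 120): 3}
print(emit("update", state, updated, watermark=70))    # {(60, 120): 3}
print(emit("append", state, updated, watermark=70))    # {(0, 60): 5}
```

Only the window that ends at or before the watermark is written in append mode, which is why its output lags behind the other two modes.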

## Exercise

Consider again the JSON files under `dbfs:/databricks-datasets/iot-stream/data-device/`. Define a streaming job that

- computes the sum of `num_steps` for time windows of 5 minutes (e.g., 20th July from 8:00 to 8:05)
- writes the results to a Parquet file in append mode

**Bonus.** Can you edit the `readStream` options so that the streaming job processes one file at a time, letting you watch the total sum increase batch by batch?

## Solution

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType, TimestampType

filename = 'dbfs:/databricks-datasets/iot-stream/'

device_schema = StructType([
    StructField("id",LongType(),False),
    StructField("user_id",LongType(),True),
    StructField("device_id",LongType(),True),
    StructField("num_steps",LongType(),True),
    StructField("miles_walked",FloatType(),True),
    StructField("calories_burnt",FloatType(),True),
    StructField("timestamp",TimestampType(),True),
    StructField("value",StringType(),True)
])

In [0]:
display(
    spark.read
    .schema(device_schema)
    .json(filename + 'data-device/')
)

id,user_id,device_id,num_steps,miles_walked,calories_burnt,timestamp,value
950000,24,5,5014,2.507,250.7,2018-07-22T06:44:25.732+0000,"{""user_id"": 24, ""calories_burnt"": 250.6999969482422, ""num_steps"": 5014, ""miles_walked"": 2.506999969482422, ""time_stamp"": ""2018-07-22 06:44:25.732267"", ""device_id"": 5}"
950001,24,13,2536,1.268,126.8,2018-07-21T01:18:10.732+0000,"{""user_id"": 24, ""calories_burnt"": 126.80000305175781, ""num_steps"": 2536, ""miles_walked"": 1.2680000066757202, ""time_stamp"": ""2018-07-21 01:18:10.732306"", ""device_id"": 13}"
950002,4,5,7314,3.657,365.7,2018-07-24T12:42:53.732+0000,"{""user_id"": 4, ""calories_burnt"": 365.70001220703125, ""num_steps"": 7314, ""miles_walked"": 3.6570000648498535, ""time_stamp"": ""2018-07-24 12:42:53.732332"", ""device_id"": 5}"
950003,22,10,9796,4.898,489.8,2018-07-23T22:56:23.732+0000,"{""user_id"": 22, ""calories_burnt"": 489.79998779296875, ""num_steps"": 9796, ""miles_walked"": 4.8979997634887695, ""time_stamp"": ""2018-07-23 22:56:23.732358"", ""device_id"": 10}"
950004,34,13,5603,2.8015,280.15,2018-07-21T13:50:39.732+0000,"{""user_id"": 34, ""calories_burnt"": 280.1499938964844, ""num_steps"": 5603, ""miles_walked"": 2.801500082015991, ""time_stamp"": ""2018-07-21 13:50:39.732385"", ""device_id"": 13}"
950005,21,1,11832,5.916,591.6,2018-07-23T11:05:48.732+0000,"{""user_id"": 21, ""calories_burnt"": 591.5999755859375, ""num_steps"": 11832, ""miles_walked"": 5.915999889373779, ""time_stamp"": ""2018-07-23 11:05:48.732412"", ""device_id"": 1}"
950006,7,12,10962,5.481,548.1,2018-07-23T02:10:39.732+0000,"{""user_id"": 7, ""calories_burnt"": 548.0999755859375, ""num_steps"": 10962, ""miles_walked"": 5.480999946594238, ""time_stamp"": ""2018-07-23 02:10:39.732438"", ""device_id"": 12}"
950007,33,4,5448,2.724,272.4,2018-07-22T17:42:18.732+0000,"{""user_id"": 33, ""calories_burnt"": 272.3999938964844, ""num_steps"": 5448, ""miles_walked"": 2.7239999771118164, ""time_stamp"": ""2018-07-22 17:42:18.732465"", ""device_id"": 4}"
950008,16,5,7637,3.8185,381.85,2018-07-21T12:21:21.732+0000,"{""user_id"": 16, ""calories_burnt"": 381.8500061035156, ""num_steps"": 7637, ""miles_walked"": 3.81850004196167, ""time_stamp"": ""2018-07-21 12:21:21.732491"", ""device_id"": 5}"
950009,27,8,11709,5.8545,585.44995,2018-07-22T12:40:55.732+0000,"{""user_id"": 27, ""calories_burnt"": 585.449951171875, ""num_steps"": 11709, ""miles_walked"": 5.854499816894531, ""time_stamp"": ""2018-07-22 12:40:55.732517"", ""device_id"": 8}"


In [0]:
%fs ls /output/streaming-steps

In [0]:
dbutils.fs.rm("/output/streaming-steps/hourly.parquet", recurse=True)

Out[56]: True

In [0]:
%fs ls /checkpoints/

path,name,size,modificationTime
dbfs:/checkpoints/live-exercise/,live-exercise/,0,0
dbfs:/checkpoints/streaming-exercise-1/,streaming-exercise-1/,0,0
dbfs:/checkpoints/streaming-exercise-2/,streaming-exercise-2/,0,0


In [0]:
dbutils.fs.rm("/checkpoints/streaming-exercise-3/", recurse=True)

Out[57]: True

In [0]:
%fs ls dbfs:/databricks-datasets/iot-stream/data-device

path,name,size,modificationTime
dbfs:/databricks-datasets/iot-stream/data-device/part-00000.json.gz,part-00000.json.gz,2610922,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00001.json.gz,part-00001.json.gz,2612478,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00002.json.gz,part-00002.json.gz,2619023,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00003.json.gz,part-00003.json.gz,2620016,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00004.json.gz,part-00004.json.gz,2618699,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00005.json.gz,part-00005.json.gz,2619772,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00006.json.gz,part-00006.json.gz,2619027,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00007.json.gz,part-00007.json.gz,2619832,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00008.json.gz,part-00008.json.gz,2617893,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00009.json.gz,part-00009.json.gz,2619764,1532465738000


In [0]:
from pyspark.sql.functions import col, window

device_df = spark.readStream\
    .schema(device_schema)\
    .json(filename + 'data-device/')\
    .withWatermark("timestamp", "10 minutes")\
    .groupBy(
        window(col("timestamp"), "5 minutes")
    )\
    .agg({'num_steps': 'sum'})

In [0]:
device_df.writeStream\
    .format("parquet")\
    .outputMode("append")\
    .option("checkpointLocation", "/checkpoints/streaming-exercise-3/")\
    .option("path", "/output/streaming-steps/hourly.parquet")\
    .start()

Out[60]: <pyspark.sql.streaming.StreamingQuery at 0x7f53184d3910>

In [0]:
display(
    spark.read.parquet('/output/streaming-steps/hourly.parquet').count()
)

1450
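
For the bonus question, one possible approach is to reuse the `maxFilesPerTrigger` option from the events example, so that each micro-batch picks up at most one new file. A sketch, where the helper name is an assumption and the usage comment relies on the names defined in the solution above:

```python
def read_one_file_per_batch(spark, schema, path):
    """Streaming JSON reader that puts at most one new file into each micro-batch."""
    return (
        spark.readStream
        .schema(schema)
        .option("maxFilesPerTrigger", 1)  # rate limit: one file per trigger
        .json(path)
    )

# Hypothetical usage with the names from the solution:
# device_stream = read_one_file_per_batch(spark, device_schema, filename + 'data-device/')
```
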