# Chapter 8. Structured Streaming

In [None]:
from uuid import uuid1

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = (SparkSession.builder
  # Add the Kafka source library. "_2.12" is the Scala version; the version after the last ":" must match your Spark version.
  .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")
  .master("local[4]")
  .appName("StructuredStreaming")
  .getOrCreate())
spark

## The Fundamentals of a Structured Streaming Query

For the following streaming query to work, we need a TCP server that will listen at `127.0.0.1:61080` and will be sending text lines.

We can use `netcat-openbsd` for this. In a terminal, run `nc -lk -s 127.0.0.1 -p 61080` and start typing text lines. Observe the output in this notebook. It should look something like this:

```
-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
| foo|    1|
+----+-----+
```

To terminate the query, interrupt the Jupyter kernel (menu Kernel -> Interrupt Kernel).
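
If `netcat-openbsd` is not available, a small Python script can play the role of the TCP server. The following is a minimal sketch, not part of the original notebook: the helper names `open_server` and `send_lines` and their parameters are hypothetical. It listens on the given address, waits for a single client (here, the Spark socket source), and sends each text line terminated by a newline.

```python
import socket
import time


def open_server(host="127.0.0.1", port=61080):
    """Create a listening TCP socket on the given address."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    return server


def send_lines(server, lines, delay=0.0):
    """Wait for one client, then send it each line terminated by a newline."""
    conn, _ = server.accept()
    with conn:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))
            time.sleep(delay)  # optional pause so lines land in different micro-batches
```

Because `send_lines` blocks until a client connects, it is probably easiest to run it in a separate Python process, for example `send_lines(open_server(), ["foo bar", "foo"], delay=1.0)`, and then start the streaming query in the notebook.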

In [None]:
# Random checkpoint dirname. Use it if you want every query to start anew.
checkpoint_dir = f"/tmp/spark-streaming-checkpoints-{uuid1()}"

# Static checkpoint dirname. Use it if you want to restart a stopped query.
# checkpoint_dir = f"./spark-streaming-checkpoints"

# Step 1: Define input sources 
lines = (spark
         .readStream
         .format("socket")
         .option("host", "127.0.0.1")
         .option("port", "61080")
         .load())
# Step 2: Transform data
words = lines.select(F.explode(F.split(F.col("value"), "\\s")).alias("word"))
counts = words.groupBy("word").count()
# Step 3: Define output sink and output mode
writer = (counts
         .writeStream
         .format("console")
         .outputMode("complete"))
# Step 4: Specify processing details
writer2 = (writer
           .trigger(processingTime="1 second")
           .option("checkpointLocation", checkpoint_dir))
# Step 5: Start the query
streaming_query = writer2.start()
# The following line will block for 60 seconds and the console output will be echoed in this notebook
# in the cell output. You can unblock earlier by interrupting the Jupyter kernel (menu Kernel -> Interrupt Kernel)
streaming_query.awaitTermination(60)

In [None]:
# The streaming query is still running. You can still observe the console output
# in the terminal in which you started Jupyter.
streaming_query.status

In [None]:
streaming_query.stop()
streaming_query.status

Now the query is stopped.

If you used a static checkpoint dirname, you can restart the query from the point where it left off. To restart the query, reexecute the cell that creates and starts the streaming query (with steps 1 to 5). You may get "ERROR MicroBatchExecution" with IndexOutOfBoundsException. In this case rerun the cell one more time.

**NOTE:** If you use a static checkpoint dirname and you stopped and restarted netcat in between, your restarted query may stop accepting input from netcat. In this case you may need a complete reset: stop the query, remove the checkpoint directory manually, restart netcat, and finally restart the query.

In [None]:
streaming_query.lastProgress

## Streaming Data Sources and Sinks

### Reading from Files

In [None]:
input_directory_of_json_files = "../data/streaming_json"
file_schema_read_json = "`key` integer, `value` string"

df_read_json = (spark
           .readStream
           .format("json")
           .schema(file_schema_read_json)
           .load(input_directory_of_json_files))

After starting the query in the next cell you will see the data from the file `00.json` in the cell output. Create a new file by copying `00.json` to `1.json`:
```shell
cp data/streaming_json/00.json data/streaming_json/1.json
```
and you will see the same data output again.

In [None]:
checkpoint_dir_read_json = f"./spark-streaming-checkpoints-read-json"

streaming_query_read_json = (df_read_json
                        .writeStream
                        .format("console")
                        .outputMode("append")
                        .trigger(processingTime="1 second")
                        .option("checkpointLocation", checkpoint_dir_read_json)
                        .start())
# The following line will block for 60 seconds and the console output will be echoed in this notebook
# in the cell output. You can unblock earlier by interrupting the Jupyter kernel (menu Kernel -> Interrupt Kernel)
streaming_query_read_json.awaitTermination(60)

In [None]:
streaming_query_read_json.stop()
streaming_query_read_json.status

If you want to restart the streaming query with the same JSON files all over again, remove the checkpoint directory `checkpoint_dir_read_json`. Otherwise the query will skip the files that it has already read.
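
Removing a checkpoint directory can also be done from Python. The helper below is a minimal sketch (the name `reset_checkpoint` is hypothetical and not part of the original notebook); it assumes the checkpoint lives on the local filesystem and that the query using it has already been stopped.

```python
import shutil
from pathlib import Path


def reset_checkpoint(checkpoint_dir):
    """Delete a local checkpoint directory; return True if something was removed."""
    path = Path(checkpoint_dir)
    if not path.exists():
        return False
    shutil.rmtree(path)
    return True
```

For the query above, this would be called as `reset_checkpoint(checkpoint_dir_read_json)` after `streaming_query_read_json.stop()`.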

### Writing to Files

The following streaming query writes data read by `df_read_json` from JSON files in `input_directory_of_json_files` directory to files in `output_directory_for_json_files` directory.

In [None]:
output_directory_for_json_files = "../data_output/streaming_json"
checkpoint_dir_write_json = f"./spark-streaming-checkpoints-write-json"

streaming_query_write_json = (df_read_json
                              .writeStream
                              .format("json")
                              .option("checkpointLocation", checkpoint_dir_write_json)
                              .start(output_directory_for_json_files))

In [None]:
streaming_query_write_json.status

In [None]:
streaming_query_write_json.stop()
streaming_query_write_json.status

### Reading from Apache Kafka

Before we can read anything from Kafka, we need to write some data into a topic. We will use `kafka-time-producer.py` to write a stream of timestamps to the `timestamps` Kafka topic. Then we will read this stream and write it out to the console using a Spark streaming query.

To start producing timestamps into the Kafka topic run the following command from the project root
```shell
poetry run python3 bin/kafka-time-producer.py
```

FYI: `kafka-time-producer.py` generates the key-value pairs and writes them to Kafka using Spark, too.

In [None]:
df_read_kafka = (spark
                 .readStream
                 .format("kafka")
                 .option("kafka.bootstrap.servers", "localhost:9093,localhost:9094,localhost:9095")
                 .option("subscribe", "timestamps")
                 .option("startingOffsets", "earliest")  # the default for streaming queries is "latest"
                 .load())
# df_read_kafka_transformed = df_read_kafka.withColumns({"key_string": F.expr("cast(key as string)"),
#                                                        "value_string": F.expr("cast(value as string)")})
df_read_kafka_transformed = df_read_kafka.withColumns({"key_string": F.col("key").cast("string"),
                                                       "value_string": F.col("value").cast("string")})

In [None]:
checkpoint_dir_read_kafka = f"./spark-streaming-checkpoints-read-kafka"

streaming_query_read_kafka = (df_read_kafka_transformed
                        .writeStream
                        .format("console")
                        .outputMode("append")
                        .trigger(processingTime="1 second")
                        .option("checkpointLocation", checkpoint_dir_read_kafka)
                        .start())
# The following line will block for 60 seconds and the console output will be echoed in this notebook
# in the cell output. You can unblock earlier by interrupting the Jupyter kernel (menu Kernel -> Interrupt Kernel)
streaming_query_read_kafka.awaitTermination(60)

In [None]:
streaming_query_read_kafka.stop()
streaming_query_read_kafka.status

### Writing to Apache Kafka

The following streaming query reads key-value pairs from CSV files in a directory and writes those key-value pairs to a Kafka topic.

In [None]:
file_schema_write_kafka = "`word` string, `count` long"

df_write_kafka = spark.readStream.format("csv").schema(file_schema_write_kafka).option("header", "true").load("../data/counts")

In [None]:
checkpoint_dir_write_kafka = f"/tmp/spark-streaming-checkpoints-write-kafka-{uuid1()}"

streaming_query_write_kafka = (df_write_kafka
  .selectExpr(
    "cast(word as string) as key",
    "cast(count as string) as value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9093,localhost:9094,localhost:9095")
  .option("topic", "wordcounts")
  .outputMode("update")
  .option("checkpointLocation", checkpoint_dir_write_kafka)
  .start())


Check the outputted messages in [AKHQ](http://localhost:8086/ui/docker-kafka-server/topic/wordcounts/data?sort=Oldest&partition=All). If the counts are not written to the Kafka topic, check the terminal where you started the notebook for error logs.

In [None]:
streaming_query_write_kafka.stop()
streaming_query_write_kafka.status

### Custom Streaming Sources and Sinks

To demonstrate how to use `foreachBatch()` to write the output of a streaming query to arbitrary storage systems, we will assume that it is only possible to write to a filesystem using a batch connector. We will create a streaming query that reads word counts from CSV files and writes them to a filesystem in JSON format. Instead of using DataStreamWriter, we will use DataFrameWriter in combination with `foreachBatch()` to write streaming data.
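
The Spark documentation describes `foreachBatch()` as giving at-least-once write guarantees by default, so the same micro-batch may be passed to the callback again after a restart. One possible way to keep such replays from duplicating rows is to derive the output path from the batch ID and overwrite it. The following is a minimal sketch under that assumption; the factory `make_batch_writer` and the `batch_<id>` directory layout are hypothetical and not used elsewhere in this notebook.

```python
import os


def make_batch_writer(base_dir):
    """Return a foreachBatch callback that writes each micro-batch to its own directory."""
    def write_batch(batch_df, batch_id):
        # A replayed batch_id maps to the same directory and overwrites it,
        # so the rows of that micro-batch are not written twice.
        path = os.path.join(base_dir, f"batch_{batch_id}")
        batch_df.write.json(path=path, mode="overwrite")
    return write_batch
```

Such a callback could be passed to the writer as `.foreachBatch(make_batch_writer("../data_output/streaming_anywhere"))`. The trade-off is one output directory per micro-batch instead of a single directory that is appended to.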

In [None]:
file_schema_write_kafka = "`word` string, `count` long"

df_write_anywhere = spark.readStream.format("csv").schema(file_schema_write_kafka).option("header", "true").load("../data/counts")

In [None]:
def write_counts_to_filesystem(updated_df, batchId):
    updated_df.write.json(path="../data_output/streaming_anywhere", mode="append")

checkpoint_dir_write_anywhere = f"/tmp/spark-streaming-checkpoints-write-anywhere-{uuid1()}"

streaming_query_write_anywhere = (df_write_anywhere
                                 .writeStream
                                 .foreachBatch(write_counts_to_filesystem)
                                 .outputMode("update")
                                 .option("checkpointLocation", checkpoint_dir_write_anywhere)
                                 .start())

In [None]:
streaming_query_write_anywhere.stop()
streaming_query_write_anywhere.status

## Stateful Streaming Aggregations

### Aggregations Not Based on Time

#### Global aggregations

In [None]:
df_global_aggregations = (spark.readStream
                         .format("json")
                         .schema("`key` integer, `value` string")
                         .load("../data/streaming_json/"))

In [None]:
checkpoint_dir_global_aggregations = f"/tmp/spark-streaming-checkpoints-global-aggregations-{uuid1()}"

streaming_query_global_aggregations = (df_global_aggregations
                                       .groupBy()
                                       .count()
                                       .writeStream
                                       .format("console")
                                       .outputMode("update")
                                       .trigger(processingTime="1 second")
                                       .option("checkpointLocation", checkpoint_dir_global_aggregations)
                                       .start())
streaming_query_global_aggregations.awaitTermination(60)

Generate some JSON files for the streaming query to aggregate by running the following loop
```shell
for I in $(seq 9); do cp data/streaming_json/00.json data/streaming_json/$I.json; sleep 5; done
```
Clean up the generated files with
```shell
rm data/streaming_json/?.json
```

In [None]:
streaming_query_global_aggregations.stop()
streaming_query_global_aggregations.status

#### Grouped aggregations

In [None]:
checkpoint_dir_grouped_aggregations = f"/tmp/spark-streaming-checkpoints-grouped-aggregations-{uuid1()}"

streaming_query_grouped_aggregations = (df_global_aggregations
                                       .groupBy("value")
                                       .mean("key")
                                       .writeStream
                                       .format("console")
                                       .outputMode("complete")
                                       .trigger(processingTime="1 second")
                                       .option("checkpointLocation", checkpoint_dir_grouped_aggregations)
                                       .start())
streaming_query_grouped_aggregations.awaitTermination(60)

In [None]:
streaming_query_grouped_aggregations.stop()
streaming_query_grouped_aggregations.status

#### Multiple aggregations computed together

In [None]:
checkpoint_dir_multiple_aggregations = f"/tmp/spark-streaming-checkpoints-multiple-aggregations-{uuid1()}"

streaming_query_multiple_aggregations = (df_global_aggregations
                                       .groupBy("value")
                                       .agg(F.count("*"),
                                            F.mean("key").alias("baselineValue"),
                                            F.collect_set("key").alias("allValues"))
                                       .writeStream
                                       .format("console")
                                       .outputMode("complete")
                                       .trigger(processingTime="1 second")
                                       .option("checkpointLocation", checkpoint_dir_multiple_aggregations)
                                       .start())
streaming_query_multiple_aggregations.awaitTermination(60)

In [None]:
streaming_query_multiple_aggregations.stop()
streaming_query_multiple_aggregations.status

### Aggregations with Event-Time Windows

We reproduce the processing of streaming events described in the section "Handling late data with watermarks" and illustrated in Figure 8-10.

In [None]:
from collections import namedtuple

# We first create a list of events as displayed in Figure 8-10.  An event is
# identified by its event time, delay, and sensor id.
#
# `relative_event_time` is the number of minutes from the event time of the events
# as displayed in Figure 8-10, e.g., the event time of the first event is 12:07,
# so the relative event time is 7.
# The relative event time allows us to select an arbitrary starting point,
# e.g., 21:00 instead of 12:00.
#
# `delay` is the difference between the processing time and the event time.
#
# `sensor_id` and `description` are self-explanatory.
Event = namedtuple("Event", ["relative_event_time", "delay", "sensor_id", "description"])
events_windowing = [
    Event(7, 0, "id1", "in window"),
    Event(8, 0, "id2", "in window"),
    Event(9, 4, "id3", "late arrival"),
    Event(14, 0, "id2", "in window"),
    Event(15, 2, "id1", "in window"),
    Event(8, 9.5, "id2", "late arrival"),
    Event(13, 5, "id3", "late arrival"),
    Event(21, -2, "id2", "early arrival"),
    Event(4, 18, "id1", "too late"),
    Event(17, 6, "id3", "late arrival"),
]

In [None]:
# Write a tombstone to the `events` topic, so that the topic gets created and we can
# start a streaming query that will read events from it.
(spark.createDataFrame(data=[(None, None)], schema="`key` string, `value` string")
 .write
 .format("kafka")
 .option("kafka.bootstrap.servers", "localhost:9093,localhost:9094,localhost:9095")
 .option("topic", "events")
 .save())

In [None]:
# Define a DataFrame into which events from the Kafka topic will be read.
df_windowing = (spark
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9093,localhost:9094,localhost:9095")
                .option("subscribe", "events")
                .option("startingOffsets", "earliest")  # the default for streaming queries is "latest"
                .load())
# Specify a watermark and a grouping with a window in the same way as in the book,
# but use seconds for time intervals instead of minutes (for the experiment to run faster).
df_windowing_transformed = (df_windowing
                            .withColumns({"sensorId": F.col("key").cast("string"),
                                          "eventTime": F.col("value").cast("string").cast("timestamp")})
                            .withWatermark("eventTime", "10 seconds")
                            .groupBy("sensorId", F.window(timeColumn = "eventTime",
                                                          windowDuration = "10 seconds",
                                                          slideDuration="5 seconds",
                                                          startTime="0 seconds"))
                            .count()
                            .orderBy(["window", "sensorId"]))

In [None]:
# Define a streaming query that will output the events to the console.
checkpoint_dir_windowing = f"/tmp/spark-streaming-checkpoints-windowing-{uuid1()}"
streaming_query_windowing = (df_windowing_transformed
                        .writeStream
                        .format("console")
                        .outputMode("complete")
                        .trigger(processingTime="5 seconds")
                        .option("checkpointLocation", checkpoint_dir_windowing)
                        .option("truncate", False)
                        .start())
streaming_query_windowing.awaitTermination(10)

In [None]:
from time import sleep
from datetime import datetime, timedelta

# Wait until the next minute starts and seconds equal 0.
# This way the seconds in our window ranges will correspond
# to minutes in the window ranges in the book.
while (now := datetime.now()).second != 0:
    sleep(0.01)

# Write events to Kafka according to the schedule.
total_seconds_passed = 0
for event in events_windowing:
    wait_seconds = event.relative_event_time + event.delay - total_seconds_passed
    event_time = now + timedelta(seconds=event.relative_event_time)
    processing_time = event_time + timedelta(seconds=event.delay)
    sleep(wait_seconds)
    total_seconds_passed += wait_seconds
    kafka_message = (event.sensor_id, event_time.isoformat())
    # print(event)
    # print(kafka_message)
    # print(f"event time: {event_time.isoformat()}")
    # print(f"processing time: {processing_time.isoformat()}")
    # print(f"now: {datetime.now().isoformat()}")
    # print()
    (spark.createDataFrame(data=[kafka_message], schema="`key` string, `value` string")
     .write
     .format("kafka")
     .option("kafka.bootstrap.servers", "localhost:9093,localhost:9094,localhost:9095")
     .option("topic", "events")
     .save())

**NOTICE:**

* In the output cell, there should be four batches with aggregated values (as in Figure 8-10). If you don't see the last batch in the output of the Jupyter cell, check the terminal in which you started Jupyter. In the terminal, all batches should be displayed.

* The "too late" event `[:04, id1]` may still be aggregated for windows `{:55, :05}` and `{:00, :10}`. This is because a watermark *does not guarantee that data arriving after the threshold is dropped.* It only *guarantees that data arriving before the threshold is never dropped.* See "Semantic guarantees with watermarks" in the book.


* Figure 8-10 contains errors, which is why our final DataFrame looks different from the one in the book. The event `[:15, id1]` will not be counted in the window `{:05, :15}`, because [the window ends are exclusive](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.window.html#pyspark.sql.functions.window). So, the count for this sensor ID and window will be 1 and not 2 as in the book.
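
The window membership discussed in the notes above can be checked without Spark. The following plain-Python sketch (the function name `sliding_windows` is hypothetical, and times are integer seconds relative to the full minute) assumes half-open windows `[start, end)` whose starts are aligned to multiples of the slide duration, as in the 10-second windows sliding by 5 seconds used in this experiment.

```python
def sliding_windows(event_time, window=10, slide=5):
    """Return the half-open [start, end) windows that contain event_time."""
    last_start = (event_time // slide) * slide  # latest window start <= event_time
    # Walk back in steps of `slide` while the window end is still after event_time.
    starts = range(last_start, event_time - window, -slide)
    return sorted((start, start + window) for start in starts)


print(sliding_windows(15))  # [(10, 20), (15, 25)]
print(sliding_windows(4))   # [(-5, 5), (0, 10)]
```

The event at `:15` lands in `{:10, :20}` and `{:15, :25}` but not in `{:05, :15}`, and the "too late" event at `:04` belongs to `{:55, :05}` and `{:00, :10}`, written here as `(-5, 5)` and `(0, 10)`.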

In [None]:
streaming_query_windowing.stop()
streaming_query_windowing.status

To restart the query, first, clear the `events` Kafka topic using [AKHQ](http://localhost:8086/ui/docker-kafka-server/topic/events/data?sort=Oldest&partition=All), then, restart the query by running the cell in which `streaming_query_windowing` is defined and the subsequent cell in which the events are sent to Kafka.

## Streaming Joins

### Stream-Static Joins

In [None]:
# Reproduce a stream-static join example similar to the one in the book

# A static DataFrame of impressions (i.e., all the ads to be shown)
ad_ids = list(range(26))
df_impressions = spark.createDataFrame(data=[(i, chr(65+i)) for i in ad_ids],
                                       schema="`adId` long, `impressionDescription` string")
df_impressions.show()

In [None]:
import random

# Create a streaming DataFrame from the rate data source (timestamp-counter pairs)
# that we interpret as a stream of clicks.  The column with the counter values
# is renamed.  A new column with random numbers from the range of `ad_ids` is added
# as `adId`, and we join the static `df_impressions` against it.
#
# Rate source docs: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources
df_clicks = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
# Add an `adId` column to `df_clicks` with random IDs from the `adId` range.
df_impressions_clicks_join = (df_clicks
                              .withColumn("adId", F.round(F.rand()*max(ad_ids)).cast("integer"))
                              .join(df_impressions, "adId")
                              .select("timestamp", "adId", "impressionDescription"))

In [None]:
checkpoint_dir_impressions_clicks_join = f"/tmp/spark-streaming-checkpoints-impressions-clicks-join-{uuid1()}"

straming_query_impressions_clicks_join = (df_impressions_clicks_join
                                          .writeStream
                                          .format("console")
                                          .option("truncate", False)
                                          .option("checkpointLocation", checkpoint_dir_impressions_clicks_join)
                                          .outputMode("append")
                                          .trigger(processingTime="2 seconds")
                                          .start())
straming_query_impressions_clicks_join.awaitTermination(10)

In [None]:
straming_query_impressions_clicks_join.stop()
straming_query_impressions_clicks_join.status

### Stream-Stream Joins

In [None]:
import random

ad_ids = list(range(3))

impressions_with_watermark = (spark
                              .readStream
                              .format("rate")
                              .option("rowsPerSecond", 1)
                              .load()
                              .withColumn("impressionAdId", F.round(F.rand()*max(ad_ids)).cast("integer"))
                              .select("impressionAdId", F.col("timestamp").alias("impressionTime"))
                              .withWatermark("impressionTime", "2 hours"))
clicks_with_watermark = (spark
                              .readStream
                              .format("rate")
                              .option("rowsPerSecond", 2)
                              .option("rampUpTime", "5s")  # 5 seconds ramp up before the generating speed becomes as specified
                              .load()
                              .withColumn("clickAdId", F.round(F.rand()*max(ad_ids)).cast("integer"))
                              .select("clickAdId", F.col("timestamp").alias("clickTime"))
                              .withWatermark("clickTime", "2 hours"))

impressions_clicks_join = impressions_with_watermark.join(clicks_with_watermark,
                                                          on=F.expr("""
                                                              clickAdId = impressionAdId AND 
                                                              clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour"""),
                                                          how="inner")

In [None]:
checkpoint_dir_impressions_clicks_stream_stream_join = f"/tmp/spark-streaming-checkpoints-impressions-clicks-stream-stream-join-{uuid1()}"

straming_query_impressions_clicks_stream_stream_join = (impressions_clicks_join
                                          .writeStream
                                          .format("console")
                                          .option("truncate", False)
                                          .option("checkpointLocation", checkpoint_dir_impressions_clicks_stream_stream_join)
                                          .outputMode("append")
                                          .trigger(processingTime="2 seconds")
                                          .start())
straming_query_impressions_clicks_stream_stream_join.awaitTermination(30)

In [None]:
straming_query_impressions_clicks_stream_stream_join.stop()
straming_query_impressions_clicks_stream_stream_join.status