# Download the libraries

In [None]:
# Recreate an empty jars/ directory before downloading.
! rm -rf jars
!mkdir jars
# Download each jar into jars/ (wget: -q quiet, -P target directory).
# These file names are the ones listed in the spark.jars setting when the SparkSession is built.
!wget -q -P jars https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.4.1/spark-sql-kafka-0-10_2.12-3.4.1.jar
!wget -q -P jars https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.5.1/kafka-clients-3.5.1.jar
!wget -q -P jars https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.4.1/spark-token-provider-kafka-0-10_2.12-3.4.1.jar
!wget -q -P jars https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.12.18/scala-library-2.12.18.jar
!wget -q -P jars https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar

# Set up SparkSession

In [None]:
import os
from pyspark.sql import SparkSession

In [None]:
# Directory (under the current working directory) that holds the downloaded jars.
base_dir = os.getcwd() + '/jars'

# Jar file names, in the order they are passed to spark.jars.
kafka_connector_jars = [
    'kafka-clients-3.5.1.jar',
    'spark-sql-kafka-0-10_2.12-3.4.1.jar',
    'spark-token-provider-kafka-0-10_2.12-3.4.1.jar',
    'scala-library-2.12.18.jar',
    'commons-pool2-2.11.1.jar',
]

# spark.jars takes a single comma-separated string of jar paths.
spark_jars = ",".join(base_dir + '/' + jar_name for jar_name in kafka_connector_jars)

spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Spark Structured Streaming example with Kafka')
    .config("spark.jars", spark_jars)
    .getOrCreate()
)

# Display the session summary as the cell output.
spark

# Define the schema for our data

In [None]:
from pyspark.sql.types import *

In [None]:
# Schema used to parse the JSON message: a single top-level key "VP" whose value is a struct.
# Fields are declared with StructType.add(name, type), which creates the same nullable
# StructField entries as spelling out StructField(name, type) in a list.
schema = StructType().add(
    "VP",
    StructType()
    .add("desi", StringType())
    .add("dir", StringType())
    .add("oper", IntegerType())
    .add("veh", IntegerType())
    .add("tst", TimestampType())
    .add("tsi", LongType())
    .add("spd", DoubleType())
    .add("hdg", IntegerType())
    .add("lat", DoubleType())
    .add("long", DoubleType())
    .add("acc", DoubleType())
    .add("dl", IntegerType())
    .add("odo", StringType())
    .add("drst", StringType())
    .add("oday", DateType())
    .add("jrn", IntegerType())
    .add("line", IntegerType())
    .add("start", StringType())
    .add("loc", StringType())
    .add("stop", LongType())
    .add("route", StringType())
    .add("occu", IntegerType())
)

# Initialize the stream

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

We will read the data from the topic `vehicle-positions` in the Kafka cluster

In [None]:
# Streaming read of the "vehicle-positions" topic, starting from the earliest offsets.
kafka_source_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "broker:29092",
        "subscribe": "vehicle-positions",
        "startingOffsets": "earliest",
    })
    .load()
    # Keep only the record key and value, both cast to strings.
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
)

In [None]:
from pyspark.sql.functions import *


In [None]:
# Parse the JSON text in "value" with the schema, then flatten the nested VP struct
# so that each of its fields becomes a top-level column.
parsed_payload = from_json(col("value"), schema).alias("json")

vehicle_position_df = kafka_source_df.select(parsed_payload).select("json.VP.*")

In [None]:
# Show the columns obtained by flattening the VP struct.
vehicle_position_df.printSchema()

<h3>Record Sample</h3> 

<code>
{
  "desi": "M1",
  "dir": "1",
  "oper": 50,
  "veh": 302,
  "tst": "2023-08-28T09:57:56Z",
  "tsi": 1693216676,
  "spd": 11.86,
  "hdg": 52,
  "lat": 60.1721918,
  "long": 24.94817722,
  "acc": null,
  "dl": null,
  "odo": null,
  "drst": null,
  "oday": "2023-08-28",
  "start": "12:26",
  "loc": "MAN",
  "stop": 1020603,
  "route": "31M1",
  "occu": 0,
  "seq": 1
}
</code>

### Perform streaming transformations

[window documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.window.html#pyspark.sql.functions.window)<br>
[withWatermark documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withWatermark.html#pyspark.sql.DataFrame.withWatermark)

In [None]:
# window(timeColumn, windowDuration, slideDuration): duration and slide are both one minute,
# i.e. tumbling windows, so each input row belongs to exactly one window.
one_minute_window = window(col("tst"), "1 minutes", "1 minutes")

# Count rows per (window, route), with a watermark on the event-time column "tst".
vehicle_position_window_df = (
    vehicle_position_df
    .withWatermark("tst", "1 milliseconds")
    .groupBy(one_minute_window, col("route"))
    .count()
)

In [None]:
# Show the schema of the windowed count (grouped by window and route).
vehicle_position_window_df.printSchema()

# Start the streaming query

## Append mode

We are using the **memory** sink, which supports only two output modes: **append** and **complete**.

In [None]:
# Start the windowed count in append mode, writing to the memory sink; the results are
# queried below through the table named by queryName. A micro-batch fires every 6 seconds.
query_append_trigger = (
    vehicle_position_window_df.writeStream
    .queryName("query_append_trigger")
    .format("memory")
    .outputMode("append")
    .trigger(processingTime='6 seconds')
    .start()
)

In [None]:
# Read what the append query has emitted so far and display the rows for route '2113'.
append_results_df = spark.sql("select * from query_append_trigger")
append_results_df.filter(col('route') == '2113').show(n=100, truncate=False)

In [None]:
# Stop the append-mode query before starting the next example.
query_append_trigger.stop()

## Complete mode

This DataFrame is identical to `vehicle_position_window_df`, except that the `.withWatermark` call has been removed because a watermark has no effect in complete mode.

In [None]:
# Same (window, route) count as the watermarked version, but with no watermark.
# Window duration == slide duration (one minute), so the windows are tumbling:
# each input row belongs to exactly one window.
tumbling_minute = window(col("tst"), "1 minutes", "1 minutes")

vehicle_position_window_no_wm_df = (
    vehicle_position_df
    .groupBy(tumbling_minute, col("route"))
    .count()
)

In [None]:
# Start the un-watermarked count in complete mode, writing to the memory sink under the
# table name "query_complete". A micro-batch fires every 6 seconds.
query_complete = (
    vehicle_position_window_no_wm_df.writeStream
    .queryName("query_complete")
    .format("memory")
    .outputMode("complete")
    .trigger(processingTime='6 seconds')
    .start()
)

In [None]:
# Read the current content of the complete-mode result table and display route '2113'.
complete_results_df = spark.sql("select * from query_complete")
complete_results_df.filter(col('route') == '2113').show(n=100, truncate=False)

In [None]:
# Stop the complete-mode query.
query_complete.stop()

<h3> Thoughts on Complete mode </h3>
Complete mode never releases the intermediate state of the aggregation, which means that memory consumption will grow indefinitely for a query such as this one.

### Thoughts on watermarking output mode and sink

- The watermarking can only be used in **update** and **append** mode. 
- In **complete** mode the old aggregation state is never dropped and we cannot use watermarking.
- Without watermarking, a streaming aggregation cannot run in **append** mode at all (Spark rejects the query), and in **update** mode the old aggregation state is never dropped.
- The sink format **memory** should be used only for debugging purposes and with low volumes of data since the output is entirely stored in the driver's memory
- The sink format **memory** does not support the **update** mode.

# Join Stream Stream

In [None]:
# First streaming input: the built-in "rate" source with its default options.
rate_source_df = spark.readStream.format("rate").load()

The input source `rate` generates one row per second with a timestamp and an increasing value:
<code>
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2023-10-31 14:16:43.267|0    |
|2023-10-31 14:16:44.267|1    |
|2023-10-31 14:16:45.267|2    |
|2023-10-31 14:16:46.267|3    |
|2023-10-31 14:16:47.267|4    |
|2023-10-31 14:16:51.267|8    |
|2023-10-31 14:16:48.267|5    |
|2023-10-31 14:16:52.267|9    |
|2023-10-31 14:16:49.267|6    |
|2023-10-31 14:16:50.267|7    |
+-----------------------+-----+
</code>

In [None]:
# Label each row with an animal chosen by value modulo 4:
# 0 -> Cat, 1 -> Dog, 2 -> Mouse, anything else -> Horse.
value_mod_4 = col("value") % 4
animal_label = (
    when(value_mod_4 == 0, lit("Cat"))
    .when(value_mod_4 == 1, lit("Dog"))
    .when(value_mod_4 == 2, lit("Mouse"))
    .otherwise(lit("Horse"))
)

# Second, independent "rate" stream with the extra "animals" column.
second_rate_source_df = (
    spark.readStream
    .format("rate")
    .load()
    .withColumn("animals", animal_label)
)

<code>
+-----------------------+-----+-------+
|timestamp              |value|animals|
+-----------------------+-----+-------+
|2023-11-02 09:51:49.535|0    |Cat    |
|2023-11-02 09:51:50.535|1    |Dog    |
|2023-11-02 09:51:51.535|2    |Mouse  |
|2023-11-02 09:51:52.535|3    |Horse  |
|2023-11-02 09:51:53.535|4    |Cat    |
|2023-11-02 09:51:57.535|8    |Cat    |
|2023-11-02 09:51:54.535|5    |Dog    |
|2023-11-02 09:51:58.535|9    |Dog    |
|2023-11-02 09:51:55.535|6    |Mouse  |
|2023-11-02 09:51:56.535|7    |Horse  |
+-----------------------+-----+-------+
</code>

To allow old state to be cleaned up when performing a stream-stream join, we need to specify two conditions:
- watermarking
- range condition on the event time

In [None]:
# Rename the columns so they stay distinguishable after the join, then declare the
# watermark on the renamed event-time column.
rate_source_renamed_df = (
    rate_source_df
    .withColumnRenamed("timestamp", "timestamp_rate_source")
    .withColumnRenamed("value", "value_rate_source")
)

rate_source_wm_df = rate_source_renamed_df.withWatermark("timestamp_rate_source", "1 milliseconds")

In [None]:
# Preview the first watermarked rate stream through the memory sink (append mode,
# one micro-batch every 6 seconds).
query_rate_source_wm = (
    rate_source_wm_df.writeStream
    .queryName("query_rate_source_wm")
    .format("memory")
    .outputMode("append")
    .trigger(processingTime='6 seconds')
    .start()
)

In [None]:
# Display up to 100 rows collected so far from the first rate stream.
rate_source_preview_df = spark.sql("select * from query_rate_source_wm")
rate_source_preview_df.show(n=100, truncate=False)

In [None]:
# Stop the preview query of the first rate stream.
query_rate_source_wm.stop()

In [None]:
# Apply watermarks on event-time columns.
# The second stream gets its own event-time column, shifted 2 seconds after the original
# "timestamp" (which is then dropped), and "value" is renamed so that both sides of the
# join have distinct column names.
second_rate_source_wm_df = (second_rate_source_df
    .withColumn("timestamp_second_source", col("timestamp") + expr("INTERVAL 2 seconds")) # shift the event time forward by 2 seconds
    .withColumnRenamed("value", "value_second_source")
    .drop("timestamp")                        
    .withWatermark("timestamp_second_source", "10 milliseconds"))

In [None]:
# Preview the second watermarked rate stream through the memory sink (append mode,
# one micro-batch every 6 seconds).
second_rate_source_wm = (
    second_rate_source_wm_df.writeStream
    .queryName("second_rate_source_wm")
    .format("memory")
    .outputMode("append")
    .trigger(processingTime='6 seconds')
    .start()
)

In [None]:
# Display up to 100 rows collected so far from the second rate stream.
second_rate_preview_df = spark.sql("select * from second_rate_source_wm")
second_rate_preview_df.show(n=100, truncate=False)

In [None]:
# Stop the preview query of the second rate stream.
second_rate_source_wm.stop()

## Inner Join

In [None]:
# Join condition: equal values, and the second stream's event time must fall within
# [timestamp_rate_source, timestamp_rate_source + 3 seconds].
inner_join_condition = expr("""
    value_rate_source = value_second_source AND
    timestamp_second_source >= timestamp_rate_source AND
    timestamp_second_source <= timestamp_rate_source + interval 3 seconds
    """)

# Stream-stream inner join with event-time constraints ("inner" is the default join type).
join_rate_sources_wm_df = second_rate_source_wm_df.join(
    rate_source_wm_df,
    on=inner_join_condition,
    how="inner",
)

In [None]:
# Run the inner join in append mode against the memory sink, one micro-batch every 6 seconds.
join_rate_sources_wm_query = (
    join_rate_sources_wm_df.writeStream
    .queryName("join_rate_sources_wm_query")
    .format("memory")
    .outputMode("append")
    .trigger(processingTime='6 seconds')
    .start()
)

In [None]:
# Display up to 100 joined rows emitted so far.
inner_join_results_df = spark.sql("select * from join_rate_sources_wm_query")
inner_join_results_df.show(n=100, truncate=False)

In [None]:
# Stop the inner-join query.
join_rate_sources_wm_query.stop()

In [None]:
# Check whether the (now stopped) inner-join query terminated with an exception.
join_rate_sources_wm_query.exception()

In [None]:
# Inspect the last progress report recorded for the inner-join query.
join_rate_sources_wm_query.lastProgress

## Left outer join

In [None]:
# Keep only rows whose value is a multiple of 3, so that some rows of the other stream
# have no match in the outer join.
is_multiple_of_three = col("value_rate_source") % 3 == 0
rate_source_wm_filter_df = rate_source_wm_df.filter(is_multiple_of_three)

In [None]:
# Same event-time-bounded condition as the inner join: equal values and the second
# stream's event time within 3 seconds after the first stream's event time.
left_outer_join_condition = expr("""
    value_rate_source = value_second_source AND
    timestamp_second_source >= timestamp_rate_source AND
    timestamp_second_source <= timestamp_rate_source + interval 3 seconds
    """)

# Left outer join: every row of second_rate_source_wm_df is kept, matched or not.
join_rate_sources_wm_filter_df = second_rate_source_wm_df.join(
    rate_source_wm_filter_df,
    on=left_outer_join_condition,
    how="leftOuter",
)

In [None]:
# Run the left outer join in append mode against the memory sink, one micro-batch every 6 seconds.
join_rate_sources_wm_filter_query = (
    join_rate_sources_wm_filter_df.writeStream
    .queryName("join_rate_sources_wm_filter_query")
    .format("memory")
    .outputMode("append")
    .trigger(processingTime='6 seconds')
    .start()
)

In [None]:
# Display up to 100 rows emitted so far by the left outer join.
left_outer_results_df = spark.sql("select * from join_rate_sources_wm_filter_query")
left_outer_results_df.show(n=100, truncate=False)

In [None]:
# Stop the left-outer-join query.
join_rate_sources_wm_filter_query.stop()

# Join Stream-Static

In [None]:
# Static (batch) DataFrame: operators.csv is read from the working directory, taking the
# column names from the header row and letting Spark infer the column types.
operators_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("operators.csv")
)

In [None]:
# Preview the static operators table without truncating long values.
operators_df.show(truncate = False)

In [None]:
# Check the column types inferred from the CSV.
operators_df.printSchema()

In [None]:
# Stream-static left outer join: every streaming position row is kept and is enriched
# with the operator row whose id equals the position's "oper" value, when one exists.
operator_match = vehicle_position_df["oper"] == operators_df["id"]

join_vehicle_operators_df = vehicle_position_df.join(
    operators_df,
    on=operator_match,
    how="left_outer",
)

In [None]:
# Show the combined schema: position columns followed by the operators columns.
join_vehicle_operators_df.printSchema()

In [None]:
# Run the stream-static join in append mode against the memory sink, one micro-batch
# every 6 seconds.
query_join_vechicle_operators_append = (
    join_vehicle_operators_df.writeStream
    .queryName("query_join_vechicle_operators_append")
    .format("memory")
    .outputMode("append")
    .trigger(processingTime='6 seconds')
    .start()
)

In [None]:
# Project a few position and operator columns from the joined output and show the
# first 20 rows for route '2113'.
joined_rows_df = spark.sql("select tst, route, oper, operator_name, country, city, address from query_join_vechicle_operators_append")
joined_rows_df.filter(col('route') == '2113').show(n=20, truncate=False)

In [None]:
# Stop the stream-static join query.
query_join_vechicle_operators_append.stop()

## Aggregations after join

In [None]:
# Grouping keys: tumbling one-minute window on "tst", plus the operator id and name.
operator_grouping = [
    window(col("tst"), "1 minutes", "1 minutes"),
    col("oper"),
    col("operator_name"),
]

# Count joined rows per (window, operator), with a watermark on "tst".
vehicle_position_operators_group_df = (
    join_vehicle_operators_df
    .withWatermark("tst", "1 milliseconds")
    .groupBy(*operator_grouping)
    .count()
)

In [None]:
# Run the per-operator windowed count in append mode against the memory sink.
# Note that the result table is named "vehicle_position_operators_append" (the queryName),
# which differs from this Python variable's name.
vehicle_position_operators_query_append = (
    vehicle_position_operators_group_df.writeStream
    .queryName("vehicle_position_operators_append")
    .format("memory")
    .outputMode("append")
    .trigger(processingTime='6 seconds')
    .start()
)

In [None]:
# Show up to 30 window counts emitted so far for operator 22.
operator_counts_append_df = spark.sql("select * from vehicle_position_operators_append")
operator_counts_append_df.filter(col('oper') == '22').show(n=30, truncate=False)

In [None]:
# Stop the append-mode aggregation query.
vehicle_position_operators_query_append.stop()

## Sorting

In [None]:
# Per-operator counts in tumbling one-minute windows (no watermark here) ...
operator_counts_by_window_df = (
    join_vehicle_operators_df
    .groupBy(
        window(col("tst"), "1 minutes", "1 minutes"),
        col("oper"),
        col("operator_name"),
    )
    .count()
)

# ... ordered with the most recent window first.
vehicle_position_operators_group_sort_df = operator_counts_by_window_df.orderBy(col('window').desc())

In [None]:
# Run the sorted aggregation in complete mode against the memory sink; the result table
# is named "vehicle_position_operators_complete". A micro-batch fires every 6 seconds.
vehicle_position_operators_query_complete = (
    vehicle_position_operators_group_sort_df.writeStream
    .queryName("vehicle_position_operators_complete")
    .format("memory")
    .outputMode("complete")
    .trigger(processingTime='6 seconds')
    .start()
)

In [None]:
# Show up to 100 rows of the sorted result for operator 22.
operator_counts_complete_df = spark.sql("select * from vehicle_position_operators_complete")
operator_counts_complete_df.filter(col('oper') == '22').show(n=100, truncate=False)

In [None]:
# Stop the sorted complete-mode query.
# NOTE(review): the "Query management" cells further down inspect this same query object,
# so on a top-to-bottom run they will see it already stopped — confirm that is intended.
vehicle_position_operators_query_complete.stop()

<h3> Thoughts on sorting </h3>
<b>Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode</b>. <br>
So we can only use `orderBy` (after a `groupBy`) in Complete mode and not in Append mode.

<h1>Query management</h1>

[documentation](https://spark.apache.org/docs/3.4.1/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.html)

<h3>Get the unique identifier of the running query</h3>

In [None]:
# NOTE(review): this query was stopped at the end of the Sorting section, so on a
# top-to-bottom run the cells in this section inspect a stopped query — confirm, or
# restart the query first if a running one is meant to be shown.
vehicle_position_operators_query_complete.id

<h3>Get the run id of the query</h3>

In [None]:
vehicle_position_operators_query_complete.runId

<h3>Get the auto-generated or user-specified name of the query</h3>

In [None]:
vehicle_position_operators_query_complete.name

<h3>Print detailed explanations of the query</h3>

In [None]:
vehicle_position_operators_query_complete.explain()

<h3>Query recent progress</h3>

In [None]:
vehicle_position_operators_query_complete.recentProgress

<h3>Get the query last progress</h3>

In [None]:
vehicle_position_operators_query_complete.lastProgress

<h3>Get the list of currently active streaming queries</h3>

In [None]:
spark.streams.active

<h3>Get the query object from the id</h3>

In [None]:
# NOTE(review): spark.streams.get presumably only finds queries that are still active.
# This query was stopped in an earlier cell, so the lookup may not return a usable query
# object and the following `query.name` cell could fail — verify on a fresh run.
query = spark.streams.get(vehicle_position_operators_query_complete.id)

In [None]:
query.name

<h3>Query active</h3>

In [None]:
vehicle_position_operators_query_complete.isActive

<h3>Query status</h3>

In [None]:
vehicle_position_operators_query_complete.status

<h3>Query exception (useful if the query has terminated with an exception)</h3>

In [None]:
vehicle_position_operators_query_complete.exception()

<h3>Await query termination</h3>

In [None]:
#query.awaitTermination([timeout])

<h3>Stop the query</h3>

In [None]:
# Stop the query (a stop() call for this same query also appears at the end of the
# Sorting section).
vehicle_position_operators_query_complete.stop()

<h1>Output Sinks</h1>

<h2>CSV</h2>

In [None]:
# Operator ids kept for the CSV export.
selected_operator_ids = ('22', '90', '6', '30', '12', '50')

# Watermark on "tst", keep only the selected operators, then count rows per
# (tumbling one-minute window, operator id, operator name).
vehicle_position_operators_filter_df = (
    join_vehicle_operators_df
    .withWatermark("tst", "1 seconds")
    .filter(col('oper').isin(*selected_operator_ids))
    .groupBy(
        window(col("tst"), "1 minutes", "1 minutes"),
        col("oper"),
        col("operator_name"),
    )
    .count()
)

In [None]:
# Replace the "window" column with its string representation before writing to CSV.
window_as_text = col("window").cast('string')

vehicle_position_operators_filter_mod_df = (
    vehicle_position_operators_filter_df
    .withColumn("window", window_as_text)
    .coalesce(1)  # single partition, so one CSV file is written for each trigger
)

In [None]:
# File sink: append results as CSV files (with a header row) under the "csv" directory,
# one micro-batch per minute; progress is tracked in the "checkpoint" directory.
vehicle_position_operators_filter_mod = (
    vehicle_position_operators_filter_mod_df.writeStream
    .format("csv")  # can be "orc", "json", "parquet", etc.
    .outputMode("append")
    .trigger(processingTime='1 minutes')
    .options(path="csv", header=True, checkpointLocation="checkpoint")
    .start()
)

In [None]:
# Check whether the CSV query has terminated with an exception.
vehicle_position_operators_filter_mod.exception()

In [None]:
# Current status of the CSV query.
vehicle_position_operators_filter_mod.status

In [None]:
# Whether the CSV query is still running.
vehicle_position_operators_filter_mod.isActive

In [None]:
# Stop the CSV query; files already written under "csv" and "checkpoint" are left on disk.
vehicle_position_operators_filter_mod.stop()

In [None]:
# to delete folders
#! rm -rf csv

<h3>Thoughts on CSV sink</h3>
<ul>
    <li> It is not allowed to use the Complete mode so we cannot perform any sorting on the data. </li>
    <li> It is necessary to cast the <code>window</code> column to string because it is a struct (<code>start</code>, <code>end</code>) and the CSV sink does not support struct columns </li>
    <li> One CSV file is generated for each trigger </li>
</ul>

<h2>Kafka Topic</h2>

In [None]:
# Per-operator counts in tumbling one-minute windows, with a watermark on "tst".
operator_window_counts_df = (
    join_vehicle_operators_df
    .withWatermark("tst", "1 seconds")
    #.filter(col('oper').isin('22','90','6','30','12','50'))
    .groupBy(
        window(col("tst"), "1 minutes", "1 minutes"),
        col("oper"),
        col("operator_name"),
    )
    .count()
)

# Shape the result as Kafka records: the operator id becomes the key and the remaining
# columns are serialized into a JSON string as the value.
kafka_key = col("oper").alias("key")
kafka_value = to_json(struct("window", "count", "operator_name")).alias("value")

vehicle_position_operators_kafka_df = operator_window_counts_df.select(kafka_key, kafka_value)

In [None]:
# Confirm that the DataFrame has exactly the "key" and "value" columns selected above.
vehicle_position_operators_kafka_df.printSchema()

In [None]:
# Just to check the output
vehicle_position_operators_kafka_query_memory = (vehicle_position_operators_kafka_df
    .writeStream
    .format("memory")
    .trigger(processingTime = '1 minutes') 
    .outputMode("append")
    .queryName("vehicle_position_operators_kafka_query_memory")
    .start())

In [None]:
# Display up to 100 key/value records produced so far.
kafka_preview_df = spark.sql("select * from vehicle_position_operators_kafka_query_memory")
#kafka_preview_df = kafka_preview_df.filter(col('key') == 22)
kafka_preview_df.show(n=100, truncate=False)

In [None]:
# Stop the memory-sink preview before starting the real Kafka sink.
vehicle_position_operators_kafka_query_memory.stop()

Before continuing we must create the topic `operators-counts` by executing this line on a terminal: 
<br>
<br>
`docker exec -it broker kafka-topics --create --bootstrap-server broker:9092 --partitions 1 --replication-factor 1 --topic operators-counts`

In [None]:
# Cast key and value to strings before handing the records to the Kafka sink.
kafka_records_df = vehicle_position_operators_kafka_df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

# Write to the "operators-counts" topic. No output mode or trigger is set here, so
# Spark's defaults apply; progress is tracked in the "checkpoint-kafka" directory.
vehicle_position_operators_kafka_query_kafka = (
    kafka_records_df.writeStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "broker:29092",
        "topic": "operators-counts",
        "checkpointLocation": "checkpoint-kafka",
    })
    .start()
)

In [None]:
# Current status of the Kafka sink query.
vehicle_position_operators_kafka_query_kafka.status

To check whether the data is being written to the topic `operators-counts` in the Kafka cluster, we can start a simple consumer by executing this command in a terminal:
<br>
<br>
`docker exec -it broker kafka-console-consumer --bootstrap-server broker:9092 --from-beginning --topic operators-counts --property print.key=true`

If everything went fine you should see something like this:
<br>
<br>
<code>
130	{"window":{"start":"2023-08-30T10:15:00.000Z","end":"2023-08-30T10:16:00.000Z"},"count":60,"operator_name":"Klein, Jakubowski and Hermiston"}
60	{"window":{"start":"2023-08-30T10:28:00.000Z","end":"2023-08-30T10:29:00.000Z"},"count":13,"operator_name":"Heathcote-Schinner"}
50	{"window":{"start":"2023-08-30T10:24:00.000Z","end":"2023-08-30T10:25:00.000Z"},"count":2062,"operator_name":"Lemke-Waters"}
30	{"window":{"start":"2023-08-30T10:19:00.000Z","end":"2023-08-30T10:20:00.000Z"},"count":1380,"operator_name":"Lakin, Breitenberg and Morissette"}
59	{"window":{"start":"2023-08-30T10:23:00.000Z","end":"2023-08-30T10:24:00.000Z"},"count":240,"operator_name":"Schuster Group"}
12	{"window":{"start":"2023-08-30T10:25:00.000Z","end":"2023-08-30T10:26:00.000Z"},"count":10595,"operator_name":"Pollich-Kuhlman"}
40	{"window":{"start":"2023-08-30T10:16:00.000Z","end":"2023-08-30T10:17:00.000Z"},"count":4432,"operator_name":"Bogisich LLC"}
54	{"window":{"start":"2023-08-30T10:29:00.000Z","end":"2023-08-30T10:30:00.000Z"},"count":540,"operator_name":"Morar Inc"}
21	{"window":{"start":"2023-08-30T10:27:00.000Z","end":"2023-08-30T10:28:00.000Z"},"count":240,"operator_name":"Ledner-Boyer"}
60	{"window":{"start":"2023-08-30T10:20:00.000Z","end":"2023-08-30T10:21:00.000Z"},"count":6,"operator_name":"Heathcote-Schinner"}
</code>


In [None]:
# Stop the Kafka sink query.
vehicle_position_operators_kafka_query_kafka.stop()