#### Integration of Apache Kafka and Spark Streaming

Let's give some context to this notebook. An alien entity has entered Portugal and has suddenly started turning citizens into Scotsmen! Fear grips the country. We need to find out what is happening and how it is affecting the population.

[To understand what is going on](https://www.youtube.com/watch?v=qxDJMn-534Y)
(This is a Monty **Python** sketch)

We have been tasked by the Portuguese government with analyzing how the population has already changed. We will use **Spark Streaming** to stream data from our Kafka cluster and analyze live-streamed epidemiological data.

We will work with streams, stream-stream joins and stream-static joins.

<img src="img/scottish_portugal.png" height="500" width="700"/>

Structured Streaming treats a ``stream`` of data as a table that is continuously appended to. The engine periodically checks the source for new data and incrementally updates the result. The API is designed so that what works on a DataFrame should, for the most part, also work on a streaming DataFrame.
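To make the "stream as a growing table" idea concrete, here is a minimal plain-Python sketch. It is not Spark code: the micro-batches of ``(district, new_scotsmen)`` tuples and the function name ``run_micro_batches`` are hypothetical, chosen only to show how each batch incrementally updates a result table.

```python
from collections import Counter


def run_micro_batches(batches):
    """Yield the running per-district totals after each micro-batch."""
    totals = Counter()
    for batch in batches:
        # Each micro-batch only contains the rows that arrived since the last one.
        for district, new_scotsmen in batch:
            totals[district] += new_scotsmen
        # Snapshot of the result table after this batch.
        yield dict(totals)


# Hypothetical input: two micro-batches of (district, new_scotsmen) rows.
example_batches = [
    [("Lisboa", 3), ("Porto", 1)],
    [("Lisboa", 2)],
]

for batch_id, snapshot in enumerate(run_micro_batches(example_batches)):
    print(batch_id, snapshot)
```

The point of the sketch is that the result is updated incrementally from new rows only, rather than recomputed from the full history on every batch.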

``Spark Streaming`` is a subset of Spark's functionality that allows us to work with event-based data, such as the data arriving from our Kafka cluster. We set some global variables and import schema types to **structure our data**.

In [3]:
import pyspark.sql.functions as F

from pyspark.sql.types import StructType, StringType, DoubleType, StructField, IntegerType, TimestampType
from pyspark.sql import SparkSession

KAFKA_BOOTSTRAP_SERVERS = "localhost:8098"
KAFKA_TOPIC = "scotsmen"

In [5]:
import findspark
findspark.init()

import pyspark

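We initialize a Spark session and pull in the ``Spark SQL Kafka Connector`` as a dependency via ``spark.jars.packages``. The connector version should match the installed Spark and Scala versions.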

In [22]:
# Initialize local spark session
spark = SparkSession \
    .builder \
    .appName("kafka_streaming") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .config("spark.driver.host","127.0.0.1") \
    .config("spark.driver.bindAddress","127.0.0.1") \
    .getOrCreate()

Using the ``publish-subscribe`` paradigm, we subscribe to the Kafka topic ``scotsmen``.

In [23]:
# Read from ``KAFKA_TOPIC``
streaming_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", "earliest") \
    .load()    

In [24]:
# Instantiate the schema of the messages received
scotsmen_schema = StructType([
  StructField("district", StringType()),
  StructField("new_scotsmen", IntegerType()),
  StructField("timestamp", TimestampType())
])

In [25]:
# Cast the binary 'value' column to a string and parse it as JSON using our schema;
# we also keep the Kafka message 'timestamp'
json_df = streaming_df.select(
    F.from_json(F.col("value").cast("string"), scotsmen_schema).alias("value"), 
    "timestamp"
    )
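As a rough illustration of what this parsing step does to a single Kafka message, here is a plain-Python sketch. It is a simplification and not Spark's implementation: the ``SCHEMA`` mapping is a hypothetical stand-in for ``scotsmen_schema``, the ISO timestamp format of the sample message is an assumption, and returning ``None`` for invalid JSON is a choice of this sketch (Spark's exact handling of malformed records depends on version and options).

```python
import json
from datetime import datetime

# Hypothetical stand-in for scotsmen_schema: field name -> converter.
SCHEMA = {
    "district": str,
    "new_scotsmen": int,
    "timestamp": datetime.fromisoformat,
}


def parse_value(raw: bytes):
    """Decode one message value and coerce it to SCHEMA.

    Returns None if the payload is not a JSON object; fields that are
    missing or cannot be converted become None.
    """
    try:
        record = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(record, dict):
        return None

    parsed = {}
    for name, convert in SCHEMA.items():
        value = record.get(name)
        try:
            parsed[name] = convert(value) if value is not None else None
        except (TypeError, ValueError):
            parsed[name] = None
    return parsed


# Assumed message layout, for illustration only.
sample = b'{"district": "Porto", "new_scotsmen": 4, "timestamp": "2024-12-03T10:12:00"}'
print(parse_value(sample))
```

The schema matters because the Kafka ``value`` column arrives as raw bytes; without it, the payload stays an opaque string.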

In [26]:
# We instantiate an SQL view to inspect our data
json_df.select("value.*").createOrReplaceTempView("scotsmen")

In [27]:
# Sample query from ``scotsmen`` table
scotsmen_query = spark.sql("SELECT * FROM scotsmen")

Note that, as with the non-streaming API, there are ``transformations`` and ``actions``. Execution of a query operation on Spark Streaming is lazy.
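Lazy execution can be illustrated with plain Python generators. This is only an analogy, not how Spark is implemented: building the pipeline plays the role of declaring transformations, and consuming it plays the role of an action (or of ``start()`` on a streaming query). The names ``read_events`` and ``build_pipeline`` are hypothetical.

```python
def read_events(log):
    """Source: yields events one by one, recording each read in `log`."""
    for event in (0, 1, 2):
        log.append(f"read {event}")
        yield event


def build_pipeline(log):
    """Declare a 'transformation' (doubling) on top of the source.

    Nothing is read until the returned generator is consumed.
    """
    return (event * 2 for event in read_events(log))


log = []
pipeline = build_pipeline(log)
print("after building:", log)      # still empty: no work has been done
result = list(pipeline)            # consuming the pipeline triggers the reads
print("after consuming:", log, result)
```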

### Input Sources & Sinks

Spark Structured Streaming supports different input sources and sinks. Supported sources include:
1. Kafka source: Reads records from Kafka topics
2. File source: Reads files written to a directory on a distributed file system (HDFS, S3)
3. Socket source: Reads text from a socket connection (intended for testing)

While input sources specify the origin of the data, sinks specify where the data will be written. Those sinks can be:
1. Kafka sink: Pushes data to Kafka
2. Files sink: Writes the output to a file (JSON, parquet, CSV etc.)
3. ForEach sink: Applies custom logic to each row (``foreach``) or to each micro-batch (``foreachBatch``), e.g. for custom storage
4. Console sink: Used for testing
5. Memory: Used for debugging

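``Memory`` and ``Console`` sinks are very similar: both are meant for small result sets during development. ``Console`` prints each batch to standard output, while ``Memory`` keeps the output in an in-memory table, named via ``queryName``, for interactive inspection.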
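For the custom-logic sink, a minimal sketch of a ``foreachBatch`` handler could look as follows. The helper name ``make_batch_writer`` and the output directory layout are assumptions made for illustration; the handler itself only relies on the regular batch writer API (``DataFrame.write.mode(...).json(...)``).

```python
def make_batch_writer(output_dir):
    """Build a function suitable for `writeStream.foreachBatch(...)`.

    `output_dir` is a hypothetical target directory.
    """
    def write_batch(batch_df, batch_id):
        # batch_df is an ordinary (non-streaming) DataFrame holding one micro-batch.
        target = f"{output_dir}/batch_id={batch_id}"
        # Overwriting a per-batch directory keeps the write idempotent
        # if the same batch is replayed after a failure.
        batch_df.write.mode("overwrite").json(target)
        return target

    return write_batch


# Hypothetical usage with the query defined in this notebook:
# query = scotsmen_query.writeStream \
#     .foreachBatch(make_batch_writer("out/scotsmen")) \
#     .start()
```

Writing each micro-batch to its own directory is one possible design; it trades a larger number of small files for simple replay handling.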

In [28]:
# This will print the results to the console
query = scotsmen_query.writeStream.outputMode("append").format("console").start()

24/12/03 10:12:47 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/nt/03y4p9md50gblp_0svv74zb80000gn/T/temporary-e40bba87-ac59-43e0-b96e-2ffe3b0b966e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/12/03 10:12:47 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/12/03 10:12:47 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+--------+------------+---------+
|district|new_scotsmen|timestamp|
+--------+------------+---------+
+--------+------------+---------+



In [29]:
# Stopping the query
query.stop()

There are three output modes. Here, we used ``append``, which writes only the rows added to the result table since the last trigger. ``update`` writes only the rows that changed since the last trigger, while ``complete`` rewrites the entire result table on every trigger and is only supported for aggregation queries.
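The difference between the modes can be sketched in plain Python. This is a toy model, not Spark code: the result table is a dictionary from grouping key to aggregate, and the function name ``rows_for_sink`` is hypothetical. In Spark, ``append`` is only allowed when emitted rows can never change again; the sketch simply returns keys that are new.

```python
def rows_for_sink(mode, previous, current):
    """Return the rows a sink would receive for one trigger.

    `previous` and `current` map a grouping key to its aggregate value
    before and after the trigger.
    """
    if mode == "complete":
        # The whole result table is written again.
        return dict(current)
    if mode == "update":
        # Only rows that are new or whose value changed.
        return {k: v for k, v in current.items()
                if k not in previous or previous[k] != v}
    if mode == "append":
        # Only rows that did not exist before (and are assumed final).
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode}")


# Hypothetical state of a per-district sum before and after one trigger.
before = {"Lisboa": 3, "Porto": 1}
after = {"Lisboa": 5, "Porto": 1, "Faro": 2}

for mode in ("complete", "update", "append"):
    print(mode, rows_for_sink(mode, before, after))
```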

In [30]:
# Write this query to a memory table
memory_query = scotsmen_query \
    .writeStream \
    .outputMode("append") \
    .queryName("scotsmen_table") \
    .format("memory") \
    .start()

24/12/03 10:12:54 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/nt/03y4p9md50gblp_0svv74zb80000gn/T/temporary-f7fd2128-60af-41d1-ac26-535d6e9d1e37. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/12/03 10:12:54 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


IllegalArgumentException: Cannot start query with name scotsmen_table as a query with that name is already active in this SparkSession
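The error above appears when a query with the same name is still running, for example after re-executing the cell. One way to recover, sketched below, is to stop the active query of that name before starting a new one. The helper name ``stop_query_by_name`` is hypothetical; it only uses ``spark.streams.active`` and the ``name`` and ``stop()`` members of a streaming query.

```python
def stop_query_by_name(spark, name):
    """Stop every active streaming query called `name`.

    Returns the number of queries that were stopped.
    """
    stopped = 0
    for active_query in spark.streams.active:
        if active_query.name == name:
            active_query.stop()
            stopped += 1
    return stopped


# Hypothetical usage before re-running the cell above:
# stop_query_by_name(spark, "scotsmen_table")
```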

In [17]:
spark.sql("SELECT district, sum(new_scotsmen) AS total_new_scotsmen FROM scotsmen_table GROUP BY district").show()

+--------+------------------+
|district|total_new_scotsmen|
+--------+------------------+
+--------+------------------+



#### Window Functions

In [18]:
# Convert JSON to DataFrame
new_scotsmen_df = json_df.select(
    F.col("value.district").alias("conversion_district"),
    F.col("value.timestamp").alias("conversion_timestamp"),
    F.col("value.new_scotsmen").alias("new_scotsmen")
)

In [19]:
# Watermarking bounds how late an event may arrive: events up to 30 seconds
# behind the latest event time seen so far are still aggregated,
# while events arriving later than that may be dropped.
windowed_df = new_scotsmen_df.withWatermark("conversion_timestamp", "30 seconds") \
    .groupBy(
        F.window(new_scotsmen_df["conversion_timestamp"], "3 minute"),  # 3-minute window
        new_scotsmen_df["conversion_district"]  # Group by district
    ) \
    .agg(
        F.count("*").alias("event_count"),   # Count events in each window for each district
        F.sum("new_scotsmen").alias("total_new_scotsmen"),  # Sum of values for each district in each window
        F.avg("new_scotsmen").alias("average_new_scotsmen") # Compute the average value for each district in each window
    )
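The interplay of tumbling windows and the watermark can be sketched in plain Python. This is a simplified model under stated assumptions, not Spark's implementation: windows are aligned to the Unix epoch, the watermark is the maximum event time seen minus the allowed delay, and a window keeps accepting events as long as its end lies after the watermark. The function names are hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumbling_window(event_time, size):
    """Return the [start, end) window of length `size` containing event_time."""
    offset = (event_time - EPOCH) % size
    start = event_time - offset
    return start, start + size


def watermark(max_event_time_seen, delay):
    """Watermark = latest event time seen so far minus the allowed delay."""
    return max_event_time_seen - delay


def window_is_open(window_end, current_watermark):
    """A window still accepts late events while its end is after the watermark."""
    return window_end > current_watermark


# A late event and the window it belongs to (3-minute windows, 30-second delay).
event_time = datetime(2024, 12, 3, 10, 13, 47)
start, end = tumbling_window(event_time, timedelta(minutes=3))
current = watermark(datetime(2024, 12, 3, 10, 15, 20), timedelta(seconds=30))
print(start, end, window_is_open(end, current))
```

In a real query the watermark only advances between micro-batches, so the cut-off is less sharp than in this sketch.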

In [20]:
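# Note: in "complete" mode Spark keeps all aggregation state,
# so the watermark above does not evict old windows.
window_query = windowed_df \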
    .writeStream \
    .outputMode("complete") \
    .queryName("new_scotsmen_aggregated") \
    .format("memory") \
    .start()

24/12/03 10:11:29 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/nt/03y4p9md50gblp_0svv74zb80000gn/T/temporary-78aa0eb1-1149-49df-b600-304e7f2e1229. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/12/03 10:11:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/12/03 10:11:29 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


In [21]:
spark.sql("SELECT * FROM new_scotsmen_aggregated").show()

+------+-------------------+-----------+------------------+--------------------+
|window|conversion_district|event_count|total_new_scotsmen|average_new_scotsmen|
+------+-------------------+-----------+------------------+--------------------+
+------+-------------------+-----------+------------------+--------------------+
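An empty result like the one above can simply mean that the query has not processed any data yet. A minimal sketch of a helper that waits for pending input before reading the in-memory table is shown below. The helper name ``query_memory_table`` is hypothetical; ``processAllAvailable()`` is intended for testing and may block indefinitely if data keeps arriving on the topic.

```python
def query_memory_table(spark, streaming_query, table_name):
    """Wait for pending input, then return the rows of an in-memory sink table."""
    # Blocks until all data available at call time has been processed
    # and committed to the sink.
    streaming_query.processAllAvailable()
    return spark.sql(f"SELECT * FROM {table_name}").collect()


# Hypothetical usage with the query defined in this notebook:
# rows = query_memory_table(spark, window_query, "new_scotsmen_aggregated")
```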



#### Advanced Features

Structured Streaming supports ``Joins``. This means that you are able to (I) join a stream with a static DataFrame and (II) join two streams. This can be used to supplement streaming data with another data source.

Here, we will supplement our ``scotsmen`` stream with bagpipe sales read from the ``bagpipe`` topic.

In [None]:
# Again, we need to read from Kafka.
# This time, we subscribe to the bagpipes topic
bagpipes_stream = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
    .option("subscribe", "bagpipe") \
    .option("startingOffsets", "earliest") \
    .load()    

In [None]:
# Instantiate the schema of the messages received
bagpipes_schema = StructType([
  StructField("district", StringType()),
  StructField("bagpipe_sales", IntegerType()),
  StructField("timestamp", TimestampType())
])

In [None]:
# Cast the binary 'value' column to a string and parse it as JSON using our schema
bagpipes_json_df = bagpipes_stream.select(
    F.from_json(F.col("value").cast("string"), bagpipes_schema).alias("value"), 
    )

In [None]:
# Convert the JSON to DataFrame and alias columns
bagpipe_df = bagpipes_json_df.select(
    F.col("value.district").alias("sales_district"),
    F.col("value.timestamp").alias("sale_timestamp"),
    F.col("value.bagpipe_sales").alias("bagpipe_sales")
)

In [None]:
# Truncate both event timestamps to the minute so the streams can be matched
conversion_stream = new_scotsmen_df.withColumn("conversion_truncated_timestamp", F.date_trunc("minute", new_scotsmen_df["conversion_timestamp"]))
bagpipes_stream = bagpipe_df.withColumn("sales_truncated_timestamp", F.date_trunc("minute", bagpipe_df["sale_timestamp"]))

# Watermark the datasets
conversion_stream = conversion_stream.withWatermark("conversion_truncated_timestamp", "1 minute")
bagpipes_stream = bagpipes_stream.withWatermark("sales_truncated_timestamp", "1 minute")

# Alias the datasets
conversion_stream = conversion_stream.alias("s1")
bagpipes_stream = bagpipes_stream.alias("s2")

# Join the two watermarked streams on district, requiring the sale to fall
# within 5 minutes after the conversion (a time-range condition in a SQL expression)
joined_stream = conversion_stream \
    .join(
        bagpipes_stream,
        F.expr("""
            s1.conversion_district = s2.sales_district AND
            s2.sales_truncated_timestamp >= s1.conversion_truncated_timestamp AND
            s2.sales_truncated_timestamp <= s1.conversion_truncated_timestamp + interval 5 minute
        """)
    ) \
    .select(
        "s2.sales_truncated_timestamp",
        "s1.new_scotsmen",
        "s2.bagpipe_sales",
        "s1.conversion_district"
    )

# Create the window column before aggregation
joined_stream = joined_stream.withColumn("window", F.window("sales_truncated_timestamp", "1 hour"))

# Aggregate values per district
aggregated_stream = joined_stream \
    .groupBy(
        joined_stream.conversion_district,
        joined_stream.window
    ) \
    .agg(
        F.sum(joined_stream.new_scotsmen).alias("total_new_scotsmen"),
        F.sum(joined_stream.bagpipe_sales).alias("total_bagpipe_sales")
    )
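The join condition used above can be restated as a plain-Python predicate. This is only a sketch of the matching rule on small in-memory lists, not of how Spark buffers and evicts stream state; the record layout, the field name ``ts``, and the parameter ``max_lag`` are assumptions made for illustration.

```python
from datetime import datetime, timedelta


def matches(conversion, sale, max_lag=timedelta(minutes=5)):
    """Same district, and the sale happens 0 to 5 minutes after the conversion."""
    return (
        conversion["district"] == sale["district"]
        and conversion["ts"] <= sale["ts"] <= conversion["ts"] + max_lag
    )


def join(conversions, sales):
    """Inner join of two event lists using the time-range predicate."""
    return [(c, s) for c in conversions for s in sales if matches(c, s)]


# Hypothetical events, for illustration only.
conversion = {"district": "Porto", "ts": datetime(2024, 12, 3, 10, 0), "new_scotsmen": 4}
sales = [
    {"district": "Porto", "ts": datetime(2024, 12, 3, 10, 3), "bagpipe_sales": 2},  # matches
    {"district": "Porto", "ts": datetime(2024, 12, 3, 10, 6), "bagpipe_sales": 1},  # too late
    {"district": "Faro", "ts": datetime(2024, 12, 3, 10, 3), "bagpipe_sales": 5},   # other district
]
print(len(join([conversion], sales)))
```

The time bound is what allows a streaming engine to discard old rows from its buffers: once the watermark has passed the bound, no further matches are possible.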

In [None]:
# Write the results to an in-memory table for inspection
query = aggregated_stream \
    .writeStream \
    .queryName("new_scotsmen_bagpipe_sales_per_district") \
    .outputMode("append") \
    .format("memory") \
    .start()

In [None]:
# Showing the results
spark.sql("SELECT * FROM new_scotsmen_bagpipe_sales_per_district").show()

#### Static-Stream Joins

Apart from joining two streams, Spark also supports joining a stream with a static DataFrame. This can be used to supplement streaming data with another data source, such as a lookup table. Here we will supplement our ``conversion_stream`` with the ``portugal_district_population2022.csv`` table.

In [None]:
population_schema = StructType([
  StructField("district", StringType()),
  StructField("pop", IntegerType())
])

population_df = spark \
    .read \
    .format("csv") \
    .option("header", True) \
    .schema(population_schema) \
    .load("portugal_district_population2022.csv")

In [None]:
# Join the conversion stream with the population data
joined_stream_population = conversion_stream.join(
    population_df,
    conversion_stream.conversion_district == population_df.district,
    "inner" 
) \
    .withColumn(
        "conversions_per_pop", F.col("new_scotsmen") / F.col("pop")
    )

# Select the columns you need
result_stream = joined_stream_population.select(
    "conversion_truncated_timestamp",
    "new_scotsmen",
    "conversion_district",
    "pop",
    "conversions_per_pop"
)
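Conceptually, a stream-static inner join is a lookup of each incoming event in a fixed table. The plain-Python sketch below mirrors the enrichment above; it is not Spark code, the function name ``enrich`` is hypothetical, and the district names and population figures are made-up placeholders rather than values from the CSV file.

```python
def enrich(events, population):
    """Attach population and conversions per inhabitant to each event.

    Events whose district is missing from `population` are dropped,
    which corresponds to an inner join.
    """
    for event in events:
        pop = population.get(event["district"])
        if pop is None:
            continue
        yield {
            **event,
            "pop": pop,
            "conversions_per_pop": event["new_scotsmen"] / pop,
        }


# Made-up placeholder data, for illustration only.
population = {"district_a": 1000, "district_b": 500}
events = [
    {"district": "district_a", "new_scotsmen": 10},
    {"district": "unknown", "new_scotsmen": 3},
]
print(list(enrich(events, population)))
```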

In [None]:
# Write the results to an in-memory table for testing
query = result_stream \
    .writeStream \
    .queryName("prop_pop_converted") \
    .outputMode("append") \
    .format("memory") \
    .start()

In [None]:
# Define the window duration and slide duration
window_duration = "1 hour"
slide_duration = "10 minutes"

# SQL query to sum the ratio per district over sliding event-time windows
sql_query = f"""
SELECT
    window.start AS window_start,
    window.end AS window_end,
    SUM(conversions_per_pop) AS cumulative_ratio,
    conversion_district
FROM (
    SELECT
        conversions_per_pop,
        window(conversion_truncated_timestamp, '{window_duration}', '{slide_duration}') AS window,
        conversion_district
    FROM prop_pop_converted
)
GROUP BY conversion_district, window
ORDER BY window_start
"""

# Execute the SQL query
result_df = spark.sql(sql_query)

# Show the result
result_df.show()
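With a window of 1 hour that slides every 10 minutes, each timestamp belongs to six overlapping windows. The plain-Python sketch below enumerates them; it is a simplified model that assumes windows aligned to the Unix epoch, and the function name ``sliding_windows`` is hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_windows(event_time, size, slide):
    """Return all [start, end) windows of length `size`, starting every
    `slide`, that contain event_time, ordered by start time."""
    # Latest window start that is not after the event.
    start = event_time - ((event_time - EPOCH) % slide)
    windows = []
    # Walk backwards while the window still covers the event (end is exclusive).
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


windows = sliding_windows(
    datetime(2024, 12, 3, 10, 13),
    size=timedelta(hours=1),
    slide=timedelta(minutes=10),
)
for window_start, window_end in windows:
    print(window_start, window_end)
```

Because every row is counted once per overlapping window, sums across different windows should not be added together.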

#### Simulating Streaming Datasets

It is also possible to "simulate" a streaming dataset by reading from a directory of CSV files, which is useful for testing and debugging. Spark treats each file that appears in the directory as new input and reads a limited number of files per micro-batch.

In [None]:
# Define the weather schema
schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("min_temperature", DoubleType(), True),
    StructField("max_temperature", DoubleType(), True),
    StructField("precipitation", DoubleType(), True)
])

# Path to the directory containing the CSV files
# Note that this must be a directory and not an individual file
input_path = "weather"

NUM_FILES_PER_TRIGGER = 3

# Read the streaming DataFrame from the directory
streaming_df = spark.readStream \
    .option("maxFilesPerTrigger", NUM_FILES_PER_TRIGGER) \
    .option("header", "true") \
    .format("csv") \
    .schema(schema) \
    .load(input_path)

# Define the query that processes the streaming data.
# The trigger sets how often a micro-batch is started (here every 10 seconds).
# The batch size is capped on the source side: maxFilesPerTrigger (set on
# readStream above) is the maximum number of new files read per trigger.
query = streaming_df.writeStream \
    .trigger(processingTime='10 seconds') \
    .outputMode("append") \
    .queryName("weather") \
    .format("memory") \
    .start()
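The effect of ``maxFilesPerTrigger`` can be sketched in plain Python as splitting the pending files into consecutive groups. This is a toy model: the function name ``plan_triggers`` and the file names are hypothetical, and the sketch orders files by name for determinism, whereas Spark orders them by file timestamp.

```python
def plan_triggers(files, max_files_per_trigger):
    """Group pending files into micro-batches of at most
    `max_files_per_trigger` files each."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    ordered = sorted(files)
    return [
        ordered[i:i + max_files_per_trigger]
        for i in range(0, len(ordered), max_files_per_trigger)
    ]


# Hypothetical directory listing with seven CSV files.
pending = [f"weather/part-{i:03d}.csv" for i in range(7)]
for trigger_id, batch in enumerate(plan_triggers(pending, 3)):
    print(trigger_id, batch)
```

With seven files and a limit of three, the data arrives in three micro-batches, which is what makes a static directory behave like a stream.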

In [None]:
# We can now query the in-memory table just like we did before
spark.sql(
    """SELECT max_temperature, min_temperature, timestamp 
    FROM weather ORDER BY timestamp 
    DESC
    """
).show()