In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from delta import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import os
import time

# Create SparkSession with Delta Lake support
builder = SparkSession.builder.appName("SensorDataWindow") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Define schema for the sensor data
schema = StructType([
    StructField("time", IntegerType(), True),  # time is in epoch seconds
    StructField("action", StringType(), True)
])

# Path to the folder containing the sensor data CSV files
sensor_data_path = "input/sensor-data/"

# Read the sensor data stream
sensor_data_df = (spark.readStream
                  .schema(schema)  # Define the schema explicitly
                  .json(sensor_data_path)  # Read JSON files as a stream
)

# Create a timestamp column from the epoch time (if it's not already a timestamp)
sensor_data_df = sensor_data_df.withColumn("timestamp", F.from_unixtime(F.col("time")))

# Define the checkpoint and output path for Delta
checkpoint_path = "streaming/sensor_data_tumbling/_checkpoint"
output_path = "delta/sensor_data_tumbling"

# Ensure the output path exists
os.makedirs(output_path, exist_ok=True)

# Apply tumbling window (every 5 minutes) and count the number of actions in each window
sensor_data_window_query = (sensor_data_df
                            .groupBy(F.window("timestamp", "5 minutes"), "action")  # Tumbling window on timestamp (every 5 minutes)
                            .count()  # Count the number of actions in each window
                            .withColumnRenamed("count", "action_count")
                            .writeStream
                            .outputMode("complete")  # Use complete mode to update the complete table in each trigger
                            .format("delta")
                            .queryName("sensor_data_tumbling_query")
                            .trigger(processingTime="5 seconds")  # Trigger every 5 seconds
                            .option("checkpointLocation", checkpoint_path)  # Checkpoint location
                            .start(output_path)  # Delta table output path
)

# Function to create table if not exists
def create_table_if_exists(output_path, table_name):
    data_exists = False
    for _i in range(60):  # Retry for 60 seconds
        try:
            time.sleep(1)
            files = os.listdir(output_path)
            for _f in files:
                if ".parquet" in _f:
                    if len(os.listdir(f"{output_path}/_delta_log")) > 0:
                        print("data exists")
                        data_exists = True
                        break
            if data_exists:
                spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{output_path}'")
                break
        except Exception as e:
            print(e)  # Uncomment if you want to see exceptions
            pass

# Ensure table creation
table_name = "sensor_data_tumbling"
create_table_if_exists(output_path, table_name)

# Wait for the streaming query to terminate
sensor_data_window_query.awaitTermination(timeout = 20)


data exists


False

#### Spark configuration note

In [3]:
df = spark.read.format("delta").load("delta/sensor_data_tumbling")
df.toPandas().sort_values(by=['window'], ascending=True)

Unnamed: 0,window,action,action_count
56,"(2016-07-26 02:45:00, 2016-07-26 02:50:00)",Open,32
377,"(2016-07-26 02:50:00, 2016-07-26 02:55:00)",Open,66
1104,"(2016-07-26 02:50:00, 2016-07-26 02:55:00)",Close,5
641,"(2016-07-26 02:55:00, 2016-07-26 03:00:00)",Open,81
859,"(2016-07-26 02:55:00, 2016-07-26 03:00:00)",Close,6
...,...,...,...
593,"(2016-07-28 06:25:00, 2016-07-28 06:30:00)",Close,14
1044,"(2016-07-28 06:30:00, 2016-07-28 06:35:00)",Close,15
831,"(2016-07-28 06:35:00, 2016-07-28 06:40:00)",Close,12
9,"(2016-07-28 06:40:00, 2016-07-28 06:45:00)",Close,3


In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from delta import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import os
import time

# Create SparkSession with Delta Lake support
builder = SparkSession.builder.appName("SensorDataWindow") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Define schema for the sensor data
schema = StructType([
    StructField("time", IntegerType(), True),  # time is in epoch seconds
    StructField("action", StringType(), True)
])

# Path to the folder containing the sensor data CSV files
sensor_data_path = "input/sensor-data/"

# Read the sensor data stream
sensor_data_df = (spark.readStream
                  .schema(schema)  # Define the schema explicitly
                  .json(sensor_data_path)  # Read JSON files as a stream
)

# Create a timestamp column from the epoch time (if it's not already a timestamp)
sensor_data_df = sensor_data_df.withColumn("timestamp", F.from_unixtime(F.col("time")))

# Define the checkpoint and output path for Delta
checkpoint_path = "streaming/sensor_data_sliding/_checkpoint"
output_path = "delta/sensor_data_sliding"

# Ensure the output path exists
os.makedirs(output_path, exist_ok=True)

# Apply tumbling window (every 5 minutes) and count the number of actions in each window
sensor_data_window_query = (sensor_data_df
                            .groupBy(F.window("timestamp", "5 minutes", "1 minute"), "action")  # Tumbling window on timestamp (every 5 minutes)
                            .count()  # Count the number of actions in each window
                            .withColumnRenamed("count", "action_count")
                            .writeStream
                            .outputMode("complete")  # Use complete mode to update the complete table in each trigger
                            .format("delta")
                            .queryName("sensor_data_tumbling_query")
                            .trigger(processingTime="5 seconds")  # Trigger every 5 seconds
                            .option("checkpointLocation", checkpoint_path)  # Checkpoint location
                            .start(output_path)  # Delta table output path
)

# Function to create table if not exists
def create_table_if_exists(output_path, table_name):
    data_exists = False
    for _i in range(60):  # Retry for 60 seconds
        try:
            time.sleep(1)
            files = os.listdir(output_path)
            for _f in files:
                if ".parquet" in _f:
                    if len(os.listdir(f"{output_path}/_delta_log")) > 0:
                        print("data exists")
                        data_exists = True
                        break
            if data_exists:
                spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{output_path}'")
                break
        except Exception as e:
            print(e)  # Uncomment if you want to see exceptions
            pass

# Ensure table creation
table_name = "sensor_data_sliding"
create_table_if_exists(output_path, table_name)

# Wait for the streaming query to terminate
sensor_data_window_query.awaitTermination(timeout = 20)


[Errno 2] No such file or directory: 'delta/sensor_data_sliding/_delta_log'
[Errno 2] No such file or directory: 'delta/sensor_data_sliding/_delta_log'
[Errno 2] No such file or directory: 'delta/sensor_data_sliding/_delta_log'
[Errno 2] No such file or directory: 'delta/sensor_data_sliding/_delta_log'
[Errno 2] No such file or directory: 'delta/sensor_data_sliding/_delta_log'
[Errno 2] No such file or directory: 'delta/sensor_data_sliding/_delta_log'
data exists


False

In [2]:
df = spark.read.format("delta").load("delta/sensor_data_sliding")
df.toPandas().sort_values(by=['window'], ascending=True)

Unnamed: 0,window,action,action_count
3352,"(2016-07-26 02:41:00, 2016-07-26 02:46:00)",Open,2
2324,"(2016-07-26 02:42:00, 2016-07-26 02:47:00)",Open,4
4249,"(2016-07-26 02:43:00, 2016-07-26 02:48:00)",Open,15
2726,"(2016-07-26 02:44:00, 2016-07-26 02:49:00)",Open,22
217,"(2016-07-26 02:45:00, 2016-07-26 02:50:00)",Open,32
...,...,...,...
966,"(2016-07-28 06:44:00, 2016-07-28 06:49:00)",Close,3
2283,"(2016-07-28 06:45:00, 2016-07-28 06:50:00)",Close,3
4967,"(2016-07-28 06:46:00, 2016-07-28 06:51:00)",Close,2
657,"(2016-07-28 06:47:00, 2016-07-28 06:52:00)",Close,2


### Structured Streaming
  
  
* Kafka 
* Aggregations
* Time windows
* Watermarking
* Joins

### Time windows

In [None]:
orders_cleaned_df.printSchema()

In [None]:
# commonly, you might not want an aggregation of a stream's whole history.
# for this purpose, let's use window from functions - NB this is "time windows" not "SQL-like (row) window function"

checkpoint_path = "streaming/orders_most_products_tumb/_checkpoint" 
table_name = "orders_most_products_tumb"
output_path = f"spark-warehouse/{table_name}"

orders_most_products_tumb_query = (orders_cleaned_df
  .select("user_id",F.size("product_ids").alias("count_of_products"), "order_timestamp")
  .groupBy("user_id", F.window("order_timestamp", "5 minute")) # Aggregate by user, every 5 minute block. This is a "tumbling window"
  .sum("count_of_products")
  .withColumnRenamed("sum(count_of_products)", "product_count")
  .writeStream
  .outputMode("complete") # we overwrite the complete table with every trigger
  .format("delta")
  .queryName("orders_most_products_tumb_query")
  .trigger(processingTime="5 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name).orderBy(F.desc("window.end"),F.desc("product_count"))

In [None]:
# if we want to keep always the latest time window then we can use sliding windows

checkpoint_path = "streaming/orders_most_products_slide/_checkpoint" 
table_name = "orders_most_products_slide"
output_path = f"spark-warehouse/{table_name}"

orders_most_products_slide_query = (orders_cleaned_df
  .select("user_id",F.size("product_ids").alias("count_of_products"), "order_timestamp")
  .groupBy("user_id", F.window("order_timestamp", "5 minute", "1 minute")) # Aggregate by user, every 5 minute block sliding by 1 minute.
  .sum("count_of_products")
  .withColumnRenamed("sum(count_of_products)", "product_count")
  .writeStream
  .outputMode("complete") # we overwrite the complete table with every trigger
  .format("delta")
  .queryName("orders_most_products_slide_query")
  .trigger(processingTime="5 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name).orderBy(F.desc("window.end"),F.desc("product_count"))

##### Non-Kafka part starts here

In [None]:
# let's try with a different, simpler dataset

input_path = "sensor-data"

json_schema = "time timestamp, action string"

In [None]:
# Let's create a dataframe and apply some transformations and aggregation

input_df = (spark
  .readStream                                 
  .schema(json_schema)                       
  .option("maxFilesPerTrigger", 1)            
  .json(input_path)                           
)

counts_df = (input_df
  .groupBy(F.col("action"),                     # Aggregate by action
           F.window(F.col("time"), "1 hour"))     # and by a 1 hour window
  .count()                                    # Count the actions
  .select(F.col("window.start").alias("start"), 
          F.col("count"),                       
          F.col("action"))                      
)

In [None]:
checkpoint_path = "streaming/counts/_checkpoint" 
table_name = "counts"
output_path = f"spark-warehouse/{table_name}"

counts_query = (counts_df
  .writeStream
  .outputMode("complete")
  .format("delta")
  .queryName("counts_query")
  .trigger(processingTime="10 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name).orderBy(F.col("start"))

### Watermarking

In [None]:
# in actual use cases, the queries above would keep running for a very long time and the amount of windows would grow indefinitely
# keeping track of all the states puts pressure on memory
# also it may often be irrelevant if delayed data updates our figures

watermarked_stream = "watermarked_stream"

watermarked_df = (input_df
  .withWatermark("time", "2 hours")             # Specify a 2-hour watermark
  .groupBy(F.col("action"),                       # Aggregate by action...
           F.window(F.col("time"), "1 hour"))       # ...then by a 1 hour window
  .count()                                      # For each aggregate, produce a count
  .select(F.col("window.start").alias("start"),   # Elevate field to column
          F.col("count"),                         # Include count
          F.col("action"))                        # Include action
)
display(watermarked_df, streamName = watermarked_stream) # Start the stream and display it

# important note: watermarking guarantees that any event within the window gets in. It does not guarantee leaving anything out.

In [None]:
# in actual use cases, the queries above would keep running for a very long time and the amount of windows would grow indefinitely
# keeping track of all the states puts pressure on memory
# also it may often be irrelevant if delayed data updates our figures

checkpoint_path = "streaming/orders_most_products_slide_wm/_checkpoint" 
table_name = "orders_most_products_slide_wm"
output_path = f"spark-warehouse/{table_name}"

orders_most_products_slide_wm_query = (orders_cleaned_df
  .withWatermark("order_timestamp", "20 minute")             # Specify a 20-minute watermark
  .select("user_id",F.size("product_ids").alias("count_of_products"), "order_timestamp")
  .groupBy("user_id", F.window("order_timestamp", "5 minute", "1 minute")) # Aggregate by user, every 5 minute block sliding by 1 minute.
  .sum("count_of_products")
  .withColumnRenamed("sum(count_of_products)", "product_count")
  .writeStream
  .outputMode("complete") # we overwrite the complete table with every trigger
  .format("delta")
  .queryName("orders_most_products_slide_wm_query")
  .trigger(processingTime="5 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

# important note: watermarking guarantees that any event within the window gets in. It does not guarantee leaving anything out.

In [None]:
spark.table(table_name).orderBy(F.desc("window.end"),F.desc("product_count"))

In [None]:
# Let's import another dataset. Let's say we are interested in hourly monitoring of incoming traffic to our website

schema = "device STRING, ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, geo STRUCT<city: STRING, state: STRING>, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING"

hourlyEventsPath = "events20200703"

website_df = (spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 1)
  .json(hourlyEventsPath)
)

In [None]:
# this dataframe does not have a proper timestamp column. So we need to create one and use it for watermarking

events_df = (website_df
             .withColumn("createdAt", (F.col("event_timestamp") / 1e6).cast("timestamp"))
             .withWatermark("createdAt", "2 hours")
)             

In [None]:
# now we can do an aggregation

traffic_df = (events_df
             .groupBy("traffic_source"
                      , F.window(F.col("createdAt"), "1 hour"))
             .agg(F.approx_count_distinct("user_id").alias("active_users"))
             .select(F.col("traffic_source")
                     , F.col("active_users")
                     , F.hour(F.col("window.start")).alias("hour"))
             .sort("hour")
)


checkpoint_path = "streaming/traffic/_checkpoint" 
table_name = "traffic"
output_path = f"spark-warehouse/{table_name}"

traffic_query = (traffic_df
  .writeStream
  .outputMode("complete") # we overwrite the complete table with every trigger
  .format("delta")
  .queryName("traffic_query")
  .trigger(processingTime="10 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name)

### Joining streams

In [None]:
# let's load in users dataset

users_df = spark.read.parquet("users.parquet")

In [None]:
# join works same way as with regular dataframes.
# note: this is streaming<->static join

joined_df = (events_df
            .join(users_df.drop("user_first_touch_timestamp"), "user_id")
            )

checkpoint_path = "streaming/join_static/_checkpoint" 
table_name = "join_static"
output_path = f"spark-warehouse/{table_name}"

join_static_query = (joined_df
  .writeStream
  .outputMode("append")
  .format("delta")
  .queryName("join_static_query")
  .trigger(processingTime="10 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name).limit(5)

In [None]:
# let's read in users dataframe as a stream

# since we have created the dataframe from this data, we can cheat on getting the schema. Possible in development/debugging, not possible or recommended in production
users_schema = users_df.schema

users_stream_df = (spark
                   .readStream
                   .format("parquet")
                   .schema(users_schema)
                   .option("maxFilesPerTrigger", 1)
                   .parquet("users.parquet")
                  )

In [None]:
# let's do a stream to stream join

joined_streams_df = (events_df
            .join(users_stream_df.drop("user_first_touch_timestamp"), "user_id")
            )

checkpoint_path = "streaming/join_stream/_checkpoint" 
table_name = "join_stream"
output_path = f"spark-warehouse/{table_name}"

join_stream_query = (joined_df
  .writeStream
  .outputMode("append")
  .format("delta")
  .queryName("join_stream_query")
  .trigger(processingTime="10 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name).limit(5)

In [None]:
for stream in spark.streams.active:
  stream.stop()

### Further reading

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.window.html  
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withWatermark.html  
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html  
https://docs.databricks.com/spark/latest/structured-streaming/index.html

### Task 1

Create a streaming dataframe from the data in the following path:  
`flights200701stream`

The schema should contain
* DepartureAt (timestamp)
* UniqueCarrier (string)

Process only 1 file per trigger.  

Aggregate the data by count, using non-overlapping 30 minute windows.  
Ignore any data that is older than 6 hours.

The output should have 3 columns: startTime (window start time), UniqueCarrier, count.  

Save to a delta table, firing the trigger every 5 seconds.

Display the table, the output should be sorted ascending by startTime.

Once the stream has produced some output, call the stream shutdown function.

In [None]:
# Your answer


### Task 2

Join the Kafka streaming orders dataframe to the `product.csv` dataset.  
Note that Spark assumes that any streaming dataframes refer to a directory, not a specific file.

Aggregate the data by sum(price) (`total_price`) and a 2-minute tumbling window.  
Add a 10 minute watermark, and store the data in a delta table.

Create a view on top of the delta table with the following columns:
* product_id
* product_name
* n_minus_2_window_total_price
* n_minus_1_window_total_price
* current_window_total_price

The total_price columns need to be pivoted based on only the 3 most recent windows. 
The actual 
Order the dataset by descending `current_total_price`.

In [None]:
# Your answer
