In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Define the path to the Delta table in the bronze layer
silver_table_path = "amazon.streamdb.streaming_cleaned"

# Read the streaming data from the Delta table
streaming_silver_df = spark.readStream \
    .table(silver_table_path)

In [0]:
streaming_silver_df.display()

timestamp,user_id,event_type,device_type,session_id,EventProcessedUtcTime,PartitionId,EventEnqueuedUtcTime,product_id,category,sub_category,price,search_term,product_id_cart_contents,quantity_cart_contents,price_cart_contents
1723429520.1133268,CUST-14631,Add to Cart,Mobile,S959-4745249960,2024-08-12T02:25:20.763695Z,0,2024-08-12T02:25:20.639Z,WW-561417,Women'S Clothing,western wear,,,,,
1723429520.9070451,CUST-17226,Browse Product,Desktop,S647-5737266877,2024-08-12T02:25:21.561912Z,0,2024-08-12T02:25:21.514Z,F-221971,home & kitchen,Furniture,,,,,
1723429520.9729657,CUST-13727,Search,Desktop,S806-7767844990,2024-08-12T02:25:21.562041Z,0,2024-08-12T02:25:21.529Z,,Accessories,,,handbags & clutches,,,
1723429522.1478453,CUST-11875,Apply Filter,Mobile,S128-5985264117,2024-08-12T02:25:22.873224Z,0,2024-08-12T02:25:22.67Z,,accessories,,,,,,
1723429524.0960815,CUST-08853,Browse Product,Desktop,S594-7571885355,2024-08-12T02:25:25.07663Z,0,2024-08-12T02:25:24.904Z,CS-117376,men's shoes,Casual Shoes,,,,,
1723429524.5287745,CUST-17226,Add to Cart,Desktop,S647-5737266877,2024-08-12T02:25:25.5142Z,0,2024-08-12T02:25:25.31Z,F-221971,Home & Kitchen,furniture,,,,,
1723429524.8170297,CUST-16063,AppLogin,Mobile,S485-7090286144,2024-08-12T02:25:25.51439Z,0,2024-08-12T02:25:25.389Z,,,,,,,,
1723429524.5831337,CUST-13727,Browse Product,Desktop,S806-7767844990,2024-08-12T02:25:25.514655Z,0,2024-08-12T02:25:25.404Z,PCA-426149,beauty & health,Personal Care Appliances,,,,,
1723429525.687539,CUST-11875,Apply Filter,Mobile,S128-5985264117,2024-08-12T02:25:26.404585Z,0,2024-08-12T02:25:26.232Z,,accessories,,,,,,
1723429529.3363063,CUST-17226,Browse Product,Desktop,S647-5737266877,2024-08-12T02:25:30.014005Z,0,2024-08-12T02:25:29.842Z,J-335697,men's clothing,Jeans,,,,,


#### Joins with Watermarking & Window

In [0]:
# Step 1: Apply watermarking and aggregate total clicks on user level
interaction_df_01 = streaming_silver_df \
    .withWatermark("EventProcessedUtcTime", "10 minutes") \
    .groupBy(
        col("user_id"),
        window(col("EventProcessedUtcTime"), "5 minutes")
    ) \
    .agg(
        count("*").alias("total_clicks")
    )

In [0]:
interaction_df_01.display()

user_id,window,total_clicks
CUST-05103,"List(2024-08-08T17:30:00Z, 2024-08-08T17:35:00Z)",13
CUST-17296,"List(2024-08-08T18:35:00Z, 2024-08-08T18:40:00Z)",16
CUST-17542,"List(2024-08-09T02:10:00Z, 2024-08-09T02:15:00Z)",9
CUST-04535,"List(2024-08-08T22:35:00Z, 2024-08-08T22:40:00Z)",11
CUST-05756,"List(2024-08-10T19:05:00Z, 2024-08-10T19:10:00Z)",7
CUST-09437,"List(2024-08-13T00:45:00Z, 2024-08-13T00:50:00Z)",11
CUST-08046,"List(2024-08-11T20:45:00Z, 2024-08-11T20:50:00Z)",14
CUST-16589,"List(2024-08-13T22:10:00Z, 2024-08-13T22:15:00Z)",20
CUST-11130,"List(2024-08-12T08:50:00Z, 2024-08-12T08:55:00Z)",11
CUST-03824,"List(2024-08-08T22:10:00Z, 2024-08-08T22:15:00Z)",16


In [0]:
checkpoint_path_gold_clicks = "/mnt/amazonopcheckpoint/chekpointlocgoldclicks"

interaction_df_01.writeStream \
    .trigger(processingTime="60 seconds")\
    .outputMode("complete") \
    .option("checkpointLocation", checkpoint_path_gold_clicks) \
    .toTable("amazon.streamdb.total_clicks_gold_fact")

<pyspark.sql.streaming.query.StreamingQuery at 0x7f0318ececb0>

In [0]:
%sql
select * from amazon.streamdb.total_clicks_gold_fact;

user_id,window,total_clicks
CUST-01832,"List(2024-08-12T02:05:00Z, 2024-08-12T02:10:00Z)",9
CUST-01094,"List(2024-08-10T11:05:00Z, 2024-08-10T11:10:00Z)",2
CUST-06022,"List(2024-08-09T23:15:00Z, 2024-08-09T23:20:00Z)",16
CUST-12628,"List(2024-08-09T09:15:00Z, 2024-08-09T09:20:00Z)",15
CUST-05881,"List(2024-08-14T01:45:00Z, 2024-08-14T01:50:00Z)",6
CUST-11301,"List(2024-08-12T07:20:00Z, 2024-08-12T07:25:00Z)",11
CUST-03409,"List(2024-08-08T08:25:00Z, 2024-08-08T08:30:00Z)",5
CUST-09404,"List(2024-08-10T22:35:00Z, 2024-08-10T22:40:00Z)",2
CUST-05972,"List(2024-08-14T06:05:00Z, 2024-08-14T06:10:00Z)",16
CUST-16136,"List(2024-08-09T03:05:00Z, 2024-08-09T03:10:00Z)",18


In [0]:
# Step 2: Apply watermarking and aggregate total cart value on user level
session_df_01 = streaming_silver_df \
    .withWatermark("EventProcessedUtcTime", "10 minutes") \
    .groupBy(
        col("user_id"),
        window(col("EventProcessedUtcTime"), "5 minutes")
    ) \
    .agg(
        count("session_id").alias("total_sessions")
    )

In [0]:
session_df_01.display()

user_id,window,total_sessions
CUST-05103,"List(2024-08-08T17:30:00Z, 2024-08-08T17:35:00Z)",13
CUST-17296,"List(2024-08-08T18:35:00Z, 2024-08-08T18:40:00Z)",16
CUST-17542,"List(2024-08-09T02:10:00Z, 2024-08-09T02:15:00Z)",9
CUST-04535,"List(2024-08-08T22:35:00Z, 2024-08-08T22:40:00Z)",11
CUST-05756,"List(2024-08-10T19:05:00Z, 2024-08-10T19:10:00Z)",7
CUST-09437,"List(2024-08-13T00:45:00Z, 2024-08-13T00:50:00Z)",11
CUST-08046,"List(2024-08-11T20:45:00Z, 2024-08-11T20:50:00Z)",14
CUST-16589,"List(2024-08-13T22:10:00Z, 2024-08-13T22:15:00Z)",20
CUST-11130,"List(2024-08-12T08:50:00Z, 2024-08-12T08:55:00Z)",11
CUST-03824,"List(2024-08-08T22:10:00Z, 2024-08-08T22:15:00Z)",16


In [0]:
checkpoint_path_gold_sessions = "/mnt/amazonopcheckpoint/chekpointlocgoldsessions"

session_df_01.writeStream \
    .trigger(processingTime="60 seconds")\
    .outputMode("complete") \
    .option("checkpointLocation", checkpoint_path_gold_sessions) \
    .toTable("amazon.streamdb.total_sessions_gold_fact")

<pyspark.sql.streaming.query.StreamingQuery at 0x7f032c170af0>

In [0]:
%sql
select * from amazon.streamdb.total_sessions_gold_fact;

user_id,window,total_sessions
CUST-01832,"List(2024-08-12T02:05:00Z, 2024-08-12T02:10:00Z)",9
CUST-01094,"List(2024-08-10T11:05:00Z, 2024-08-10T11:10:00Z)",2
CUST-06022,"List(2024-08-09T23:15:00Z, 2024-08-09T23:20:00Z)",16
CUST-12628,"List(2024-08-09T09:15:00Z, 2024-08-09T09:20:00Z)",15
CUST-05881,"List(2024-08-14T01:45:00Z, 2024-08-14T01:50:00Z)",6
CUST-11301,"List(2024-08-12T07:20:00Z, 2024-08-12T07:25:00Z)",11
CUST-03409,"List(2024-08-08T08:25:00Z, 2024-08-08T08:30:00Z)",5
CUST-09404,"List(2024-08-10T22:35:00Z, 2024-08-10T22:40:00Z)",2
CUST-05972,"List(2024-08-14T06:05:00Z, 2024-08-14T06:10:00Z)",16
CUST-16136,"List(2024-08-09T03:05:00Z, 2024-08-09T03:10:00Z)",18


#### Experiments - foreachBatch

In [0]:
checkpoint_path_gold_03 = "/mnt/amazonopcheckpoint/chekpointlocgoldtest03"

In [0]:
# Define the upsert function
def upsertToDelta(microBatchOutputDF, batchId):
    # Create a temporary view for the current micro-batch
    microBatchOutputDF.createOrReplaceTempView("updates")

    # Perform the MERGE operation
    microBatchOutputDF.sparkSession.sql(f"""
        MERGE INTO {delta_table} t
        USING updates s
        ON s.user_id = t.user_id AND s.window = t.window
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

In [0]:
delta_table = "amazon.streamdb.user_session_aggregates_gold_joins"

In [0]:
# Write the result to Delta with upsert logic
final_df.writeStream \
    .foreachBatch(upsertToDelta) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_path_gold_03) \
    .start()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3782660352236498>, line 6[0m
[1;32m      1[0m [38;5;66;03m# Write the result to Delta with upsert logic[39;00m
[1;32m      2[0m [43mfinal_df[49m[38;5;241;43m.[39;49m[43mwriteStream[49m[43m [49m[43m\[49m
[1;32m      3[0m [43m    [49m[38;5;241;43m.[39;49m[43mforeachBatch[49m[43m([49m[43mupsertToDelta[49m[43m)[49m[43m [49m[43m\[49m
[1;32m      4[0m [43m    [49m[38;5;241;43m.[39;49m[43moutputMode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mupdate[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m [49m[43m\[49m
[1;32m      5[0m [43m    [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mcheckpointLocation[39;49m[38;5;124;43m"[39;49m[43m,[49m[43m [49m[43mcheckpoint_path_gold_03[49m[43m)[49m[43m [4

In [0]:
# Write the result to Delta with upsert logic
final_df.writeStream \
    .foreachBatch(upsertToDelta) \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint_path_gold_03) \
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7f033483f640>

In [0]:
# Write the result to Delta with upsert logic
final_df.writeStream \
    .foreachBatch(upsertToDelta) \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint_path_gold_03) \
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7f0908397cd0>