In [0]:
%run ./00_setup_and_config

In [0]:
import pyspark.sql.functions as func
from pyspark.sql.types import StringType, StructField, StringType, TimestampType, LongType, DoubleType, ArrayType
from delta.tables import DeltaTable

In [0]:
# Open the Silver Delta table as a streaming source.
# NOTE(review): `silver_layer_path` (and `spark`) are presumably provided by the
# `%run ./00_setup_and_config` cell at the top of this notebook.
silver_stream_df = spark.readStream.load(silver_layer_path, format="delta")

In [0]:
# Inspect the Silver stream's schema. The session aggregation below expects the columns
# timestamp, user_id, session_id, page_url, event_type, product_price, category,
# product_id_flat, product_name_flat and product_brand_flat to be present.
silver_stream_df.printSchema()

In [0]:
# Roll the Silver event stream up to one row per (user_id, session_id), plus a
# `last_updated` column stamped with Spark's current_timestamp().
# NOTE(review): the watermark is declared on `timestamp`, but `timestamp` is not a
# grouping key and no window over it is used. Per the Structured Streaming guide,
# Spark only evicts aggregation state when the event-time column (or a window on it)
# is part of the grouping. Per-session state is therefore presumably retained
# indefinitely -- confirm this is acceptable.
is_add_to_cart = func.col("event_type") == "add_to_cart"
is_purchase = func.col("event_type") == "purchase"

session_aggregations = [
    # Session time bounds and event volume.
    func.min("timestamp").alias("session_start_time"),
    func.max("timestamp").alias("session_end_time"),
    func.count("*").alias("total_events_per_session"),
    # Approximate distinct-page count (exact distinct counts are not supported in streaming aggs).
    func.approx_count_distinct("page_url").alias("unique_page_views"),
    # Funnel counters: each matching event contributes 1, every other event 0.
    func.sum(func.when(is_add_to_cart, 1).otherwise(0)).alias("add_to_cart_count"),
    func.sum(func.when(is_purchase, 1).otherwise(0)).alias("purchase_count"),
    # Revenue: product_price of purchase events only.
    func.sum(func.when(is_purchase, func.col("product_price")).otherwise(0.0)).alias("total_purchase_value"),
    # Distinct product attributes seen across all of the session's events.
    func.collect_set(func.col("category")).alias("product_categories_viewed"),
    func.collect_set(func.col("product_id_flat")).alias("products_viewed_ids"),
    func.collect_set(func.col("product_name_flat")).alias("products_viewed_names"),
    func.collect_set(func.col("product_brand_flat")).alias("products_viewed_brands"),
]

session_summary_df = (
    silver_stream_df
    .withWatermark("timestamp", "7 hours")
    .groupBy("user_id", "session_id")
    .agg(*session_aggregations)
    .withColumn("last_updated", func.current_timestamp())
)
    

In [0]:
def upsert_to_gold(microBatchOutputDF, batchId):
    """
    Upsert one micro-batch of session summaries into the Gold Delta table.

    Used as the ``foreachBatch`` callback of the Gold streaming query, so Spark
    calls it once per micro-batch.

    If no Delta table exists yet at ``gold_layer_path``, the batch is appended,
    which creates the table with the batch's schema. Otherwise the batch is
    MERGEd on (session_id, user_id):
      * matched rows have every aggregate column overwritten with the batch's
        value, and ``last_updated`` is refreshed to the merge time;
      * unmatched rows are inserted with all of their columns.

    Args:
        microBatchOutputDF: DataFrame holding this micro-batch's session rows.
        batchId: Micro-batch id supplied by Spark; used here only for logging.
    """
    from datetime import datetime, timezone

    # Wall-clock time for the log lines. func.current_timestamp() is a lazy Spark
    # Column expression, so interpolating it printed the Column's repr, not a time.
    log_ts = datetime.now(timezone.utc).isoformat(timespec="seconds")

    # First batch: nothing to merge into yet, so write the batch to create the table.
    if not DeltaTable.isDeltaTable(spark, gold_layer_path):
        print(f"[{log_ts}] Batch ID: {batchId} - Creating Gold Delta table at {gold_layer_path}")
        microBatchOutputDF.write.format("delta").mode("append").save(gold_layer_path)
        return

    deltaTable = DeltaTable.forPath(spark, gold_layer_path)

    # Aggregate columns copied verbatim from the source row on a match.
    aggregate_columns = (
        "session_start_time",
        "session_end_time",
        "total_events_per_session",
        "unique_page_views",
        "add_to_cart_count",
        "purchase_count",
        "total_purchase_value",
        "product_categories_viewed",
        "products_viewed_ids",
        "products_viewed_names",
        "products_viewed_brands",
    )
    update_set = {name: func.col(f"source.{name}") for name in aggregate_columns}
    update_set["last_updated"] = func.current_timestamp()

    print(f"[{log_ts}] Batch ID: {batchId} - Performing MERGE INTO on {gold_layer_path}")
    (deltaTable.alias("target")
     .merge(
         source=microBatchOutputDF.alias("source"),
         condition=func.expr("target.session_id = source.session_id AND target.user_id = source.user_id")
     )
     .whenMatchedUpdate(set=update_set)
     .whenNotMatchedInsertAll()
     .execute())


In [0]:
# Start the Gold streaming query.
# - foreachBatch makes upsert_to_gold the sink. Any .format(...) set on the writer is
#   replaced by it, so no format is specified here.
# - "update" output mode passes each micro-batch only the session rows whose
#   aggregates changed since the last trigger; upsert_to_gold MERGEs them into Gold.
# The StreamingQuery handle is kept so the stream can be monitored
# (gold_query.status / gold_query.lastProgress) or stopped (gold_query.stop()).
gold_query = (
    session_summary_df.writeStream
    .outputMode("update")
    .option("checkpointLocation", gold_checkpoint_location)
    .trigger(processingTime="30 seconds")
    .foreachBatch(upsert_to_gold)
    .queryName("ClickstreamGoldSessionSummary")
    .start()
)

In [0]:
# display(session_summary_df)

In [0]:
# dbutils.fs.rm(gold_layer_path, True)
# dbutils.fs.rm(gold_checkpoint_location, True)
# print(f"Cleaned up previous Gold checkpoint at {gold_checkpoint_location} and output data at {gold_layer_path}.")