In [0]:
# 1. Setup Widgets for Batch Processing
# The batch date arrives as free text from the "run_date" widget
# (default "2026-01-15").
dbutils.widgets.text("run_date", "2026-01-15")
v_date = dbutils.widgets.get("run_date").strip()

# Fail fast on a malformed date: an unparseable value would otherwise match
# no bronze rows and the rest of the notebook would run on an empty batch.
# Re-serialising also normalises inputs such as "2026-1-5" to "2026-01-05".
from datetime import datetime

try:
    v_date = datetime.strptime(v_date, "%Y-%m-%d").date().isoformat()
except ValueError as err:
    raise ValueError(
        f"run_date must be a YYYY-MM-DD date, got {v_date!r}"
    ) from err

from pyspark.sql.functions import col, to_timestamp, split, to_date, lit

# 2. Read from Bronze (one batch only)
# Keep just the rows whose batch_date equals the requested run date, so the
# rest of the notebook works on a single batch instead of the full table.
bronze_df = (
    spark.table("ecommerce_prod.bronze.raw_events")
    .where(col("batch_date") == v_date)
)

# 3. Data Cleaning & Transformation
# The steps run in a fixed order: type conversion, date derivation,
# category split, de-duplication, audit column.

# category_code is split on literal dots; piece 0 is the main category and
# piece 1 the sub category.
_category_parts = split(col("category_code"), r"\.")

# Parse the event_time string into a real timestamp, then derive the calendar
# date that the silver table is partitioned by.
_typed_df = bronze_df.withColumn("event_time", to_timestamp("event_time"))
_dated_df = _typed_df.withColumn("event_date", to_date("event_time"))

_categorised_df = (
    _dated_df
    .withColumn("main_category", _category_parts[0])
    .withColumn("sub_category", _category_parts[1])
)

# Keep a single row per (event_time, user_id, product_id) key.
# NOTE(review): this is a key-based de-dup, not an exact-row de-dup; rows that
# share the key but differ in other columns are collapsed as well - confirm
# that is intended.
_deduped_df = _categorised_df.dropDuplicates(["event_time", "user_id", "product_id"])

# Audit column: every row is stamped with the run_date widget value.
# NOTE(review): this is the run-date string, not a wall-clock timestamp.
silver_df = _deduped_df.withColumn("processed_at", lit(v_date))

# 4. Write to Silver Table with Performance Options
# silver_df only holds the rows of one batch_date, so an unconditional
# overwrite would replace the whole silver table with a single batch and
# discard every batch loaded earlier. 'replaceWhere' restricts the overwrite
# to the rows of this batch, which also makes a re-run of the same date
# idempotent.
# 'mergeSchema' lets new columns (e.g. main_category / sub_category) be added
# to an existing table; it replaces 'overwriteSchema', which is a whole-table
# option and does not combine with a selective overwrite.
# 'partitionBy' ensures that future queries only read the specific dates needed.
from datetime import datetime

# The date is interpolated into a SQL predicate below, so accept nothing but a
# well-formed YYYY-MM-DD value (raises ValueError otherwise).
_batch_day = datetime.strptime(v_date.strip(), "%Y-%m-%d").date().isoformat()

(
    silver_df.write.format("delta")
    .mode("overwrite")
    .option("replaceWhere", f"batch_date = '{_batch_day}'")
    .option("mergeSchema", "true")
    .partitionBy("event_date")
    .saveAsTable("ecommerce_prod.silver.cleaned_events")
)

# 5. Optimize & Z-Order (The Task 3 Performance Boost)
# Compact the silver table and co-locate its data by 'brand' so that
# brand-filtered queries can skip unrelated files.
spark.sql(
    "OPTIMIZE ecommerce_prod.silver.cleaned_events "
    "ZORDER BY (brand)"
)

# Completion message for the notebook output. The leading check mark had been
# mangled into mojibake (UTF-8 bytes read as cp1252); it is restored here.
print(f"✅ Silver layer complete and Z-Ordered for {v_date}")