In [None]:
# ==================================================================
# SILVER TO GOLD PROCESSING
# ==================================================================
from pyspark.sql.window import Window

# Load the three Silver tables that feed every Gold aggregate below
# (event facts, product dimension, user dimension), in that order.
events, products, users = map(
    spark.table,
    ("silver.fact_events", "silver.dim_products", "silver.dim_users"),
)

# ================ GOLD FACT TABLES ================
# Daily sales fact: purchase events rolled up per (day, product, user) and
# then enriched with descriptive attributes from the two dimension tables.
gold_sales = (
    events
    .where(col("event_type") == "purchase")
    .groupBy("event_date", "product_id", "user_id")
    .agg(
        sum("price").alias("total_sales"),   # summed price of the purchase rows
        count("*").alias("order_count"),     # number of purchase rows in the group
    )
    # Left joins: a sales row is kept even if its product/user has no match.
    .join(products, on="product_id", how="left")
    .join(users, on="user_id", how="left")
    .select(
        "event_date",
        "product_id",
        "user_id",
        "brand",
        "category_name",
        "country",
        "total_sales",
        "order_count",
    )
)

# User behavior metrics
# One row per purchasing user: purchase count, total spend, first/last
# purchase date, days since the last purchase, and purchases per day over the
# user's purchasing span.

# NOTE: this window is not used by the aggregation below; the name is kept
# defined in case other cells reference it.
window_spec = Window.partitionBy("user_id").orderBy("event_date")

gold_user_behavior = (
    events
    # Restrict to purchases *before* computing dates so that both date bounds
    # describe purchases. (An ordered window over all event types uses a
    # running frame by default, which makes the "first" value the earliest
    # event of any type and the "last" value merely the row's own date.)
    .filter(col("event_type") == "purchase")
    .groupBy("user_id")
    .agg(
        count("*").alias("total_purchases"),
        sum("price").alias("total_spend"),
        # NOTE(review): first() takes whichever row Spark sees first;
        # presumably country is constant per user -- confirm.
        first("country").alias("country"),
        min("event_date").alias("first_purchase_date"),
        max("event_date").alias("last_purchase_date"),
    )
    .withColumn(
        "days_since_last_purchase",
        datediff(current_date(), col("last_purchase_date")),
    )
    .withColumn(
        "purchase_frequency",
        # +1 keeps the denominator non-zero when every purchase falls on the
        # same day (datediff == 0).
        col("total_purchases")
        / (datediff(col("last_purchase_date"), col("first_purchase_date")) + 1),
    )
)

# Product performance
# One row per (product, category, brand) with funnel counts, purchase revenue,
# and two percentage rates derived from those counts.
gold_product_performance = (
    events
    .groupBy("product_id", "category_name", "brand")
    .agg(
        # when() without otherwise() is NULL for non-matching rows, and
        # count()/sum() skip NULLs, so each aggregate sees one event type only.
        count(when(col("event_type") == "product_view", 1)).alias("view_count"),
        count(when(col("event_type") == "cart_add", 1)).alias("cart_add_count"),
        count(when(col("event_type") == "purchase", 1)).alias("purchase_count"),
        sum(when(col("event_type") == "purchase", col("price"))).alias("total_revenue"),
    )
    # Both rates are left NULL when their denominator is 0. The explicit guard
    # avoids a divide-by-zero failure when Spark runs in ANSI mode and matches
    # the NULL that non-ANSI division by zero already produced.
    .withColumn(
        "conversion_rate",
        when(
            col("view_count") > 0,
            col("purchase_count") / col("view_count") * 100,
        ),
    )
    .withColumn(
        "cart_abandonment_rate",
        when(
            col("cart_add_count") > 0,
            (col("cart_add_count") - col("purchase_count")) / col("cart_add_count") * 100,
        ),
    )
)

# ================ WRITE GOLD LAYER ================
# Persist the three Gold DataFrames in Delta format, replacing whatever is
# already stored at each path. Only the sales fact is partitioned (by day).
gold_sales.write.save(
    "/mnt/gold/fact_sales",
    format="delta",
    mode="overwrite",
    partitionBy="event_date",
)

gold_user_behavior.write.save(
    "/mnt/gold/dim_customers",
    format="delta",
    mode="overwrite",
)

gold_product_performance.write.save(
    "/mnt/gold/dim_product_performance",
    format="delta",
    mode="overwrite",
)

# Expose each Gold Delta path as a table in the `gold` schema. The statements
# run in the order listed; IF NOT EXISTS lets the cell be re-run.
for _ddl in (
    """
    CREATE TABLE IF NOT EXISTS gold.fact_sales
    USING DELTA
    LOCATION '/mnt/gold/fact_sales'
""",
    """
    CREATE TABLE IF NOT EXISTS gold.dim_customers
    USING DELTA
    LOCATION '/mnt/gold/dim_customers'
""",
    """
    CREATE TABLE IF NOT EXISTS gold.dim_product_performance
    USING DELTA
    LOCATION '/mnt/gold/dim_product_performance'
""",
):
    spark.sql(_ddl)

# ================ FEATURE ENGINEERING ================
# Customer RFM Segmentation
def _score_by_cutoffs(column_name, cutoffs, *, lower_is_better=False):
    """Build a score column from ordered ``(cutoff, score)`` pairs.

    The pairs are tested in order and the first one that matches decides the
    score: ``value > cutoff`` by default, ``value < cutoff`` when
    ``lower_is_better`` is true. Rows matching no pair score 1.
    """
    value = col(column_name)
    branches = None
    for cutoff, score in cutoffs:
        hit = value < cutoff if lower_is_better else value > cutoff
        branches = when(hit, score) if branches is None else branches.when(hit, score)
    return branches.otherwise(1)


rfm = (
    gold_user_behavior
    # Recency: fewer days since the last purchase scores higher.
    .withColumn(
        "recency",
        _score_by_cutoffs(
            "days_since_last_purchase",
            [(30, 5), (60, 4), (90, 3), (180, 2)],
            lower_is_better=True,
        ),
    )
    # Frequency: more purchases per day scores higher.
    .withColumn(
        "frequency",
        _score_by_cutoffs(
            "purchase_frequency",
            [(0.5, 5), (0.3, 4), (0.1, 3), (0.05, 2)],
        ),
    )
    # Monetary: higher total spend scores higher.
    .withColumn(
        "monetary",
        _score_by_cutoffs(
            "total_spend",
            [(1000, 5), (500, 4), (200, 3), (50, 2)],
        ),
    )
    # Combined score is the plain sum of the three components (range 3-15).
    .withColumn("rfm_score", col("recency") + col("frequency") + col("monetary"))
    .withColumn(
        "customer_segment",
        when(col("rfm_score") >= 13, "Champion")
        .when(col("rfm_score") >= 10, "Loyal")
        .when(col("rfm_score") >= 7, "Potential")
        .otherwise("At-Risk"),
    )
)

# Persist the RFM segmentation in Delta format, replacing any earlier output.
rfm.write.save(
    "/mnt/gold/customer_segments",
    format="delta",
    mode="overwrite",
)

# Materialize view for Synapse
spark.sql("""
    CREATE TABLE gold.customer_segments
    USING DELTA
    LOCATION '/mnt/gold/customer_se