In [0]:
# Databricks notebook: Gold Layer - Full Aggregations
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# ================================
# 1. Read Silver Table
# ================================
silver_df = spark.table("Silver_Retail")
print(f"🔹 Silver record count: {silver_df.count()}")

# ================================
# 2. Gold Tables
# ================================

# A. Gold_Retail (Customer-level total spend & items)
gold_df = (
    silver_df.groupBy("Customer_Name", "City", "Customer_Category")
    .agg(
        F.sum("Total_Cost").alias("Total_Spent"),
        F.sum("Total_Items").alias("Items_Purchased")
    )
)
gold_df.write.format("delta").mode("overwrite").saveAsTable("Gold_Retail")
print("✅ Gold_Retail saved.")
gold_df.show(5, truncate=False)

# B. Gold_Customer_Spend
gold_customer = gold_df
gold_customer.write.format("delta").mode("overwrite").saveAsTable("Gold_Customer_Spend")
print("✅ Gold_Customer_Spend saved.")

# C. Gold_Sales_By_City_Month
gold_city_month = (
    silver_df.groupBy(
        F.col("City"),
        F.year("Date").alias("Year"),
        F.month("Date").alias("Month")
    )
    .agg(
        F.sum("Total_Cost").alias("Total_Sales"),
        F.sum("Total_Items").alias("Total_Items")
    )
)
gold_city_month.write.format("delta").mode("overwrite").saveAsTable("Gold_Sales_By_City_Month")
print("✅ Gold_Sales_By_City_Month saved.")

# D. Gold_Promotion_Effectiveness
gold_promotion = (
    silver_df.groupBy("Promotion")
    .agg(
        F.countDistinct("Transaction_ID").alias("Num_Transactions"),
        F.sum("Total_Cost").alias("Revenue")
    )
)
gold_promotion.write.format("delta").mode("overwrite").saveAsTable("Gold_Promotion_Effectiveness")
print("✅ Gold_Promotion_Effectiveness saved.")

# E. Gold_Customer_LTV
gold_ltv = (
    silver_df.groupBy("Customer_Name")
    .agg(
        F.sum("Total_Cost").alias("Lifetime_Spend"),
        F.countDistinct("Transaction_ID").alias("Num_Transactions")
    )
)
gold_ltv.write.format("delta").mode("overwrite").saveAsTable("Gold_Customer_LTV")
print("✅ Gold_Customer_LTV saved.")

# F. Gold_Basket_Analysis
# Need Fact table (exploded products)
fact_df = (
    silver_df.withColumn(
        "Product",
        F.explode(F.split(F.regexp_replace("Product", r"[\[\]']", ""), ",\s*"))
    ).withColumn("Product", F.trim(F.col("Product")))
)

basket = (
    fact_df.alias("a")
    .join(
        fact_df.alias("b"),
        (F.col("a.Transaction_ID") == F.col("b.Transaction_ID")) &
        (F.col("a.Product") < F.col("b.Product"))
    )
    .groupBy(F.col("a.Product").alias("Product_A"), F.col("b.Product").alias("Product_B"))
    .count()
    .orderBy(F.desc("count"))
)
basket.write.format("delta").mode("overwrite").saveAsTable("Gold_Basket_Analysis")
print("✅ Gold_Basket_Analysis saved.")

print("🎉 All Gold tables created successfully!")
