For the Online Retail dataset, typical gold-layer tables hold aggregated business metrics such as:

- Revenue by country / month / product
- Top customers (by spend)
- Customer RFM (Recency, Frequency, Monetary) scoring
- Product performance (best sellers, returns)

In [0]:
# ---- Parameters (CI/CD compatible) ----
# Widgets let a job or CI/CD pipeline override these values at run time;
# the second argument is the default used for interactive runs.
dbutils.widgets.text("storage_account", "retaildatalaketest")
dbutils.widgets.text("container", "ecommerce")

# Resolve the (possibly overridden) widget values.
storage_account, container = (
    dbutils.widgets.get("storage_account"),
    dbutils.widgets.get("container"),
)

# ADLS Gen2 root that the silver/gold paths in later cells are built from.
base_path = "abfss://{}@{}.dfs.core.windows.net".format(container, storage_account)

In [0]:
from pyspark.sql import functions as F

# Paths derive from base_path (built from the widget parameters), so CI/CD
# runs can point this notebook at a different storage account/container.
silver_path = f"{base_path}/silver/online_retail_silver"
gold_path = f"{base_path}/gold"

# Read silver
silver_df = spark.read.format("delta").load(silver_path)

# Emptiness check that fetches at most one row instead of counting the table.
assert silver_df.head(1), "❌ Silver layer is empty"

# Step 1: Parse InvoiceDate strings such as "12/1/10 8:26" into a DATE
# (2010-12-01); the time-of-day part is dropped.
# NOTE(review): assumes silver stores InvoiceDate as an "M/d/yy H:mm" string —
# confirm against the silver schema.
silver_clean = silver_df.withColumn(
    "InvoiceDate",
    F.to_date(F.col("InvoiceDate"), "M/d/yy H:mm"),
)
silver_clean.limit(5).display()

# Step 2: Line-item revenue. Every gold metric below starts from
# silver_with_amount so they all share this TotalAmount definition.
silver_with_amount = silver_clean.withColumn(
    "TotalAmount",
    F.col("Quantity") * F.col("Price"),
)

# --- Gold Metrics (Simple Aggregations) ---

# 1. Monthly Revenue by Country
monthly_revenue = (silver_with_amount
    .groupBy(
        F.date_format("InvoiceDate", "yyyy-MM").alias("YearMonth"),
        "Country",
    )
    .agg(F.round(F.sum("TotalAmount"), 2).alias("Revenue"))
    .orderBy("YearMonth", F.desc("Revenue"))
)

# 2. Top 20 Products by Revenue
top_products = (silver_with_amount
    .groupBy("StockCode", "Description")
    .agg(
        F.round(F.sum("TotalAmount"), 2).alias("TotalRevenue"),
        F.sum("Quantity").alias("TotalSold"),
    )
    .orderBy(F.desc("TotalRevenue"))
    .limit(20)
)

# 3. Top 10 Customers by Spend
# Rows with a null CustomerID would otherwise be grouped into a single
# "customer" that can dominate the ranking, so they are excluded here.
top_customers = (silver_with_amount
    .filter(F.col("CustomerID").isNotNull())
    .groupBy("CustomerID")
    .agg(
        F.round(F.sum("TotalAmount"), 2).alias("TotalSpend"),
        F.countDistinct("Invoice").alias("NumOrders"),
    )
    .orderBy(F.desc("TotalSpend"))
    .limit(10)
)

# 4. Daily Summary (useful for dashboards), with year/month as real columns
# so the Delta table can be partitioned by them. InvoiceDate is already a
# DATE after Step 1, and TotalAmount comes from Step 2.
daily_summary = (silver_with_amount
    .withColumn("year", F.year("InvoiceDate"))
    .withColumn("month", F.month("InvoiceDate"))
    .groupBy("year", "month", "InvoiceDate")
    .agg(
        F.countDistinct("Invoice").alias("NumOrders"),
        F.round(F.sum("TotalAmount"), 2).alias("DailyRevenue"),
        F.countDistinct("CustomerID").alias("UniqueCustomers"),
    )
    .orderBy("InvoiceDate")
    .select("InvoiceDate", "year", "month", "NumOrders", "DailyRevenue", "UniqueCustomers")  # Explicit select
)

# Validate BEFORE writing so a bad run never overwrites the existing gold tables.
assert monthly_revenue.count() > 0, "❌ monthly_revenue empty"
assert top_products.count() >= 10, "❌ insufficient top products"
assert top_customers.count() > 0, "❌ top customers empty"

# Save as separate Delta tables under gold/<table_name>. The "/" separator
# matters: gold_path has no trailing slash.
(monthly_revenue.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{gold_path}/monthly_revenue"))
(top_products.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{gold_path}/top_products"))
(top_customers.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{gold_path}/top_customers"))
(daily_summary.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("year", "month")
    .option("overwriteSchema", "true")
    .save(f"{gold_path}/daily_summary"))

# Preview
print("Monthly Revenue by Country (Top 10)")
monthly_revenue.limit(10).display()

print("Top Products")
top_products.display()

print("Top Customers")
top_customers.display()

print("Daily summary")
daily_summary.limit(5).display()


In [0]:
# Use the functions module instead of `from ... import sum, col`, which
# shadowed Python's builtin sum() for every later cell in the notebook.
from pyspark.sql import functions as F

# Build paths from the widget-driven base_path (no hard-coded storage
# account), matching the gold-metrics cell so CI/CD overrides apply here too.
silver_path = f"{base_path}/silver/online_retail_silver"
gold_path = f"{base_path}/gold"

# Read silver
silver_df = spark.read.format("delta").load(silver_path)

# Total revenue per country. TotalAmount is derived as Quantity * Price, the
# same definition the gold-metrics cell uses (the previous code re-assigned
# the column to itself, which did nothing).
gold_df = (
    silver_df
    .withColumn("TotalAmount", F.col("Quantity") * F.col("Price"))
    .groupBy("Country")
    .agg(F.sum("TotalAmount").alias("TotalRevenue"))
)

# Managed table named "gold"; overwriteSchema lets the overwrite succeed
# if the TotalRevenue type differs from an earlier run.
(gold_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("gold"))

gold_df.limit(5).display()
display(spark.sql("DESCRIBE EXTENDED gold"))