In [0]:
%run ./00_setup_and_config

In [None]:
import pyspark.sql.functions as F

In [0]:
df_silver_cdf = (
    spark.read
    .format("delta")
    .option("readChangeData", "true")
    .option("startingVersion", 0)
    .load(silver_delta_path)
)

### Key Metrics for each Customer
- Total Spend
- Distinct_Items_Purchased
- Total_Transactions
- First_Purchase_Date
- Last_Purchase_Date

In [None]:
changed_customers = (
    df_silver_cdf
    .filter(F.col("_change_type").isin("insert", "update_postimage", "delete"))
    .select("CustomerID")
    .distinct()
)

In [0]:
df_customer_stage = (
    spark.read.format("delta").load(silver_delta_path)
    .join(changed_customers, "CustomerID")
    .groupBy("CustomerID")
    .agg(
        F.sum("TotalPrice").alias("TotalSpend"),
        F.countDistinct("StockCode").alias("DistinctItemsPurchased"),
        F.countDistinct("InvoiceNo").alias("TotalTransactions"),
        F.min("InvoiceDate").alias("FirstPurchaseDate"),
        F.max("InvoiceDate").alias("LastPurchaseDate")
    )
)

In [0]:
df_customer_stage.createOrReplaceTempView("customer_stage")

spark.sql(f"""
MERGE INTO delta.`{gold_customer_path}` tgt
USING customer_stage src
ON tgt.CustomerID = src.CustomerID
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

### Key Metrics for each Product
- Total_Quantity_Sold
- Total_Product_Revenue
- Total_Unique_Orders_Containing_Product

In [0]:
changed_products = (
    df_silver_cdf
    .filter(F.col("_change_type").isin("insert", "update_postimage", "delete"))
    .select("StockCode")
    .distinct()
)

df_product_stage = (
    spark.read.format("delta").load(silver_delta_path)
    .join(changed_products, "StockCode")
    .groupBy("StockCode")
    .agg(
        F.sum("Quantity").alias("TotalQuantitySold"),
        F.sum("TotalPrice").alias("TotalProductRevenue"),
        F.countDistinct("InvoiceNo").alias("TotalUniqueOrdersContainingProduct")
    )
)

In [0]:
df_product_stage.createOrReplaceTempView("product_stage")

spark.sql(f"""
MERGE INTO delta.`{gold_product_path}` tgt
USING product_stage src
ON tgt.StockCode = src.StockCode
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

### Daily Revenue & Order Count Per Country

In [0]:
changed_dates = (
    df_silver_cdf
    .select("Invoice_Date", "Country")
    .distinct()
)

df_daily_stage = (
    spark.read.format("delta").load(silver_delta_path)
    .join(changed_dates, ["Invoice_Date", "Country"])
    .groupBy("Invoice_Date", "Country")
    .agg(
        F.sum("TotalPrice").alias("DailyRevenue"),
        F.countDistinct("InvoiceNo").alias("DailyOrderCount")
    )
)

In [0]:
df_daily_stage.createOrReplaceTempView("daily_stage")

spark.sql(f"""
MERGE INTO delta.`{gold_daily_sales_path}` tgt
USING daily_stage src
ON tgt.Invoice_Date = src.Invoice_Date
AND tgt.Country = src.Country
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")