In [0]:
# STEP 1 (REVISED): RAW INGESTION FROM UNITY CATALOG VOLUME → BRONZE TABLE (no cleaning)

from pyspark.sql import functions as F

# ==== ⬇️ FILL THESE IN WITH YOUR EXACT UC NAMES AND FILE NAME ====
CATALOG = "retail_demand_catalog"     # e.g., retail_demand_catalogue
SCHEMA  = "retail_demand_schema"        # e.g., retail_demand_schema  (check spelling)
VOLUME  = "retail_demand_volume"       # e.g., retail_demand_volume  (check spelling)
FILE    = "Retail_Transactions_Dataset.csv"            # e.g., the actual CSV file name in the volume
# =================================================================

SOURCE_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}/{FILE}"
BRONZE_TABLE = f"{CATALOG}.{SCHEMA}.transactions_bronze"

# Use the correct catalog & schema
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

# Read CSV as raw strings, preserve complex fields like Product lists and commas
df_raw = (
    spark.read
         .option("header", "true")
         .option("inferSchema", "false")
         .option("multiLine", "true")
         .option("quote", "\"")
         .option("escape", "\"")
         .option("mode", "PERMISSIVE")
         .csv(SOURCE_PATH)
)

# Standardize column names: trim + replace spaces with underscores
std_cols = [c.strip().replace(" ", "_") for c in df_raw.columns]
df = df_raw.toDF(*std_cols)

# Force all columns to string to preserve raw text
for c in df.columns:
    df = df.withColumn(c, F.col(c).cast("string"))

# Write a managed Delta BRONZE table in UC (idempotent overwrite for dev)
(df.write
   .format("delta")
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable(BRONZE_TABLE)
)

print("Bronze table created:", BRONZE_TABLE)
display(spark.table(BRONZE_TABLE).limit(10))


Bronze table created: retail_demand_catalog.retail_demand_schema.transactions_bronze


Transaction_ID,Date,Customer_Name,Product,Total_Items,Total_Cost,Payment_Method,City,Store_Type,Discount_Applied,Customer_Category,Season,Promotion
1000000000,2022-01-21 06:27:29,Stacey Price,"['Ketchup', 'Shaving Cream', 'Light Bulbs']",3,71.65,Mobile Payment,Los Angeles,Warehouse Club,True,Homemaker,Winter,
1000000001,2023-03-01 13:01:21,Michelle Carlson,"['Ice Cream', 'Milk', 'Olive Oil', 'Bread', 'Potatoes']",2,25.93,Cash,San Francisco,Specialty Store,True,Professional,Fall,BOGO (Buy One Get One)
1000000002,2024-03-21 15:37:04,Lisa Graves,['Spinach'],6,41.49,Credit Card,Houston,Department Store,True,Professional,Winter,
1000000003,2020-10-31 09:59:47,Mrs. Patricia May,"['Tissues', 'Mustard']",1,39.34,Mobile Payment,Chicago,Pharmacy,True,Homemaker,Spring,
1000000004,2020-12-10 00:59:59,Susan Mitchell,['Dish Soap'],10,16.42,Debit Card,Houston,Specialty Store,False,Young Adult,Winter,Discount on Selected Items
1000000005,2021-10-07 12:37:26,Joshua Frazier,"['Toothpaste', 'Chicken']",3,72.24,Cash,Houston,Supermarket,True,Retiree,Spring,Discount on Selected Items
1000000006,2023-01-08 10:40:03,Victoria Garrett,"['Honey', 'BBQ Sauce', 'Soda', 'Olive Oil', 'Garden Hose']",4,5.28,Cash,Boston,Specialty Store,False,Student,Summer,Discount on Selected Items
1000000007,2020-09-03 12:39:59,Sydney Waller,"['Syrup', 'Trash Cans', 'Pancake Mix', 'Water', 'Mayonnaise']",5,21.77,Debit Card,Chicago,Specialty Store,False,Young Adult,Winter,Discount on Selected Items
1000000008,2021-04-05 06:32:18,Kimberly Morgan,['Insect Repellent'],4,55.25,Mobile Payment,Los Angeles,Warehouse Club,False,Homemaker,Fall,
1000000009,2021-07-08 10:08:59,Lori Conway,"['Soap', 'Baby Wipes', 'Soda']",7,31.21,Mobile Payment,Boston,Convenience Store,True,Young Adult,Winter,


In [0]:
# STEP 2: BRONZE → SILVER
# - Parse Product string into array<string> as Product_Array
# - Recalculate Total_Items as Total_Items_Calculated (size(Product_Array))

from pyspark.sql import functions as F

CATALOG = "retail_demand_catalog"
SCHEMA  = "retail_demand_schema"  
BRONZE  = f"{CATALOG}.{SCHEMA}.transactions_bronze"
SILVER  = f"{CATALOG}.{SCHEMA}.transactions_silver"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

dfb = spark.table(BRONZE)

# Clean and parse the Product column which looks like: "['Item A', 'Item B']"
# Steps (inline): strip brackets, remove quotes, split by comma, trim whitespace.
# Handle NULL/empty/[] gracefully to produce an empty array.
prod_str = F.coalesce(F.col("Product"), F.lit(""))
no_brackets = F.regexp_replace(prod_str, r"^\s*\[|\]\s*$", "")           # remove leading '[' and trailing ']'
no_quotes   = F.regexp_replace(no_brackets, r"['\"]", "")                # remove both single/double quotes
trimmed     = F.trim(no_quotes)

empty_array = F.expr("CAST(array() AS array<string>)")
product_array = F.when(F.length(trimmed) == 0, empty_array) \
                 .otherwise(F.transform(F.split(trimmed, r"\s*,\s*"), lambda x: F.trim(x)))

dfs = (
    dfb
    .withColumn("Product_Array", product_array)
    .withColumn("Total_Items_Calculated", F.size(F.col("Product_Array")))
)

# Persist SILVER table (managed Delta)
(dfs.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(SILVER)
)

print("Silver table created:", SILVER)

# Quick validation preview (helps you see mismatches for debugging)
display(
    spark.table(SILVER)
         .select("Transaction_ID","Product","Product_Array","Total_Items","Total_Items_Calculated")
         .limit(20)
)


Silver table created: retail_demand_catalog.retail_demand_schema.transactions_silver


Transaction_ID,Product,Product_Array,Total_Items,Total_Items_Calculated
1000000000,"['Ketchup', 'Shaving Cream', 'Light Bulbs']","List(Ketchup, Shaving Cream, Light Bulbs)",3,3
1000000001,"['Ice Cream', 'Milk', 'Olive Oil', 'Bread', 'Potatoes']","List(Ice Cream, Milk, Olive Oil, Bread, Potatoes)",2,5
1000000002,['Spinach'],List(Spinach),6,1
1000000003,"['Tissues', 'Mustard']","List(Tissues, Mustard)",1,2
1000000004,['Dish Soap'],List(Dish Soap),10,1
1000000005,"['Toothpaste', 'Chicken']","List(Toothpaste, Chicken)",3,2
1000000006,"['Honey', 'BBQ Sauce', 'Soda', 'Olive Oil', 'Garden Hose']","List(Honey, BBQ Sauce, Soda, Olive Oil, Garden Hose)",4,5
1000000007,"['Syrup', 'Trash Cans', 'Pancake Mix', 'Water', 'Mayonnaise']","List(Syrup, Trash Cans, Pancake Mix, Water, Mayonnaise)",5,5
1000000008,['Insect Repellent'],List(Insect Repellent),4,1
1000000009,"['Soap', 'Baby Wipes', 'Soda']","List(Soap, Baby Wipes, Soda)",7,3


In [0]:
# STEP 3 (REVISED): Normalize Season from Date using try_to_timestamp (tolerant & supports seconds)

from pyspark.sql import functions as F

CATALOG = "retail_demand_catalog"
SCHEMA  = "retail_demand_schema"   # ensure spelling matches your environment
SILVER  = f"{CATALOG}.{SCHEMA}.transactions_silver"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

df = spark.table(SILVER)

# 1) Robust timestamp parsing with multiple patterns (with and without seconds)
patterns = [
    "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm",
    "dd-MM-yyyy HH:mm:ss", "dd-MM-yyyy HH:mm",
    "MM-dd-yyyy HH:mm:ss", "MM-dd-yyyy HH:mm",
    "MM/dd/yyyy HH:mm:ss", "MM/dd/yyyy HH:mm",
    "dd/MM/yyyy HH:mm:ss", "dd/MM/yyyy HH:mm",
    "yyyy/MM/dd HH:mm:ss", "yyyy/MM/dd HH:mm"
]

parsed_candidates = [F.expr(f"try_to_timestamp(Date, '{p}')") for p in patterns]
parsed_ts = F.coalesce(*parsed_candidates)

df = (
    df.withColumn("Event_Timestamp", parsed_ts)
      .withColumn("Event_Date", F.to_date(F.col("Event_Timestamp")))
)

# 2) Standardize original Season text
season_raw = F.trim(F.lower(F.coalesce(F.col("Season"), F.lit(""))))
season_std = (
    F.when(season_raw.rlike(r"^$|^none$|^na$|^n/a$|^unknown$"), F.lit(None).cast("string"))
     .when(season_raw.rlike(r"^win"), F.lit("Winter"))
     .when(season_raw.rlike(r"^spr"), F.lit("Spring"))
     .when(season_raw.rlike(r"^sum"), F.lit("Summer"))
     .when(season_raw.rlike(r"^fal|^aut"), F.lit("Fall"))   # map Autumn → Fall
     .otherwise(F.lit(None).cast("string"))
)

# 3) Derive season from month (Northern Hemisphere)
m = F.month(F.col("Event_Timestamp"))
season_derived = (
    F.when(m.isNull(), F.lit(None).cast("string"))
     .when(m.isin(12, 1, 2), F.lit("Winter"))
     .when(m.isin(3, 4, 5), F.lit("Spring"))
     .when(m.isin(6, 7, 8), F.lit("Summer"))
     .otherwise(F.lit("Fall"))
)

# 4) Choose normalized season + provenance
season_normalized = (
    F.when(F.col("Event_Timestamp").isNotNull(),
           F.when(season_std.eqNullSafe(season_derived), season_std).otherwise(season_derived))
     .otherwise(season_std)
)

season_source = (
    F.when(F.col("Event_Timestamp").isNull() & season_std.isNotNull(), F.lit("original_date_missing"))
     .when(F.col("Event_Timestamp").isNull() & season_std.isNull(), F.lit("unknown"))
     .when(season_std.eqNullSafe(season_derived), F.lit("original"))
     .otherwise(F.lit("corrected_from_date"))
)

df_out = (
    df.withColumn("Season_Standardized", season_std)
      .withColumn("Season_Derived", season_derived)
      .withColumn("Season_Normalized", season_normalized)
      .withColumn("Season_Source", season_source)
)

# 5) Persist back to the same Silver table
(df_out.write
      .format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true")
      .saveAsTable(SILVER)
)

print("Updated Silver table with robust season normalization:", SILVER)

# 6) Quick validation
display(
    spark.table(SILVER)
         .groupBy("Season_Source")
         .count()
         .orderBy(F.desc("count"))
)

display(
    spark.table(SILVER)
         .select("Transaction_ID", "Date", "Season", "Season_Standardized",
                 "Season_Derived", "Season_Normalized", "Season_Source")
         .where(F.col("Season_Source").isin("corrected_from_date", "original_date_missing"))
         .limit(30)
)

display(
    spark.table(SILVER)
         .filter(F.col("Event_Timestamp").isNull())
         .select("Transaction_ID", "Date", "Season", "City", "Store_Type")
         .limit(30)
)


Updated Silver table with robust season normalization: retail_demand_catalog.retail_demand_schema.transactions_silver


Season_Source,count
corrected_from_date,750358
original,249642


Transaction_ID,Date,Season,Season_Standardized,Season_Derived,Season_Normalized,Season_Source
1000000001,2023-03-01 13:01:21,Fall,Fall,Spring,Spring,corrected_from_date
1000000002,2024-03-21 15:37:04,Winter,Winter,Spring,Spring,corrected_from_date
1000000003,2020-10-31 09:59:47,Spring,Spring,Fall,Fall,corrected_from_date
1000000005,2021-10-07 12:37:26,Spring,Spring,Fall,Fall,corrected_from_date
1000000006,2023-01-08 10:40:03,Summer,Summer,Winter,Winter,corrected_from_date
1000000007,2020-09-03 12:39:59,Winter,Winter,Fall,Fall,corrected_from_date
1000000008,2021-04-05 06:32:18,Fall,Fall,Spring,Spring,corrected_from_date
1000000009,2021-07-08 10:08:59,Winter,Winter,Summer,Summer,corrected_from_date
1000000010,2020-03-18 18:58:18,Fall,Fall,Spring,Spring,corrected_from_date
1000000011,2023-03-30 19:26:41,Fall,Fall,Spring,Spring,corrected_from_date


Transaction_ID,Date,Season,City,Store_Type


In [0]:
# STEP 4 (Simplified): Replace Total_Items with computed size(Product_Array)

from pyspark.sql import functions as F

CATALOG = "retail_demand_catalog"
SCHEMA  = "retail_demand_schema"
SILVER  = f"{CATALOG}.{SCHEMA}.transactions_silver"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

df = spark.table(SILVER)

# Replace Total_Items with computed size(Product_Array)
df_out = (
    df.drop("Total_Items")  # remove old inaccurate column
      .withColumn("Total_Items", F.size(F.col("Product_Array")))
      .drop("Total_Items_Calculated")  # remove intermediate column
)

# Persist back to the same Silver table
(df_out.write
      .format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true")
      .saveAsTable(SILVER)
)

print("Silver table updated: Total_Items now computed from Product_Array")
display(spark.table(SILVER).select("Transaction_ID","Product_Array","Total_Items").limit(20))


Silver table updated: Total_Items now computed from Product_Array


Transaction_ID,Product_Array,Total_Items
1000000000,"List(Ketchup, Shaving Cream, Light Bulbs)",3
1000000001,"List(Ice Cream, Milk, Olive Oil, Bread, Potatoes)",5
1000000002,List(Spinach),1
1000000003,"List(Tissues, Mustard)",2
1000000004,List(Dish Soap),1
1000000005,"List(Toothpaste, Chicken)",2
1000000006,"List(Honey, BBQ Sauce, Soda, Olive Oil, Garden Hose)",5
1000000007,"List(Syrup, Trash Cans, Pancake Mix, Water, Mayonnaise)",5
1000000008,List(Insect Repellent),1
1000000009,"List(Soap, Baby Wipes, Soda)",3


In [0]:
# STEP 5: Type casting & normalization in one action

from pyspark.sql import functions as F, types as T

CATALOG = "retail_demand_catalog"
SCHEMA  = "retail_demand_schema"
SILVER  = f"{CATALOG}.{SCHEMA}.transactions_silver"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

df = spark.table(SILVER)

# --- 1) Generic trim for all string-typed columns ---
for c, t in df.dtypes:
    if t == "string":
        df = df.withColumn(c, F.trim(F.col(c)))

# --- 2) Product_Array: ensure each element is trimmed (keep product names as-is otherwise) ---
if "Product_Array" in df.columns:
    df = df.withColumn(
        "Product_Array",
        F.when(
            F.col("Product_Array").isNotNull(),
            F.transform(F.col("Product_Array"), lambda x: F.trim(x))
        ).otherwise(F.col("Product_Array"))
    )

# --- 3) Total_Cost -> double (strip currency symbols/commas/whitespace) ---
# Remove everything except digits, dot, minus
if "Total_Cost" in df.columns:
    clean_cost = F.regexp_replace(F.col("Total_Cost"), r"[^0-9\.\-]", "")
    df = df.withColumn("Total_Cost", clean_cost.cast("double"))

# --- 4) Discount_Applied -> boolean (map common truthy/falsey) ---
if "Discount_Applied" in df.columns:
    da = F.lower(F.col("Discount_Applied"))
    df = df.withColumn(
        "Discount_Applied",
        F.when(da.isNull() | (da == ""), F.lit(None).cast("boolean"))
         .when(da.rlike(r"^(true|t|yes|y|1)$"), F.lit(True))
         .when(da.rlike(r"^(false|f|no|n|0)$"), F.lit(False))
         .otherwise(F.lit(None).cast("boolean"))
    )

# --- 5) Normalize Payment_Method into a standard set ---
# Standard mapping: Cash | Credit Card | Debit Card | Mobile Payment
def normalize_payment(col):
    lc = F.lower(col)
    return (
        F.when(lc.rlike(r"^cash$"), "Cash")
         .when(lc.rlike(r"^(credit\s*card|cc|credit)$"), "Credit Card")
         .when(lc.rlike(r"^(debit\s*card|debit)$"), "Debit Card")
         .when(lc.rlike(r"^(mobile(\s*pay(ment)?)?|apple\s*pay|google\s*pay|upi)$"), "Mobile Payment")
         .otherwise(F.initcap(F.coalesce(col, F.lit(""))))
    )

if "Payment_Method" in df.columns:
    df = df.withColumn("Payment_Method", normalize_payment(F.col("Payment_Method")))

# --- 6) Normalize City / Store_Type / Customer_Category / Promotion (trim + title case or mapped) ---
if "City" in df.columns:
    df = df.withColumn("City", F.when(F.col("City").isNotNull(), F.initcap(F.col("City"))).otherwise(F.col("City")))

if "Store_Type" in df.columns:
    df = df.withColumn("Store_Type", F.when(F.col("Store_Type").isNotNull(), F.initcap(F.col("Store_Type"))).otherwise(F.col("Store_Type")))

if "Customer_Category" in df.columns:
    df = df.withColumn("Customer_Category", F.when(F.col("Customer_Category").isNotNull(), F.initcap(F.col("Customer_Category"))).otherwise(F.col("Customer_Category")))

# Promotion mapping: None | BOGO | Discount on Selected Items | (else Title Case)
if "Promotion" in df.columns:
    promo_lc = F.lower(F.col("Promotion"))
    df = df.withColumn(
        "Promotion",
        F.when(promo_lc.rlike(r"^(none|na|n/a|null|no promo)$"), F.lit("None"))
         .when(promo_lc.rlike(r"^bogo"), F.lit("BOGO"))
         .when(promo_lc.rlike(r"discount"), F.lit("Discount on Selected Items"))
         .otherwise(F.initcap(F.coalesce(F.col("Promotion"), F.lit(""))))
    )

# --- 7) Season: keep only ONE canonical column ---
# Use Season_Normalized as Season; drop Season, Season_Standardized, Season_Derived, Season_Source
if "Season_Normalized" in df.columns:
    df = df.drop("Season") if "Season" in df.columns else df
    df = df.withColumnRenamed("Season_Normalized", "Season")
    drop_cols = [c for c in ["Season_Standardized", "Season_Derived", "Season_Source"] if c in df.columns]
    df = df.drop(*drop_cols)

# --- 8) Transaction_ID: ensure string (avoid scientific notation); Date columns already created in Step 3 ---
if "Transaction_ID" in df.columns:
    df = df.withColumn("Transaction_ID", F.col("Transaction_ID").cast("string"))

# --- 9) Persist back to the same SILVER table ---
(df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(SILVER)
)

print("Silver table normalized with canonical types and values:", SILVER)

# --- 10) Quick sanity: show schema + sample ---
print("Schema:")
print(spark.table(SILVER).printSchema())

display(
    spark.table(SILVER)
         .select("Transaction_ID","Event_Timestamp","City","Store_Type","Payment_Method",
                 "Discount_Applied","Total_Items","Total_Cost","Season","Promotion")
         .limit(20)
)


Silver table normalized with canonical types and values: retail_demand_catalog.retail_demand_schema.transactions_silver
Schema:
root
 |-- Transaction_ID: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Total_Cost: double (nullable = true)
 |-- Payment_Method: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Store_Type: string (nullable = true)
 |-- Discount_Applied: boolean (nullable = true)
 |-- Customer_Category: string (nullable = true)
 |-- Promotion: string (nullable = true)
 |-- Product_Array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Event_Timestamp: timestamp (nullable = true)
 |-- Event_Date: date (nullable = true)
 |-- Season: string (nullable = true)
 |-- Total_Items: integer (nullable = true)

None


Transaction_ID,Event_Timestamp,City,Store_Type,Payment_Method,Discount_Applied,Total_Items,Total_Cost,Season,Promotion
1000000000,2022-01-21T06:27:29.000Z,Los Angeles,Warehouse Club,Mobile Payment,True,3,71.65,Winter,
1000000001,2023-03-01T13:01:21.000Z,San Francisco,Specialty Store,Cash,True,5,25.93,Spring,BOGO
1000000002,2024-03-21T15:37:04.000Z,Houston,Department Store,Credit Card,True,1,41.49,Spring,
1000000003,2020-10-31T09:59:47.000Z,Chicago,Pharmacy,Mobile Payment,True,2,39.34,Fall,
1000000004,2020-12-10T00:59:59.000Z,Houston,Specialty Store,Debit Card,False,1,16.42,Winter,Discount on Selected Items
1000000005,2021-10-07T12:37:26.000Z,Houston,Supermarket,Cash,True,2,72.24,Fall,Discount on Selected Items
1000000006,2023-01-08T10:40:03.000Z,Boston,Specialty Store,Cash,False,5,5.28,Winter,Discount on Selected Items
1000000007,2020-09-03T12:39:59.000Z,Chicago,Specialty Store,Debit Card,False,5,21.77,Fall,Discount on Selected Items
1000000008,2021-04-05T06:32:18.000Z,Los Angeles,Warehouse Club,Mobile Payment,False,1,55.25,Spring,
1000000009,2021-07-08T10:08:59.000Z,Boston,Convenience Store,Mobile Payment,True,3,31.21,Summer,


In [0]:
# STEP 6 (CORRECTED): Build Gold Star Schema (dimensions + fact) WITHOUT line_amount

from pyspark.sql import functions as F, types as T

CATALOG = "retail_demand_catalog"
SCHEMA  = "retail_demand_schema"
SILVER  = f"{CATALOG}.{SCHEMA}.transactions_silver"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

silver = spark.table(SILVER)

# Ensure Event_Date exists for dimensionality
silver = silver.withColumn("Event_Date", F.to_date("Event_Timestamp"))

# ==========================
# DIMENSIONS
# ==========================

# dim_date
m = F.month("Event_Date")
season_from_month = (
    F.when(m.isNull(), F.lit(None).cast("string"))
     .when(m.isin(12,1,2), "Winter")
     .when(m.isin(3,4,5), "Spring")
     .when(m.isin(6,7,8), "Summer")
     .otherwise("Fall")
)

dim_date = (
    silver
      .select("Event_Date")
      .where(F.col("Event_Date").isNotNull())
      .distinct()
      .withColumn("date_key", F.date_format("Event_Date", "yyyyMMdd").cast("int"))
      .withColumn("year", F.year("Event_Date"))
      .withColumn("quarter", F.quarter("Event_Date"))
      .withColumn("month", F.month("Event_Date"))
      .withColumn("day", F.dayofmonth("Event_Date"))
      .withColumn("week_of_year", F.weekofyear("Event_Date"))
      .withColumn("day_name", F.date_format("Event_Date", "EEEE"))
      .withColumn("month_name", F.date_format("Event_Date", "MMMM"))
      .withColumn("is_weekend", F.col("day_name").isin("Saturday","Sunday"))
      .withColumn("season", season_from_month)
      .select("date_key","Event_Date","year","quarter","month","day","week_of_year","day_name","month_name","is_weekend","season")
)
dim_date.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable(f"{CATALOG}.{SCHEMA}.dim_date")

# dim_product
dim_product = (
    silver
      .select(F.explode_outer("Product_Array").alias("product_name"))
      .where(F.col("product_name").isNotNull() & (F.col("product_name") != ""))
      .distinct()
      .withColumn("product_key", F.md5(F.lower(F.col("product_name"))))
      .select("product_key","product_name")
)
dim_product.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable(f"{CATALOG}.{SCHEMA}.dim_product")

# dim_store
dim_store = (
    silver
      .select(
          F.initcap(F.col("City")).alias("city"),
          F.initcap(F.col("Store_Type")).alias("store_type")
      )
      .where(F.col("city").isNotNull() & F.col("store_type").isNotNull())
      .distinct()
      .withColumn("store_key", F.md5(F.lower(F.concat_ws("|", "city", "store_type"))))
      .select("store_key","city","store_type")
)
dim_store.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable(f"{CATALOG}.{SCHEMA}.dim_store")

# dim_customer
dim_customer = (
    silver
      .select(
          F.col("Customer_Name").alias("customer_name"),
          F.initcap(F.col("Customer_Category")).alias("customer_category")
      )
      .where(F.col("customer_name").isNotNull())
      .distinct()
      .withColumn("customer_key", F.md5(F.lower(F.concat_ws("|", "customer_name", "customer_category"))))
      .select("customer_key","customer_name","customer_category")
)
dim_customer.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable(f"{CATALOG}.{SCHEMA}.dim_customer")

# dim_promotion
dim_promotion = (
    silver
      .select(
          F.when(F.lower("Promotion").rlike(r"^(none|na|n/a|null|no promo)$"), F.lit("None"))
           .otherwise(F.col("Promotion")).alias("promotion"),
          F.col("Discount_Applied").alias("discount_applied")
      )
      .distinct()
      .withColumn("promotion_key",
                  F.md5(F.lower(F.concat_ws("|",
                                            "promotion",
                                            F.coalesce(F.col("discount_applied").cast("string"), F.lit("null"))))))
      .select("promotion_key","promotion","discount_applied")
)
dim_promotion.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable(f"{CATALOG}.{SCHEMA}.dim_promotion")

# ==========================
# FACT (NO line_amount)
# ==========================

# We only keep rows where product_name is present to represent a product sale line
fact = (
    silver
      .where(F.col("Event_Date").isNotNull())
      .select(
          F.col("Transaction_ID").alias("transaction_id"),
          F.col("Event_Timestamp"),
          F.col("Event_Date"),
          F.initcap(F.col("City")).alias("City"),
          F.initcap(F.col("Store_Type")).alias("Store_Type"),
          F.col("Customer_Name"),
          F.initcap(F.col("Customer_Category")).alias("Customer_Category"),
          F.col("Promotion"),
          F.col("Discount_Applied"),
          F.col("Payment_Method"),
          F.explode_outer("Product_Array").alias("product_name")
      )
      .where(F.col("product_name").isNotNull() & (F.col("product_name") != ""))
      .withColumn("date_key", F.date_format("Event_Date","yyyyMMdd").cast("int"))
      .withColumn("product_key", F.md5(F.lower(F.col("product_name"))))
      .withColumn("store_key", F.md5(F.lower(F.concat_ws("|", F.col("City"), F.col("Store_Type")))))
      .withColumn("customer_key", F.md5(F.lower(F.concat_ws("|", "Customer_Name", "Customer_Category"))))
      .withColumn("promotion_norm",
                  F.when(F.lower("Promotion").rlike(r"^(none|na|n/a|null|no promo)$"), F.lit("None"))
                   .otherwise(F.col("Promotion")))
      .withColumn("promotion_key",
                  F.md5(F.lower(F.concat_ws("|",
                                            "promotion_norm",
                                            F.coalesce(F.col("Discount_Applied").cast("string"), F.lit("null"))))))
      .withColumn("quantity", F.lit(1))
      .select(
          "transaction_id","date_key","product_key","store_key","customer_key","promotion_key",
          "quantity",
          "Event_Timestamp","Payment_Method","Discount_Applied"
      )
)

fact.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable(f"{CATALOG}.{SCHEMA}.fact_sales")

print("Gold star schema created/updated WITHOUT line_amount:")
print(f" - {CATALOG}.{SCHEMA}.dim_date")
print(f" - {CATALOG}.{SCHEMA}.dim_product")
print(f" - {CATALOG}.{SCHEMA}.dim_store")
print(f" - {CATALOG}.{SCHEMA}.dim_customer")
print(f" - {CATALOG}.{SCHEMA}.dim_promotion")
print(f" - {CATALOG}.{SCHEMA}.fact_sales")

# Quick sanity checks
display(spark.table(f"{CATALOG}.{SCHEMA}.dim_date").orderBy("Event_Date").limit(10))
display(spark.table(f"{CATALOG}.{SCHEMA}.dim_product").limit(10))
display(spark.table(f"{CATALOG}.{SCHEMA}.dim_store").limit(10))
display(spark.table(f"{CATALOG}.{SCHEMA}.dim_customer").limit(10))
display(spark.table(f"{CATALOG}.{SCHEMA}.dim_promotion").limit(10))
display(spark.table(f"{CATALOG}.{SCHEMA}.fact_sales").limit(20))


Gold star schema created/updated WITHOUT line_amount:
 - retail_demand_catalog.retail_demand_schema.dim_date
 - retail_demand_catalog.retail_demand_schema.dim_product
 - retail_demand_catalog.retail_demand_schema.dim_store
 - retail_demand_catalog.retail_demand_schema.dim_customer
 - retail_demand_catalog.retail_demand_schema.dim_promotion
 - retail_demand_catalog.retail_demand_schema.fact_sales


date_key,Event_Date,year,quarter,month,day,week_of_year,day_name,month_name,is_weekend,season
20200101,2020-01-01,2020,1,1,1,1,Wednesday,January,False,Winter
20200102,2020-01-02,2020,1,1,2,1,Thursday,January,False,Winter
20200103,2020-01-03,2020,1,1,3,1,Friday,January,False,Winter
20200104,2020-01-04,2020,1,1,4,1,Saturday,January,True,Winter
20200105,2020-01-05,2020,1,1,5,1,Sunday,January,True,Winter
20200106,2020-01-06,2020,1,1,6,2,Monday,January,False,Winter
20200107,2020-01-07,2020,1,1,7,2,Tuesday,January,False,Winter
20200108,2020-01-08,2020,1,1,8,2,Wednesday,January,False,Winter
20200109,2020-01-09,2020,1,1,9,2,Thursday,January,False,Winter
20200110,2020-01-10,2020,1,1,10,2,Friday,January,False,Winter


product_key,product_name
992225258223be743b7dda0aff0c07e4,Vacuum Cleaner
8a88e13a0f98a69147202be3d299decf,Light Bulbs
777c75804b0ea9caddc24eee229e7126,Feminine Hygiene Products
76aba95cba4ed1280a1e567b178f58a2,Spinach
b7208774faea1d155062bdb773b5f221,Mop
53f8d64113036808686b056cc0b0f649,Tomatoes
a2ec80d0f293d905a85479b63eb7b914,Cereal Bars
9460370bb0ca1c98a779b1bcc6861c2c,Water
180fd182f9a0742f483619781ccc36c4,Salmon
e041c9fb6512a32b495e94782d0fc933,Trash Bags


store_key,city,store_type
81caee20c83fa3a044690f1160162975,San Francisco,Pharmacy
d75c781b099c6a8ee52a21e73eaa5c2a,Houston,Department Store
5aa48a4b3b693c59681ad61754798678,Seattle,Specialty Store
38a65553ea4724998402d8f0b70974da,Houston,Warehouse Club
c35f7b48a76d8ba8a14ecca8e3eba5f6,Miami,Supermarket
cfaec02aa1c5df5531c17e731cb0375f,Miami,Warehouse Club
3552d371a620fc7a3215216fe7c615e3,Atlanta,Specialty Store
e1cf036161b05e7832544ed6469b6545,San Francisco,Department Store
05fca3dbda658ee8db3d8ab483e9bf77,Miami,Pharmacy
fea215c9e971fb3b8fa1df6463a2b098,Los Angeles,Department Store


customer_key,customer_name,customer_category
b67811230f43589a79d86f203ba80d82,Lisa Graves,Professional
dd069750a6b5ca79eb7d376f6466d9bb,Victor Weeks,Teenager
984e2486c829d90cad3654b5c0b640f2,Krystal Mosley,Young Adult
31de3c80172c9db864002b093641a049,Joseph Pollard,Retiree
910a18d92dc903bf256fe79a75e9cd06,Joshua Williams,Professional
d8fcde8bc749c119ad5e95b836e448c6,David Welch,Middle-aged
0f4a7d846920f017add5f758ac5a6a9e,Tyler Owens,Retiree
5168f0f2c4250b9d4239d9cbf5069a69,Jasmine Miller,Professional
6cee5330a0a60c29546115d5c0a295e9,Maria Coleman,Professional
0afcf4c312383747cd2ddfe5a4aabd18,Alexa Jones,Retiree


promotion_key,promotion,discount_applied
5ac97c80320c88f1f4542ddcd63c23a2,BOGO,False
6cd3f2b8c9d131b596ec69f431cbe39e,,False
0697121004a9dcf6a42e53aad128ec3d,BOGO,True
87e534bee9303dd883373eae7ba128db,,True
3d48e76c1ff3987490d0948eef7cd051,Discount on Selected Items,False
60a5d6c8b702ad81a4e23b9db0356478,Discount on Selected Items,True


transaction_id,date_key,product_key,store_key,customer_key,promotion_key,quantity,Event_Timestamp,Payment_Method,Discount_Applied
1000000000,20220121,a2c9302603f371c4044b603fa8d96345,8cbceb80c3d1c19c60c3102bdb800f93,30d87b88ccd86bbca9def74cfd39efa8,87e534bee9303dd883373eae7ba128db,1,2022-01-21T06:27:29.000Z,Mobile Payment,True
1000000000,20220121,fb4daf5da9f923cc5b85633e917ea585,8cbceb80c3d1c19c60c3102bdb800f93,30d87b88ccd86bbca9def74cfd39efa8,87e534bee9303dd883373eae7ba128db,1,2022-01-21T06:27:29.000Z,Mobile Payment,True
1000000000,20220121,8a88e13a0f98a69147202be3d299decf,8cbceb80c3d1c19c60c3102bdb800f93,30d87b88ccd86bbca9def74cfd39efa8,87e534bee9303dd883373eae7ba128db,1,2022-01-21T06:27:29.000Z,Mobile Payment,True
1000000001,20230301,2d6cfc502a0f108b5f431a1e7aaac603,6bcb636114ea469b218629aeebbeea1e,e2fdc9ba2b777721950dbd76e201c417,0697121004a9dcf6a42e53aad128ec3d,1,2023-03-01T13:01:21.000Z,Cash,True
1000000001,20230301,ecbdb882ae865a07d87611437fda0772,6bcb636114ea469b218629aeebbeea1e,e2fdc9ba2b777721950dbd76e201c417,0697121004a9dcf6a42e53aad128ec3d,1,2023-03-01T13:01:21.000Z,Cash,True
1000000001,20230301,6e95889f3017bc7f1ababe3e9454c4ed,6bcb636114ea469b218629aeebbeea1e,e2fdc9ba2b777721950dbd76e201c417,0697121004a9dcf6a42e53aad128ec3d,1,2023-03-01T13:01:21.000Z,Cash,True
1000000001,20230301,dba7b12a19fe9d49fbb53d65c49bbce6,6bcb636114ea469b218629aeebbeea1e,e2fdc9ba2b777721950dbd76e201c417,0697121004a9dcf6a42e53aad128ec3d,1,2023-03-01T13:01:21.000Z,Cash,True
1000000001,20230301,e1a1f0be30cefc7d857ea6408cd00a12,6bcb636114ea469b218629aeebbeea1e,e2fdc9ba2b777721950dbd76e201c417,0697121004a9dcf6a42e53aad128ec3d,1,2023-03-01T13:01:21.000Z,Cash,True
1000000002,20240321,76aba95cba4ed1280a1e567b178f58a2,d75c781b099c6a8ee52a21e73eaa5c2a,b67811230f43589a79d86f203ba80d82,87e534bee9303dd883373eae7ba128db,1,2024-03-21T15:37:04.000Z,Credit Card,True
1000000003,20201031,79654ca5ba0bc2dbc0ee54a48bab4e31,fc575f87f6c5ba0688bf82a83026eaf3,6dffb2e12c9b1c9fbc2e42d16a79c69f,87e534bee9303dd883373eae7ba128db,1,2020-10-31T09:59:47.000Z,Mobile Payment,True


In [0]:
# STEP 7: Create daily product demand aggregate for forecasting

from pyspark.sql import functions as F

CATALOG = "retail_demand_catalog"
SCHEMA  = "retail_demand_schema"
FACT    = f"{CATALOG}.{SCHEMA}.fact_sales"
DIM_PRODUCT = f"{CATALOG}.{SCHEMA}.dim_product"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

fact = spark.table(FACT)
dim_product = spark.table(DIM_PRODUCT)

# Join to get product_name for readability
fact_with_product = fact.join(dim_product, "product_key", "left")

# Aggregate: daily product demand
daily_product_demand = (
    fact_with_product
      .groupBy("date_key", "product_key", "product_name")
      .agg(F.sum("quantity").alias("total_quantity"))
      .withColumn("Event_Date", F.to_date(F.col("date_key").cast("string"), "yyyyMMdd"))
      .orderBy("Event_Date", "product_name")
)

# Save as Gold table
daily_product_demand.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable(f"{CATALOG}.{SCHEMA}.gold_daily_product_demand")

print("Gold aggregate created: gold_daily_product_demand")
display(spark.table(f"{CATALOG}.{SCHEMA}.gold_daily_product_demand").limit(20))


Gold aggregate created: gold_daily_product_demand


date_key,product_key,product_name,total_quantity,Event_Date
20200101,e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,21,2020-01-01
20200101,1f3870be274f6c49b3e31a0c6728957f,Apple,26,2020-01-01
20200101,ccd5280da762316e6587b75669341f5b,BBQ Sauce,25,2020-01-01
20200101,2a92a853a070ee65ee84c2628b4f25fa,Baby Wipes,21,2020-01-01
20200101,72b302bf297a228a75730123efef7c41,Banana,24,2020-01-01
20200101,1412835889f85cf13d127fe1b69bc046,Bath Towels,24,2020-01-01
20200101,34902903de8d4fee8e6afe868982f0dd,Beef,19,2020-01-01
20200101,dba7b12a19fe9d49fbb53d65c49bbce6,Bread,18,2020-01-01
20200101,5bfab488f433de37ce0c779dcf2074cb,Broom,20,2020-01-01
20200101,d74fdde2944f475adc4a85e349d4ee7b,Butter,19,2020-01-01


In [0]:
# STEP 8: Visualization — Daily demand per product (interactive with widgets)

from pyspark.sql import functions as F
from pyspark.sql.window import Window
import plotly.express as px

CATALOG = "retail_demand_catalog"
SCHEMA  = "retail_demand_schema"
GOLD_DAILY = f"{CATALOG}.{SCHEMA}.gold_daily_product_demand"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

daily = spark.table(GOLD_DAILY)

# ------- Widgets (choose product & time window) -------
# Default product = highest total demand overall
default_product_row = (
    daily.groupBy("product_name")
         .agg(F.sum("total_quantity").alias("qty"))
         .orderBy(F.desc("qty"))
         .limit(1)
         .collect()
)
default_product = default_product_row[0]["product_name"] if default_product_row else ""

dbutils.widgets.removeAll()
dbutils.widgets.text("product_name", default_product, "Product to plot")
dbutils.widgets.dropdown("period", "all", ["all", "last_30d", "last_90d", "last_365d"], "Period")

product = dbutils.widgets.get("product_name")
period  = dbutils.widgets.get("period")

# ------- Filter for the selected product -------
df = daily.filter(F.col("product_name") == product)

# Handle empty selection gracefully
if df.limit(1).count() == 0:
    displayHTML(f"<h3>No data found for product: <code>{product}</code></h3>")
else:
    # Optional time filtering
    max_date = df.agg(F.max("Event_Date")).collect()[0][0]
    if max_date and period != "all":
        days_map = {"last_30d": 30, "last_90d": 90, "last_365d": 365}
        start_days = days_map.get(period, 365)
        df = df.filter(F.col("Event_Date") >= F.date_sub(F.lit(max_date), start_days))

    # 7-day moving average over ordered dates
    w = Window.orderBy("Event_Date").rowsBetween(-6, 0)
    df = (df
          .orderBy("Event_Date")
          .withColumn("ma7", F.avg("total_quantity").over(w)))

    # Convert to pandas for Plotly (fine for CE/sample data)
    pdf = df.select("Event_Date", "total_quantity", "ma7").toPandas()

    # Plotly line chart
    fig = px.line(
        pdf, x="Event_Date",
        y=["total_quantity", "ma7"],
        labels={"value": "Quantity", "variable": "Series", "Event_Date": "Date"},
        title=f"Daily Demand for {product}"
    )
    fig.update_traces(mode="lines")
    fig.update_layout(legend_title_text="Series")
    displayHTML(fig.to_html(include_plotlyjs="cdn"))





WARN WindowExpression: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.



In [0]:
# STEP 8 (Extended) - REVISED: Multi-viz dashboard without cache/persist

from pyspark.sql import functions as F, Window
import plotly.express as px
import plotly.graph_objects as go

CATALOG = "retail_demand_catalog"
SCHEMA  = "retail_demand_schema"
FACT    = f"{CATALOG}.{SCHEMA}.fact_sales"
D_DATE  = f"{CATALOG}.{SCHEMA}.dim_date"
D_PROD  = f"{CATALOG}.{SCHEMA}.dim_product"
D_STORE = f"{CATALOG}.{SCHEMA}.dim_store"
D_PROMO = f"{CATALOG}.{SCHEMA}.dim_promotion"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

f = spark.table(FACT)
dd = spark.table(D_DATE)
dp = spark.table(D_PROD)
ds = spark.table(D_STORE)
dpr = spark.table(D_PROMO)

# -------- Widgets --------
max_date_key = f.agg(F.max("date_key")).collect()[0][0]
default_period = "all"

dbutils.widgets.removeAll()
dbutils.widgets.dropdown("period", default_period, ["all", "last_30d", "last_90d", "last_365d"], "Period")
dbutils.widgets.text("season", "All", "Season (All/Winter/Spring/Summer/Fall)")
dbutils.widgets.text("city", "All", "City (All or exact match)")
dbutils.widgets.text("top_n", "10", "Top/Bottom N")

period = dbutils.widgets.get("period")
season_filter = dbutils.widgets.get("season").strip()
city_filter = dbutils.widgets.get("city").strip()
top_n = int(dbutils.widgets.get("top_n") or "10")

# -------- Base joined frame with filters (no cache) --------
base = (
    f.join(dd.select("date_key","Event_Date","year","month","month_name","day_name","season"), "date_key", "left")
     .join(dp, "product_key", "left")
     .join(ds, "store_key", "left")
     .join(dpr.select("promotion_key","promotion"), "promotion_key", "left")
     .withColumn("Event_Date", F.to_date("Event_Date"))
)

# Time window filter
if period != "all" and max_date_key:
    max_date = base.agg(F.max("Event_Date")).collect()[0][0]
    if max_date:
        days_map = {"last_30d": 30, "last_90d": 90, "last_365d": 365}
        start_days = days_map.get(period, 365)
        base = base.filter(F.col("Event_Date") >= F.date_sub(F.lit(max_date), start_days))

# Season filter
if season_filter and season_filter.lower() != "all":
    base = base.filter(F.col("season") == F.initcap(F.lit(season_filter)))

# City filter
if city_filter and city_filter.lower() != "all":
    base = base.filter(F.col("city") == F.initcap(F.lit(city_filter)))

# ========== 1) TOP & BOTTOM SELLING PRODUCTS ==========
prod_agg = (base.groupBy("product_key","product_name")
                 .agg(F.sum("quantity").alias("qty"))
                 .orderBy(F.desc("qty")))

top_products = prod_agg.limit(top_n)
bottom_products = (prod_agg.orderBy("qty")
                            .filter(F.col("qty") > 0)
                            .limit(top_n))

pdf_top = top_products.toPandas()
pdf_bottom = bottom_products.toPandas()

fig_top = px.bar(pdf_top, x="product_name", y="qty", title=f"Top {top_n} Products by Quantity",
                 labels={"qty":"Total Quantity","product_name":"Product"})
fig_top.update_layout(xaxis_tickangle=-30, height=420)
displayHTML(fig_top.to_html(include_plotlyjs="cdn"))

fig_bottom = px.bar(pdf_bottom, x="product_name", y="qty", title=f"Bottom {top_n} Products by Quantity (qty>0)",
                    labels={"qty":"Total Quantity","product_name":"Product"})
fig_bottom.update_layout(xaxis_tickangle=-30, height=420)
displayHTML(fig_bottom.to_html(include_plotlyjs="cdn"))

# ========== 2) SEASON-WISE SELLERS ==========
season_tot = (base.groupBy("season")
                   .agg(F.sum("quantity").alias("qty"))
                   .orderBy(F.desc("qty")))
pdf_season_tot = season_tot.toPandas()
fig_season_tot = px.bar(pdf_season_tot, x="season", y="qty", title="Demand by Season (Total)",
                        labels={"qty":"Total Quantity","season":"Season"})
displayHTML(fig_season_tot.to_html(include_plotlyjs="cdn"))

w = Window.partitionBy("season").orderBy(F.desc("qty"))
season_prod = (base.groupBy("season","product_name")
                    .agg(F.sum("quantity").alias("qty")))
season_topN = (season_prod.withColumn("rn", F.row_number().over(w))
                         .filter(F.col("rn") <= top_n)
                         .orderBy("season", F.desc("qty")))
pdf_season_top = season_topN.select(
    F.concat_ws(" — ", "season", "product_name").alias("season_product"),
    "qty"
).toPandas()

fig_season_top = px.bar(pdf_season_top, x="season_product", y="qty",
                        title=f"Top {top_n} Products Within Each Season",
                        labels={"season_product":"Season — Product","qty":"Quantity"})
fig_season_top.update_layout(xaxis_tickangle=-30, height=500)
displayHTML(fig_season_top.to_html(include_plotlyjs="cdn"))

# ========== 3) MOST DISCOUNT-APPLIED PRODUCTS & PROMO CONVERSION ==========
disc_by_prod = (base.groupBy("product_name")
                     .agg(F.sum(F.when(F.col("Discount_Applied") == True, 1).otherwise(0)).alias("discounted_lines"),
                          F.count("*").alias("total_lines"))
                     .withColumn("discount_rate", F.round(F.col("discounted_lines")/F.col("total_lines"), 3))
                     .orderBy(F.desc("discounted_lines"))
                     .limit(top_n))

pdf_disc_prod = disc_by_prod.toPandas()
fig_disc_prod = px.bar(pdf_disc_prod, x="product_name", y="discounted_lines",
                       title=f"Top {top_n} Products by Discount Applications",
                       labels={"discounted_lines":"Discounted Lines","product_name":"Product"})
fig_disc_prod.update_layout(xaxis_tickangle=-30, height=420)
displayHTML(fig_disc_prod.to_html(include_plotlyjs="cdn"))

promo_conv = (base.groupBy("promotion")
                   .agg(F.sum(F.when(F.col("Discount_Applied") == True, 1).otherwise(0)).alias("discounted_lines"),
                        F.count("*").alias("total_lines"))
                   .withColumn("conversion_rate", F.round(F.col("discounted_lines")/F.col("total_lines"), 3))
                   .orderBy(F.desc("conversion_rate")))
pdf_promo = promo_conv.toPandas()
fig_promo = px.bar(pdf_promo, x="promotion", y="conversion_rate",
                   title="Promotion Conversion Rate (Discount Applied %)",
                   labels={"conversion_rate":"Rate","promotion":"Promotion"})
fig_promo.update_layout(xaxis_tickangle=-20, height=420)
displayHTML(fig_promo.to_html(include_plotlyjs="cdn"))

# ========== 4) CITY × MONTH HEATMAP ==========
city_month = (base.groupBy("city","month","month_name")
                   .agg(F.sum("quantity").alias("qty")))
pdf_cm = city_month.orderBy("city","month").toPandas()
if not pdf_cm.empty:
    pivot = pdf_cm.pivot(index="city", columns="month_name", values="qty").fillna(0)
    month_order = (pdf_cm[["month","month_name"]].drop_duplicates().sort_values("month")["month_name"].tolist())
    pivot = pivot.reindex(columns=month_order)
    fig_heat = go.Figure(data=go.Heatmap(
        z=pivot.values,
        x=pivot.columns,
        y=pivot.index,
        colorscale="Blues",
        colorbar_title="Qty"
    ))
    fig_heat.update_layout(title="City × Month Demand Heatmap", height=520)
    displayHTML(fig_heat.to_html(include_plotlyjs="cdn"))
else:
    displayHTML("<p><em>No data for City × Month heatmap after filters.</em></p>")

# ========== 5) DAY-OF-WEEK PROFILE ==========
dow = (base.groupBy("day_name")
            .agg(F.avg("quantity").alias("avg_qty_per_line"),
                 F.sum("quantity").alias("total_qty"))
            .orderBy("day_name"))
pdf_dow = dow.toPandas()
fig_dow = px.bar(pdf_dow, x="day_name", y="total_qty",
                 title="Total Quantity by Day of Week",
                 labels={"total_qty":"Total Quantity","day_name":"Day"})
displayHTML(fig_dow.to_html(include_plotlyjs="cdn"))

# ========== 6) PAYMENT METHOD MIX ==========
paymix = (base.groupBy("Payment_Method")
               .agg(F.sum("quantity").alias("qty"))
               .orderBy(F.desc("qty")))
pdf_pay = paymix.toPandas()
fig_pay = px.pie(pdf_pay, names="Payment_Method", values="qty", title="Payment Method Share (by Quantity)")
displayHTML(fig_pay.to_html(include_plotlyjs="cdn"))

displayHTML("""
<div style="margin-top:10px;padding:8px;border-left:4px solid #3b82f6;background:#f0f7ff;">
<b>Tips:</b>
<ul>
  <li>Use the widgets at the top to change <i>Period</i>, filter by <i>Season</i> or <i>City</i>, and set <i>Top/Bottom N</i>.</li>
  <li>Right-click → <i>Pin to dashboard</i> to build a presentable board.</li>
  <li>For larger data, use <i>last_90d</i> or <i>last_365d</i> to keep the plots responsive.</li>
</ul>
</div>
""")


In [0]:
# STEP 9: Forecast daily demand with Prophet (train/test + forecast plot)

import sys, subprocess
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
from datetime import timedelta
import plotly.graph_objects as go

CATALOG = "retail_demand_catalog"
SCHEMA  = "retail_demand_schema"
GOLD_DAILY = f"{CATALOG}.{SCHEMA}.gold_daily_product_demand"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

# ---------- Widgets ----------
# Default product = highest total demand
daily = spark.table(GOLD_DAILY)

if daily.limit(1).count() == 0:
    displayHTML("<h3>No data in gold_daily_product_demand. Please run Step 7 first.</h3>")
else:
    default_product_row = (daily.groupBy("product_name")
                                .agg(F.sum("total_quantity").alias("qty"))
                                .orderBy(F.desc("qty")).limit(1).collect())
    default_product = default_product_row[0]["product_name"] if default_product_row else ""

    dbutils.widgets.removeAll()
    dbutils.widgets.text("product_name", default_product, "Product to forecast")
    dbutils.widgets.text("horizon_days", "30", "Forecast horizon (days)")
    dbutils.widgets.text("test_days", "14", "Holdout (last N days)")
    product = dbutils.widgets.get("product_name")
    horizon_days = int(dbutils.widgets.get("horizon_days") or "30")
    test_days = int(dbutils.widgets.get("test_days") or "14")

    # ---------- Get series for product ----------
    ts_sdf = (daily
              .filter(F.col("product_name") == product)
              .select("Event_Date", "total_quantity")
              .orderBy("Event_Date"))

    if ts_sdf.limit(1).count() == 0:
        displayHTML(f"<h3>No time series found for product: <code>{product}</code></h3>")
    else:
        # Convert to pandas and fill any missing dates with 0 quantity
        pdf = ts_sdf.toPandas().sort_values("Event_Date")
        pdf["Event_Date"] = pd.to_datetime(pdf["Event_Date"])
        if pdf["Event_Date"].isnull().any():
            pdf = pdf.dropna(subset=["Event_Date"])
        if len(pdf) == 0:
            displayHTML(f"<h3>No valid dates for: <code>{product}</code></h3>")
        else:
            full_idx = pd.date_range(pdf["Event_Date"].min(), pdf["Event_Date"].max(), freq="D")
            pdf_full = (pdf.set_index("Event_Date")
                          .reindex(full_idx)
                          .fillna({"total_quantity": 0})
                          .rename_axis("Event_Date")
                          .reset_index())
            pdf_full.columns = ["ds", "y"]  # Prophet expects ds (date), y (value)

            # Adjust test_days if series is short
            if len(pdf_full) < (test_days + 7):
                test_days = max(3, min(7, len(pdf_full)//4))

            # Train / Test split
            train = pdf_full.iloc[:-test_days] if test_days > 0 else pdf_full.copy()
            test = pdf_full.iloc[-test_days:] if test_days > 0 else pd.DataFrame(columns=pdf_full.columns)

            # ---------- Import / install Prophet ----------
            try:
                from prophet import Prophet
            except Exception as e:
                # Try to install prophet silently
                print("Installing prophet ...")
                subprocess.check_call([sys.executable, "-m", "pip", "install", "prophet==1.1.5", "-q"])
                from prophet import Prophet

            # Build & fit model (simple, robust defaults)
            m = Prophet(
                daily_seasonality=True,      # daily can help with short series after zero-filling
                weekly_seasonality=True,
                yearly_seasonality=True,
                seasonality_mode="additive",
                changepoint_prior_scale=0.5  # a bit flexible
            )
            # Fit
            m.fit(train)

            # Future dataframe for horizon
            future = m.make_future_dataframe(periods=horizon_days, freq="D")
            fcst = m.predict(future)

            # ---------- Metrics on holdout ----------
            def smape(y_true, y_pred):
                denom = (np.abs(y_true) + np.abs(y_pred))
                mask = denom > 0
                return np.mean(2.0 * np.abs(y_pred - y_true)[mask] / denom[mask]) if mask.any() else np.nan

            mae = rmse = smape_val = np.nan
            if test_days > 0 and len(test) > 0:
                # Align on test dates
                merged = test.merge(fcst[["ds", "yhat"]], on="ds", how="left")
                y_true = merged["y"].values
                y_pred = merged["yhat"].values
                mae = float(np.mean(np.abs(y_pred - y_true)))
                rmse = float(np.sqrt(np.mean((y_pred - y_true) ** 2)))
                smape_val = float(smape(y_true, y_pred))

            # ---------- Plotly chart: history + forecast ----------
            hist = fcst[fcst["ds"] <= train["ds"].max()]
            fut = fcst[fcst["ds"] > train["ds"].max()]

            fig = go.Figure()

            # Actuals (all)
            fig.add_trace(go.Scatter(
                x=pdf_full["ds"], y=pdf_full["y"], mode="lines+markers",
                name="Actual (history)", line=dict(color="#2563eb")
            ))

            # Forecast (yhat)
            fig.add_trace(go.Scatter(
                x=fcst["ds"], y=fcst["yhat"], mode="lines",
                name="Forecast (yhat)", line=dict(color="#10b981", dash="solid", width=3)
            ))

            # Uncertainty intervals
            fig.add_trace(go.Scatter(
                x=fcst["ds"], y=fcst["yhat_upper"], mode="lines",
                line=dict(width=0), showlegend=False
            ))
            fig.add_trace(go.Scatter(
                x=fcst["ds"], y=fcst["yhat_lower"], mode="lines",
                fill='tonexty', fillcolor="rgba(16,185,129,0.15)",
                line=dict(width=0), name="Forecast interval"
            ))

            # Vertical line at train/test boundary
            cutoff = train["ds"].max()
            fig.add_vline(x=cutoff, line_color="gray", line_dash="dot")
            fig.add_annotation(x=cutoff, y=max(pdf_full["y"].max(), (fcst["yhat"].max() if len(fcst) else 0)),
                               text="Train/Test split", showarrow=False, yshift=20)

            title = f"Prophet Forecast — {product} (horizon={horizon_days}d, holdout={test_days}d)"
            fig.update_layout(
                title=title,
                xaxis_title="Date",
                yaxis_title="Quantity",
                legend_title_text="Series",
                height=520
            )

            # Metrics panel
            metrics_html = f"""
            <div style="margin:10px 0;padding:10px;border-left:4px solid #22c55e;background:#f0fff4;">
              <b>Backtest metrics (last {test_days} days)</b><br/>
              MAE: <code>{mae:.3f}</code> &nbsp;&nbsp;
              RMSE: <code>{rmse:.3f}</code> &nbsp;&nbsp;
              sMAPE: <code>{(smape_val*100 if not np.isnan(smape_val) else np.nan):.2f}%</code>
            </div>
            """
            displayHTML(metrics_html)
            displayHTML(fig.to_html(include_plotlyjs="cdn"))

# Optional tweaks you can try (in the same cell after it works once)
# Increase horizon_days to 60–90 if you have longer history.
# Tune changepoint_prior_scale (lower = smoother, higher = more responsive).
# If the series is very spiky with many zeros, consider seasonality_mode="multiplicative" or adding holiday/season regressors later.


18:58:54 - cmdstanpy - INFO - Chain [1] start processing
18:58:54 - cmdstanpy - INFO - Chain [1] done processing


In [0]:
# STEP 10: Batch Prophet forecasts for multiple products → save forecasts + metrics Gold tables

import sys, subprocess
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
from datetime import datetime

CATALOG = "retail_demand_catalog"
SCHEMA  = "retail_demand_schema"
GOLD_DAILY = f"{CATALOG}.{SCHEMA}.gold_daily_product_demand"
OUT_FORECAST = f"{CATALOG}.{SCHEMA}.gold_product_forecast"
OUT_METRICS  = f"{CATALOG}.{SCHEMA}.gold_product_forecast_metrics"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

# Widgets
daily = spark.table(GOLD_DAILY)
if daily.limit(1).count() == 0:
    displayHTML("<h3>No data in gold_daily_product_demand. Please run Step 7 first.</h3>")
else:
    # Defaults
    default_topN = "20"
    default_horizon = "30"
    default_test_days = "14"
    default_min_history = "21"  # require at least 3 weeks of data to forecast

    dbutils.widgets.removeAll()
    dbutils.widgets.text("top_n_products", default_topN, "Top N products to forecast")
    dbutils.widgets.text("horizon_days", default_horizon, "Forecast horizon (days)")
    dbutils.widgets.text("test_days", default_test_days, "Holdout (last N days)")
    dbutils.widgets.text("min_history_days", default_min_history, "Min history days to include product")

    top_n = int(dbutils.widgets.get("top_n_products") or default_topN)
    horizon_days = int(dbutils.widgets.get("horizon_days") or default_horizon)
    test_days = int(dbutils.widgets.get("test_days") or default_test_days)
    min_history_days = int(dbutils.widgets.get("min_history_days") or default_min_history)

    # Select top N products by historical demand
    prod_rank = (daily.groupBy("product_key","product_name")
                      .agg(F.sum("total_quantity").alias("qty"),
                           F.countDistinct("Event_Date").alias("history_days"))
                      .filter(F.col("history_days") >= min_history_days)
                      .orderBy(F.desc("qty"))
                      .limit(top_n)
                      .collect())

    products = [(r["product_key"], r["product_name"]) for r in prod_rank]

    if not products:
        displayHTML(f"<h3>No products meet min_history_days={min_history_days}. Lower the threshold and retry.</h3>")
    else:
        # Try Prophet import / install
        try:
            from prophet import Prophet
        except Exception:
            print("Installing prophet (one-time) ...")
            subprocess.check_call([sys.executable, "-m", "pip", "install", "prophet==1.1.5", "-q"])
            from prophet import Prophet

        all_fcst_rows = []
        all_metric_rows = []
        run_ts = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

        for pkey, pname in products:
            # Pull product series
            sdf = (daily.filter(F.col("product_key") == pkey)
                         .select("Event_Date", "total_quantity")
                         .orderBy("Event_Date"))
            pdf = sdf.toPandas().sort_values("Event_Date")
            if pdf.empty:
                continue

            pdf["Event_Date"] = pd.to_datetime(pdf["Event_Date"])
            pdf = pdf.dropna(subset=["Event_Date"])
            if pdf.empty:
                continue

            # Fill missing dates with zeros
            full_idx = pd.date_range(pdf["Event_Date"].min(), pdf["Event_Date"].max(), freq="D")
            pdf_full = (pdf.set_index("Event_Date")
                          .reindex(full_idx)
                          .fillna({"total_quantity": 0})
                          .rename_axis("ds")
                          .reset_index()
                          .rename(columns={"total_quantity": "y"}))

            # Adjust test_days for very short series
            local_test_days = test_days
            if len(pdf_full) < (local_test_days + 7):
                local_test_days = max(3, min(7, len(pdf_full)//4))

            train = pdf_full.iloc[:-local_test_days] if local_test_days > 0 else pdf_full.copy()
            test  = pdf_full.iloc[-local_test_days:] if local_test_days > 0 else pd.DataFrame(columns=pdf_full.columns)

            # Fit Prophet
            m = Prophet(
                daily_seasonality=True,
                weekly_seasonality=True,
                yearly_seasonality=True,
                seasonality_mode="additive",
                changepoint_prior_scale=0.5
            )
            m.fit(train)

            # Predict future
            future = m.make_future_dataframe(periods=horizon_days, freq="D")
            fcst = m.predict(future)  # contains ds, yhat, yhat_lower, yhat_upper

            # Compute simple backtest metrics
            def smape(y_true, y_pred):
                denom = (np.abs(y_true) + np.abs(y_pred))
                mask = denom > 0
                return float(np.mean(2.0 * np.abs(y_pred - y_true)[mask] / denom[mask])) if mask.any() else float("nan")

            mae = rmse = smape_val = float("nan")
            if local_test_days > 0 and not test.empty:
                merged = test.merge(fcst[["ds","yhat"]], on="ds", how="left")
                y_true = merged["y"].values
                y_pred = merged["yhat"].values
                if len(y_true) and len(y_pred):
                    mae = float(np.mean(np.abs(y_pred - y_true)))
                    rmse = float(np.sqrt(np.mean((y_pred - y_true) ** 2)))
                    smape_val = smape(y_true, y_pred)

            # Accumulate forecast rows
            # Mark whether each row is in history or future
            hist_end = train["ds"].max()
            fcst["is_future"] = fcst["ds"] > hist_end
            fcst["product_key"] = pkey
            fcst["product_name"] = pname
            fcst["run_timestamp_utc"] = run_ts
            fcst_subset = fcst[["product_key","product_name","ds","yhat","yhat_lower","yhat_upper","is_future","run_timestamp_utc"]]
            all_fcst_rows.append(fcst_subset)

            # Accumulate metrics
            all_metric_rows.append({
                "product_key": pkey,
                "product_name": pname,
                "run_timestamp_utc": run_ts,
                "train_end": pd.to_datetime(hist_end),
                "test_days": int(local_test_days),
                "mae": mae,
                "rmse": rmse,
                "smape": smape_val
            })

        # Combine and write outputs
        if all_fcst_rows:
            pdf_all_fcst = pd.concat(all_fcst_rows, ignore_index=True)
            sdf_all_fcst = spark.createDataFrame(pdf_all_fcst)\
                                .withColumn("ds", F.to_date("ds"))\
                                .withColumn("is_future", F.col("is_future").cast("boolean"))\
                                .withColumn("yhat", F.col("yhat").cast("double"))\
                                .withColumn("yhat_lower", F.col("yhat_lower").cast("double"))\
                                .withColumn("yhat_upper", F.col("yhat_upper").cast("double"))
            (sdf_all_fcst.write
                .mode("overwrite")
                .format("delta")
                .option("overwriteSchema","true")
                .saveAsTable(OUT_FORECAST))

            print(f"Wrote forecasts → {OUT_FORECAST}")
            display(spark.table(OUT_FORECAST).orderBy("product_name","ds").limit(20))
        else:
            displayHTML("<h3>No forecasts generated.</h3>")

        if all_metric_rows:
            pdf_metrics = pd.DataFrame(all_metric_rows)
            sdf_metrics = spark.createDataFrame(pdf_metrics)\
                               .withColumn("train_end", F.to_date("train_end"))\
                               .withColumn("test_days", F.col("test_days").cast("int"))\
                               .withColumn("mae", F.col("mae").cast("double"))\
                               .withColumn("rmse", F.col("rmse").cast("double"))\
                               .withColumn("smape", F.col("smape").cast("double"))
            (sdf_metrics.write
                .mode("overwrite")
                .format("delta")
                .option("overwriteSchema","true")
                .saveAsTable(OUT_METRICS))

            print(f"Wrote metrics → {OUT_METRICS}")
            display(spark.table(OUT_METRICS).orderBy(F.desc("smape")).limit(20))
        else:
            displayHTML("<p><em>No metrics generated.</em></p>")



datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).

18:58:58 - cmdstanpy - INFO - Chain [1] start processing
18:58:58 - cmdstanpy - INFO - Chain [1] done processing
18:58:59 - cmdstanpy - INFO - Chain [1] start processing
18:58:59 - cmdstanpy - INFO - Chain [1] done processing
18:59:00 - cmdstanpy - INFO - Chain [1] start processing
18:59:00 - cmdstanpy - INFO - Chain [1] done processing
18:59:01 - cmdstanpy - INFO - Chain [1] start processing
18:59:01 - cmdstanpy - INFO - Chain [1] done processing
18:59:02 - cmdstanpy - INFO - Chain [1] start processing
18:59:02 - cmdstanpy - INFO - Chain [1] done processing
18:59:03 - cmdstanpy - INFO - Chain [1] start processing
18:59:03 - cmdstanpy - INFO - Chain [1] done processing
18:59:05 - cmdstanpy - INFO - Chain [1] start processing
18:59:05 - cmdstanpy - INFO - Chain [1] done processing
18:59:06 - cmdstanpy - I

Wrote forecasts → retail_demand_catalog.retail_demand_schema.gold_product_forecast


product_key,product_name,ds,yhat,yhat_lower,yhat_upper,is_future,run_timestamp_utc
e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,2020-01-01,24.18574537137844,18.627348841797232,30.896053288658784,False,2025-08-27 18:58:57
e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,2020-01-02,23.68305142575953,17.247927046908146,29.62251083093836,False,2025-08-27 18:58:57
e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,2020-01-03,23.768327794073738,18.087852867506705,30.54375612104195,False,2025-08-27 18:58:57
e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,2020-01-04,23.31347082341647,17.056189293745785,28.89893379428145,False,2025-08-27 18:58:57
e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,2020-01-05,23.360457715656405,17.35793452676442,29.50440059186379,False,2025-08-27 18:58:57
e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,2020-01-06,24.01000751056888,18.116987918000195,29.61369150416079,False,2025-08-27 18:58:57
e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,2020-01-07,23.724714633609626,17.282574437908746,29.630156633445367,False,2025-08-27 18:58:57
e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,2020-01-08,24.07136673221393,17.66391922607091,30.11527942759776,False,2025-08-27 18:58:57
e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,2020-01-09,23.466887241302075,17.397406655307826,29.671976895513048,False,2025-08-27 18:58:57
e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,2020-01-10,23.456161377252137,17.00260699277628,29.812118211725764,False,2025-08-27 18:58:57


Wrote metrics → retail_demand_catalog.retail_demand_schema.gold_product_forecast_metrics


product_key,product_name,run_timestamp_utc,train_end,test_days,mae,rmse,smape
7f3b6b9097e3d420cc518fee0618e60b,Mustard,2025-08-27 18:58:57,2024-05-04,14,5.580235172591777,6.404333284358628,0.2419095787487315
fe01d67a002dfa0f3ac084298142eccd,Orange,2025-08-27 18:58:57,2024-05-04,14,4.567652077800388,5.484485376714722,0.2164885119042323
cbf4e0b7971051760907c327e975f4e5,Soap,2025-08-27 18:58:57,2024-05-04,14,4.624373106473172,5.464847865157615,0.2006114061878121
e80f9290d4767ae136c9c44e5a2c6ffc,Air Freshener,2025-08-27 18:58:57,2024-05-04,14,4.191138959650036,5.477909277873991,0.188844585700471
142daa03fdadf46ee70a698ac364130b,Laundry Detergent,2025-08-27 18:58:57,2024-05-04,14,4.2858040851660295,4.785001130077219,0.1836918431955309
623fcd3761e7dc71ac7784cc058e9dc9,Yogurt,2025-08-27 18:58:57,2024-05-04,14,4.566776535013349,6.038563115332859,0.1787967221158154
2a92a853a070ee65ee84c2628b4f25fa,Baby Wipes,2025-08-27 18:58:57,2024-05-04,14,4.145099457435821,5.283636479841548,0.172113275804509
186a8b72fe696268c330f725f52be9ff,Carrots,2025-08-27 18:58:57,2024-05-04,14,3.779216325665944,5.190550039053893,0.1718443902658191
02cb3332853d4e8482bd85102a392be3,Toothpaste,2025-08-27 18:58:57,2024-05-04,14,7.78176736713876,9.149531134743553,0.1711267960617455
8289b549e3cb5290de041270228a4ea8,Deodorant,2025-08-27 18:58:57,2024-05-04,14,4.0079642306152055,4.937726647828309,0.1701187738340041


In [0]:
# STEP 11: Visualization — Actuals + Prophet Forecast (latest run) per product

from pyspark.sql import functions as F
import plotly.graph_objects as go
import pandas as pd

CATALOG = "retail_demand_catalog"
SCHEMA  = "retail_demand_schema"
GOLD_DAILY   = f"{CATALOG}.{SCHEMA}.gold_daily_product_demand"
GOLD_FCST    = f"{CATALOG}.{SCHEMA}.gold_product_forecast"
GOLD_METRICS = f"{CATALOG}.{SCHEMA}.gold_product_forecast_metrics"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")

# ----- Check tables exist -----
tables_ok = True
for t in [GOLD_DAILY, GOLD_FCST]:
    try:
        _ = spark.table(t).limit(1).count()
    except Exception as e:
        tables_ok = False

if not tables_ok:
    displayHTML("<h3>Required tables not found. Please complete Steps 7 and 10 first.</h3>")
else:
    daily = spark.table(GOLD_DAILY)
    fcst_all = spark.table(GOLD_FCST)
    metrics_all = None
    try:
        metrics_all = spark.table(GOLD_METRICS)
    except Exception:
        metrics_all = None  # metrics are optional

    # ----- Build product dropdown (top 50 by total demand) -----
    prod_rows = (daily.groupBy("product_name")
                       .agg(F.sum("total_quantity").alias("qty"))
                       .orderBy(F.desc("qty"))
                       .limit(50)
                       .collect())
    prod_list = [r["product_name"] for r in prod_rows] or []
    default_product = prod_list[0] if prod_list else ""

    dbutils.widgets.removeAll()
    dbutils.widgets.dropdown("product_name", default_product, prod_list or [default_product or ""])
    dbutils.widgets.dropdown("view", "All", ["All", "History+Forecast", "Forecast only"], "View")

    product = dbutils.widgets.get("product_name")
    view = dbutils.widgets.get("view")

    # ----- Pull actuals -----
    sdf_actuals = (daily
                   .filter(F.col("product_name") == product)
                   .select(F.col("Event_Date").alias("ds"),
                           F.col("total_quantity").alias("y"))
                   .orderBy("ds"))

    if sdf_actuals.limit(1).count() == 0:
        displayHTML(f"<h3>No data for product: <code>{product}</code></h3>")
    else:
        pdf_actuals = sdf_actuals.toPandas()
        pdf_actuals["ds"] = pd.to_datetime(pdf_actuals["ds"])

        # ----- Find latest forecast run for this product -----
        latest_run_row = (fcst_all
                          .filter(F.col("product_name") == product)
                          .agg(F.max("run_timestamp_utc").alias("run_ts"))
                          .collect())
        if not latest_run_row or latest_run_row[0]["run_ts"] is None:
            displayHTML(f"<h3>No forecast found for product: <code>{product}</code>. Run Step 10.</h3>")
        else:
            latest_run = latest_run_row[0]["run_ts"]

            fcst = (fcst_all
                    .filter((F.col("product_name") == product) &
                            (F.col("run_timestamp_utc") == F.lit(latest_run)))
                    .select("ds", "yhat", "yhat_lower", "yhat_upper", "is_future")
                    .orderBy("ds"))
            pdf_fcst = fcst.toPandas()
            pdf_fcst["ds"] = pd.to_datetime(pdf_fcst["ds"])

            # Determine train end (first day marked as future minus 1)
            if "is_future" in pdf_fcst.columns and pdf_fcst["is_future"].any():
                first_future_idx = pdf_fcst.index[pdf_fcst["is_future"]].min()
                hist_end = pdf_fcst.loc[first_future_idx - 1, "ds"] if first_future_idx > 0 else pdf_actuals["ds"].max()
            else:
                hist_end = pdf_actuals["ds"].max()

            # ----- Build Plotly figure -----
            fig = go.Figure()

            # Actuals (history)
            if view in ("All", "History+Forecast"):
                fig.add_trace(go.Scatter(
                    x=pdf_actuals["ds"], y=pdf_actuals["y"],
                    mode="lines+markers", name="Actuals",
                    line=dict(color="#2563eb")
                ))

            # Fitted on history (if available in forecast rows)
            if view in ("All", "History+Forecast"):
                fitted = pdf_fcst[pdf_fcst["ds"] <= hist_end]
                if not fitted.empty:
                    fig.add_trace(go.Scatter(
                        x=fitted["ds"], y=fitted["yhat"],
                        mode="lines", name="Fitted (yhat)",
                        line=dict(color="#10b981", dash="dot", width=2)
                    ))

            # Future forecast
            fut = pdf_fcst[pdf_fcst["ds"] > hist_end]
            if not fut.empty:
                fig.add_trace(go.Scatter(
                    x=fut["ds"], y=fut["yhat"],
                    mode="lines", name="Forecast (yhat)",
                    line=dict(color="#10b981", width=3)
                ))
                # Confidence band
                fig.add_trace(go.Scatter(
                    x=fut["ds"], y=fut["yhat_upper"], mode="lines",
                    line=dict(width=0), showlegend=False
                ))
                fig.add_trace(go.Scatter(
                    x=fut["ds"], y=fut["yhat_lower"], mode="lines",
                    fill='tonexty', fillcolor="rgba(16,185,129,0.15)",
                    line=dict(width=0), name="Forecast interval"
                ))

            # Vertical split line
            fig.add_vline(x=hist_end, line_color="gray", line_dash="dot")
            fig.add_annotation(x=hist_end, y=(max(pdf_actuals["y"].max(), pdf_fcst["yhat"].max()) if len(pdf_fcst) else pdf_actuals["y"].max()),
                               text="Training horizon", showarrow=False, yshift=16)

            fig.update_layout(
                title=f"Daily Demand — {product} (latest run: {latest_run} UTC)",
                xaxis_title="Date",
                yaxis_title="Quantity",
                legend_title_text="Series",
                height=520
            )

            # ----- Metrics panel (latest run) -----
            metrics_html = ""
            if metrics_all is not None:
                met = (metrics_all
                       .filter((F.col("product_name") == product) &
                               (F.col("run_timestamp_utc") == F.lit(latest_run)))
                       .select("mae","rmse","smape","test_days")
                       .limit(1)
                       .toPandas())
                if not met.empty:
                    mae = met.loc[0, "mae"]
                    rmse = met.loc[0, "rmse"]
                    smape = met.loc[0, "smape"]
                    test_days = met.loc[0, "test_days"]
                    metrics_html = f"""
                    <div style="margin:10px 0;padding:10px;border-left:4px solid #22c55e;background:#f0fff4;">
                      <b>Backtest metrics (last {int(test_days)} days)</b><br/>
                      MAE: <code>{mae:.3f}</code> &nbsp;&nbsp;
                      RMSE: <code>{rmse:.3f}</code> &nbsp;&nbsp;
                      sMAPE: <code>{(smape*100 if pd.notnull(smape) else float('nan')):.2f}%</code>
                    </div>
                    """

            if metrics_html:
                displayHTML(metrics_html)
            displayHTML(fig.to_html(include_plotlyjs="cdn"))
