In [0]:
# ============================================================
# CELL 1: INFRASTRUCTURE CONNECTION & AUTHENTICATION
# ============================================================
# Compatible with: Shared Clusters / Unity Catalog
# ============================================================

# 1. Retrieve Service Principal credentials from Key Vault
client_id     = dbutils.secrets.get(scope="shopsmart-scope", key="datalake-sp-client-id")
client_secret = dbutils.secrets.get(scope="shopsmart-scope", key="datalake-sp-client-secret")
tenant_id     = dbutils.secrets.get(scope="shopsmart-scope", key="datalake-sp-tenant-id")

storage_account_name = "dlsshopsmartdev123"

# 2. Configure Spark for OAuth 2.0 (this is the ONLY method needed)
spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account_name}.dfs.core.windows.net", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account_name}.dfs.core.windows.net", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

# 3. Define base paths
BRONZE = f"abfss://bronze@{storage_account_name}.dfs.core.windows.net"
SILVER = f"abfss://silver@{storage_account_name}.dfs.core.windows.net"
GOLD   = f"abfss://gold@{storage_account_name}.dfs.core.windows.net"

print(f"✅ Authentication configured for: {storage_account_name}")
print(f"   📁 Bronze: {BRONZE}")
print(f"   📁 Silver: {SILVER}")
print(f"   📁 Gold:   {GOLD}")

✅ Authentication configured for: dlsshopsmartdev123
   📁 Bronze: abfss://bronze@dlsshopsmartdev123.dfs.core.windows.net
   📁 Silver: abfss://silver@dlsshopsmartdev123.dfs.core.windows.net
   📁 Gold:   abfss://gold@dlsshopsmartdev123.dfs.core.windows.net


In [0]:
# ============================================================
# CELL 2: FIND FILES & VERIFY BRONZE DATA ACCESS
# ============================================================
# Strategy: Use spark.read directly (works on shared clusters)
# We'll try multiple path patterns to find where az cli put files
# ============================================================

print("=" * 60)
print("🔍 FINDING YOUR BRONZE DATA FILES")
print("=" * 60)

# ----------------------------------------------------------
# Your az cli could have placed files at different paths.
# Let's try all possibilities for orders.csv first
# ----------------------------------------------------------

test_paths = [
    ("Pattern 1", f"{BRONZE}/source1_orders_pg/orders.csv"),
    ("Pattern 2", f"{BRONZE}/output_data/source1_orders_pg/orders.csv"),
    ("Pattern 3", f"{BRONZE}/orders.csv"),
    ("Pattern 4", f"{BRONZE}/source1_orders_pg/"),
    ("Pattern 5", f"{BRONZE}/output_data/"),
]

found_base = None

print("\n🔎 Searching for orders.csv...\n")

for name, path in test_paths:
    try:
        if path.endswith(".csv"):
            df = spark.read.option("header", True).csv(path)
            count = df.count()
            print(f"   ✅ {name}: FOUND! ({count} rows)")
            print(f"      Path: {path}")
            found_base = path.replace("/orders.csv", "").replace("/source1_orders_pg", "")
            break
        else:
            # Try reading any CSV in this directory
            df = spark.read.option("header", True).csv(path)
            count = df.count()
            print(f"   ✅ {name}: FOUND directory! ({count} rows)")
            print(f"      Path: {path}")
            found_base = path.replace("/source1_orders_pg/", "").replace("/output_data/", "")
            break
    except Exception as e:
        print(f"   ❌ {name}: Not here")
        # print(f"      {str(e)[:80]}")

if found_base:
    print(f"\n🎯 BASE PATH FOUND: {found_base}")
else:
    print(f"\n⚠️ Could not find files automatically.")
    print(f"   Let's try listing the bronze root...")
    
    # Last resort: try to read anything from bronze
    try:
        # Try if files are directly in bronze root
        df_test = spark.read.text(f"{BRONZE}/*")
        print(f"   Found {df_test.count()} lines of text in bronze root")
    except Exception as e:
        print(f"   ❌ Bronze root also empty or inaccessible")
        print(f"   Error: {str(e)[:100]}")

print("\n" + "=" * 60)
print("📋 SHARE THIS OUTPUT — I'll give you the exact next step")
print("=" * 60)

🔍 FINDING YOUR BRONZE DATA FILES

🔎 Searching for orders.csv...

   ✅ Pattern 1: FOUND! (2000 rows)
      Path: abfss://bronze@dlsshopsmartdev123.dfs.core.windows.net/source1_orders_pg/orders.csv

🎯 BASE PATH FOUND: abfss://bronze@dlsshopsmartdev123.dfs.core.windows.net

📋 SHARE THIS OUTPUT — I'll give you the exact next step


In [0]:
# ============================================================
# CELL 3: BRONZE DATA VERIFICATION & PROFILING
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.types import *

# ----------------------------------------------------------
# 3.1 Define all Bronze source paths
# ----------------------------------------------------------
BRONZE_ORDERS      = BRONZE + "/source1_orders_pg/orders.csv"
BRONZE_ORDER_ITEMS = BRONZE + "/source1_orders_pg/order_items.csv"
BRONZE_CUSTOMERS   = BRONZE + "/source2_customers_api/customers.json"
BRONZE_PRODUCTS    = BRONZE + "/source3_products_mongo/products.json"
BRONZE_CLICKSTREAM = BRONZE + "/source4_clickstream_eventhub/clickstream.json"
BRONZE_INVENTORY   = BRONZE + "/source5_inventory_csv/inventory.csv"
BRONZE_PAYMENTS    = BRONZE + "/source6_payments_api/payments.json"

# ----------------------------------------------------------
# 3.2 Read ALL Bronze datasets
# ----------------------------------------------------------
print("=" * 65)
print("READING ALL BRONZE DATASETS")
print("=" * 65)

df_orders_raw = spark.read.option("header", True).option("inferSchema", True).csv(BRONZE_ORDERS)
orders_count = df_orders_raw.count()
print("Orders loaded:       " + str(orders_count) + " rows")

df_items_raw = spark.read.option("header", True).option("inferSchema", True).csv(BRONZE_ORDER_ITEMS)
items_count = df_items_raw.count()
print("Order Items loaded:  " + str(items_count) + " rows")

df_customers_raw = spark.read.option("multiLine", True).json(BRONZE_CUSTOMERS)
cust_count = df_customers_raw.count()
print("Customers loaded:    " + str(cust_count) + " rows")

df_products_raw = spark.read.option("multiLine", True).json(BRONZE_PRODUCTS)
prod_count = df_products_raw.count()
print("Products loaded:     " + str(prod_count) + " rows")

df_clicks_raw = spark.read.json(BRONZE_CLICKSTREAM)
clicks_count = df_clicks_raw.count()
print("Clickstream loaded:  " + str(clicks_count) + " rows")

df_inventory_raw = spark.read.option("header", True).option("inferSchema", True).csv(BRONZE_INVENTORY)
inv_count = df_inventory_raw.count()
print("Inventory loaded:    " + str(inv_count) + " rows")

df_payments_raw = spark.read.option("multiLine", True).json(BRONZE_PAYMENTS)
pay_count = df_payments_raw.count()
print("Payments loaded:     " + str(pay_count) + " rows")

READING ALL BRONZE DATASETS
Orders loaded:       2000 rows
Order Items loaded:  4904 rows
Customers loaded:    500 rows
Products loaded:     50 rows
Clickstream loaded:  3000 rows
Inventory loaded:    150 rows
Payments loaded:     2000 rows


In [0]:
# ============================================================
# CELL 3b: PROFILE EACH DATASET
# ============================================================

# ----- ORDERS -----
print("=" * 65)
print("SOURCE 1a: ORDERS - Data Issues")
print("=" * 65)

null_status = df_orders_raw.filter(col("order_status").isNull()).count()
neg_amount = df_orders_raw.filter(col("total_amount") < 0).count()
distinct_orders = df_orders_raw.select("order_id").distinct().count()
dup_orders = orders_count - distinct_orders

pct_null = str(int(null_status * 100 / orders_count))

print("  NULL order_status:     " + str(null_status) + " rows (" + pct_null + " pct)")
print("  Negative total_amount: " + str(neg_amount) + " rows")
print("  Duplicate order_ids:   " + str(dup_orders) + " rows")

# ----- CUSTOMERS -----
print("\n" + "=" * 65)
print("SOURCE 2: CUSTOMERS - Data Issues")
print("=" * 65)

null_emails = df_customers_raw.filter(col("email").isNull()).count()
pct_email = str(int(null_emails * 100 / cust_count))

print("  NULL email:    " + str(null_emails) + " rows (" + pct_email + " pct)")
print("  PII ALERT:     email has plaintext data")
print("  PII ALERT:     phone has plaintext data")
print("  NESTED STRUCT: address needs flattening")
print("  NESTED STRUCT: preferences needs flattening")

# ----- PRODUCTS -----
print("\n" + "=" * 65)
print("SOURCE 3: PRODUCTS - Data Issues")
print("=" * 65)

null_prices = df_products_raw.filter(col("price").isNull()).count()
zero_prices = df_products_raw.filter(col("price") == 0).count()
print("  NULL prices:   " + str(null_prices))
print("  Zero prices:   " + str(zero_prices))
print("  NESTED STRUCT: attributes needs flattening")

# ----- INVENTORY -----
print("\n" + "=" * 65)
print("SOURCE 5: INVENTORY - Data Issues")
print("=" * 65)

neg_stock = df_inventory_raw.filter(col("quantity_on_hand") < 0).count()
pct_neg = str(int(neg_stock * 100 / inv_count))

print("  Negative stock: " + str(neg_stock) + " rows (" + pct_neg + " pct)")

# ----- CLICKSTREAM -----
print("\n" + "=" * 65)
print("SOURCE 4: CLICKSTREAM - Data Issues")
print("=" * 65)

null_cust_clicks = df_clicks_raw.filter(col("customer_id").isNull()).count()
pct_anon = str(int(null_cust_clicks * 100 / clicks_count))

print("  NULL customer_id: " + str(null_cust_clicks) + " rows (" + pct_anon + " pct)")
print("  NESTED STRUCT:    geo_location needs flattening")

# ----- PAYMENTS -----
print("\n" + "=" * 65)
print("SOURCE 6: PAYMENTS - Data Issues")
print("=" * 65)
print("  Risk categorization needed")
print("  Date standardization needed")

SOURCE 1a: ORDERS - Data Issues
  NULL order_status:     52 rows (2 pct)
  Negative total_amount: 0 rows
  Duplicate order_ids:   0 rows

SOURCE 2: CUSTOMERS - Data Issues
  NULL email:    53 rows (10 pct)
  PII ALERT:     email has plaintext data
  PII ALERT:     phone has plaintext data
  NESTED STRUCT: address needs flattening
  NESTED STRUCT: preferences needs flattening

SOURCE 3: PRODUCTS - Data Issues
  NULL prices:   0
  Zero prices:   0
  NESTED STRUCT: attributes needs flattening

SOURCE 5: INVENTORY - Data Issues
  Negative stock: 4 rows (2 pct)

SOURCE 4: CLICKSTREAM - Data Issues
  NULL customer_id: 626 rows (20 pct)
  NESTED STRUCT:    geo_location needs flattening

SOURCE 6: PAYMENTS - Data Issues
  Risk categorization needed
  Date standardization needed


In [0]:
# ============================================================
# CELL 3c: SCHEMAS & SUMMARY
# ============================================================

# Show all schemas
print("=" * 65)
print("ALL SCHEMAS")
print("=" * 65)

print("\n--- Orders Schema ---")
df_orders_raw.printSchema()

print("\n--- Order Items Schema ---")
df_items_raw.printSchema()

print("\n--- Customers Schema ---")
df_customers_raw.printSchema()

print("\n--- Products Schema ---")
df_products_raw.printSchema()

print("\n--- Clickstream Schema ---")
df_clicks_raw.printSchema()

print("\n--- Inventory Schema ---")
df_inventory_raw.printSchema()

print("\n--- Payments Schema ---")
df_payments_raw.printSchema()

# Show distributions using Spark (safer than print math)
print("\n" + "=" * 65)
print("KEY DISTRIBUTIONS")
print("=" * 65)

print("\nOrders - Status distribution:")
df_orders_raw.groupBy("order_status").count().orderBy(desc("count")).show()

print("\nProducts - Category distribution:")
df_products_raw.groupBy("category").count().orderBy(desc("count")).show()

print("\nClickstream - Event type distribution:")
df_clicks_raw.groupBy("event_type").count().orderBy(desc("count")).show()

print("\nPayments - Status distribution:")
df_payments_raw.groupBy("status").count().orderBy(desc("count")).show()

print("\nPayments - Risk score stats:")
df_payments_raw.select(
    min("risk_score").alias("min_risk"),
    max("risk_score").alias("max_risk"),
    avg("risk_score").alias("avg_risk")
).show()

print("\nInventory - Negative stock samples:")
df_inventory_raw.filter(col("quantity_on_hand") < 0).show(5, truncate=False)

# Summary
print("\n" + "=" * 65)
print("SUMMARY")
print("=" * 65)
print("  Orders:       " + str(orders_count) + " rows | Issues: " + str(null_status) + " null status, " + str(dup_orders) + " dupes")
print("  Order Items:  " + str(items_count) + " rows | Issues: type casting")
print("  Customers:    " + str(cust_count) + " rows | Issues: " + str(null_emails) + " null emails, PII, nested")
print("  Products:     " + str(prod_count) + " rows | Issues: nested attributes")
print("  Clickstream:  " + str(clicks_count) + " rows | Issues: " + str(null_cust_clicks) + " null cust_id, nested")
print("  Inventory:    " + str(inv_count) + " rows | Issues: " + str(neg_stock) + " negative stock")
print("  Payments:     " + str(pay_count) + " rows | Issues: risk categorization")
print("\n[DONE] Bronze profiling complete!")
print("[NEXT] Cell 4 - Silver Layer Transformations")

ALL SCHEMAS

--- Orders Schema ---
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_status: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- shipping_amount: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- shipping_address_id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)


--- Order Items Schema ---
root
 |-- item_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- discount_percent: double (nullable = true)
 |-- item_status: string (nullable = true)
 |-- created_at: timestamp (nullable = true)


--- Customers Schema ---
root
 |-- address: stru

In [0]:
# ============================================================
# CELL 4: SILVER LAYER - ORDERS TRANSFORMATION
# ============================================================
# Bronze Issues Fixed:
#   1. 52 NULL order_status -> quarantined
#   2. Deduplication on order_id
#   3. Data type standardization
#   4. Derived columns (time parts, net_amount, flags)
#   5. Written as Delta to Silver layer
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime

# ----------------------------------------------------------
# Step 1: Read Bronze Orders
# ----------------------------------------------------------
df_orders_bronze = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv(BRONZE + "/source1_orders_pg/orders.csv")

bronze_count = df_orders_bronze.count()
print("STEP 1: Bronze Orders read - " + str(bronze_count) + " rows")

# ----------------------------------------------------------
# Step 2: Add Bronze audit columns
# ----------------------------------------------------------
df_orders_bronze = df_orders_bronze \
    .withColumn("_bronze_loaded_at", current_timestamp()) \
    .withColumn("_source_file", lit("source1_orders_pg/orders.csv"))

# ----------------------------------------------------------
# Step 3: Separate GOOD vs QUARANTINE records
# ----------------------------------------------------------
# Rule: order_status must NOT be null (52 rows will be quarantined)
df_orders_good = df_orders_bronze.filter(col("order_status").isNotNull())
df_orders_quarantine = df_orders_bronze.filter(col("order_status").isNull())

good_count = df_orders_good.count()
quarantine_count = df_orders_quarantine.count()

print("STEP 3: Data Quality Split")
print("  Good records:        " + str(good_count))
print("  Quarantined records: " + str(quarantine_count))

# Save quarantine records
df_orders_quarantine \
    .withColumn("_quarantine_reason", lit("NULL order_status")) \
    .withColumn("_quarantine_timestamp", current_timestamp()) \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(BRONZE + "/quarantine/orders")

print("  Quarantine saved to: bronze/quarantine/orders")

# ----------------------------------------------------------
# Step 4: Deduplicate on order_id (keep latest by updated_at)
# ----------------------------------------------------------
from pyspark.sql.window import Window

window_dedup = Window.partitionBy("order_id").orderBy(col("updated_at").desc())

df_orders_deduped = df_orders_good \
    .withColumn("_row_num", row_number().over(window_dedup)) \
    .filter(col("_row_num") == 1) \
    .drop("_row_num")

dedup_count = df_orders_deduped.count()
dupes_removed = good_count - dedup_count
print("STEP 4: Deduplication - " + str(dupes_removed) + " duplicates removed, " + str(dedup_count) + " remaining")

# ----------------------------------------------------------
# Step 5: Standardize and clean columns
# ----------------------------------------------------------
df_orders_clean = df_orders_deduped \
    .withColumn("order_id", trim(col("order_id"))) \
    .withColumn("customer_id", trim(col("customer_id"))) \
    .withColumn("order_status", upper(trim(col("order_status")))) \
    .withColumn("payment_method", lower(trim(col("payment_method")))) \
    .withColumn("channel", lower(trim(col("channel")))) \
    .withColumn("total_amount", col("total_amount").cast("double")) \
    .withColumn("discount_amount", coalesce(col("discount_amount").cast("double"), lit(0.0))) \
    .withColumn("shipping_amount", coalesce(col("shipping_amount").cast("double"), lit(0.0)))

# ----------------------------------------------------------
# Step 6: Add derived business columns
# ----------------------------------------------------------
df_orders_enriched = df_orders_clean \
    .withColumn("net_amount", 
        round(col("total_amount") - col("discount_amount"), 2)) \
    .withColumn("gross_with_shipping", 
        round(col("total_amount") + col("shipping_amount"), 2)) \
    .withColumn("discount_pct",
        when(col("total_amount") > 0,
            round(col("discount_amount") / col("total_amount") * 100, 2))
        .otherwise(lit(0.0))) \
    .withColumn("has_discount",
        when(col("discount_amount") > 0, lit(True)).otherwise(lit(False))) \
    .withColumn("has_free_shipping",
        when(col("shipping_amount") == 0, lit(True)).otherwise(lit(False))) \
    .withColumn("order_year", year("order_date")) \
    .withColumn("order_month", month("order_date")) \
    .withColumn("order_day", dayofmonth("order_date")) \
    .withColumn("order_hour", hour("order_date")) \
    .withColumn("order_day_of_week", dayofweek("order_date")) \
    .withColumn("day_name", date_format("order_date", "EEEE")) \
    .withColumn("is_weekend",
        when(dayofweek("order_date").isin(1, 7), lit(True)).otherwise(lit(False))) \
    .withColumn("is_cancelled",
        when(col("order_status") == "CANCELLED", lit(True)).otherwise(lit(False))) \
    .withColumn("is_returned",
        when(col("order_status") == "RETURNED", lit(True)).otherwise(lit(False)))

# ----------------------------------------------------------
# Step 7: Add Silver metadata
# ----------------------------------------------------------
df_orders_silver = df_orders_enriched \
    .withColumn("_silver_processed_at", current_timestamp()) \
    .withColumn("_silver_version", lit("1.0"))

# ----------------------------------------------------------
# Step 8: Write to Silver as Delta (partitioned by year, month)
# ----------------------------------------------------------
silver_orders_path = SILVER + "/orders"

df_orders_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("order_year", "order_month") \
    .option("overwriteSchema", True) \
    .save(silver_orders_path)

# ----------------------------------------------------------
# Step 9: Verify
# ----------------------------------------------------------
df_verify = spark.read.format("delta").load(silver_orders_path)
final_count = df_verify.count()
final_cols = len(df_verify.columns)

print("")
print("=" * 65)
print("SILVER ORDERS - COMPLETE")
print("=" * 65)
print("  Source (Bronze):     " + str(bronze_count) + " rows")
print("  Quarantined:         " + str(quarantine_count) + " rows (null status)")
print("  Duplicates removed:  " + str(dupes_removed) + " rows")
print("  Final Silver:        " + str(final_count) + " rows")
print("  Columns:             " + str(final_cols))
print("  Format:              Delta Lake")
print("  Partitioned by:      order_year, order_month")
print("  Path:                " + silver_orders_path)

print("\n  Schema:")
df_verify.printSchema()

print("\n  Sample data:")
df_verify.select(
    "order_id", "customer_id", "order_date", "order_status",
    "total_amount", "net_amount", "discount_pct",
    "channel", "is_weekend", "day_name"
).show(5, truncate=False)

print("\n  Status distribution:")
df_verify.groupBy("order_status").count().orderBy(desc("count")).show()

print("\n  Channel distribution:")
df_verify.groupBy("channel").count().orderBy(desc("count")).show()

print("\n  Records by year-month:")
df_verify.groupBy("order_year", "order_month").count().orderBy("order_year", "order_month").show(15)

print("[DONE] Silver Orders complete!")
print("[NEXT] Cell 5 - Silver Order Items")

STEP 1: Bronze Orders read - 2000 rows
STEP 3: Data Quality Split
  Good records:        1948
  Quarantined records: 52
  Quarantine saved to: bronze/quarantine/orders
STEP 4: Deduplication - 0 duplicates removed, 1948 remaining

SILVER ORDERS - COMPLETE
  Source (Bronze):     2000 rows
  Quarantined:         52 rows (null status)
  Duplicates removed:  0 rows
  Final Silver:        1948 rows
  Columns:             30
  Format:              Delta Lake
  Partitioned by:      order_year, order_month
  Path:                abfss://silver@dlsshopsmartdev123.dfs.core.windows.net/orders

  Schema:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_status: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- shipping_amount: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- channel: string (nullable = true)
 

In [0]:
# ============================================================
# CELL 5: SILVER LAYER - ORDER ITEMS TRANSFORMATION
# ============================================================
# 
# WHAT IS THIS TABLE?
# -------------------
# Order Items is the "line item" detail of each order.
# One order can have multiple items (1 order -> many items).
# Example: Order ORD001 has 3 items (headphones, cable, case)
#
# This is called a "parent-child" relationship:
#   orders (parent) --< order_items (child)
#   Linked by: order_id
#
# WHY IS THIS IMPORTANT?
# ----------------------
# In the Gold layer, this becomes our FACT TABLE (fact_sales).
# Every analytics query (revenue, quantity, top products) 
# needs this data at the line-item level.
#
# BRONZE ISSUES TO FIX:
# - Possible duplicate item_ids
# - Need derived columns (line_total, discount_amount)
# - Data type standardization
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.window import Window

# ----------------------------------------------------------
# Step 1: Read Bronze Order Items
# ----------------------------------------------------------
# WHY inferSchema=True?
# Without it, all columns come as strings.
# With it, Spark automatically detects:
#   quantity -> integer
#   unit_price -> double
#   created_at -> timestamp
# This saves us manual casting for most columns.

df_items_bronze = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv(BRONZE + "/source1_orders_pg/order_items.csv")

bronze_count = df_items_bronze.count()
print("STEP 1: Bronze Order Items read - " + str(bronze_count) + " rows")


# ----------------------------------------------------------
# Step 2: Deduplicate on item_id
# ----------------------------------------------------------
# WHY DEDUPLICATE?
# In real production systems, duplicates happen due to:
#   - API retries (network timeout -> retry -> same record sent twice)
#   - CDC (Change Data Capture) replaying events
#   - ETL pipeline re-runs
#
# dropDuplicates(["item_id"]) keeps FIRST occurrence and 
# removes any row where item_id appears more than once.
#
# In production, you'd use a Window function to keep the 
# LATEST version (like we did for orders). But for items,
# dropDuplicates is sufficient since items don't get updated.

df_items_deduped = df_items_bronze.dropDuplicates(["item_id"])
dedup_count = df_items_deduped.count()
dupes = bronze_count - dedup_count
print("STEP 2: Deduplication - " + str(dupes) + " duplicates removed")


# ----------------------------------------------------------
# Step 3: Clean and standardize
# ----------------------------------------------------------
# WHY TRIM?
# Data from source systems often has leading/trailing spaces.
# "PROD001 " != "PROD001" -- this breaks JOINs later!
# trim() removes those invisible spaces.
#
# WHY LOWER for item_status?
# Source might have "Shipped", "SHIPPED", "shipped" 
# We standardize to one format for consistent GROUP BY.
#
# WHY COALESCE for discount_percent?
# coalesce(value, default) returns:
#   - value if it's NOT null
#   - default (0.0) if value IS null
# This prevents null propagation in calculations.
# Example: 100 * null = null (bad!)
#          100 * 0.0 = 0.0 (good!)

df_items_clean = df_items_deduped \
    .withColumn("item_id", trim(col("item_id"))) \
    .withColumn("order_id", trim(col("order_id"))) \
    .withColumn("product_id", trim(col("product_id"))) \
    .withColumn("item_status", lower(trim(col("item_status")))) \
    .withColumn("quantity", col("quantity").cast("int")) \
    .withColumn("unit_price", col("unit_price").cast("double")) \
    .withColumn("discount_percent", coalesce(col("discount_percent").cast("double"), lit(0.0)))


# ----------------------------------------------------------
# Step 4: Add derived columns
# ----------------------------------------------------------
# WHY PRE-CALCULATE THESE?
# In the Gold layer and dashboards, these calculations 
# would be repeated millions of times. Pre-computing them 
# in Silver means:
#   1. Faster queries (no runtime math)
#   2. Consistent results (everyone uses same formula)
#   3. Single source of truth
#
# FORMULAS:
#   line_total = quantity * unit_price
#     Example: 2 headphones * $79.99 = $159.98
#
#   discount_amount = line_total * discount_percent / 100
#     Example: $159.98 * 10% = $15.99
#
#   net_line_total = line_total - discount_amount
#     Formula: quantity * unit_price * (1 - discount/100)
#     Example: $159.98 * (1 - 0.10) = $143.98
#
# round(value, 2) ensures we get exactly 2 decimal places
# for currency values. Without it: $143.982000001

df_items_enriched = df_items_clean \
    .withColumn("line_total",
        round(col("quantity") * col("unit_price"), 2)) \
    .withColumn("discount_amount",
        round(col("quantity") * col("unit_price") * col("discount_percent") / 100, 2)) \
    .withColumn("net_line_total",
        round(col("quantity") * col("unit_price") * (1 - col("discount_percent") / 100), 2)) \
    .withColumn("has_discount",
        when(col("discount_percent") > 0, lit(True)).otherwise(lit(False)))


# ----------------------------------------------------------
# Step 5: Data Quality - remove rows with null keys
# ----------------------------------------------------------
# WHY FILTER INSTEAD OF QUARANTINE?
# For the main orders table, we used a formal quarantine 
# process (save bad records separately). That's because 
# orders are high-value business records.
#
# For order_items, we do a simpler approach:
#   - Filter out clearly invalid rows
#   - Count them for monitoring
#   - In production, you'd quarantine these too
#
# RULES:
#   1. item_id must not be null (primary key)
#   2. order_id must not be null (foreign key to orders)
#   3. product_id must not be null (foreign key to products)
#   4. quantity must be > 0 (can't sell 0 or negative items)
#   5. unit_price must be > 0 (can't have free/negative price)
#
# The & operator means AND - ALL conditions must be true.
# The | operator means OR - ANY condition being true = bad record.

df_items_good = df_items_enriched.filter(
    col("item_id").isNotNull() &
    col("order_id").isNotNull() &
    col("product_id").isNotNull() &
    (col("quantity") > 0) &
    (col("unit_price") > 0)
)

df_items_bad = df_items_enriched.filter(
    col("item_id").isNull() |
    col("order_id").isNull() |
    col("product_id").isNull() |
    (col("quantity") <= 0) |
    (col("unit_price") <= 0)
)

good_count = df_items_good.count()
bad_count = df_items_bad.count()
print("STEP 5: Quality check - " + str(good_count) + " good, " + str(bad_count) + " bad")


# ----------------------------------------------------------
# Step 6: Add metadata and write
# ----------------------------------------------------------
# WHY ADD METADATA COLUMNS?
# _silver_processed_at: When was this row processed?
#   - Useful for debugging ("when did this data arrive?")
#   - Useful for incremental loads ("give me everything 
#     processed after yesterday")
#
# _silver_version: Track schema/logic changes
#   - Version "1.0" = current transformation logic
#   - If we change business rules later, bump to "2.0"
#   - Helps answer: "which version of logic created this?"
#
# WHY DELTA FORMAT?
# Delta Lake gives us:
#   1. ACID transactions (write either fully succeeds or fails)
#   2. Schema enforcement (wrong data types get rejected)
#   3. Time travel (query data as it was yesterday)
#   4. UPSERT support (merge new + existing data)
#   5. Audit history (who changed what, when)
#
# WHY mode("overwrite")?
# For this project, we rebuild Silver from scratch each run.
# In production, you'd use MERGE (upsert) for incremental loads.
#
# WHY overwriteSchema=True?
# If we add/remove columns between runs, Delta would normally
# reject the write. This flag says "trust me, update the schema."

df_items_silver = df_items_good \
    .withColumn("_silver_processed_at", current_timestamp()) \
    .withColumn("_silver_version", lit("1.0"))

silver_items_path = SILVER + "/order_items"

df_items_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(silver_items_path)


# ----------------------------------------------------------
# Step 7: Verify
# ----------------------------------------------------------
# WHY READ BACK AND VERIFY?
# "Trust but verify" - we read back what we just wrote to:
#   1. Confirm the write succeeded
#   2. Check row counts match expectations
#   3. Validate schema is correct
#   4. Show sample data for visual inspection
#
# This is a PRODUCTION BEST PRACTICE.
# Many pipelines silently write corrupt or empty data.
# Verification catches these issues immediately.

df_verify = spark.read.format("delta").load(silver_items_path)
final_count = df_verify.count()

print("")
print("=" * 65)
print("SILVER ORDER ITEMS - COMPLETE")
print("=" * 65)
print("  Source (Bronze):    " + str(bronze_count) + " rows")
print("  Duplicates removed: " + str(dupes) + " rows")
print("  Quality rejected:   " + str(bad_count) + " rows")
print("  Final Silver:       " + str(final_count) + " rows")
print("  Path:               " + silver_items_path)

print("\n  Schema:")
df_verify.printSchema()

print("\n  Sample data:")
df_verify.select(
    "item_id", "order_id", "product_id", "quantity",
    "unit_price", "discount_percent", "line_total", "net_line_total"
).show(5, truncate=False)

# WHY SHOW DISTRIBUTIONS?
# This is "sanity checking" - does the data LOOK right?
# If item_status shows "xyz_garbage", we know something is wrong.
# If one product has 99% of sales, that's suspicious.

print("\n  Item status distribution:")
df_verify.groupBy("item_status").count().orderBy(desc("count")).show()

print("\n  Top 10 products by quantity sold:")
df_verify.groupBy("product_id") \
    .agg(
        sum("quantity").alias("total_qty"),
        sum("net_line_total").alias("total_revenue"),
        count("*").alias("order_count")
    ) \
    .orderBy(desc("total_qty")) \
    .show(10, truncate=False)

print("[DONE] Silver Order Items complete!")
print("[NEXT] Cell 6 - Silver Customers (PII Masking)")

STEP 1: Bronze Order Items read - 4904 rows
STEP 2: Deduplication - 0 duplicates removed
STEP 5: Quality check - 4904 good, 0 bad

SILVER ORDER ITEMS - COMPLETE
  Source (Bronze):    4904 rows
  Duplicates removed: 0 rows
  Quality rejected:   0 rows
  Final Silver:       4904 rows
  Path:               abfss://silver@dlsshopsmartdev123.dfs.core.windows.net/order_items

  Schema:
root
 |-- item_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- discount_percent: double (nullable = true)
 |-- item_status: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- line_total: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- net_line_total: double (nullable = true)
 |-- has_discount: boolean (nullable = true)
 |-- _silver_processed_at: timestamp (nullable = true)
 |-- _silver_version: string (nullabl

In [0]:
# ============================================================
# CELL 6: SILVER LAYER - CUSTOMERS TRANSFORMATION
# ============================================================
#
# WHY IS THIS THE MOST IMPORTANT SILVER TABLE?
# ---------------------------------------------
# 1. PII MASKING (Privacy compliance)
#    - GDPR (Europe), CCPA (California), DPDPA (India)
#    - Storing plaintext emails/phones is a LEGAL RISK
#    - Companies get fined millions for PII exposure
#    - In interviews: "How do you handle PII?" is very common
#
# 2. NESTED JSON FLATTENING
#    - Source JSON has nested structs (address, preferences)
#    - Analytics tools (SQL, Power BI) can't query nested data
#    - We must flatten into individual columns
#
# 3. DATA ENRICHMENT
#    - Derive age from date_of_birth
#    - Create age_group for segmentation
#    - Calculate customer tenure
#
# BRONZE ISSUES TO FIX:
#   - 53 null emails (10%) -> keep but flag
#   - Plaintext email -> SHA256 hash
#   - Plaintext phone -> mask (show last 4 only)
#   - Nested address struct -> flatten to columns
#   - Nested preferences struct -> flatten
#   - date_of_birth is string -> convert to date
#   - registration_date is string -> convert to date
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.types import *

# ----------------------------------------------------------
# Step 1: Read Bronze Customers
# ----------------------------------------------------------
# WHY multiLine=True?
# Your customers.json is a JSON ARRAY format:
#   [ {"customer_id": "CUST001", ...}, {"customer_id": "CUST002", ...} ]
#
# This is ONE json structure spanning MULTIPLE lines.
# Without multiLine=True, Spark expects one JSON per line
# and would fail or read garbage.
#
# Compare with clickstream.json which is JSON-Lines format:
#   {"event_id": "EVT001", ...}
#   {"event_id": "EVT002", ...}
# That one does NOT need multiLine=True.

df_cust_bronze = spark.read \
    .option("multiLine", True) \
    .json(BRONZE + "/source2_customers_api/customers.json")

bronze_count = df_cust_bronze.count()
print("STEP 1: Bronze Customers read - " + str(bronze_count) + " rows")
print("  Columns: " + str(df_cust_bronze.columns))


# ----------------------------------------------------------
# Step 2: Flatten nested STRUCT columns
# ----------------------------------------------------------
# WHAT IS A STRUCT IN SPARK?
# A struct is a column that CONTAINS other columns inside it.
# In your schema:
#   address: struct
#     |-- city: string
#     |-- state: string
#     |-- zip: string
#     |-- street: string
#     |-- country: string
#
# To access nested fields, use DOT notation: col("address.city")
# But for analytics, we want flat columns: address_city, address_state
#
# WHY FLATTEN?
# 1. SQL queries can't easily filter on nested fields
# 2. Power BI / Tableau can't read nested structs
# 3. JOIN conditions need flat columns
# 4. GROUP BY on nested fields is complex and slow
#
# WHAT ABOUT PREFERENCES?
# preferences has arrays inside (categories, communication)
# We convert arrays to comma-separated strings for simplicity.
# Example: ["Electronics", "Fashion"] -> "Electronics,Fashion"

df_cust_flat = df_cust_bronze \
    .withColumn("address_street", col("address.street")) \
    .withColumn("address_city", col("address.city")) \
    .withColumn("address_state", col("address.state")) \
    .withColumn("address_zip", col("address.zip")) \
    .withColumn("address_country", col("address.country")) \
    .withColumn("pref_categories", concat_ws(",", col("preferences.categories"))) \
    .withColumn("pref_communication", concat_ws(",", col("preferences.communication"))) \
    .drop("address", "preferences")

print("STEP 2: Nested structs flattened")
print("  address -> address_street, address_city, address_state, address_zip, address_country")
print("  preferences -> pref_categories, pref_communication")


# ----------------------------------------------------------
# Step 3: PII MASKING
# ----------------------------------------------------------
# THIS IS CRITICAL FOR INTERVIEWS!
#
# TECHNIQUE 1: SHA-256 HASHING (for email)
#   sha2("john@email.com", 256) -> "a1b2c3d4e5f6..."
#   - One-way: cannot reverse hash back to email
#   - Deterministic: same email always gives same hash
#   - WHY deterministic matters: we can still JOIN and 
#     GROUP BY on the hash! Two orders from same email 
#     will have the same hash.
#   - We also extract the DOMAIN (gmail.com, yahoo.com)
#     because domain is not PII but useful for analytics
#
# TECHNIQUE 2: MASKING (for phone)
#   "+1-555-0123" -> "***-***-0123"
#   - Shows last 4 digits (enough for customer service)
#   - Hides the rest
#   - substring(phone, -4, 4) gets last 4 characters
#
# TECHNIQUE 3: NAME MASKING
#   "John" -> "J****"
#   - First initial kept (useful for display)
#   - Rest replaced with asterisks
#
# WHAT DO WE DROP?
# Original email and phone columns are DROPPED entirely.
# They should NEVER exist in Silver/Gold layers.
# Only Bronze (raw) retains the original PII.

df_cust_masked = df_cust_flat \
    .withColumn("email_hash",
        when(col("email").isNotNull(), sha2(lower(trim(col("email"))), 256))
        .otherwise(lit(None))) \
    .withColumn("email_domain",
        when(col("email").isNotNull(), regexp_extract(col("email"), "@(.+)$", 1))
        .otherwise(lit(None))) \
    .withColumn("phone_masked",
        when(col("phone").isNotNull(),
            concat(lit("***-***-"), substring(regexp_replace(col("phone"), "[^0-9]", ""), -4, 4)))
        .otherwise(lit(None))) \
    .withColumn("first_name_initial", substring(col("first_name"), 1, 1)) \
    .withColumn("last_name_initial", substring(col("last_name"), 1, 1)) \
    .drop("email", "phone")

print("STEP 3: PII Masking applied")
print("  email  -> sha256 hash + domain extracted (original DROPPED)")
print("  phone  -> masked to ***-***-XXXX (original DROPPED)")
print("  names  -> initials extracted (originals KEPT for now)")

# NOTE ON NAMES: In a real production system, you might also
# drop first_name and last_name entirely, keeping only initials.
# For this project, we keep them to demonstrate the choice.


# ----------------------------------------------------------
# Step 4: Data type conversions and enrichment
# ----------------------------------------------------------
# WHY CONVERT date_of_birth FROM STRING TO DATE?
# As string: "1990-05-15" - Spark treats it as text
#   - Can't do date math (calculate age)
#   - Can't filter by date ranges
#   - Sorts incorrectly ("9" > "10" in string sort!)
# As date: 1990-05-15 - Spark treats it as a date
#   - Can calculate age: datediff(today, dob) / 365
#   - Can filter: WHERE dob > '1990-01-01'
#   - Sorts correctly
#
# AGE CALCULATION:
#   datediff(current_date, date_of_birth) gives days
#   Divide by 365.25 (accounts for leap years)
#   cast to int (drop decimals: 34.7 -> 34)
#
# AGE GROUPS: 
#   Used for customer segmentation and marketing
#   "Show me revenue by age group" is a common business ask
#
# CUSTOMER TENURE:
#   How long has this person been a customer?
#   New customers (< 90 days) behave differently than 
#   loyal customers (2+ years). This drives business strategy.

df_cust_enriched = df_cust_flat \
    .withColumn("customer_id", trim(col("customer_id"))) \
    .withColumn("date_of_birth", to_date(col("date_of_birth"))) \
    .withColumn("registration_date", to_date(col("registration_date"))) \
    .withColumn("gender",
        when(upper(col("gender")).isin("M", "MALE"), lit("Male"))
        .when(upper(col("gender")).isin("F", "FEMALE"), lit("Female"))
        .otherwise(lit("Other"))) \
    .withColumn("loyalty_tier", initcap(trim(col("loyalty_tier")))) \
    .withColumn("age",
        floor(datediff(current_date(), col("date_of_birth")) / 365.25).cast("int")) \
    .withColumn("age_group",
        when(col("age") < 25, lit("18-24"))
        .when(col("age") < 35, lit("25-34"))
        .when(col("age") < 45, lit("35-44"))
        .when(col("age") < 55, lit("45-54"))
        .when(col("age") < 65, lit("55-64"))
        .otherwise(lit("65+"))) \
    .withColumn("customer_tenure_days",
        datediff(current_date(), col("registration_date"))) \
    .withColumn("tenure_category",
        when(col("customer_tenure_days") < 90, lit("New (< 3 months)"))
        .when(col("customer_tenure_days") < 365, lit("Growing (3-12 months)"))
        .when(col("customer_tenure_days") < 730, lit("Established (1-2 years)"))
        .otherwise(lit("Loyal (2+ years)")))

# WAIT - we need to apply masking to the ENRICHED dataframe!
# Let me combine steps 3 and 4 properly:

df_cust_silver_prep = df_cust_enriched \
    .withColumn("email_hash",
        when(col("email").isNotNull(), sha2(lower(trim(col("email"))), 256))
        .otherwise(lit(None))) \
    .withColumn("email_domain",
        when(col("email").isNotNull(), regexp_extract(col("email"), "@(.+)$", 1))
        .otherwise(lit(None))) \
    .withColumn("phone_masked",
        when(col("phone").isNotNull(),
            concat(lit("***-***-"), substring(regexp_replace(col("phone"), "[^0-9]", ""), -4, 4)))
        .otherwise(lit(None))) \
    .withColumn("first_name_initial", substring(col("first_name"), 1, 1)) \
    .withColumn("last_name_initial", substring(col("last_name"), 1, 1)) \
    .drop("email", "phone")

print("STEP 4: Data types converted and columns enriched")


# ----------------------------------------------------------
# Step 5: Deduplicate on customer_id
# ----------------------------------------------------------
df_cust_deduped = df_cust_silver_prep.dropDuplicates(["customer_id"])
dedup_count = df_cust_deduped.count()
dupes = bronze_count - dedup_count
print("STEP 5: Deduplication - " + str(dupes) + " duplicates removed")


# ----------------------------------------------------------
# Step 6: Data Quality check
# ----------------------------------------------------------
# We keep customers with null emails (they are valid customers
# who just didn't provide email). But we FLAG them.

df_cust_final = df_cust_deduped \
    .withColumn("has_email", col("email_hash").isNotNull()) \
    .withColumn("_silver_processed_at", current_timestamp()) \
    .withColumn("_silver_version", lit("1.0")) \
    .withColumn("_pii_masked", lit(True))

null_email_count = df_cust_final.filter(col("has_email") == False).count()
print("STEP 6: Quality flags added - " + str(null_email_count) + " customers without email")


# ----------------------------------------------------------
# Step 7: Write to Silver
# ----------------------------------------------------------
silver_customers_path = SILVER + "/customers"

df_cust_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(silver_customers_path)


# ----------------------------------------------------------
# Step 8: Verify
# ----------------------------------------------------------
df_verify = spark.read.format("delta").load(silver_customers_path)
final_count = df_verify.count()

print("")
print("=" * 65)
print("SILVER CUSTOMERS - COMPLETE")
print("=" * 65)
print("  Source (Bronze):    " + str(bronze_count) + " rows")
print("  Duplicates removed: " + str(dupes) + " rows")
print("  Final Silver:       " + str(final_count) + " rows")
print("  PII Masked:         email (sha256), phone (masked), names (initials)")
print("  Nested Flattened:   address (5 cols), preferences (2 cols)")
print("  Path:               " + silver_customers_path)

print("\n  Schema:")
df_verify.printSchema()

# Show PII masking result
print("\n  PII Masking verification (compare with bronze):")
df_verify.select(
    "customer_id", "first_name", "first_name_initial",
    "email_hash", "email_domain", "phone_masked"
).show(5, truncate=False)

# Show flattened address
print("\n  Flattened address verification:")
df_verify.select(
    "customer_id", "address_city", "address_state", "address_zip"
).show(5, truncate=False)

# Show enriched columns
print("\n  Enriched columns verification:")
df_verify.select(
    "customer_id", "age", "age_group", "gender",
    "loyalty_tier", "customer_tenure_days", "tenure_category"
).show(5, truncate=False)

# Distributions
print("\n  Gender distribution:")
df_verify.groupBy("gender").count().orderBy(desc("count")).show()

print("\n  Age group distribution:")
df_verify.groupBy("age_group").count().orderBy("age_group").show()

print("\n  Loyalty tier distribution:")
df_verify.groupBy("loyalty_tier").count().orderBy(desc("count")).show()

print("\n  Tenure category distribution:")
df_verify.groupBy("tenure_category").count().orderBy(desc("count")).show()

print("\n  Email coverage:")
df_verify.groupBy("has_email").count().show()

print("[DONE] Silver Customers complete!")
print("[NEXT] Cell 7 - Silver Products")

STEP 1: Bronze Customers read - 500 rows
  Columns: ['address', 'customer_id', 'date_of_birth', 'email', 'first_name', 'gender', 'last_name', 'loyalty_tier', 'phone', 'preferences', 'registration_date']
STEP 2: Nested structs flattened
  address -> address_street, address_city, address_state, address_zip, address_country
  preferences -> pref_categories, pref_communication
STEP 3: PII Masking applied
  email  -> sha256 hash + domain extracted (original DROPPED)
  phone  -> masked to ***-***-XXXX (original DROPPED)
  names  -> initials extracted (originals KEPT for now)
STEP 4: Data types converted and columns enriched
STEP 5: Deduplication - 0 duplicates removed
STEP 6: Quality flags added - 53 customers without email

SILVER CUSTOMERS - COMPLETE
  Source (Bronze):    500 rows
  Duplicates removed: 0 rows
  Final Silver:       500 rows
  PII Masked:         email (sha256), phone (masked), names (initials)
  Nested Flattened:   address (5 cols), preferences (2 cols)
  Path:             

In [0]:
# ============================================================
# CELL 7: SILVER LAYER - PRODUCTS TRANSFORMATION
# ============================================================
#
# WHAT IS THIS TABLE?
# -------------------
# The product catalog - master data for all items we sell.
# This becomes dim_product in the Gold layer (Star Schema).
#
# "Master data" means it changes slowly (new products added,
# prices updated occasionally). Unlike orders which grow fast.
# In data modeling, this is called a "Slowly Changing Dimension".
#
# BRONZE ISSUES TO FIX:
#   - Nested "attributes" struct (color array, battery_life, connectivity)
#   - created_at and updated_at are strings -> convert to timestamps
#   - Derive: price_tier, profit_margin, margin_percentage
#   - No null prices in our data, but we handle it defensively
#
# WHY IS THIS TABLE IMPORTANT?
# Every revenue query needs product info:
#   "Revenue by category" = JOIN fact_sales WITH dim_product
#   "Top brands by margin" = product table only
#   "Products to discontinue" = low rating + low margin
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.types import *

# ----------------------------------------------------------
# Step 1: Read Bronze Products
# ----------------------------------------------------------
df_prod_bronze = spark.read \
    .option("multiLine", True) \
    .json(BRONZE + "/source3_products_mongo/products.json")

bronze_count = df_prod_bronze.count()
print("STEP 1: Bronze Products read - " + str(bronze_count) + " rows")


# ----------------------------------------------------------
# Step 2: Flatten nested attributes struct
# ----------------------------------------------------------
# Your products have this nested structure:
#   attributes: struct
#     |-- battery_life: string ("20 hours")
#     |-- color: array of strings (["Black", "White"])
#     |-- connectivity: string ("Bluetooth 5.0")
#
# We extract each field into its own column.
#
# FOR THE COLOR ARRAY:
# arrays can't be used in GROUP BY or WHERE easily.
# concat_ws(",", array) converts ["Black","White"] -> "Black,White"
# This is called "denormalization" - trading storage for query speed.

df_prod_flat = df_prod_bronze \
    .withColumn("attr_battery_life", col("attributes.battery_life")) \
    .withColumn("attr_colors", concat_ws(",", col("attributes.color"))) \
    .withColumn("attr_connectivity", col("attributes.connectivity")) \
    .drop("attributes")

print("STEP 2: Nested attributes flattened")
print("  attributes.battery_life -> attr_battery_life")
print("  attributes.color (array) -> attr_colors (comma-separated string)")
print("  attributes.connectivity -> attr_connectivity")


# ----------------------------------------------------------
# Step 3: Data type conversions
# ----------------------------------------------------------
# WHY CONVERT created_at and updated_at?
# They come as strings like "2024-01-15T10:30:00Z"
# As strings, we can't:
#   - Calculate "products added last month"
#   - Sort by creation date properly
#   - Filter by date ranges
#
# to_timestamp() parses the string into a proper timestamp type.

df_prod_typed = df_prod_flat \
    .withColumn("product_id", trim(col("product_id"))) \
    .withColumn("product_name", trim(col("product_name"))) \
    .withColumn("category", initcap(trim(col("category")))) \
    .withColumn("sub_category", initcap(trim(col("sub_category")))) \
    .withColumn("brand", initcap(trim(col("brand")))) \
    .withColumn("supplier_id", trim(col("supplier_id"))) \
    .withColumn("price", col("price").cast("double")) \
    .withColumn("cost_price", col("cost_price").cast("double")) \
    .withColumn("weight_kg", col("weight_kg").cast("double")) \
    .withColumn("rating", col("rating").cast("double")) \
    .withColumn("review_count", col("review_count").cast("int")) \
    .withColumn("is_active", col("is_active").cast("boolean")) \
    .withColumn("created_at", to_timestamp(col("created_at"))) \
    .withColumn("updated_at", to_timestamp(col("updated_at")))

print("STEP 3: Data types standardized")


# ----------------------------------------------------------
# Step 4: Derive business columns
# ----------------------------------------------------------
# PROFIT MARGIN:
#   profit_margin = price - cost_price
#   Example: Selling at $100, cost is $60 -> margin = $40
#
# MARGIN PERCENTAGE:
#   margin_pct = (price - cost) / price * 100
#   Example: ($100 - $60) / $100 * 100 = 40%
#   This tells us what % of revenue is profit
#   Healthy e-commerce margin: 30-50%
#
# PRICE TIER:
#   Categorize products by price range for analysis
#   "What % of revenue comes from Premium products?"
#   Business teams think in tiers, not exact prices
#
# RATING CATEGORY:
#   Convert numeric rating to human-readable label
#   Used in dashboards: "Show me all Poor-rated products"
#
# PRODUCT AGE:
#   How long has this product been in our catalog?
#   Old products with low sales might need to be discontinued

df_prod_enriched = df_prod_typed \
    .withColumn("profit_margin",
        round(col("price") - col("cost_price"), 2)) \
    .withColumn("margin_pct",
        when(col("price") > 0,
            round((col("price") - col("cost_price")) / col("price") * 100, 2))
        .otherwise(lit(0.0))) \
    .withColumn("price_tier",
        when(col("price") < 50, lit("Budget"))
        .when(col("price") < 200, lit("Mid-Range"))
        .when(col("price") < 500, lit("Premium"))
        .otherwise(lit("Luxury"))) \
    .withColumn("rating_category",
        when(col("rating") >= 4.5, lit("Excellent"))
        .when(col("rating") >= 4.0, lit("Very Good"))
        .when(col("rating") >= 3.0, lit("Good"))
        .when(col("rating") >= 2.0, lit("Average"))
        .otherwise(lit("Poor"))) \
    .withColumn("product_age_days",
        datediff(current_date(), col("created_at")))

print("STEP 4: Business columns derived")
print("  profit_margin, margin_pct, price_tier")
print("  rating_category, product_age_days")


# ----------------------------------------------------------
# Step 5: Deduplicate and quality check
# ----------------------------------------------------------
df_prod_deduped = df_prod_enriched.dropDuplicates(["product_id"])
dedup_count = df_prod_deduped.count()
dupes = bronze_count - dedup_count

# Quality: ensure product_id and price are valid
df_prod_good = df_prod_deduped.filter(
    col("product_id").isNotNull() &
    col("price").isNotNull() &
    (col("price") > 0)
)

df_prod_bad = df_prod_deduped.filter(
    col("product_id").isNull() |
    col("price").isNull() |
    (col("price") <= 0)
)

good_count = df_prod_good.count()
bad_count = df_prod_bad.count()

print("STEP 5: Quality check")
print("  Duplicates removed: " + str(dupes))
print("  Good records:       " + str(good_count))
print("  Bad records:        " + str(bad_count))


# ----------------------------------------------------------
# Step 6: Add metadata and write
# ----------------------------------------------------------
df_prod_silver = df_prod_good \
    .withColumn("_silver_processed_at", current_timestamp()) \
    .withColumn("_silver_version", lit("1.0"))

silver_products_path = SILVER + "/products"

df_prod_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(silver_products_path)


# ----------------------------------------------------------
# Step 7: Verify
# ----------------------------------------------------------
df_verify = spark.read.format("delta").load(silver_products_path)
final_count = df_verify.count()

print("")
print("=" * 65)
print("SILVER PRODUCTS - COMPLETE")
print("=" * 65)
print("  Source (Bronze):    " + str(bronze_count) + " rows")
print("  Duplicates removed: " + str(dupes))
print("  Quality rejected:   " + str(bad_count))
print("  Final Silver:       " + str(final_count) + " rows")
print("  Path:               " + silver_products_path)

print("\n  Schema:")
df_verify.printSchema()

print("\n  Sample data:")
df_verify.select(
    "product_id", "product_name", "category", "brand",
    "price", "cost_price", "profit_margin", "margin_pct", "price_tier"
).show(5, truncate=False)

# Flattened attributes
print("\n  Flattened attributes sample:")
df_verify.select(
    "product_id", "attr_battery_life", "attr_colors", "attr_connectivity"
).show(5, truncate=False)

# Category distribution
print("\n  Category distribution:")
df_verify.groupBy("category").agg(
    count("*").alias("product_count"),
    round(avg("price"), 2).alias("avg_price"),
    round(avg("margin_pct"), 2).alias("avg_margin_pct")
).orderBy(desc("product_count")).show()

# Price tier distribution
print("\n  Price tier distribution:")
df_verify.groupBy("price_tier").agg(
    count("*").alias("count"),
    round(avg("price"), 2).alias("avg_price"),
    round(avg("rating"), 2).alias("avg_rating")
).orderBy("avg_price").show()

# Rating distribution
print("\n  Rating category distribution:")
df_verify.groupBy("rating_category").count().orderBy(desc("count")).show()

# Active vs Inactive
print("\n  Active status:")
df_verify.groupBy("is_active").count().show()

print("[DONE] Silver Products complete!")
print("[NEXT] Cell 8 - Silver Inventory")

STEP 1: Bronze Products read - 50 rows
STEP 2: Nested attributes flattened
  attributes.battery_life -> attr_battery_life
  attributes.color (array) -> attr_colors (comma-separated string)
  attributes.connectivity -> attr_connectivity
STEP 3: Data types standardized
STEP 4: Business columns derived
  profit_margin, margin_pct, price_tier
  rating_category, product_age_days
STEP 5: Quality check
  Duplicates removed: 0
  Good records:       50
  Bad records:        0

SILVER PRODUCTS - COMPLETE
  Source (Bronze):    50 rows
  Duplicates removed: 0
  Quality rejected:   0
  Final Silver:       50 rows
  Path:               abfss://silver@dlsshopsmartdev123.dfs.core.windows.net/products

  Schema:
root
 |-- brand: string (nullable = true)
 |-- category: string (nullable = true)
 |-- cost_price: double (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- product_id: string (nullable = true)
 |-- 

In [0]:
# ============================================================
# CELL 8: SILVER LAYER - INVENTORY TRANSFORMATION
# ============================================================
#
# WHAT IS THIS TABLE?
# -------------------
# Daily snapshot of stock levels per product per warehouse.
# Each row says: "On this date, warehouse WH001 had 500 
# units of PROD001, with 45 reserved for pending orders."
#
# This is a SNAPSHOT table (not transactional).
# Every day, a new snapshot is taken of current stock levels.
#
# WHY IS INVENTORY DATA IMPORTANT?
# - "Which products are out of stock?" -> lost revenue
# - "Which warehouses need restocking?" -> supply chain
# - "What's our inventory turnover rate?" -> efficiency
# - Demand forecasting needs inventory context
#
# BRONZE ISSUES TO FIX:
#   - 4 rows with NEGATIVE stock levels (data error from source)
#     Negative stock is physically impossible.
#     Could be caused by:
#       a) Overselling (sold more than available)
#       b) System sync delay between warehouses
#       c) Data entry error
#     Our fix: Set to 0 and FLAG the row
#
#   - Need derived columns:
#     quantity_available = on_hand - reserved
#     stock_status (Out of Stock / Low / Medium / In Stock)
#     needs_reorder flag
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.types import *

# ----------------------------------------------------------
# Step 1: Read Bronze Inventory
# ----------------------------------------------------------
df_inv_bronze = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv(BRONZE + "/source5_inventory_csv/inventory.csv")

bronze_count = df_inv_bronze.count()
print("STEP 1: Bronze Inventory read - " + str(bronze_count) + " rows")


# ----------------------------------------------------------
# Step 2: Fix negative stock levels
# ----------------------------------------------------------
# WHY NOT JUST DELETE NEGATIVE ROWS?
# Deleting loses information. Instead, we:
#   1. FLAG the row (_had_negative_stock = true)
#   2. SET the value to 0 (physically correct)
#   3. Keep the row (the product+warehouse combo is valid)
#
# This way, analysts can:
#   - Find all flagged rows: WHERE _had_negative_stock = true
#   - Investigate root cause with the operations team
#   - Still get accurate stock totals (0 instead of -28)
#
# greatest(value, 0) returns whichever is larger.
#   greatest(-28, 0) -> 0
#   greatest(500, 0) -> 500
# This is a clean way to "floor" a value at 0.

neg_count = df_inv_bronze.filter(col("quantity_on_hand") < 0).count()
print("STEP 2: Found " + str(neg_count) + " negative stock rows")

df_inv_fixed = df_inv_bronze \
    .withColumn("_had_negative_stock",
        when(col("quantity_on_hand") < 0, lit(True)).otherwise(lit(False))) \
    .withColumn("quantity_on_hand",
        when(col("quantity_on_hand") < 0, lit(0))
        .otherwise(col("quantity_on_hand")))

print("  Negative values set to 0 and flagged")


# ----------------------------------------------------------
# Step 3: Clean and standardize
# ----------------------------------------------------------
# WHY TRIM product_id?
# Your data generator doesn't add trailing spaces, but in 
# real systems, CSV files often have trailing spaces.
# If product_id = "PROD001 " (with space), then joining to 
# products table where product_id = "PROD001" would FAIL.
# trim() is defensive coding - costs nothing, prevents bugs.

df_inv_clean = df_inv_fixed \
    .withColumn("product_id", trim(col("product_id"))) \
    .withColumn("warehouse_id", trim(col("warehouse_id"))) \
    .withColumn("quantity_on_hand", col("quantity_on_hand").cast("int")) \
    .withColumn("quantity_reserved", col("quantity_reserved").cast("int")) \
    .withColumn("reorder_point", col("reorder_point").cast("int")) \
    .withColumn("reorder_quantity", col("reorder_quantity").cast("int")) \
    .withColumn("last_restock_date", col("last_restock_date").cast("date")) \
    .withColumn("snapshot_date", col("snapshot_date").cast("date"))

print("STEP 3: Data types standardized")


# ----------------------------------------------------------
# Step 4: Derive business columns
# ----------------------------------------------------------
# QUANTITY AVAILABLE:
#   = on_hand - reserved
#   "Reserved" means items allocated to pending orders
#   but not yet shipped. They're physically in the warehouse
#   but spoken for.
#   Available = what we can actually sell right now.
#
#   greatest(..., 0) prevents negative available 
#   (if reserved > on_hand due to timing)
#
# STOCK STATUS:
#   Business teams don't think in numbers.
#   They think: "Is it in stock or not?"
#   We categorize into 4 buckets:
#     Out of Stock: available = 0 -> URGENT, losing sales
#     Low Stock: available <= reorder_point -> ORDER NOW
#     Medium Stock: available <= 2x reorder_point -> PLAN ORDER
#     In Stock: plenty of stock -> NO ACTION
#
# NEEDS REORDER:
#   Simple boolean: should purchasing team place an order?
#   When on_hand drops to/below reorder_point, flag it.
#   In a real system, this could trigger an automated 
#   purchase order via an API.
#
# DAYS SINCE RESTOCK:
#   How many days since the last restock?
#   If it's been 90+ days and stock is low, something 
#   is wrong with the supply chain.

df_inv_enriched = df_inv_clean \
    .withColumn("quantity_available",
        greatest(col("quantity_on_hand") - col("quantity_reserved"), lit(0))) \
    .withColumn("stock_status",
        when(col("quantity_available") == 0, lit("Out of Stock"))
        .when(col("quantity_available") <= col("reorder_point"), lit("Low Stock"))
        .when(col("quantity_available") <= col("reorder_point") * 2, lit("Medium Stock"))
        .otherwise(lit("In Stock"))) \
    .withColumn("needs_reorder",
        when(col("quantity_on_hand") <= col("reorder_point"), lit(True))
        .otherwise(lit(False))) \
    .withColumn("days_since_restock",
        datediff(current_date(), col("last_restock_date"))) \
    .withColumn("stock_value",
        col("quantity_on_hand").cast("double") * lit(50.0))

# NOTE on stock_value: In a real system, we'd JOIN with products
# to get actual cost_price. Here we use a placeholder average
# cost of $50. In the Gold layer, we'll do the proper JOIN.

print("STEP 4: Business columns derived")
print("  quantity_available, stock_status, needs_reorder")
print("  days_since_restock, stock_value")


# ----------------------------------------------------------
# Step 5: Deduplicate
# ----------------------------------------------------------
# Inventory has a COMPOSITE KEY:
#   product_id + warehouse_id + snapshot_date
# This means "one row per product per warehouse per day"
# If we get duplicates, keep just one.

df_inv_deduped = df_inv_enriched.dropDuplicates(
    ["product_id", "warehouse_id", "snapshot_date"]
)
dedup_count = df_inv_deduped.count()
dupes = bronze_count - dedup_count + 0  # adding 0 to avoid issues
print("STEP 5: Deduplication - " + str(dupes) + " duplicates removed")


# ----------------------------------------------------------
# Step 6: Quality check
# ----------------------------------------------------------
df_inv_good = df_inv_deduped.filter(
    col("product_id").isNotNull() &
    col("warehouse_id").isNotNull() &
    col("snapshot_date").isNotNull()
)

bad_count = df_inv_deduped.count() - df_inv_good.count()
print("STEP 6: Quality check - " + str(bad_count) + " bad records")


# ----------------------------------------------------------
# Step 7: Add metadata and write
# ----------------------------------------------------------
df_inv_silver = df_inv_good \
    .withColumn("_silver_processed_at", current_timestamp()) \
    .withColumn("_silver_version", lit("1.0"))

silver_inventory_path = SILVER + "/inventory"

df_inv_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(silver_inventory_path)


# ----------------------------------------------------------
# Step 8: Verify
# ----------------------------------------------------------
df_verify = spark.read.format("delta").load(silver_inventory_path)
final_count = df_verify.count()

print("")
print("=" * 65)
print("SILVER INVENTORY - COMPLETE")
print("=" * 65)
print("  Source (Bronze):     " + str(bronze_count) + " rows")
print("  Negative stock fixed: " + str(neg_count) + " rows")
print("  Duplicates removed:  " + str(dupes))
print("  Quality rejected:    " + str(bad_count))
print("  Final Silver:        " + str(final_count) + " rows")
print("  Path:                " + silver_inventory_path)

print("\n  Schema:")
df_verify.printSchema()

print("\n  Sample data:")
df_verify.select(
    "product_id", "warehouse_id", "quantity_on_hand",
    "quantity_reserved", "quantity_available",
    "stock_status", "needs_reorder"
).show(10, truncate=False)

# Show the fixed negative stock rows
print("\n  Previously negative stock (now fixed to 0):")
df_verify.filter(col("_had_negative_stock") == True).select(
    "product_id", "warehouse_id", "quantity_on_hand",
    "stock_status", "_had_negative_stock"
).show(truncate=False)

# Stock status summary
print("\n  Stock status distribution:")
df_verify.groupBy("stock_status").agg(
    count("*").alias("count"),
    round(avg("quantity_available"), 0).alias("avg_available")
).orderBy("count").show()

# Warehouse summary
print("\n  Stock by warehouse:")
df_verify.groupBy("warehouse_id").agg(
    count("*").alias("products"),
    sum("quantity_on_hand").alias("total_on_hand"),
    sum("quantity_available").alias("total_available"),
    sum(col("needs_reorder").cast("int")).alias("need_reorder_count")
).orderBy("warehouse_id").show()

# Reorder alert
reorder_count = df_verify.filter(col("needs_reorder") == True).count()
print("  REORDER ALERT: " + str(reorder_count) + " product-warehouse combos need restocking!")

print("\n[DONE] Silver Inventory complete!")
print("[NEXT] Cell 9 - Silver Clickstream")

STEP 1: Bronze Inventory read - 150 rows
STEP 2: Found 4 negative stock rows
  Negative values set to 0 and flagged
STEP 3: Data types standardized
STEP 4: Business columns derived
  quantity_available, stock_status, needs_reorder
  days_since_restock, stock_value
STEP 5: Deduplication - 0 duplicates removed
STEP 6: Quality check - 0 bad records

SILVER INVENTORY - COMPLETE
  Source (Bronze):     150 rows
  Negative stock fixed: 4 rows
  Duplicates removed:  0
  Quality rejected:    0
  Final Silver:        150 rows
  Path:                abfss://silver@dlsshopsmartdev123.dfs.core.windows.net/inventory

  Schema:
root
 |-- product_id: string (nullable = true)
 |-- warehouse_id: string (nullable = true)
 |-- quantity_on_hand: integer (nullable = true)
 |-- quantity_reserved: integer (nullable = true)
 |-- reorder_point: integer (nullable = true)
 |-- reorder_quantity: integer (nullable = true)
 |-- last_restock_date: date (nullable = true)
 |-- snapshot_date: date (nullable = true)
 |--

In [0]:
# ============================================================
# CELL 9: SILVER LAYER - CLICKSTREAM TRANSFORMATION
# ============================================================
#
# WHAT IS CLICKSTREAM DATA?
# -------------------------
# Every click, scroll, search, and action a user takes on
# the website/app is recorded as an "event".
#
# This is the HIGHEST VOLUME data source (3000 rows here,
# but in production: millions per day).
#
# WHY IS IT VALUABLE?
# - Understanding user behavior (what do people browse?)
# - Conversion funnel analysis (browse -> cart -> buy)
# - Personalization (recommend based on browsing history)
# - A/B testing (which page layout gets more clicks?)
# - Fraud detection (bot-like clicking patterns)
#
# BRONZE ISSUES TO FIX:
#   - 626 NULL customer_ids (20%) -> anonymous/not-logged-in users
#     These are VALID events, not errors. Many users browse
#     without logging in. We keep them but flag them.
#   - Nested geo_location struct -> flatten
#   - event_timestamp is string -> convert to timestamp
#   - Derive: session metrics, time parts, funnel flags
#
# TWO OUTPUT TABLES:
#   1. silver/clickstream  -> event-level (every single click)
#   2. silver/sessions     -> session-level (aggregated per visit)
#   Session aggregation is key for funnel analysis.
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# ----------------------------------------------------------
# Step 1: Read Bronze Clickstream
# ----------------------------------------------------------
# WHY NO multiLine=True?
# Your clickstream.json is JSON-Lines format (one JSON per line):
#   {"event_id": "EVT001", ...}
#   {"event_id": "EVT002", ...}
# This is the standard format for streaming/high-volume data
# because each line can be parsed independently (parallelizable).
# JSON array format (used by customers/products) requires
# reading the entire file to find the closing bracket.

df_clicks_bronze = spark.read \
    .json(BRONZE + "/source4_clickstream_eventhub/clickstream.json")

bronze_count = df_clicks_bronze.count()
print("STEP 1: Bronze Clickstream read - " + str(bronze_count) + " rows")


# ----------------------------------------------------------
# Step 2: Flatten nested geo_location struct
# ----------------------------------------------------------
# geo_location: struct
#   |-- city: string
#   |-- country: string
#
# Same technique as customers.address - dot notation to extract.

df_clicks_flat = df_clicks_bronze \
    .withColumn("geo_city", col("geo_location.city")) \
    .withColumn("geo_country", col("geo_location.country")) \
    .drop("geo_location")

print("STEP 2: geo_location flattened -> geo_city, geo_country")


# ----------------------------------------------------------
# Step 3: Data type conversions and cleaning
# ----------------------------------------------------------
# WHY CONVERT event_timestamp FROM STRING?
# As string: "2025-01-15T10:30:00Z" -> can't do time math
# As timestamp: can calculate session duration, hourly patterns
#
# WHY TRIM all string columns?
# Clickstream data comes from web browsers and mobile apps.
# User agents, URLs, and other fields often have extra spaces
# or inconsistent formatting. trim() normalizes everything.

df_clicks_clean = df_clicks_flat \
    .withColumn("event_id", trim(col("event_id"))) \
    .withColumn("session_id", trim(col("session_id"))) \
    .withColumn("customer_id", trim(col("customer_id"))) \
    .withColumn("event_type", lower(trim(col("event_type")))) \
    .withColumn("event_timestamp", to_timestamp(col("event_timestamp"))) \
    .withColumn("product_id", trim(col("product_id"))) \
    .withColumn("device_type", lower(trim(col("device_type")))) \
    .withColumn("browser", initcap(trim(col("browser")))) \
    .withColumn("os", initcap(trim(col("os")))) \
    .withColumn("page_url", trim(col("page_url"))) \
    .withColumn("referrer", lower(trim(col("referrer")))) \
    .withColumn("search_query", lower(trim(col("search_query"))))

print("STEP 3: Data types standardized")


# ----------------------------------------------------------
# Step 4: Derive event-level columns
# ----------------------------------------------------------
# EVENT DATE & TIME PARTS:
#   Breaking timestamp into parts enables fast filtering:
#   "Show me events from last Monday" -> WHERE day_name = 'Monday'
#   "Peak hours analysis" -> GROUP BY event_hour
#
# IS_ANONYMOUS:
#   Flag for events without a customer_id.
#   20% of our events are anonymous (not logged in).
#   This is normal for e-commerce - people browse before logging in.
#
# IS_PURCHASE_INTENT:
#   Which event types signal buying intent?
#   add_to_cart and checkout = high intent
#   page_view = low intent (just browsing)
#   This flag helps conversion funnel analysis:
#   "What % of sessions show purchase intent?"
#
# FUNNEL STAGE:
#   E-commerce conversion funnel:
#     Awareness  -> user visits site (page_view)
#     Interest   -> user looks at products (product_view)
#     Desire     -> user adds to cart or searches (add_to_cart, search)
#     Action     -> user buys (checkout)
#   Mapping events to funnel stages enables funnel analysis:
#   "Where do we lose the most customers?"

df_clicks_enriched = df_clicks_clean \
    .withColumn("event_date", to_date(col("event_timestamp"))) \
    .withColumn("event_hour", hour(col("event_timestamp"))) \
    .withColumn("event_day_of_week", dayofweek(col("event_timestamp"))) \
    .withColumn("day_name", date_format(col("event_timestamp"), "EEEE")) \
    .withColumn("is_weekend",
        when(dayofweek(col("event_timestamp")).isin(1, 7), lit(True))
        .otherwise(lit(False))) \
    .withColumn("is_anonymous",
        when(col("customer_id").isNull(), lit(True)).otherwise(lit(False))) \
    .withColumn("is_purchase_intent",
        when(col("event_type").isin("add_to_cart", "checkout"), lit(True))
        .otherwise(lit(False))) \
    .withColumn("funnel_stage",
        when(col("event_type") == "page_view", lit("1-Awareness"))
        .when(col("event_type") == "product_view", lit("2-Interest"))
        .when(col("event_type").isin("add_to_cart", "search", "remove_from_cart"), lit("3-Consideration"))
        .when(col("event_type") == "checkout", lit("4-Purchase"))
        .otherwise(lit("Other")))

print("STEP 4: Event-level columns derived")
print("  event_date, event_hour, day_name, is_weekend")
print("  is_anonymous, is_purchase_intent, funnel_stage")


# ----------------------------------------------------------
# Step 5: Deduplicate on event_id
# ----------------------------------------------------------
df_clicks_deduped = df_clicks_enriched.dropDuplicates(["event_id"])
dedup_count = df_clicks_deduped.count()
dupes = bronze_count - dedup_count
print("STEP 5: Deduplication - " + str(dupes) + " duplicates removed")


# ----------------------------------------------------------
# Step 6: Write event-level Silver table
# ----------------------------------------------------------
df_clicks_silver = df_clicks_deduped \
    .withColumn("_silver_processed_at", current_timestamp()) \
    .withColumn("_silver_version", lit("1.0"))

silver_clicks_path = SILVER + "/clickstream"

df_clicks_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("event_date") \
    .option("overwriteSchema", True) \
    .save(silver_clicks_path)

events_final = spark.read.format("delta").load(silver_clicks_path).count()
print("STEP 6: Event-level table written - " + str(events_final) + " rows")


# ----------------------------------------------------------
# Step 7: Build SESSION-level aggregation
# ----------------------------------------------------------
# WHAT IS A SESSION?
# A session is one "visit" to the website.
# Identified by session_id (set by the browser/app).
# One session can have many events (page views, clicks, etc.)
#
# WHY AGGREGATE TO SESSION LEVEL?
# Event-level data is too granular for most analysis.
# Business questions are at the session level:
#   "Average session duration?" -> session level
#   "Conversion rate?" -> sessions with checkout / total sessions
#   "Bounce rate?" -> sessions with only 1 event / total sessions
#
# WHAT WE CALCULATE PER SESSION:
#   - total_events: how many clicks in this visit
#   - products_viewed: how many unique products they looked at
#   - session_start/end: first and last event timestamp
#   - session_duration: end - start (in seconds)
#   - has_cart_activity: did they add anything to cart?
#   - has_checkout: did they complete a purchase?
#   - bounce: only 1 event = left immediately
#   - engagement_level: High/Medium/Low/Bounce based on event count

df_sessions = df_clicks_deduped \
    .groupBy("session_id", "customer_id", "device_type", "browser", "os") \
    .agg(
        count("*").alias("total_events"),
        countDistinct("event_type").alias("unique_event_types"),
        countDistinct("product_id").alias("products_viewed"),
        min("event_timestamp").alias("session_start"),
        max("event_timestamp").alias("session_end"),
        sum(when(col("event_type") == "page_view", lit(1)).otherwise(lit(0))).alias("page_views"),
        sum(when(col("event_type") == "product_view", lit(1)).otherwise(lit(0))).alias("product_views"),
        sum(when(col("event_type") == "add_to_cart", lit(1)).otherwise(lit(0))).alias("cart_adds"),
        sum(when(col("event_type") == "remove_from_cart", lit(1)).otherwise(lit(0))).alias("cart_removes"),
        sum(when(col("event_type") == "checkout", lit(1)).otherwise(lit(0))).alias("checkouts"),
        sum(when(col("event_type") == "search", lit(1)).otherwise(lit(0))).alias("searches"),
        first("referrer").alias("entry_referrer"),
        first("geo_city").alias("geo_city"),
        first("geo_country").alias("geo_country")
    ) \
    .withColumn("session_duration_sec",
        unix_timestamp(col("session_end")) - unix_timestamp(col("session_start"))) \
    .withColumn("session_duration_min",
        round(col("session_duration_sec") / 60, 2)) \
    .withColumn("has_cart_activity",
        when(col("cart_adds") > 0, lit(True)).otherwise(lit(False))) \
    .withColumn("has_checkout",
        when(col("checkouts") > 0, lit(True)).otherwise(lit(False))) \
    .withColumn("is_bounce",
        when(col("total_events") == 1, lit(True)).otherwise(lit(False))) \
    .withColumn("is_anonymous",
        when(col("customer_id").isNull(), lit(True)).otherwise(lit(False))) \
    .withColumn("engagement_level",
        when(col("total_events") >= 10, lit("High"))
        .when(col("total_events") >= 5, lit("Medium"))
        .when(col("total_events") >= 2, lit("Low"))
        .otherwise(lit("Bounce"))) \
    .withColumn("session_date", to_date(col("session_start"))) \
    .withColumn("_silver_processed_at", current_timestamp()) \
    .withColumn("_silver_version", lit("1.0"))

# Write sessions
silver_sessions_path = SILVER + "/sessions"

df_sessions.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(silver_sessions_path)

sessions_final = spark.read.format("delta").load(silver_sessions_path).count()
print("STEP 7: Session-level table written - " + str(sessions_final) + " sessions")


# ----------------------------------------------------------
# Step 8: Verify both tables
# ----------------------------------------------------------
df_events = spark.read.format("delta").load(silver_clicks_path)
df_sess = spark.read.format("delta").load(silver_sessions_path)

print("")
print("=" * 65)
print("SILVER CLICKSTREAM - COMPLETE (2 tables)")
print("=" * 65)
print("  TABLE 1: Event-Level")
print("    Rows:     " + str(df_events.count()))
print("    Path:     " + silver_clicks_path)
print("  TABLE 2: Session-Level")
print("    Rows:     " + str(df_sess.count()))
print("    Path:     " + silver_sessions_path)

print("\n  Event-level schema:")
df_events.printSchema()

print("\n  Event sample:")
df_events.select(
    "event_id", "session_id", "customer_id", "event_type",
    "product_id", "device_type", "funnel_stage", "is_anonymous"
).show(5, truncate=False)

print("\n  Event type distribution:")
df_events.groupBy("event_type", "funnel_stage").count().orderBy("funnel_stage").show()

print("\n  Device distribution:")
df_events.groupBy("device_type").count().orderBy(desc("count")).show()

print("\n  Anonymous vs logged-in events:")
df_events.groupBy("is_anonymous").count().show()

# Session-level stats
print("\n  Session-level schema:")
df_sess.printSchema()

print("\n  Session sample:")
df_sess.select(
    "session_id", "customer_id", "total_events", "products_viewed",
    "session_duration_min", "has_checkout", "engagement_level"
).show(5, truncate=False)

print("\n  Engagement distribution:")
df_sess.groupBy("engagement_level").agg(
    count("*").alias("sessions"),
    round(avg("total_events"), 1).alias("avg_events"),
    round(avg("session_duration_min"), 1).alias("avg_duration_min"),
    round(avg("products_viewed"), 1).alias("avg_products")
).orderBy("engagement_level").show()

# Conversion funnel
total_sessions = df_sess.count()
cart_sessions = df_sess.filter(col("has_cart_activity") == True).count()
checkout_sessions = df_sess.filter(col("has_checkout") == True).count()

print("\n  CONVERSION FUNNEL:")
print("    Total Sessions:     " + str(total_sessions))
print("    With Cart Activity: " + str(cart_sessions) + " (" + str(int(cart_sessions * 100 / total_sessions)) + " pct)")
print("    With Checkout:      " + str(checkout_sessions) + " (" + str(int(checkout_sessions * 100 / total_sessions)) + " pct)")

print("\n[DONE] Silver Clickstream complete!")
print("[NEXT] Cell 10 - Silver Payments")

STEP 1: Bronze Clickstream read - 3000 rows
STEP 2: geo_location flattened -> geo_city, geo_country
STEP 3: Data types standardized
STEP 4: Event-level columns derived
  event_date, event_hour, day_name, is_weekend
  is_anonymous, is_purchase_intent, funnel_stage
STEP 5: Deduplication - 0 duplicates removed
STEP 6: Event-level table written - 3000 rows
STEP 7: Session-level table written - 3000 sessions

SILVER CLICKSTREAM - COMPLETE (2 tables)
  TABLE 1: Event-Level
    Rows:     3000
    Path:     abfss://silver@dlsshopsmartdev123.dfs.core.windows.net/clickstream
  TABLE 2: Session-Level
    Rows:     3000
    Path:     abfss://silver@dlsshopsmartdev123.dfs.core.windows.net/sessions

  Event-level schema:
root
 |-- browser: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- event_timestamp: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- ip_address: stri

In [0]:
# ============================================================
# CELL 10: SILVER LAYER - PAYMENTS TRANSFORMATION
# ============================================================
#
# WHAT IS THIS TABLE?
# -------------------
# Every payment transaction linked to an order.
# One order = one payment (in our data).
# In real systems, one order could have:
#   - Multiple payment attempts (first failed, retry succeeded)
#   - Split payments (half credit card, half wallet)
#   - Refund transactions
#
# WHY IS PAYMENTS DATA IMPORTANT?
# - Revenue recognition: only "success" payments = real revenue
# - Fraud detection: high risk scores, unusual patterns
# - Payment method analysis: which methods have highest failure?
# - Reconciliation: match payments to orders to bank statements
#
# BRONZE ISSUES TO FIX:
#   - transaction_timestamp is string -> convert to timestamp
#   - Standardize status values
#   - Add risk level categorization
#   - Add fraud signal flags
#   - Link validation (every payment should have a valid order)
#
# INTERVIEW GOLD: This table enables FRAUD DETECTION ML later.
# We're building features now that the ML model will use.
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.types import *

# ----------------------------------------------------------
# Step 1: Read Bronze Payments
# ----------------------------------------------------------
df_pay_bronze = spark.read \
    .option("multiLine", True) \
    .json(BRONZE + "/source6_payments_api/payments.json")

bronze_count = df_pay_bronze.count()
print("STEP 1: Bronze Payments read - " + str(bronze_count) + " rows")


# ----------------------------------------------------------
# Step 2: Data type conversions
# ----------------------------------------------------------
# WHY IS transaction_timestamp TRICKY?
# Payment systems often use different date formats:
#   - "2025-01-15T10:30:00Z" (ISO 8601 with Z)
#   - "2025-01-15T10:30:00" (ISO without timezone)
#   - "01/15/2025 10:30:00" (US format)
#   - "15-01-2025 10:30:00" (EU format)
#
# coalesce() with multiple to_timestamp() patterns tries
# each format until one works. This handles inconsistencies
# WITHOUT failing the pipeline.
#
# In your data, it's consistent ISO format, but defensive
# coding protects against future source changes.

df_pay_typed = df_pay_bronze \
    .withColumn("transaction_id", trim(col("transaction_id"))) \
    .withColumn("order_id", trim(col("order_id"))) \
    .withColumn("payment_method", lower(trim(col("payment_method")))) \
    .withColumn("card_type", lower(trim(col("card_type")))) \
    .withColumn("amount", col("amount").cast("double")) \
    .withColumn("currency", upper(trim(coalesce(col("currency"), lit("USD"))))) \
    .withColumn("status", lower(trim(col("status")))) \
    .withColumn("gateway_response_code", trim(col("gateway_response_code"))) \
    .withColumn("is_international", col("is_international").cast("boolean")) \
    .withColumn("risk_score", col("risk_score").cast("int")) \
    .withColumn("ip_address", trim(col("ip_address"))) \
    .withColumn("device_fingerprint", trim(col("device_fingerprint"))) \
    .withColumn("transaction_timestamp",
        coalesce(
            to_timestamp(col("transaction_timestamp"), "yyyy-MM-dd'T'HH:mm:ss'Z'"),
            to_timestamp(col("transaction_timestamp"), "yyyy-MM-dd'T'HH:mm:ss"),
            to_timestamp(col("transaction_timestamp"), "yyyy-MM-dd HH:mm:ss"),
            to_timestamp(col("transaction_timestamp"))
        ))

# Check how many timestamps parsed successfully
null_ts = df_pay_typed.filter(col("transaction_timestamp").isNull()).count()
print("STEP 2: Data types converted")
print("  Unparseable timestamps: " + str(null_ts))


# ----------------------------------------------------------
# Step 3: Standardize payment status
# ----------------------------------------------------------
# WHY STANDARDIZE?
# Different payment gateways return different status strings:
#   Stripe: "succeeded", "failed"
#   PayPal: "COMPLETED", "DENIED"
#   Razorpay: "captured", "failed"
#
# We normalize to: SUCCESS, FAILED, PENDING, REFUNDED
# This makes reporting consistent regardless of gateway.

df_pay_std = df_pay_typed \
    .withColumn("status_original", col("status")) \
    .withColumn("status",
        when(col("status").isin("success", "succeeded", "completed", "captured", "approved"),
            lit("SUCCESS"))
        .when(col("status").isin("failed", "failure", "declined", "denied", "rejected"),
            lit("FAILED"))
        .when(col("status").isin("pending", "processing", "initiated"),
            lit("PENDING"))
        .when(col("status").isin("refunded", "reversed", "voided"),
            lit("REFUNDED"))
        .otherwise(upper(col("status"))))

print("STEP 3: Payment status standardized")


# ----------------------------------------------------------
# Step 4: Add risk categorization and fraud signals
# ----------------------------------------------------------
# RISK LEVEL:
#   Based on the numeric risk_score (1-99) from payment gateway.
#   We categorize into business-friendly labels.
#   These become features for our ML fraud model later.
#
# FRAUD SIGNAL FLAGS:
#   Each flag represents a suspicious pattern:
#
#   is_high_risk: risk_score >= 70
#     Gateway's own risk assessment is high
#
#   is_off_hours: transaction between 11 PM and 5 AM
#     Legitimate purchases happen less at 3 AM
#     Fraudsters often operate during off-hours
#
#   is_high_amount: amount > $2000
#     Large purchases have higher fraud risk
#     Card testing often uses small amounts, but
#     actual fraud uses large amounts
#
#   is_international: cross-border transaction
#     Higher fraud risk than domestic
#
#   fraud_signal_count: how many flags are TRUE
#     0 signals = very safe
#     1 signal = monitor
#     2 signals = review
#     3+ signals = likely fraud, block and investigate
#
# WHY BUILD THESE IN SILVER (not Gold)?
#   These flags are reusable across multiple Gold tables
#   and ML models. Computing them once in Silver ensures
#   consistency. The ML model in Cell 16+ will use these
#   as features.

df_pay_enriched = df_pay_std \
    .withColumn("risk_level",
        when(col("risk_score") >= 80, lit("CRITICAL"))
        .when(col("risk_score") >= 60, lit("HIGH"))
        .when(col("risk_score") >= 40, lit("MEDIUM"))
        .when(col("risk_score") >= 20, lit("LOW"))
        .otherwise(lit("VERY_LOW"))) \
    .withColumn("is_high_risk",
        when(col("risk_score") >= 70, lit(True)).otherwise(lit(False))) \
    .withColumn("transaction_hour", hour(col("transaction_timestamp"))) \
    .withColumn("is_off_hours",
        when((hour(col("transaction_timestamp")) >= 23) |
             (hour(col("transaction_timestamp")) <= 5),
            lit(True)).otherwise(lit(False))) \
    .withColumn("is_high_amount",
        when(col("amount") > 2000, lit(True)).otherwise(lit(False))) \
    .withColumn("fraud_signal_count",
        col("is_high_risk").cast("int") +
        col("is_off_hours").cast("int") +
        col("is_high_amount").cast("int") +
        col("is_international").cast("int")) \
    .withColumn("fraud_risk_label",
        when(col("fraud_signal_count") >= 3, lit("CRITICAL"))
        .when(col("fraud_signal_count") >= 2, lit("HIGH"))
        .when(col("fraud_signal_count") >= 1, lit("MEDIUM"))
        .otherwise(lit("LOW"))) \
    .withColumn("transaction_date", to_date(col("transaction_timestamp"))) \
    .withColumn("transaction_day_of_week", dayofweek(col("transaction_timestamp"))) \
    .withColumn("day_name", date_format(col("transaction_timestamp"), "EEEE"))

print("STEP 4: Risk and fraud signals added")
print("  risk_level, is_high_risk, is_off_hours")
print("  is_high_amount, fraud_signal_count, fraud_risk_label")


# ----------------------------------------------------------
# Step 5: Deduplicate on transaction_id
# ----------------------------------------------------------
df_pay_deduped = df_pay_enriched.dropDuplicates(["transaction_id"])
dedup_count = df_pay_deduped.count()
dupes = bronze_count - dedup_count
print("STEP 5: Deduplication - " + str(dupes) + " duplicates removed")


# ----------------------------------------------------------
# Step 6: Quality check
# ----------------------------------------------------------
df_pay_good = df_pay_deduped.filter(
    col("transaction_id").isNotNull() &
    col("order_id").isNotNull() &
    col("amount").isNotNull() &
    (col("amount") > 0) &
    col("transaction_timestamp").isNotNull()
)

bad_count = df_pay_deduped.count() - df_pay_good.count()
print("STEP 6: Quality check - " + str(bad_count) + " bad records")


# ----------------------------------------------------------
# Step 7: Add metadata and write
# ----------------------------------------------------------
df_pay_silver = df_pay_good \
    .withColumn("_silver_processed_at", current_timestamp()) \
    .withColumn("_silver_version", lit("1.0"))

silver_payments_path = SILVER + "/payments"

df_pay_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(silver_payments_path)


# ----------------------------------------------------------
# Step 8: Verify
# ----------------------------------------------------------
df_verify = spark.read.format("delta").load(silver_payments_path)
final_count = df_verify.count()

print("")
print("=" * 65)
print("SILVER PAYMENTS - COMPLETE")
print("=" * 65)
print("  Source (Bronze):     " + str(bronze_count) + " rows")
print("  Duplicates removed:  " + str(dupes))
print("  Quality rejected:    " + str(bad_count))
print("  Final Silver:        " + str(final_count) + " rows")
print("  Path:                " + silver_payments_path)

print("\n  Schema:")
df_verify.printSchema()

print("\n  Sample data:")
df_verify.select(
    "transaction_id", "order_id", "amount", "status",
    "payment_method", "risk_score", "risk_level", "fraud_risk_label"
).show(5, truncate=False)

# Payment status
print("\n  Payment status distribution:")
df_verify.groupBy("status").agg(
    count("*").alias("count"),
    round(sum("amount"), 2).alias("total_amount"),
    round(avg("amount"), 2).alias("avg_amount")
).orderBy(desc("count")).show()

# Risk level
print("\n  Risk level distribution:")
df_verify.groupBy("risk_level").agg(
    count("*").alias("count"),
    round(avg("risk_score"), 1).alias("avg_risk_score")
).orderBy("avg_risk_score").show()

# Fraud signals
print("\n  Fraud risk label distribution:")
df_verify.groupBy("fraud_risk_label").agg(
    count("*").alias("count"),
    round(avg("fraud_signal_count"), 1).alias("avg_signals")
).orderBy(desc("avg_signals")).show()

# Payment method
print("\n  Payment method distribution:")
df_verify.groupBy("payment_method").agg(
    count("*").alias("count"),
    round(sum("amount"), 2).alias("total_amount")
).orderBy(desc("count")).show()

# High risk transactions
high_risk_count = df_verify.filter(col("fraud_risk_label").isin("HIGH", "CRITICAL")).count()
print("  FRAUD ALERT: " + str(high_risk_count) + " transactions flagged as HIGH/CRITICAL risk")

print("\n  Sample HIGH/CRITICAL risk transactions:")
df_verify.filter(col("fraud_risk_label").isin("HIGH", "CRITICAL")).select(
    "transaction_id", "amount", "risk_score", "is_off_hours",
    "is_high_amount", "is_international", "fraud_signal_count"
).show(5, truncate=False)

print("\n[DONE] Silver Payments complete!")
print("=" * 65)
print("ALL SILVER TABLES COMPLETE!")
print("=" * 65)
print("  1. silver/orders       - 1948 rows (52 quarantined)")
print("  2. silver/order_items  - 4904 rows")
print("  3. silver/customers    - 500 rows (PII masked)")
print("  4. silver/products     - 50 rows (attributes flattened)")
print("  5. silver/inventory    - 150 rows (neg stock fixed)")
print("  6. silver/clickstream  - 3000 rows (events)")
print("  7. silver/sessions     - 3000 rows (session aggregates)")
print("  8. silver/payments     - 2000 rows (fraud signals added)")
print("")
print("[NEXT] Cell 11 - GOLD LAYER (Star Schema: Dimensions + Facts)")

STEP 1: Bronze Payments read - 2000 rows
STEP 2: Data types converted
  Unparseable timestamps: 0
STEP 3: Payment status standardized
STEP 4: Risk and fraud signals added
  risk_level, is_high_risk, is_off_hours
  is_high_amount, fraud_signal_count, fraud_risk_label
STEP 5: Deduplication - 0 duplicates removed
STEP 6: Quality check - 0 bad records

SILVER PAYMENTS - COMPLETE
  Source (Bronze):     2000 rows
  Duplicates removed:  0
  Quality rejected:    0
  Final Silver:        2000 rows
  Path:                abfss://silver@dlsshopsmartdev123.dfs.core.windows.net/payments

  Schema:
root
 |-- amount: double (nullable = true)
 |-- card_type: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- device_fingerprint: string (nullable = true)
 |-- gateway_response_code: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- is_international: boolean (nullable = true)
 |-- order_id: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |--

In [0]:
# ============================================================
# CELL 11: GOLD LAYER - dim_date (Date Dimension)
# ============================================================
#
# WHAT IS A DATE DIMENSION?
# -------------------------
# A pre-built calendar table with one row per day.
# Instead of calculating "is this a weekend?" or 
# "what quarter is this?" in every query, we calculate
# it ONCE and store it.
#
# Every fact table joins to dim_date via a date key.
# Example: fact_sales.order_date_key -> dim_date.date_key
#
# WHY NOT JUST USE THE DATE COLUMN DIRECTLY?
# 1. Performance: pre-computed attributes avoid runtime functions
# 2. Consistency: everyone uses the same fiscal year definition
# 3. Filtering: "Show Q4 2025" is just WHERE quarter = 4
# 4. Custom attributes: holidays, fiscal periods, pay days
#    can't be derived from a raw date
#
# THIS IS ASKED IN EVERY DATA ENGINEERING INTERVIEW.
# "How would you design a date dimension?" is a classic question.
#
# We generate dates from 2024-01-01 to 2026-12-31 (3 years)
# to cover all historical and future data.
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.types import *

# ----------------------------------------------------------
# Step 1: Generate a sequence of dates
# ----------------------------------------------------------
# HOW THIS WORKS:
# 1. sequence() creates an array of dates from start to end
#    sequence(2024-01-01, 2026-12-31) = [2024-01-01, 2024-01-02, ...]
# 2. explode() converts the array into individual rows
#    One array of 1096 dates -> 1096 rows
#
# This is a common Spark pattern for generating reference data
# without needing an external source.

df_dates = spark.sql("""
    SELECT explode(sequence(
        to_date('2024-01-01'),
        to_date('2026-12-31'),
        interval 1 day
    )) as date
""")

total_dates = df_dates.count()
print("STEP 1: Generated " + str(total_dates) + " dates (2024-2026)")


# ----------------------------------------------------------
# Step 2: Build all date attributes
# ----------------------------------------------------------
# Each attribute serves a specific analytics purpose:
#
# date_key (int): 20250115 format
#   Used as JOIN key with fact tables.
#   Integer keys are faster than date JOINs.
#   Format: YYYYMMDD
#
# year, quarter, month, day: Basic components
#   "Revenue by quarter" -> GROUP BY quarter
#
# month_name, day_name: Human-readable labels
#   Dashboards show "January" not "1"
#
# week_of_year: For weekly reporting
#   "Week-over-week growth" needs this
#
# is_weekend: Saturday/Sunday flag
#   "Weekend vs weekday sales patterns"
#   dayofweek returns 1=Sunday, 7=Saturday
#
# is_month_start, is_month_end: Boundary flags
#   Financial reporting often focuses on month boundaries
#
# quarter_label: "Q1-2025" format
#   Clean label for dashboard filters
#
# day_of_year: 1-365
#   Useful for year-over-year comparison at the day level

df_dim_date = df_dates.select(
    date_format(col("date"), "yyyyMMdd").cast("int").alias("date_key"),
    col("date").alias("full_date"),
    year("date").alias("year"),
    quarter("date").alias("quarter"),
    month("date").alias("month"),
    dayofmonth("date").alias("day"),
    date_format("date", "MMMM").alias("month_name"),
    date_format("date", "MMM").alias("month_short"),
    dayofweek("date").alias("day_of_week"),
    date_format("date", "EEEE").alias("day_name"),
    date_format("date", "EEE").alias("day_short"),
    weekofyear("date").alias("week_of_year"),
    dayofyear("date").alias("day_of_year"),
    when(dayofweek("date").isin(1, 7), lit(True)).otherwise(lit(False)).alias("is_weekend"),
    when(dayofmonth("date") == 1, lit(True)).otherwise(lit(False)).alias("is_month_start"),
    when(col("date") == last_day("date"), lit(True)).otherwise(lit(False)).alias("is_month_end"),
    when(month("date").isin(11, 12, 1), lit(True)).otherwise(lit(False)).alias("is_holiday_season"),
    concat(lit("Q"), quarter("date").cast("string"), lit("-"), year("date").cast("string")).alias("quarter_label"),
    concat(date_format("date", "MMM"), lit("-"), year("date").cast("string")).alias("month_year_label"),
    when(month("date") <= 6, lit(1)).otherwise(lit(2)).alias("half_year"),
    when(month("date") <= 6, lit("H1")).otherwise(lit("H2")).alias("half_year_label")
)

print("STEP 2: Date attributes built - " + str(len(df_dim_date.columns)) + " columns")


# ----------------------------------------------------------
# Step 3: Write to Gold layer
# ----------------------------------------------------------
gold_dim_date_path = GOLD + "/dim_date"

df_dim_date.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(gold_dim_date_path)


# ----------------------------------------------------------
# Step 4: Verify
# ----------------------------------------------------------
df_verify = spark.read.format("delta").load(gold_dim_date_path)
final_count = df_verify.count()

print("")
print("=" * 65)
print("GOLD dim_date - COMPLETE")
print("=" * 65)
print("  Total dates:  " + str(final_count) + " rows")
print("  Date range:   2024-01-01 to 2026-12-31")
print("  Columns:      " + str(len(df_verify.columns)))
print("  Path:         " + gold_dim_date_path)

print("\n  Schema:")
df_verify.printSchema()

print("\n  Sample (first 5 days of 2025):")
df_verify.filter(
    (col("year") == 2025) & (col("month") == 1) & (col("day") <= 5)
).orderBy("full_date").show(truncate=False)

print("\n  Weekend vs Weekday count:")
df_verify.groupBy("is_weekend").count().show()

print("\n  Records per year:")
df_verify.groupBy("year").count().orderBy("year").show()

print("\n  Quarter labels sample:")
df_verify.select("quarter_label").distinct().orderBy("quarter_label").show(12)

print("[DONE] Gold dim_date complete!")
print("[NEXT] Cell 12 - Gold dim_customer")

STEP 1: Generated 1096 dates (2024-2026)
STEP 2: Date attributes built - 21 columns

GOLD dim_date - COMPLETE
  Total dates:  1096 rows
  Date range:   2024-01-01 to 2026-12-31
  Columns:      21
  Path:         abfss://gold@dlsshopsmartdev123.dfs.core.windows.net/dim_date

  Schema:
root
 |-- date_key: integer (nullable = true)
 |-- full_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month_name: string (nullable = true)
 |-- month_short: string (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- day_name: string (nullable = true)
 |-- day_short: string (nullable = true)
 |-- week_of_year: integer (nullable = true)
 |-- day_of_year: integer (nullable = true)
 |-- is_weekend: boolean (nullable = true)
 |-- is_month_start: boolean (nullable = true)
 |-- is_month_end: boolean (nullable = true)
 |-- is_holiday_season: boolean (nullable = t

In [0]:
# ============================================================
# CELL 12: GOLD LAYER - dim_customer (Customer Dimension)
# ============================================================
#
# WHAT IS dim_customer?
# ---------------------
# The customer dimension describes WHO made the purchase.
# Every row in fact_sales will JOIN to dim_customer to answer:
#   "Revenue by loyalty tier"
#   "Orders by age group"
#   "Customer count by state"
#
# SOURCE: silver/customers (already cleaned & PII masked)
#
# WHAT WE ADD IN GOLD:
# Gold layer is about BUSINESS PERSPECTIVE, not cleaning.
# We select only the columns analysts need and add
# surrogate keys.
#
# SURROGATE KEY vs NATURAL KEY:
# - Natural key: customer_id ("CUST001") - from source system
# - Surrogate key: customer_sk (1, 2, 3...) - generated by us
#
# WHY SURROGATE KEYS?
# 1. Performance: integer JOINs faster than string JOINs
# 2. Independence: if source changes CUST001 to C-001, 
#    our surrogate key stays the same
# 3. History: enables SCD Type 2 (tracking changes over time)
# 4. Standard practice: every data warehouse uses them
#
# SCD TYPE 2 (Slowly Changing Dimension):
# When a customer moves from "Silver" to "Gold" loyalty tier,
# we want to keep BOTH versions:
#   customer_sk=1, CUST001, Silver, effective 2024-01-01 to 2025-06-01
#   customer_sk=2, CUST001, Gold,   effective 2025-06-01 to 9999-12-31
#
# This lets us analyze: "What was the customer's tier WHEN 
# they placed that order in March?" — historical accuracy.
#
# For this project, we implement a SIMPLIFIED SCD Type 2
# (single version per customer since we have point-in-time data).
# In production, you'd run this incrementally with MERGE.
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.types import *

# ----------------------------------------------------------
# Step 1: Read Silver Customers
# ----------------------------------------------------------
df_cust_silver = spark.read.format("delta").load(SILVER + "/customers")

silver_count = df_cust_silver.count()
print("STEP 1: Silver Customers read - " + str(silver_count) + " rows")


# ----------------------------------------------------------
# Step 2: Build dim_customer with surrogate key
# ----------------------------------------------------------
# monotonically_increasing_id() generates unique IDs.
# NOTE: These IDs are NOT sequential (1, 2, 3...).
# They're globally unique across partitions.
# For a true sequential key, we use row_number() instead.
#
# WHY row_number() OVER orderBy(customer_id)?
# - Guarantees sequential: 1, 2, 3, 4, ...
# - Deterministic: same input always gives same keys
# - Sorted: customer_sk order matches customer_id order
# This is important for debugging and testing.

from pyspark.sql.window import Window

window_sk = Window.orderBy("customer_id")

df_dim_customer = df_cust_silver \
    .withColumn("customer_sk", row_number().over(window_sk)) \
    .select(
        col("customer_sk"),
        col("customer_id"),
        col("first_name"),
        col("last_name"),
        col("first_name_initial"),
        col("last_name_initial"),
        col("email_hash"),
        col("email_domain"),
        col("phone_masked"),
        col("gender"),
        col("date_of_birth"),
        col("age"),
        col("age_group"),
        col("loyalty_tier"),
        col("registration_date"),
        col("customer_tenure_days"),
        col("tenure_category"),
        col("address_city"),
        col("address_state"),
        col("address_zip"),
        col("address_country"),
        col("pref_categories"),
        col("pref_communication"),
        col("has_email"),
        # SCD Type 2 columns
        col("registration_date").alias("effective_start_date"),
        to_date(lit("9999-12-31")).alias("effective_end_date"),
        lit(True).alias("is_current"),
        current_timestamp().alias("_gold_processed_at"),
        lit("1.0").alias("_gold_version")
    )

print("STEP 2: dim_customer built with surrogate key and SCD2 columns")
print("  Columns: " + str(len(df_dim_customer.columns)))


# ----------------------------------------------------------
# Step 3: Write to Gold layer
# ----------------------------------------------------------
gold_dim_customer_path = GOLD + "/dim_customer"

df_dim_customer.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(gold_dim_customer_path)


# ----------------------------------------------------------
# Step 4: Verify
# ----------------------------------------------------------
df_verify = spark.read.format("delta").load(gold_dim_customer_path)
final_count = df_verify.count()

print("")
print("=" * 65)
print("GOLD dim_customer - COMPLETE")
print("=" * 65)
print("  Source (Silver):  " + str(silver_count) + " rows")
print("  Final Gold:       " + str(final_count) + " rows")
print("  Columns:          " + str(len(df_verify.columns)))
print("  Path:             " + gold_dim_customer_path)

print("\n  Schema:")
df_verify.printSchema()

# Show surrogate key assignment
print("\n  Surrogate key sample:")
df_verify.select(
    "customer_sk", "customer_id", "first_name", "last_name",
    "gender", "age_group", "loyalty_tier"
).orderBy("customer_sk").show(5, truncate=False)

# Show SCD2 columns
print("\n  SCD Type 2 columns:")
df_verify.select(
    "customer_sk", "customer_id", "loyalty_tier",
    "effective_start_date", "effective_end_date", "is_current"
).orderBy("customer_sk").show(5, truncate=False)

# Show geographic distribution
print("\n  Top 10 states by customer count:")
df_verify.groupBy("address_state").count().orderBy(desc("count")).show(10)

# Loyalty by age group
print("\n  Loyalty tier by age group:")
df_verify.groupBy("age_group", "loyalty_tier") \
    .count() \
    .orderBy("age_group", "loyalty_tier") \
    .show(25)

print("[DONE] Gold dim_customer complete!")
print("[NEXT] Cell 13 - Gold dim_product")

STEP 1: Silver Customers read - 500 rows
STEP 2: dim_customer built with surrogate key and SCD2 columns




  Columns: 29

GOLD dim_customer - COMPLETE
  Source (Silver):  500 rows
  Final Gold:       500 rows
  Columns:          29
  Path:             abfss://gold@dlsshopsmartdev123.dfs.core.windows.net/dim_customer

  Schema:
root
 |-- customer_sk: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- first_name_initial: string (nullable = true)
 |-- last_name_initial: string (nullable = true)
 |-- email_hash: string (nullable = true)
 |-- email_domain: string (nullable = true)
 |-- phone_masked: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- age_group: string (nullable = true)
 |-- loyalty_tier: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- customer_tenure_days: integer (nullable = true)
 |-- tenure_category: string (nullable = true)
 |-- address_city: string

In [0]:
# ============================================================
# CELL 13: GOLD LAYER - dim_product (Product Dimension)
# ============================================================
#
# WHAT IS dim_product?
# --------------------
# Describes WHAT was sold. Every fact_sales row joins here.
# Business questions this enables:
#   "Revenue by category" 
#   "Top 10 brands by sales"
#   "Average margin by price tier"
#   "Products to discontinue (low rating + low sales)"
#
# SOURCE: silver/products (already cleaned, attributes flattened)
#
# WHAT WE ADD IN GOLD:
# - Surrogate key (product_sk)
# - Only business-relevant columns (drop technical metadata)
# - Consistent column ordering (keys first, then attributes)
#
# DESIGN PRINCIPLE:
# Dimension tables should be WIDE (many columns) but SHORT 
# (few rows). Our dim_product has 50 rows and ~25 columns.
# This is typical - a retail company might have 50,000 products
# with 30+ attributes each.
#
# The "width" of dimensions is what makes star schema powerful.
# One JOIN to dim_product gives you access to category, brand,
# price tier, rating, margin - all in one hop.
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.window import Window

# ----------------------------------------------------------
# Step 1: Read Silver Products
# ----------------------------------------------------------
df_prod_silver = spark.read.format("delta").load(SILVER + "/products")

silver_count = df_prod_silver.count()
print("STEP 1: Silver Products read - " + str(silver_count) + " rows")


# ----------------------------------------------------------
# Step 2: Build dim_product with surrogate key
# ----------------------------------------------------------
# Same row_number() approach as dim_customer.
# Ordered by product_id for deterministic key assignment.

window_sk = Window.orderBy("product_id")

df_dim_product = df_prod_silver \
    .withColumn("product_sk", row_number().over(window_sk)) \
    .select(
        # Keys (always first in a dimension)
        col("product_sk"),
        col("product_id"),
        # Descriptive attributes
        col("product_name"),
        col("category"),
        col("sub_category"),
        col("brand"),
        col("supplier_id"),
        # Price and cost
        col("price").alias("current_price"),
        col("cost_price"),
        col("profit_margin"),
        col("margin_pct"),
        col("price_tier"),
        # Product characteristics
        col("weight_kg"),
        col("rating"),
        col("review_count"),
        col("rating_category"),
        col("is_active"),
        # Flattened attributes
        col("attr_battery_life"),
        col("attr_colors"),
        col("attr_connectivity"),
        # Time attributes
        col("created_at").alias("product_created_at"),
        col("updated_at").alias("product_updated_at"),
        col("product_age_days"),
        # Gold metadata
        current_timestamp().alias("_gold_processed_at"),
        lit("1.0").alias("_gold_version")
    )

print("STEP 2: dim_product built - " + str(len(df_dim_product.columns)) + " columns")


# ----------------------------------------------------------
# Step 3: Write to Gold layer
# ----------------------------------------------------------
gold_dim_product_path = GOLD + "/dim_product"

df_dim_product.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(gold_dim_product_path)


# ----------------------------------------------------------
# Step 4: Verify
# ----------------------------------------------------------
df_verify = spark.read.format("delta").load(gold_dim_product_path)
final_count = df_verify.count()

print("")
print("=" * 65)
print("GOLD dim_product - COMPLETE")
print("=" * 65)
print("  Source (Silver):  " + str(silver_count) + " rows")
print("  Final Gold:       " + str(final_count) + " rows")
print("  Columns:          " + str(len(df_verify.columns)))
print("  Path:             " + gold_dim_product_path)

print("\n  Schema:")
df_verify.printSchema()

# Surrogate key sample
print("\n  Surrogate key sample:")
df_verify.select(
    "product_sk", "product_id", "product_name",
    "category", "brand", "current_price", "price_tier"
).orderBy("product_sk").show(5, truncate=False)

# Category and margin analysis
print("\n  Category analysis:")
df_verify.groupBy("category").agg(
    count("*").alias("products"),
    round(avg("current_price"), 2).alias("avg_price"),
    round(avg("margin_pct"), 2).alias("avg_margin_pct"),
    round(avg("rating"), 2).alias("avg_rating"),
    sum(col("is_active").cast("int")).alias("active_count")
).orderBy(desc("products")).show()

# Price tier analysis
print("\n  Price tier analysis:")
df_verify.groupBy("price_tier").agg(
    count("*").alias("products"),
    round(min("current_price"), 2).alias("min_price"),
    round(max("current_price"), 2).alias("max_price"),
    round(avg("margin_pct"), 2).alias("avg_margin_pct")
).orderBy("min_price").show()

print("[DONE] Gold dim_product complete!")
print("[NEXT] Cell 14 - Gold fact_sales (THE MAIN FACT TABLE)")

STEP 1: Silver Products read - 50 rows
STEP 2: dim_product built - 25 columns

GOLD dim_product - COMPLETE
  Source (Silver):  50 rows
  Final Gold:       50 rows
  Columns:          25
  Path:             abfss://gold@dlsshopsmartdev123.dfs.core.windows.net/dim_product

  Schema:
root
 |-- product_sk: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- supplier_id: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- cost_price: double (nullable = true)
 |-- profit_margin: double (nullable = true)
 |-- margin_pct: double (nullable = true)
 |-- price_tier: string (nullable = true)
 |-- weight_kg: double (nullable = true)
 |-- rating: double (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- rating_category: string (nullable = true)
 |-- is_active: boolean (nullable

In [0]:
# ============================================================
# CELL 14: GOLD LAYER - fact_sales (Main Fact Table)
# ============================================================
#
# THIS IS THE HEART OF THE ENTIRE DATA PLATFORM.
#
# WHAT IS A FACT TABLE?
# ---------------------
# A fact table records BUSINESS EVENTS (things that happened).
# Each row = one item sold in one order.
# This is called the "grain" of the fact table.
#
# GRAIN = "The most atomic level of detail in the fact table"
# Our grain: ONE ORDER ITEM (not one order!)
#
# WHY ORDER ITEM LEVEL (not order level)?
# If we aggregate to order level, we lose product-level detail:
#   - "Which product sold the most?" needs item-level
#   - "Revenue by category" needs item + product JOIN
#   - "Average items per order" needs item count per order
#
# Rule: Always choose the LOWEST useful grain. You can always
# aggregate UP (items -> orders), but you can't disaggregate 
# DOWN (orders -> items) without the source data.
#
# HOW FACT TABLE CONNECTS TO DIMENSIONS:
#
#   dim_date -------- date_key -------- fact_sales
#   dim_customer ---- customer_id ----- fact_sales  
#   dim_product ----- product_id ------ fact_sales
#
# This is the STAR shape:
#           dim_date
#              |
#   dim_customer -- fact_sales -- dim_product
#
# MEASURES (numeric values we analyze):
#   quantity, unit_price, line_total, discount_amount,
#   net_line_total, shipping_amount
#
# These are the numbers we SUM, AVG, COUNT in dashboards.
#
# DEGENERATE DIMENSIONS:
#   order_id and item_id live in the fact table directly.
#   They don't have their own dimension table because
#   there's nothing more to describe about them.
#   This is called a "degenerate dimension" - a dimension
#   key without a dimension table.
# ============================================================

from pyspark.sql.functions import *
from pyspark.sql.types import *

# ----------------------------------------------------------
# Step 1: Read Silver tables we need to JOIN
# ----------------------------------------------------------
df_orders = spark.read.format("delta").load(SILVER + "/orders")
df_items = spark.read.format("delta").load(SILVER + "/order_items")

orders_count = df_orders.count()
items_count = df_items.count()
print("STEP 1: Silver data read")
print("  Orders:      " + str(orders_count) + " rows")
print("  Order Items: " + str(items_count) + " rows")


# ----------------------------------------------------------
# Step 2: JOIN orders with order_items
# ----------------------------------------------------------
# WHY JOIN IN GOLD (not Silver)?
# Silver tables are INDEPENDENT cleaned tables.
# Gold is where we COMBINE them for analytics.
#
# JOIN TYPE: INNER JOIN
# We only want items that belong to valid orders.
# If an item has an order_id that doesn't exist in orders,
# it's orphaned data and should not be in the fact table.
#
# WHAT COLUMNS COME FROM WHERE:
# From orders: customer_id, order_date, order_status, 
#              payment_method, channel, shipping_amount
# From items:  product_id, quantity, unit_price,
#              discount_percent, line_total, net_line_total
#
# We prefix nothing because column names don't conflict
# (we already designed Silver tables carefully).

df_joined = df_orders.alias("o").join(
    df_items.alias("i"),
    col("o.order_id") == col("i.order_id"),
    "inner"
)

joined_count = df_joined.count()
print("STEP 2: Orders JOIN Order Items = " + str(joined_count) + " rows")

# Check for orphaned items (items without matching orders)
orphan_items = df_items.join(df_orders, "order_id", "left_anti").count()
print("  Orphaned items (no matching order): " + str(orphan_items))


# ----------------------------------------------------------
# Step 3: Build fact_sales
# ----------------------------------------------------------
# SELECT COLUMNS carefully:
#
# KEYS (for joining to dimensions):
#   order_date_key -> joins to dim_date.date_key
#   customer_id    -> joins to dim_customer.customer_id
#   product_id     -> joins to dim_product.product_id
#
# DEGENERATE DIMENSIONS (identifiers without dim tables):
#   order_id       -> for drill-down to specific order
#   item_id        -> for uniqueness (primary key of fact)
#
# MEASURES (numbers we aggregate):
#   quantity              -> SUM for total units sold
#   unit_price            -> for price analysis
#   line_total            -> quantity * unit_price (gross)
#   discount_percent      -> for discount analysis
#   discount_amount       -> SUM for total discounts given
#   net_line_total        -> revenue after discount
#   shipping_amount       -> allocated from order level
#
# ATTRIBUTES (for filtering/grouping):
#   order_status, payment_method, channel
#   item_status
#
# FLAGS (pre-computed boolean filters):
#   is_cancelled, is_returned, has_discount
#   is_weekend
#
# TIME ATTRIBUTES (from orders, for quick filtering):
#   order_year, order_month, order_hour, day_name

df_fact_sales = df_joined.select(
    # Primary key
    col("i.item_id").alias("sales_key"),
    # Dimension keys
    date_format(col("o.order_date"), "yyyyMMdd").cast("int").alias("order_date_key"),
    col("o.customer_id"),
    col("i.product_id"),
    # Degenerate dimensions
    col("o.order_id"),
    col("i.item_id"),
    # Measures
    col("i.quantity"),
    col("i.unit_price"),
    col("i.line_total"),
    col("i.discount_percent"),
    col("i.discount_amount"),
    col("i.net_line_total"),
    col("o.shipping_amount"),
    # Order attributes
    col("o.order_date"),
    col("o.order_status"),
    col("o.payment_method"),
    col("o.channel"),
    col("i.item_status"),
    # Pre-computed flags
    col("o.is_cancelled"),
    col("o.is_returned"),
    col("i.has_discount"),
    col("o.is_weekend"),
    col("o.has_free_shipping"),
    # Time parts (for quick filtering without joining dim_date)
    col("o.order_year"),
    col("o.order_month"),
    col("o.order_day"),
    col("o.order_hour"),
    col("o.day_name"),
    # Metadata
    current_timestamp().alias("_gold_processed_at"),
    lit("1.0").alias("_gold_version")
)

print("STEP 3: fact_sales built - " + str(len(df_fact_sales.columns)) + " columns")


# ----------------------------------------------------------
# Step 4: Write to Gold layer (partitioned by year, month)
# ----------------------------------------------------------
# WHY PARTITION fact_sales?
# Fact tables are LARGE (millions of rows in production).
# Partitioning by year/month means:
#   - Query "SELECT * WHERE order_year = 2025" only reads 
#     2025 files, skipping all other years
#   - This is called PARTITION PRUNING
#   - Can reduce query time by 90%+
#
# WHY year + month (not just date)?
# - Partitioning by date creates too many small files
#   (365 partitions per year, each with few rows)
# - year + month = 12 partitions per year (optimal)
# - This is a common production pattern

gold_fact_sales_path = GOLD + "/fact_sales"

df_fact_sales.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("order_year", "order_month") \
    .option("overwriteSchema", True) \
    .save(gold_fact_sales_path)


# ----------------------------------------------------------
# Step 5: Verify and analyze
# ----------------------------------------------------------
df_verify = spark.read.format("delta").load(gold_fact_sales_path)
final_count = df_verify.count()

print("")
print("=" * 65)
print("GOLD fact_sales - COMPLETE")
print("=" * 65)
print("  Orders (Silver):    " + str(orders_count))
print("  Items (Silver):     " + str(items_count))
print("  Joined rows:        " + str(joined_count))
print("  Final fact_sales:   " + str(final_count) + " rows")
print("  Columns:            " + str(len(df_verify.columns)))
print("  Partitioned by:     order_year, order_month")
print("  Path:               " + gold_fact_sales_path)

print("\n  Schema:")
df_verify.printSchema()

print("\n  Sample data:")
df_verify.select(
    "sales_key", "order_date_key", "order_id", "customer_id",
    "product_id", "quantity", "unit_price", "net_line_total",
    "order_status", "channel"
).show(5, truncate=False)

# ============================================================
# BUSINESS KPI QUERIES (This is what the Gold layer enables!)
# ============================================================

print("\n" + "=" * 65)
print("BUSINESS KPIs FROM fact_sales")
print("=" * 65)

# KPI 1: Total Revenue
print("\n  KPI 1: Revenue Summary")
df_verify.agg(
    count("*").alias("total_line_items"),
    countDistinct("order_id").alias("total_orders"),
    countDistinct("customer_id").alias("total_customers"),
    round(sum("net_line_total"), 2).alias("total_revenue"),
    round(avg("net_line_total"), 2).alias("avg_line_item_value"),
    round(sum("discount_amount"), 2).alias("total_discounts")
).show(truncate=False)

# KPI 2: Revenue by Channel
print("\n  KPI 2: Revenue by Channel")
df_verify.groupBy("channel").agg(
    countDistinct("order_id").alias("orders"),
    round(sum("net_line_total"), 2).alias("revenue"),
    round(avg("net_line_total"), 2).alias("avg_item_value")
).orderBy(desc("revenue")).show()

# KPI 3: Revenue by Month
print("\n  KPI 3: Monthly Revenue Trend")
df_verify.groupBy("order_year", "order_month").agg(
    countDistinct("order_id").alias("orders"),
    round(sum("net_line_total"), 2).alias("revenue")
).orderBy("order_year", "order_month").show(15)

# KPI 4: Order Status Breakdown
print("\n  KPI 4: Order Status Breakdown")
df_verify.groupBy("order_status").agg(
    countDistinct("order_id").alias("orders"),
    round(sum("net_line_total"), 2).alias("revenue")
).orderBy(desc("orders")).show()

# KPI 5: Payment Method Analysis
print("\n  KPI 5: Payment Method Analysis")
df_verify.groupBy("payment_method").agg(
    countDistinct("order_id").alias("orders"),
    round(sum("net_line_total"), 2).alias("revenue")
).orderBy(desc("revenue")).show()

print("[DONE] Gold fact_sales complete!")
print("[NEXT] Cell 15 - Gold agg_daily_sales (Pre-aggregated metrics)")

STEP 1: Silver data read
  Orders:      1948 rows
  Order Items: 4904 rows
STEP 2: Orders JOIN Order Items = 4780 rows
  Orphaned items (no matching order): 124
STEP 3: fact_sales built - 30 columns

GOLD fact_sales - COMPLETE
  Orders (Silver):    1948
  Items (Silver):     4904
  Joined rows:        4780
  Final fact_sales:   4780 rows
  Columns:            30
  Partitioned by:     order_year, order_month
  Path:               abfss://gold@dlsshopsmartdev123.dfs.core.windows.net/fact_sales

  Schema:
root
 |-- sales_key: string (nullable = true)
 |-- order_date_key: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- line_total: double (nullable = true)
 |-- discount_percent: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- net_line_t

In [0]:
# ============================================================
# CELL 15: GOLD LAYER - agg_daily_sales (Pre-Aggregated Table)
# ============================================================
#
# WHAT IS A PRE-AGGREGATED TABLE?
# --------------------------------
# Instead of running heavy GROUP BY queries on 4780 rows 
# (or millions in production) every time a dashboard loads,
# we pre-compute daily summaries.
#
# Dashboard query WITHOUT pre-aggregation:
#   SELECT date, SUM(net_line_total) FROM fact_sales 
#   GROUP BY date
#   -> Scans ALL rows every time (slow at scale)
#
# Dashboard query WITH pre-aggregation:
#   SELECT * FROM agg_daily_sales WHERE date = '2025-01-15'
#   -> Reads ONE pre-computed row (instant)
#
# WHY IS THIS IMPORTANT?
# - Power BI dashboards refresh every 15-30 minutes
# - Each refresh runs all queries
# - Without pre-agg: 50 queries * millions of rows = slow
# - With pre-agg: 50 queries * hundreds of rows = instant
#
# WHAT METRICS DO WE PRE-COMPUTE?
# Everything a business executive looks at daily:
#   - Total revenue, orders, customers
#   - Average order value
#   - Cancellation and return rates
#   - Revenue by channel
#   - Units sold
#
# GRAIN: One row per day per channel
# This lets us analyze both:
#   - Total daily metrics (GROUP BY date)
#   - Channel comparison (GROUP BY date, channel)
# ============================================================

from pyspark.sql.functions import *

# ----------------------------------------------------------
# Step 1: Read fact_sales from Gold
# ----------------------------------------------------------
df_fact = spark.read.format("delta").load(GOLD + "/fact_sales")
fact_count = df_fact.count()
print("STEP 1: fact_sales read - " + str(fact_count) + " rows")


# ----------------------------------------------------------
# Step 2: Build daily aggregation by channel
# ----------------------------------------------------------
# WHY GROUP BY date AND channel?
# 
# If we only group by date:
#   2025-01-15 | revenue: $50,000 | orders: 200
#   (Can't drill down by channel)
#
# If we group by date + channel:
#   2025-01-15 | web        | revenue: $15,000 | orders: 60
#   2025-01-15 | mobile_app | revenue: $18,000 | orders: 75
#   2025-01-15 | in_store   | revenue: $10,000 | orders: 40
#   2025-01-15 | marketplace| revenue: $7,000  | orders: 25
#   (Can see channel breakdown AND roll up to daily total)
#
# METRICS EXPLAINED:
#   total_orders: COUNT DISTINCT order_id (not line items!)
#     One order with 3 items = 1 order, not 3
#
#   total_customers: unique buyers that day
#     Important for "customer acquisition" tracking
#
#   total_items_sold: SUM of quantity
#     Physical units moved (logistics metric)
#
#   gross_revenue: before discounts
#   total_discount: money given away
#   net_revenue: what we actually earned
#   avg_order_value: revenue / orders
#     Key e-commerce KPI, target is to increase this
#
#   cancel_rate: % of orders cancelled
#     High cancel rate = UX problem or fraud
#
#   return_rate: % of orders returned
#     High return rate = product quality issue

df_agg_daily = df_fact.groupBy(
    col("order_year"),
    col("order_month"),
    col("order_day"),
    to_date(col("order_date")).alias("order_date"),
    col("channel")
).agg(
    # Volume metrics
    countDistinct("order_id").alias("total_orders"),
    countDistinct("customer_id").alias("total_customers"),
    count("*").alias("total_line_items"),
    sum("quantity").alias("total_items_sold"),
    # Revenue metrics
    round(sum("line_total"), 2).alias("gross_revenue"),
    round(sum("discount_amount"), 2).alias("total_discount"),
    round(sum("net_line_total"), 2).alias("net_revenue"),
    round(sum("shipping_amount"), 2).alias("total_shipping"),
    # Averages
    round(avg("net_line_total"), 2).alias("avg_item_value"),
    round(avg("quantity"), 2).alias("avg_quantity_per_item"),
    # Status counts
    countDistinct(when(col("order_status") == "CANCELLED", col("order_id"))).alias("cancelled_orders"),
    countDistinct(when(col("order_status") == "RETURNED", col("order_id"))).alias("returned_orders"),
    countDistinct(when(col("order_status") == "DELIVERED", col("order_id"))).alias("delivered_orders"),
    # Discount metrics
    countDistinct(when(col("has_discount") == True, col("order_id"))).alias("orders_with_discount"),
    # Weekend flag
    first("is_weekend").alias("is_weekend"),
    first("day_name").alias("day_name")
)

# Add calculated rates
df_agg_enriched = df_agg_daily \
    .withColumn("avg_order_value",
        when(col("total_orders") > 0,
            round(col("net_revenue") / col("total_orders"), 2))
        .otherwise(lit(0.0))) \
    .withColumn("cancel_rate_pct",
        when(col("total_orders") > 0,
            round(col("cancelled_orders") / col("total_orders") * 100, 2))
        .otherwise(lit(0.0))) \
    .withColumn("return_rate_pct",
        when(col("total_orders") > 0,
            round(col("returned_orders") / col("total_orders") * 100, 2))
        .otherwise(lit(0.0))) \
    .withColumn("discount_rate_pct",
        when(col("total_orders") > 0,
            round(col("orders_with_discount") / col("total_orders") * 100, 2))
        .otherwise(lit(0.0))) \
    .withColumn("_gold_processed_at", current_timestamp()) \
    .withColumn("_gold_version", lit("1.0"))

agg_count = df_agg_enriched.count()
print("STEP 2: Daily aggregation built - " + str(agg_count) + " rows")


# ----------------------------------------------------------
# Step 3: Write to Gold
# ----------------------------------------------------------
gold_agg_path = GOLD + "/agg_daily_sales"

df_agg_enriched.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", True) \
    .save(gold_agg_path)


# ----------------------------------------------------------
# Step 4: Verify and show executive dashboard metrics
# ----------------------------------------------------------
df_verify = spark.read.format("delta").load(gold_agg_path)
final_count = df_verify.count()

print("")
print("=" * 65)
print("GOLD agg_daily_sales - COMPLETE")
print("=" * 65)
print("  Fact rows aggregated: " + str(fact_count))
print("  Aggregated rows:      " + str(final_count))
print("  Compression ratio:    " + str(fact_count) + " -> " + str(final_count) + " rows")
print("  Path:                 " + gold_agg_path)

print("\n  Schema:")
df_verify.printSchema()

print("\n  Sample daily data:")
df_verify.select(
    "order_date", "channel", "total_orders", "total_customers",
    "net_revenue", "avg_order_value", "cancel_rate_pct"
).orderBy(desc("order_date")).show(10, truncate=False)

# ============================================================
# EXECUTIVE DASHBOARD QUERIES
# ============================================================
print("\n" + "=" * 65)
print("EXECUTIVE DASHBOARD METRICS")
print("=" * 65)

# Overall platform metrics (roll up all channels)
print("\n  PLATFORM TOTALS:")
df_verify.agg(
    sum("total_orders").alias("total_orders"),
    sum("total_customers").alias("total_customer_visits"),
    round(sum("net_revenue"), 2).alias("total_net_revenue"),
    round(sum("total_discount"), 2).alias("total_discounts_given"),
    round(sum("total_items_sold")).alias("total_units_sold"),
    round(avg("avg_order_value"), 2).alias("platform_avg_order_value")
).show(truncate=False)

# Monthly trend (what executives look at first)
print("\n  MONTHLY REVENUE TREND:")
df_verify.groupBy("order_year", "order_month").agg(
    sum("total_orders").alias("orders"),
    round(sum("net_revenue"), 2).alias("revenue"),
    round(avg("avg_order_value"), 2).alias("avg_order_value"),
    round(avg("cancel_rate_pct"), 2).alias("avg_cancel_rate")
).orderBy("order_year", "order_month").show(15)

# Channel performance comparison
print("\n  CHANNEL PERFORMANCE:")
df_verify.groupBy("channel").agg(
    sum("total_orders").alias("orders"),
    round(sum("net_revenue"), 2).alias("revenue"),
    round(avg("avg_order_value"), 2).alias("avg_order_value"),
    round(avg("cancel_rate_pct"), 2).alias("avg_cancel_rate"),
    round(avg("return_rate_pct"), 2).alias("avg_return_rate")
).orderBy(desc("revenue")).show()

# Weekend vs Weekday
print("\n  WEEKEND vs WEEKDAY:")
df_verify.groupBy("is_weekend").agg(
    sum("total_orders").alias("orders"),
    round(sum("net_revenue"), 2).alias("revenue"),
    round(avg("avg_order_value"), 2).alias("avg_order_value")
).show()

# ============================================================
# FINAL GOLD LAYER SUMMARY
# ============================================================
print("\n" + "=" * 65)
print("GOLD LAYER - ALL TABLES COMPLETE!")
print("=" * 65)
print("  1. gold/dim_date         - 1096 rows  (calendar dimension)")
print("  2. gold/dim_customer     - 500 rows   (customer dimension)")
print("  3. gold/dim_product      - 50 rows    (product dimension)")
print("  4. gold/fact_sales       - " + str(fact_count) + " rows  (main fact table)")
print("  5. gold/agg_daily_sales  - " + str(final_count) + " rows   (pre-aggregated)")
print("")
print("  STAR SCHEMA:")
print("        dim_date")
print("           |")
print("  dim_customer --- fact_sales --- dim_product")
print("                       |")
print("                 agg_daily_sales")
print("")
print("[NEXT] Cell 16+ : ML Models (Customer Segmentation, Forecasting)")

STEP 1: fact_sales read - 4780 rows
STEP 2: Daily aggregation built - 1107 rows

GOLD agg_daily_sales - COMPLETE
  Fact rows aggregated: 4780
  Aggregated rows:      1107
  Compression ratio:    4780 -> 1107 rows
  Path:                 abfss://gold@dlsshopsmartdev123.dfs.core.windows.net/agg_daily_sales

  Schema:
root
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)
 |-- order_day: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- channel: string (nullable = true)
 |-- total_orders: long (nullable = true)
 |-- total_customers: long (nullable = true)
 |-- total_line_items: long (nullable = true)
 |-- total_items_sold: long (nullable = true)
 |-- gross_revenue: double (nullable = true)
 |-- total_discount: double (nullable = true)
 |-- net_revenue: double (nullable = true)
 |-- total_shipping: double (nullable = true)
 |-- avg_item_value: double (nullable = true)
 |-- avg_quantity_per_item: double (nullable = true)
 |-- cancell