# 🥇 Gold Layer — Star Schema & SCD Type-2
**Project:** End-to-End Retail Lakehouse | Microsoft Fabric

**Layer:** Gold (Business-Ready / Reporting)

**Purpose:** Build the final analytics-ready star schema with SCD Type-2 for customer history.

```
Silver Tables → Gold Star Schema
                ├── fact_sales
                ├── dim_customer   (SCD Type-2)
                ├── dim_product
                ├── dim_store
                └── dim_date
```

**Star Schema:**
```
dim_date ──┐
           ├── fact_sales ──── dim_customer (SCD2)
dim_store ─┘         └─────── dim_product
```

In [None]:
# ============================================================
# CELL 1 — Configuration
# ============================================================
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, current_timestamp, current_date, to_date,
    year, month, quarter, dayofweek, dayofmonth, weekofyear,
    date_format, when, coalesce, monotonically_increasing_id,
    sha2, concat_ws, row_number, max as spark_max, min as spark_min
)
from pyspark.sql.types import DateType, BooleanType, StringType
from pyspark.sql.window import Window
from delta.tables import DeltaTable
import datetime

# Lakehouse whose "Tables" area holds the silver/gold layers.
# Update this value to match the target environment.
LAKEHOUSE_NAME = "RetailLakehouse"

# OneLake ABFSS root of the lakehouse tables. "your_workspace" looks like a
# placeholder for the real workspace name — confirm before running.
BASE = (
    "abfss://your_workspace@onelake.dfs.fabric.microsoft.com/"
    + LAKEHOUSE_NAME
    + ".Lakehouse/Tables"
)

# Layer roots: Silver is read from, Gold is written to.
SILVER = "/".join((BASE, "silver"))
GOLD = "/".join((BASE, "gold"))

# Sentinel end date carried by the active (open-ended) SCD Type-2 records.
SCD2_ACTIVE_END_DATE = "9999-12-31"

print("‚úÖ Gold layer config ready.")

In [None]:
# ============================================================
# CELL 2 — dim_date (Calendar Dimension)
# ============================================================
# Builds one row per calendar day with the usual calendar attributes and
# overwrites the Gold dim_date Delta table with the result.
print("\nüìÖ Building dim_date...")

from pyspark.sql import Row
import datetime

# Inclusive calendar bounds: 2020-01-01 through 2026-12-31.
start = datetime.date(2020, 1, 1)
end = datetime.date(2026, 12, 31)

# Every date in the range, walked through proleptic ordinals.
date_list = [
    datetime.date.fromordinal(ordinal)
    for ordinal in range(start.toordinal(), end.toordinal() + 1)
]

# Dates travel to Spark as ISO strings and are parsed back by to_date below.
date_rows = [Row(full_date=day.isoformat()) for day in date_list]
df_dates = spark.createDataFrame(date_rows)

# (column name, expression) pairs, applied in order: later entries rely on
# columns created by earlier ones (day_of_week, year, quarter).
_full_date = col("full_date")
_calendar_columns = [
    ("full_date",    to_date(_full_date, "yyyy-MM-dd")),
    # Integer key in yyyyMMdd form, e.g. 20200101.
    ("date_key",     date_format(_full_date, "yyyyMMdd").cast("int")),
    ("year",         year(_full_date)),
    ("quarter",      quarter(_full_date)),
    ("month_num",    month(_full_date)),
    ("month_name",   date_format(_full_date, "MMMM")),
    ("month_short",  date_format(_full_date, "MMM")),
    ("week_of_year", weekofyear(_full_date)),
    ("day_of_month", dayofmonth(_full_date)),
    ("day_of_week",  dayofweek(_full_date)),
    ("day_name",     date_format(_full_date, "EEEE")),
    # dayofweek numbers Sunday as 1 and Saturday as 7.
    ("is_weekend",   col("day_of_week").isin([1, 7])),
    ("year_month",   date_format(_full_date, "yyyy-MM")),
    ("year_quarter", concat_ws("-Q", col("year"), col("quarter"))),
]

dim_date = df_dates
for _column_name, _column_expr in _calendar_columns:
    dim_date = dim_date.withColumn(_column_name, _column_expr)

# Full refresh of the table, allowing the schema to be replaced.
(
    dim_date.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{GOLD}/dim_date")
)

print(f"   ‚úÖ dim_date written ‚Äî {dim_date.count():,} rows")
dim_date.show(5)

In [None]:
# ============================================================
# CELL 3 — dim_product
# ============================================================
# Projects the Silver products table into the Gold product dimension and
# overwrites the dim_product Delta table.
print("\nüì¶ Building dim_product...")

silver_products = spark.read.format("delta").load(f"{SILVER}/products")

# Surrogate key: first 16 hex characters of SHA-256(product_id).
_product_key = sha2(col("product_id"), 256).substr(1, 16)

# Column order of the published dimension.
_product_columns = [
    "product_key", "product_id", "product_name", "category",
    "sub_category", "unit_price", "cost_price", "gross_margin_pct",
    "price_tier", "supplier", "in_stock", "dw_created_at",
]

dim_product = (
    silver_products
    .withColumn("product_key", _product_key)
    .withColumn("dw_created_at", current_timestamp())
    .select(*_product_columns)
)

# Full refresh of the table, allowing the schema to be replaced.
(
    dim_product.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{GOLD}/dim_product")
)

print(f"   ‚úÖ dim_product written ‚Äî {dim_product.count():,} rows")
dim_product.show(5)

In [None]:
# ============================================================
# CELL 4 — dim_store
# ============================================================
# Projects the Silver stores table into the Gold store dimension and
# overwrites the dim_store Delta table.
print("\nüè™ Building dim_store...")

silver_stores = spark.read.format("delta").load(f"{SILVER}/stores")

# Surrogate key: first 16 hex characters of SHA-256(store_id).
_store_key = sha2(col("store_id"), 256).substr(1, 16)

# Column order of the published dimension.
_store_columns = [
    "store_key", "store_id", "store_name", "store_type",
    "city", "region", "open_date", "store_age_days",
    "is_online", "dw_created_at",
]

dim_store = (
    silver_stores
    .withColumn("store_key", _store_key)
    .withColumn("dw_created_at", current_timestamp())
    .select(*_store_columns)
)

# Full refresh of the table, allowing the schema to be replaced.
(
    dim_store.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{GOLD}/dim_store")
)

print(f"   ‚úÖ dim_store written ‚Äî {dim_store.count():,} rows")
dim_store.show(truncate=False)

In [None]:
# ============================================================
# CELL 5 — dim_customer with SCD Type-2
# ============================================================
print("\nüë§ Building dim_customer (SCD Type-2)...")

# SCD Type-2 keeps one row per version of a customer, so attribute changes
# remain visible as history. Every row carries effective_start_date,
# effective_end_date and an is_current flag.

# Source: customer records from the Silver layer.
_customers_reader = spark.read.format("delta")
silver_customers = _customers_reader.load(f"{SILVER}/customers")

# ── Initial load (first time): create SCD2 structure ──
def initial_scd2_load(df):
    """Return ``df`` reshaped into the SCD Type-2 layout of ``dim_customer``.

    Every input row becomes an open (current) version:

    * ``customer_key`` is the first 16 hex characters of the SHA-256 of
      ``customer_id`` and ``_silver_updated_at`` joined with ``"|"``.
    * ``effective_start_date`` is today's date and ``effective_end_date`` is
      the ``SCD2_ACTIVE_END_DATE`` sentinel parsed as a date.
    * ``is_current`` is True.
    * ``dw_created_at`` and ``dw_updated_at`` hold the current timestamp.

    Args:
        df: Spark DataFrame providing the customer columns selected below
            plus ``_silver_updated_at``.

    Returns:
        A new DataFrame containing only the dimension columns, in the fixed
        order listed in ``output_columns``.
    """
    # Surrogate key derived from the business key plus the Silver update time.
    key_material = concat_ws("|", col("customer_id"), col("_silver_updated_at"))
    surrogate_key = sha2(key_material, 256).substr(1, 16)

    # Open-ended validity window for the active record.
    open_end_date = to_date(lit(SCD2_ACTIVE_END_DATE), "yyyy-MM-dd")

    output_columns = [
        "customer_key", "customer_id", "full_name", "first_name", "last_name",
        "email", "email_domain", "city", "segment",
        "signup_date", "is_active", "customer_tenure_days",
        "effective_start_date", "effective_end_date", "is_current",
        "dw_created_at", "dw_updated_at",
    ]

    versioned = df.withColumn("customer_key", surrogate_key)
    versioned = versioned.withColumn("effective_start_date", current_date())
    versioned = versioned.withColumn("effective_end_date", open_end_date)
    versioned = versioned.withColumn("is_current", lit(True))
    versioned = versioned.withColumn("dw_created_at", current_timestamp())
    versioned = versioned.withColumn("dw_updated_at", current_timestamp())
    return versioned.select(*output_columns)

# Seed the Gold customer dimension without destroying SCD2 history.
#
# An unconditional overwrite would replace the whole table on every run and
# discard all expired versions, so the overwrite is limited to the first run
# (table does not exist yet). On later runs only customers that have no row in
# the dimension are appended; existing rows are left untouched.
_dim_customer_path = f"{GOLD}/dim_customer"

if DeltaTable.isDeltaTable(spark, _dim_customer_path):
    # Business keys already present in the dimension (any version).
    _known_ids = (
        spark.read.format("delta").load(_dim_customer_path)
        .select("customer_id")
        .distinct()
    )
    # Null-safe match so a NULL customer_id is not treated as "unseen" on
    # every run; left_anti keeps only the Silver rows with no match.
    _unseen_customers = silver_customers.join(
        _known_ids,
        silver_customers["customer_id"].eqNullSafe(_known_ids["customer_id"]),
        "left_anti",
    )
    initial_scd2_load(_unseen_customers).write.format("delta") \
        .mode("append").save(_dim_customer_path)
else:
    # First run: create the table from the full Silver snapshot.
    initial_scd2_load(silver_customers).write.format("delta") \
        .mode("overwrite").option("overwriteSchema", "true") \
        .save(_dim_customer_path)

# Read the table back so the count and preview describe what is stored.
dim_customer = spark.read.format("delta").load(_dim_customer_path)

print(f"   ‚úÖ dim_customer initial load ‚Äî {dim_customer.count():,} rows")
dim_customer.show(5)

In [None]:
# ============================================================
# CELL 6 — SCD Type-2 MERGE (for incremental updates)
# ============================================================
# Run this cell whenever customers_updated.csv is ingested.
#
# Flow:
#   1. Detect customers whose tracked attributes differ from their current
#      Gold row.
#   2. Expire those current rows (is_current=False, effective_end_date=today).
#   3. Append a fresh current row for every changed customer.
#
# NOTE(review): the detection uses an inner join, so customers present in the
# batch but absent from dim_customer are not inserted by this cell — confirm
# where brand-new customers are expected to be onboarded.
print("\nüîÑ Running SCD Type-2 MERGE for customer updates...")

# Read the updated customer records (after Bronze+Silver processing)
# In production: this comes from your Silver customers table after new batch
from pyspark.sql.functions import broadcast

# Simulating: read updated customers from Silver
# In real run: replace this path with actual updated silver data path
updated_customers = silver_customers  # placeholder - in prod this is the new batch

# Load existing Gold dim_customer
dim_customer_delta = DeltaTable.forPath(spark, f"{GOLD}/dim_customer")

# Attributes whose change opens a new version of the customer row.
SCD2_COLUMNS = ["city", "segment", "is_active"]

# Step 1: Find changed records.
# `<=>` is Spark SQL's null-safe equality. A plain `!=` evaluates to NULL when
# either side is NULL, so a NULL -> value (or value -> NULL) change would be
# dropped by the filter and never versioned.
change_predicate = " OR ".join(
    f"NOT (new.{c} <=> existing.{c})" for c in SCD2_COLUMNS
)

changed = updated_customers.alias("new").join(
    dim_customer_delta.toDF().filter(col("is_current") == True).alias("existing"),
    on="customer_id",
    how="inner"
).filter(change_predicate)

# Collect the ids before touching the table: once step 2 has expired the
# current rows, re-evaluating `changed` would no longer match them.
# distinct() keeps the reported count at one per customer.
changed_ids = [
    r["customer_id"]
    for r in changed.select("customer_id").distinct().collect()
]
print(f"   Changed customers detected: {len(changed_ids)}")

# Batch rows of the changed customers and their new SCD2 versions (lazy).
changed_batch = updated_customers.filter(col("customer_id").isin(changed_ids))
new_rows = initial_scd2_load(changed_batch)

# Skip both writes when nothing changed, so no empty commits are produced.
if changed_ids:
    # Step 2: Expire old rows via MERGE.
    # The source is reduced to one row per customer_id because Delta fails a
    # MERGE in which several source rows try to update the same target row;
    # the update values do not depend on the source row anyway.
    dim_customer_delta.alias("target").merge(
        changed_batch.select("customer_id").distinct().alias("source"),
        condition="target.customer_id = source.customer_id AND target.is_current = true"
    ).whenMatchedUpdate(set={
        "is_current":         lit(False),
        "effective_end_date": current_date(),
        "dw_updated_at":      current_timestamp()
    }).execute()

    # Step 3: Insert new rows for changed customers
    new_rows.write.format("delta").mode("append").save(f"{GOLD}/dim_customer")

print(f"   ‚úÖ SCD Type-2 merge complete ‚Äî {len(changed_ids)} records updated")

# Verify — show a changed customer with history
if changed_ids:
    sample_id = changed_ids[0]
    print(f"\n   Sample SCD2 history for customer_id = {sample_id}:")
    spark.read.format("delta").load(f"{GOLD}/dim_customer") \
        .filter(col("customer_id") == sample_id) \
        .select("customer_id","city","segment","effective_start_date","effective_end_date","is_current") \
        .show(truncate=False)

In [None]:
# ============================================================
# CELL 7 — fact_sales (Core Fact Table)
# ============================================================
# Resolves the surrogate keys of every Silver transaction against the Gold
# dimensions and overwrites the fact_sales Delta table.
print("\nüí∞ Building fact_sales...")

silver_txn = spark.read.format("delta").load(f"{SILVER}/transactions")

# NOTE(review): only is_current customer rows are used, so every transaction
# receives the customer's current key rather than the version valid on the
# transaction date — confirm this is intended.
dim_customer_df = (
    spark.read.format("delta").load(f"{GOLD}/dim_customer")
    .filter(col("is_current") == True)
)
dim_product_df = spark.read.format("delta").load(f"{GOLD}/dim_product")
dim_store_df = spark.read.format("delta").load(f"{GOLD}/dim_store")
dim_date_df = spark.read.format("delta").load(f"{GOLD}/dim_date")

# (key lookup, business-key column) pairs, joined in this order. Left joins
# keep transactions whose business key has no match in a dimension.
_key_lookups = [
    (dim_customer_df.select("customer_key", "customer_id"), "customer_id"),
    (dim_product_df.select("product_key", "product_id"), "product_id"),
    (dim_store_df.select("store_key", "store_id"), "store_id"),
]

_txn_with_keys = silver_txn
for _lookup_df, _business_key in _key_lookups:
    _txn_with_keys = _txn_with_keys.join(_lookup_df, on=_business_key, how="left")

# The date dimension is matched on the calendar date itself.
_date_lookup = dim_date_df.select("full_date", "date_key")
_date_match = silver_txn["transaction_date"] == dim_date_df["full_date"]

# Surrogate key: first 16 hex characters of
# SHA-256("transaction_id|transaction_date").
_sales_key = sha2(
    concat_ws("|", col("transaction_id"), col("transaction_date")), 256
).substr(1,16)

_fact_columns = [
    # Keys
    "sales_key", "transaction_id",
    "customer_key", "product_key", "store_key", "date_key",
    # Dimensions (degenerate)
    "transaction_date", "txn_year", "txn_month", "txn_quarter",
    "payment_method", "source_system", "status", "is_returned",
    # Measures
    "quantity",
    "unit_price",
    "discount_pct",
    "discount_amount",
    "gross_revenue",
    "total_amount",
    # Metadata
    "dw_created_at",
]

fact_sales = (
    _txn_with_keys
    .join(_date_lookup, _date_match, how="left")
    .withColumn("sales_key", _sales_key)
    .withColumn("dw_created_at", current_timestamp())
    .select(*_fact_columns)
)

# Full refresh of the table, allowing the schema to be replaced.
(
    fact_sales.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{GOLD}/fact_sales")
)

print(f"   ‚úÖ fact_sales written ‚Äî {fact_sales.count():,} rows")
fact_sales.show(5)

In [None]:
# ============================================================
# CELL 8 — Gold Layer Validation & Business Metrics
# ============================================================
# Prints the row count of every Gold table, then headline sales metrics and
# revenue per quarter computed from fact_sales (returned sales excluded).
#
# pyspark.sql.functions has no `spark_sum`; the aggregate is named `sum`.
# Import it under an alias (same convention as spark_max / spark_min in the
# notebook's first cell) so Python's built-in sum() is not shadowed.
from pyspark.sql.functions import sum as spark_sum, avg, countDistinct

fact = spark.read.format("delta").load(f"{GOLD}/fact_sales")

print("\nüìä Gold Layer Summary")
print("=" * 55)

# Table counts
for table in ["fact_sales", "dim_customer", "dim_product", "dim_store", "dim_date"]:
    df = spark.read.format("delta").load(f"{GOLD}/{table}")
    print(f"  ü•á {table:<22} {df.count():>10,} rows")

print("\nüí∞ Key Business Metrics (from fact_sales):")
# Single-row aggregate over the non-returned fact rows.
metrics = fact.filter(col("is_returned") == False).agg(
    spark_sum("total_amount").alias("total_revenue"),
    avg("total_amount").alias("avg_order_value"),
    spark_sum("quantity").alias("total_units_sold"),
    countDistinct("customer_key").alias("unique_customers"),
    countDistinct("transaction_id").alias("total_transactions"),
    spark_sum("discount_amount").alias("total_discounts")
)
metrics.show(truncate=False)

print("\nüìà Revenue by Quarter:")
# Non-returned revenue per (year, quarter), in chronological order.
fact.filter(col("is_returned") == False) \
    .groupBy("txn_year", "txn_quarter") \
    .agg(spark_sum("total_amount").alias("revenue")) \
    .orderBy("txn_year", "txn_quarter") \
    .show()