## **CUSTOMER Dimension: SCD Type 2**

In [0]:
from pyspark.sql.functions import to_date, col

In [0]:
%sql
-- Customer dimension (SCD type 2): one row per customer version.
-- IF NOT EXISTS: created on the first run only; an existing table is left as-is.
CREATE TABLE IF NOT EXISTS retail_demo.gold.dim_customer (
    -- surrogate key, generated by Delta on every insert
    customer_sk     BIGINT GENERATED ALWAYS AS IDENTITY
    -- natural key, matched against retail_demo.silver.customers_snapshot
  , customer_id     INT
  , first_name      STRING
  , last_name       STRING
    -- attributes compared during the snapshot merge
  , country         STRING
  , marital_status  STRING
  , loyalty_tier    STRING
    -- validity window [start_ts, end_ts); stored as DATE despite the _ts suffix
  , start_ts        DATE
  , end_ts          DATE
    -- true only for the currently valid version of a customer
  , is_current      BOOLEAN
)
USING DELTA
-- NOTE(review): is_current is low-cardinality and is flipped by the merge, which
-- moves rows between partitions; confirm this partitioning is actually wanted.
PARTITIONED BY (is_current);

**Merge the latest snapshot**

In [0]:
# SCD type 2 merge of the latest customer snapshot into gold.dim_customer.
#
# A single MERGE row cannot both expire the current version AND insert its
# replacement: once a snapshot row matches on customer_id only WHEN MATCHED runs,
# so a changed customer would be expired without ever getting a new current row.
# The `staged` source therefore feeds each changed customer in twice:
#   * merge_key = customer_id -> matches existing rows: expires the current
#     version when a tracked attribute changed; unmatched keys are brand-new
#     customers and are inserted.
#   * merge_key = NULL        -> emitted only for customers whose current version
#     differs; NULL never matches, so the row is inserted as the new version.
# Attribute comparisons use the null-safe `<=>`: with plain `<>` a change from or
# to NULL evaluates to NULL and would be silently ignored.
# Only country, marital_status and loyalty_tier are tracked; first/last name
# changes on existing customers are not propagated.
# NOTE(review): customers already left without a current row by an earlier run
# of a non-staged merge are not repaired here; they need a one-off backfill.
spark.sql(r"""
MERGE INTO retail_demo.gold.dim_customer AS t
USING (
  -- keyed rows: expire changed current versions / insert new customers
  SELECT s.customer_id                 AS merge_key,
         s.customer_id, s.first_name, s.last_name,
         s.country, s.marital_status, s.loyalty_tier,
         CAST(s.snapshot_ts AS DATE)   AS snapshot_date
  FROM retail_demo.silver.customers_snapshot AS s

  UNION ALL

  -- un-keyed rows: new current version for customers whose attributes changed
  SELECT NULL                          AS merge_key,
         s.customer_id, s.first_name, s.last_name,
         s.country, s.marital_status, s.loyalty_tier,
         CAST(s.snapshot_ts AS DATE)   AS snapshot_date
  FROM retail_demo.silver.customers_snapshot AS s
  JOIN retail_demo.gold.dim_customer AS c
    ON  c.customer_id = s.customer_id
    AND c.is_current = true
  WHERE NOT (c.country        <=> s.country)
     OR NOT (c.marital_status <=> s.marital_status)
     OR NOT (c.loyalty_tier   <=> s.loyalty_tier)
) AS staged
ON t.customer_id = staged.merge_key
WHEN MATCHED AND t.is_current = true
             AND (   NOT (t.country        <=> staged.country)
                  OR NOT (t.marital_status <=> staged.marital_status)
                  OR NOT (t.loyalty_tier   <=> staged.loyalty_tier) )
  THEN UPDATE SET
       is_current = false,
       end_ts     = staged.snapshot_date
WHEN NOT MATCHED
  THEN INSERT (customer_id, first_name, last_name,
               country, marital_status, loyalty_tier,
               start_ts,  end_ts, is_current)
       VALUES (staged.customer_id, staged.first_name, staged.last_name,
               staged.country, staged.marital_status, staged.loyalty_tier,
               staged.snapshot_date, DATE '9999-12-31', true)
""")

## **PRODUCT Dimension**

In [0]:
%sql
-- Product dimension: fully rebuilt from silver on every run (CREATE OR REPLACE).
-- product_sk is derived deterministically from the natural key, so a product
-- keeps the same surrogate key across rebuilds. monotonically_increasing_id()
-- depends on Spark partition IDs and could hand out different keys on each run.
-- NOTE(review): xxhash64 is a 64-bit hash; collisions are extremely unlikely but
-- not impossible -- consider asserting product_sk uniqueness after the build.
CREATE OR REPLACE TABLE retail_demo.gold.dim_product
USING DELTA AS
SELECT
    product_id            AS natural_key,
    xxhash64(product_id)  AS product_sk,
    lower(product_name)   AS product_name,
    size,
    unit_price
FROM retail_demo.silver.products;

## **FACT Table (Orders)**

In [0]:
from pyspark.sql.functions import to_date, col

# Orders fact: replace natural keys with dimension surrogate keys.
# Source orders get a DATE column derived from order_ts for the SCD2 lookup.
orders_o = (
    spark.table("retail_demo.silver.orders")
    .withColumn("order_date", to_date("order_ts"))
    .alias("o")
)
dim_customer_c = spark.table("retail_demo.gold.dim_customer").alias("c")
dim_product_p = spark.table("retail_demo.gold.dim_product").alias("p")

# Point-in-time customer lookup: the version whose half-open window
# [start_ts, end_ts) contains the order date.
customer_match = (
    (col("o.customer_id") == col("c.customer_id"))
    & (col("o.order_date") >= col("c.start_ts"))
    & (col("o.order_date") < col("c.end_ts"))
)
product_match = col("o.product_id") == col("p.natural_key")

fact_columns = [
    "o.order_id",
    "o.order_ts",
    "o.order_date",
    "c.customer_sk",
    "p.product_sk",
    "o.quantity",
    "o.amount",
]

# Left joins keep every order; an order with no matching dimension row gets a
# NULL surrogate key.
fact_df = (
    orders_o
    .join(dim_customer_c, customer_match, "left")
    .join(dim_product_p, product_match, "left")
    .select(*[col(name) for name in fact_columns])
)

(
    fact_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("order_date")
    .saveAsTable("retail_demo.gold.fact_orders")
)
