In [0]:
-- Inspect the silver-layer order rows that feed the gold dimensional model.
SELECT
    order_id,
    order_date,
    customer_id,
    customer_name,
    customer_email,
    product_id,
    product_name,
    product_category,
    quantity,
    unit_price,
    payment_type,
    country,
    last_updated,
    customer_name_upper,
    processed_date
FROM datamodeling.silver.silver_table

order_id,order_date,customer_id,customer_name,customer_email,product_id,product_name,product_category,quantity,unit_price,payment_type,country,last_updated,customer_name_upper,processed_date
1004,2024-07-02,4,David Lee,david@abc.com,504,Samsung S23,Electronics,1,899.99,Credit Card,USA,2024-07-02,DAVID LEE,2025-12-10
1005,2024-07-02,1,Alice Johnson,alice@gmail.com,503,Nike Shoes,Footwear,2,129.99,Credit Card,USA,2024-07-02,ALICE JOHNSON,2025-12-10
1001,2024-07-01,1,Alice Johnson,alice@gmail.com,501,iPhone 14,Electronics,1,999.99,Credit Card,USA,2024-07-01,ALICE JOHNSON,2025-12-10
1002,2024-07-01,2,Bob Smith,bob@yahoo.com,502,AirPods Pro,Electronics,2,199.99,PayPal,USA,2024-07-01,BOB SMITH,2025-12-10
1003,2024-07-01,3,Charlie Brown,charlie@outlook.com,503,Nike Shoes,Footwear,1,129.99,Credit Card,Canada,2024-07-01,CHARLIE BROWN,2025-12-10
1006,2024-07-03,4,David Bakhem,david@abc.com,504,Samsung S23,Electronics,1,899.99,Credit Card,France,2024-07-02,DAVID BAKHEM,2025-12-10


In [0]:
%sql
-- Upsert the customer dimension from the silver order rows.
-- Existing customers (matched on customer_id) are overwritten in place when
-- any tracked attribute changed; unseen customers are inserted.
MERGE INTO datamodeling.gold.DimCustomers AS target
USING (
    -- Reduce the silver rows to exactly ONE row per customer_id.
    -- A plain SELECT DISTINCT over (id, email, name) returns several rows for
    -- a customer whose name or email differs between orders, which makes the
    -- MERGE ambiguous: the "winning" value depends on row order, a re-run can
    -- flip the dimension back to the older value, and an initial load would
    -- insert the same customer_id more than once.
    SELECT
        customer_id,
        customer_email,
        customer_name,
        customer_name_upper
    FROM (
        SELECT
            customer_id,
            customer_email,
            customer_name,
            customer_name_upper,
            -- Most recent record first; order_date / order_id break ties when
            -- two rows share the same last_updated value.
            ROW_NUMBER() OVER (
                PARTITION BY customer_id
                ORDER BY last_updated DESC, order_date DESC, order_id DESC
            ) AS recency_rank
        FROM datamodeling.silver.silver_table
        -- A NULL key can never satisfy the ON equality below, so it would be
        -- re-inserted as a new dimension row on every run.
        WHERE customer_id IS NOT NULL
    ) AS ranked
    WHERE recency_rank = 1
) AS source
ON target.customer_id = source.customer_id

-- 1. Existing customers: update only when something actually changed.
--    <=> is the null-safe equality operator, so a change from or to NULL is
--    detected as well (a plain <> evaluates to NULL and skips the update).
WHEN MATCHED AND NOT (
       target.customer_email      <=> source.customer_email AND
       target.customer_name       <=> source.customer_name AND
       target.Customer_Name_Upper <=> source.customer_name_upper
     ) THEN
    UPDATE SET
       target.customer_email = source.customer_email,
       target.customer_name  = source.customer_name,
       target.Customer_Name_Upper = source.customer_name_upper

-- 2. New customers: insert them.
WHEN NOT MATCHED THEN
    INSERT (customer_id, customer_email, customer_name, Customer_Name_Upper)
    VALUES (source.customer_id, source.customer_email, source.customer_name, source.customer_name_upper);

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,1,0,0


In [0]:

-- Upsert the product dimension from the silver order rows.
-- Existing products (matched on product_id) are overwritten in place when the
-- name or category changed; unseen products are inserted.
MERGE INTO datamodeling.gold.DimProducts AS target
USING (
    -- Exactly one row per product_id. SELECT DISTINCT over (id, name,
    -- category) would return several rows for a product whose name or
    -- category differs between orders, making the MERGE ambiguous and
    -- inserting duplicates on an initial load.
    SELECT
        product_id,
        product_name,
        product_category
    FROM (
        SELECT
            product_id,
            product_name,
            product_category,
            -- Most recent record first; order_date / order_id break ties.
            ROW_NUMBER() OVER (
                PARTITION BY product_id
                ORDER BY last_updated DESC, order_date DESC, order_id DESC
            ) AS recency_rank
        FROM datamodeling.silver.silver_table
        -- A NULL key never satisfies the ON equality and would be inserted
        -- again on every run.
        WHERE product_id IS NOT NULL
    ) AS ranked
    WHERE recency_rank = 1
) AS source
ON target.product_id = source.product_id

-- Existing products: update only on a real change. <=> is null-safe, so a
-- change from or to NULL is detected too.
WHEN MATCHED AND NOT (
       target.product_name     <=> source.product_name AND
       target.product_category <=> source.product_category
     ) THEN
    UPDATE SET
       target.product_name = source.product_name,
       target.product_category = source.product_category

-- New products: insert them.
WHEN NOT MATCHED THEN
    INSERT (product_id, product_name, product_category)
    VALUES (source.product_id, source.product_name, source.product_category);






num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:

-- Add payment types seen in silver that the payment dimension does not have
-- yet. Insert-only: existing rows are never modified.
MERGE INTO datamodeling.gold.DimPayments AS target
USING (
    SELECT DISTINCT payment_type
    FROM datamodeling.silver.silver_table
    -- NULL = NULL is not true, so a NULL payment_type would count as
    -- "not matched" and be inserted again on every run.
    WHERE payment_type IS NOT NULL
) AS source
ON target.payment_type = source.payment_type
WHEN NOT MATCHED THEN
    INSERT (payment_type) VALUES (source.payment_type);

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
-- Add countries seen in silver that the region dimension does not have yet.
-- Insert-only: existing rows are never modified.
MERGE INTO datamodeling.gold.DimRegions AS target
USING (
    SELECT DISTINCT country
    FROM datamodeling.silver.silver_table
    -- NULL = NULL is not true, so a NULL country would count as
    -- "not matched" and be inserted again on every run.
    WHERE country IS NOT NULL
) AS source
ON target.country = source.country
WHEN NOT MATCHED THEN
    INSERT (country) VALUES (source.country);



num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,0,0,1


In [0]:
%python

# Incremental-load watermark: the highest order_id already present in the fact
# table. Stays at 0 when the table is missing or contains no rows.
max_order_id = 0
if spark.catalog.tableExists('datamodeling.gold.FactSales'):
    loaded_max = spark.sql("select max(order_id) from datamodeling.gold.FactSales").collect()[0][0]
    if loaded_max is not None:
        max_order_id = loaded_max

# Expose only the silver rows above the watermark as the temp view
# "new_sales_data" for use by later cells.
incremental_query = f"""
    SELECT * FROM datamodeling.silver.silver_table 
    WHERE order_id > {max_order_id}
"""
new_rows = spark.sql(incremental_query)
new_rows.createOrReplaceTempView("new_sales_data")

print(f"Processing sales with Order ID greater than: {max_order_id}")

Processing sales with Order ID greater than: 1005


In [0]:
%sql
-- Append the new sales rows to the fact table.
-- Insert-only: an order_id that already exists in FactSales is left untouched.
MERGE INTO datamodeling.gold.FactSales AS target
USING (
    SELECT
        sales.order_id,
        sales.quantity,
        sales.unit_price,
        cust.DimCustomerKey,
        prod.DimProductKey,
        reg.DimRegionKey,
        pay.DimPaymentKey,
        current_timestamp() AS loaded_date
    FROM new_sales_data AS sales
    -- Translate each business key into the surrogate key of its dimension.
    -- LEFT JOIN keeps a sale even when a dimension row is missing; the
    -- corresponding key is then NULL.
    LEFT JOIN datamodeling.gold.DimCustomers AS cust
        ON sales.customer_id = cust.customer_id
    LEFT JOIN datamodeling.gold.DimProducts AS prod
        ON sales.product_id = prod.product_id
    LEFT JOIN datamodeling.gold.DimRegions AS reg
        ON sales.country = reg.country
    LEFT JOIN datamodeling.gold.DimPayments AS pay
        ON sales.payment_type = pay.payment_type
) AS source
ON source.order_id = target.order_id

WHEN NOT MATCHED THEN
    INSERT (
        order_id,
        quantity,
        unit_price,
        DimCustomerKey,
        DimProductKey,
        DimRegionKey,
        DimPaymentKey,
        loaded_date
    )
    VALUES (
        source.order_id,
        source.quantity,
        source.unit_price,
        source.DimCustomerKey,
        source.DimProductKey,
        source.DimRegionKey,
        source.DimPaymentKey,
        source.loaded_date
    );



num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,0,0,1


In [0]:

-- Inspect the fact table after the load.
SELECT
    FactSalesKey,
    DimCustomerKey,
    DimProductKey,
    DimRegionKey,
    DimPaymentKey,
    quantity,
    unit_price,
    order_id,
    loaded_date
FROM datamodeling.gold.FactSales;

FactSalesKey,DimCustomerKey,DimProductKey,DimRegionKey,DimPaymentKey,quantity,unit_price,order_id,loaded_date
1,2,1,2,1,1,999.99,1001,2025-12-10T12:26:04.960Z
2,3,3,2,2,2,199.99,1002,2025-12-10T12:26:04.960Z
3,1,2,1,1,1,129.99,1003,2025-12-10T12:26:04.960Z
4,4,4,2,1,1,899.99,1004,2025-12-10T12:34:30.184Z
5,2,2,2,1,2,129.99,1005,2025-12-10T12:34:30.184Z
6,4,4,3,1,1,899.99,1006,2025-12-10T12:39:09.659Z


In [0]:
-- Inspect the customer dimension after the upsert.
SELECT
    DimCustomerKey,
    customer_id,
    customer_email,
    customer_name,
    Customer_Name_Upper
FROM datamodeling.gold.dimcustomers

DimCustomerKey,customer_id,customer_email,customer_name,Customer_Name_Upper
4,4,david@abc.com,David Bakhem,DAVID BAKHEM
1,3,charlie@outlook.com,Charlie Brown,CHARLIE BROWN
2,1,alice@gmail.com,Alice Johnson,ALICE JOHNSON
3,2,bob@yahoo.com,Bob Smith,BOB SMITH
