## Data Modelling into Facts and Dims Type 2

### Dim Product SCD2

In [0]:
spark.sql("""
WITH CTE AS (
SELECT 
*,
row_number() over(partition by product_id order by order_date desc) as rn,
row_number() over(order by processed_at, product_id) as dimproductkey
FROM test.silver.silver_table)
SELECT 
  dimproductkey, product_id, product_name, product_category, unit_price, processed_at,
  current_timestamp start_date,
  cast('3000-12-31' as timestamp) as end_date,
  'Y' as is_current 
FROM CTE
WHERE rn = 1 -- Only Latest Product with latest Prices will be shown
""").createOrReplaceTempView('prod_src')

In [0]:
%sql
CREATE TABLE IF NOT EXISTS test.gold.dimproduct 
AS
SELECT * FROM prod_src

num_affected_rows,num_inserted_rows


In [0]:
%sql
MERGE INTO test.gold.dimproduct as trg
USING prod_src as src
ON trg.product_id = src.product_id
AND trg.is_current = 'Y'
WHEN MATCHED AND (
  trg.product_name <> src.product_name OR 
  trg.product_category <> src.product_category OR 
  trg.unit_price <> src.unit_price
)
THEN UPDATE SET
  trg.end_date = current_timestamp,
  trg.is_current = 'N'

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,1,0,0


In [0]:
%sql
MERGE INTO test.gold.dimproduct as trg
USING prod_src as src
ON trg.product_id = src.product_id
AND trg.is_current = 'Y'
WHEN NOT MATCHED THEN INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,0,0,2


In [0]:
%sql
SELECT * FROM test.gold.dimproduct

dimproductkey,product_id,product_name,product_category,unit_price,processed_at,start_date,end_date,is_current
3,502,Airpods Pro,Electronics,149.99,2025-10-31T18:07:58.285Z,2025-10-31T18:10:46.983Z,3000-12-31T00:00:00.000Z,Y
5,503,Nike Shoes,FootWear,99.99,2025-10-31T18:07:58.285Z,2025-10-31T18:10:46.983Z,3000-12-31T00:00:00.000Z,Y
1,501,Iphone 14,Electronics,999.99,2025-10-31T18:07:58.285Z,2025-10-31T18:10:46.983Z,2025-10-31T18:25:34.938Z,N
6,501,Iphone 14,Electronics,899.99,2025-10-31T18:12:44.888Z,2025-10-31T18:25:48.795Z,3000-12-31T00:00:00.000Z,Y
7,504,Iphone 15,Electronics,1199.99,2025-10-31T18:12:44.888Z,2025-10-31T18:25:48.795Z,3000-12-31T00:00:00.000Z,Y


### Dim Customer SCD2

In [0]:
spark.sql("""
WITH CTE AS (
SELECT 
  *,
  row_number() OVER (ORDER BY processed_at, customer_id) as dimcustomerkey,
  row_number() over(partition by customer_id order by order_date desc) as rn
FROM test.silver.silver_table)
SELECT 
  dimcustomerkey, customer_id, customer_name as full_name, f_name, l_name, customer_email, processed_at,
  current_timestamp as start_date,
  CAST('3000-12-31' AS timestamp) AS end_date,
  'Y' AS is_current
FROM CTE 
WHERE rn = 1
""").createOrReplaceTempView('Cust_src')

In [0]:
%sql
CREATE TABLE IF NOT EXISTS test.gold.dimcustomer
AS
SELECT 
*
FROM Cust_src

num_affected_rows,num_inserted_rows


In [0]:
%sql
MERGE INTO test.gold.dimcustomer as trg
USING Cust_src as src
ON trg.customer_id = src.customer_id
AND trg.is_current = 'Y'
WHEN MATCHED AND (
  trg.full_name <> src.full_name OR 
  trg.customer_email <> src.customer_email OR 
  trg.f_name <> src.f_name OR 
  trg.l_name <> src.l_name
)
THEN UPDATE SET
  trg.end_date = current_timestamp,
  trg.is_current = 'N'

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
%sql
MERGE INTO test.gold.dimcustomer as trg
USING Cust_src as src
ON trg.customer_id = src.customer_id
AND trg.is_current = 'Y'
WHEN NOT MATCHED THEN INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


### DimCountry SCD-1

In [0]:
spark.sql("""
WITH CTE AS(
SELECT 
  DISTINCT country 
FROM test.silver.silver_table)
SELECT 
  row_number() over(order by country) as dimcountrykey, *
FROM CTE
""").createOrReplaceTempView('country_src')

In [0]:
%sql
CREATE TABLE IF NOT EXISTS test.gold.dimcountry 
AS
SELECT * FROM country_src

num_affected_rows,num_inserted_rows


In [0]:
%sql
MERGE INTO test.gold.dimcountry as trg
USING country_src as src
ON trg.dimcountrykey = src.dimcountrykey
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,2,0,0


### DimPayment SCD-2

In [0]:
spark.sql("""
WITH CTE AS (
SELECT
  DISTINCT payment_type
FROM test.silver.silver_table)
SELECT
row_number() over(order by payment_type) as dimpaymodekey, *
FROM CTE
""").createOrReplaceTempView('paymode_src')

In [0]:
%sql
CREATE TABLE IF NOT EXISTS test.gold.dimpaymode 
AS
SELECT * FROM paymode_src

num_affected_rows,num_inserted_rows


In [0]:
%sql
MERGE INTO test.gold.dimpaymode as trg
USING paymode_src as src
ON trg.dimpaymodekey = src.dimpaymodekey
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
3,3,0,0


### Fact Orders

In [0]:
spark.sql("""
SELECT
  F.order_date,
  F.order_id,
  P.dimproductkey,
  F.unit_price,
  F.quantity,
  F.amount
FROM test.silver.silver_table as F 
LEFT JOIN test.gold.dimproduct as P 
ON F.product_id = P.product_id
""").createOrReplaceTempView('fact_src')

In [0]:
%sql
Create table if not exists test.gold.factsales 
as
select * from fact_src

num_affected_rows,num_inserted_rows


In [0]:
%sql
MERGE INTO test.gold.factsales as trg
USING fact_src as src
ON trg.order_id = src.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT * 

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
3,3,0,0
