In [0]:
%sql
-- Source table for the SCD Type-2 walkthrough: one row per customer change,
-- stamped with the time the row was written (modifiedDate).
CREATE TABLE pyspark_cata.source.customers (
  id            STRING,
  email         STRING,
  city          STRING,
  country       STRING,
  modifiedDate  TIMESTAMP
)

In [0]:
%sql
-- Seed the source table with five customers; every row gets the
-- statement's current_timestamp() as its modifiedDate.
INSERT INTO pyspark_cata.source.customers
VALUES
  ('1', 'john.smith@example.com',  'New York', 'USA',       current_timestamp()),
  ('2', 'jane.doe@example.com',    'London',   'UK',        current_timestamp()),
  ('3', 'bob.smith@example.com',   'Paris',    'France',    current_timestamp()),
  ('4', 'mary.jones@example.com',  'Tokyo',    'Japan',     current_timestamp()),
  ('5', 'peter.smith@example.com', 'Sydney',   'Australia', current_timestamp())

num_affected_rows,num_inserted_rows
5,5


In [0]:
%sql
-- Inspect the source table after seeding.
SELECT
  *
FROM
  pyspark_cata.source.customers

id,email,city,country,modifiedDate
1,john.smith@example.com,New York,USA,2025-08-24T14:13:34.403Z
2,jane.doe@example.com,London,UK,2025-08-24T14:13:34.403Z
3,bob.smith@example.com,Paris,France,2025-08-24T14:13:34.403Z
4,mary.jones@example.com,Tokyo,Japan,2025-08-24T14:13:34.403Z
5,peter.smith@example.com,Sydney,Australia,2025-08-24T14:13:34.403Z


In [0]:
# Bootstrap the dimension table only on the first run: if it is missing,
# create it from the current source rows and add the SCD Type-2 tracking
# columns (startTime = now, endTime = far-future sentinel, isActive = 'Y').
# When the table already exists this cell does nothing.
if not spark.catalog.tableExists("pyspark_cata.source.DimCustomers"):
    spark.sql("""
              CREATE TABLE pyspark_cata.source.DimCustomers
              SELECT *,
              current_timestamp() AS startTime,
              CAST('3000-01-01' AS TIMESTAMP) as endTime,
              'Y' as isActive
              FROM pyspark_cata.source.customers
              """)

In [0]:
%sql
-- Inspect the dimension table right after the initial load.
SELECT
  *
FROM
  pyspark_cata.source.dimcustomers

id,email,city,country,modifiedDate,startTime,endTime,isActive
1,john.smith@example.com,New York,USA,2025-08-24T14:13:34.403Z,2025-08-24T14:14:30.548Z,3000-01-01T00:00:00.000Z,Y
2,jane.doe@example.com,London,UK,2025-08-24T14:13:34.403Z,2025-08-24T14:14:30.548Z,3000-01-01T00:00:00.000Z,Y
3,bob.smith@example.com,Paris,France,2025-08-24T14:13:34.403Z,2025-08-24T14:14:30.548Z,3000-01-01T00:00:00.000Z,Y
4,mary.jones@example.com,Tokyo,Japan,2025-08-24T14:13:34.403Z,2025-08-24T14:14:30.548Z,3000-01-01T00:00:00.000Z,Y
5,peter.smith@example.com,Sydney,Australia,2025-08-24T14:13:34.403Z,2025-08-24T14:14:30.548Z,3000-01-01T00:00:00.000Z,Y


## **SCD Type-2**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import *
from delta.tables import DeltaTable

In [0]:
# Rank each customer's source rows from most to least recently modified.
latest_per_id = Window.partitionBy('id').orderBy(desc('modifiedDate'))

# Read every row currently in the source table.
df = spark.sql(
    """
    SELECT * FROM pyspark_cata.source.customers
    """
)

# Keep only the top-ranked (latest) row per id, then drop the helper column.
df = (
    df.withColumn("dedup", row_number().over(latest_per_id))
      .filter(col("dedup") == 1)
      .drop('dedup')
)

# Expose the deduplicated rows to SQL.
df.createOrReplaceTempView('srctemp')

# Add the SCD Type-2 tracking columns so the shape matches DimCustomers:
# startTime = now, endTime = far-future sentinel, isActive = 'Y'.
df = spark.sql("""
              SELECT *,
              current_timestamp() AS startTime,
              CAST('3000-01-01' AS TIMESTAMP) as endTime,
              'Y' as isActive
              FROM srctemp
              """)

# `src` is the view consumed by the two MERGE statements below.
df.createOrReplaceTempView('src')

In [0]:
%sql
-- Inspect the staged (deduplicated + tracking columns) source view.
SELECT
  *
FROM
  src

id,email,city,country,modifiedDate,startTime,endTime,isActive
1,john.smith@example.com,Seattle,USA,2025-08-24T16:11:02.145Z,2025-08-24T16:14:53.820Z,3000-01-01T00:00:00.000Z,Y
2,jane.doe@example.com,London,UK,2025-08-24T14:13:34.403Z,2025-08-24T16:14:53.820Z,3000-01-01T00:00:00.000Z,Y
3,bob.smith@example.com,Paris,France,2025-08-24T14:13:34.403Z,2025-08-24T16:14:53.820Z,3000-01-01T00:00:00.000Z,Y
4,mary.jones@example.com,Tokyo,Japan,2025-08-24T14:13:34.403Z,2025-08-24T16:14:53.820Z,3000-01-01T00:00:00.000Z,Y
5,peter.smith@example.com,Sydney,Australia,2025-08-24T14:13:34.403Z,2025-08-24T16:14:53.820Z,3000-01-01T00:00:00.000Z,Y
6,jane.doe@example.com,London,UK,2025-08-24T16:11:02.145Z,2025-08-24T16:14:53.820Z,3000-01-01T00:00:00.000Z,Y


### **Merge 1 - Marking the updated records as expired**

In [0]:
%sql
-- SCD Type-2, step 1: close out the active dimension row of every customer
-- whose incoming `src` row differs in any compared column.
-- Only rows with isActive = 'Y' can match, so history rows are never touched.
MERGE INTO pyspark_cata.source.DimCustomers AS trg
USING src AS src
ON trg.id = src.id
AND trg.isActive = 'Y'

-- `<=>` is null-safe equality. A plain `<>` evaluates to NULL when either
-- side is NULL, so a change to or from NULL (all columns are nullable) would
-- not satisfy the condition and the stale row would remain active.
WHEN MATCHED AND (
     NOT (src.email        <=> trg.email)
  OR NOT (src.city         <=> trg.city)
  OR NOT (src.country      <=> trg.country)
  OR NOT (src.modifiedDate <=> trg.modifiedDate)
)

-- Expire the old version: stamp its end time and flag it inactive.
THEN UPDATE SET
  trg.endTime  = current_timestamp(),
  trg.isActive = 'N'

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,1,0,0


### **Merge 2 - Inserting new + updated records**

In [0]:
%sql
MERGE INTO pyspark_cata.source.DimCustomers as trg
USING src as src
ON src.id = trg.id
AND trg.isActive = 'Y'

WHEN NOT MATCHED THEN INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,0,0,2


In [0]:
%sql
-- Inspect the dimension table after both merges (expired + current versions).
SELECT
  *
FROM
  pyspark_cata.source.DimCustomers

id,email,city,country,modifiedDate,startTime,endTime,isActive
2,jane.doe@example.com,London,UK,2025-08-24T14:13:34.403Z,2025-08-24T14:14:30.548Z,3000-01-01T00:00:00.000Z,Y
3,bob.smith@example.com,Paris,France,2025-08-24T14:13:34.403Z,2025-08-24T14:14:30.548Z,3000-01-01T00:00:00.000Z,Y
4,mary.jones@example.com,Tokyo,Japan,2025-08-24T14:13:34.403Z,2025-08-24T14:14:30.548Z,3000-01-01T00:00:00.000Z,Y
5,peter.smith@example.com,Sydney,Australia,2025-08-24T14:13:34.403Z,2025-08-24T14:14:30.548Z,3000-01-01T00:00:00.000Z,Y
1,john.smith@example.com,New York,USA,2025-08-24T14:13:34.403Z,2025-08-24T14:14:30.548Z,2025-08-24T16:15:15.559Z,N
1,john.smith@example.com,Seattle,USA,2025-08-24T16:11:02.145Z,2025-08-24T16:15:35.951Z,3000-01-01T00:00:00.000Z,Y
6,jane.doe@example.com,London,UK,2025-08-24T16:11:02.145Z,2025-08-24T16:15:35.951Z,3000-01-01T00:00:00.000Z,Y


In [0]:
%sql
-- Simulate a change batch: id 1 reappears with a new city, id 6 is a new id.
INSERT INTO pyspark_cata.source.customers
VALUES
  ('1', 'john.smith@example.com', 'Seattle', 'USA', current_timestamp()),
  ('6', 'jane.doe@example.com',   'London',  'UK',  current_timestamp())

num_affected_rows,num_inserted_rows
2,2


In [0]:
%sql
-- Inspect the source table after the change batch was appended.
SELECT
  *
FROM
  pyspark_cata.source.customers

id,email,city,country,modifiedDate
1,john.smith@example.com,New York,USA,2025-08-24T14:13:34.403Z
2,jane.doe@example.com,London,UK,2025-08-24T14:13:34.403Z
3,bob.smith@example.com,Paris,France,2025-08-24T14:13:34.403Z
4,mary.jones@example.com,Tokyo,Japan,2025-08-24T14:13:34.403Z
5,peter.smith@example.com,Sydney,Australia,2025-08-24T14:13:34.403Z
1,john.smith@example.com,Seattle,USA,2025-08-24T16:11:02.145Z
6,jane.doe@example.com,London,UK,2025-08-24T16:11:02.145Z
