In [0]:
%sql
create table pyspark_cata.source.customers
(
  id string,
  email string,
  city string,
  country string,
  modifiedDate timestamp

)

In [0]:
%sql
insert into pyspark_cata.source.customers
values
('1','<msms@gmail.com>','New York','USA', current_timestamp()),
('2', '<john.smith@gmail.com>','London','UK', current_timestamp()),
('3', '<mary.smith@gmail.com>','Paris', 'France', current_timestamp()),
('4', 'sara.jones@gmail.com','Berlin','Germany', current_timestamp()),
('5','<jane.doe@gmail.com>','Tokyo','Japan', current_timestamp())

num_affected_rows,num_inserted_rows
5,5


In [0]:
%sql
select * from pyspark_cata.source.customers

id,email,city,country,modifiedDate
1,,New York,USA,2025-11-26T00:54:39.829Z
2,,London,UK,2025-11-26T00:54:39.829Z
3,,Paris,France,2025-11-26T00:54:39.829Z
4,sara.jones@gmail.com,Berlin,Germany,2025-11-26T00:54:39.829Z
5,,Tokyo,Japan,2025-11-26T00:54:39.829Z


In [0]:
if spark.catalog.tableExists("pyspark_cata.source.DimCustomers"):

    pass
else:
    spark.sql("""
              create table pyspark_cata.source.DimCustomers
              select *,
                        current_timestamp() as startTime,
                        cast('3000-01-01' as timestamp) as endTime,
                        'Y' as isActive
              from pyspark_cata.source.customers
              """)
              


In [0]:
%sql
select * from pyspark_cata.source.DimCustomers

id,email,city,country,modifiedDate,startTime,endTime,isActive
1,,New York,USA,2025-11-26T00:54:39.829Z,2025-11-26T01:00:47.448Z,3000-01-01T00:00:00.000Z,Y
2,,London,UK,2025-11-26T00:54:39.829Z,2025-11-26T01:00:47.448Z,3000-01-01T00:00:00.000Z,Y
3,,Paris,France,2025-11-26T00:54:39.829Z,2025-11-26T01:00:47.448Z,3000-01-01T00:00:00.000Z,Y
4,sara.jones@gmail.com,Berlin,Germany,2025-11-26T00:54:39.829Z,2025-11-26T01:00:47.448Z,3000-01-01T00:00:00.000Z,Y
5,,Tokyo,Japan,2025-11-26T00:54:39.829Z,2025-11-26T01:00:47.448Z,3000-01-01T00:00:00.000Z,Y


# scd type-2

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable
from pyspark.sql.functions import row_number, desc

In [0]:
df = spark.sql("""
               select * from pyspark_cata.source.customers
               """)

df = df.withColumn("dedupe", row_number().over(Window.partitionBy("id").orderBy(desc('modifiedDate')))).drop('dedupe')

df = df.filter(col('dedupe')==1)
               
df.createOrReplaceTempView('srctemp')
df = spark.sql("""
            
              select *,
                        current_timestamp() as startTime,
                        cast('3000-01-01' as timestamp) as endTime,
                        'Y' as isActive
              from srctemp
              """)

df.createOrReplaceTempView('src')

In [0]:
%sql
select * from src

id,email,city,country,modifiedDate,startTime,endTime,isActive
1,,New York,USA,2025-11-26T00:54:39.829Z,2025-11-26T01:36:59.896Z,3000-01-01T00:00:00.000Z,Y
2,,London,UK,2025-11-26T00:54:39.829Z,2025-11-26T01:36:59.896Z,3000-01-01T00:00:00.000Z,Y
3,,Paris,France,2025-11-26T00:54:39.829Z,2025-11-26T01:36:59.896Z,3000-01-01T00:00:00.000Z,Y
4,sara.jones@gmail.com,Berlin,Germany,2025-11-26T00:54:39.829Z,2025-11-26T01:36:59.896Z,3000-01-01T00:00:00.000Z,Y
5,,Tokyo,Japan,2025-11-26T00:54:39.829Z,2025-11-26T01:36:59.896Z,3000-01-01T00:00:00.000Z,Y


### **Merge-1 Marking updated records as expired **

In [0]:
%sql
Merge into pyspark_cata.source.DimCustomers as trg
using src as src 
On trg.id = src.id
and trg.isActive = 'Y'
when matched and src.email<> trg.email
or src.city <> trg.city
or src.country <> trg.country
or src.modifiedDate <> trg.modifiedDate
then update set 
trg.endTime = current_timestamp(),
trg.isActive = 'N'

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


### Merge-2 Inserting new + updated records

In [0]:
%sql

merge into pyspark_cata.source.DimCustomers as trg
using src as src 
On trg.id = src.id
and trg.isActive = 'Y'
when not matched then insert *


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
%sql
select * from pyspark_cata.source.dimcustomers

id,email,city,country,modifiedDate,startTime,endTime,isActive
1,,New York,USA,2025-11-26T00:54:39.829Z,2025-11-26T01:00:47.448Z,3000-01-01T00:00:00.000Z,Y
2,,London,UK,2025-11-26T00:54:39.829Z,2025-11-26T01:00:47.448Z,3000-01-01T00:00:00.000Z,Y
3,,Paris,France,2025-11-26T00:54:39.829Z,2025-11-26T01:00:47.448Z,3000-01-01T00:00:00.000Z,Y
4,sara.jones@gmail.com,Berlin,Germany,2025-11-26T00:54:39.829Z,2025-11-26T01:00:47.448Z,3000-01-01T00:00:00.000Z,Y
5,,Tokyo,Japan,2025-11-26T00:54:39.829Z,2025-11-26T01:00:47.448Z,3000-01-01T00:00:00.000Z,Y
