# SCD t2
### 1. Create new Schema in Catalog

In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.source_schema2")  # Create source schema if it doesn't exist
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.target_schema2")  # Create target schema if it doesn't exist

DataFrame[]

### 2. Extract data from Delat shared

In [0]:
sales = spark.sql("select * from samples.bakehouse.sales_customers")
sales.write.mode("overwrite").saveAsTable("workspace.source_schema2.src_cust2")

### 3. Diplay Extracted Data

In [0]:
source = spark.read.table("workspace.source_schema2.src_cust2")
source.display()

customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender
2000259,Kayla,Barrett,brittanyramos@example.org,349-683-9514x73065,717 Whitney Roads,Kathrynborough,Massachusetts,Japan,Asia,81587,female
2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,East Catherine,Rhode Island,Japan,Asia,6657,female
2000261,Steven,Tanner,haileysanchez@example.net,859-946-4140x24086,08560 Thomas Land,Williamshire,Missouri,Japan,Asia,20642,female
2000262,Jennifer,Forbes,belldonna@example.com,633-427-4977,5840 Warren Garden Suite 901,Delacruzville,Nevada,Australia,Oceania,21440,male
2000263,Kenneth,Berger,bdalton@example.net,(831)220-1833x906,693 Baker Dale,West Wendy,Colorado,Australia,Oceania,32756,female
2000264,James,Edwards,mary39@example.org,271-321-1561x9697,665 Campbell Streets Suite 966,Sherryton,Illinois,Japan,Asia,76717,male
2000265,Ann,Montgomery,michaelreese@example.org,7673757334,220 Barber Islands Apt. 664,East Todd,Iowa,Australia,Oceania,77976,female
2000266,Heather,Moore,mannmartin@example.com,278-606-3676x938,896 Greene Hill Suite 575,Josephburgh,Maryland,Australia,Oceania,2149,male
2000267,Joseph,Graham,suzannereeves@example.org,538.787.9102x4963,2563 Jessica Mountains Apt. 192,Kelseyview,Mississippi,Australia,Oceania,85290,female
2000268,Lisa,Ramirez,kristi04@example.com,(643)361-8721x12062,573 Bailey Lights Suite 941,West Deniseland,Connecticut,Japan,Asia,58198,female


### 4. Load Data From Source and concatenate all columns into 'ConCatValue'

In [0]:
from pyspark.sql import functions as F
source = source.withColumn('ConCatValue', F.concat_ws('', *source.columns))
display(source)

customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender,ConCatValue
2000259,Kayla,Barrett,brittanyramos@example.org,349-683-9514x73065,717 Whitney Roads,Kathrynborough,Massachusetts,Japan,Asia,81587,female,2000259KaylaBarrettbrittanyramos@example.org349-683-9514x73065717 Whitney RoadsKathrynboroughMassachusettsJapanAsia81587female
2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,East Catherine,Rhode Island,Japan,Asia,6657,female,2000260AmandaReedscollier@example.org+1-999-308-911069075 Logan Circles Apt. 540East CatherineRhode IslandJapanAsia6657female
2000261,Steven,Tanner,haileysanchez@example.net,859-946-4140x24086,08560 Thomas Land,Williamshire,Missouri,Japan,Asia,20642,female,2000261StevenTannerhaileysanchez@example.net859-946-4140x2408608560 Thomas LandWilliamshireMissouriJapanAsia20642female
2000262,Jennifer,Forbes,belldonna@example.com,633-427-4977,5840 Warren Garden Suite 901,Delacruzville,Nevada,Australia,Oceania,21440,male,2000262JenniferForbesbelldonna@example.com633-427-49775840 Warren Garden Suite 901DelacruzvilleNevadaAustraliaOceania21440male
2000263,Kenneth,Berger,bdalton@example.net,(831)220-1833x906,693 Baker Dale,West Wendy,Colorado,Australia,Oceania,32756,female,2000263KennethBergerbdalton@example.net(831)220-1833x906693 Baker DaleWest WendyColoradoAustraliaOceania32756female
2000264,James,Edwards,mary39@example.org,271-321-1561x9697,665 Campbell Streets Suite 966,Sherryton,Illinois,Japan,Asia,76717,male,2000264JamesEdwardsmary39@example.org271-321-1561x9697665 Campbell Streets Suite 966SherrytonIllinoisJapanAsia76717male
2000265,Ann,Montgomery,michaelreese@example.org,7673757334,220 Barber Islands Apt. 664,East Todd,Iowa,Australia,Oceania,77976,female,2000265AnnMontgomerymichaelreese@example.org7673757334220 Barber Islands Apt. 664East ToddIowaAustraliaOceania77976female
2000266,Heather,Moore,mannmartin@example.com,278-606-3676x938,896 Greene Hill Suite 575,Josephburgh,Maryland,Australia,Oceania,2149,male,2000266HeatherMooremannmartin@example.com278-606-3676x938896 Greene Hill Suite 575JosephburghMarylandAustraliaOceania2149male
2000267,Joseph,Graham,suzannereeves@example.org,538.787.9102x4963,2563 Jessica Mountains Apt. 192,Kelseyview,Mississippi,Australia,Oceania,85290,female,2000267JosephGrahamsuzannereeves@example.org538.787.9102x49632563 Jessica Mountains Apt. 192KelseyviewMississippiAustraliaOceania85290female
2000268,Lisa,Ramirez,kristi04@example.com,(643)361-8721x12062,573 Bailey Lights Suite 941,West Deniseland,Connecticut,Japan,Asia,58198,female,2000268LisaRamirezkristi04@example.com(643)361-8721x12062573 Bailey Lights Suite 941West DeniselandConnecticutJapanAsia58198female


### 5. Add IndCurrent, CreatedDate, and ModifiedDate columns

In [0]:
source = source.withColumn("IndCurrent", F.lit(1)) \
    .withColumn("CreatedDate", F.current_timestamp()) \
    .withColumn("ModifiedDate", F.current_timestamp())

In [0]:
from pyspark.sql.window import Window

window_spec = Window.orderBy(F.monotonically_increasing_id())
source = source.withColumn("storage_id", F.row_number().over(window_spec))

first_cols = ["storage_id"]
other_cols = [col for col in source.columns if col not in first_cols]
source = source.select(first_cols + other_cols)

display(source)



storage_id,customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender,ConCatValue,IndCurrent,CreatedDate,ModifiedDate
1,2000259,Kayla,Barrett,brittanyramos@example.org,349-683-9514x73065,717 Whitney Roads,Kathrynborough,Massachusetts,Japan,Asia,81587,female,2000259KaylaBarrettbrittanyramos@example.org349-683-9514x73065717 Whitney RoadsKathrynboroughMassachusettsJapanAsia81587female,1,2025-07-06T19:22:56.252Z,2025-07-06T19:22:56.252Z
2,2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,East Catherine,Rhode Island,Japan,Asia,6657,female,2000260AmandaReedscollier@example.org+1-999-308-911069075 Logan Circles Apt. 540East CatherineRhode IslandJapanAsia6657female,1,2025-07-06T19:22:56.252Z,2025-07-06T19:22:56.252Z
3,2000261,Steven,Tanner,haileysanchez@example.net,859-946-4140x24086,08560 Thomas Land,Williamshire,Missouri,Japan,Asia,20642,female,2000261StevenTannerhaileysanchez@example.net859-946-4140x2408608560 Thomas LandWilliamshireMissouriJapanAsia20642female,1,2025-07-06T19:22:56.252Z,2025-07-06T19:22:56.252Z
4,2000262,Jennifer,Forbes,belldonna@example.com,633-427-4977,5840 Warren Garden Suite 901,Delacruzville,Nevada,Australia,Oceania,21440,male,2000262JenniferForbesbelldonna@example.com633-427-49775840 Warren Garden Suite 901DelacruzvilleNevadaAustraliaOceania21440male,1,2025-07-06T19:22:56.252Z,2025-07-06T19:22:56.252Z
5,2000263,Kenneth,Berger,bdalton@example.net,(831)220-1833x906,693 Baker Dale,West Wendy,Colorado,Australia,Oceania,32756,female,2000263KennethBergerbdalton@example.net(831)220-1833x906693 Baker DaleWest WendyColoradoAustraliaOceania32756female,1,2025-07-06T19:22:56.252Z,2025-07-06T19:22:56.252Z
6,2000264,James,Edwards,mary39@example.org,271-321-1561x9697,665 Campbell Streets Suite 966,Sherryton,Illinois,Japan,Asia,76717,male,2000264JamesEdwardsmary39@example.org271-321-1561x9697665 Campbell Streets Suite 966SherrytonIllinoisJapanAsia76717male,1,2025-07-06T19:22:56.252Z,2025-07-06T19:22:56.252Z
7,2000265,Ann,Montgomery,michaelreese@example.org,7673757334,220 Barber Islands Apt. 664,East Todd,Iowa,Australia,Oceania,77976,female,2000265AnnMontgomerymichaelreese@example.org7673757334220 Barber Islands Apt. 664East ToddIowaAustraliaOceania77976female,1,2025-07-06T19:22:56.252Z,2025-07-06T19:22:56.252Z
8,2000266,Heather,Moore,mannmartin@example.com,278-606-3676x938,896 Greene Hill Suite 575,Josephburgh,Maryland,Australia,Oceania,2149,male,2000266HeatherMooremannmartin@example.com278-606-3676x938896 Greene Hill Suite 575JosephburghMarylandAustraliaOceania2149male,1,2025-07-06T19:22:56.252Z,2025-07-06T19:22:56.252Z
9,2000267,Joseph,Graham,suzannereeves@example.org,538.787.9102x4963,2563 Jessica Mountains Apt. 192,Kelseyview,Mississippi,Australia,Oceania,85290,female,2000267JosephGrahamsuzannereeves@example.org538.787.9102x49632563 Jessica Mountains Apt. 192KelseyviewMississippiAustraliaOceania85290female,1,2025-07-06T19:22:56.252Z,2025-07-06T19:22:56.252Z
10,2000268,Lisa,Ramirez,kristi04@example.com,(643)361-8721x12062,573 Bailey Lights Suite 941,West Deniseland,Connecticut,Japan,Asia,58198,female,2000268LisaRamirezkristi04@example.com(643)361-8721x12062573 Bailey Lights Suite 941West DeniselandConnecticutJapanAsia58198female,1,2025-07-06T19:22:56.252Z,2025-07-06T19:22:56.252Z


### 6. Generate SHA-256 hash of concatenated column values and drop 'ConCatValue'

In [0]:
source = source.withColumn("RowHash", F.sha2(F.col("ConCatValue"), 256)).drop('ConCatValue')
display(source)



storage_id,customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender,IndCurrent,CreatedDate,ModifiedDate,RowHash
1,2000259,Kayla,Barrett,brittanyramos@example.org,349-683-9514x73065,717 Whitney Roads,Kathrynborough,Massachusetts,Japan,Asia,81587,female,1,2025-07-06T19:23:11.016Z,2025-07-06T19:23:11.016Z,bb17fb933e82c69c99b14ccb4ae37904bbd2dfa7c9a671c2ee0c810998bef31c
2,2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,East Catherine,Rhode Island,Japan,Asia,6657,female,1,2025-07-06T19:23:11.016Z,2025-07-06T19:23:11.016Z,5ceb84c4ce5078a7caf799a2e4e0b9d1e703517e80eb97f127f6c9bdc0da745b
3,2000261,Steven,Tanner,haileysanchez@example.net,859-946-4140x24086,08560 Thomas Land,Williamshire,Missouri,Japan,Asia,20642,female,1,2025-07-06T19:23:11.016Z,2025-07-06T19:23:11.016Z,6896a43a4d1d64ec3a4b2fba76e9d9dbb2b6cdfb6178b8089d1ec8e242d74d77
4,2000262,Jennifer,Forbes,belldonna@example.com,633-427-4977,5840 Warren Garden Suite 901,Delacruzville,Nevada,Australia,Oceania,21440,male,1,2025-07-06T19:23:11.016Z,2025-07-06T19:23:11.016Z,10d2231e7cccbd8538fb8ff31d51ed036a7c798de720e9f41d4adb043786b73f
5,2000263,Kenneth,Berger,bdalton@example.net,(831)220-1833x906,693 Baker Dale,West Wendy,Colorado,Australia,Oceania,32756,female,1,2025-07-06T19:23:11.016Z,2025-07-06T19:23:11.016Z,9b0014e4b504fabf767aed285098ae6ac82c868bb31b99a6110d543cf33b4665
6,2000264,James,Edwards,mary39@example.org,271-321-1561x9697,665 Campbell Streets Suite 966,Sherryton,Illinois,Japan,Asia,76717,male,1,2025-07-06T19:23:11.016Z,2025-07-06T19:23:11.016Z,4de64bc371ded10202e8b20b00954e49ca5d2c7ac709797cc0d436dec5849e1e
7,2000265,Ann,Montgomery,michaelreese@example.org,7673757334,220 Barber Islands Apt. 664,East Todd,Iowa,Australia,Oceania,77976,female,1,2025-07-06T19:23:11.016Z,2025-07-06T19:23:11.016Z,1719062d9af6cae82bac2194bc5b3da09bce2241c22fd5ec7db72cff5b640f0f
8,2000266,Heather,Moore,mannmartin@example.com,278-606-3676x938,896 Greene Hill Suite 575,Josephburgh,Maryland,Australia,Oceania,2149,male,1,2025-07-06T19:23:11.016Z,2025-07-06T19:23:11.016Z,21ff817d08805859c7506a187ed24d0d646a3e3175375ab7ab88b1c95dd1956a
9,2000267,Joseph,Graham,suzannereeves@example.org,538.787.9102x4963,2563 Jessica Mountains Apt. 192,Kelseyview,Mississippi,Australia,Oceania,85290,female,1,2025-07-06T19:23:11.016Z,2025-07-06T19:23:11.016Z,dd08a73ddf2a876cb42763d7d551f457f8fa820545592f4742f5a7297fae2f03
10,2000268,Lisa,Ramirez,kristi04@example.com,(643)361-8721x12062,573 Bailey Lights Suite 941,West Deniseland,Connecticut,Japan,Asia,58198,female,1,2025-07-06T19:23:11.016Z,2025-07-06T19:23:11.016Z,fc3f544dbeebdcbe23f0f8d5c53e90a919cccbb59241bdeac8aa922c003a08a2


### 7. writing to the target schema 

In [0]:
source.write.mode("append").saveAsTable("workspace.target_schema2.trg_cust2")



In [0]:
source.write.mode("append").saveAsTable("workspace.source_schema2.src_cust")

### 8. Start SCD type 2
### a. Load data in source and target table

In [0]:
SourceTable='workspace.source_schema2.src_cust2'
TargetTable='workspace.target_schema2.trg_cust2'

### b.Read source table into DataFrame

In [0]:
SourceDf=spark.read.table(SourceTable)  
TargetDf=spark.read.table(TargetTable)

### c. See data in source 

In [0]:
from pyspark.sql.functions import col
SourceDf.filter(col("customerID") == "2000260").display()

customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender
2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,East Catherine,Rhode Island,Japan,Asia,6657,female


### d.Change in source table

In [0]:
from pyspark.sql.functions import col, when


SourceDf = SourceDf.withColumn(
    "city",
    when(col("customerID") == "2000260", "Bhubaneswar").otherwise(col("city"))
)

SourceDf.filter(col("customerID") == "2000260").display()


customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender
2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,Bhubaneswar,Rhode Island,Japan,Asia,6657,female


### e. Now create the hash of Source table

In [0]:
from pyspark.sql import functions as F
SourceDf = SourceDf.withColumn('RowHash', F.concat_ws('', *SourceDf.columns))

### f. Add IndCurrent, CreateData, ModifiedData Column

In [0]:
SourceDf = SourceDf.withColumn("IndCurrent", F.lit(1)) \
    .withColumn("CreatedDate", F.current_timestamp()) \
    .withColumn("ModifiedDate", F.current_timestamp())

### g. See the Change data in Source table

In [0]:
SourceDf.filter(col("customerID") == "2000260").display()

customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender,RowHash,IndCurrent,CreatedDate,ModifiedDate
2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,Bhubaneswar,Rhode Island,Japan,Asia,6657,female,2000260AmandaReedscollier@example.org+1-999-308-911069075 Logan Circles Apt. 540BhubaneswarRhode IslandJapanAsia6657female,1,2025-07-06T19:24:55.373Z,2025-07-06T19:24:55.373Z


### h. inspect the data in the target table for a specific customerID

In [0]:

display(spark.sql("select * from workspace.target_sch.target_cust_sales where customerID='2000260'"))

customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender,ConCatValue,IndCurrent,CreatedDate,ModifiedDate
2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,East Catherine,Rhode Island,Japan,Asia,6657,female,2000260AmandaReedscollier@example.org+1-999-308-911069075 Logan Circles Apt. 540East CatherineRhode IslandJapanAsia6657female,1,2025-07-06T16:11:03.275Z,2025-07-06T16:11:03.275Z


### SCD Type 2 start

In [0]:
SourceDf.filter(col("customerID") == "2000260").display()

customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender,RowHash,IndCurrent,CreatedDate,ModifiedDate
2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,Bhubaneswar,Rhode Island,Japan,Asia,6657,female,2000260AmandaReedscollier@example.org+1-999-308-911069075 Logan Circles Apt. 540BhubaneswarRhode IslandJapanAsia6657female,1,2025-07-06T19:25:18.826Z,2025-07-06T19:25:18.826Z


### join with Target Table and create Flag

In [0]:

TargetDf=spark.read.table(TargetTable).select(['customerID','RowHash']).withColumnRenamed('RowHash','TargetHash')
SourceDf=SourceDf.join(TargetDf, on =['customerID'], how='left').withColumn('Flag', F.when(col('TargetHash').isNull() | (col('TargetHash') != col('RowHash')), 'New').when(col('TargetHash') == col('RowHash'), 'NoChange').otherwise('Update'))




### Drop the TargetHash column and filter base on flag

In [0]:
SourceDf=SourceDf.filter(col("Flag") == "New")

In [0]:
SourceDf=SourceDf.drop('TargetHash')
display(SourceDf)

customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender,RowHash,IndCurrent,CreatedDate,ModifiedDate,Flag
2000259,Kayla,Barrett,brittanyramos@example.org,349-683-9514x73065,717 Whitney Roads,Kathrynborough,Massachusetts,Japan,Asia,81587,female,2000259KaylaBarrettbrittanyramos@example.org349-683-9514x73065717 Whitney RoadsKathrynboroughMassachusettsJapanAsia81587female,1,2025-07-06T19:25:48.475Z,2025-07-06T19:25:48.475Z,New
2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,Bhubaneswar,Rhode Island,Japan,Asia,6657,female,2000260AmandaReedscollier@example.org+1-999-308-911069075 Logan Circles Apt. 540BhubaneswarRhode IslandJapanAsia6657female,1,2025-07-06T19:25:48.475Z,2025-07-06T19:25:48.475Z,New
2000261,Steven,Tanner,haileysanchez@example.net,859-946-4140x24086,08560 Thomas Land,Williamshire,Missouri,Japan,Asia,20642,female,2000261StevenTannerhaileysanchez@example.net859-946-4140x2408608560 Thomas LandWilliamshireMissouriJapanAsia20642female,1,2025-07-06T19:25:48.475Z,2025-07-06T19:25:48.475Z,New
2000262,Jennifer,Forbes,belldonna@example.com,633-427-4977,5840 Warren Garden Suite 901,Delacruzville,Nevada,Australia,Oceania,21440,male,2000262JenniferForbesbelldonna@example.com633-427-49775840 Warren Garden Suite 901DelacruzvilleNevadaAustraliaOceania21440male,1,2025-07-06T19:25:48.475Z,2025-07-06T19:25:48.475Z,New
2000263,Kenneth,Berger,bdalton@example.net,(831)220-1833x906,693 Baker Dale,West Wendy,Colorado,Australia,Oceania,32756,female,2000263KennethBergerbdalton@example.net(831)220-1833x906693 Baker DaleWest WendyColoradoAustraliaOceania32756female,1,2025-07-06T19:25:48.475Z,2025-07-06T19:25:48.475Z,New
2000264,James,Edwards,mary39@example.org,271-321-1561x9697,665 Campbell Streets Suite 966,Sherryton,Illinois,Japan,Asia,76717,male,2000264JamesEdwardsmary39@example.org271-321-1561x9697665 Campbell Streets Suite 966SherrytonIllinoisJapanAsia76717male,1,2025-07-06T19:25:48.475Z,2025-07-06T19:25:48.475Z,New
2000265,Ann,Montgomery,michaelreese@example.org,7673757334,220 Barber Islands Apt. 664,East Todd,Iowa,Australia,Oceania,77976,female,2000265AnnMontgomerymichaelreese@example.org7673757334220 Barber Islands Apt. 664East ToddIowaAustraliaOceania77976female,1,2025-07-06T19:25:48.475Z,2025-07-06T19:25:48.475Z,New
2000266,Heather,Moore,mannmartin@example.com,278-606-3676x938,896 Greene Hill Suite 575,Josephburgh,Maryland,Australia,Oceania,2149,male,2000266HeatherMooremannmartin@example.com278-606-3676x938896 Greene Hill Suite 575JosephburghMarylandAustraliaOceania2149male,1,2025-07-06T19:25:48.475Z,2025-07-06T19:25:48.475Z,New
2000267,Joseph,Graham,suzannereeves@example.org,538.787.9102x4963,2563 Jessica Mountains Apt. 192,Kelseyview,Mississippi,Australia,Oceania,85290,female,2000267JosephGrahamsuzannereeves@example.org538.787.9102x49632563 Jessica Mountains Apt. 192KelseyviewMississippiAustraliaOceania85290female,1,2025-07-06T19:25:48.475Z,2025-07-06T19:25:48.475Z,New
2000268,Lisa,Ramirez,kristi04@example.com,(643)361-8721x12062,573 Bailey Lights Suite 941,West Deniseland,Connecticut,Japan,Asia,58198,female,2000268LisaRamirezkristi04@example.com(643)361-8721x12062573 Bailey Lights Suite 941West DeniselandConnecticutJapanAsia58198female,1,2025-07-06T19:25:48.475Z,2025-07-06T19:25:48.475Z,New


In [0]:
display(spark.sql("select * from workspace.target_schema2.trg_cust2 where customerID='2000260'"))

storage_id,customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender,IndCurrent,CreatedDate,ModifiedDate,RowHash
2,2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,East Catherine,Rhode Island,Japan,Asia,6657,female,1,2025-07-06T19:23:24.696Z,2025-07-06T19:23:24.696Z,5ceb84c4ce5078a7caf799a2e4e0b9d1e703517e80eb97f127f6c9bdc0da745b


### Now do SCD type 2 operation

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp, lit, col, udf
from pyspark.sql.types import StringType
import uuid

# Configuration
table_name = "workspace.target_schema2.trg_cust2"
key_column = "customerID"
hash_column = "RowHash"
is_current_column = "IndCurrent"
surrogate_key_column = "storage_id"
created_column = "CreatedDate"

# Reference Delta table
target_table = DeltaTable.forName(spark, table_name)

# Add new columns to source DataFrame
uuid_udf = udf(lambda: str(uuid.uuid4()), StringType())
SourceDf = SourceDf \
    .withColumn(surrogate_key_column, uuid_udf()) \
    .withColumn(created_column, current_timestamp()) \
    .withColumn(is_current_column, lit(1))

# Use aliases properly
src = SourceDf.alias("src")
tgt = target_table.alias("tgt")

# Use column expressions (not strings) in merge condition
tgt.merge(
    source=src,
    condition=(
        (col(f"tgt.{key_column}") == col(f"src.{key_column}")) &
        (col(f"tgt.{is_current_column}") == lit(1))
    )
).whenMatchedUpdate(
    condition=col(f"tgt.{hash_column}") != col(f"src.{hash_column}"),
    set={
        is_current_column: lit(0)
    }
).whenNotMatchedInsertAll().execute()

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

### Inspect the target table data before

In [0]:
display(spark.sql("select * from workspace.target_schema2.trg_cust2 where customerID='2000260'"))

storage_id,customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender,IndCurrent,CreatedDate,ModifiedDate,RowHash
2,2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,East Catherine,Rhode Island,Japan,Asia,6657,female,0,2025-07-06T19:23:24.696Z,2025-07-06T19:23:24.696Z,5ceb84c4ce5078a7caf799a2e4e0b9d1e703517e80eb97f127f6c9bdc0da745b


In [0]:
SourceDf = SourceDf.drop('storage_id','Flag')
max_storage_id = spark.sql(f"select max(storage_id) as max_id from {table_name}").first()['max_id']
next_storage_id = 1 if not max_storage_id or max_storage_id == 0 else max_storage_id + 1

SourceDf = SourceDf.withColumn('storage_id', lit(next_storage_id))
SourceDf = SourceDf.withColumn('IndCurrent', lit(1))
SourceDf.write.format('delta').mode('append').saveAsTable(table_name)

### Inspect the target table data after final SCD

In [0]:
display(spark.sql("select * from workspace.target_schema2.trg_cust2 where customerID='2000260'"))

storage_id,customerID,first_name,last_name,email_address,phone_number,address,city,state,country,continent,postal_zip_code,gender,IndCurrent,CreatedDate,ModifiedDate,RowHash
301,2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,Bhubaneswar,Rhode Island,Japan,Asia,6657,female,1,2025-07-06T19:30:57.248Z,2025-07-06T19:30:57.248Z,2000260AmandaReedscollier@example.org+1-999-308-911069075 Logan Circles Apt. 540BhubaneswarRhode IslandJapanAsia6657female
2,2000260,Amanda,Reed,scollier@example.org,+1-999-308-9110,69075 Logan Circles Apt. 540,East Catherine,Rhode Island,Japan,Asia,6657,female,0,2025-07-06T19:23:24.696Z,2025-07-06T19:23:24.696Z,5ceb84c4ce5078a7caf799a2e4e0b9d1e703517e80eb97f127f6c9bdc0da745b
