In [0]:
%sql
create or replace table Scd2_Demo(
  PK1 INT,  
  PK2 STRING,
  DIM1 INT,
  DIM2 INT,
  DIM3 INT,
  DIM4 INT,
  ACTIVE_STATUS STRING,
  START_DATE TIMESTAMP,
  END_DATE TIMESTAMP
)
USING DELTA

**Inserting Value to Table**

In [0]:
%sql
insert into scd2_demo values(111,'Unit1',200,500,800,400,'Y',current_timestamp(),'9999-12-31');
insert into scd2_demo values(222,'Unit2',900, NULL, 700,100,'Y',current_timestamp(),'9999-12-31');
insert into scd2_demo values(333,'Unit3', 300, 250, 900, 650, 'Y',current_timestamp(),'9999-12-31')

**Display Target Table**

In [0]:
from delta import *
targetTable = DeltaTable.forName(spark, "scd2_Demo")
targetDF=targetTable.toDF()
display(targetDF)

**Building PySpark Schema**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([StructField("PK1", StringType(),True), 
                     StructField("PK2", StringType(),True), 
                     StructField("DIM1", IntegerType(),True), 
                     StructField("DIM2", IntegerType(),True), 
                     StructField("DIM3", IntegerType(),True), 
                     StructField("DIM4", IntegerType(),True)])

**Adding Data to Schema**

In [0]:
data = [(111,'Unit1', 200,500,800,400),
        (222,'Unit2', 800,1300,950,500),
        (444,'Unit4', 100, None, 700, 300)]
sourceDF = spark.createDataFrame(data = data, schema=schema)
display(sourceDF)
 

**Joining Source with Target file**

In [0]:
JoinDF = sourceDF.join(targetDF, (sourceDF.PK1==targetDF.PK1) &\
                             (sourceDF.PK2==targetDF.PK2) &\
                             (targetDF.ACTIVE_STATUS=='Y'), "leftouter")\
                                 select(sourceDF[*]),\
                                     targetDF.PK1.alias('targetPK1'),\
                                     targetDF.PK2.alias('targetPK2'),\
                                     targetDF.DIM1.alias('targetDIM1'),\
                                     targetDF.DIM2.alias('targetDIM2'),\
                                     targetDF.DIM3.alias('targetDIM3'),\
                                     targetDF.DIM4.alias('targetDIM4')
display(JoinDF)

**Filtering out the new and updated data**

In [0]:
FilterDF = JoinDF.filter(xxhash64(JoinDF.DIM1, JoinDF.DIM2, JoinDF.DIM3, JoinDF.DIM4) != xxhash64(JoinDF.targetDIM1, JoinDF.targetDIM2, JoinDF.targetDIM3, JoinDF.targetDIM4))

display(FilterDF)

In [0]:
mergeDF = FilterDF.withColumn("MergeKey", concat(FilterDF.PK1, FilterDF.PK2))
display(mergeDF)

In [0]:
dumyDF = FilterDF.filter("targetPK1 is not null").withColumn("MergeKey", concat(lit (None)))
display(dumyDF)

**Union of Merge and Dumy**

In [0]:
ScdDF= mergeDF.union(dumyDF)
display(ScdDF)

**Inserting Source Data into Target Data**

In [0]:
targetTable.alias("target").merge(
    source=ScdDF.alias("source"),
    condition="concat(target.PK1, target.PK2) = source.MergeKey and target.ACTIVE_STATUS = 'Y'").whenMatchedUpdate(set =
        {
            "ACTIVE_STATUS": "'N'",
            "END_DATE": "current_date()"
        }
).whenNotMatchedInsert(
    values={
        "PK1": "source.PK1",
        "PK2": "source.PK2",
        "DIM1": "source.DIM1",
        "DIM2": "source.DIM2",
        "DIM3": "source.DIM3",
        "DIM4": "source.DIM4",
        "ACTIVE_STATUS": "'Y'",
        "START_DATE": "current_date()",
        "END_DATE": """to_date('9999-12-31','yyyy-MM-dd')"""
    }
).execute()

**Calling Target(DeltaTable)**

In [0]:
%sql
Select * from Scd2_Demo

**TIME TRAVEL**

Snapshot of table

In [0]:
%sql 
describe history Scd2_demo

**Time Travel using PySpark**

In [0]:
df = spark.read \
    .format("delta") \
    .option("versionAsOf", 5) \
    .table("Scd2_Demo")

display(df)


In [0]:
%sql
DESCRIBE DETAIL Scd2_Demo

In [0]:
df = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2026-01-18T04:39:26.000+00:00" ) \
    .load("Scd2_Demo")

display(df)  

**Time Travel using SQL**

In [0]:
%sql
select * from Scd2_Demo version as of 4

In [0]:
%sql
select * from Scd2_Demo timestamp as of "2026-01-18T04:39:30.000+00:00"

**OPTIMIZE and ZORDER**

In [0]:
%sql
optimize Scd2_Demo
zorder by (PK1)

**VACUUM**

In [0]:
%sql
VACUUM Scd2_Demo RETAIN 168 HOURS