In [0]:
%sql
-- Target table for the SCD Type 2 demo: one row per version of a (pk1, pk2) key.
-- CREATE OR REPLACE resets the table on every run of this cell.
CREATE OR REPLACE TABLE dev_bronze.test_schma_tmp.scd2Demo (
  pk1            INT,        -- key column, part 1
  pk2            STRING,     -- key column, part 2
  dim1           INT,        -- tracked attribute
  dim2           INT,        -- tracked attribute
  dim3           INT,        -- tracked attribute
  dim4           INT,        -- tracked attribute
  active_status  STRING,     -- 'Y' for the current version of a key, 'N' once closed
  start_date     TIMESTAMP,  -- when this version was written
  end_date       TIMESTAMP   -- when this version was closed; 9999-12-31 while still open
)
USING DELTA

In [0]:
%sql
-- Seed the target with three active ('Y') versions.
-- * One multi-row INSERT: a single commit, and all rows share the same start_date.
-- * Explicit column list: values cannot silently shift if the table layout changes.
-- * Typed TIMESTAMP literal for the open-ended end_date instead of relying on an
--   implicit string -> timestamp coercion.
INSERT INTO dev_bronze.test_schma_tmp.scd2Demo
  (pk1, pk2, dim1, dim2, dim3, dim4, active_status, start_date, end_date)
VALUES
  (111, 'Unit1', 200, 500,  800, 400, 'Y', current_timestamp(), TIMESTAMP'9999-12-31 00:00:00'),
  (222, 'Unit2', 900, NULL, 700, 100, 'Y', current_timestamp(), TIMESTAMP'9999-12-31 00:00:00'),
  (333, 'Unit3', 300, 900,  250, 650, 'Y', current_timestamp(), TIMESTAMP'9999-12-31 00:00:00');

In [0]:
from delta import DeltaTable

# Keep two views of the same table:
#   target_table_delta - the DeltaTable handle, needed later to run MERGE
#   target_table       - a DataFrame over it, used for joins and display
target_table_delta = DeltaTable.forName(
    spark,
    "dev_bronze.test_schma_tmp.scd2Demo",
)
target_table = target_table_delta.toDF()

display(target_table)

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Schema of the incoming source batch. pk1 is an integer so that it matches the
# INT pk1 column of the target table; with a string pk1 the join and the merge
# would only work through implicit string <-> int casts.
schema = StructType([
    StructField("pk1", IntegerType(), True),
    StructField("pk2", StringType(), True),
    StructField("dim1", IntegerType(), True),
    StructField("dim2", IntegerType(), True),
    StructField("dim3", IntegerType(), True),
    StructField("dim4", IntegerType(), True),
])

# One row per scenario the SCD2 logic has to handle:
data = [
    (111, 'Unit1', 200, 500, 800, 400),   # same dims as the seeded target row -> unchanged
    (222, 'Unit2', 800, 1300, 800, 500),  # dims differ from the seeded target row -> changed
    (444, 'Unit4', 100, None, 700, 300),  # key not present in the seeded target -> new
]

sourceDF = spark.createDataFrame(data=data, schema=schema)
display(sourceDF)

In [0]:
# Pair every source row with the currently active target version of the same key.
# The active_status test lives in the join condition (not in a later filter) so
# that, with a left join, source rows without an active match are kept and get
# NULLs in all target_* columns.
active_key_match = (
    (sourceDF.pk1 == target_table.pk1)
    & (sourceDF.pk2 == target_table.pk2)
    & (target_table.active_status == "Y")
)

# Target columns carried along under a "target_" prefix for the change comparison.
target_columns = ["pk1", "pk2", "dim1", "dim2", "dim3", "dim4"]

joined_df = sourceDF.join(target_table, active_key_match, "left").select(
    sourceDF["*"],
    *[target_table[name].alias(f"target_{name}") for name in target_columns],
)
display(joined_df)

In [0]:
# Keep only source rows that are new or whose tracked attributes changed.
#
# The comparison is done column by column with a null-safe equality instead of
# comparing row hashes: Spark's hash functions skip NULL inputs, so a hash of
# (5, NULL, ...) equals a hash of (NULL, 5, ...), and a new key whose dims are
# all NULL would hash the same as the all-NULL target side and be dropped.
dim_columns = ["dim1", "dim2", "dim3", "dim4"]

# No active target version was found by the left join -> brand-new key.
is_new_key = joined_df["target_pk1"].isNull()

# True when at least one dim differs; NULL vs NULL counts as "equal".
any_dim_changed = lit(False)
for dim_name in dim_columns:
    same_value = joined_df[dim_name].eqNullSafe(joined_df[f"target_{dim_name}"])
    any_dim_changed = any_dim_changed | (~same_value)

changed_and_new_data_df = joined_df.filter(is_new_key | any_dim_changed)
display(changed_and_new_data_df)

In [0]:
# Target table before the merge, rendered the same way as every other cell.
display(target_table)

In [0]:
# Tag every changed/new row with a non-null MERGEKEY built from its key columns.
# These are the rows the MERGE is allowed to match against the target.
merge_key = concat(
    changed_and_new_data_df.pk1,
    changed_and_new_data_df.pk2,
)
changed_and_new_data_df_with_merge_key = changed_and_new_data_df.withColumn(
    "MERGEKEY", merge_key
)
display(changed_and_new_data_df_with_merge_key)

In [0]:
# Second copy of every *changed* row (an active target version exists, so
# target_pk1 is populated). Its MERGEKEY is NULL, so it can never match a target
# row in the MERGE and is therefore inserted as the new version.
# The NULL is cast to string so the column has a concrete type rather than an
# untyped (void) one.
has_active_target_version = changed_and_new_data_df["target_pk1"].isNotNull()

extra_records_for_changed_records = (
    changed_and_new_data_df
    .filter(has_active_target_version)
    .withColumn("MERGEKEY", lit(None).cast("string"))
)
display(extra_records_for_changed_records)

In [0]:
# Source for the MERGE: the keyed rows first, then the NULL-keyed copies of the
# changed rows. union() is positional; both inputs share the same column order.
final_SCD_DF = (
    changed_and_new_data_df_with_merge_key
    .union(extra_records_for_changed_records)
)
display(final_SCD_DF)

In [0]:
# Apply the SCD Type 2 change set to the target in a single MERGE.
#
# Matching: a source row may match only when its MERGEKEY is non-null, and the
# match itself compares the key columns directly. Comparing a concatenated key
# instead is ambiguous without a delimiter (e.g. (1, '11x') and (11, '1x') both
# concatenate to '111x').
#   matched     -> close the active target version ('N', end_date = now)
#   not matched -> insert the source row as the new active version
#
# start_date/end_date are TIMESTAMP columns, so timestamps are written rather
# than dates; current_timestamp() keeps the time of day, and the open-ended
# end_date uses a typed TIMESTAMP literal.
merge_condition = (
    "source.MERGEKEY IS NOT NULL "
    "AND source.pk1 = target.pk1 "
    "AND source.pk2 = target.pk2 "
    "AND target.active_status = 'Y'"
)

close_active_version = {
    "active_status": "'N'",
    "end_date": "current_timestamp()",
}

insert_new_version = {
    "pk1": "source.pk1",
    "pk2": "source.pk2",
    "dim1": "source.dim1",
    "dim2": "source.dim2",
    "dim3": "source.dim3",
    "dim4": "source.dim4",
    "active_status": "'Y'",
    "start_date": "current_timestamp()",
    "end_date": "TIMESTAMP'9999-12-31 00:00:00'",
}

(
    target_table_delta.alias("target")
    .merge(source=final_SCD_DF.alias("source"), condition=merge_condition)
    .whenMatchedUpdate(set=close_active_version)
    .whenNotMatchedInsert(values=insert_new_version)
    .execute()
)

In [0]:
# Show the target table after the merge: closed versions carry active_status 'N',
# newly inserted versions carry 'Y'.
# NOTE(review): target_table was created before the merge ran; presumably it is
# re-evaluated against the latest table version here - confirm, or switch to the
# alternative in the trailing comment.
display(target_table) # or display(target_table_delta.toDF())