In [0]:
from pyspark.sql.functions import *

In [0]:
# Incoming snapshot from the source system, one row per id.
source_columns = ["id", "name", "department", "salary"]
source_data = [
    (1, "Alice", "HR", 5000),     # no change vs. the dimension
    (2, "Bob", "Finance", 7000),  # salary updated (dimension currently holds 6500)
    (3, "Charlie", "IT", 9000),   # new record: id 3 is not in the dimension yet
]
source_df = spark.createDataFrame(source_data, schema=source_columns)

In [0]:
# Existing SCD Type 2 dimension. Every row here is the current version of its id
# (current_flag "Y", open-ended end_date 9999-12-31).
dim_data = [
    (1, "Alice", "HR", 5000, "2024-01-01", "9999-12-31", "Y"),
    (2, "Bob", "Finance", 6500, "2024-02-01", "9999-12-31", "Y"),
]
# The date strings are parsed to DateType so they match the current_date() values
# used for new and expired rows. Otherwise the later unions rely on implicit
# string/date coercion, and the date columns silently become strings.
dim_df = (
    spark.createDataFrame(
        dim_data,
        ["id", "name", "department", "salary", "start_date", "end_date", "current_flag"],
    )
    .withColumn("start_date", to_date(col("start_date")))  # "yyyy-MM-dd" -> DateType
    .withColumn("end_date", to_date(col("end_date")))
)

In [0]:
# Pair each incoming source row with the *current* version of the same id.
# It is a left join, so ids not yet in the dimension keep NULL dim.* columns.
# Already-expired history rows are excluded; comparing the source against them
# would flag spurious changes and emit duplicate updates for the same id.
join_df = source_df.alias("src").join(
    dim_df.filter(col("current_flag") == "Y").alias("dim"),
    "id",
    "left",
)

In [0]:
# Source rows whose id already exists in the dimension but whose tracked attributes
# (name, department, salary) differ. Each one becomes a new current version starting today.
# eqNullSafe treats NULL vs NULL as equal and NULL vs a value as different. A plain
# `!=` returns NULL in both cases, so a change to or from NULL would be missed.
updates_df = join_df.filter(
    col("dim.id").isNotNull()
    & (
        ~col("src.name").eqNullSafe(col("dim.name"))
        | ~col("src.department").eqNullSafe(col("dim.department"))
        | ~col("src.salary").eqNullSafe(col("dim.salary"))
    )
).select(
    col("src.id"), col("src.name"), col("src.department"), col("src.salary"),
    current_date().alias("start_date"),
    # Open-ended high date, typed as DateType to match start_date.
    to_date(lit("9999-12-31")).alias("end_date"),
    lit("Y").alias("current_flag"),
)

In [0]:
# Closed-out copies of the current dimension rows whose id received an update.
# Attributes and start_date are unchanged; end_date becomes today and
# current_flag becomes "N".
# Only current rows are expired, so previously-expired history rows keep their
# original end_date instead of being overwritten with today.
expired_df = (
    dim_df.filter(col("current_flag") == "Y")
    .join(updates_df.select("id"), "id")  # inner join: only ids that changed
    .withColumn("end_date", current_date())
    .withColumn("current_flag", lit("N"))
)

In [0]:
# Source rows whose id has no current row in the dimension become brand-new current rows.
# left_anti returns only the source columns (id, name, department, salary).
# Anti-joining against the current rows matches the current-rows comparison used for
# updates, so an id whose rows are all expired is re-inserted rather than dropped.
inserts_df = (
    source_df.alias("src")
    .join(dim_df.filter(col("current_flag") == "Y").alias("dim"), "id", "left_anti")
    .withColumn("start_date", current_date())
    .withColumn("end_date", to_date(lit("9999-12-31")))  # DateType, matching start_date
    .withColumn("current_flag", lit("Y"))
)

In [0]:
# Assemble the new SCD Type 2 dimension from five parts:
#   1. history rows that were already expired before this run (kept unchanged),
#   2. current rows whose id has no change (kept unchanged),
#   3. the expired copy of each current row whose id changed,
#   4. the new current version of each changed id,
#   5. brand-new ids.
# Changed ids are removed from the current rows with an anti-join on id. A row-wise
# subtract() against expired_df never matches, because expired rows carry a
# different end_date and current_flag, so the old version would stay current.
changed_ids_df = updates_df.select("id")
history_df = dim_df.filter(~col("current_flag").eqNullSafe("Y"))
unchanged_df = (
    dim_df.filter(col("current_flag") == "Y")
    .join(changed_ids_df, "id", "left_anti")
)

# unionByName aligns columns by name, not position. The inputs build their
# columns in different orders (join output vs. select vs. withColumn).
final_dim_df = (
    history_df
    .unionByName(unchanged_df)   # Keep active records that did not change
    .unionByName(expired_df)     # Close out the superseded versions
    .unionByName(updates_df)     # Add the new versions of updated records
    .unionByName(inserts_df)     # Add new inserts
)

# Invariant: no id may have more than one current row.
duplicate_current_ids = (
    final_dim_df.filter(col("current_flag") == "Y")
    .groupBy("id")
    .count()
    .filter(col("count") > 1)
)
assert duplicate_current_ids.count() == 0, "more than one current row for some id"

In [0]:
# Sort so each id's versions appear together in start_date order. A Spark union gives
# no ordering guarantee, so without this the row order can change between runs.
final_dim_df.orderBy("id", "start_date").display()