Create the employee dimension table with some sample data and save it in Delta format.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, lit

# Seed rows for the employee dimension: (emp_id, name, department, start_date).
data = [
    (1, 'Alice', 'HR', '2023-01-01'),
    (2, 'Bob', 'Finance', '2023-01-01'),
    (3, 'Charlie', 'IT', '2023-01-01')
]

columns = ['emp_id', 'name', 'department', 'start_date']

seed_df = spark.createDataFrame(data, columns)

# SCD2 bookkeeping columns. start_date/end_date are stored as DATE (not string)
# so they line up with the current_date()-based values that later cells
# produce for expired and newly inserted row versions.
df = (
    seed_df
    .withColumn("start_date", seed_df["start_date"].cast("date"))
    .withColumn("end_date", lit(None).cast("date"))   # open-ended: no end date yet
    .withColumn("is_current", lit(True))
)

# overwriteSchema lets a rerun replace a table that was created earlier with
# different column types; without it Delta rejects the schema change.
(
    df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("employee_dim")
)


In [0]:
# Show the DataFrame that was just written to the employee_dim table.
display(df)

Incoming data containing some changes relative to the current table.

In [0]:
# Column layout of the incoming feed; it carries no SCD2 bookkeeping columns.
incoming_columns = ['emp_id', 'name', 'department']

# Latest snapshot to be compared against the dimension table.
incoming_data = [
    (1, 'Alice', 'HR'),          # same values as the seeded row
    (2, 'Bob', 'Marketing'),     # department differs from the seeded row
    (4, 'David', 'IT'),          # emp_id not present in the seeded rows
]

incoming_df = spark.createDataFrame(incoming_data, schema=incoming_columns)

display(incoming_df)


In [0]:
# Load the dimension back from the saved Delta table; the cells below
# compare the incoming data against this DataFrame.
df = (
    spark.read
    .format("delta")
    .table("default.employee_dim")
)
display(df)

Left-join the incoming data to the current rows of the table so that each incoming record can be compared with its existing version (if any).

In [0]:
from pyspark.sql.functions import col

# Only the rows flagged is_current take part in the comparison.
current_rows = df.filter("is_current=True").alias("current")
incoming_rows = incoming_df.alias("incoming")

# A left join keeps every incoming row; incoming rows without a matching
# emp_id get NULLs in all "current.*" columns.
joined_df = incoming_rows.join(
    current_rows,
    on=col("incoming.emp_id") == col("current.emp_id"),
    how="left",
)

display(joined_df)


Filter the joined data to keep only the records that need a new version:
- if the current-side columns are NULL after the left join, the record is new
- if the incoming and current names differ, the name was updated
- if the incoming and current departments differ, the department was updated

In [0]:
# Keep only the incoming rows that need a new version in the dimension.
#
# - New employee: the left join found no current row, so the join key on the
#   "current" side is NULL. Testing the key (rather than current.name) avoids
#   misclassifying an existing row whose name happens to be NULL.
# - Changed attribute: compared with eqNullSafe so that a change to or from
#   NULL counts as a difference. A plain != evaluates to NULL in that case and
#   the row would be dropped by the filter.
is_new_employee = col("current.emp_id").isNull()
name_changed = ~col("incoming.name").eqNullSafe(col("current.name"))
department_changed = ~col("incoming.department").eqNullSafe(col("current.department"))

changed_records = joined_df.filter(
    is_new_employee | name_changed | department_changed
).select("incoming.*")

display(changed_records)

In [0]:
from pyspark.sql.functions import current_date

# Expire old records: the current versions of every employee that has a
# changed incoming row get an end_date of today and is_current = False.
#
# A left-semi join returns only the dimension-side columns, so "name" and
# "department" are unambiguous in the select below. (An inner join with
# changed_records exposes both sides' name/department columns and the select
# fails with an ambiguous-reference error.) It also cannot duplicate a
# dimension row if changed_records holds the same emp_id more than once.
updates_df = (
    df.filter("is_current = True")
    .join(changed_records, "emp_id", "left_semi")
    .select(
        "emp_id", "name", "department", "start_date",
        current_date().alias("end_date"),
        lit(False).alias("is_current"),
    )
)

# Add new records with current flag: each changed or brand-new incoming row
# becomes an open-ended version starting today.
new_records_df = (
    changed_records
    .withColumn("start_date", current_date())
    .withColumn("end_date", lit(None).cast("date"))
    .withColumn("is_current", lit(True))
)

display(new_records_df)



In [0]:
# Expose the dimension snapshot and the new row versions to Spark SQL.
df.createOrReplaceTempView("employee_dim_view")
new_records_df.createOrReplaceTempView("new_records_view")

# All branches below project the same six columns and cast start_date/end_date
# to DATE, so the union at the end does not mix string and date columns.

# Expire old records: current rows that have a replacement version are closed
# out with the replacement's start_date as their end_date.
expired_records = spark.sql("""
  SELECT
    d.emp_id,
    d.name,
    d.department,
    CAST(d.start_date AS DATE) AS start_date,
    CAST(n.start_date AS DATE) AS end_date,
    false AS is_current
  FROM employee_dim_view d
  INNER JOIN new_records_view n
    ON d.emp_id = n.emp_id
  WHERE d.is_current = true
""")

# Keep unchanged current records. NOT EXISTS is used instead of NOT IN because
# NOT IN yields no rows at all when the subquery contains a NULL emp_id.
unchanged_records = spark.sql("""
  SELECT
    d.emp_id,
    d.name,
    d.department,
    CAST(d.start_date AS DATE) AS start_date,
    CAST(d.end_date AS DATE) AS end_date,
    d.is_current
  FROM employee_dim_view d
  WHERE d.is_current = true
    AND NOT EXISTS (
      SELECT 1
      FROM new_records_view n
      WHERE n.emp_id = d.emp_id
    )
""")

# Carry forward versions that were already expired by earlier runs; leaving
# them out would drop the history that SCD2 exists to keep.
historical_records = spark.sql("""
  SELECT
    d.emp_id,
    d.name,
    d.department,
    CAST(d.start_date AS DATE) AS start_date,
    CAST(d.end_date AS DATE) AS end_date,
    d.is_current
  FROM employee_dim_view d
  WHERE d.is_current = false
""")

# Combine historical, newly expired, unchanged, and new records for the SCD2 view.
scd2_final = (
    historical_records
    .unionByName(expired_records)
    .unionByName(unchanged_records)
    .unionByName(new_records_df)
)

# Create or replace a view with the SCD2 result
scd2_final.createOrReplaceTempView("employee_dim_scd2_view")

display(spark.sql("SELECT * FROM employee_dim_scd2_view"))