In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_date

# Create (or reuse) the Spark session used by this notebook
spark = SparkSession.builder.appName("SCD2").getOrCreate()

# Column layout of the SCD2 dimension table
columns = ["CustomerID", "Name", "Address", "StartDate", "EndDate"]

# Dimension snapshot before the change: one open-ended row per customer
data_before = [
    ("1", "John Doe", "123 Old St", "2022-01-01", "9999-12-31"),
    ("2", "Jane Doe", "456 New Ave", "2022-01-01", "9999-12-31"),
]

# Alias the frame so later joins can refer to its columns as "before.<col>"
df_before = spark.createDataFrame(data_before, schema=columns).alias("before")

# Incoming snapshot after the change (both customers' addresses differ from
# the "before" snapshot); it carries no StartDate/EndDate columns
data_after = [
    ("1", "John Doe", "789 New St1"),
    ("2", "Jane Doe", "456 New Ave1"),
]

# Alias the frame so later joins can refer to its columns as "after.<col>"
df_after = spark.createDataFrame(
    data_after,
    schema=["CustomerID", "Name", "Address"],
).alias("after")

# Sentinel end date carried by the currently active version of each customer
OPEN_END_DATE = "9999-12-31"

# Current date (evaluated by Spark when the query runs)
current_date_val = current_date()
# StartDate/EndDate are stored as "yyyy-MM-dd" strings in df_before, so render
# today's date the same way instead of relying on union's implicit
# date -> string coercion.
today_str = current_date_val.cast("string")

# Identify active records whose address has changed.
# - Only the open-ended (active) version is compared, so already-closed
#   history rows are never closed a second time or duplicated.
# - eqNullSafe treats NULL as a comparable value; a plain != evaluates to NULL
#   when either side is NULL and the filter would silently drop that change.
# NOTE: customers that appear only in df_after are not inserted by this cell
# (the inner join keeps only IDs present in both frames).
changed_records = (
    df_before.join(df_after, "CustomerID")
    .filter(col("before.EndDate") == lit(OPEN_END_DATE))
    .filter(~col("before.Address").eqNullSafe(col("after.Address")))
)

# Close the old version: same attributes, end date set to today
updated_inactive_records = changed_records.select(
    col("CustomerID"),
    col("before.Name"),
    col("before.Address"),
    col("before.StartDate"),
    today_str.alias("EndDate"),
)

# Open the new version: incoming attributes, starting today, open-ended
new_active_records = changed_records.select(
    col("CustomerID"),
    col("after.Name"),
    col("after.Address"),
    today_str.alias("StartDate"),
    lit(OPEN_END_DATE).alias("EndDate"),
)

# Rows carried over untouched:
#   * every non-active (historical) row, even for customers that changed, and
#   * active rows of customers whose address did not change.
# An anti-join on CustomerID alone would also discard the history of changed
# customers, so the anti-join is applied to the active rows only.
changed_keys = changed_records.select("CustomerID").distinct()
is_active = col("EndDate").eqNullSafe(lit(OPEN_END_DATE))
history_rows = df_before.filter(~is_active)
unchanged_active_rows = df_before.filter(is_active).join(
    changed_keys, "CustomerID", "left_anti"
)
df_unchanged = history_rows.unionByName(unchanged_active_rows)

# Combine by column name (not position) so a reordered select cannot
# silently swap StartDate and EndDate
df_scd2 = (
    df_unchanged
    .unionByName(updated_inactive_records)
    .unionByName(new_active_records)
)

# Display the final SCD2 table
df_scd2.show()


+----------+--------+------------+----------+----------+
|CustomerID|    Name|     Address| StartDate|   EndDate|
+----------+--------+------------+----------+----------+
|         1|John Doe|  123 Old St|2022-01-01|2025-02-11|
|         2|Jane Doe| 456 New Ave|2022-01-01|2025-02-11|
|         1|John Doe| 789 New St1|2025-02-11|9999-12-31|
|         2|Jane Doe|456 New Ave1|2025-02-11|9999-12-31|
+----------+--------+------------+----------+----------+

