In [5]:
import logging
from pyspark.sql.session import SparkSession, DataFrame
from pyspark.sql.types import DateType
from pyspark.sql.functions import lit, max as spark_max, when, col, coalesce
from delta import *

In [6]:
# Start from a named builder, then layer on the two settings Delta Lake needs:
# its SQL extension and its catalog implementation.
builder = SparkSession.builder.appName("scd2")
builder = builder.config(
    "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
)
builder = builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

# Let the delta helper finish configuring the builder before the session is
# created (or an already-running one is reused).
spark = configure_spark_with_delta_pip(builder).getOrCreate()


In [7]:
# logging: notebook-wide logger plus root configuration
# (INFO level, timestamped "time - name - level - message" records).
logger = logging.getLogger(__name__)
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger.info("Spark session created successfully with Delta Lake configuration")

2025-10-12 16:14:30 - __main__ - INFO - Spark session created successfully with Delta Lake configuration


In [8]:
# add metadata function
def add_scd2_metadata(dataframe: DataFrame) -> DataFrame:
    """Attach the initial SCD2 bookkeeping columns to a freshly loaded dataset.

    Every row is marked as the first, currently active version:

    * ``is_active``  -- literal ``1``
    * ``start_date`` -- copy of the row's ``created_date``
    * ``end_date``   -- empty string (the row has not been expired)
    * ``version``    -- literal ``1``

    Args:
        dataframe: Input DataFrame; must contain a ``created_date`` column.

    Returns:
        A new DataFrame with the four metadata columns added (or replaced if
        columns with those names already exist).
    """
    logger.info(
        "Adding SCD2 metadata columns: is_active, start_date, end_date, version"
    )
    return (
        dataframe.withColumn("is_active", lit(1))
        # The original wrapped this in a single-argument coalesce(), which is a
        # no-op; the column is used directly, as append_new_scd2_data does.
        # NOTE(review): if a fallback start date was intended for rows with a
        # null created_date, coalesce needs an explicit second argument.
        .withColumn("start_date", dataframe["created_date"])
        .withColumn("end_date", lit(""))
        .withColumn("version", lit(1))
    )


def new_data_classifier(current_data: DataFrame, new_data: DataFrame):
    """Return the rows of ``new_data`` that are new or changed.

    A row counts as new/changed when it has no exact match among the
    *currently active* rows of ``current_data`` (ignoring the SCD2 metadata
    columns ``start_date``, ``end_date``, ``version`` and ``is_active``).

    Side effects: registers the temp views ``current_data`` and ``new_data``
    on the notebook-level ``spark`` session, replacing any existing views
    with those names.

    Args:
        current_data: The SCD2 table contents, including metadata columns.
        new_data: The incoming extract. EXCEPT matches columns by position,
            so its columns must line up with ``current_data``'s non-metadata
            columns in the same order.

    Returns:
        DataFrame of distinct rows from ``new_data`` not present among the
        active rows of ``current_data``.
    """
    current_data.createOrReplaceTempView("current_data")
    new_data.createOrReplaceTempView("new_data")

    # Compare against active rows only. Without the is_active filter, a record
    # whose values revert to an earlier (already expired) version would match
    # that historical row and be wrongly classified as unchanged.
    changes_query = """
    SELECT *
    FROM new_data
    EXCEPT
    SELECT * EXCEPT(start_date, end_date, version, is_active)
    FROM current_data
    WHERE is_active = 1
    """

    return spark.sql(changes_query)


def expire_old_register(target_table: str, data_to_update_df: DataFrame, ids: list):
    """Expire the active rows of ``target_table`` that are about to be superseded.

    For every key present in ``data_to_update_df``, the matching row in
    ``target_table`` with ``is_active = 1`` gets ``is_active = 0`` and
    ``end_date = CURRENT_DATE()``. Keys with no active match (brand-new
    records) are left alone, since the MERGE has no NOT MATCHED clause.

    Side effects: registers the temp view ``data_to_expire`` on the
    notebook-level ``spark`` session and updates ``target_table`` in place.

    Args:
        target_table: Name of the Delta table to update. Interpolated into
            the SQL text, so it must be a trusted identifier.
        data_to_update_df: DataFrame containing at least the key columns.
        ids: Names of the business-key columns used to match rows. Also
            interpolated into the SQL text.

    Returns:
        The DataFrame returned by ``spark.sql`` for the MERGE statement.

    Raises:
        ValueError: If ``ids`` is empty (the ON clause would be invalid SQL).
    """
    if not ids:
        raise ValueError("ids must contain at least one key column name")

    # Only the keys are needed to locate rows to expire. De-duplicating them
    # keeps the MERGE from failing when several source rows share a key and
    # would therefore match the same target row.
    data_to_update_df.select(*ids).distinct().createOrReplaceTempView(
        "data_to_expire"
    )
    join_conditions = " AND ".join(
        f"target.{key_column} = source.{key_column}" for key_column in ids
    )

    return spark.sql(
        f"""
        MERGE INTO {target_table} AS target
        USING data_to_expire AS source
        ON {join_conditions} AND target.is_active = 1
        WHEN MATCHED THEN UPDATE SET
            target.is_active = 0,
            target.end_date = CURRENT_DATE()
    """
    )
    


In [9]:
# animals data csv
logger.info("Reading animals data from CSV file: pyspark_cr7/scd2/data/animals.csv")

# The header row supplies the column names; no schema is given or inferred,
# so the columns are read as strings.
current_data_df = spark.read.csv("data/animals.csv", header=True)

logger.info(f"dataframe loaded with {current_data_df.count()} rows")


2025-10-12 16:14:46 - __main__ - INFO - Reading animals data from CSV file: pyspark_cr7/scd2/data/animals.csv
2025-10-12 16:14:53 - __main__ - INFO - dataframe loaded with 10 rows           


In [10]:
# add metadata
# DataFrame.transform(f) returns f(df), so this is the same call in pipeline form.
current_data_df = current_data_df.transform(add_scd2_metadata)
logger.info("SCD2 metadata added successfully")

2025-10-12 16:14:57 - __main__ - INFO - Adding SCD2 metadata columns: is_active, start_date, end_date, version
2025-10-12 16:14:57 - __main__ - INFO - SCD2 metadata added successfully


In [11]:
# write animals table
# Overwrite mode: any existing contents of the table are replaced.
current_data_df.write.saveAsTable(
    "scd_type2_animals", format="delta", mode="overwrite"
)
logger.info("Animals table successfully written to Delta format")

25/10/12 16:15:02 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
2025-10-12 16:15:14 - __main__ - INFO - Animals table successfully written to Delta format


In [12]:
# read animals table
logger.info("Reading animals data from Delta table: scd_type2_animals")

# Re-bind current_data_df to the persisted table rather than the in-memory CSV frame.
current_data_df = spark.table("scd_type2_animals")

# Show the rows first, then the schema, for a quick visual check.
current_data_df.show()
current_data_df.printSchema()

2025-10-12 16:15:14 - __main__ - INFO - Reading animals data from Delta table: scd_type2_animals


+---+-------+------------+---------+----------+--------+-------+
| id| animal|created_date|is_active|start_date|end_date|version|
+---+-------+------------+---------+----------+--------+-------+
|  1|    Cat|  2023-01-01|        1|2023-01-01|        |      1|
|  2|    Cow|  2023-01-04|        1|2023-01-04|        |      1|
|  3|    Dog|  2023-02-19|        1|2023-02-19|        |      1|
|  4|  Horse|  2023-03-05|        1|2023-03-05|        |      1|
|  5| Rabbit|  2023-03-15|        1|2023-03-15|        |      1|
|  6|   Bird|  2023-04-10|        1|2023-04-10|        |      1|
|  7|   Fish|  2023-05-22|        1|2023-05-22|        |      1|
|  8|Hamster|  2023-06-08|        1|2023-06-08|        |      1|
|  9| Turtle|  2023-07-14|        1|2023-07-14|        |      1|
| 10| Lizard|  2023-08-30|        1|2023-08-30|        |      1|
+---+-------+------------+---------+----------+--------+-------+

root
 |-- id: string (nullable = true)
 |-- animal: string (nullable = true)
 |-- created

In [13]:
# new animals data csv
logger.info(
    "Reading updated animals data from CSV file: pyspark_cr7/scd2/data/updated_animals.csv"
)

# Same read settings as the initial load: header row for column names,
# no schema inference.
new_data_df = spark.read.csv("data/updated_animals.csv", header=True)

new_data_df.show()
new_data_df.printSchema()

2025-10-12 16:15:20 - __main__ - INFO - Reading updated animals data from CSV file: pyspark_cr7/scd2/data/updated_animals.csv


+---+-----------+------------+
| id|     animal|created_date|
+---+-----------+------------+
|  1|        Cat|  2025-01-01|
|  2|        Cow|  2025-01-04|
|  3|        Dog|  2025-02-19|
|  4|      Horse|  2023-03-05|
|  5|     Rabbit|  2023-03-15|
|  6|       Bird|  2023-04-10|
|  7|       Fish|  2023-05-22|
|  8|    Hamster|  2025-06-08|
|  9|Evil Turtle|  2023-07-14|
| 10|  Charizard|  2023-08-30|
| 11|        Mew|  2025-05-13|
+---+-----------+------------+

root
 |-- id: string (nullable = true)
 |-- animal: string (nullable = true)
 |-- created_date: string (nullable = true)



In [15]:
# Classify new data and expire old register
logger.info("Classify new data and expire old register")

# Rows of the new extract that do not already exist in the table as-is.
data_to_update_df = new_data_classifier(current_data_df, new_data_df)
data_to_update_df.show()

# Close out the active table rows whose `id` appears in the changed set.
expire_old_register(
    target_table="scd_type2_animals",
    data_to_update_df=data_to_update_df,
    ids=["id"],
)


2025-10-12 16:15:57 - __main__ - INFO - Classify new data and expire old register


+---+-----------+------------+
| id|     animal|created_date|
+---+-----------+------------+
|  1|        Cat|  2025-01-01|
| 10|  Charizard|  2023-08-30|
|  8|    Hamster|  2025-06-08|
|  3|        Dog|  2025-02-19|
| 11|        Mew|  2025-05-13|
|  9|Evil Turtle|  2023-07-14|
|  2|        Cow|  2025-01-04|
+---+-----------+------------+



25/10/12 16:16:03 WARN MapPartitionsRDD: RDD 120 was locally checkpointed, its lineage has been truncated and cannot be recomputed after unpersisting


In [16]:
# Append new records (registers) with SCD2 metadata


def append_new_scd2_data(updated_dataframe: DataFrame, table_name: str, ids: list):
    """Append new/changed records to the SCD2 table as the active version.

    Each row of ``updated_dataframe`` is written with:

    * ``version``    -- ``1`` if its key is not yet in ``table_name``,
      otherwise the highest existing version for that key plus one
    * ``start_date`` -- copy of the row's ``created_date``
    * ``end_date``   -- empty string
    * ``is_active``  -- literal ``1``

    Uses the notebook-level ``spark`` session to read ``table_name``.

    Args:
        updated_dataframe: Rows to append; must contain the key columns and
            ``created_date``.
        table_name: Name of the Delta table to read versions from and append to.
        ids: Names of the business-key columns.

    Returns:
        Whatever ``DataFrameWriter.saveAsTable`` returns.

    Raises:
        ValueError: If ``ids`` is empty.
    """
    if not ids:
        raise ValueError("ids must contain at least one key column name")
    key_columns = list(ids)

    # Compute the latest version per key from the target table itself. The
    # original read the table and then discarded it, aggregating the notebook
    # global `current_data_df` instead, so results depended on hidden state.
    latest_versions_df = (
        spark.read.table(table_name)
        .groupBy(*key_columns)
        .agg(spark_max("version").alias("latest_version"))
    )

    to_append_records_df = (
        # Left join keeps brand-new keys; they get a null latest_version.
        updated_dataframe.join(latest_versions_df, key_columns, "left")
        .withColumn(
            "version",
            when(col("latest_version").isNull(), lit(1)).otherwise(
                col("latest_version") + 1
            ),
        )
        .withColumn("start_date", col("created_date"))
        .withColumn("end_date", lit(""))
        .withColumn("is_active", lit(1))
        .drop("latest_version")
    )

    return (
        to_append_records_df.write.format("delta")
        .mode("append")
        .saveAsTable(table_name)
    )

# Write the changed/new rows as the next active versions.
append_new_scd2_data(data_to_update_df, "scd_type2_animals", ["id"])

# Display the final table state, then shut the session down.
spark.table("scd_type2_animals").show()
spark.stop()



+---+-----------+------------+---------+----------+----------+-------+
| id|     animal|created_date|is_active|start_date|  end_date|version|
+---+-----------+------------+---------+----------+----------+-------+
|  1|        Cat|  2023-01-01|        0|2023-01-01|2025-10-12|      1|
|  2|        Cow|  2023-01-04|        0|2023-01-04|2025-10-12|      1|
|  3|        Dog|  2023-02-19|        0|2023-02-19|2025-10-12|      1|
|  4|      Horse|  2023-03-05|        1|2023-03-05|          |      1|
|  5|     Rabbit|  2023-03-15|        1|2023-03-15|          |      1|
|  6|       Bird|  2023-04-10|        1|2023-04-10|          |      1|
|  7|       Fish|  2023-05-22|        1|2023-05-22|          |      1|
|  8|    Hamster|  2023-06-08|        0|2023-06-08|2025-10-12|      1|
|  9|     Turtle|  2023-07-14|        0|2023-07-14|2025-10-12|      1|
| 10|     Lizard|  2023-08-30|        0|2023-08-30|2025-10-12|      1|
|  1|        Cat|  2025-01-01|        1|2025-01-01|          |      2|
| 10| 