In [142]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql.types import TimestampType
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
import os

In [143]:
# Build the Spark session used by every cell below.
# The two config entries register Delta's SQL extension and catalog classes.
builder = (
    SparkSession.builder
    .appName("SCD2_Delta")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Pass the builder through delta's pip helper, then create (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [144]:
# Bare expression: as the last line of the cell, the notebook renders the session object.
spark

In [146]:
# Path configurations
# Defaults reproduce the original developer-machine locations exactly, but each
# one can be overridden through an environment variable so the notebook can run
# on another machine without editing code:
#   SCD_DATA_ROOT - base "datasets" directory shared by both paths
#   SCD_DIM_PATH  - Delta table holding the SCD2 dimension (read and rewritten)
#   SCD_RAW_PATH  - Parquet directory holding the incoming snapshot (read only)
DATA_ROOT = os.environ.get(
    "SCD_DATA_ROOT",
    "/home/riddhi/Documents/Riddhi-Tech/Projects/Python-Projects/CRUD-Flask/datasets",
)
dim_path = os.environ.get(
    "SCD_DIM_PATH",
    f"{DATA_ROOT}/delta/dim_registration_data/scd_output_data",
)
raw_path = os.environ.get(
    "SCD_RAW_PATH",
    f"{DATA_ROOT}/parquet/registration_data/output_data",
)

In [None]:
# Load the latest incoming snapshot (Parquet) and preview a single row
incoming_df = spark.read.format("parquet").load(raw_path)
incoming_df.show(n=1, truncate=False)

In [None]:
# Stamp every incoming row as an open version:
#   start_date = timestamp at evaluation time
#   end_date   = far-future sentinel that marks a row as still active
DEFAULT_END_DATE = "2999-12-31 23:59:59"

incoming_df = (
    incoming_df
    .withColumn("start_date", current_timestamp())
    .withColumn("end_date", lit(DEFAULT_END_DATE).cast(TimestampType()))
)

incoming_df.show(n=1, truncate=False)

In [None]:
# SCD type 2 refresh of the Delta table at dim_path.
#
# First run (no Delta table at dim_path yet): the incoming snapshot is written as-is.
# Later runs: the active rows (end_date == DEFAULT_END_DATE) are compared with the
# incoming snapshot by `id`, and the table is rewritten as the union of
#   - closed history rows, untouched
#   - active rows that are unchanged
#   - previous versions of updated and deleted ids, closed with end_date = now
#   - brand-new ids and the new versions of updated ids (open end_date)
#
# isDeltaTable is used instead of a bare directory-existence check so that an
# existing but empty/non-Delta directory is treated as "no table yet" rather
# than failing on read.
if not DeltaTable.isDeltaTable(spark, dim_path):
    # First-time load: no existing SCD-delta data
    print("SCD-delta table does not exist — writing initial snapshot.")
    incoming_df.write.format("delta").mode("overwrite").save(dim_path)
else:
    # Typed sentinel so comparisons are timestamp-to-timestamp
    default_end_ts = lit(DEFAULT_END_DATE).cast(TimestampType())

    # Load existing SCD-delta data and make sure end_date is a timestamp
    dim_df = spark.read.format("delta").load(dim_path)
    dim_df = dim_df.withColumn("end_date", col("end_date").cast(TimestampType()))

    # Split the table into active rows and closed history.
    # Null-safe equality keeps rows with a NULL end_date in the history side
    # instead of dropping them from both filters (and therefore from the rewrite).
    is_active = col("end_date").eqNullSafe(default_end_ts)
    active_df = dim_df.filter(is_active)
    closed_history_df = dim_df.filter(~is_active)

    # New records: incoming ids with no active row
    new_records_df = incoming_df.join(
        active_df.select("id"),
        on="id",
        how="left_anti"
    )

    print(f"New records found: {new_records_df.count()}")

    # Updated records: same id, but at least one tracked attribute differs.
    # eqNullSafe is required here: a plain != evaluates to NULL when either side
    # is NULL, so a NULL -> value (or value -> NULL) change would be filtered out.
    updated_records_df = incoming_df.alias("incoming").join(
        active_df.alias("active"),
        on="id",
        how="inner"
    ).filter(
        (~col("incoming.name").eqNullSafe(col("active.name"))) |
        (~col("incoming.email").eqNullSafe(col("active.email"))) |
        (~col("incoming.password").eqNullSafe(col("active.password")))
    ).select("incoming.*")

    # Previous versions of the updated ids, closed as of now.
    # left_semi keeps only active_df's columns and cannot multiply rows.
    expired_updated_df = active_df.join(
        updated_records_df.select("id"),
        on="id",
        how="left_semi"
    ).withColumn("end_date", current_timestamp())

    # Deleted records: active ids missing from the incoming snapshot, closed as of now
    deleted_records_df = active_df.join(
        incoming_df.select("id"),
        on="id",
        how="left_anti"
    ).withColumn("end_date", current_timestamp())

    print(f"Deleted records found: {deleted_records_df.count()}")

    # Every row that stops being active in this run (deleted + superseded versions)
    inactive_records_df = deleted_records_df.unionByName(expired_updated_df)

    # Active rows that are neither deleted nor superseded stay as they are
    remaining_active_df = active_df.join(
        inactive_records_df.select("id"),
        on="id",
        how="left_anti"
    )

    # Assemble the new table state. inactive_records_df (not just the deleted
    # rows) must be included: it also carries the closed previous versions of
    # updated ids, which would otherwise be lost from history.
    final_df = closed_history_df \
                .unionByName(new_records_df) \
                .unionByName(inactive_records_df) \
                .unionByName(remaining_active_df) \
                .unionByName(updated_records_df.withColumn("end_date", default_end_ts))

    final_df.show(10, truncate=False)
    final_df.write.format("delta").mode("overwrite").save(dim_path)
    print("Updated SCD-delta table written successfully.")

In [None]:
# Inspect the commit history of the Delta table at dim_path.
# history() is called without a limit; the 20-row cap on what gets printed is
# show()'s own default, spelled out explicitly below.
# NOTE(review): per the Delta docs, history() without a limit returns all
# available commits — confirm against the installed delta version.
delta_table = DeltaTable.forPath(spark, dim_path)
history_df = delta_table.history()
history_df.show(n=20, truncate=False)

+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|1      |2025-07-14 17:28:11.45 |NULL

In [None]:
# Time travel: load the table as it was at its first commit (version 0)
df_v0 = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load(dim_path)
)
df_v0.show(n=50, truncate=False)