<a href="https://colab.research.google.com/github/codeandclouddev/AzureSynapse/blob/main/SlowlyChangingDimension_Type2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# --- SCD2 (from files) — PySpark, no Delta ---

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_date, coalesce, input_file_name, lower, trim
from pyspark.sql.types import StringType, BooleanType, StructField, StructType

# One session for the whole notebook; getOrCreate() reuses an already-running one.
spark = (
    SparkSession.builder
    .appName("SCD2_from_files")
    .getOrCreate()
)

# Running in Colab? Uncomment to upload the CSVs through the browser:
# from google.colab import files
# uploaded = files.upload()

# === 1) FILE PATHS ===
# Both CSVs are expected at these locations (change them to point at your Drive).
EXISTING_CSV = "/content/existing_customers.csv"  # dimension rows incl. StartDate/EndDate/IsActive
INCOMING_CSV = "/content/incoming_customers.csv"  # incoming customer attributes only

# === 2) SCHEMAS ===
# Every column is read as a nullable string, so each schema is generated from
# its ordered list of column names.
def _string_schema(column_names):
    """Return a StructType of nullable StringType fields, one per name, in order."""
    return StructType([StructField(name, StringType(), True) for name in column_names])


incoming_schema = _string_schema(["CustomerId", "Name", "City", "Tier"])

existing_schema = _string_schema([
    "CustomerId",
    "Name",
    "City",
    "Tier",
    "StartDate",  # kept as a string for simplicity
    "EndDate",    # may be empty
    "IsActive",   # "true"/"false" held as strings
])

# === 3) READ FILES ===
# Both files carry a header row; the explicit schemas keep every column a string.
df_existing = (
    spark.read
    .option("header", True)
    .schema(existing_schema)
    .csv(EXISTING_CSV)
)

# Normalise the flag to trimmed, lowercase text. The active-row filter below
# compares against the exact string "true", so a file that spells the flag
# "True", "TRUE" or " true" would otherwise lose its active rows and every such
# customer would be re-inserted as if brand new. NULL flags stay NULL.
df_existing = df_existing.withColumn("IsActive", lower(trim(col("IsActive"))))

df_incoming = (
    spark.read
    .option("header", True)
    .schema(incoming_schema)
    .csv(INCOMING_CSV)
)

# === 4) ACTIVE SNAPSHOT ===
# Only rows whose IsActive flag equals the string "true" count as current versions.
df_active = df_existing.where(col("IsActive") == lit("true"))

# === 5) FULL OUTER JOIN ON BUSINESS KEY ===
# "old" is the current active row, "new" is the incoming row. With a full join,
# the side that has no row for a key comes back as NULLs.
same_customer = col("old.CustomerId") == col("new.CustomerId")
df_joined = df_active.alias("old").join(df_incoming.alias("new"), same_customer, "full")

# === 6) CHANGED ROWS (same key exists, but City or Tier changed) ===
# NULL is coalesced to "" so a NULL and an empty string compare as equal rather
# than yielding a NULL predicate. Name is not compared, so a Name-only
# difference does not produce a new version.
key_on_both_sides = col("old.CustomerId").isNotNull() & col("new.CustomerId").isNotNull()
city_differs = coalesce(col("old.City"), lit("")) != coalesce(col("new.City"), lit(""))
tier_differs = coalesce(col("old.Tier"), lit("")) != coalesce(col("new.Tier"), lit(""))
changed = df_joined.filter(key_on_both_sides & (city_differs | tier_differs))

def _open_new_version(joined_rows):
    """Project the "new" side of joined rows into active rows that start today.

    The result has the columns CustomerId, Name, City, Tier, StartDate
    (today's date as a string), EndDate (NULL string) and IsActive ("true").
    """
    return joined_rows.select(
        col("new.CustomerId").alias("CustomerId"),
        col("new.Name").alias("Name"),
        col("new.City").alias("City"),
        col("new.Tier").alias("Tier"),
        current_date().cast("string").alias("StartDate"),
        lit(None).cast("string").alias("EndDate"),
        lit("true").alias("IsActive"),
    )


# a) Deactivate old version
# Keep the old attribute values and StartDate; close the row as of today.
df_update_deactivate = changed.select(
    col("old.CustomerId").alias("CustomerId"),
    col("old.Name").alias("Name"),
    col("old.City").alias("City"),
    col("old.Tier").alias("Tier"),
    col("old.StartDate").alias("StartDate"),
    current_date().cast("string").alias("EndDate"),
    lit("false").alias("IsActive"),
)

# b) Insert new active version
df_insert_newversion = _open_new_version(changed)

# === 7) NEW KEYS (present only in incoming) ===
# No active "old" row matched, so the key is new to the dimension.
new_only = df_joined.filter(col("old.CustomerId").isNull() & col("new.CustomerId").isNotNull())

df_inserts_new = _open_new_version(new_only)

# === 8) RESULTS ===
print("\n-- Deactivate (old versions) --")
df_update_deactivate.show(truncate=False)

print("\n-- Inserts (new versions for changed + totally new records) --")
df_combined_inserts = df_insert_newversion.unionByName(df_inserts_new)
df_combined_inserts.show(truncate=False)

# If you want the NEXT ACTIVE SNAPSHOT (SCD2 after applying changes), union unchanged actives too.
# The anti-join drops only the keys that changed; active rows whose key is
# absent from the incoming file therefore remain in the snapshot.
changed_ids = changed.select(col("old.CustomerId").alias("cid")).distinct()
unchanged_actives = df_active.alias("a").join(
    changed_ids,
    col("a.CustomerId") == col("cid"),
    "left_anti",
)

# Project every piece onto the existing table's column list before stacking.
snapshot_columns = df_existing.columns
next_active_snapshot = unchanged_actives.select(snapshot_columns)
for addition in (df_insert_newversion, df_inserts_new):
    next_active_snapshot = next_active_snapshot.unionByName(addition.select(snapshot_columns))

print("\n-- Next active snapshot (post-change) --")
next_active_snapshot.orderBy("CustomerId").show(truncate=False)


-- Deactivate (old versions) --
+----------+-----+---------+------+----------+----------+--------+
|CustomerId|Name |City     |Tier  |StartDate |EndDate   |IsActive|
+----------+-----+---------+------+----------+----------+--------+
|C001      |Alice|Toronto  |Gold  |2025-01-01|2025-11-09|false   |
|C002      |Bob  |Vancouver|Silver|2025-01-01|2025-11-09|false   |
+----------+-----+---------+------+----------+----------+--------+


-- Inserts (new versions for changed + totally new records) --
+----------+-----+---------+------+----------+-------+--------+
|CustomerId|Name |City     |Tier  |StartDate |EndDate|IsActive|
+----------+-----+---------+------+----------+-------+--------+
|C001      |Alice|Montreal |Gold  |2025-11-09|NULL   |true    |
|C002      |Bob  |Vancouver|Gold  |2025-11-09|NULL   |true    |
|C004      |David|Toronto  |Silver|2025-11-09|NULL   |true    |
+----------+-----+---------+------+----------+-------+--------+


-- Next active snapshot (post-change) --
+--------