In [1]:
# Welcome to your new notebook
# Type here in the cell editor to add code!

from pyspark.sql.functions import col, current_date, when
from pyspark.sql.types import DateType

df_raw = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/scd_data.csv")
)

df_cleaned = df_raw.withColumn(
    "LoadDate",
    when(col("LoadDate").isNull(), current_date()).otherwise(col("LoadDate").cast(DateType()))
)

display(df_cleaned)


StatementMeta(, 76123cf6-f0b8-45c6-9ebe-901ffb5811e1, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3ca985f6-7139-4f16-a48b-d0696f91f433)

In [2]:
from pyspark.sql.functions import col, current_date, trim
from pyspark.sql.types import DateType

df_casted = df_raw.withColumn(
    "LoadDate",
    trim(col("LoadDate")).cast(DateType())
)


df_deduped = df_casted.dropDuplicates()


df_final = df_deduped.withColumn(
    "LoadDate",
    when(col("LoadDate").isNull(), current_date()).otherwise(col("LoadDate"))
)

df_final.select("LoadDate").distinct().show()

StatementMeta(, 76123cf6-f0b8-45c6-9ebe-901ffb5811e1, 4, Finished, Available, Finished)

+----------+
|  LoadDate|
+----------+
|2023-06-01|
|2025-07-24|
|2024-01-01|
+----------+



In [3]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("EmpID").orderBy(F.col("LoadDate").desc())

df_silver = df_final.withColumn("row_num", row_number().over(window_spec)) \
                    .filter(F.col("row_num") == 1) \
                    .drop("row_num")

df_silver.show(truncate=False)

StatementMeta(, 76123cf6-f0b8-45c6-9ebe-901ffb5811e1, 5, Finished, Available, Finished)

+-----+----------------+------+---------------+----------+----------+
|EmpID|Name            |Gender|JobTitle       |Department|LoadDate  |
+-----+----------------+------+---------------+----------+----------+
|1    |Allison Hill    |Male  |Data Analyst   |HR        |2025-07-24|
|2    |Noah Rhodes     |Male  |Data Engineer  |Finance   |2025-07-24|
|3    |Angie Henderson |Male  |Product Manager|IT        |2025-07-24|
|4    |Daniel Wagner   |Female|Data Analyst   |IT        |2025-07-24|
|5    |Cristian Santos |Male  |Data Engineer  |Finance   |2025-07-24|
|6    |Connie Lawrence |Male  |Product Manager|Finance   |2025-07-24|
|7    |Abigail Shaffer |Female|Data Engineer  |Marketing |2025-07-24|
|8    |Gina Moore      |Female|Data Analyst   |Finance   |2025-07-24|
|9    |Gabrielle Davis |Female|BI Developer   |HR        |2025-07-24|
|10   |Ryan Munoz      |Male  |Data Engineer  |HR        |2025-07-24|
|11   |Monica Herrera  |Male  |Data Analyst   |Marketing |2025-07-24|
|12   |Jamie Arnold 

In [4]:
df_silver = df_silver.withColumn("StartDate", F.current_date()) \
                     .withColumn("EndDate", F.lit(None).cast("date")) \
                     .withColumn("IsCurrent", F.lit(True))

df_silver.write.mode("overwrite").saveAsTable("dim_employee")
print("✅ Initial load of dim_employee completed.")

StatementMeta(, 76123cf6-f0b8-45c6-9ebe-901ffb5811e1, 6, Finished, Available, Finished)

✅ Initial load of dim_employee completed.


In [5]:
from pyspark.sql.functions import col

df_dim = spark.table("dim_employee").filter("IsCurrent = True")

df_joined = df_final.alias("new").join(
    df_dim.alias("existing"),
    on="EmpID",
    how="left"
)

df_changed = df_joined.filter(
    (col("new.JobTitle") != col("existing.JobTitle")) |
    (col("new.Department") != col("existing.Department"))
).select("new.*")

df_new = df_joined.filter(col("existing.EmpID").isNull()).select("new.*")


StatementMeta(, 76123cf6-f0b8-45c6-9ebe-901ffb5811e1, 7, Finished, Available, Finished)

In [6]:
from pyspark.sql.functions import current_date, when

emp_ids_changed = [row["EmpID"] for row in df_changed.select("EmpID").distinct().collect()]

df_dim_updated = spark.table("dim_employee").withColumn(
    "IsCurrent",
    when(col("EmpID").isin(emp_ids_changed), False).otherwise(col("IsCurrent"))
).withColumn(
    "EndDate",
    when(col("EmpID").isin(emp_ids_changed), current_date()).otherwise(col("EndDate"))
)

from pyspark.sql.functions import lit

df_to_insert = df_changed.union(df_new) \
    .withColumn("IsCurrent", lit(True)) \
    .withColumn("StartDate", current_date()) \
    .withColumn("EndDate", lit(None).cast("date"))

df_final_dim = df_dim_updated.unionByName(df_to_insert)

df_final_dim.write.format("delta").mode("overwrite").saveAsTable("dim_employee")

print("✅ Incremental SCD Type 2 load completed.")

StatementMeta(, 76123cf6-f0b8-45c6-9ebe-901ffb5811e1, 8, Finished, Available, Finished)

✅ Incremental SCD Type 2 load completed.


In [7]:
spark.table("dim_employee").orderBy("EmpID").show(truncate=False)

StatementMeta(, 76123cf6-f0b8-45c6-9ebe-901ffb5811e1, 9, Finished, Available, Finished)

+-----+---------------+------+---------------+----------+----------+----------+----------+---------+
|EmpID|Name           |Gender|JobTitle       |Department|LoadDate  |StartDate |EndDate   |IsCurrent|
+-----+---------------+------+---------------+----------+----------+----------+----------+---------+
|1    |Allison Hill   |Male  |Data Analyst   |HR        |2025-07-24|2025-07-24|2025-07-24|false    |
|1    |Allison Hill   |Male  |BI Developer   |IT        |2023-06-01|2025-07-24|NULL      |true     |
|2    |Noah Rhodes    |Male  |Data Engineer  |Finance   |2025-07-24|2025-07-24|NULL      |true     |
|3    |Angie Henderson|Male  |Data Engineer  |Finance   |2023-06-01|2025-07-24|NULL      |true     |
|3    |Angie Henderson|Male  |Product Manager|IT        |2025-07-24|2025-07-24|2025-07-24|false    |
|3    |Angie Henderson|Male  |ML Engineer    |IT        |2024-01-01|2025-07-24|NULL      |true     |
|4    |Daniel Wagner  |Female|Data Analyst   |IT        |2025-07-24|2025-07-24|NULL      |t