## **Project Title: Implement SCD Type 2 in Lakehouse for Employee History**

## 🎯 **Objective**
#### Use PySpark in a Microsoft Fabric Lakehouse to implement Slowly Changing Dimension Type 2 (SCD Type 2) to track changes in employee records over time.

#### **USE CASE**: Human Resources wants to track the history of changes to employee records. This is difficult with the current employee_master table because it accepts duplicates and keeps no history (what changed, when it changed, and which record is currently active for an employee).

### Solution Architecture:
[Architecture](https://drive.google.com/file/d/1wSsc9ilQzCwaSv6xjwfjCDzcRyMeRmwP/view?usp=drive_link)

### Implementation capabilities
**1. Read source data from the bronze_layer schema**

**2. Eliminate duplicate records**

**3. Backfill a start date for records with a NULL LoadDate**

**4. Use lead() to assign the StartDate of the next version as the EndDate of the previous one**

**5. Compare source data with the SCD Type 2 historical data to identify changes**

**6. Extract records to insert and records to retire**

**7. Apply changes to the SCD Type 2 historical data in the silver_layer schema**

**8. Identify active and inactive records by ranking their versions**

**9. Keep a seamless record of historical changes to employee data**

**10. Follow the medallion architecture (bronze to silver)**



## Limitations
**1. This implementation is not suitable for employees with multiple active roles**

**2. The scope is limited to manually imported data.**

### Improvement Recommendation
1. Support employees with multiple active roles or departments
2. For production, ingest from a live data source
3. Configure a data pipeline that executes after each successful data refresh
4. Exit early when no new records are found

### Tech Stack
**1. Microsoft Fabric**

**2. PySpark & SparkSQL**

**3. PowerPoint (Architecture Design)**

In [1]:
# Imports and Spark session (wildcard imports removed so Python built-ins such as max are not shadowed)
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id, to_date
from pyspark.sql.window import Window

# getOrCreate() returns the existing session if one is already running
spark = SparkSession.builder.getOrCreate()

##### Step 1: Assign table paths to variables for ease of use

In [2]:
source_path = "Tables/bronze_layer/employee_master"
dim_path = "Tables/silver_layer/employee_history"

#### Step 2: Loading Source data

In [3]:
master_df = spark.read.format("delta").load(source_path)

#### Step 3: Remove duplicates and format LoadDate

In [4]:
master_df = master_df.dropDuplicates(["EmpID","Name","Gender", "JobTitle", "Department", "LoadDate"])
master_df = master_df.withColumn("LoadDate", to_date("LoadDate"))
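
The effect of `dropDuplicates` on a list of key columns can be pictured with plain Python dictionaries. This is only an illustrative sketch: `drop_duplicates` and the sample rows are hypothetical, and it keeps the first row of each duplicate group, whereas Spark does not guarantee which row of a group survives.

```python
KEYS = ["EmpID", "Name", "Gender", "JobTitle", "Department", "LoadDate"]

def drop_duplicates(rows, keys):
    """Keep one row per distinct combination of the key columns."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(row[k] for k in keys)
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique

# Hypothetical sample data: the first two rows are exact duplicates.
sample = [
    {"EmpID": 1, "Name": "A", "Gender": "F", "JobTitle": "Analyst", "Department": "IT", "LoadDate": "2024-01-01"},
    {"EmpID": 1, "Name": "A", "Gender": "F", "JobTitle": "Analyst", "Department": "IT", "LoadDate": "2024-01-01"},
    {"EmpID": 1, "Name": "A", "Gender": "F", "JobTitle": "Engineer", "Department": "IT", "LoadDate": "2025-01-01"},
]
deduped = drop_duplicates(sample, KEYS)
print(len(deduped))
```

Because the key list here covers every column of the sample rows, the rows in a duplicate group are identical, so it does not matter which one is kept. A change in any key column (here JobTitle and LoadDate) produces a distinct row that is preserved as a separate version.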

##### Insert a new record to observe history tracking. This is needed because the script had already been executed and the employee_history SCD Type 2 table already existed at the time of this documentation.

In [5]:
#Adding a record to track history
spark.sql("""
    INSERT INTO scd_type2_lakehouse.bronze_layer.employee_master (EmpID, Name, Gender, JobTitle, Department, LoadDate)
    VALUES (21, 'Muftahu Abdulrahman', 'Male', 'ERP Systems  & Data Engineer', 'IT', DATE('2025-07-01'))
""")

#### Step 4: Date backfill - a tricky approach to fill records that have a NULL LoadDate.
##### Approach: a window partitioned by EmpID orders LoadDate in descending order with NULLs at the bottom of each partition. A temporary row number (temp_num) then ranks the rows so that the newest date gets the smallest number and the oldest the largest. For records with a NULL LoadDate, StartDate is obtained by subtracting temp_num years from the maximum LoadDate.

In [6]:
# Partitioning from Latest to Oldest Date with null at the bottom 
temporary_order = Window.partitionBy("EmpID").orderBy(F.col("LoadDate").desc_nulls_last())

# Partitioning from Oldest to Latest for each partition
versioning = Window.partitionBy("EmpID").orderBy(F.col("LoadDate").asc())

# Add temp_num and version columns. temp_num (descending order) is used to compute StartDate for records without a LoadDate.
# version ranks records by LoadDate in ascending order, so the record with the latest LoadDate gets the highest version number.
master_df = master_df.withColumn("temp_num", F.row_number().over(temporary_order)) \
            .withColumn("version", F.row_number().over(versioning))

# MaxLoadDate is the max LoadDate minus temp_num years (if temp_num = 3, 3 * 12 months are subtracted from max(LoadDate))
master_df = master_df.withColumn("MaxLoadDate",F.add_months(F.max("LoadDate").over(temporary_order),-1 * F.col("temp_num") * 12))

# StartDate is LoadDate when LoadDate is not NULL, otherwise MaxLoadDate.
# Records whose MaxLoadDate is also NULL are assigned the current date.
master_df = master_df.withColumn(
    "StartDate",
    F.when(F.col("LoadDate").isNotNull(), F.col("LoadDate"))
     .otherwise(F.coalesce(F.col("MaxLoadDate"), F.current_date()))
)

# Keep only the columns needed downstream (this also drops temp_num, MaxLoadDate and LoadDate)
master_df = master_df.select(
    "EmpID", "Name", "Gender", "JobTitle", "Department", "version", "StartDate")

display(master_df)
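
The backfill rule can be traced by hand for a single employee. The sketch below re-implements it in plain Python for illustration only: `subtract_years`, `backfill_start_dates` and the sample dates are hypothetical, and the day clamping is an approximation of how a month shift handles days that do not exist in the target month.

```python
import calendar
from datetime import date

def subtract_years(d, years):
    """Shift a date back by whole years, clamping the day if it does not exist (e.g. Feb 29)."""
    year = d.year - years
    last_day = calendar.monthrange(year, d.month)[1]
    return date(year, d.month, min(d.day, last_day))

def backfill_start_dates(load_dates, today=None):
    """Return (LoadDate, StartDate) pairs for one EmpID, newest first and NULLs last."""
    today = today or date.today()
    known = sorted((d for d in load_dates if d is not None), reverse=True)
    missing = [d for d in load_dates if d is None]
    max_date = known[0] if known else None
    rows = []
    # temp_num plays the role of row_number() over the descending, nulls-last window
    for temp_num, load_date in enumerate(known + missing, start=1):
        if load_date is not None:
            start = load_date                            # keep the real LoadDate
        elif max_date is not None:
            start = subtract_years(max_date, temp_num)   # max LoadDate minus temp_num years
        else:
            start = today                                # no LoadDate at all for this employee
        rows.append((load_date, start))
    return rows

# Hypothetical employee with two known and two missing load dates
history = backfill_start_dates([date(2024, 3, 1), None, date(2022, 1, 15), None])
for load_date, start_date in history:
    print(load_date, "->", start_date)
```

Here the two NULL rows receive temp_num 3 and 4, so their synthetic start dates are 2021-03-01 and 2020-03-01. Note that a synthetic date is not guaranteed to fall before the oldest known LoadDate; that only holds when the known dates are spaced closely enough.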

#### Step 5: Retire older versions and assign IsActive = True to the active version of each record
##### Another tricky approach: order each partition by version in ascending order so that lead() assigns each row's EndDate from the StartDate of the next version. The latest version has no successor, so its EndDate stays NULL.

In [7]:
# 1. Define ascending version order window (older to newer)
version_window = Window.partitionBy("EmpID").orderBy(F.col("version").asc())

# 2. Add EndDate to retire older version history
scdtype2_prepared = master_df.withColumn(
    "EndDate",
    F.lead("StartDate").over(version_window)
)

# 3. Set IsActive = True if EndDate is null (latest version)
scdtype2_prepared = scdtype2_prepared.withColumn(
    "IsActive",
    F.when(F.col("EndDate").isNull(), F.lit(True)).otherwise(F.lit(False))
)

display(scdtype2_prepared)
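
The lead() step can be pictured without Spark. The following is a minimal plain-Python sketch for one employee; `assign_end_dates` and the sample versions are hypothetical names and data used only for illustration.

```python
from datetime import date

def assign_end_dates(versions):
    """versions: list of dicts with 'version' and 'StartDate' for a single EmpID."""
    ordered = sorted(versions, key=lambda row: row["version"])
    result = []
    for i, row in enumerate(ordered):
        # lead(): look one row ahead; the last row has no successor, so it gets None
        next_start = ordered[i + 1]["StartDate"] if i + 1 < len(ordered) else None
        result.append({**row, "EndDate": next_start, "IsActive": next_start is None})
    return result

# Hypothetical input, deliberately out of order
rows = assign_end_dates([
    {"version": 2, "StartDate": date(2024, 3, 1)},
    {"version": 1, "StartDate": date(2022, 1, 15)},
])
for row in rows:
    print(row["version"], row["StartDate"], row["EndDate"], row["IsActive"])
```

Version 1 is closed on the day version 2 starts, and only the highest version remains active.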

##### Step 6: Write the processed data to the SCD Type 2 employee_history table if it does not exist
##### If the table exists, read it and proceed to detect changes.
##### Changes are detected by joining employee_history with the processed source data and comparing the tracked columns.
##### Every changed record produces a new entry that becomes the active one while the previous entry is retired, so the processed data is filtered by IsActive = True.
##### Note that the detected change is the record inserted above.


In [8]:
# Check if employee_history exists
if not DeltaTable.isDeltaTable(spark, dim_path):

    # Add employee_sk column
    df_with_sk = scdtype2_prepared.withColumn("employee_sk", monotonically_increasing_id())

    # Reorder columns so that employee_sk is first
    reordered_cols = ["employee_sk"] + [c for c in df_with_sk.columns if c != "employee_sk"]

    #Write to scd type2
    df_with_sk.select(reordered_cols).write.format("delta").save(dim_path)

    print("Initial employee_history table created.")
else:
    # Load existing employee_history table
    dim_employee_df = DeltaTable.forPath(spark, dim_path)

    # Keep only the currently active rows of employee_history for the join
    df_existing = dim_employee_df.toDF().filter("IsActive = True")
    
    # Join to detect changes
    join_cond = [scdtype2_prepared["EmpID"] == df_existing["EmpID"]]
    df_changes = scdtype2_prepared.join(df_existing, join_cond, "left_outer") \
    .where(
        (
            (scdtype2_prepared["Name"] != df_existing["Name"]) |
            (scdtype2_prepared["JobTitle"] != df_existing["JobTitle"]) |
            (scdtype2_prepared["Department"] != df_existing["Department"]) |
            (scdtype2_prepared["version"] != df_existing["version"]) |
            df_existing["EmpID"].isNull()
        ) & 
        scdtype2_prepared["IsActive"]  # boolean column; no comparison with the string "True" needed
    )
    display(df_changes)
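
The comparison logic of the join can be summarised in plain Python. This is an illustrative sketch, not the notebook's implementation: `detect_changes`, `TRACKED` and the sample rows are hypothetical, and Python's `!=` treats `None` as an ordinary value, whereas a Spark `!=` comparison involving NULL evaluates to NULL and the row is filtered out.

```python
TRACKED = ("Name", "JobTitle", "Department", "version")

def detect_changes(incoming, existing_active):
    """Return incoming active rows that are new or differ from the active history row."""
    current = {row["EmpID"]: row for row in existing_active}
    changes = []
    for row in incoming:
        if not row["IsActive"]:
            continue  # only the latest version of each employee is compared
        old = current.get(row["EmpID"])
        # No match in history (new employee) or any tracked column differs
        if old is None or any(row[c] != old[c] for c in TRACKED):
            changes.append(row)
    return changes

# Hypothetical data: employee 1 is unchanged, 2 changed job title, 3 is new
existing = [
    {"EmpID": 1, "Name": "A", "JobTitle": "Analyst", "Department": "IT", "version": 1},
    {"EmpID": 2, "Name": "B", "JobTitle": "Clerk", "Department": "HR", "version": 1},
]
incoming = [
    {"EmpID": 1, "Name": "A", "JobTitle": "Analyst", "Department": "IT", "version": 1, "IsActive": True},
    {"EmpID": 2, "Name": "B", "JobTitle": "Clerk", "Department": "HR", "version": 1, "IsActive": False},
    {"EmpID": 2, "Name": "B", "JobTitle": "Manager", "Department": "HR", "version": 2, "IsActive": True},
    {"EmpID": 3, "Name": "C", "JobTitle": "Engineer", "Department": "IT", "version": 1, "IsActive": True},
]
changed = detect_changes(incoming, existing)
print([row["EmpID"] for row in changed])
```

If NULL values can occur in the tracked columns, a null-safe comparison such as PySpark's `Column.eqNullSafe` may be worth considering in the Spark version.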


##### Step 7: Extract the records to retire: the active rows of employee_history (i.e. df_existing) whose EmpID matches a detected change

In [9]:
# Rows to retire in existing history data
df_retire = df_existing.join(df_changes, "EmpID", "inner") \
    .select(df_existing["*"])
display(df_retire)

##### Step 8: Extract the rows to insert: the processed source rows with IsActive = True whose EmpID matches a detected change

In [10]:
#Rows to insert
df_new = scdtype2_prepared.filter(F.col("IsActive") == True) \
    .join(df_changes, "EmpID", "inner") \
    .select(scdtype2_prepared["*"])
display(df_new)

##### Step 9: Update or retire old records from existing scd type2 employee_history and insert new records

In [11]:
# Retire the currently active rows of changed employees (df_retire comes from Step 7)
if not df_retire.isEmpty():
    dim_employee_df.alias("target").merge(
        df_retire.alias("source"),
        "target.EmpID = source.EmpID AND target.IsActive = True"
    ).whenMatchedUpdate(set={
        "IsActive": "False",
        "EndDate": "current_date()"
    }).execute()

# Append new records
if not df_new.isEmpty():
    df_new.withColumn("employee_sk", monotonically_increasing_id()) \
        .write.format("delta").mode("append").save(dim_path)

print("SCD Type 2 logic applied.")
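
The retire-then-append pattern of this step can be sketched on plain Python lists. Everything here is hypothetical and for illustration only (`apply_scd2`, the sample rows). One deliberate difference from the cell above is labelled as an assumption: new surrogate keys continue from the current maximum, because `monotonically_increasing_id()` only guarantees uniqueness within a single DataFrame, so keys generated for an append could repeat keys that already exist in the table.

```python
from datetime import date

def apply_scd2(history, changes, today):
    """Retire active rows of changed employees, then append the new versions."""
    changed_ids = {row["EmpID"] for row in changes}
    if not changed_ids:
        return history  # nothing changed, so exit early
    updated = []
    for row in history:
        if row["EmpID"] in changed_ids and row["IsActive"]:
            # Equivalent of the whenMatchedUpdate clause
            row = {**row, "IsActive": False, "EndDate": today}
        updated.append(row)
    # Assumption: continue surrogate keys from the current maximum to avoid collisions
    next_sk = max((row["employee_sk"] for row in history), default=-1) + 1
    for offset, row in enumerate(changes):
        updated.append({**row, "employee_sk": next_sk + offset})
    return updated

# Hypothetical history and one detected change
history = [
    {"employee_sk": 0, "EmpID": 21, "JobTitle": "Analyst", "IsActive": True, "EndDate": None},
    {"employee_sk": 1, "EmpID": 22, "JobTitle": "Clerk", "IsActive": True, "EndDate": None},
]
changes = [{"EmpID": 21, "JobTitle": "Engineer", "IsActive": True, "EndDate": None}]
result = apply_scd2(history, changes, today=date(2025, 7, 1))
for row in result:
    print(row)
```

The early return also illustrates the recommendation to stop when no new records are found.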

SCD Type 2 logic applied.


##### Step 10: Verify that the update took effect. In the result below, the old record is retired and the new record was inserted successfully.

In [12]:
# Display the employee_history table after the changes have been applied
df = spark.sql("SELECT * FROM scd_type2_lakehouse.silver_layer.employee_history LIMIT 1000")
display(df)
