In [147]:
import urllib
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime

server='DESKTOP-HJVSCEN\MSSQLSERVER1'
database='Python ETL'
username='sa'
password='Ka@12345678'


ConnectionString = f"""
    DRIVER={{ODBC Driver 18 for SQL Server}};
    SERVER={server};
    DATABASE={database};
    UID={username};
    PWD={password};
    TrustServerCertificate=yes;
"""
# URL-encode the connection string for SQLAlchemy
params=urllib.parse.quote_plus(ConnectionString)

engine=create_engine(f"mssql+pyodbc:///?odbc_connect={params}")

### Step 1 – Creating Initial Employee Dataset for Full SCD Type 6 Implementation

This dataset contains employee records with attributes handled as different SCD types:

- **Type 1: `EmailAddress`** → Minor changes like typo corrections, updated in-place.
- **Type 2: `Designation`, `Department`** → Track full historical versioning with new row insertions.
- **Type 3: `CurrentCity`, `CurrentProject`** → Track last value in separate column (1-level history only).

### Table Structure – `employee_dimension_scd6`

| Column Name         | Description                                           |
|---------------------|-------------------------------------------------------|
| `EmployeeID`        | Unique identifier                                     |
| `EmployeeName`      | Full name                                             |
| `EmailAddress`      | Type 1: Overwritten, no history                       |
| `Designation`       | Type 2: Tracked by inserting new row                  |
| `Department`        | Type 2: Tracked by inserting new row                  |
| `CurrentCity`       | Type 3: Current city of posting                       |
| `PreviousCity`      | Type 3: Prior city (1-level tracking)                 |
| `CurrentProject`    | Type 3: Project assigned                              |
| `PreviousProject`   | Type 3: Prior project (1-level tracking)              |
| `StartDate`         | When this version became effective                    |
| `EndDate`           | When this version ended (NULL = current row)         |
| `IsActive`          | Boolean flag indicating active version                |
| `LastUpdated`       | When this record was last updated                     |

In [148]:
data = [
    {
        "EmployeeID": 501, "EmployeeName": "Tanuj", "EmailAddress": "tanuj@corp.com",
        "Designation": "Junior Analyst", "Department": "Analytics",
        "CurrentCity": "Hyderabad", "PreviousCity": None,
        "CurrentProject": "Apollo", "PreviousProject": None,
        "StartDate": datetime(2025, 1, 1), "EndDate": pd.NaT,
        "IsActive": 1, "LastUpdated": datetime(2025, 1, 1)
    },
    {
        "EmployeeID": 502, "EmployeeName": "Meenu", "EmailAddress": "meenu@corp.com",
        "Designation": "Data Engineer", "Department": "Engineering",
        "CurrentCity": "Pune", "PreviousCity": None,
        "CurrentProject": "Hermes", "PreviousProject": None,
        "StartDate": datetime(2025, 1, 1), "EndDate": pd.NaT,
        "IsActive": 1, "LastUpdated": datetime(2025, 1, 1)
    },
    {
        "EmployeeID": 503, "EmployeeName": "Adi", "EmailAddress": "adi@corp.com",
        "Designation": "Analyst", "Department": "Finance",
        "CurrentCity": "Mumbai", "PreviousCity": None,
        "CurrentProject": "Zeus", "PreviousProject": None,
        "StartDate": datetime(2025, 1, 1), "EndDate": pd.NaT,
        "IsActive": 1, "LastUpdated": datetime(2025, 1, 1)
    },
    {
        "EmployeeID": 504, "EmployeeName": "Smruthi", "EmailAddress": "smruthi@corp.com",
        "Designation": "Consultant", "Department": "Strategy",
        "CurrentCity": "Bangalore", "PreviousCity": None,
        "CurrentProject": "Orion", "PreviousProject": None,
        "StartDate": datetime(2025, 1, 1), "EndDate": pd.NaT,
        "IsActive": 1, "LastUpdated": datetime(2025, 1, 1)
    },
    {
        "EmployeeID": 505, "EmployeeName": "Chirkut", "EmailAddress": "chirkut@corp.com",
        "Designation": "Executive", "Department": "Operations",
        "CurrentCity": "Chennai", "PreviousCity": None,
        "CurrentProject": "Poseidon", "PreviousProject": None,
        "StartDate": datetime(2025, 1, 1), "EndDate": pd.NaT,
        "IsActive": 1, "LastUpdated": datetime(2025, 1, 1)
    },
]

initial_df=pd.DataFrame(data)
initial_df

Unnamed: 0,EmployeeID,EmployeeName,EmailAddress,Designation,Department,CurrentCity,PreviousCity,CurrentProject,PreviousProject,StartDate,EndDate,IsActive,LastUpdated
0,501,Tanuj,tanuj@corp.com,Junior Analyst,Analytics,Hyderabad,,Apollo,,2025-01-01,NaT,1,2025-01-01
1,502,Meenu,meenu@corp.com,Data Engineer,Engineering,Pune,,Hermes,,2025-01-01,NaT,1,2025-01-01
2,503,Adi,adi@corp.com,Analyst,Finance,Mumbai,,Zeus,,2025-01-01,NaT,1,2025-01-01
3,504,Smruthi,smruthi@corp.com,Consultant,Strategy,Bangalore,,Orion,,2025-01-01,NaT,1,2025-01-01
4,505,Chirkut,chirkut@corp.com,Executive,Operations,Chennai,,Poseidon,,2025-01-01,NaT,1,2025-01-01


In [149]:
initial_df.to_sql('employee_dimensions_scd6',index=False,con=engine,if_exists='replace')

5

### Step 2 – Simulating New Incoming Records

We now simulate a scenario where some employee details have changed:

- **EmployeeID 501 (Tanuj):**
  - `EmailAddress` changed → Type 1
  - `Designation` changed → Type 2
  - `CurrentCity` changed → Type 3

- **EmployeeID 502 (Meenu):**
  - `Department` changed → Type 2
  - `CurrentProject` changed → Type 3

- **EmployeeID 503 (Adi):**
  - No changes → Should be ignored

These changes will be applied based on appropriate SCD logic:
- Type 1 fields are updated **in-place** on active rows.
- Type 2 fields trigger **new row insertion** with old row closed.
- Type 3 fields shift the old value to `Previous*` columns, and update `Current*`.


In [150]:
# Simulated new incoming records with some updates
incoming_updates = [
    {
        "EmployeeID": 501, "EmployeeName": "Tanuj", "EmailAddress": "tanuj_updated@corp.com",
        "Designation": "Senior Analyst",  # Type 2 change
        "Department": "Analytics",        # No change
        "CurrentCity": "Mumbai",          # Type 3 change
        "CurrentProject": "Hermes",       # Type 3 change
        "LastUpdated": datetime(2025, 6, 20)
    },
    {
        "EmployeeID": 502, "EmployeeName": "Meenu", "EmailAddress": "meenu@corp.com",
        "Designation": "Data Engineer",   # No change
        "Department": "Product",          # Type 2 change
        "CurrentCity": "Pune",            # No change
        "CurrentProject": "Zeus",         # Type 3 change
        "LastUpdated": datetime(2025, 6, 20)
    },
    {
        "EmployeeID": 503, "EmployeeName": "Adi", "EmailAddress": "adi@corp.com",
        "Designation": "Analyst",         # No change
        "Department": "Finance",          # No change
        "CurrentCity": "Mumbai",          # No change
        "CurrentProject": "Zeus",         # No change
        "LastUpdated": datetime(2025, 6, 20)
    },
]

incoming_df = pd.DataFrame(incoming_updates)


### Step 3 – Applying Full SCD Logic (Types 1, 2, and 3)

We now apply the appropriate logic for each type of slowly changing dimension:

#### ➤ Type 1 (Overwrite in place – `EmailAddress`)
- If only Type 1 fields changed, update in-place in the current active row.

#### ➤ Type 2 (Versioning – `Designation`, `Department`)
- If any Type 2 field has changed:
  - Mark the current record as inactive (`IsActive = False`, `EndDate = today`)
  - Insert a new row with the updated values, new `StartDate`, and `IsActive = True`

#### ➤ Type 3 (One-level history – `CurrentCity`, `CurrentProject`)
- If any Type 3 field has changed:
  - Set `Previous* = Current*`
  - Update `Current*` with the new value
  - Done only for the new inserted row (not in-place)

We will:
1. Read the existing active data
2. Compare it with the incoming data
3. Apply updates using Pandas row-by-row


In [151]:
existing_df=pd.read_sql(f"select * from employee_dimensions_scd6 where IsActive='{int(True)}'",con=engine)
existing_df

Unnamed: 0,EmployeeID,EmployeeName,EmailAddress,Designation,Department,CurrentCity,PreviousCity,CurrentProject,PreviousProject,StartDate,EndDate,IsActive,LastUpdated
0,501,Tanuj,tanuj@corp.com,Junior Analyst,Analytics,Hyderabad,,Apollo,,2025-01-01,,1,2025-01-01
1,502,Meenu,meenu@corp.com,Data Engineer,Engineering,Pune,,Hermes,,2025-01-01,,1,2025-01-01
2,503,Adi,adi@corp.com,Analyst,Finance,Mumbai,,Zeus,,2025-01-01,,1,2025-01-01
3,504,Smruthi,smruthi@corp.com,Consultant,Strategy,Bangalore,,Orion,,2025-01-01,,1,2025-01-01
4,505,Chirkut,chirkut@corp.com,Executive,Operations,Chennai,,Poseidon,,2025-01-01,,1,2025-01-01


In [152]:
common_df=pd.merge(existing_df,incoming_df,on='EmployeeID',how='inner',suffixes=('_old','_new'))
common_df[[
    'EmployeeID','EmailAddress_old','EmailAddress_new',
    'Designation_old','Designation_new','Department_old','Department_new',
    'CurrentCity_old','CurrentCity_new','CurrentProject_old','CurrentProject_new'
]]

Unnamed: 0,EmployeeID,EmailAddress_old,EmailAddress_new,Designation_old,Designation_new,Department_old,Department_new,CurrentCity_old,CurrentCity_new,CurrentProject_old,CurrentProject_new
0,501,tanuj@corp.com,tanuj_updated@corp.com,Junior Analyst,Senior Analyst,Analytics,Analytics,Hyderabad,Mumbai,Apollo,Hermes
1,502,meenu@corp.com,meenu@corp.com,Data Engineer,Data Engineer,Engineering,Product,Pune,Pune,Hermes,Zeus
2,503,adi@corp.com,adi@corp.com,Analyst,Analyst,Finance,Finance,Mumbai,Mumbai,Zeus,Zeus


### Step 4 – Refactored SCD Type 6 Logic (Separated by Type)

We split the logic into clearly separated flows for each SCD type:

#### ✅ Type 1: `EmailAddress`
- In-place update of the current active row (no row insert)

#### ✅ Type 2: `Designation`, `Department`
- Mark old row inactive
- Insert new row with updated values
- If any Type 3 fields also changed, include updated city/project + their previous values in the new row

#### ✅ Type 3: `CurrentCity`, `CurrentProject`
- In-place update of the current active row
- Move current value to `Previous*`
- Update `Current*` with new value
- No row insertion


In [153]:
print(existing_df)
print(incoming_df)
print(common_df)


   EmployeeID EmployeeName      EmailAddress     Designation   Department  \
0         501        Tanuj    tanuj@corp.com  Junior Analyst    Analytics   
1         502        Meenu    meenu@corp.com   Data Engineer  Engineering   
2         503          Adi      adi@corp.com         Analyst      Finance   
3         504      Smruthi  smruthi@corp.com      Consultant     Strategy   
4         505      Chirkut  chirkut@corp.com       Executive   Operations   

  CurrentCity PreviousCity CurrentProject PreviousProject  StartDate EndDate  \
0   Hyderabad         None         Apollo            None 2025-01-01    None   
1        Pune         None         Hermes            None 2025-01-01    None   
2      Mumbai         None           Zeus            None 2025-01-01    None   
3   Bangalore         None          Orion            None 2025-01-01    None   
4     Chennai         None       Poseidon            None 2025-01-01    None   

   IsActive LastUpdated  
0         1  2025-01-01  
1   

In [154]:
from sqlalchemy import text

today=pd.to_datetime('today').normalize()
rows_to_insert=[]

with engine.begin() as con:
    for _,row in common_df.iterrows():
        emp_id=row['EmployeeID']
        
        type1_change=row['EmailAddress_old']!=row['EmailAddress_new']
        type2_change=(
            row['Designation_old']!=row['Designation_new'] or
            row['Department_old']!=row['Department_new']
        )
        type3_change=(
            row['CurrentCity_old']!=row['CurrentCity_new'] or
            row['CurrentProject_old']!=row['CurrentProject_new']
        )

        if type1_change==True and type2_change==False and type3_change==False:
            con.execute(text(f"""
                update employee_dimensions_scd6
                set EmailAddress='{row["EmailAddress_new"]}',
                LastUpdated='{today}'
                where EmployeeID={emp_id} and IsActive={int(True)}
            """))

        elif type2_change:
            con.execute(text(f"""
                update employee_dimensions_scd6
                set IsActive=0,
                LastUpdated='{today}',EndDate='{today}'
                where EmployeeID={emp_id} and IsActive={int(True)}
            """))

            new_row={
                "EmployeeID":emp_id,
                "EmployeeName":row["EmployeeName_new"],
                "EmailAddress":row["EmailAddress_new"],

                "Designation":row["Designation_new"],
                "Department":row["Department_new"],

                "CurrentCity":row["CurrentCity_new"],
                "PreviousCity":row["CurrentCity_old"] if type3_change else row["PreviousCity"],
                "CurrentProject":row["CurrentProject_new"],
                "PreviousProject":row["CurrentProject_old"] if type3_change else row["PreviousProject"],

                "StartDate":today,
                "EndDate":None,
                "IsActive":1,
                "LastUpdated":today
            }
            rows_to_insert.append(new_row)

        elif (type3_change or type1_change) and not type2_change:
            set_clauses=[]
            if type1_change:
                set_clauses.append(f"EmailAddress='{row['EmailAddress_new']}'")

            if(row['CurrentCity_new']!=row['CurrentCity_old']):
                set_clauses.append(f"CurrentCity='{row['CurrentCity_new']}'")
                set_clauses.append(f"PreviousCity='{row['CurrentCity_old']}'")
            
            if(row['CurrentProject_new']!=row['CurrentProject_old']):
                set_clauses.append(f"CurrentProject='{row['CurrentProject_new']}'")
                set_clauses.append(f"PreviousProject='{row['CurrentProject_old']}'")

            con.execute(text(f"""
                update employee_dimensions_scd6
                set {','.join(set_clauses)}
                where EmployeeID={emp_id} and IsActive={int(True)}
            """))

if rows_to_insert:
    insert_df=pd.DataFrame(rows_to_insert)
    insert_df.to_sql('employee_dimensions_scd6',index=False,if_exists='append',con=engine)
        

In [156]:
df=pd.read_sql_table('employee_dimensions_scd6',con=engine)

In [157]:
df

Unnamed: 0,EmployeeID,EmployeeName,EmailAddress,Designation,Department,CurrentCity,PreviousCity,CurrentProject,PreviousProject,StartDate,EndDate,IsActive,LastUpdated
0,501,Tanuj,tanuj@corp.com,Junior Analyst,Analytics,Hyderabad,,Apollo,,2025-01-01,2025-06-25,0,2025-06-25
1,502,Meenu,meenu@corp.com,Data Engineer,Engineering,Pune,,Hermes,,2025-01-01,2025-06-25,0,2025-06-25
2,503,Adi,adi@corp.com,Analyst,Finance,Mumbai,,Zeus,,2025-01-01,NaT,1,2025-01-01
3,504,Smruthi,smruthi@corp.com,Consultant,Strategy,Bangalore,,Orion,,2025-01-01,NaT,1,2025-01-01
4,505,Chirkut,chirkut@corp.com,Executive,Operations,Chennai,,Poseidon,,2025-01-01,NaT,1,2025-01-01
5,501,Tanuj,tanuj_updated@corp.com,Senior Analyst,Analytics,Mumbai,Hyderabad,Hermes,Apollo,2025-06-25,NaT,1,2025-06-25
6,502,Meenu,meenu@corp.com,Data Engineer,Product,Pune,Pune,Zeus,Hermes,2025-06-25,NaT,1,2025-06-25
