In [0]:
from pyspark.sql.functions import * 

from pyspark.sql.types import * 

from delta.tables import * 

In [0]:
%sql

CREATE or replace TABLE employees (
    EmployeeID INT,
    FirstName STRING,
    LastName STRING,
    Department STRING,
    Title STRING,
    Salary DECIMAL(10,2),
    EffectiveDate DATE,
    EndDate DATE,
    IsCurrent BOOLEAN
)using delta 
 location '/Filestore/tables/scd_2'; 

 truncate table employees;

In [0]:
%sql
INSERT INTO employees (EmployeeID, FirstName, LastName, Department, Title, Salary, EffectiveDate, EndDate, IsCurrent)
VALUES
(1001, 'John', 'Doe', 'Engineering', 'Senior Engineer', 90000.00, '2023-01-01', NULL, True),
(1002, 'Jane', 'Smith', 'Marketing', 'Manager', 75000.00, '2023-01-01', NULL, True),
(1003, 'Emily', 'Jones', 'HR', 'Analyst', 65000.00, '2023-01-01', NULL, True),
(1004, 'Michael', 'Brown', 'IT', 'Developer', 72000.00, '2023-01-01', NULL, True),
(1005, 'Rachel', 'Green', 'Finance', 'Accountant', 58000.00, '2023-01-01', NULL, True),
(1006, 'Ross', 'Geller', 'Science', 'Researcher', 76000.00, '2023-01-01', NULL, True),
(1007, 'Monica', 'Geller', 'Culinary', 'Chef', 62000.00, '2023-01-01', NULL, True),
(1008, 'Joey', 'Tribbiani', 'Acting', 'Actor', 55000.00, '2023-01-01', NULL, True),
(1009, 'Chandler', 'Bing', 'Advertising', 'Copywriter', 64000.00, '2023-01-01', NULL, True),
(1010, 'Phoebe', 'Buffay', 'Music', 'Singer', 50000.00, '2023-01-01', NULL, True);

num_affected_rows,num_inserted_rows
10,10


In [0]:
%sql
select * from employees;

EmployeeID,FirstName,LastName,Department,Title,Salary,EffectiveDate,EndDate,IsCurrent
1001,John,Doe,Engineering,Senior Engineer,90000.0,2023-01-01,,True
1002,Jane,Smith,Marketing,Manager,75000.0,2023-01-01,,True
1003,Emily,Jones,HR,Analyst,65000.0,2023-01-01,,True
1004,Michael,Brown,IT,Developer,72000.0,2023-01-01,,True
1005,Rachel,Green,Finance,Accountant,58000.0,2023-01-01,,True
1006,Ross,Geller,Science,Researcher,76000.0,2023-01-01,,True
1007,Monica,Geller,Culinary,Chef,62000.0,2023-01-01,,True
1008,Joey,Tribbiani,Acting,Actor,55000.0,2023-01-01,,True
1009,Chandler,Bing,Advertising,Copywriter,64000.0,2023-01-01,,True
1010,Phoebe,Buffay,Music,Singer,50000.0,2023-01-01,,True


In [0]:
schema_emp = StructType([
    StructField("EmployeeID", IntegerType(), True),
    StructField("FirstName", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("Department", StringType(), True),
    StructField("Title", StringType(), True),
    StructField("Salary", DoubleType(), True)
])

# Data as a list of tuples
data = [
    (1001, 'John', 'Doe', 'Engineering', 'Lead Engineer', 95000.00),
    (1002, 'Jane', 'Smith', 'Sales', 'Sales Manager', 78000.00),
    (1011, 'Alex', 'Taylor', 'Marketing', 'Marketing Analyst', 58000.00)
]

# Create DataFrame
emp_df = spark.createDataFrame(data, schema_emp)
emp_df.show() 

emp_df.createOrReplaceTempView('src_data')

+----------+---------+--------+-----------+-----------------+-------+
|EmployeeID|FirstName|LastName| Department|            Title| Salary|
+----------+---------+--------+-----------+-----------------+-------+
|      1001|     John|     Doe|Engineering|    Lead Engineer|95000.0|
|      1002|     Jane|   Smith|      Sales|    Sales Manager|78000.0|
|      1011|     Alex|  Taylor|  Marketing|Marketing Analyst|58000.0|
+----------+---------+--------+-----------+-----------------+-------+



In [0]:
#left joining on target side 

dt_instance=DeltaTable.forName(spark,"employees") 

target_df=dt_instance.toDF()
target_df.show()

+----------+---------+---------+-----------+---------------+--------+-------------+-------+---------+
|EmployeeID|FirstName| LastName| Department|          Title|  Salary|EffectiveDate|EndDate|IsCurrent|
+----------+---------+---------+-----------+---------------+--------+-------------+-------+---------+
|      1001|     John|      Doe|Engineering|Senior Engineer|90000.00|   2023-01-01|   null|     true|
|      1002|     Jane|    Smith|  Marketing|        Manager|75000.00|   2023-01-01|   null|     true|
|      1003|    Emily|    Jones|         HR|        Analyst|65000.00|   2023-01-01|   null|     true|
|      1004|  Michael|    Brown|         IT|      Developer|72000.00|   2023-01-01|   null|     true|
|      1005|   Rachel|    Green|    Finance|     Accountant|58000.00|   2023-01-01|   null|     true|
|      1006|     Ross|   Geller|    Science|     Researcher|76000.00|   2023-01-01|   null|     true|
|      1007|   Monica|   Geller|   Culinary|           Chef|62000.00|   2023-01-01

In [0]:
joined_df = target_df.join(
    emp_df.alias('s'),
    target_df.EmployeeID == emp_df.EmployeeID,
    how="left"
)
joined_df.show()

+----------+---------+---------+-----------+---------------+--------+-------------+-------+---------+----------+---------+--------+-----------+-------------+-------+
|EmployeeID|FirstName| LastName| Department|          Title|  Salary|EffectiveDate|EndDate|IsCurrent|EmployeeID|FirstName|LastName| Department|        Title| Salary|
+----------+---------+---------+-----------+---------------+--------+-------------+-------+---------+----------+---------+--------+-----------+-------------+-------+
|      1001|     John|      Doe|Engineering|Senior Engineer|90000.00|   2023-01-01|   null|     true|      1001|     John|     Doe|Engineering|Lead Engineer|95000.0|
|      1002|     Jane|    Smith|  Marketing|        Manager|75000.00|   2023-01-01|   null|     true|      1002|     Jane|   Smith|      Sales|Sales Manager|78000.0|
|      1003|    Emily|    Jones|         HR|        Analyst|65000.00|   2023-01-01|   null|     true|      null|     null|    null|       null|         null|   null|
|   

In [0]:
%sql

merge into employees as t 
using src_data as d 
on t.EmployeeID=d.EmployeeID 
when matched and (t.Department!=d.Department or t.Title!=d.Title or t.Salary!=d.Salary) and  t.IsCurrent="true"
             then 
                   update set 
                                t.EndDate=current_date(),
                                t.IsCurrent=false
when not matched 
                 then 
                      insert (EmployeeID,FirstName,LastName,Department,Title,Salary,EffectiveDate,EndDate,IsCurrent)  
                      values(d.EmployeeID,d.FirstName,d.LastName,d.Department,d.Title,d.Salary,current_date(),"","true");

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
3,2,0,1


In [0]:
%sql
select * from employees 
order by EmployeeID;

EmployeeID,FirstName,LastName,Department,Title,Salary,EffectiveDate,EndDate,IsCurrent
1001,John,Doe,Engineering,Senior Engineer,90000.0,2023-01-01,2024-06-10,False
1002,Jane,Smith,Marketing,Manager,75000.0,2023-01-01,2024-06-10,False
1003,Emily,Jones,HR,Analyst,65000.0,2023-01-01,,True
1004,Michael,Brown,IT,Developer,72000.0,2023-01-01,,True
1005,Rachel,Green,Finance,Accountant,58000.0,2023-01-01,,True
1006,Ross,Geller,Science,Researcher,76000.0,2023-01-01,,True
1007,Monica,Geller,Culinary,Chef,62000.0,2023-01-01,,True
1008,Joey,Tribbiani,Acting,Actor,55000.0,2023-01-01,,True
1009,Chandler,Bing,Advertising,Copywriter,64000.0,2023-01-01,,True
1010,Phoebe,Buffay,Music,Singer,50000.0,2023-01-01,,True


In [0]:
target_df.show()

+----------+---------+---------+-----------+-----------------+--------+-------------+----------+---------+
|EmployeeID|FirstName| LastName| Department|            Title|  Salary|EffectiveDate|   EndDate|IsCurrent|
+----------+---------+---------+-----------+-----------------+--------+-------------+----------+---------+
|      1003|    Emily|    Jones|         HR|          Analyst|65000.00|   2023-01-01|      null|     true|
|      1004|  Michael|    Brown|         IT|        Developer|72000.00|   2023-01-01|      null|     true|
|      1005|   Rachel|    Green|    Finance|       Accountant|58000.00|   2023-01-01|      null|     true|
|      1006|     Ross|   Geller|    Science|       Researcher|76000.00|   2023-01-01|      null|     true|
|      1007|   Monica|   Geller|   Culinary|             Chef|62000.00|   2023-01-01|      null|     true|
|      1008|     Joey|Tribbiani|     Acting|            Actor|55000.00|   2023-01-01|      null|     true|
|      1009| Chandler|     Bing|Adver

In [0]:
emp_false_list_=target_df.select('EmployeeID').filter(col('IsCurrent')=="false").distinct().collect()
emp_false_list_ = [row['EmployeeID'] for row in emp_false_list_] 
print(employee_ids_list)

[1001, 1002]


In [0]:
#filtering the src table which doesnt contain the values as of employee_ids_list 

filtered_src_table=emp_df.filter(col('EmployeeID').isin(employee_ids_list))
filtered_src_table.show() 



+----------+---------+--------+-----------+-------------+-------+
|EmployeeID|FirstName|LastName| Department|        Title| Salary|
+----------+---------+--------+-----------+-------------+-------+
|      1001|     John|     Doe|Engineering|Lead Engineer|95000.0|
|      1002|     Jane|   Smith|      Sales|Sales Manager|78000.0|
+----------+---------+--------+-----------+-------------+-------+



In [0]:
filtered_src_table.createOrReplaceTempView('filtered_src_table')

In [0]:

%sql

insert into employees 
select f.EmployeeID,f.FirstName,f.LastName,f.Department,f.Title,f.Salary,current_date(),null,"true"
from filtered_src_table as f

num_affected_rows,num_inserted_rows
2,2


In [0]:
%sql 

select * from employees
order by EmployeeID asc,EffectiveDate asc

EmployeeID,FirstName,LastName,Department,Title,Salary,EffectiveDate,EndDate,IsCurrent
1001,John,Doe,Engineering,Senior Engineer,90000.0,2023-01-01,2024-06-10,False
1001,John,Doe,Engineering,Lead Engineer,95000.0,2024-06-10,,True
1002,Jane,Smith,Marketing,Manager,75000.0,2023-01-01,2024-06-10,False
1002,Jane,Smith,Sales,Sales Manager,78000.0,2024-06-10,,True
1003,Emily,Jones,HR,Analyst,65000.0,2023-01-01,,True
1004,Michael,Brown,IT,Developer,72000.0,2023-01-01,,True
1005,Rachel,Green,Finance,Accountant,58000.0,2023-01-01,,True
1006,Ross,Geller,Science,Researcher,76000.0,2023-01-01,,True
1007,Monica,Geller,Culinary,Chef,62000.0,2023-01-01,,True
1008,Joey,Tribbiani,Acting,Actor,55000.0,2023-01-01,,True
