#### SCD Type 2

#### Create a Dbutils widget to capture configuration

In [0]:
dbutils.widgets.removeAll()

In [0]:
####Create dbutil widgets to connect to storage account

#### Setup Configuration

In [0]:
#### Setup Configuration to Connect to Storage Account

#### Prepare the 3 tables - Employee, Employee_Address & Employee_Information

In [0]:
#This cell is creating a employee (driving), employee_address (referential table) and employee_information(referential table) delta tables
from pyspark.sql.functions import lit
from pyspark.sql.functions import concat, hash
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType, BooleanType
import datetime

employee_schema = StructType([ \
    StructField("employee_id",IntegerType(),True), \
    StructField("first_name",StringType(),True), \
    StructField("last_name",StringType(),True), \
    StructField("email", StringType(), True), \
    StructField("create_update_date", DateType(), True), \
  ])

employee_addr_schema = StructType([ \
    StructField("employee_id",IntegerType(),True), \
    StructField("city",StringType(),True), \
    StructField("region",StringType(),True), \
    StructField("street_address", StringType(), True), \
    StructField("country", StringType(), True), \
    StructField("create_update_date", DateType(), True) \
  ])

employee_info_schema = StructType([ \
    StructField("employee_id",IntegerType(),True), \
    StructField("salary",IntegerType(),True), \
    StructField("is_fte",BooleanType(),True), \
    StructField("is_remote", BooleanType(), True), \
    StructField("employment_date", DateType(), True), \
    StructField("create_update_date", DateType(), True) \
  ])

employee_data = [
    (100, "Steven", "King", "steven.king@contoso.com", datetime.date(2023,1,27)),
    (101, "Neena", "Kochhar", "neena.kocchar@contoso.com", datetime.date(2023,2,9)),
    (102, "Lex", "De Haan", "lex.de-haan@contoso.com", datetime.date(2023,4,7)),
    (103, "Alexander", "Hunold", "alexander.hunold@contoso.com", datetime.date(2023,2,24)),
    (104, "Bruce", "Ernst", "bruce.ernst@contoso.com", datetime.date(2023,4,24))
]

employee_addr_data = [
    (100, "New York", "NY", "123 Main St", "United States", datetime.date(2023,1,27)),
    (101, "London", "England", "456 Park Ave", "United Kingdom", datetime.date(2023,2,9)),
    (102, "Paris", "ële-de-France", "789 Rue de la Paix", "France", datetime.date(2023,4,7)),
    (103, "Tokyo", "Tokyo", "1-2-3 Shibuya", "Japan", datetime.date(2023,2,24)),
    (104, "Sydney", "NSW", "456 George St", "Australia", datetime.date(2023,4,24))
]

employee_info_data = [
    (100, 5000, True, False, datetime.date(2020,1,15), datetime.date(2023,1,27)),
    (101, 6000, True, True, datetime.date(2019,5,10), datetime.date(2023,2,9)),
    (102, 5500, True, False, datetime.date(2021,3,22), datetime.date(2023,4,7)),
    (103, 5200, True, True, datetime.date(2018,11,30), datetime.date(2023,2,24)),
    (104, 5800, True, False, datetime.date(2017,9,12), datetime.date(2023,4,24))
]

df_employee = spark.createDataFrame(data=employee_data, schema=employee_schema)
df_employee_addr = spark.createDataFrame(data=employee_addr_data, schema=employee_addr_schema)
df_employee_info = spark.createDataFrame(data=employee_info_data, schema=employee_info_schema)

df_employee.write.format("delta").mode("overwrite").saveAsTable("employee")
df_employee_addr.write.format("delta").mode("overwrite").saveAsTable("employee_address")
df_employee_info.write.format("delta").mode("overwrite").saveAsTable("employee_information")

In [0]:
%sql
select * from employee_information;

employee_id,salary,is_fte,is_remote,employment_date,create_update_date
100,5000,True,False,2020-01-15,2023-01-27
101,6000,True,True,2019-05-10,2023-02-09
103,5200,True,True,2018-11-30,2023-02-24
104,5800,True,False,2017-09-12,2023-04-24
102,5500,True,False,2021-03-22,2023-04-07


In [0]:
# This cell is creating a employee (driving), employee_address (referential table) and employee_information(referential table) delta table from a csv file

from pyspark.sql.functions import lit
from pyspark.sql.functions import concat, hash
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StringType

container_name = "source"

##employee table
df_employee_csv = spark.read.option("header",True).option("delimiter", ";").csv(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/employees_historical.csv")
df_employee_csv = df_employee_csv.select("EMPLOYEE_ID","FIRST_NAME","LAST_NAME","EMAIL", "DT")

df_employee_csv.write.format("delta").mode("overwrite").saveAsTable("employee")

##address table
df_address_csv = spark.read.option("header",True).option("delimiter", ";").csv(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/employee_address_historical.csv")
df_address_csv = df_address_csv.select("EMPLOYEE_ID","CITY","REGION","STREET_ADDRESS", "COUNTRY", "DT")

df_address_csv.write.format("delta").mode("overwrite").saveAsTable("employee_address")

##info table
df_info_csv = spark.read.option("header",True).option("delimiter", ";").csv(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/employee_information_historical.csv")
df_info_csv = df_info_csv.select("EMPLOYEE_ID","SALARY","IS_FTE","IS_REMOTE", "EMPLOYMENT_DATE", "DT")


df_info_csv.write.format("delta").mode("overwrite").saveAsTable("employee_information")

In [0]:
%sql
ALTER TABLE employee SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
ALTER TABLE employee_address SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
ALTER TABLE employee_information SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

#### Create a Target Employee entity gathering info from employee, employee_address and employee_information tables

- Driving table : Employee
- Referential table 1: Employee address
- Referential table 2 : Employee information

In [0]:
employee_df = spark.sql("""
                        select 
                        md5(concat(employee.EMPLOYEE_ID,employee.FIRST_NAME,employee.LAST_NAME,employee.EMAIL, employee.create_update_date)) as id,
                        employee.employee_id,
                        employee.first_name,
                        employee.last_name,
                        employee.email,
                        address.city,
                        address.region,
                        address.street_address,
                        address.country,
                        info.salary,
                        info.is_fte,
                        info.is_remote,
                        info.employment_date,
                        employee.create_update_date as employee_create_update_date,
                        address.create_update_date as address_create_update_date,
                        info.create_update_date as info_create_update_date,
                        int(1) as address_commit_version,
                        int(1) as info_commit_version,
                        int(1) as employee_commit_version,

                        ---scd
                        int(1) as valid_flag,
                        greatest(address.create_update_date, info.create_update_date, employee.create_update_date) as valid_from,  ---take the greatest of all the dates. This gives a viewpoint that this record was latest updated from the greatest date
                        timestamp("9999-12-31") as valid_to
                        from employee employee
                        inner join employee_address address on employee.employee_id=address.employee_id
                        inner join employee_information info on employee.employee_id=info.employee_id
                        """)

In [0]:
# This cell prepares a target_employee table from the above tables.

employee_df.write.format("delta").mode("overwrite").saveAsTable("target_employee")
employee_df.write.format("delta").mode("overwrite").saveAsTable("target_employee_historical")

#### Query target table - Employee

In [0]:
%sql
select * from default.target_employee limit 10;

id,employee_id,first_name,last_name,email,city,region,street_address,country,salary,is_fte,is_remote,employment_date,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,valid_to
d4d0569aa28e28d83a3d9a8888a30a27,101,Neena,Kochhar,neena.kocchar@contoso.com,London,England,456 Park Ave,United Kingdom,6000,True,True,2019-05-10,2023-02-09,2023-02-09,2023-02-09,1,1,1,1,2023-02-09,9999-12-31T00:00:00.000+0000
be613e2a8166f377f65967bd9073188d,102,Lex,De Haan,lex.de-haan@contoso.com,Paris,ële-de-France,789 Rue de la Paix,France,5500,True,False,2021-03-22,2023-04-07,2023-04-07,2023-04-07,1,1,1,1,2023-04-07,9999-12-31T00:00:00.000+0000
5a60257eabea5111d4e8aa0cf56fb109,103,Alexander,Hunold,alexander.hunold@contoso.com,Tokyo,Tokyo,1-2-3 Shibuya,Japan,5200,True,True,2018-11-30,2023-02-24,2023-02-24,2023-02-24,1,1,1,1,2023-02-24,9999-12-31T00:00:00.000+0000
014830e89472a8ac527a5c6b2c1e5fe5,100,Steven,King,steven.king@contoso.com,New York,NY,123 Main St,United States,5000,True,False,2020-01-15,2023-01-27,2023-01-27,2023-01-27,1,1,1,1,2023-01-27,9999-12-31T00:00:00.000+0000
3b2c89840dc66c48dedbbb8ecac15ada,104,Bruce,Ernst,bruce.ernst@contoso.com,Sydney,NSW,456 George St,Australia,5800,True,False,2017-09-12,2023-04-24,2023-04-24,2023-04-24,1,1,1,1,2023-04-24,9999-12-31T00:00:00.000+0000


In [0]:
%sql


-- original create_update_date of records
-- 100,(2023,1,27)), (101,(2023,2,9)),(102, (2023,4,7)),103, 2023,2,24)), (104, (2023,4,24))

-- This cell applies some changes to the employee table.
--ids - [300,100]

insert into employee values ('300','John','Doe','john.doe', '2023-04-04');
update employee set FIRST_NAME = 'Steven2', create_update_date = '2023-07-13' where EMPLOYEE_ID = '100';
update employee set FIRST_NAME = 'Steven3' , create_update_date = '2023-07-17' where EMPLOYEE_ID = '100';


-- This cell applies some changes to the employee_address table.
--ids - [102,104,100]

update employee_address set CITY = 'Lille', create_update_date = '2023-07-15' where EMPLOYEE_ID = '102';
update employee_address set CITY = 'NewCastle', create_update_date = '2023-07-14' where EMPLOYEE_ID = '104';
update employee_address set STREET_ADDRESS = '456 Baker Street', create_update_date='2023-07-13' where EMPLOYEE_ID = '100';

-- This cell applies some changes to the employee_information table.
--ids - [108,104,102]

update employee_information set SALARY = 7000,create_update_date = '2023-07-10' where EMPLOYEE_ID = '103';
update employee_information set SALARY = 8000, create_update_date= '2023-07-15' where EMPLOYEE_ID = '102';
update employee_information set IS_FTE= FALSE, create_update_date = '2023-07-14' where EMPLOYEE_ID = '104';


select * from employee order by EMPLOYEE_ID;
select * from employee_address order by EMPLOYEE_ID;
select * from employee_information order by EMPLOYEE_ID;


employee_id,salary,is_fte,is_remote,employment_date,create_update_date
100,5000,True,False,2020-01-15,2023-01-27
101,6000,True,True,2019-05-10,2023-02-09
102,8000,True,False,2021-03-22,2023-07-15
103,7000,True,True,2018-11-30,2023-07-10
104,5800,False,False,2017-09-12,2023-07-14


#### Get Referential Table Changes - employee_address and employee_information

In [0]:
# get all the records from employee_address which has commit version bigger than max_commit_version in target_employee
# get max commit_version from target_employee for address and information (referential table)

import pyspark.sql.functions as F

max_commit_employee_version = spark.read.format("delta").table("target_employee").agg({"employee_commit_version": "max"}).collect()[0][0]
max_commit_address_version = spark.read.format("delta").table("target_employee").agg({"address_commit_version": "max"}).collect()[0][0]

max_commit_information_version = spark.read.format("delta").table("target_employee").agg({"info_commit_version": "max"}).collect()[0][0]


changes_address_df = spark.read.format("delta").option("readChangeData", True).option("startingVersion", (max_commit_address_version+1)).table('employee_address').\
                                                                                            where(~F.col('_change_type').eqNullSafe('update_preimage'))

changes_information_df = spark.read.format("delta").option("readChangeData", True).option("startingVersion", (max_commit_information_version+1)).table('employee_information').\
                                                                                            where(~F.col('_change_type').eqNullSafe('update_preimage'))

##driving
changes_employee_df = spark.read.format("delta").option("readChangeData", True).option("startingVersion", (max_commit_employee_version+1)).table('employee').\
                                                                                            where(~F.col('_change_type').eqNullSafe('update_preimage'))                                                                                            

changes_address_df.createOrReplaceTempView("changes_address")
changes_information_df.createOrReplaceTempView("changes_information")
changes_employee_df.createOrReplaceTempView("changes_employee")

display(changes_address_df)
display(changes_information_df)      
display(changes_employee_df)   


#ids should be
#address - --ids - [102,104,100]
#info --ids - [102,104,108]
#employee - --ids - [300,100]

employee_id,city,region,street_address,country,create_update_date,_change_type,_commit_version,_commit_timestamp
102,Lille,ële-de-France,789 Rue de la Paix,France,2023-07-15,update_postimage,2,2023-08-11T12:57:08.000+0000
100,New York,NY,456 Baker Street,United States,2023-07-13,update_postimage,4,2023-08-11T12:57:12.000+0000
104,NewCastle,NSW,456 George St,Australia,2023-07-14,update_postimage,3,2023-08-11T12:57:10.000+0000


employee_id,salary,is_fte,is_remote,employment_date,create_update_date,_change_type,_commit_version,_commit_timestamp
104,5800,False,False,2017-09-12,2023-07-14,update_postimage,4,2023-08-11T12:57:18.000+0000
103,7000,True,True,2018-11-30,2023-07-10,update_postimage,2,2023-08-11T12:57:14.000+0000
102,8000,True,False,2021-03-22,2023-07-15,update_postimage,3,2023-08-11T12:57:16.000+0000


employee_id,first_name,last_name,email,create_update_date,_change_type,_commit_version,_commit_timestamp
100,Steven3,King,steven.king@contoso.com,2023-07-17,update_postimage,4,2023-08-11T12:57:05.000+0000
100,Steven2,King,steven.king@contoso.com,2023-07-13,update_postimage,3,2023-08-11T12:57:03.000+0000
300,John,Doe,john.doe,2023-04-04,insert,2,2023-08-11T12:57:01.000+0000


#### Create Referential Changes df - read entire main table -employee and only changes for address and information

##### To get referential table changes - we need to filter out if change has happened for a certain id. 
We can use the below to filter out :-
- coalesce(address._commit_version, info._commit_version) as changed_commit

In [0]:
from pyspark.sql.functions import get_json_object, udf, lit, col

referential_changes_employee_df = spark.sql("""
                        select 
                        md5(concat(employee.employee_id,employee.first_name,employee.last_name,employee.email,employee.employee_create_update_date)) as id,
                        employee.employee_id,
                        employee.first_name,
                        employee.last_name,
                        employee.email,
                        address.city,
                        address.region,
                        address.street_address,
                        address.country,
                        info.salary,
                        info.is_fte,
                        info.is_remote,
                        info.employment_date,
                        employee.employee_create_update_date as employee_create_update_date,
                        address.create_update_date as address_create_update_date,
                        info.create_update_date as info_create_update_date,
                        address._commit_version as address_commit_version,
                        info._commit_version as info_commit_version,

                        --we dont consider changes from driving table
                        String(null) as employee_commit_version,

                        --scd columns
                        int(1) as valid_flag,  --set as 1 as latest update
                        case when address._commit_version is not null then timestamp(address.create_update_date) else timestamp(info.create_update_date) END as valid_from, 
                        --- if it's a change from address table, then use address.DT 
                        --- as this update is valid from when that address's record was updated
                        ---otherwise info table

                        timestamp("9999-12-31") as valid_to, --set as latest record
                        coalesce(address._commit_version, info._commit_version) as changed_commit ---used for filtering records if there is an update from either address or info table

                        --scd columns 
                        from `default`.target_employee employee  ---read the entire employee table to get all employee ids

                        ---we do left join instead of inner join to get all changes.
                        ---there can be ids that changed for info table but didnt change for address table

                        left join changes_address address on employee.employee_id=address.employee_id
                        left join changes_information info on employee.employee_id=info.employee_id
                        where employee.valid_flag=1 --- get all employee ids where valid_flag=1
                        """)

referential_changes_employee_df.createOrReplaceTempView("referential_changes_employee")
display(referential_changes_employee_df.filter(col("changed_commit").isNotNull()))


##ids - [100,102,104,108]

id,employee_id,first_name,last_name,email,city,region,street_address,country,salary,is_fte,is_remote,employment_date,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,valid_to,changed_commit
be613e2a8166f377f65967bd9073188d,102,Lex,De Haan,lex.de-haan@contoso.com,Lille,ële-de-France,789 Rue de la Paix,France,8000.0,True,False,2021-03-22,2023-04-07,2023-07-15,2023-07-15,2.0,3.0,,1,2023-07-15T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2
5a60257eabea5111d4e8aa0cf56fb109,103,Alexander,Hunold,alexander.hunold@contoso.com,,,,,7000.0,True,True,2018-11-30,2023-02-24,,2023-07-10,,2.0,,1,2023-07-10T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2
014830e89472a8ac527a5c6b2c1e5fe5,100,Steven,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,,,,,2023-01-27,2023-07-13,,4.0,,,1,2023-07-13T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,4
3b2c89840dc66c48dedbbb8ecac15ada,104,Bruce,Ernst,bruce.ernst@contoso.com,NewCastle,NSW,456 George St,Australia,5800.0,False,False,2017-09-12,2023-04-24,2023-07-14,2023-07-14,3.0,4.0,,1,2023-07-14T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,3


#### Create Driving table changes df - read only changes from driving table -employee

In [0]:
from pyspark.sql.functions import get_json_object, udf, lit, col

driving_changes_employee_df = spark.sql("""
                        select 
                        md5(concat(employee.employee_id,employee.first_name,employee.last_name,employee.email,employee.create_update_date)) as id,
                        employee.employee_id,
                        employee.first_name,
                        employee.last_name,
                        employee.email,
                        address.city,
                        address.region,
                        address.street_address,
                        address.country,

                        info.salary,
                        info.is_fte,
                        info.is_remote,
                        info.employment_date,

                        employee.create_update_date as employee_create_update_date,
                        address.create_update_date as address_create_update_date,
                        info.create_update_date as info_create_update_date,
                        address._commit_version as address_commit_version,
                        info._commit_version as info_commit_version,
                        employee._commit_version as employee_commit_version,

                        ---scd columns
                        int(1) as valid_flag,
                        timestamp(employee.create_update_date) as valid_from,
                        timestamp("9999-12-31") as valid_to,
                        employee._commit_version as changed_commit

                        ---scd columns
                        from changes_employee employee  ---read the entire employee table to get employee ids
                        left join changes_address address on employee.employee_id=address.employee_id
                        left join changes_information info on employee.employee_id=info.employee_id
                        """)

driving_changes_employee_df.createOrReplaceTempView("driving_changes_employee")
display(driving_changes_employee_df.filter(col("changed_commit").isNotNull()))

##ids - [100,300]

id,employee_id,first_name,last_name,email,city,region,street_address,country,salary,is_fte,is_remote,employment_date,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,valid_to,changed_commit
08f663dec02e58679ba6e6af5cf67882,100,Steven3,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,,,,,2023-07-17,2023-07-13,,4.0,,4,1,2023-07-17T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,4
5e64f88f256b598f6c8a801b8d560285,100,Steven2,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,,,,,2023-07-13,2023-07-13,,4.0,,3,1,2023-07-13T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,3
523d25c653e123eafe592d1dc8792ca1,300,John,Doe,john.doe,,,,,,,,,2023-04-04,,,,,2,1,2023-04-04T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2


#### union referential and driving table changes to get all the changes together

In [0]:
%sql
create or replace temp view changes_employee_target as
select * from driving_changes_employee where changed_commit is not null 
union
select * from referential_changes_employee where changed_commit is not null 

In [0]:
%sql
select * from changes_employee_target where employee_id = '100';

id,employee_id,first_name,last_name,email,city,region,street_address,country,salary,is_fte,is_remote,employment_date,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,valid_to,changed_commit
08f663dec02e58679ba6e6af5cf67882,100,Steven3,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,,,,,2023-07-17,2023-07-13,,4,,4.0,1,2023-07-17T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,4
5e64f88f256b598f6c8a801b8d560285,100,Steven2,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,,,,,2023-07-13,2023-07-13,,4,,3.0,1,2023-07-13T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,3
014830e89472a8ac527a5c6b2c1e5fe5,100,Steven,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,,,,,2023-01-27,2023-07-13,,4,,,1,2023-07-13T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,4


#### We will demonstrate two types of updates 
- Capturing only latest updates and not retaining historical chain of updates for a id
- Capturing the historical chain of updates for in detail analysis

#### Method 1: Capture only latest updates: If multiple updates are coming from sources for each record, only take latest
- Multiple updates on the same row
- The problem is we need to have one unique id in the CDF, so obviously, we can use group by, but we need to pick up the latest changes as well so we do not mess up the order in which the changes were applied. i.e. we want to preserve the same update order from source to target

In [0]:
employee_all_updates = spark.sql(""" select * from (SELECT *, row_number() over (partition by employee_id order by changed_commit desc) as rank
          FROM changes_employee_target) order by employee_id, changed_commit desc""")

##Temp View for all updates
employee_all_updates.createOrReplaceTempView("employee_all_updates")
employee_latest_updates=spark.sql("""select * from employee_all_updates where rank=1""")
employee_old_updates=spark.sql("""select * from employee_all_updates where rank>1""")
##latest updates temp view
employee_latest_updates.createOrReplaceTempView("employee_latest_updates")

In [0]:
%sql
select * from employee_latest_updates;

id,employee_id,first_name,last_name,email,city,region,street_address,country,salary,is_fte,is_remote,employment_date,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,valid_to,changed_commit,rank
08f663dec02e58679ba6e6af5cf67882,100,Steven3,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,,,,,2023-07-17,2023-07-13,,4.0,,4.0,1,2023-07-17T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,4,1
be613e2a8166f377f65967bd9073188d,102,Lex,De Haan,lex.de-haan@contoso.com,Lille,ële-de-France,789 Rue de la Paix,France,8000.0,True,False,2021-03-22,2023-04-07,2023-07-15,2023-07-15,2.0,3.0,,1,2023-07-15T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2,1
5a60257eabea5111d4e8aa0cf56fb109,103,Alexander,Hunold,alexander.hunold@contoso.com,,,,,7000.0,True,True,2018-11-30,2023-02-24,,2023-07-10,,2.0,,1,2023-07-10T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2,1
3b2c89840dc66c48dedbbb8ecac15ada,104,Bruce,Ernst,bruce.ernst@contoso.com,NewCastle,NSW,456 George St,Australia,5800.0,False,False,2017-09-12,2023-04-24,2023-07-14,2023-07-14,3.0,4.0,,1,2023-07-14T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,3,1
523d25c653e123eafe592d1dc8792ca1,300,John,Doe,john.doe,,,,,,,,,2023-04-04,,,,,2.0,1,2023-04-04T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2,1


#### Coalesce with target employee table to capture columns which haven't changed

- Because changes for an id can happen in any of the referential tables, when we get the changes, other columns from other tables will show null as nothing changed there
- to demonstrate the correct view of the record we coalesce with target employee table to retain values of the other columns
- This doesn't cover the edge case scenario : if the actual update is null for a column

In [0]:
changes_df =spark.sql("""
                      select 
                        String(null) as id, --- we will use this value to capture the update of a record using it's natural key which is employee_id
                        coalesce(employee_latest_updates.employee_id, target_employee.employee_id) as employee_id,
                        coalesce(employee_latest_updates.first_name,target_employee.first_name) as first_name,
                        coalesce(employee_latest_updates.last_name, target_employee.last_name) as last_name,
                        coalesce(employee_latest_updates.email, target_employee.email) as email,
                        coalesce(employee_latest_updates.city, target_employee.city) as city,
                        coalesce(employee_latest_updates.region, target_employee.region) as region,
                        coalesce(employee_latest_updates.street_address, target_employee.street_address) as street_address,
                        coalesce(employee_latest_updates.country, target_employee.country) as country,
                        coalesce(employee_latest_updates.salary, target_employee.salary) as salary,
                        coalesce(employee_latest_updates.is_fte, target_employee.is_fte) as is_fte,
                        coalesce(employee_latest_updates.is_remote, target_employee.is_remote) as is_remote,
                        coalesce(employee_latest_updates.employment_date, target_employee.employment_date) as employment_date,
                        coalesce(employee_latest_updates.employee_create_update_date, target_employee.employee_create_update_date) as employee_create_update_date,
                        coalesce(employee_latest_updates.address_create_update_date, target_employee.address_create_update_date) as address_create_update_date,
                        coalesce(employee_latest_updates.info_create_update_date, target_employee.info_create_update_date) as info_create_update_date,
                        coalesce(employee_latest_updates.address_commit_version, target_employee.address_commit_version) as address_commit_version,
                        coalesce(employee_latest_updates.info_commit_version, target_employee.info_commit_version) as info_commit_version,
                        coalesce(employee_latest_updates.employee_commit_version, target_employee.employee_commit_version) as employee_commit_version,

                        --scd
                        int(1) as valid_flag,

                        --- We are combining updates coming from multiple tables.Hence, to identify when the entire record is valid from, we consider greatest date of all changes
                        --- we take the greatest of dates into consideration as the last change came on that date 
                        --- we do lose some history ,as if address change came in earlier
                        --- the row is valid from a later date from which employee table change came in
                        --- consider id=100, street address changed on 2023-07-13 but name hadn't changed to Steven3 yet (it changed on 2023-07-17)
                        --- valid_from of entire record then becomes 2023-07-17 ,as Steven3 is the name change from then and 456 Baker street even though came on 2023-07-13 is still valid from 2023-07-17

                        timestamp(coalesce(
                            greatest(employee_latest_updates.employee_create_update_date,
                            employee_latest_updates.address_create_update_date,
                            employee_latest_updates.info_create_update_date), target_employee.valid_from)) as valid_from,

                        timestamp("9999-12-31") as valid_to,
                        employee_latest_updates.changed_commit as _commit_version

                        --scd
                        from employee_latest_updates  
                        ---we use left join to consider new. inserts as well
                        left join `default`.target_employee on employee_latest_updates.employee_id=target_employee.employee_id
                      """)
display(changes_df)

id,employee_id,first_name,last_name,email,city,region,street_address,country,salary,is_fte,is_remote,employment_date,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,valid_to,_commit_version
,100,Steven3,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,5000.0,True,False,2020-01-15,2023-07-17,2023-07-13,2023-01-27,4.0,1.0,4,1,2023-07-17T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,4
,102,Lex,De Haan,lex.de-haan@contoso.com,Lille,ële-de-France,789 Rue de la Paix,France,8000.0,True,False,2021-03-22,2023-04-07,2023-07-15,2023-07-15,2.0,3.0,1,1,2023-07-15T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2
,103,Alexander,Hunold,alexander.hunold@contoso.com,Tokyo,Tokyo,1-2-3 Shibuya,Japan,7000.0,True,True,2018-11-30,2023-02-24,2023-02-24,2023-07-10,1.0,2.0,1,1,2023-07-10T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2
,104,Bruce,Ernst,bruce.ernst@contoso.com,NewCastle,NSW,456 George St,Australia,5800.0,False,False,2017-09-12,2023-04-24,2023-07-14,2023-07-14,3.0,4.0,1,1,2023-07-14T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,3
,300,John,Doe,john.doe,,,,,,,,,2023-04-04,,,,,2,1,2023-04-04T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2


#### Method 1: Prepare Merge Dataframe

In [0]:
# This cell prepares a dataframe which will be used to apply the merge statement to take the updates to target_employee table. Key items -
# 1. For records which are getting update, add the old records to the dataframe from the delta table employees
# 2. For records which are bringing the updates, populate the id column for them. Keep in mind, the id is same as the id of the old records. This is the surrogate key.
# 3. For new records, add id column value by using the has function.
# 4. Add a new column named merge_key. This column has the same value as the id column, except for the records which are bringing the updates, the merge key is empty. This is done so that the the merge statement we are using later can treat them as new record.

from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Only keep the latest record for each employee
window_spec = Window.partitionBy("employee_id").orderBy(desc("_commit_version"))
changes_df = changes_df.withColumn("rank", rank().over(window_spec))
changes_df = changes_df.filter(changes_df["rank"] < 2)

changed_employee_ids = changes_df.select(collect_list(changes_df['employee_id'])).collect()[0][0]


employees_df = spark.read.format("delta").table("default.target_employee")

merge_df = changes_df.unionByName(
                                    employees_df.where(col("EMPLOYEE_ID").isin(changed_employee_ids)), allowMissingColumns=True
                                  )
# This done to generate new ids for the brand new incoming records
new_employee_ids = merge_df.\
                        groupBy("employee_id").count().where(F.col('count') < 2).\
                        select(collect_list(merge_df['employee_id'])).collect()[0][0]
merge_df = merge_df.withColumn("merge_key",
                        F.when(F.col('employee_id').isin(new_employee_ids), md5(concat("employee_id","first_name","last_name","email"))).
                        otherwise(col('id')))                       

# This is done to take the ids for the old records to the updated records
window_spec = Window.partitionBy("employee_id").orderBy(desc("id"))
merge_df = merge_df.withColumn("id2",
                        F.when(F.col('merge_key').isNull(), lag("merge_key",1).over(window_spec)).
                        otherwise(col('merge_key')))

merge_df = merge_df.drop('id')
merge_df = merge_df.withColumnRenamed("id2", "id")

# merge_df = merge_df.\
#                 where(~F.col('_change_type').eqNullSafe('delete'))

display(merge_df)

employee_id,first_name,last_name,email,city,region,street_address,country,salary,is_fte,is_remote,employment_date,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,valid_to,_commit_version,rank,merge_key,id
100,Steven,King,steven.king@contoso.com,New York,NY,123 Main St,United States,5000.0,True,False,2020-01-15,2023-01-27,2023-01-27,2023-01-27,1.0,1.0,1,1,2023-01-27T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,,,014830e89472a8ac527a5c6b2c1e5fe5,014830e89472a8ac527a5c6b2c1e5fe5
100,Steven3,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,5000.0,True,False,2020-01-15,2023-07-17,2023-07-13,2023-01-27,4.0,1.0,4,1,2023-07-17T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,4.0,1.0,,014830e89472a8ac527a5c6b2c1e5fe5
102,Lex,De Haan,lex.de-haan@contoso.com,Paris,ële-de-France,789 Rue de la Paix,France,5500.0,True,False,2021-03-22,2023-04-07,2023-04-07,2023-04-07,1.0,1.0,1,1,2023-04-07T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,,,be613e2a8166f377f65967bd9073188d,be613e2a8166f377f65967bd9073188d
102,Lex,De Haan,lex.de-haan@contoso.com,Lille,ële-de-France,789 Rue de la Paix,France,8000.0,True,False,2021-03-22,2023-04-07,2023-07-15,2023-07-15,2.0,3.0,1,1,2023-07-15T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2.0,1.0,,be613e2a8166f377f65967bd9073188d
103,Alexander,Hunold,alexander.hunold@contoso.com,Tokyo,Tokyo,1-2-3 Shibuya,Japan,5200.0,True,True,2018-11-30,2023-02-24,2023-02-24,2023-02-24,1.0,1.0,1,1,2023-02-24T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,,,5a60257eabea5111d4e8aa0cf56fb109,5a60257eabea5111d4e8aa0cf56fb109
103,Alexander,Hunold,alexander.hunold@contoso.com,Tokyo,Tokyo,1-2-3 Shibuya,Japan,7000.0,True,True,2018-11-30,2023-02-24,2023-02-24,2023-07-10,1.0,2.0,1,1,2023-07-10T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2.0,1.0,,5a60257eabea5111d4e8aa0cf56fb109
104,Bruce,Ernst,bruce.ernst@contoso.com,Sydney,NSW,456 George St,Australia,5800.0,True,False,2017-09-12,2023-04-24,2023-04-24,2023-04-24,1.0,1.0,1,1,2023-04-24T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,,,3b2c89840dc66c48dedbbb8ecac15ada,3b2c89840dc66c48dedbbb8ecac15ada
104,Bruce,Ernst,bruce.ernst@contoso.com,NewCastle,NSW,456 George St,Australia,5800.0,False,False,2017-09-12,2023-04-24,2023-07-14,2023-07-14,3.0,4.0,1,1,2023-07-14T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,3.0,1.0,,3b2c89840dc66c48dedbbb8ecac15ada
300,John,Doe,john.doe,,,,,,,,,2023-04-04,,,,,2,1,2023-04-04T00:00:00.000+0000,9999-12-31T00:00:00.000+0000,2.0,1.0,b8bd1eb9ea11502aca3200866cf29705,b8bd1eb9ea11502aca3200866cf29705


#### Method 1: Apply merge statement

In [0]:
# This cell applies the merge statement using the merge dataframe prepared in the previous cell.
from delta.tables import *
from pyspark.sql.functions import current_timestamp

# Recommendation is to use abfss path. This is purely for demonstrative purpose
target_employee_table = DeltaTable.forPath(spark, f"dbfs:/user/hive/warehouse/target_employee") 

exec_time = current_timestamp()
infinite_end = to_timestamp(lit("9999-12-31"))

merge = target_employee_table.alias('employee').merge(merge_df.alias('employee_update'),
                                          "employee.id = employee_update.merge_key") \
                                        .whenMatchedUpdate(set=
                                                           {
                                                                "id": "employee_update.id",
                                                                "FIRST_NAME": "employee_update.FIRST_NAME",
                                                                "EMPLOYEE_ID": "employee_update.EMPLOYEE_ID",
                                                                "LAST_NAME": "employee_update.LAST_NAME",
                                                                "EMAIL": "employee_update.EMAIL",
                                                                "CITY": "employee_update.CITY",
                                                                "REGION": "employee_update.REGION",
                                                                "STREET_ADDRESS": "employee_update.STREET_ADDRESS",
                                                                "COUNTRY": "employee_update.COUNTRY",
                                                                "SALARY": "employee_update.SALARY",
                                                                "IS_FTE": "employee_update.IS_FTE",
                                                                "IS_REMOTE": "employee_update.IS_REMOTE",
                                                                "EMPLOYMENT_DATE": "employee_update.EMPLOYMENT_DATE",
                                                                "employee_create_update_date": "employee_update.employee_create_update_date",
                                                                "address_create_update_date": "employee_update.address_create_update_date",
                                                                "info_create_update_date": "employee_update.info_create_update_date",
                                                                "address_commit_version": "employee_update.address_commit_version",
                                                                "info_commit_version": "employee_update.info_commit_version",
                                                                "employee_commit_version": "employee_update.employee_commit_version",
                                                                "valid_from": "employee.valid_from",
                                                                "valid_to": exec_time,
                                                                "valid_flag": lit(0)
                                                           }) \
                                        .whenNotMatchedInsert(values=
                                                            {
                                                                "id": "employee_update.id",
                                                                "FIRST_NAME": "employee_update.FIRST_NAME",
                                                                "EMPLOYEE_ID": "employee_update.EMPLOYEE_ID",
                                                                "LAST_NAME": "employee_update.LAST_NAME",
                                                                "EMAIL": "employee_update.EMAIL",
                                                                "CITY": "employee_update.CITY",
                                                                "REGION": "employee_update.REGION",
                                                                "STREET_ADDRESS": "employee_update.STREET_ADDRESS",
                                                                "COUNTRY": "employee_update.COUNTRY",
                                                                "SALARY": "employee_update.SALARY",
                                                                "IS_FTE": "employee_update.IS_FTE",
                                                                "IS_REMOTE": "employee_update.IS_REMOTE",
                                                                "EMPLOYMENT_DATE": "employee_update.EMPLOYMENT_DATE",
                                                                "employee_create_update_date": "employee_update.employee_create_update_date",
                                                                "address_create_update_date": "employee_update.address_create_update_date",
                                                                "info_create_update_date": "employee_update.info_create_update_date",
                                                                "address_commit_version": "employee_update.address_commit_version",
                                                                "info_commit_version": "employee_update.info_commit_version",
                                                                "employee_commit_version": "employee_update.employee_commit_version",
                                                                "valid_from": exec_time,
                                                                "valid_to": infinite_end,
                                                                "valid_flag": lit(1)
                                                           })

merge.execute()

#### Method 1: Check Results

In [0]:
%sql
select * from default.target_employee where EMPLOYEE_ID in (102,100,104,108, 500);

id,employee_id,first_name,last_name,email,city,region,street_address,country,salary,is_fte,is_remote,employment_date,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,valid_to
014830e89472a8ac527a5c6b2c1e5fe5,100,Steven,King,steven.king@contoso.com,New York,NY,123 Main St,United States,5000,True,False,2020-01-15,2023-01-27,2023-01-27,2023-01-27,1,1,1,0,2023-01-27,2023-08-11T12:11:25.988+0000
014830e89472a8ac527a5c6b2c1e5fe5,100,Steven3,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,5000,True,False,2020-01-15,2023-07-17,2023-07-13,2023-01-27,4,1,4,1,2023-08-11,9999-12-31T00:00:00.000+0000
be613e2a8166f377f65967bd9073188d,102,Lex,De Haan,lex.de-haan@contoso.com,Paris,ële-de-France,789 Rue de la Paix,France,5500,True,False,2021-03-22,2023-04-07,2023-04-07,2023-04-07,1,1,1,0,2023-04-07,2023-08-11T12:11:25.988+0000
be613e2a8166f377f65967bd9073188d,102,Lex,De Haan,lex.de-haan@contoso.com,Lille,ële-de-France,789 Rue de la Paix,France,8000,True,False,2021-03-22,2023-04-07,2023-07-15,2023-07-15,2,3,1,1,2023-08-11,9999-12-31T00:00:00.000+0000
3b2c89840dc66c48dedbbb8ecac15ada,104,Bruce,Ernst,bruce.ernst@contoso.com,Sydney,NSW,456 George St,Australia,5800,True,False,2017-09-12,2023-04-24,2023-04-24,2023-04-24,1,1,1,0,2023-04-24,2023-08-11T12:11:25.988+0000
3b2c89840dc66c48dedbbb8ecac15ada,104,Bruce,Ernst,bruce.ernst@contoso.com,NewCastle,NSW,456 George St,Australia,5800,False,False,2017-09-12,2023-04-24,2023-07-14,2023-07-14,3,4,1,1,2023-08-11,9999-12-31T00:00:00.000+0000


#### Method 2: Historical chaining of records. We will capture all the updates that happens for a certain id and maintain the chain of history in those updates

In [0]:
%sql
-- tmp table
create or replace temp view employee_chain as select 
  id,
  employee_id,
  first_name,
  last_name,
  email,
  city,
  region,
  street_address,
  country,
  salary,
  is_fte,
  is_remote,
  employment_date,
  employee_create_update_date,
  address_create_update_date,
  info_create_update_date,
  address_commit_version,
  info_commit_version,
  employee_commit_version,
  valid_flag,
  valid_from,
  changed_commit,
  lead(greatest(employee_create_update_date, address_create_update_date, info_create_update_date)) over (partition by employee_id order by employee_create_update_date) as valid_to,
   ---take whatever greatest value of dates
  row_number() over (partition by EMPLOYEE_ID order by employee_create_update_date) rn 
from changes_employee_target

##### The LEAD function takes the next start date for the same natural key, ordered by date (dt). The ROW_NUMBER marks with 1 the oldest record in the source data for the natural key in the dimension.

In [0]:
%sql
select * from employee_chain;

id,employee_id,first_name,last_name,email,city,region,street_address,country,salary,is_fte,is_remote,employment_date,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,changed_commit,valid_to,rn
014830e89472a8ac527a5c6b2c1e5fe5,100,Steven,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,,,,,2023-01-27,2023-07-13,,4.0,,,1,2023-07-13T00:00:00.000+0000,4,2023-07-13,1
5e64f88f256b598f6c8a801b8d560285,100,Steven2,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,,,,,2023-07-13,2023-07-13,,4.0,,3.0,1,2023-07-13T00:00:00.000+0000,3,2023-07-17,2
08f663dec02e58679ba6e6af5cf67882,100,Steven3,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,,,,,2023-07-17,2023-07-13,,4.0,,4.0,1,2023-07-17T00:00:00.000+0000,4,,3
be613e2a8166f377f65967bd9073188d,102,Lex,De Haan,lex.de-haan@contoso.com,Lille,ële-de-France,789 Rue de la Paix,France,8000.0,True,False,2021-03-22,2023-04-07,2023-07-15,2023-07-15,2.0,3.0,,1,2023-07-15T00:00:00.000+0000,2,,1
5a60257eabea5111d4e8aa0cf56fb109,103,Alexander,Hunold,alexander.hunold@contoso.com,,,,,7000.0,True,True,2018-11-30,2023-02-24,,2023-07-10,,2.0,,1,2023-07-10T00:00:00.000+0000,2,,1
3b2c89840dc66c48dedbbb8ecac15ada,104,Bruce,Ernst,bruce.ernst@contoso.com,NewCastle,NSW,456 George St,Australia,5800.0,False,False,2017-09-12,2023-04-24,2023-07-14,2023-07-14,3.0,4.0,,1,2023-07-14T00:00:00.000+0000,3,,1
523d25c653e123eafe592d1dc8792ca1,300,John,Doe,john.doe,,,,,,,,,2023-04-04,,,,,2.0,1,2023-04-04T00:00:00.000+0000,2,,1


#### Method 2: Coalesce with target employee table to capture columns which haven't changed

#### Method 2 : For the sake of illustration we use only one id to show historical chaining of updates is captured
- Target table --> `default`.target_employee_historical

In [0]:
changes_df =spark.sql("""
                      select 
                        coalesce(md5(concat(employee_latest_updates.EMPLOYEE_ID,employee_latest_updates.FIRST_NAME,employee_latest_updates.LAST_NAME,employee_latest_updates.EMAIL, employee_latest_updates.employee_create_update_date)),target_employee.id) as id,
                        coalesce(employee_latest_updates.EMPLOYEE_ID, target_employee.EMPLOYEE_ID) as EMPLOYEE_ID,
                        coalesce(employee_latest_updates.FIRST_NAME,target_employee.FIRST_NAME) as FIRST_NAME,
                        coalesce(employee_latest_updates.LAST_NAME, target_employee.LAST_NAME) as LAST_NAME,
                        coalesce(employee_latest_updates.EMAIL, target_employee.EMAIL) as EMAIL,
                        coalesce(employee_latest_updates.CITY, target_employee.CITY) as CITY,
                        coalesce(employee_latest_updates.REGION, target_employee.REGION) as REGION,
                        coalesce(employee_latest_updates.STREET_ADDRESS, target_employee.STREET_ADDRESS) as STREET_ADDRESS,
                        coalesce(employee_latest_updates.COUNTRY, target_employee.COUNTRY) as COUNTRY,
                        coalesce(employee_latest_updates.SALARY, target_employee.SALARY) as SALARY,
                        coalesce(employee_latest_updates.IS_FTE, target_employee.IS_FTE) as IS_FTE,
                        coalesce(employee_latest_updates.IS_REMOTE, target_employee.IS_REMOTE) as IS_REMOTE,
                        coalesce(employee_latest_updates.EMPLOYMENT_DATE, target_employee.EMPLOYMENT_DATE) as EMPLOYMENT_DATE,

                        coalesce(employee_latest_updates.employee_create_update_date, target_employee.employee_create_update_date) as employee_create_update_date,
                        coalesce(employee_latest_updates.address_create_update_date, target_employee.address_create_update_date) as address_create_update_date,
                        coalesce(employee_latest_updates.info_create_update_date, target_employee.info_create_update_date) as info_create_update_date,

                        coalesce(employee_latest_updates.address_commit_version, target_employee.address_commit_version) as address_commit_version,
                        coalesce(employee_latest_updates.info_commit_version, target_employee.info_commit_version) as info_commit_version,
                        coalesce(employee_latest_updates.employee_commit_version, target_employee.employee_commit_version) as employee_commit_version,
                        employee_latest_updates.valid_flag,
                        employee_latest_updates.valid_from,
                        employee_latest_updates.valid_to,
                        employee_latest_updates.changed_commit as _commit_version,
                        employee_latest_updates.rn
                        from employee_chain  employee_latest_updates
                        ---we use left join to consider new inserts as well
                        left join `default`.target_employee_historical  target_employee on employee_latest_updates.employee_id=target_employee.employee_id
                      """)

## For the sake of illustration we use only one id to show historical chaining of updates is captured
changes_df = changes_df.filter(col("EMPLOYEE_ID")== '100')
changes_df.createOrReplaceTempView("changes")

##store the changes in a separate table
changes_df.write.format("delta").mode("overwrite").saveAsTable("changes_history")
display(changes_df)

id,EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,CITY,REGION,STREET_ADDRESS,COUNTRY,SALARY,IS_FTE,IS_REMOTE,EMPLOYMENT_DATE,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,valid_to,_commit_version,rn
014830e89472a8ac527a5c6b2c1e5fe5,100,Steven,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,5000,True,False,2020-01-15,2023-01-27,2023-07-13,2023-01-27,4,1,1,1,2023-07-13T00:00:00.000+0000,2023-07-13,4,1
5e64f88f256b598f6c8a801b8d560285,100,Steven2,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,5000,True,False,2020-01-15,2023-07-13,2023-07-13,2023-01-27,4,1,3,1,2023-07-13T00:00:00.000+0000,2023-07-17,3,2
08f663dec02e58679ba6e6af5cf67882,100,Steven3,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,5000,True,False,2020-01-15,2023-07-17,2023-07-13,2023-01-27,4,1,4,1,2023-07-17T00:00:00.000+0000,,4,3


#### Method 2 : update valid_flag=1 for highest rank/latest record

In [0]:
%sql
UPDATE changes_history
SET valid_flag = CASE WHEN rn = (SELECT MAX(rn) FROM changes_history) THEN 1 ELSE 0 END;


num_affected_rows
3


#### Method 2 : update valid_to= "infinite end date" for highest rank/latest record

In [0]:
%sql
UPDATE changes_history
SET valid_to = CASE WHEN rn = (SELECT MAX(rn) FROM changes_history) THEN timestamp('9999-12-31') ELSE valid_to END;

num_affected_rows
3


#### Method 2: In case of historical chaining, the weightage of driving table rules over changes that come from referential table

In [0]:
%sql
select * from changes_history;

id,EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,CITY,REGION,STREET_ADDRESS,COUNTRY,SALARY,IS_FTE,IS_REMOTE,EMPLOYMENT_DATE,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,valid_to,_commit_version,rn
014830e89472a8ac527a5c6b2c1e5fe5,100,Steven,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,5000,True,False,2020-01-15,2023-01-27,2023-07-13,2023-01-27,4,1,1,0,2023-07-13T00:00:00.000+0000,2023-07-13,4,1
5e64f88f256b598f6c8a801b8d560285,100,Steven2,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,5000,True,False,2020-01-15,2023-07-13,2023-07-13,2023-01-27,4,1,3,0,2023-07-13T00:00:00.000+0000,2023-07-17,3,2
08f663dec02e58679ba6e6af5cf67882,100,Steven3,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,5000,True,False,2020-01-15,2023-07-17,2023-07-13,2023-01-27,4,1,4,1,2023-07-17T00:00:00.000+0000,9999-12-31,4,3


#### Method 2: Proceed to close the "current" record in Target table using update

In [0]:
%sql
UPDATE target_employee_historical AS t
SET valid_to = (
    SELECT first(valid_to) --- rn=1 stored in changes_history corresponds to original record in target table . Use the calculated valid_to to hence update original record
    FROM changes_history AS c
    WHERE c.employee_id = t.employee_id AND c.rn = 1
),
valid_flag = 0
WHERE EMPLOYEE_ID IN (
    SELECT employee_id
    FROM changes_history
    WHERE rn = 1
) 
---latest records now become stale records
and 
WHERE valid_flag = 1;;


num_affected_rows
1


#### Method 2: Add the new source data to the dimension with insert

In [0]:
%sql
insert into target_employee_historical
select 
  id,
  employee_id,
  first_name,
  last_name,
  email,
  city,
  region,
  street_address,
  country,
  salary,
  is_fte,
  IS_REMOTE,
  employment_date,
  employee_create_update_date,
  address_create_update_date,
  info_create_update_date,
  address_commit_version,
  info_commit_version,
  employee_commit_version,
  valid_flag,
  valid_from,
  valid_to
from changes_history where rn > 1 ---for updates to a natural key

num_affected_rows,num_inserted_rows
2,2


#### Method 2: Check updates

In [0]:
%sql
select * from target_employee_historical where EMPLOYEE_ID in (100);

id,employee_id,first_name,last_name,email,city,region,street_address,country,salary,is_fte,is_remote,employment_date,employee_create_update_date,address_create_update_date,info_create_update_date,address_commit_version,info_commit_version,employee_commit_version,valid_flag,valid_from,valid_to
5e64f88f256b598f6c8a801b8d560285,100,Steven2,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,5000,True,False,2020-01-15,2023-07-13,2023-07-13,2023-01-27,4,1,3,0,2023-07-13,2023-07-17T00:00:00.000+0000
08f663dec02e58679ba6e6af5cf67882,100,Steven3,King,steven.king@contoso.com,New York,NY,456 Baker Street,United States,5000,True,False,2020-01-15,2023-07-17,2023-07-13,2023-01-27,4,1,4,1,2023-07-17,9999-12-31T00:00:00.000+0000
014830e89472a8ac527a5c6b2c1e5fe5,100,Steven,King,steven.king@contoso.com,New York,NY,123 Main St,United States,5000,True,False,2020-01-15,2023-01-27,2023-01-27,2023-01-27,1,1,1,0,2023-01-27,2023-07-13T00:00:00.000+0000
