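*Creating a sample DataFrame to be inserted into the raw (bronze) layer. Note that the OBJVERSION column is important for this: it records the version of each row (Jim's row goes from 1 to 2 when it is updated further down).*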

In [0]:
# Build the bronze sample in pandas from an inline CSV literal.
# Every row starts at OBJVERSION 1; a later cell bumps a changed row to 2.
from io import StringIO

import pandas as pd

raw_csv = """
Name,Surname$,City,OBJVERSION
John,Doe,Bangalore,1
Jim,Jam,California,1
Allen,Solly,Unknown,1
"""
# "," is read_csv's default separator, and the leading blank line is skipped.
pdf = pd.read_csv(StringIO(raw_csv))
pdf

Unnamed: 0,Name,Surname$,City,OBJVERSION
0,John,Doe,Bangalore,1
1,Jim,Jam,California,1
2,Allen,Solly,Unknown,1


In [0]:
%sql
-- Test database that holds the raw and cleansed SCD2 tables.
CREATE SCHEMA IF NOT EXISTS SCD2

In [0]:
# Promote the pandas sample to a Spark DataFrame for the rest of the demo.
d = spark.createDataFrame(pdf)
display(d)

# Turn the Spark schema into a DDL column list by round-tripping it through
# its JSON form into the JVM DataType and calling toDDL() there.
# NOTE: this relies on PySpark's private `_jvm` gateway attribute.
# The result is spliced into the CREATE TABLE statements further down.
jvm_types = spark.sparkContext._jvm.org.apache.spark.sql.types
ddl = jvm_types.DataType.fromJson(d.schema.json()).toDDL()
print(ddl)

Name,Surname$,City,OBJVERSION
John,Doe,Bangalore,1
Jim,Jam,California,1
Allen,Solly,Unknown,1


In [0]:
# (Re)create the raw (bronze) table from scratch using the derived column list.
spark.sql("DROP TABLE IF EXISTS SCD2.testscd_two_raw;")
create_raw_sql = f"""
CREATE TABLE SCD2.testscd_two_raw (
{ddl}
)

"""
spark.sql(create_raw_sql)

In [0]:
# Insert the sample rows into the raw table, replacing any existing contents.
d.write.saveAsTable("SCD2.testscd_two_raw", mode="overwrite")

In [0]:
%sql
-- Inspect what landed in the raw table.
SELECT *
FROM SCD2.testscd_two_raw;

Name,Surname$,City,OBJVERSION
Allen,Solly,Unknown,1
John,Doe,Bangalore,1
Jim,Jam,California,1


*CREATE THE CLEANSED DELTA TABLE WITH FOUR ADDITIONAL CDC COLUMNS: CDC_FLAG, CDC_START_TS, CDC_END_TS AND ACTIVE_IND*

In [0]:
# Create the cleansed Delta table: the raw columns plus four CDC bookkeeping
# columns (CDC_FLAG, CDC_START_TS, CDC_END_TS, ACTIVE_IND) that the pyscd
# calls further down fill in.
#
# The table is external (explicit LOCATION), so DROP TABLE only removes the
# metastore entry and leaves the Delta files at /tmp/delta-table in place.
# A plain CREATE TABLE would then re-attach to the previous run's CDC rows,
# and a Restart & Run All would not start from an empty table.
# CREATE OR REPLACE TABLE atomically replaces any Delta table already stored
# at that location, so every run starts clean.
spark.sql("DROP TABLE IF EXISTS SCD2.testscd_two_cleansed;")
spark.sql(f"""
CREATE OR REPLACE TABLE SCD2.testscd_two_cleansed (
{ddl}
,CDC_FLAG       STRING
,CDC_START_TS   TIMESTAMP
,CDC_END_TS     TIMESTAMP
,ACTIVE_IND     STRING
)
USING DELTA LOCATION "/tmp/delta-table"
""")



*IMPLEMENTING THE FIRST ROUND OF CDC (THE CLEANSED TABLE IS EMPTY, SO EVERY ROW IS AN INSERT)*

In [0]:
import pyscd as CDC #https://pypi.org/project/pyscd/
from delta import DeltaTable

In [0]:
# Handles for the pyscd calls: the raw source as a Spark DataFrame, and the
# cleansed target as a DeltaTable addressed by its storage path.
df_raw = spark.table("SCD2.testscd_two_raw")
df_cleansed_delta = DeltaTable.forPath(spark, "/tmp/delta-table")

In [0]:
# CDC round 1, step 1 (key column: Name). Judging by the second round's output,
# this step is presumably what flags keys missing from raw as CDC_FLAG='D'.
CDC.update_del_records(df_raw, df_cleansed_delta, ["Name"])

In [0]:
# CDC round 1, step 2: merge the raw rows into the cleansed table on the Name key.
# On this first run every key is new, and the output shows them all as CDC_FLAG='I'.
CDC.upsert_from_source(df_raw, df_cleansed_delta, ["Name"])

In [0]:
%sql
-- Cleansed table after the first CDC round.
SELECT *
FROM SCD2.testscd_two_cleansed;

Name,Surname$,City,OBJVERSION,CDC_FLAG,CDC_START_TS,CDC_END_TS,ACTIVE_IND
Allen,Solly,Unknown,1,I,2021-08-15T14:46:24.685+0000,9999-12-01T00:00:00.000+0000,Y
John,Doe,Bangalore,1,I,2021-08-15T14:46:24.685+0000,9999-12-01T00:00:00.000+0000,Y
Jim,Jam,California,1,I,2021-08-15T14:46:24.685+0000,9999-12-01T00:00:00.000+0000,Y


*CHANGING THE SOURCE DATA (KEYED ON NAME) - TEST FOR UPDATES AND DELETES*

In [0]:
# Second source snapshot, keyed on the Name column (the primary key here).
# Compared with the first load:
#   - Allen Solly is removed                         -> should become a delete
#   - Jim Jam becomes Jim Jimmy, OBJVERSION 1 -> 2   -> should become an update
#   - John Doe is unchanged
from io import StringIO

import pandas as pd

s = """
Name,Surname$,City,OBJVERSION
John,Doe,Bangalore,1
Jim,Jimmy,California,2
"""
pdf = pd.read_csv(StringIO(s), sep=',')
spark.createDataFrame(pdf).write.mode("overwrite").saveAsTable("SCD2.testscd_two_raw")

# Re-read the raw table so the following CDC calls are guaranteed to see this
# snapshot. The earlier df_raw was obtained before the overwrite, and depending on
# the table format it may be stale or reference files that no longer exist.
df_raw = spark.read.table("SCD2.testscd_two_raw")

In [0]:
%sql
-- Check the new raw table: Allen is gone and Jim now carries OBJVERSION 2.
SELECT *
FROM SCD2.testscd_two_raw

Name,Surname$,City,OBJVERSION
Jim,Jimmy,California,2
John,Doe,Bangalore,1


In [0]:
# CDC round 2, step 1. df_raw must reflect the overwritten raw table; if it was
# obtained before that overwrite, re-read it first.
CDC.update_del_records(df_raw, df_cleansed_delta, ["Name"])

In [0]:
# CDC round 2, step 2: merge on the Name key. In the output below, the superseded
# Jim Jam row ends up CDC_FLAG='U' and the new Jim Jimmy row is inserted as 'I'.
CDC.upsert_from_source(df_raw, df_cleansed_delta, ["Name"])

In [0]:
%sql
-- Cleansed table after the second CDC round (history plus current rows).
SELECT *
FROM SCD2.testscd_two_cleansed;

Name,Surname$,City,OBJVERSION,CDC_FLAG,CDC_START_TS,CDC_END_TS,ACTIVE_IND
Allen,Solly,Unknown,1,D,2021-08-15T14:46:24.685+0000,2021-08-15T14:47:54.857+0000,N
Jim,Jimmy,California,2,I,2021-08-15T14:48:45.463+0000,9999-12-01T00:00:00.000+0000,Y
Jim,Jam,California,1,U,2021-08-15T14:46:24.685+0000,2021-08-15T14:48:45.463+0000,N
John,Doe,Bangalore,1,I,2021-08-15T14:46:24.685+0000,9999-12-01T00:00:00.000+0000,Y


*As we can see, the deleted row (Allen Solly) has been flagged as D and the superseded version of the updated row (Jim Jam) as U. Both now have ACTIVE_IND='N' and a CDC_END_TS set, while only the current rows keep ACTIVE_IND='Y'. This means all our queries should filter on ACTIVE_IND='Y' to pull the latest data.*