In [0]:
%sql
create or replace table DimData(CID INT,
                                FirstName string,
                                LastName string,
                                City string,
                                ContactNo INT,
                                isActive String,
                                StartDate timestamp,
                                Enddate timestamp)
                                USING delta
                                LOCATION '/FILESTORE/TABLES/SCD2DEMO'

**Existing data in dimension table in warehouse or you can say existing data in target table**


| CID | FirstName |  LastName | City     | ContactNo | IsActive | StartDate | EndDate

| 1   | Narendra  | Modi      |Ahmedabad | 412       |  Y       | XXXXX     | 9999-12-31

| 2   | Rahul     | Gandhi    |Delhi     | 512       |  Y       | XXXXX     | 9999-12-31

| 3   | AMit      | Shah      |Ahmedabad | 612       |  Y       | XXXXX     | 9999-12-31





In [0]:
%sql
insert into DimData values(1,'Narendra','Modi','Ahmedabad',412,'Y',current_timestamp(),'9999-12-31');
insert into DimData values(2,'Rahul','Gandhi','Delhi',512,'Y',current_timestamp(),'9999-12-31');
insert into DimData values(3,'Amit','Shah','Ahmedabad',612,'Y',current_timestamp(),'9999-12-31');



num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
select * from DimData

CID,Firstname,LastName,City,ContactNo,isActive,StartDate,Enddate
1,Narendra,Modi,Ahmedabad,412,Y,2023-09-19T21:52:01.884+0000,9999-12-31T00:00:00.000+0000
3,Amit,Shah,Ahmedabad,612,Y,2023-09-19T21:52:08.872+0000,9999-12-31T00:00:00.000+0000
2,Rahul,Gandhi,Delhi,512,Y,2023-09-19T21:52:05.702+0000,9999-12-31T00:00:00.000+0000


In [0]:
#create a table instance named DimDataInstance so that it can be used in pyspark rather than using sql syntax 
from delta import *
DimDataInstance=DeltaTable.forPath(spark,"/FILESTORE/TABLES/SCD2DEMO")


In [0]:
#create a data frame using table instance
TargetDF=DimDataInstance.toDF()
display(TargetDF)

CID,FirstName,LastName,City,ContactNo,isActive,StartDate,Enddate
1,Narendra,Modi,Ahmedabad,412,Y,2023-09-19T22:40:23.335+0000,9999-12-31T00:00:00.000+0000
3,Amit,Shah,Ahmedabad,612,Y,2023-09-19T22:40:32.572+0000,9999-12-31T00:00:00.000+0000
2,Rahul,Gandhi,Delhi,512,Y,2023-09-19T22:40:28.802+0000,9999-12-31T00:00:00.000+0000


In [0]:
#define a schema to be used for source dataframe
from pyspark.sql.types import *
from pyspark.sql.functions import *
sourceschema=StructType([StructField("SourceCID", StringType(), True),\
                    StructField("SourceFirstName", StringType(), True),\
                    StructField("SourceLastName", StringType(), True),\
                    StructField("SourceCity", StringType(), True),\
                    StructField("SourceContactno", IntegerType(), True)])

In [0]:
#give data for source table and use the schema created earlier to create source dataframe
sourcedata=[(1,'Narendra','Modi','Delhi',412),#changed city
(2,'Rahul','Gandhi','Delhi',512), # no change in this record
(4,'Aditya','Nath','Lucknow',612)] #new record
SourceDF=spark.createDataFrame(data=sourcedata,schema=sourceschema)
display(SourceDF)

SourceCID,SourceFirstName,SourceLastName,SourceCity,SourceContactno
1,Narendra,Modi,Delhi,412
2,Rahul,Gandhi,Delhi,512
4,Aditya,Nath,Lucknow,612


In [0]:
#now we have SourceDF with 1 updated, 1 deleted and 1 new record and TargetDF which is to be updated (upsert). Please note TargetDF is pointing to a table instance and hence update to that data frame will be reflected in underlyfing table named DimData

JoinDF = SourceDF.join(TargetDF,(SourceDF.SourceCID==TargetDF.CID) &\
                        (TargetDF.isActive=="Y"),"fullouter")\
                        .select(SourceDF["*"],\
                            TargetDF["*"])
display(JoinDF)

FilterDF=JoinDF.filter(xxhash64(JoinDF.SourceFirstName,JoinDF.SourceLastName,JoinDF.SourceCity)!=
                       xxhash64(JoinDF.FirstName,JoinDF.LastName,JoinDF.City))

display(FilterDF)


SourceCID,SourceFirstName,SourceLastName,SourceCity,SourceContactno,CID,FirstName,LastName,City,ContactNo,isActive,StartDate,Enddate
1.0,Narendra,Modi,Delhi,412.0,1.0,Narendra,Modi,Ahmedabad,412.0,Y,2023-09-19T22:40:23.335+0000,9999-12-31T00:00:00.000+0000
2.0,Rahul,Gandhi,Delhi,512.0,2.0,Rahul,Gandhi,Delhi,512.0,Y,2023-09-19T22:40:28.802+0000,9999-12-31T00:00:00.000+0000
,,,,,3.0,Amit,Shah,Ahmedabad,612.0,Y,2023-09-19T22:40:32.572+0000,9999-12-31T00:00:00.000+0000
4.0,Aditya,Nath,Lucknow,612.0,,,,,,,,


SourceCID,SourceFirstName,SourceLastName,SourceCity,SourceContactno,CID,FirstName,LastName,City,ContactNo,isActive,StartDate,Enddate
1.0,Narendra,Modi,Delhi,412.0,1.0,Narendra,Modi,Ahmedabad,412.0,Y,2023-09-19T22:40:23.335+0000,9999-12-31T00:00:00.000+0000
,,,,,3.0,Amit,Shah,Ahmedabad,612.0,Y,2023-09-19T22:40:32.572+0000,9999-12-31T00:00:00.000+0000
4.0,Aditya,Nath,Lucknow,612.0,,,,,,,,


In [0]:
#source CID null means deleted record
#target CID null means new record


#add a column named MergeKey and assign TargetCID to it
MergeDF=FilterDF.withColumn("MergeKey",FilterDF.CID)
display(MergeDF)



SourceCID,SourceFirstName,SourceLastName,SourceCity,SourceContactno,CID,FirstName,LastName,City,ContactNo,isActive,StartDate,Enddate,MergeKey
1.0,Narendra,Modi,Delhi,412.0,1.0,Narendra,Modi,Ahmedabad,412.0,Y,2023-09-19T22:40:23.335+0000,9999-12-31T00:00:00.000+0000,1.0
,,,,,3.0,Amit,Shah,Ahmedabad,612.0,Y,2023-09-19T22:40:32.572+0000,9999-12-31T00:00:00.000+0000,3.0
4.0,Aditya,Nath,Lucknow,612.0,,,,,,,,,


In [0]:
#when matched, existing record with same CID willbe updated(made inactive and end date). So in order to handle the changed record, we need to close old record but also need to insert new record. Hence we need 2 copies of that record. In our case its CID=1 which needs to be first updated and then inserted for new active record.

#create a dummy record for all the CHANGED records. Changed records are those which has source CID and target CID as well. 
#make mergekey as null for this dummy record
DummyDF=FilterDF.filter(FilterDF.SourceCID.isNotNull()&FilterDF.CID.isNotNull()).withColumn("MergeKey",lit(None))
display(DummyDF)

SourceCID,SourceFirstName,SourceLastName,SourceCity,SourceContactno,CID,FirstName,LastName,City,ContactNo,isActive,StartDate,Enddate,MergeKey
1,Narendra,Modi,Delhi,412,1,Narendra,Modi,Ahmedabad,412,Y,2023-09-19T22:40:23.335+0000,9999-12-31T00:00:00.000+0000,


In [0]:
#now make a union of old data frame and this dummy record to make final data frame which will be used as source dataframe for upsert

ScdDF=MergeDF.union(DummyDF)
display(ScdDF)

SourceCID,SourceFirstName,SourceLastName,SourceCity,SourceContactno,CID,FirstName,LastName,City,ContactNo,isActive,StartDate,Enddate,MergeKey
1.0,Narendra,Modi,Delhi,412.0,1.0,Narendra,Modi,Ahmedabad,412.0,Y,2023-09-19T22:40:23.335+0000,9999-12-31T00:00:00.000+0000,1.0
,,,,,3.0,Amit,Shah,Ahmedabad,612.0,Y,2023-09-19T22:40:32.572+0000,9999-12-31T00:00:00.000+0000,3.0
4.0,Aditya,Nath,Lucknow,612.0,,,,,,,,,
1.0,Narendra,Modi,Delhi,412.0,1.0,Narendra,Modi,Ahmedabad,412.0,Y,2023-09-19T22:40:23.335+0000,9999-12-31T00:00:00.000+0000,


In [0]:
DimDataInstance.alias("target").merge(
    source= ScdDF.alias("source"), 
    condition="target.CID=source.MergeKey and target.isActive='Y'"
). whenMatchedUpdate (set=
                      {
                          "isActive":"'N'",
                          "EndDate": "current_date"
                          
                      }
).whenNotMatchedInsert(values=
                       {"CID":"source.SourceCID",
                        "FirstName":"source.SourceFirstName",
                        "LastName":"source.SourceLastName",
                        "City":"source.SourceCity",
                        "ContactNo":"source.SourceContactNo",
                        "isActive":"'Y'",
                        "StartDate":"current_date",
                        "EndDate":"""to_date('9999-12-31','yyyy-MM-dd')"""

}
                       ).execute()


In [0]:
%sql
select * from DimData

CID,FirstName,LastName,City,ContactNo,isActive,StartDate,Enddate
1,Narendra,Modi,Ahmedabad,412,N,2023-09-19T22:40:23.335+0000,2023-09-19T00:00:00.000+0000
3,Amit,Shah,Ahmedabad,612,N,2023-09-19T22:40:32.572+0000,2023-09-19T00:00:00.000+0000
4,Aditya,Nath,Lucknow,612,Y,2023-09-19T00:00:00.000+0000,9999-12-31T00:00:00.000+0000
1,Narendra,Modi,Delhi,412,Y,2023-09-19T00:00:00.000+0000,9999-12-31T00:00:00.000+0000
2,Rahul,Gandhi,Delhi,512,Y,2023-09-19T22:40:28.802+0000,9999-12-31T00:00:00.000+0000
