In [1]:
from pyspark.sql import SparkSession
# Single SparkSession used by every cell of this notebook; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("SCD2")
    .getOrCreate()
)

In [531]:
import pyspark.sql.functions as f
import os
import pyspark.sql.types as t

# Defining various parameters to keep the code generic

In [539]:
source_table="/data\scd2\source.txt" ## this path could be a hive table as well
target_table="/data//scd2//target//target.csv" ## this path could be a hive table as well
table_schema="id,name,dob,city,state,country,loaddate" # common columns in source and target table
joining_keys="id" #column used to perform join between source and target 
scd_columns="name,dob,city,state,country" # columns which will be used to perform SCD
multi_day="loaddate" # column which will be used to perform multiday SCD
hashing_column="surrogate_key" # hashed value stored in target in order to store hash value
scd_helpers="start_date,end_date,is_active" #columns which will be used to identify SCDs

In [533]:
source=spark.read.format("csv").load(source_table,header=True)
source.show()

+---+-------------+----------+----+-----------+-------+----------+
| id|         name|       dob|city|      state|country|  loaddate|
+---+-------------+----------+----+-----------+-------+----------+
|  1|       AKSHAY|1989-11-13|PUNE|MAHARASHTRA|  INDIA|2019-03-10|
|  1|  AKSHAY JAIN|1989-11-13|PUNE|MAHARASHTRA|  INDIA|2019-03-11|
|  2|  MONIKA JAIN|1992-07-26|PUNE|MAHARASHTRA|  INDIA|2019-03-10|
|  2|MONIKA A JAIN|1992-07-26|PUNE|MAHARASHTRA|  INDIA|2019-03-11|
|  3|   AYUSH JAIN|1991-09-01|PUNE|MAHARASHTRA|  INDIA|2019-03-11|
|  3|   AYUSH JAIN|1991-09-01|PUNE|MAHARASHTRA|  INDIA|2019-03-12|
+---+-------------+----------+----+-----------+-------+----------+



Preparing some keys / variables used for multi-day processing


In [534]:
##Preparing some variables to be used later for multi day processing
existing_rec_keys=joining_keys+','+hashing_column
i=0
days_to_process=source.select(multi_day).distinct().orderBy(multi_day).rdd.map(lambda x:str(x[0])).collect()


# Generating hash values for the source data to make row comparison easier

In [535]:
source=source.withColumn(hashing_column,f.sha2(f.concat_ws("|",*scd_columns.split(",")),256))

Reading data from the target table if it exists; otherwise creating an empty DataFrame

In [536]:

if os.path.exists(target_table):
    target=spark.read.format("csv").load(target_table,header=True)

else:
    target_full_schema=hashing_column+","+table_schema+","+scd_helpers
    target_schema=t.StructType([t.StructField(fieldname,t.StringType(),True) for fieldname in target_full_schema.split(",")])
    target=spark.createDataFrame(sc.emptyRDD(),target_schema)
    
target.show()
target_schema=target.columns
#target_schema

+-------------+---+----+---+----+-----+-------+--------+----------+--------+---------+
|surrogate_key| id|name|dob|city|state|country|loaddate|start_date|end_date|is_active|
+-------------+---+----+---+----+-----+-------+--------+----------+--------+---------+
+-------------+---+----+---+----+-----+-------+--------+----------+--------+---------+



In [537]:

source_full=source
days_to_process

['2019-03-10', '2019-03-11', '2019-03-12']

# SCD2 Implementation
Below is the code that performs multi-day SCD Type 2 on the test dataset. Because it is driven entirely by the parameters defined above, it can work with any dataset.

In [538]:
while i<len(days_to_process):
    source=source_full.filter(condition=multi_day+"=='"+str(days_to_process[i])+"'")
    print('SCD2 started for ',days_to_process[i])
    new_rec=source.alias('s').join(target.alias('t'),how="left",on=joining_keys.split(",")).filter(condition="t."+joining_keys+" is null").select("s.*").withColumn('start_date',f.col('loaddate')).withColumn('end_date',f.lit('9999-12-31')).withColumn('is_active',f.lit('Y')).selectExpr(*target_schema)
    update_rec=source.alias('s').join(target.alias('t'),how="inner",on=joining_keys.split(",")).filter(condition="s."+hashing_column+"!=t."+hashing_column).select("s.*").withColumn('start_date',f.col('s.loaddate')).withColumn('end_date',f.lit('9999-12-31')).withColumn('is_active',f.lit('Y')).selectExpr(*target_schema)
    update_old_rec=source.alias('s').join(target.alias('t'),how="inner",on=joining_keys.split(",")).filter(condition="s."+hashing_column+"!=t."+hashing_column).select("t.*","s."+joining_keys+"").withColumn('end_date',f.col('t.loaddate')).withColumn('is_active',f.lit('N')).selectExpr(*target_schema)
    existing_rec=source.alias('s').join(target.alias('t'),how="inner",on=existing_rec_keys.split(",")).select(joining_keys,hashing_column,'t.*').selectExpr(*target_schema)
    delete_rec=source.alias('s').join(target.alias('t'),how="right",on=joining_keys.split(",")).filter(condition="s."+joining_keys+" is null").withColumn('end_date',f.col('t.'+multi_day)).withColumn('is_active',f.lit('N')).select('t.*','end_date','is_active').selectExpr(*target_schema)
    target_final=new_rec.unionAll(update_rec).unionAll(existing_rec).unionAll(delete_rec).unionAll(update_old_rec)
    ##Converting spark dataframe to pandas as I am working in windows . Usually overwrite the data directly into HIVE table
    target_final.toPandas().to_csv(target_table,index=False)
    target=target_final
    print('Data loading completed for ',days_to_process[i])
    if(i==len(days_to_process)-1):
        print('Data loading completed for all days')
    i+=1
    


('SCD2 started for ', '2019-03-10')
('Data loading completed for ', '2019-03-10')
('SCD2 started for ', '2019-03-11')
('Data loading completed for ', '2019-03-11')
('SCD2 started for ', '2019-03-12')
('Data loading completed for ', '2019-03-12')
Data loading completed for all days
