In [223]:
import os
from getpass import getpass

import findspark
findspark.init()

import pyspark
from pyspark.sql.functions import *
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.window import Window

In [224]:
#Global Connection Variables
# Credentials come from the environment (or an interactive prompt) instead of being
# hard-coded: the notebook is shared together with its source, so a literal password leaks.
url = os.environ.get("SCD2_JDBC_URL", "jdbc:postgresql://localhost:5432/scd_2")
source_name = 'source_scd'   # table holding the latest source snapshot
dest_name = 'dest_scd'       # SCD type-2 output table
username = os.environ.get("SCD2_DB_USER", "postgres")
password = os.environ.get("SCD2_DB_PASSWORD") or getpass("Postgres password: ")

In [191]:
#Globally Used Variables

# Open-ended expiration timestamp given to the current (open) version of a record.
def_timestamp = "9999-12-31 23:59:59"
# Business key of the dimension. NOTE(review): not referenced by the visible cells
# (getHash's parameter of the same name is passed type2_cols) — confirm intended use.
key_list = ["id"]
# Columns whose change closes the open version and opens a new one (SCD type 2).
type2_cols = ["company", "role"]
# SCD2 bookkeeping columns. The open/closed marker is written as "flag" by the
# load, INSERT, DELETE and UPDATE cells, so it is named that here as well.
scd2_cols = ["effective_date", "expiration_date", "flag"]
# Spark datetime pattern used with date_format() for effective/expiration dates.
DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"

In [227]:
# Reuse the active SparkSession, or create one if none is running.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [193]:
#Function for writing to database
def writetoDb(df, url, table_name, username, password, mode="overwrite", truncate=False):
    """Write ``df`` to a Postgres table through Spark's JDBC data source.

    Args:
        df: Spark DataFrame to persist.
        url: JDBC connection URL, e.g. ``jdbc:postgresql://host:5432/db``.
        table_name: target table, passed as the JDBC ``dbtable`` option.
        username: database user.
        password: database password.
        mode: Spark save mode; defaults to ``"overwrite"`` (the previous fixed behaviour).
        truncate: when True, set the JDBC ``truncate`` option so that an overwrite
            TRUNCATEs the existing table instead of dropping and recreating it, keeping
            the table definition (e.g. indices) intact.

    Returns:
        None. The write runs when ``save()`` is called.
    """
    writer = (
        df.write.format("jdbc")
        .option("url", url)
        .option("driver", "org.postgresql.Driver")
        .option("dbtable", table_name)
        .option("user", username)
        .option("password", password)
        .mode(mode)
    )
    if truncate:
        # Only takes effect when the save mode is overwrite.
        writer = writer.option("truncate", "true")
    writer.save()

In [194]:
#Function to read from Database
def readfromDb(url, table_name, username, password):
    """Load a Postgres table as a Spark DataFrame via the JDBC data source.

    Uses the notebook-level ``spark`` session.

    Args:
        url: JDBC connection URL.
        table_name: table (or query alias) passed as the JDBC ``dbtable`` option.
        username: database user.
        password: database password.

    Returns:
        The (lazily evaluated) DataFrame returned by ``load()``.
    """
    jdbc_options = {
        "url": url,
        "driver": "org.postgresql.Driver",
        "dbtable": table_name,
        "user": username,
        "password": password,
    }
    return spark.read.format("jdbc").options(**jdbc_options).load()

In [201]:
#Initially the data lives only in the source table: load the employee CSV and
# publish it to Postgres as the source snapshot.
# The absolute local path can be overridden with SCD2_EMPLOYEE_CSV instead of editing the cell.
EMPLOYEE_CSV = os.environ.get(
    "SCD2_EMPLOYEE_CSV", r'D:\pyspark practice\Data_Processing\scd2\employee.csv'
)
src_df = spark.read.csv(EMPLOYEE_CSV, header=True)
# Without inferSchema every CSV column is read as a string; cast the key to int.
src_df = src_df.withColumn('id', col('id').cast('int'))
src_df.show()

# Use the configured source table name rather than repeating the literal.
writetoDb(src_df, url, source_name, username, password)


+---+----------+----------+---------+--------------------+
| id|first_name| last_name|  company|                role|
+---+----------+----------+---------+--------------------+
|  1|  Crawford|    Hurich|    Fadeo|              Worker|
|  2|    Karlik|   Barthot|   Kwideo|Construction Manager|
|  4|     Elden|    Berger|    Jazzy|Construction Foreman|
|  5|       Jon|  Bairstow|    Engla|             Creator|
|  6|   Ferrell|Hungerford|  Feedbug|           Developer|
|  7|    Evonne|    Benoit|  Blogtag|Construction Manager|
|  8|     Nixie|  Goldring| Dabshots|       DataScientist|
|  9|Christophe| Churchill|Skynoodle|           Architect|
| 10|     Booth| Leathwood|   Yambee| Construction Worker|
+---+----------+----------+---------+--------------------+



In [202]:
##Take the current source snapshot and add the SCD2 columns needed in the destination table

current_df = readfromDb(url, source_name, username, password)

# Surrogate keys are assigned 1..n in id order. The window has no partitionBy, so Spark
# evaluates it on a single partition — fine for the initial load of a small table.
window_spec = Window.orderBy('id')

current_df = current_df.withColumn('sk_id', row_number().over(window_spec)).\
            withColumn('effective_date', date_format(current_timestamp(), DATE_FORMAT)).\
            withColumn('expiration_date', date_format(lit(def_timestamp), DATE_FORMAT)).\
            withColumn('flag', lit(True))

current_df.show()

+---+----------+----------+---------+--------------------+-----+-------------------+-------------------+----+
| id|first_name| last_name|  company|                role|sk_id|     effective_date|    expiration_date|flag|
+---+----------+----------+---------+--------------------+-----+-------------------+-------------------+----+
|  1|  Crawford|    Hurich|    Fadeo|              Worker|    1|2023-11-28 14:19:33|9999-12-31 23:59:59|true|
|  2|    Karlik|   Barthot|   Kwideo|Construction Manager|    2|2023-11-28 14:19:33|9999-12-31 23:59:59|true|
|  4|     Elden|    Berger|    Jazzy|Construction Foreman|    3|2023-11-28 14:19:33|9999-12-31 23:59:59|true|
|  5|       Jon|  Bairstow|    Engla|             Creator|    4|2023-11-28 14:19:33|9999-12-31 23:59:59|true|
|  6|   Ferrell|Hungerford|  Feedbug|           Developer|    5|2023-11-28 14:19:33|9999-12-31 23:59:59|true|
|  7|    Evonne|    Benoit|  Blogtag|Construction Manager|    6|2023-11-28 14:19:33|9999-12-31 23:59:59|true|
|  8|     

In [199]:
#Write the initial SCD2 snapshot to the destination table
# (configured name instead of a repeated literal).
writetoDb(current_df, url, dest_name, username, password)

In [41]:
####SCD_2 Implementation
# 1) Filter the open (flag = true) records from the history table ("incremental", a copy of the dest table)
# 2) Build a DataFrame from the open history records with columns suffixed "_history" plus a hash of the type-2 columns;
#    build a DataFrame from the source table with columns suffixed "_current" plus the same hash
# 3) Full-outer-join the two on id and add an Action column (NOCHANGE, INSERT, UPDATE or DELETE)
# 4) Extract the rows for each Action separately
# 5) Union all the rows and write them to the destination table

In [203]:
#Renaming helper and row-hash helper used for change detection

def rename(df, suffix, append):
    """Return ``df`` with ``suffix`` appended to, or stripped from, every column name.

    Args:
        df: DataFrame whose columns are renamed (via ``toDF``).
        suffix: text to add/remove, e.g. ``"_history"``.
        append: True to append ``suffix``; False to strip it.

    When stripping, only a *trailing* ``suffix`` is removed, so a column that merely
    contains the suffix text elsewhere in its name keeps its name; columns without
    the suffix are left unchanged. An empty ``suffix`` is a no-op.
    """
    if append:
        new_cols = [name + suffix for name in df.columns]
    else:
        new_cols = [
            name[:-len(suffix)] if suffix and name.endswith(suffix) else name
            for name in df.columns
        ]
    return df.toDF(*new_cols)


def getHash(df, key_list):
    """Add a ``hash_md5`` column fingerprinting the ``key_list`` columns of each row.

    Each column is cast to string and the values are serialised together with
    ``to_json(struct(...))`` before hashing. Unlike concatenating with an empty
    separator, this keeps column boundaries and nulls distinct, so ("ab", "c") vs
    ("a", "bc") — or (null, "x") vs ("x", null) — no longer hash alike and a real
    change is not misreported as NOCHANGE.

    With an empty ``key_list`` every row gets the same constant hash.
    """
    columns = [col(name).cast("string").alias(name) for name in key_list]
    if not columns:
        return df.withColumn("hash_md5", md5(lit(1)))
    return df.withColumn("hash_md5", md5(to_json(struct(*columns))))
    

In [204]:
#current_df: the latest source snapshot
df_current = readfromDb(
    url=url,
    table_name=source_name,
    username=username,
    password=password,
)
df_current.show()

+---+----------+----------+---------+--------------------+
| id|first_name| last_name|  company|                role|
+---+----------+----------+---------+--------------------+
|  1|  Crawford|    Hurich|    Fadeo|              Worker|
|  2|    Karlik|   Barthot|   Kwideo|Construction Manager|
|  4|     Elden|    Berger|    Jazzy|Construction Foreman|
|  5|       Jon|  Bairstow|    Engla|             Creator|
|  6|   Ferrell|Hungerford|  Feedbug|           Developer|
|  7|    Evonne|    Benoit|  Blogtag|Construction Manager|
|  8|     Nixie|  Goldring| Dabshots|       DataScientist|
|  9|Christophe| Churchill|Skynoodle|           Architect|
| 10|     Booth| Leathwood|   Yambee| Construction Worker|
+---+----------+----------+---------+--------------------+



In [205]:
#history_df is incremental: the SCD2 table copied there at the end of the previous run
df_history = readfromDb(
    url=url,
    table_name="incremental",
    username=username,
    password=password,
)
df_history.show()


+---+----------+---------+----------+--------------------+-----+-------------------+-------------------+----+
| id|first_name|last_name|   company|                role|sk_id|     effective_date|    expiration_date|flag|
+---+----------+---------+----------+--------------------+-----+-------------------+-------------------+----+
|  1|  Crawford|   Hurich|     Fadeo|             Manager|    1|2023-11-28 14:16:10|9999-12-31 23:59:59|true|
|  2|    Karlik|  Barthot|    Kwideo|Construction Manager|    2|2023-11-28 14:16:10|9999-12-31 23:59:59|true|
|  3|      Ivar|    Riggs|Fivebridge|     Project Manager|    3|2023-11-28 14:16:10|9999-12-31 23:59:59|true|
|  4|     Elden|   Berger|     Jazzy|Construction Foreman|    4|2023-11-28 14:16:10|9999-12-31 23:59:59|true|
|  5|       Jon| Bairstow|     Engla|             Creator|    5|2023-11-28 14:16:10|9999-12-31 23:59:59|true|
+---+----------+---------+----------+--------------------+-----+-------------------+-------------------+----+



In [206]:
#max_sk
def max_skey(df):
    """Return the highest surrogate key (``sk_id``) in ``df``.

    Spark's max over no (non-null) values is null, so this returns None for an
    empty frame — callers must handle that before doing arithmetic with it.
    """
    rows = df.agg({"sk_id": "max"}).collect()
    top_row = rows[0]
    return top_row[0]

In [207]:
# Split the history by flag: only open (flag = true) versions take part in change
# detection; closed versions are already final and are carried over unchanged.
df_history_open, df_history_closed = (
    df_history.filter(col("flag")),
    df_history.filter(col("flag") == lit(False)),
)

In [208]:
# Fingerprint the type-2 columns on both sides, then suffix every column name so the
# two frames can be full-outer-joined without column-name clashes.
df_history_open_hash = rename(
    getHash(df_history_open, key_list=type2_cols), suffix="_history", append=True
)
df_current_hash = rename(
    getHash(df_current, key_list=type2_cols), suffix="_current", append=True
)

In [209]:
def merge_action(df_history_open_hash, current_hash_df=None):
    """Full-outer-join open history rows with current source rows and classify each id.

    Args:
        df_history_open_hash: open history rows, hashed and suffixed with ``_history``.
        current_hash_df: current source rows, hashed and suffixed with ``_current``.
            Defaults to the notebook-level ``df_current_hash`` (previous behaviour);
            pass it explicitly to avoid depending on global state.

    Returns:
        The joined frame plus an ``Action`` column:
        NOCHANGE (hashes equal), DELETE (id only in history),
        INSERT (id only in source), UPDATE (id on both sides, hashes differ).
    """
    if current_hash_df is None:
        current_hash_df = df_current_hash
    df_merged = (
        df_history_open_hash
        .join(current_hash_df, col("id_current") == col("id_history"), how="full_outer")
        .withColumn(
            "Action",
            # For one-sided rows the hash comparison is null (not true), so they fall
            # through to the DELETE / INSERT branches.
            when(col("hash_md5_current") == col("hash_md5_history"), 'NOCHANGE')
            .when(col("id_current").isNull(), 'DELETE')
            .when(col("id_history").isNull(), 'INSERT')
            .otherwise('UPDATE'),
        )
    )
    return df_merged


In [210]:
##NO CHANGE
window_spec = Window.orderBy("id")  # order by business key for row_number()-based surrogate keys
df_merged = merge_action(df_history_open_hash)

# Ids whose type-2 hash is unchanged keep their existing open history version as-is.
df_nochange = (
    rename(df_merged.where(col("Action") == "NOCHANGE"), suffix="_history", append=False)
    .select(df_history_open.columns)
)
df_nochange.show()

+---+----------+---------+-------+--------------------+-----+-------------------+-------------------+----+
| id|first_name|last_name|company|                role|sk_id|     effective_date|    expiration_date|flag|
+---+----------+---------+-------+--------------------+-----+-------------------+-------------------+----+
|  2|    Karlik|  Barthot| Kwideo|Construction Manager|    2|2023-11-28 14:16:10|9999-12-31 23:59:59|true|
|  4|     Elden|   Berger|  Jazzy|Construction Foreman|    4|2023-11-28 14:16:10|9999-12-31 23:59:59|true|
|  5|       Jon| Bairstow|  Engla|             Creator|    5|2023-11-28 14:16:10|9999-12-31 23:59:59|true|
+---+----------+---------+-------+--------------------+-----+-------------------+-------------------+----+



In [167]:
# Inspect the joined frame (first 20 rows, long values truncated).
df_merged.show(n=20, truncate=True)

+----------+------------------+-----------------+---------------+--------------------+-------------+----------------------+-----------------------+------------+--------------------+----------+------------------+-----------------+---------------+--------------------+--------------------+--------+
|id_history|first_name_history|last_name_history|company_history|        role_history|sk_id_history|effective_date_history|expiration_date_history|flag_history|    hash_md5_history|id_current|first_name_current|last_name_current|company_current|        role_current|    hash_md5_current|  Action|
+----------+------------------+-----------------+---------------+--------------------+-------------+----------------------+-----------------------+------------+--------------------+----------+------------------+-----------------+---------------+--------------------+--------------------+--------+
|         1|          Crawford|           Hurich|          Fadeo|             Manager|            1|   2023-1

In [211]:
##INSERT
# New rows get surrogate keys above the highest key already in the history table.
# `or 0` covers an empty history: max over no rows is null, which would make every new sk_id null.
max_sk = max_skey(df_history) or 0

def insert(df, start_sk=None):
    """Build new open SCD2 rows for ids that exist only in the source.

    Args:
        df: merged frame from ``merge_action`` (suffixed columns plus ``Action``).
        start_sk: surrogate-key offset; new rows get ``start_sk + 1, start_sk + 2, ...``
            in id order. Defaults to the notebook-level ``max_sk``.

    Returns:
        DataFrame with the source columns plus effective_date, expiration_date,
        sk_id and flag (True).
    """
    offset = max_sk if start_sk is None else start_sk
    by_id = Window.orderBy("id")
    df_insert = (
        rename(df.filter(col("Action") == 'INSERT'), suffix="_current", append=False)
        .select(df_current.columns)
        .withColumn("effective_date", date_format(current_timestamp(), DATE_FORMAT))
        .withColumn("expiration_date", date_format(lit(def_timestamp), DATE_FORMAT))
        .withColumn("row_number", row_number().over(by_id))
        .withColumn("sk_id", col("row_number") + offset)
        .withColumn("flag", lit(True))
        .drop("row_number")
    )
    return df_insert

df_insert = insert(df_merged)

df_insert.show()

+---+----------+----------+---------+--------------------+-------------------+-------------------+-----+----+
| id|first_name| last_name|  company|                role|     effective_date|    expiration_date|sk_id|flag|
+---+----------+----------+---------+--------------------+-------------------+-------------------+-----+----+
|  6|   Ferrell|Hungerford|  Feedbug|           Developer|2023-11-28 14:20:30|9999-12-31 23:59:59|    6|true|
|  7|    Evonne|    Benoit|  Blogtag|Construction Manager|2023-11-28 14:20:30|9999-12-31 23:59:59|    7|true|
|  8|     Nixie|  Goldring| Dabshots|       DataScientist|2023-11-28 14:20:30|9999-12-31 23:59:59|    8|true|
|  9|Christophe| Churchill|Skynoodle|           Architect|2023-11-28 14:20:30|9999-12-31 23:59:59|    9|true|
| 10|     Booth| Leathwood|   Yambee| Construction Worker|2023-11-28 14:20:30|9999-12-31 23:59:59|   10|true|
+---+----------+----------+---------+--------------------+-------------------+-------------------+-----+----+



In [212]:
# Offset for the surrogate keys of the new versions created by UPDATE: the highest key
# used so far across the history and the rows just inserted. Taking it from df_insert
# alone gives None (-> null sk_id) whenever this run has no inserts.
history_max = max_skey(df_history) or 0
insert_max = max_skey(df_insert) or 0
# Builtin max() is shadowed by pyspark.sql.functions.max (wildcard import), so compare explicitly.
max_sk = history_max if history_max >= insert_max else insert_max
print(max_sk)

10


In [213]:
#DELETED: ids still open in the history but missing from the source are closed out now.
# The column is "Action" (as created by merge_action); "action" only resolved because
# Spark's column resolution is case-insensitive by default.
df_deleted = rename(df_merged.filter(col("Action") == 'DELETE'), suffix="_history", append=False)\
                .select(df_history_open.columns)\
                .withColumn("expiration_date", date_format(current_timestamp(), DATE_FORMAT))\
                .withColumn("flag", lit(False))

df_deleted.show()

+---+----------+---------+----------+---------------+-----+-------------------+-------------------+-----+
| id|first_name|last_name|   company|           role|sk_id|     effective_date|    expiration_date| flag|
+---+----------+---------+----------+---------------+-----+-------------------+-------------------+-----+
|  3|      Ivar|    Riggs|Fivebridge|Project Manager|    3|2023-11-28 14:16:10|2023-11-28 14:20:37|false|
+---+----------+---------+----------+---------------+-----+-------------------+-------------------+-----+



In [214]:
##UPDATE
# Each UPDATE id yields two rows: its old history version closed now, and a new open
# version carrying the current values with a fresh surrogate key above max_sk.
# Column is "Action" (as created by merge_action); lowercase only resolved because
# Spark's column resolution is case-insensitive by default.
df_update_rows = df_merged.filter(col("Action") == 'UPDATE')

df_update_closed = rename(df_update_rows, suffix="_history", append=False)\
    .select(df_history_open.columns)\
    .withColumn("expiration_date", date_format(current_timestamp(), DATE_FORMAT))\
    .withColumn("flag", lit(False))

df_update_opened = rename(df_update_rows, suffix="_current", append=False)\
    .select(df_current.columns)\
    .withColumn("effective_date", date_format(current_timestamp(), DATE_FORMAT))\
    .withColumn("expiration_date", date_format(lit(def_timestamp), DATE_FORMAT))\
    .withColumn("row_number", row_number().over(Window.orderBy("id")))\
    .withColumn("sk_id", col("row_number") + max_sk)\
    .withColumn("flag", lit(True))\
    .drop("row_number")

df_update = df_update_closed.unionByName(df_update_opened)

df_update.show()

+---+----------+---------+-------+-------+-----+-------------------+-------------------+-----+
| id|first_name|last_name|company|   role|sk_id|     effective_date|    expiration_date| flag|
+---+----------+---------+-------+-------+-----+-------------------+-------------------+-----+
|  1|  Crawford|   Hurich|  Fadeo|Manager|    1|2023-11-28 14:16:10|2023-11-28 14:20:43|false|
|  1|  Crawford|   Hurich|  Fadeo| Worker|   11|2023-11-28 14:20:43|9999-12-31 23:59:59| true|
+---+----------+---------+-------+-------+-----+-------------------+-------------------+-----+



In [215]:
##FINAL
# Output = closed history carried over + unchanged open rows + new inserts
#          + closed-out deletes + both halves of each update (matched by column name).
df_final = df_history_closed
for df_part in (df_nochange, df_insert, df_deleted, df_update):
    df_final = df_final.unionByName(df_part)

df_final.show()

+---+----------+----------+----------+--------------------+-----+-------------------+-------------------+-----+
| id|first_name| last_name|   company|                role|sk_id|     effective_date|    expiration_date| flag|
+---+----------+----------+----------+--------------------+-----+-------------------+-------------------+-----+
|  2|    Karlik|   Barthot|    Kwideo|Construction Manager|    2|2023-11-28 14:16:10|9999-12-31 23:59:59| true|
|  4|     Elden|    Berger|     Jazzy|Construction Foreman|    4|2023-11-28 14:16:10|9999-12-31 23:59:59| true|
|  5|       Jon|  Bairstow|     Engla|             Creator|    5|2023-11-28 14:16:10|9999-12-31 23:59:59| true|
|  6|   Ferrell|Hungerford|   Feedbug|           Developer|    6|2023-11-28 14:20:50|9999-12-31 23:59:59| true|
|  7|    Evonne|    Benoit|   Blogtag|Construction Manager|    7|2023-11-28 14:20:50|9999-12-31 23:59:59| true|
|  8|     Nixie|  Goldring|  Dabshots|       DataScientist|    8|2023-11-28 14:20:50|9999-12-31 23:59:59

In [216]:
# Persist the new SCD2 state (configured table name instead of a repeated literal).
writetoDb(df_final, url, dest_name, username, password)

In [217]:
# Snapshot the freshly written destination table into "incremental", which the next run
# reads back as its history (df_history).
incremental_df = readfromDb(url, dest_name, username, password)
writetoDb(incremental_df, url, "incremental", username, password)

In [248]:
# Stops the SparkSession. Execution count 248 shows this ran last; the cells below it
# (df_final.show(), the information_schema query) need an active session, so keep this
# as the final step when running the notebook top to bottom.
spark.stop()

In [174]:
# NOTE(review): stale cell — execution count 174 predates the pipeline cells above
# (In [203]–[217]) and its saved output differs from the final table shown earlier.
# Run after spark.stop() it cannot execute, since df_final belongs to the stopped session.
df_final.show()

+---+----------+----------+---------+--------------------+-----+-------------------+-------------------+----+
| id|first_name| last_name|  company|                role|sk_id|     effective_date|    expiration_date|flag|
+---+----------+----------+---------+--------------------+-----+-------------------+-------------------+----+
|  1|  Crawford|    Hurich|    Fadeo|              Worker|    6|2023-11-28 11:58:55|9999-12-31 23:59:59|true|
|  2|    Karlik|   Barthot|   Kwideo|Construction Manager|    7|2023-11-28 11:58:55|9999-12-31 23:59:59|true|
|  4|     Elden|    Berger|    Jazzy|Construction Foreman|    8|2023-11-28 11:58:55|9999-12-31 23:59:59|true|
|  5|       Jon|  Bairstow|    Engla|             Creator|    9|2023-11-28 11:58:55|9999-12-31 23:59:59|true|
|  6|   Ferrell|Hungerford|  Feedbug|           Developer|   10|2023-11-28 11:58:55|9999-12-31 23:59:59|true|
|  7|    Evonne|    Benoit|  Blogtag|Construction Manager|   11|2023-11-28 11:58:55|9999-12-31 23:59:59|true|
|  8|     

In [246]:
# Confirm the destination table exists by querying Postgres' information_schema.
# Reuses the notebook's connection settings instead of repeating the URL and a
# hard-coded password. getOrCreate() returns a live session even if spark.stop()
# has already been called.
session = SparkSession.builder.getOrCreate()
dest_table = session.read.format('jdbc'). \
     options(
         url=url,
         dbtable='information_schema.tables',
         user=username,
         password=password,
         driver='org.postgresql.Driver'). \
     load().filter(col('table_name') == dest_name).collect()

In [247]:
# Rows of information_schema.tables whose table_name is the destination table;
# an empty list means no such table was found.
print(dest_table)

[Row(table_catalog='scd_2', table_schema='public', table_name='dest_scd', table_type='BASE TABLE', self_referencing_column_name=None, reference_generation=None, user_defined_type_catalog=None, user_defined_type_schema=None, user_defined_type_name=None, is_insertable_into='YES', is_typed='NO', commit_action=None)]
