In [45]:
from pyspark.sql.functions import * 
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

In [46]:
spark = SparkSession.builder \
        .appName("SCD Implementation application") \
        .config("spark.sql.shuffle.partitions",3) \
        .master("local[2]") \
        .getOrCreate()

In [47]:
DATE_FORMAT = "yyyy-MM-dd"
future_date = "9999-12-31"
source_url = "../data/source/"
destination_url = "../data/target/"
primary_key = ["customerid"]
slowly_changing_cols = [ "email", "phone", "address", "city", "state", "zipcode"]
implementation_cols = ["effective_date", "end_date", "active_flag"]

In [48]:
customers_source_schema = "customerid long,firstname string, lastname string, email string, phone string, address string, city string, state string, zipcode long"

In [49]:
customers_target_schema = "customerid long,firstname string, lastname string, email string, phone string, address string, city string, state string, zipcode long, customer_skey long, effective_date date, end_date date, active_flag boolean"

In [50]:
customers_source_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(customers_source_schema) \
.load(source_url)

In [51]:
customers_source_df.show()

+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+
|customerid|  firstname| lastname|    email|    phone|             address|         city|state|zipcode|
+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+
|         1|    Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|  Brownsville|   TX|  78521|
|         2|       Mary|  Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|    Littleton|   CO|  80126|
|         3|        Ann|    Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|       Caguas|   PR|    725|
|         4|       Mary|    Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common|   San Marcos|   CA|  92069|
|         5|     Robert|   Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ...|       Caguas|   PR|    725|
|         6|       Mary|    Smith|XXXXXXXXX|XXXXXXXXX|3151 Sleepy Quail...|      Passaic|   NJ|   7055|
|         7|    Melissa|   Wilcox|XXXXXXXXX|XXXXXXXXX|9453 High 

In [52]:
window_def = Window.orderBy("customerid")

In [54]:
enhanced_customers_source_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(customers_source_schema) \
.load(source_url) \
.withColumn("customer_skey", row_number().over(window_def)) \
.withColumn("effective_date", date_format(current_date(), DATE_FORMAT)) \
.withColumn("end_date", date_format(lit(future_date), DATE_FORMAT)) \
.withColumn("active_flag", lit(True))

In [55]:
enhanced_customers_source_df.show()

+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|customerid|  firstname| lastname|    email|    phone|             address|         city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|         1|    Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|  Brownsville|   TX|  78521|            1|    2024-07-28|9999-12-31|       true|
|         2|       Mary|  Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|    Littleton|   CO|  80126|            2|    2024-07-28|9999-12-31|       true|
|         3|        Ann|    Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|       Caguas|   PR|    725|            3|    2024-07-28|9999-12-31|       true|
|         4|       Mary|    Jones|XXXXXXXXX|XXXXXXXXX|  8324 Lit

In [19]:
enhanced_customers_source_df.write.mode('overwrite') \
.option("header",True) \
.option("delimiter",",") \
.csv(destination_url)

In [56]:
customers_target_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(customers_target_schema) \
.load(destination_url)

In [57]:
customers_target_df.show()

+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|customerid|  firstname| lastname|    email|    phone|             address|         city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|         1|    Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|  Brownsville|   TX|  78521|            1|    2024-07-28|9999-12-31|       true|
|         2|       Mary|  Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|    Littleton|   CO|  80126|            2|    2024-07-28|9999-12-31|       true|
|         3|        Ann|    Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|       Caguas|   PR|    725|            3|    2024-07-28|9999-12-31|       true|
|         4|       Mary|    Jones|XXXXXXXXX|XXXXXXXXX|  8324 Lit

In [58]:
customers_source_df.show()

+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+
|customerid|  firstname| lastname|    email|    phone|             address|         city|state|zipcode|
+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+
|         1|    Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|  Brownsville|   TX|  78521|
|         2|       Mary|  Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|    Littleton|   CO|  80126|
|         3|        Ann|    Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|       Caguas|   PR|    725|
|         4|       Mary|    Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common|   San Marcos|   CA|  92069|
|         5|     Robert|   Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ...|       Caguas|   PR|    725|
|         6|       Mary|    Smith|XXXXXXXXX|XXXXXXXXX|3151 Sleepy Quail...|      Passaic|   NJ|   7055|
|         7|    Melissa|   Wilcox|XXXXXXXXX|XXXXXXXXX|9453 High 

In [60]:
max_sk = customers_target_df.agg({"customer_skey": "max"}).collect()[0][0]

In [61]:
print(max_sk)

12433


In [63]:
# Reading the customers source dataframe again to read the new customers data
customers_source_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(customers_source_schema) \
.load(source_url)

customers_source_df.show()

+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+
|customerid|  firstname| lastname|    email|    phone|             address|         city|state|zipcode|
+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+
|         1|    Richard|Hernandez|XXXXXXXXX|XXXXXXXXX| 7108 Belmont Avenue|       Newark|   NJ|   7103|
|         2|       Mary|  Barrett|XXXXXXXXX|XXXXXXXXX|5592 Crystal Rive...|        Tampa|   FL|  34423|
|         3|        Ann|    Smith|XXXXXXXXX|XXXXXXXXX|1184 Boat Lake Drive|     Richmond|   VA|  23220|
|         4|       Mary|    Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common|   San Marcos|   CA|  92069|
|         5|     Robert|   Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ...|       Caguas|   PR|    725|
|         6|       Mary|    Smith|XXXXXXXXX|XXXXXXXXX|3151 Sleepy Quail...|      Passaic|   NJ|   7055|
|         7|    Melissa|   Wilcox|XXXXXXXXX|XXXXXXXXX|9453 High 

In [64]:
active_customers_target_df = customers_target_df.where(col("active_flag")==True)

In [65]:
inactive_customers_target_df = customers_target_df.where(col("active_flag")==False)

In [66]:
active_customers_target_df.show()

+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|customerid|  firstname| lastname|    email|    phone|             address|         city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|         1|    Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|  Brownsville|   TX|  78521|            1|    2024-07-28|9999-12-31|       true|
|         2|       Mary|  Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|    Littleton|   CO|  80126|            2|    2024-07-28|9999-12-31|       true|
|         3|        Ann|    Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|       Caguas|   PR|    725|            3|    2024-07-28|9999-12-31|       true|
|         4|       Mary|    Jones|XXXXXXXXX|XXXXXXXXX|  8324 Lit

In [68]:
active_customers_target_df.join(customers_source_df, "customerid", "full_outer").show()

+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+
|customerid|  firstname| lastname|    email|    phone|             address|         city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|  firstname| lastname|    email|    phone|             address|         city|state|zipcode|
+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+
|         1|    Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|  Brownsville|   TX|  78521|            1|    2024-07-28|9999-12-31|       true|    Richard|Hernandez|XXXXXXXXX|XXXXXXXXX| 7108 Belmont Avenue|       Newark|   NJ|   7103|
|         2|

In [70]:
def column_renamer(df, suffix, append):
   
    if append:
        new_column_names = list(map(lambda x: x+suffix, df.columns))
        
    else:
        new_column_names = list(map(lambda x: x.replace(suffix,""), df.columns))
        
    return df.toDF(*new_column_names)

def get_hash(df, keys_list):
 
    columns = [col(column) for column in keys_list]
    
    if columns:
        return df.withColumn("hash_md5", md5(concat_ws("", *columns)))
    else:
        return df.withColumn("hash_md5", md5(lit(1)))


In [71]:
active_customers_target_df_hash = column_renamer(get_hash(active_customers_target_df, slowly_changing_cols), suffix="_target", append=True)

customers_source_df_hash = column_renamer(get_hash(customers_source_df, slowly_changing_cols), suffix="_source", append=True)


In [72]:
active_customers_target_df_hash.show()

+-----------------+----------------+---------------+------------+------------+--------------------+-------------+------------+--------------+--------------------+---------------------+---------------+------------------+--------------------+
|customerid_target|firstname_target|lastname_target|email_target|phone_target|      address_target|  city_target|state_target|zipcode_target|customer_skey_target|effective_date_target|end_date_target|active_flag_target|     hash_md5_target|
+-----------------+----------------+---------------+------------+------------+--------------------+-------------+------------+--------------+--------------------+---------------------+---------------+------------------+--------------------+
|                1|         Richard|      Hernandez|   XXXXXXXXX|   XXXXXXXXX|  6303 Heather Plaza|  Brownsville|          TX|         78521|                   1|           2024-07-28|     9999-12-31|              true|f2c1345c132ad8174...|
|                2|            Mary|

In [73]:
customers_source_df_hash.show()

+-----------------+----------------+---------------+------------+------------+--------------------+-------------+------------+--------------+--------------------+
|customerid_source|firstname_source|lastname_source|email_source|phone_source|      address_source|  city_source|state_source|zipcode_source|     hash_md5_source|
+-----------------+----------------+---------------+------------+------------+--------------------+-------------+------------+--------------+--------------------+
|                1|         Richard|      Hernandez|   XXXXXXXXX|   XXXXXXXXX| 7108 Belmont Avenue|       Newark|          NJ|          7103|5c47f014e92644a46...|
|                2|            Mary|        Barrett|   XXXXXXXXX|   XXXXXXXXX|5592 Crystal Rive...|        Tampa|          FL|         34423|b13597ee5c51c23f4...|
|                3|             Ann|          Smith|   XXXXXXXXX|   XXXXXXXXX|1184 Boat Lake Drive|     Richmond|          VA|         23220|962342583e9751c48...|
|                4|   

In [74]:
merged_df = active_customers_target_df_hash.join(customers_source_df_hash, col("customerid_source") ==  col("customerid_target") , "full_outer") \
.withColumn("Action", when(col("hash_md5_source") == col("hash_md5_target")  , 'NOCHANGE')\
.when(col("customerid_source").isNull(), 'DELETE')\
.when(col("customerid_target").isNull(), 'INSERT')\
.otherwise('UPDATE'))

In [75]:
merged_df.show()

+-----------------+----------------+---------------+------------+------------+--------------------+-------------+------------+--------------+--------------------+---------------------+---------------+------------------+--------------------+-----------------+----------------+---------------+------------+------------+--------------------+-------------+------------+--------------+--------------------+--------+
|customerid_target|firstname_target|lastname_target|email_target|phone_target|      address_target|  city_target|state_target|zipcode_target|customer_skey_target|effective_date_target|end_date_target|active_flag_target|     hash_md5_target|customerid_source|firstname_source|lastname_source|email_source|phone_source|      address_source|  city_source|state_source|zipcode_source|     hash_md5_source|  Action|
+-----------------+----------------+---------------+------------+------------+--------------------+-------------+------------+--------------+--------------------+----------------

In [76]:
unchanged_records = column_renamer(merged_df.filter(col("action") == 'NOCHANGE'), suffix="_target", append=False).select(active_customers_target_df.columns)

In [77]:
unchanged_records.show()

+----------+-----------+----------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|customerid|  firstname|  lastname|    email|    phone|             address|         city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+-----------+----------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|         4|       Mary|     Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common|   San Marcos|   CA|  92069|            4|    2024-07-28|9999-12-31|       true|
|         6|       Mary|     Smith|XXXXXXXXX|XXXXXXXXX|3151 Sleepy Quail...|      Passaic|   NJ|   7055|            6|    2024-07-28|9999-12-31|       true|
|         7|    Melissa|    Wilcox|XXXXXXXXX|XXXXXXXXX|9453 High Concession|       Caguas|   PR|    725|            7|    2024-07-28|9999-12-31|       true|
|        10|    Melissa|     Smith|XXXXXXXXX|XXXXXXXXX|859

In [79]:
insert_records = column_renamer(merged_df.filter(col("action") == 'INSERT'), suffix="_source", append=False) \
                .select(customers_source_df.columns)\
                .withColumn("row_number",row_number().over(window_def))\
                .withColumn("customer_skey",col("row_number") + max_sk)\
                .withColumn("effective_date",date_format(current_date(),DATE_FORMAT))\
                .withColumn("end_date",date_format(lit(future_date),DATE_FORMAT))\
                .withColumn("active_flag", lit(True))\
                .drop("row_number")

insert_records.show()

+----------+---------+--------+---------+---------+---------------+----------+-----+-------+-------------+--------------+----------+-----------+
|customerid|firstname|lastname|    email|    phone|        address|      city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+---------+--------+---------+---------+---------------+----------+-----+-------+-------------+--------------+----------+-----------+
|     12436|    Grace|  Parker|XXXXXXXXX|XXXXXXXXX|890 Pine Avenue|Townsville|   TX|  54321|        12434|    2024-07-28|9999-12-31|       true|
|     12437|   Connor|   Evans|XXXXXXXXX|XXXXXXXXX| 567 Oak Street|  Cityview|   CA|  98765|        12435|    2024-07-28|9999-12-31|       true|
+----------+---------+--------+---------+---------+---------------+----------+-----+-------+-------------+--------------+----------+-----------+



In [80]:
max_sk = insert_records.agg({"customer_skey": "max"}).collect()[0][0]

In [81]:
print(max_sk)

12435


In [82]:
update_records = column_renamer(merged_df.filter(col("action") == 'UPDATE'), suffix="_target", append=False)\
                .select(active_customers_target_df.columns)\
                .withColumn("end_date", date_format(current_date(),DATE_FORMAT))\
                .withColumn("active_flag", lit(False))\
            .unionByName(
            column_renamer(merged_df.filter(col("action") == 'UPDATE'), suffix="_source", append=False)\
                .select(customers_source_df.columns)\
                .withColumn("effective_date",date_format(current_date(),DATE_FORMAT))\
                .withColumn("end_date",date_format(lit(future_date),DATE_FORMAT))\
                .withColumn("row_number",row_number().over(window_def))\
                .withColumn("customer_skey",col("row_number")+ max_sk)\
                .withColumn("active_flag", lit(True))\
                .drop("row_number")
                )


In [83]:
update_records.show()

+----------+---------+----------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|customerid|firstname|  lastname|    email|    phone|             address|         city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+---------+----------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|         1|  Richard| Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|  Brownsville|   TX|  78521|            1|    2024-07-28|2024-07-28|      false|
|         2|     Mary|   Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|    Littleton|   CO|  80126|            2|    2024-07-28|2024-07-28|      false|
|         3|      Ann|     Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|       Caguas|   PR|    725|            3|    2024-07-28|2024-07-28|      false|
|         5|   Robert|    Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River 

In [84]:
max_sk = update_records.agg({"customer_skey": "max"}).collect()[0][0]

In [85]:
print(max_sk)

12535


In [86]:
delete_records = column_renamer(merged_df.filter(col("action") == 'DELETE'), suffix="_target", append=False)\
                .select(active_customers_target_df.columns)\
                .withColumn("end_date", date_format(current_date(),DATE_FORMAT))\
                .withColumn("active_flag", lit(False))

delete_records.show()

+----------+---------+--------+---------+---------+----------------+-----------+-----+-------+-------------+--------------+----------+-----------+
|customerid|firstname|lastname|    email|    phone|         address|       city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+---------+--------+---------+---------+----------------+-----------+-----+-------+-------------+--------------+----------+-----------+
|     12435|    Laura|  Horton|XXXXXXXXX|XXXXXXXXX|5736 Honey Downs|Summerville|   SC|  29483|        12433|    2024-07-28|2024-07-28|      false|
+----------+---------+--------+---------+---------+----------------+-----------+-----+-------+-------------+--------------+----------+-----------+



In [87]:
resultant_df = inactive_customers_target_df \
            .unionByName(unchanged_records)\
            .unionByName(insert_records)\
            .unionByName(update_records)\
            .unionByName(delete_records)

resultant_df.show()

+----------+-----------+----------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|customerid|  firstname|  lastname|    email|    phone|             address|         city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+-----------+----------+---------+---------+--------------------+-------------+-----+-------+-------------+--------------+----------+-----------+
|         4|       Mary|     Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common|   San Marcos|   CA|  92069|            4|    2024-07-28|9999-12-31|       true|
|         6|       Mary|     Smith|XXXXXXXXX|XXXXXXXXX|3151 Sleepy Quail...|      Passaic|   NJ|   7055|            6|    2024-07-28|9999-12-31|       true|
|         7|    Melissa|    Wilcox|XXXXXXXXX|XXXXXXXXX|9453 High Concession|       Caguas|   PR|    725|            7|    2024-07-28|9999-12-31|       true|
|        10|    Melissa|     Smith|XXXXXXXXX|XXXXXXXXX|859

In [88]:
spark.stop()