In [1]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import current_date, current_timestamp, date_format, to_date, date_sub, round, sum, concat_ws, md5, col, lit, monotonically_increasing_id, when, row_number

In [2]:
# One local Spark session, shared by every cell below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("SCD_Type_2_Implementation")
    .getOrCreate()
)

In [3]:
# JDBC connection string for the `retail_db` database on the local PostgreSQL server.
url = "jdbc:postgresql://{}:{}/{}".format("localhost", 5432, "retail_db")

In [4]:

def column_renamer(df, suffix, append):
    """Return ``df`` with ``suffix`` appended to, or stripped from, its column names.

    Args:
        df: DataFrame-like object exposing ``columns`` and ``toDF(*names)``.
        suffix: Text to add or remove, e.g. ``"_history"``.
        append: If True, append ``suffix`` to every column name. If False,
            remove ``suffix`` from the *end* of the names that carry it;
            names without it are left unchanged.

    Returns:
        The result of ``df.toDF`` called with the new column names.
    """
    if append:
        new_column_names = [name + suffix for name in df.columns]
    else:
        # Strip only a trailing suffix: str.replace would also cut it out of
        # the middle of a name (e.g. "a_history_flag_history" -> "a_flag").
        new_column_names = [
            name[: -len(suffix)] if suffix and name.endswith(suffix) else name
            for name in df.columns
        ]

    return df.toDF(*new_column_names)

def get_hash(df, keys_list):
    """Return ``df`` plus a ``hash_md5`` column fingerprinting ``keys_list``.

    Each listed column is cast to string; the values are joined with ``"_"``
    and MD5-hashed. NULLs are replaced by a marker before joining, because
    ``concat_ws`` skips NULL inputs. Without the marker, ``("a", NULL)`` and
    ``(NULL, "a")`` would hash identically, and a value moving between
    tracked columns would go undetected.

    Args:
        df: Spark DataFrame to fingerprint.
        keys_list: Names of the columns that feed the hash. When empty,
            every row gets the same constant hash.

    Returns:
        ``df`` with the added ``hash_md5`` column.
    """
    null_marker = lit("<NULL>")
    columns = [
        when(col(column).isNull(), null_marker).otherwise(col(column).cast("string"))
        for column in keys_list
    ]

    if columns:
        return df.withColumn("hash_md5", md5(concat_ws("_", *columns)))
    # No tracked columns: a constant hash makes every row compare as equal.
    return df.withColumn("hash_md5", md5(lit(1)))
        

In [6]:
def read_dataframe(self, table_name: str, mode: str, project_id: str = "PROJECT_ID", dataset: str = "pyspark_scd"):
    """Load the BigQuery table ``<dataset>.<table_name>`` through the global ``spark`` session.

    The signature is method-style: ``self`` and ``mode`` are accepted but not
    used. They are kept only so existing callers keep working.

    Args:
        self: Unused.
        table_name: Table name inside ``dataset``.
        mode: Unused.
        project_id: Value for the connector's ``project`` option. Defaults to
            the original ``"PROJECT_ID"`` placeholder.
        dataset: BigQuery dataset that holds the table.

    Returns:
        The DataFrame returned by ``spark.read...load``.
    """
    return (
        spark.read.format("bigquery")
        .option("project", project_id)
        .load(f"{dataset}.{table_name}")
    )

def write_dataframe(self, df, table_name: str, mode: str, project_id: str = "PROJECT_ID", dataset: str = "pyspark_scd") -> None:
    """Write ``df`` to the BigQuery table ``<dataset>.<table_name>`` using the direct write method.

    The signature is method-style: ``self`` is accepted but not used, and is
    kept only so existing callers keep working.

    Args:
        self: Unused.
        df: Spark DataFrame to write.
        table_name: Target table name inside ``dataset``.
        mode: Spark save mode, e.g. ``"append"`` or ``"overwrite"``.
        project_id: Used for both the ``project`` and ``parentProject``
            connector options. Defaults to the original ``"PROJECT_ID"``
            placeholder.
        dataset: BigQuery dataset that holds the table.
    """
    (
        df.write.format("bigquery")
        .option("project", project_id)
        .option("parentProject", project_id)
        .option("table", f"{dataset}.{table_name}")
        .option("writeMethod", "direct")
        .mode(mode)
        .save()
    )

In [358]:
# Initial load: read the source table and stamp every row as the current
# version, with a sequential surrogate key ordered by CustomerID.
# NOTE: a Window without partitionBy moves all rows into one partition, which
# is fine for a small dimension table but will not scale.
window_spec = Window.orderBy("customerid")

customers_df = (
    spark.read.format("jdbc")
    .option("url", url)
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "source")
    .option("user", os.environ.get("user_name"))
    .option("password", os.environ.get("pdb_pass"))
    .load()
    .withColumn("sk_customer_id", row_number().over(window_spec))
    # Real DateType columns (date_format returns strings), matching the
    # to_date() conversion applied when the history table is read back.
    .withColumn("effective_date", current_date())
    .withColumn("expiration_date", to_date(lit("9999-12-31")))
    .withColumn("is_current", lit(True))
)

customers_df.show()

# Alternative source used during development (mock CSV extract):
#   spark.read.options(header=True, delimiter=",", inferSchema=True)
#        .csv("data/customers_mockaroo/customers_5.csv")

+----------+-----+----------+----------+------------+--------------------+-------------------+--------+--------------+--------------+---------------+----------+
|CustomerID|Title|first_name| last_name|company_name|               email|              phone|zip_code|sk_customer_id|effective_date|expiration_date|is_current|
+----------+-----+----------+----------+------------+--------------------+-------------------+--------+--------------+--------------+---------------+----------+
|         1| Mrs.|     Tiena|     Lowey|        Eare|tlowey0@barnesand...|  +1 (904) 136-7774|   32230|             1|    2023-11-27|     9999-12-31|      true|
|         2| Mrs.|   Patrica|McGrowther|      google|pmcgrowther1@frie...|  +1 (772) 846-7575|   34981|             2|    2023-11-27|     9999-12-31|      true|
|         3|  Mr.|     Wilma|     Pryce|    Feedfire| wpryce2@4shared.com| +66 (425) 212-5029|   10250|             3|    2023-11-27|     9999-12-31|      true|
|         4|  Mr.|  Ethelind|    B

In [319]:

# Seed the history ("incremental") table with the initial load, replacing
# whatever it held before.
(
    customers_df.write
    .format("jdbc")
    .mode('overwrite')
    .options(
        url=url,
        driver="org.postgresql.Driver",
        dbtable="incremental",
        user=os.environ.get("user_name"),
        password=os.environ.get("pdb_pass"),
    )
    .save()
)
            

In [359]:
# Current snapshot of the source table.
customers_current = (
    spark.read.format("jdbc")
    .options(
        url=url,
        driver="org.postgresql.Driver",
        dbtable="source",
        user=os.environ.get("user_name"),
        password=os.environ.get("pdb_pass"),
    )
    .load()
)

# SCD2 history produced by the previous run; both validity columns are
# converted to dates with to_date().
customers_history = (
    spark.read.format("jdbc")
    .options(
        url=url,
        driver="org.postgresql.Driver",
        dbtable="incremental",
        user=os.environ.get("user_name"),
        password=os.environ.get("pdb_pass"),
    )
    .load()
    .withColumn("effective_date", to_date("effective_date"))
    .withColumn("expiration_date", to_date("expiration_date"))
)

customers_current.show()
customers_history.show()

+----------+-----+----------+----------+------------+--------------------+-------------------+--------+
|CustomerID|Title|first_name| last_name|company_name|               email|              phone|zip_code|
+----------+-----+----------+----------+------------+--------------------+-------------------+--------+
|         1| Mrs.|     Tiena|     Lowey|        Eare|tlowey0@barnesand...|  +1 (904) 136-7774|   32230|
|         3|  Mr.|     Wilma|     Pryce|    Feedfire| wpryce2@4shared.com| +66 (425) 212-5029|   10250|
|         4|  Mr.|  Ethelind|    Boydon|  Divanoodle|eboydon3@national...|+385 (310) 593-6426|   51216|
|         6|  Mr.|   Aravind|   Jarpala|     netflix|    test@netflix.com|   +1 (91) 794-6599|   11005|
|         2| Mrs.|   Patrica|McGrowther|      google|pmcgrowther1@frie...|  +1 (772) 846-7575|   34981|
+----------+-----+----------+----------+------------+--------------------+-------------------+--------+

+----------+-----+----------+----------+------------+----------

##### Hashing & Merging

In [360]:
# Highest surrogate key issued so far, over open *and* closed history rows;
# new keys are numbered after it. max() over an empty table is None, so fall
# back to 0 to keep the key arithmetic in later cells working.
max_sk = customers_history.agg({"sk_customer_id": "max"}).collect()[0][0] or 0
print("Maximum surrogate Key id", max_sk)

# Type-2 tracked attributes: a change in any of them triggers an UPDATE.
scd_columns = ["company_name", "email", "phone", "zip_code"]

# Only open history rows take part in change detection; closed rows are
# carried through to the final union as they are.
df_history_open = customers_history.where(col("is_current"))
df_history_closed = customers_history.where(col("is_current") == lit(False))

# Suffix every column so the two sides stay distinguishable after the join.
customers_history_open_hash = column_renamer(get_hash(df_history_open, scd_columns), suffix="_history", append=True)
customers_current_hash = column_renamer(get_hash(customers_current, scd_columns), suffix="_current", append=True)

# Full outer join on the business key, then classify each customer. The
# branch order matters: when one side is missing, the hash comparison is
# NULL, so the row falls through to DELETE / INSERT.
merged_df = (
    customers_history_open_hash
    .join(customers_current_hash, col("CustomerID_current") == col("CustomerID_history"), how="full_outer")
    .withColumn(
        "Action",
        when(col("hash_md5_current") == col("hash_md5_history"), 'NOCHANGE')
        .when(col("CustomerID_current").isNull(), 'DELETE')
        .when(col("CustomerID_history").isNull(), 'INSERT')
        .otherwise('UPDATE'),
    )
)

merged_df.show()
df_history_closed.show()


Maximum surrogate Key id 7
+------------------+-------------+------------------+-----------------+--------------------+--------------------+-------------------+----------------+----------------------+----------------------+-----------------------+------------------+--------------------+------------------+-------------+------------------+-----------------+--------------------+--------------------+-------------------+----------------+--------------------+--------+
|CustomerID_history|Title_history|first_name_history|last_name_history|company_name_history|       email_history|      phone_history|zip_code_history|sk_customer_id_history|effective_date_history|expiration_date_history|is_current_history|    hash_md5_history|CustomerID_current|Title_current|first_name_current|last_name_current|company_name_current|       email_current|      phone_current|zip_code_current|    hash_md5_current|  Action|
+------------------+-------------+------------------+-----------------+--------------------+-

##### NO CHANGE

In [361]:
# Ordering used to number new surrogate keys in the INSERT and UPDATE cells.
window_spec = Window.orderBy("customerid")

# Unchanged customers keep their open history row exactly as it is
# (same surrogate key, same validity dates).
no_change = (
    column_renamer(
        merged_df.filter(col("action") == 'NOCHANGE'),
        suffix="_history",
        append=False,
    )
    .select(df_history_open.columns)
)

no_change.show()

+----------+-----+----------+----------+------------+--------------------+-------------------+--------+--------------+--------------+---------------+----------+
|CustomerID|Title|first_name| last_name|company_name|               email|              phone|zip_code|sk_customer_id|effective_date|expiration_date|is_current|
+----------+-----+----------+----------+------------+--------------------+-------------------+--------+--------------+--------------+---------------+----------+
|         1| Mrs.|     Tiena|     Lowey|        Eare|tlowey0@barnesand...|  +1 (904) 136-7774|   32230|             1|    2023-11-27|     9999-12-31|      true|
|         2| Mrs.|   Patrica|McGrowther|      google|pmcgrowther1@frie...|  +1 (772) 846-7575|   34981|             6|    2023-11-27|     9999-12-31|      true|
|         3|  Mr.|     Wilma|     Pryce|    Feedfire| wpryce2@4shared.com| +66 (425) 212-5029|   10250|             3|    2023-11-27|     9999-12-31|      true|
|         4|  Mr.|  Ethelind|    B

##### INSERT

In [362]:

# New customers: take the current version, open it from today, and issue
# surrogate keys max_sk + 1, max_sk + 2, ... in CustomerID order.
customers_insert = (
    column_renamer(merged_df.filter(col("action") == 'INSERT'), suffix="_current", append=False)
    .select(customers_current.columns)
    # DateType, consistent with the to_date()-converted history columns.
    .withColumn("effective_date", current_date())
    .withColumn("expiration_date", to_date(lit("9999-12-31")))
    .withColumn("row_number", row_number().over(window_spec))
    # max_sk is None when the history table is empty.
    .withColumn("sk_customer_id", col("row_number") + lit(max_sk or 0))
    .withColumn("is_current", lit(True))
    .drop("row_number")
)

customers_insert.show()

+----------+-----+----------+---------+------------+-----+-----+--------+--------------+---------------+--------------+----------+
|CustomerID|Title|first_name|last_name|company_name|email|phone|zip_code|effective_date|expiration_date|sk_customer_id|is_current|
+----------+-----+----------+---------+------------+-----+-----+--------+--------------+---------------+--------------+----------+
+----------+-----+----------+---------+------------+-----+-----+--------+--------------+---------------+--------------+----------+



##### DELETE (also refreshes the max surrogate key used by UPDATE)

In [363]:
# Surrogate-key bookkeeping for the UPDATE cell: max_sk must be the highest
# key issued so far. With no inserts, take it from the *whole* history,
# closed rows included. merged_df only contains open history rows, so its
# max can be lower than a closed row's key, and UPDATE would then re-issue
# an existing surrogate key.
if customers_insert.isEmpty():
    max_sk = customers_history.agg({"sk_customer_id": "max"}).collect()[0][0] or 0
else:
    max_sk = customers_insert.agg({"sk_customer_id": "max"}).collect()[0][0]
print("Maximum surrogate Key Id: ", max_sk)

# Customers missing from the source: close their open history row today.
customers_delete = (
    column_renamer(merged_df.filter(col("action") == 'DELETE'), suffix="_history", append=False)
    .select(df_history_open.columns)
    # DateType, consistent with the to_date()-converted history columns.
    .withColumn("expiration_date", current_date())
    .withColumn("is_current", lit(False))
)

customers_delete.show()

Maximum surrogate Key Id:  7
+----------+-----+----------+---------+------------+--------------------+-----------------+--------+--------------+--------------+---------------+----------+
|CustomerID|Title|first_name|last_name|company_name|               email|            phone|zip_code|sk_customer_id|effective_date|expiration_date|is_current|
+----------+-----+----------+---------+------------+--------------------+-----------------+--------+--------------+--------------+---------------+----------+
|         5|  Mr.| Westleigh|   Armell|  Divanoodle|warmell4@national...|+1 (549) 794-6599|   11005|             5|    2023-11-27|     2023-11-27|     false|
+----------+-----+----------+---------+------------+--------------------+-----------------+--------+--------------+--------------+---------------+----------+



##### UPDATE

In [364]:
# Changed customers yield two rows each: the old version closed as of today,
# and the new version opened today with a fresh surrogate key after max_sk.
customers_update = (
    # 1. Old version: close the open history row.
    column_renamer(merged_df.filter(col("action") == 'UPDATE'), suffix="_history", append=False)
    .select(df_history_open.columns)
    .withColumn("expiration_date", current_date())
    .withColumn("is_current", lit(False))
    .unionByName(
        # 2. New version: current values, numbered in CustomerID order.
        column_renamer(merged_df.filter(col("action") == 'UPDATE'), suffix="_current", append=False)
        .select(customers_current.columns)
        # DateType on both halves, so the union does not mix dates and strings.
        .withColumn("effective_date", current_date())
        .withColumn("expiration_date", to_date(lit("9999-12-31")))
        .withColumn("row_number", row_number().over(window_spec))
        # max_sk is None when the history table is empty.
        .withColumn("sk_customer_id", col("row_number") + lit(max_sk or 0))
        .withColumn("is_current", lit(True))
        .drop("row_number")
    )
)

customers_update.show()

+----------+-----+----------+---------+------------+-----+-----+--------+--------------+--------------+---------------+----------+
|CustomerID|Title|first_name|last_name|company_name|email|phone|zip_code|sk_customer_id|effective_date|expiration_date|is_current|
+----------+-----+----------+---------+------------+-----+-----+--------+--------------+--------------+---------------+----------+
+----------+-----+----------+---------+------------+-----+-----+--------+--------------+--------------+---------------+----------+



##### UNION ALL DATAFRAMES AND PERSIST

In [365]:
# Reassemble the full SCD2 table from previously closed rows, unchanged open
# rows, inserts, closed deletes, and both halves of every update.
final_df = (
    df_history_closed
    .unionByName(no_change)
    .unionByName(customers_insert)
    .unionByName(customers_delete)
    .unionByName(customers_update)
)
final_df.show()

# final_df is lazily derived from the "incremental" table, so it is staged in
# "scd" first. That avoids overwriting "incremental" while final_df is still
# being computed from it. "scd" is then copied over "incremental" for the
# next run.
(
    final_df.write
    .format("jdbc")
    .mode('overwrite')
    .options(
        url=url,
        driver="org.postgresql.Driver",
        dbtable="scd",
        user=os.environ.get("user_name"),
        password=os.environ.get("pdb_pass"),
    )
    .save()
)

sample = (
    spark.read.format("jdbc")
    .options(
        url=url,
        driver="org.postgresql.Driver",
        dbtable="scd",
        user=os.environ.get("user_name"),
        password=os.environ.get("pdb_pass"),
    )
    .load()
)

(
    sample.write
    .format("jdbc")
    .mode('overwrite')
    .options(
        url=url,
        driver="org.postgresql.Driver",
        dbtable="incremental",
        user=os.environ.get("user_name"),
        password=os.environ.get("pdb_pass"),
    )
    .save()
)


+----------+-----+----------+----------+------------+--------------------+-------------------+--------+--------------+--------------+---------------+----------+
|CustomerID|Title|first_name| last_name|company_name|               email|              phone|zip_code|sk_customer_id|effective_date|expiration_date|is_current|
+----------+-----+----------+----------+------------+--------------------+-------------------+--------+--------------+--------------+---------------+----------+
|         2| Mrs.|   Patrica|McGrowther|    Wikkibox|pmcgrowther1@frie...|  +1 (772) 846-7575|   34981|             2|    2023-11-27|     2023-11-27|     false|
|         1| Mrs.|     Tiena|     Lowey|        Eare|tlowey0@barnesand...|  +1 (904) 136-7774|   32230|             1|    2023-11-27|     9999-12-31|      true|
|         2| Mrs.|   Patrica|McGrowther|      google|pmcgrowther1@frie...|  +1 (772) 846-7575|   34981|             6|    2023-11-27|     9999-12-31|      true|
|         3|  Mr.|     Wilma|     

In [366]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()