In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable


# Data Reading

In [0]:
# Load every column of the silver customers table as the source for the dimension.
df_cust = spark.table("aws_db_retail_metastore.silver.customers_silver")

In [0]:
# Preview the raw silver customers.
display(df_cust)

In [0]:
# Row count before de-duplication; compare with the count taken after dropDuplicates.
df_cust.count()

In [0]:
# Keep a single row per customer_id. No ordering is specified, so which of
# several duplicate rows survives is arbitrary.
df_cust = df_cust.drop_duplicates(["customer_id"])

# Row count after de-duplication.
df_cust.count()


# Dividing old records vs new records 

In [0]:
# Fetch the surrogate key, business key and audit columns already present in
# the customer dimension so incoming rows can be matched against them.
if spark.catalog.tableExists("aws_db_retail_metastore.gold.dim_customers"):
    
    df_temp = spark.sql("""select dim_cust_id,customer_id,create_date, update_date
                       from aws_db_retail_metastore.gold.dim_customers""")

else:
    
    # Initial load: the dimension does not exist yet, so build an empty frame
    # (where 1=0) exposing the same four columns. customer_id is taken from the
    # silver table itself so the join key keeps its real data type, and the
    # other placeholders are typed NULLs instead of integer literals.
    df_temp = spark.sql("""select cast(null as bigint) as dim_cust_id,
                              customer_id,
                              cast(null as timestamp) as create_date,
                              cast(null as timestamp) as update_date
                       from aws_db_retail_metastore.silver.customers_silver
                       where 1=0""")

In [0]:
# Preview the existing dimension keys (empty on the initial load).
display(df_temp)

In [0]:
# Suffix the dimension-side columns with "_old" so they do not clash with the
# incoming columns after the join and are easy to drop afterwards.
df_temp = (
    df_temp
    .withColumnRenamed("customer_id", "customer_id_old")
    .withColumnRenamed("create_date", "create_date_old")
    .withColumnRenamed("update_date", "update_date_old")
)

Apply a left join to identify new and existing records. If the join returns null in the `dim_cust_id` column, the record is new.

In [0]:
# Left join keeps every incoming customer; dimension columns stay null when
# the customer is not yet present in the dimension.
join_condition = df_cust["customer_id"] == df_temp["customer_id_old"]
df_join = df_cust.join(df_temp, on=join_condition, how="left")


In [0]:
# On the initial load every dimension-side column (dim_cust_id and the *_old
# columns) is null because there is nothing to match against yet.
display(df_join)


Separating the records: old vs. new

In [0]:
# A null surrogate key means the customer was not found in the dimension.
has_no_surrogate_key = col("dim_cust_id").isNull()

df_new = df_join.where(has_no_surrogate_key)

df_old = df_join.where(~has_no_surrogate_key)



Preparing df_old by adding the create and update dates


In [0]:
# Existing customers: keep the original creation timestamp and refresh the
# update timestamp.
df_old = (
    df_old
    # helper columns from the dimension side that are no longer needed
    .drop("customer_id_old", "update_date_old")
    # the creation date never changes, so carry over the stored value
    .withColumnRenamed("create_date_old", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    # stamp the row with the time of this load
    .withColumn("update_date", current_timestamp())
)



In [0]:
# Preview the existing customers that will be updated.
display(df_old)


Preparing df_new in the same way

In [0]:
# New customers: discard every dimension-side helper column (all null here)
# and stamp both audit columns with the time of this load.
df_new = (
    df_new
    .drop("customer_id_old", "update_date_old", "create_date_old")
    .withColumn("update_date", current_timestamp())
    .withColumn("create_date", current_timestamp())
)


In [0]:
# Preview the new customers before surrogate keys are assigned.
display(df_new)

Adding surrogate keys now

In [0]:
# Replace the null dim_cust_id with a generated key starting at 1.
# monotonically_increasing_id() yields unique, increasing values, but they are
# not guaranteed to be consecutive.
df_new = df_new.withColumn("dim_cust_id", monotonically_increasing_id() + 1)


Offset the new surrogate keys by the current maximum key in the dimension so that loads after the initial one do not collide with existing keys

In [0]:
# Find the highest surrogate key already stored in the dimension (0 when the
# dimension does not exist yet).
if spark.catalog.tableExists("aws_db_retail_metastore.gold.dim_customers"):
    df_maxsur = spark.sql("select max(dim_cust_id) as max_surr_key from aws_db_retail_metastore.gold.dim_customers")
    # max() over an empty table yields NULL (None here). Adding lit(None) would
    # turn every new surrogate key into null, so fall back to 0 in that case.
    max_surr_key = df_maxsur.collect()[0]["max_surr_key"] or 0
else:
    max_surr_key = 0

# Shift the freshly generated keys past the existing ones.
# NOTE: this reassigns df_new from itself, so re-running this cell without
# re-running the key-generation cell first applies the offset a second time.
df_new = df_new.withColumn("dim_cust_id",df_new["dim_cust_id"]+lit(max_surr_key))


Finally, combine both DataFrames so that the new records can be added to the existing table

In [0]:
# Preview the new customers with their final surrogate keys.
display(df_new)

In [0]:
# Stack updated and new customers; columns are matched by name, not position.
df_final = df_old.unionByName(df_new)

display(df_final)


SCD TYPE 1

In [0]:
# SCD Type 1 upsert: rows whose dim_cust_id already exists in the dimension
# are overwritten, all other rows are inserted. No history is kept.
TARGET_TABLE = "aws_db_retail_metastore.gold.dim_customers"
TARGET_PATH = "s3://argha-associatede-s3/gold/dim_customers"

if spark.catalog.tableExists(TARGET_TABLE):
    # Incremental load. Resolve the Delta table through the same catalog name
    # that was just checked, rather than a separately hard-coded storage path,
    # so the existence check and the merge always address the same table.
    dlt_obj = DeltaTable.forName(spark, TARGET_TABLE)

    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.dim_cust_id = src.dim_cust_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial load: write the data to TARGET_PATH and register it in the
    # catalog under TARGET_TABLE.
    (
        df_final.write.mode("overwrite")
        .format("delta")
        .option("path", TARGET_PATH)
        .saveAsTable(TARGET_TABLE)
    )
    
    

In [0]:
# Read the dimension back and show it ordered by surrogate key to verify the load.
df_test = spark.table("aws_db_retail_metastore.gold.dim_customers")

display(df_test.orderBy("dim_cust_id"))