In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
# Load mode from the notebook widget: 1 = initial load (gold dimension not built yet),
# 0 = incremental load against the existing gold dimension.
# Later cells branch on this value, so reject anything else up front instead of letting
# them silently disagree about which mode is active.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag!r}")

## Data Reading

In [0]:
# Source rows: customer records from the silver layer.
df = spark.table("retail_cat.silver_sch.customer_silver")
display(df)

**Removing Duplicates**

In [0]:
# Keep one row per customer_id (which duplicate survives is not specified).
df = df.dropDuplicates(["customer_id"])
df.limit(10).display()

## Dividing new vs old records

In [0]:
# Existing dimension rows, used to split the incoming silver rows into new vs. already-loaded.
if init_load_flag == 0:
    # Incremental load: read keys and audit dates from the current gold dimension.
    df_old = spark.sql(
        "select DimCustomerkey, customer_id, create_date, update_date "
        "from retail_cat.gold_sch.DimCustomers"
    )
else:
    # Initial load: build an empty frame with the same four columns.
    # customer_id is taken from silver so the join key keeps its real type. The other
    # placeholders are typed explicitly rather than as bare INT literals. This matters
    # because create_date later goes through to_timestamp, and an INT there depends on
    # implicit casts that ANSI mode rejects.
    df_old = spark.sql(
        "select cast(null as bigint) as DimCustomerkey, customer_id, "
        "cast(null as timestamp) as create_date, cast(null as timestamp) as update_date "
        "from retail_cat.silver_sch.customer_silver where 1=0"
    )


In [0]:
display(df_old)

**Renaming columns of df_old**

In [0]:
# Suffix the existing-dimension columns with "_old" so they do not collide with the
# silver columns of the same name after the join.
for base_name in ["DimCustomerkey", "customer_id", "create_date", "update_date"]:
    df_old = df_old.withColumnRenamed(base_name, f"{base_name}_old")


**Applying join with old records**

In [0]:
# Left join keeps every silver row; rows with no existing match get NULLs in the *_old columns.
match_condition = df["customer_id"] == df_old["customer_id_old"]
df_join = df.join(df_old, on=match_condition, how="left")
display(df_join)

**Separating new vs. old records**

In [0]:
# New customers: no existing surrogate key matched.
df_new = df_join.where(df_join["DimCustomerkey_old"].isNull())


In [0]:
# Existing customers: an existing surrogate key matched.
df_old = df_join.where(df_join["DimCustomerkey_old"].isNotNull())


**Preparing df_old (existing records)**

In [0]:
# Existing customers keep their surrogate key and original create_date
# (restored from the *_old columns); update_date is stamped with the current run time.
df_old = (
    df_old
    .drop("customer_id_old", "update_date_old")
    .withColumnRenamed("DimCustomerkey_old", "DimCustomerkey")
    .withColumnRenamed("create_date_old", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)

display(df_old)

               


**Preparing df_new**

In [0]:
# New customers: the *_old join columns are all NULL here, so drop them,
# then stamp both audit dates with the current run time.
df_new = (
    df_new
    .drop("customer_id_old", "DimCustomerkey_old", "update_date_old", "create_date_old")
    .withColumn("update_date", current_timestamp())
    .withColumn("create_date", current_timestamp())
)

display(df_new.limit(10))

In [0]:
display(df_new)

**Surrogate keys (numbered from 1)**

In [0]:
# Number the new customers 1..n; a later cell shifts these past the existing max key.
# monotonically_increasing_id() is unique but NOT consecutive: it puts the partition id
# in the upper bits, so keys would jump by ~2^33 between partitions.
# row_number() over a window ordered by customer_id gives dense, deterministic keys.
# Cast to long to keep the same column type monotonically_increasing_id() produced.
# Note: an un-partitioned window moves all new rows into a single partition.
df_new = df_new.withColumn(
    "DimCustomerkey",
    row_number().over(Window.orderBy("customer_id")).cast("long"),
)

In [0]:
df_new.display()

**Adding the max existing surrogate key as an offset**

In [0]:
# Offset for new surrogate keys: continue after the highest key already in the gold dimension.
# Uses the same `init_load_flag == 0` test as the df_old cell, so both cells agree on
# what counts as an incremental load.
if init_load_flag == 0:
    df_maxsurr = spark.sql(
        "select max(DimCustomerkey) as max_surrogate_key from retail_cat.gold_sch.DimCustomers"
    )
    # max() over an empty table is NULL. Treat that as "no keys yet"; otherwise
    # lit(None) + DimCustomerkey would make every new key NULL.
    max_surrogate_key = df_maxsurr.collect()[0]["max_surrogate_key"] or 0
else:
    # Initial load: no existing keys.
    max_surrogate_key = 0

In [0]:
# Shift the 1-based keys so they start right after the current maximum key.
df_new = df_new.withColumn("DimCustomerkey", col("DimCustomerkey") + lit(max_surrogate_key))

In [0]:
display(df_new.limit(10))

**Union of df_old and df_new**

In [0]:
# Match columns by name: df_new and df_old add DimCustomerkey / create_date / update_date
# in different orders, so a positional union would misalign them.
df_final = df_new.unionByName(df_old)
display(df_final.limit(10))

## SCD Type 1 upsert into gold

In [0]:
gold_exists = spark.catalog.tableExists("retail_cat.gold_sch.DimCustomers")
print("Exists" if gold_exists else "Not_Exists")

In [0]:
from delta.tables import DeltaTable


In [0]:
# SCD Type 1: rows whose DimCustomerkey already exists are overwritten in place,
# all other rows are inserted. If the table does not exist yet, create it from df_final.
gold_table = "retail_cat.gold_sch.DimCustomers"

if not spark.catalog.tableExists(gold_table):
    (
        df_final.write
        .mode("overwrite")
        .format("delta")
        .option("path", "abfss://gold@adlscardatalake.dfs.core.windows.net/DimCustomers")
        .saveAsTable(gold_table)
    )
else:
    gold_delta = DeltaTable.forName(spark, gold_table)
    (
        gold_delta.alias("tgt")
        .merge(df_final.alias("src"), "tgt.DimCustomerkey = src.DimCustomerkey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


In [0]:
%sql
-- Inspect the gold customer dimension after the load.
SELECT * FROM retail_cat.gold_sch.DimCustomers