In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window


In [0]:

# Read the "init_load_flag" notebook widget and convert its text value to an int.
# Later cells compare it against 0 (read the existing gold dimension) and
# 1 (start surrogate keys from zero).
raw_init_load_flag = dbutils.widgets.get("init_load_flag")
init_load_flag = int(raw_init_load_flag)

#### **Data Reading**

In [0]:
# Load every column of the silver customers table as the source DataFrame.
customers_query = "select * from databricks_catalog.silver.customers"
df = spark.sql(customers_query)

In [0]:
# Keep a single row per customer_id, then preview the first ten rows.
dedup_columns = ["customer_id"]
df = df.dropDuplicates(subset=dedup_columns)
display(df.limit(10))

## **Dividing New vs. Old Records**

In [0]:
# Choose the query that supplies the "old" (already loaded) records.
#  - init_load_flag == 0: read the key, business id and audit dates of the rows
#    already present in the gold DimCustomer table.
#  - any other value: build a zero-row placeholder with the same four column
#    names (the "where 1=0" predicate never matches).
if init_load_flag != 0:
    old_records_query = "select 0 Dimcustomerkey,0 customer_id,0 create_date,0 update_date from databricks_catalog.silver.customers where 1=0"
else:
    old_records_query = "select Dimcustomerkey,customer_id,create_date,update_date from databricks_catalog.gold.DimCustomer"

df_old = spark.sql(old_records_query)

In [0]:
# Preview the old-records frame before its columns are renamed.
display(df_old)

##### Renaming Columns of df_old

In [0]:
# Prefix every df_old column with "old_" so the names stay distinguishable
# from the source columns once the two frames are joined.
old_column_names = {
    "Dimcustomerkey": "old_DimCustomerKey",
    "customer_id": "old_customer_id",
    "create_date": "old_create_date",
    "update_date": "old_update_date",
}
for current_name, prefixed_name in old_column_names.items():
    df_old = df_old.withColumnRenamed(current_name, prefixed_name)

##### Applying a join with the old records

In [0]:
# Left join: every source customer is kept; the old_* columns are NULL for
# customers that have no match in df_old.
customer_match = df["customer_id"] == df_old["old_customer_id"]
df_join = df.join(df_old, on=customer_match, how="left")
display(df_join)

##### Separating new and old records

In [0]:
# Split the joined frame on whether the left join found an existing key:
# a NULL old_DimCustomerKey marks a new customer, a non-NULL one an existing customer.
existing_key = df_join["old_DimCustomerKey"]
df_new = df_join.where(existing_key.isNull())
df_old = df_join.where(existing_key.isNotNull())

display(df_new)
display(df_old)
                    

##### Preparing df_old

In [0]:
# Shape the existing-customer rows for the final dimension:
#  1. drop the helper columns that are no longer needed,
#  2. restore the surrogate key and the original create_date under their final names,
#  3. make sure create_date is a timestamp,
#  4. stamp update_date with the current timestamp.
df_old = (
    df_old
    .drop("old_customer_id", "old_update_date")
    .withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)

In [0]:
# Preview the prepared existing-customer rows.
display(df_old)

##### Preparing df_new

In [0]:
# New customers have no history: remove all old_* helper columns and stamp both
# audit columns (create_date and update_date) with the current timestamp.
helper_columns = ["old_DimCustomerKey", "old_customer_id", "old_create_date", "old_update_date"]
df_new = (
    df_new
    .drop(*helper_columns)
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)

In [0]:
# Preview the prepared new-customer rows.
display(df_new)

#### Surrogate Key - Starting from 1

In [0]:
# Assign a dense surrogate key (1, 2, 3, ...) to the new customers.
# monotonically_increasing_id() only guarantees unique, increasing values: they
# are not consecutive and depend on how the data is partitioned, so df_new could
# receive different keys each time it is re-evaluated (display, then merge).
# row_number() over customer_id (deduplicated earlier with dropDuplicates) gives
# a stable numbering that really starts at 1.
# NOTE: a window without partitionBy moves all new rows to a single partition;
# acceptable for a dimension-sized frame, revisit if df_new becomes very large.
surrogate_key_window = Window.orderBy("customer_id")
df_new = df_new.withColumn(
    "DimCustomerKey",
    row_number().over(surrogate_key_window).cast("long"),  # keep the key a long, as before
)

##### Adding the max surrogate key

In [0]:
if init_load_flag==1:
  # Initial load: no keys exist yet, so numbering starts right after 0.
  max_surrogate_key=0

else:
  # Incremental load: continue numbering after the highest key already stored.
  # The aggregate is aliased so the lookup does not depend on Spark's
  # auto-generated column name.
  df_max_surrogate_key=spark.sql("select max(DimCustomerKey) as max_key from databricks_catalog.gold.DimCustomer")

  # max() over an empty table returns NULL (None in Python); fall back to 0 so
  # the offset added to the new keys never turns them into NULL.
  max_surrogate_key=df_max_surrogate_key.first()["max_key"] or 0


In [0]:
# Shift the freshly generated keys by the highest key already in use.
# NOTE: df_new is rebuilt from itself, so re-running this cell adds the offset again.
key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn("DimCustomerKey", key_offset + col("DimCustomerKey"))

##### Union of df_old and df_new

In [0]:
# Stack existing and new customers into one frame; unionByName matches columns
# by name rather than by position.
df_final = df_old.unionByName(df_new)
display(df_final)

#### SCD Type 1

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 upsert into the gold customer dimension.
#  - Table missing: create it as an external Delta table at the gold path.
#  - Table present: merge on customer_id, overwriting every column of matched
#    rows and inserting the rows that have no match.
gold_table_name = "databricks_catalog.gold.DimCustomer"
gold_table_path = "abfss://gold@databrickseteprojectsa.dfs.core.windows.net/DimCustomer"

if not spark.catalog.tableExists(gold_table_name):
    (
        df_final.write
        .mode("overwrite")
        .format("delta")
        .option("path", gold_table_path)
        .saveAsTable(gold_table_name)
    )
else:
    dlt_obj = DeltaTable.forPath(spark, gold_table_path)
    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.customer_id=src.customer_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Show the contents of the gold customer dimension table.
SELECT *
FROM databricks_catalog.gold.DimCustomer