In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

In [0]:
# Load-mode switch read from the "init_load_flag" notebook widget.
#   1 -> initial load: the gold dimension is not read and surrogate keys start after 0.
#   0 -> incremental load: existing keys/dates are read from the gold dimension.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))

# Later cells test `== 0` in one place and `== 1` in another, so any other value
# would mix the two modes (no old records, but keys offset by the gold max).
# Fail fast instead of loading inconsistent data.
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")

### **Data Reading**

In [0]:
# Pull every column of the silver-layer customer table; this frame is the
# input for the whole dimension build below.
silver_customers_query = "select * from databricks_catalog.silver.customers_silver"
df = spark.sql(silver_customers_query)
df.display()

### **Removing Duplicates**

In [0]:
# Keep one row per customer_id so the join and key assignment below see each
# customer once.
# NOTE(review): nothing here controls which of several duplicate rows survives.
df = df.dropDuplicates(["customer_id"])

### **Dividing New vs Old Records**

In [0]:
# Pick the query that supplies the already-loaded dimension rows.
if init_load_flag == 0:
    # Incremental run: read keys and audit dates from the gold dimension.
    old_records_query = '''select DimCustomerKey, customer_id, create_date, update_date 
                       from databricks_catalog.gold.DimCustomers'''
else:
    # Initial run: same four column names, but `where 1=0` guarantees zero rows,
    # so every incoming customer is later treated as new.
    old_records_query = '''select 0 DimCustomerKey, 0 customer_id, 0 create_date, 0 update_date 
                       from databricks_catalog.silver.customers_silver where 1=0'''

df_old = spark.sql(old_records_query)

In [0]:
# Preview the existing dimension rows (no rows when the `where 1=0` query was used).
df_old.display()

### **Renaming Columns of df_old**

In [0]:
# Suffix every column coming from the existing dimension with "_old" so it
# cannot be confused with the incoming silver columns after the join.
for existing_column in ("DimCustomerKey", "customer_id", "create_date", "update_date"):
    df_old = df_old.withColumnRenamed(existing_column, f"{existing_column}_old")


### **Applying join with the old records**

In [0]:
# Left join: every incoming customer is kept; customer_id_old stays null for
# customers that have no row in the existing dimension.
customer_match = df["customer_id"] == df_old["customer_id_old"]
df_join = df.join(df_old, on=customer_match, how="left")
df_join.display()

### **Separating new and old records**

In [0]:
# No match on the dimension side (customer_id_old is null) => brand-new customer.
not_in_dimension = col("customer_id_old").isNull()
df_new = df_join.where(not_in_dimension)
df_new.display()

In [0]:
# A match on the dimension side (customer_id_old is not null) => customer
# already exists. df_old is rebound to these joined rows from here on.
already_in_dimension = col("customer_id_old").isNotNull()
df_old = df_join.where(already_in_dimension)
df_old.display()

### **Preparing df_old**

In [0]:
# Shape the already-known customers for the final load:
#   * discard the join helper columns that are no longer needed,
#   * restore the original surrogate key and creation date under their real names,
#   * make sure create_date is a timestamp,
#   * stamp update_date with the time of this run.
df_old = (
    df_old
    .drop("customer_id_old", "update_date_old")
    .withColumnRenamed("DimCustomerKey_old", "DimCustomerKey")
    .withColumnRenamed("create_date_old", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)

In [0]:
# Preview the existing customers after their key/create_date were restored and update_date refreshed.
df_old.display()

### **Preparing df_new**

In [0]:
# New customers have no history in the dimension: remove all "_old" helper
# columns, then stamp both audit columns with the time of this run.
leftover_old_columns = ["DimCustomerKey_old", "customer_id_old", "update_date_old", "create_date_old"]
df_new = (
    df_new
    .drop(*leftover_old_columns)
    .withColumn("update_date", current_timestamp())
    .withColumn("create_date", current_timestamp())
)

### **Surrogate Key**

In [0]:
# Give the new customers provisional surrogate keys 1..N (the offset past the
# existing maximum key is added in a later cell).
#
# row_number() over a customer_id ordering yields consecutive keys that do not
# depend on how the data happens to be partitioned. monotonically_increasing_id()
# only guarantees uniqueness and leaves very large gaps between partitions.
# customer_id is unique here because duplicates were dropped on it earlier.
# The cast keeps the column a long, the type monotonically_increasing_id() produced.
# NOTE(review): a window without partitionBy pulls all new rows into a single
# partition; acceptable for dimension-sized data, revisit if df_new is very large.
new_customer_order = Window.orderBy("customer_id")
df_new = df_new.withColumn(
    "DimCustomerKey",
    row_number().over(new_customer_order).cast("long"),
)

In [0]:
# Preview the new customers with their provisional DimCustomerKey (max-key offset not yet added).
df_new.display()

In [0]:
# Preview the existing customers again; df_old has not changed since its last preview
# (presumably shown here for comparison with df_new).
df_old.display()

### **Adding max surrogate key**

In [0]:
# Work out the highest surrogate key already used, so new keys continue after it.
if init_load_flag == 1:
    # Initial load: nothing has been loaded yet, start numbering after 0.
    max_surrogate_key = 0
else:
    df_maxsur = spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_catalog.gold.Dimcustomers")
    # Converting the single-row df_maxsur result into a plain Python value
    max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key']
    # max() over an empty table is NULL (None here); adding None to the key
    # column would turn every new DimCustomerKey into NULL, so fall back to 0.
    if max_surrogate_key is None:
        max_surrogate_key = 0

In [0]:
# Shift the provisional keys of the new customers past the highest key already in use.
key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn("DimCustomerKey", col("DimCustomerKey") + key_offset)

### **Union of df_new and df_old**

In [0]:
# Stack new and already-known customers into the frame that will be written to gold.
# unionByName lines columns up by name, so the two inputs may order them differently.
df_final = df_new.unionByName(df_old)
df_final.display()

### **SCD type-1**

In [0]:
from delta.tables import *

In [0]:
# Target of the SCD type-1 load: the gold customer dimension and its storage location.
gold_table_name = "databricks_catalog.gold.Dimcustomers"
gold_table_path = "abfss://gold@databricksstorage222.dfs.core.windows.net/Dimcustomers"

if not spark.catalog.tableExists(gold_table_name):
    # Table does not exist yet: create it as a Delta table at the gold path.
    (
        df_final.write
        .format("delta")
        .mode("overwrite")
        .option("path", gold_table_path)
        .saveAsTable(gold_table_name)
    )
else:
    # Table exists: upsert on the surrogate key. Matched rows are overwritten
    # with the incoming values, unmatched rows are inserted.
    dlt_obj = DeltaTable.forPath(spark, gold_table_path)
    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.DimCustomerKey = src.DimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Final check: show the contents of the gold customer dimension after the load.
select * from databricks_catalog.gold.dimcustomers