In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Job parameter (widget values arrive as strings): 1 = initial load (no existing gold rows
# are read), 0 = incremental load against the existing gold dimension.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))
# Later cells branch on `== 0` in one place and `== 1` in another, so any other value would
# mix the initial-load and incremental paths; reject it up front.
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")

### **Data Reading**

In [0]:
# Source data: the cleaned customer records from the silver layer.
_silver_customers_query = "select * from project1_catalog.silver.customers_silver"
df = spark.sql(_silver_customers_query)

In [0]:
# Preview the silver customers before deduplication.
display(df)

In [0]:
# Keep a single row per customer_id; which of several duplicates survives is not specified.
df = df.dropDuplicates(["customer_id"])

### **New vs Old records**

In [0]:
if init_load_flag == 0:
    # Incremental load: fetch the surrogate key and audit dates of customers already in gold.
    df_old = spark.sql('''select DimCustomersKey, customer_id, create_date,update_date from project1_catalog.gold.DimCustomers''')

else:
    # Initial load: nothing to compare against, so build an empty frame with the same columns.
    # Typed NULL casts (instead of literal 0s) keep the schema aligned with the incremental
    # branch: create_date/update_date are real timestamps (to_timestamp on an integer column
    # needs a numeric->timestamp cast that ANSI mode rejects), DimCustomersKey is bigint like
    # the generated keys, and customer_id keeps silver's own type so the join key needs no
    # implicit cast. `where 1=0` guarantees zero rows.
    df_old = spark.sql('''select cast(null as bigint) DimCustomersKey, customer_id, cast(null as timestamp) create_date, cast(null as timestamp) update_date from project1_catalog.silver.customers_silver where 1=0''')

In [0]:
# Inspect the existing gold keys/dates (empty on the initial load).
display(df_old)

In [0]:
# Prefix every gold-side column with "old_" so nothing collides with the silver columns
# (notably customer_id) once the two frames are joined.
_old_column_names = {
    "DimCustomersKey": "old_DimCustomersKey",
    "customer_id": "old_customer_id",
    "create_date": "old_create_date",
    "update_date": "old_update_date",
}
for _gold_name, _prefixed_name in _old_column_names.items():
    df_old = df_old.withColumnRenamed(_gold_name, _prefixed_name)

In [0]:
# Left join keeps every silver customer; the old_* columns stay NULL when the customer is not in gold.
df_join = df.join(
    df_old,
    on=df["customer_id"] == df_old["old_customer_id"],
    how="left",
)

In [0]:
# Inspect the joined silver + gold frame.
display(df_join)

In [0]:
# No matching gold surrogate key -> the customer is new to the dimension.
df_new = df_join.where(df_join.old_DimCustomersKey.isNull())

In [0]:
# Inspect the customers that will receive new surrogate keys.
display(df_new)

In [0]:
# A matching gold surrogate key -> the customer already exists and will be updated in place.
df_old = df_join.where(df_join.old_DimCustomersKey.isNotNull())
df_old.display()

In [0]:
# Shape the existing customers for the merge:
#  - drop the gold copies of customer_id/update_date (silver's customer_id is kept),
#  - reuse the existing surrogate key and original creation date under their gold names,
#  - stamp update_date with the current time because these rows are being rewritten.
df_old = (
    df_old
    .drop("old_customer_id", "old_update_date")
    .withColumnRenamed("old_DimCustomersKey", "DimCustomersKey")
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)

### **Preparing df_new**

In [0]:
# New customers have no gold history: discard all old_* join columns and stamp both
# audit dates with the current time.
df_new = (
    df_new
    .drop("old_DimCustomersKey", "old_customer_id", "old_update_date", "old_create_date")
    .withColumn("update_date", current_timestamp())
    .withColumn("create_date", current_timestamp())
)

In [0]:
# Inspect the new customers after the audit columns were added.
display(df_new)

In [0]:
# Provisional 1-based surrogate key (monotonically_increasing_id starts at 0). The ids are
# unique but NOT consecutive: Spark encodes the partition id in the upper bits, so keys
# from different partitions can be far apart.
df_new = df_new.withColumn("DimCustomersKey",monotonically_increasing_id()+lit(1))
# Preview the frame that was just given keys (previously this displayed the silver `df`).
df_new.limit(10).display()

### **Adding MAX surrogate key**

In [0]:
if init_load_flag == 1:
    # Initial load: no existing keys, so new keys start right after 0.
    max_surrogate_key =0

else:
    # Incremental load: continue numbering after the highest key already in gold.
    df_maxsur = spark.sql("select max(DimCustomersKey) as max_surrogate_key from project1_catalog.gold.DimCustomers")
    max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key']
    # max() over an empty table is NULL; lit(None) + key would make every new
    # DimCustomersKey NULL, so fall back to 0.
    if max_surrogate_key is None:
        max_surrogate_key = 0

In [0]:
# Shift the provisional keys past the current maximum so they cannot collide with existing gold keys.
_key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn(
    "DimCustomersKey",
    _key_offset + col("DimCustomersKey"),
)

In [0]:
# Check that the new-customer frame now carries DimCustomersKey, create_date and update_date.
df_new.printSchema()

In [0]:
# Inspect the existing customers after their keys/dates were reshaped.
display(df_old)

### **Union of df_old and df_new**

In [0]:
# Combine by column NAME, not position: the two frames add DimCustomersKey/create_date/
# update_date in different orders.
df_final = df_new.unionByName(other=df_old)

In [0]:
# Preview the combined frame that is written to gold below.
display(df_final)

### **SCD 1: Upsert Condition**

In [0]:
from delta.tables import DeltaTable

In [0]:
# Target table and the storage location it is created at on the first run.
gold_table_name = "project1_catalog.gold.DimCustomers"
gold_table_path = "abfss://gold@project1ds.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists(gold_table_name):
    # SCD type 1 upsert keyed on the surrogate key: matched rows are overwritten with the
    # source values, unmatched rows are inserted. The table is opened by the same name that
    # was just checked, so the merge cannot target a different location than that table.
    dlt_obj = DeltaTable.forName(spark, gold_table_name)

    dlt_obj.alias("trg").merge(
        df_final.alias("src"), "trg.DimCustomersKey = src.DimCustomersKey"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
else:
    # First run: create the table at the configured location from the full frame.
    df_final.write.mode("overwrite").option(
        "path", gold_table_path
    ).saveAsTable(gold_table_name)

In [0]:
%sql
-- Inspect the customer dimension after the load.
SELECT *
FROM project1_catalog.gold.dimcustomers