In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

In [0]:
# Notebook parameter read in the next cell: "1" = initial load (the gold table is treated as
# empty and surrogate keys start at 1), "0" (default) = incremental load against the existing
# rsa_cata.gold.DimCustomers table.
dbutils.widgets.text("initial_load_flag", "0")

In [0]:
# 1 = initial load (no existing gold records), 0 = incremental load against the gold table.
# Later cells branch on this value in two places (`== 0` for reading the old records and
# `== 1` for the surrogate-key offset), so any other value would make those cells disagree.
# Reject it here instead.
initial_load_flag = int(dbutils.widgets.get("initial_load_flag"))
if initial_load_flag not in (0, 1):
    raise ValueError(f"initial_load_flag must be 0 or 1, got {initial_load_flag}")

### **Data Reading from Source Table (Silver Layer)**

In [0]:
# Source: every column of the silver customers table.
df = spark.sql("select * from rsa_cata.silver.customers_silver")

**Removing Duplicates**

In [0]:
# DataFrames are immutable: dropDuplicates returns a new frame, so the result must be
# assigned back. Otherwise the de-duplication is silently discarded.
# Which row survives for a duplicated customer_id is not specified by Spark.
df = df.dropDuplicates(subset=['customer_id'])

### **Dividing New vs Old Records**

-- For the initial load there is no gold table yet, so the query selects placeholder columns from the silver table with `where 1=0`: it returns no rows and only provides the column structure that `df_old` needs.

In [0]:
# Existing gold records to compare the silver customers against.
# Branch on `== 1` (initial load), the same test the surrogate-key cell uses, so both
# cells take matching paths for any flag value.
if initial_load_flag == 1:
    # Initial load: no gold table yet. `where 1=0` returns zero rows; only the column
    # structure is needed. Selecting the real customer_id keeps the join key's type identical
    # to the silver column. The typed NULLs are intended to match the gold columns produced
    # later in this notebook (long surrogate key, current_timestamp() dates).
    df_old = spark.sql(
        '''select cast(null as bigint) as DimCustomerKey, customer_id,
                  cast(null as timestamp) as create_date, cast(null as timestamp) as update_date
           from rsa_cata.silver.customers_silver where 1=0'''
    )
else:
    # Incremental load: the keys and audit dates already stored in gold.
    df_old = spark.sql(
        '''select DimCustomerKey, customer_id, create_date, update_date
           from rsa_cata.gold.DimCustomers'''
    )

In [0]:
# Preview only: on incremental runs df_old is the entire gold dimension, which is
# unnecessary (and can be expensive) to display in full.
df_old.limit(5).display()

**Renaming Columns of df_old for better column management**

In [0]:
# Prefix every gold column with "old_" so it cannot clash with the silver columns after the join.
df_old = (
    df_old
    .withColumnRenamed("DimCustomerKey", "old_DimCustomerKey")
    .withColumnRenamed("customer_id", "old_customer_id")
    .withColumnRenamed("create_date", "old_create_date")
    .withColumnRenamed("update_date", "old_update_date")
)

**Applying Join with the Old Records**

In [0]:
# Left join keeps every silver customer; the old_* columns are NULL when no gold record matches.
df_join = df.join(
    df_old,
    on=df["customer_id"] == df_old["old_customer_id"],
    how="left",
)


In [0]:
# Five-row sample of the joined frame.
display(df_join.limit(5))

**Separating Old vs New Records**

In [0]:
# New customers: the left join found no gold record, so the joined-in key is NULL.
df_new = df_join.where(col("old_DimCustomerKey").isNull())
df_new.limit(5).display()

In [0]:
# Existing customers: rows that matched a gold record (non-NULL joined-in key).
# The column is spelled exactly as it was aliased (old_DimCustomerKey) instead of relying on
# Spark's case-insensitive name resolution, which would break with spark.sql.caseSensitive=true.
df_old = df_join.filter(df_join['old_DimCustomerKey'].isNotNull())
df_old.limit(5).display()

**Preparing df_old**

In [0]:
# Shape the matched (existing) customers for the gold table.
# old_update_date is dropped but old_create_date is kept: an existing customer's creation
# date stays the same, while its update date is reset to the load time below.
df_old = (
    df_old
    .drop("old_customer_id", "old_update_date")
    .withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)

In [0]:
# Preview only, consistent with the other inspection cells; df_old can contain every
# existing customer.
df_old.limit(5).display()

**Preparing df_new**

In [0]:
# Shape the new customers: drop the unmatched old_* columns from the join and stamp both
# audit columns (update_date, then create_date) with the load time.
df_new = (
    df_new
    .drop("old_DimCustomerKey", "old_customer_id", "old_create_date", "old_update_date")
    .withColumn("update_date", current_timestamp())
    .withColumn("create_date", current_timestamp())
)

In [0]:
# Five-row sample of the prepared new customers.
display(df_new.limit(5))

**Surrogate Key - Starting from 1**

In [0]:
# Consecutive surrogate keys 1..N for the new customers.
# monotonically_increasing_id() is only guaranteed unique and increasing, not consecutive:
# its IDs jump between partitions, so it does not produce a sequence "starting from 1".
# row_number() over a global ordering does. It moves the rows through a single partition,
# presumably acceptable for a customer dimension (TODO confirm volume).
# Cast to long to keep the column type the original expression produced.
df_new = df_new.withColumn(
    "DimCustomerKey",
    row_number().over(Window.orderBy("customer_id")).cast("long"),
)

In [0]:
# Five-row sample showing the generated DimCustomerKey values.
display(df_new.limit(5))

**Adding Max Surrogate Key for Incremental Load**

In [0]:
# Offset added to the new keys: 0 on the initial load, otherwise the largest key already in gold.
if initial_load_flag == 1:
    max_surrogate_key = 0
else:
    df_max_sur = spark.sql("select max(DimCustomerKey) as max_surrogate_key from rsa_cata.gold.DimCustomers")
    # An aggregate always yields exactly one row. max() over an empty table is NULL (None);
    # fall back to 0 so the new keys start at 1 instead of all becoming NULL when the offset is added.
    max_surrogate_key = df_max_sur.first()['max_surrogate_key'] or 0

In [0]:
# Shift the new keys past the current maximum so they never collide with keys already in gold.
df_new = df_new.withColumn(
    "DimCustomerKey",
    col("DimCustomerKey") + lit(max_surrogate_key),
)

**Union of df_old & df_new**

In [0]:
# Stack new and existing customers; columns are matched by name (df_new's column order wins).
df_final = df_new.unionByName(other=df_old)

In [0]:
# Preview only, consistent with the other inspection cells; df_final holds every customer.
df_final.limit(5).display()

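**Removing Duplicate Customer Keys Before the MERGE** — Delta `MERGE` fails when more than one source row matches the same target row, so only the most recently updated row per `DimCustomerKey` is kept.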

In [0]:
# Keep one row per DimCustomerKey (the latest update_date) so the MERGE source has unique keys.
latest_first = Window.partitionBy("DimCustomerKey").orderBy(col("update_date").desc())
df_dedup = (
    df_final
    .withColumn("rn", row_number().over(latest_first))
    .where(col("rn") == 1)
    .drop("rn")
)

### **SCD Type 1 (MERGE upsert)**

In [0]:
# SCD Type 1 upsert into gold: rows whose DimCustomerKey already exists are overwritten,
# and new keys are inserted.
gold_path = "abfss://gold@retailsalesanalysissanil.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists("rsa_cata.gold.DimCustomers"):
    dlt_obj = DeltaTable.forPath(spark, gold_path)
    (
        dlt_obj.alias("trg")
        .merge(df_dedup.alias("src"), "trg.DimCustomerKey = src.DimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the table from the de-duplicated frame (the same source the MERGE
    # branch uses), so duplicate keys cannot reach gold through this path either.
    (
        df_dedup.write.format("delta")
        .mode("overwrite")
        .option("path", gold_path)
        .saveAsTable("rsa_cata.gold.DimCustomers")
    )

In [0]:
%sql
-- Inspect the gold customer dimension after the load.
SELECT * FROM rsa_cata.gold.dimcustomers