In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
dbutils.widgets.text("init_load_flag", "0")
init_load_flag = int(dbutils.widgets.get("init_load_flag"))

### **Data Reading**

In [0]:
df = spark.sql("select * from databricks_cata.silver.customers_silver")
df.display()

customer_id,email,city,state,domains,full_name
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers


### **Removing Duplicates**

In [0]:
df = df.dropDuplicates(subset = ['customer_id'])
df.display()

customer_id,email,city,state,domains,full_name
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers


### **Surrogate key - All the values**

In [0]:
df =  df.withColumn("DimCustomerKey",monotonically_increasing_id() + lit(1))
df.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,1
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,3
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,4
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,5
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,6
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,7
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,9
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,10


### **Dividing Old V/S New Records**

In [0]:
%python
# Check if the DimCustomers table exists
table_exists = spark.catalog.tableExists("databricks_cata.gold.DimCustomers")

if init_load_flag == 0 and table_exists:
    df_old = spark.sql('''select DimCustomerKey, customer_id, create_date, update_date
                       from databricks_cata.gold.DimCustomers ''')
else:
    # Return empty DataFrame with correct schema for initial load or when table doesn't exist
    df_old = spark.sql('''select cast(0 as bigint) as DimCustomerKey, 
                                 cast(0 as bigint) as customer_id, 
                                 cast(null as timestamp) as create_date, 
                                 cast(null as timestamp) as update_date
                       from databricks_cata.silver.customers_silver where 1 = 0 ''')

In [0]:
df_old.display()

DimCustomerKey,customer_id,create_date,update_date


### **Renaming columns of df_olf**

In [0]:
df_old = df_old.withColumnRenamed("DimCustomerKey", "old_DimCustomerKey")\
                .withColumnRenamed("customer_id", "old_customer_id")\
                .withColumnRenamed("create_date", "old_create_date")\
                .withColumnRenamed("update_date", "old_update_date")

In [0]:
df_old.display()

old_DimCustomerKey,old_customer_id,old_create_date,old_update_date


### **Applying join with old records**

In [0]:
df_join = df.join(df_old, df['customer_id'] == df_old['old_customer_id'], 'left')
df_join.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,old_DimCustomerKey,old_customer_id,old_create_date,old_update_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,1,,,,
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2,,,,
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,3,,,,
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,4,,,,
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,5,,,,
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,6,,,,
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,7,,,,
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8,,,,
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,9,,,,
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,10,,,,


### **seperating new vs old**

In [0]:
df_new = df_join.filter(df_join['old_DimCustomerKey'].isNull())

In [0]:
df_old = df_join.filter(df_join['old_DimCustomerKey'].isNotNull())

### **Preparing Df_old**

In [0]:
# Dropping all the columns which are not required
df_old = df_old.drop('old_DimCustomerKey', 'old_customer_id', 'old_update_date')

# Renaming "old_create_date"  column to "create_date"
df_old = df_old.withColumnRenamed('old_create_date', 'create_date')
df_old = df_old.withColumn("create_date", to_timestamp(col("create_date")))

#Recreating "update_date" column with current timestamp
df_old = df_old.withColumn('update_date', current_timestamp())


In [0]:
df_old.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,create_date,update_date


In [0]:
df_new.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,old_DimCustomerKey,old_customer_id,old_create_date,old_update_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,1,,,,
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2,,,,
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,3,,,,
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,4,,,,
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,5,,,,
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,6,,,,
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,7,,,,
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8,,,,
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,9,,,,
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,10,,,,


### **preparing old df**

In [0]:
# Dropping all the columns which are not required
df_new = df_new.drop('old_DimCustomerKey', 'old_customer_id', 'old_update_date','old_create_date')

#Recreating "update_date",current_date column with current timestamp
df_new = df_new.withColumn('update_date', current_timestamp())
df_new = df_new.withColumn('create_date', current_timestamp())


In [0]:
df_new.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,update_date,create_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,1,2026-02-01T20:09:13.983Z,2026-02-01T20:09:13.983Z
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2,2026-02-01T20:09:13.983Z,2026-02-01T20:09:13.983Z
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,3,2026-02-01T20:09:13.983Z,2026-02-01T20:09:13.983Z
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,4,2026-02-01T20:09:13.983Z,2026-02-01T20:09:13.983Z
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,5,2026-02-01T20:09:13.983Z,2026-02-01T20:09:13.983Z
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,6,2026-02-01T20:09:13.983Z,2026-02-01T20:09:13.983Z
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,7,2026-02-01T20:09:13.983Z,2026-02-01T20:09:13.983Z
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8,2026-02-01T20:09:13.983Z,2026-02-01T20:09:13.983Z
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,9,2026-02-01T20:09:13.983Z,2026-02-01T20:09:13.983Z
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,10,2026-02-01T20:09:13.983Z,2026-02-01T20:09:13.983Z


### **Surrogate key - from 1**

In [0]:
df_new =  df_new.withColumn("DimCustomerKey",monotonically_increasing_id() + lit(1))
df_new.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,update_date,create_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,1,2026-02-01T20:09:14.844Z,2026-02-01T20:09:14.844Z
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2,2026-02-01T20:09:14.844Z,2026-02-01T20:09:14.844Z
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,3,2026-02-01T20:09:14.844Z,2026-02-01T20:09:14.844Z
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,4,2026-02-01T20:09:14.844Z,2026-02-01T20:09:14.844Z
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,5,2026-02-01T20:09:14.844Z,2026-02-01T20:09:14.844Z
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,6,2026-02-01T20:09:14.844Z,2026-02-01T20:09:14.844Z
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,7,2026-02-01T20:09:14.844Z,2026-02-01T20:09:14.844Z
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8,2026-02-01T20:09:14.844Z,2026-02-01T20:09:14.844Z
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,9,2026-02-01T20:09:14.844Z,2026-02-01T20:09:14.844Z
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,10,2026-02-01T20:09:14.844Z,2026-02-01T20:09:14.844Z


### **Adding Max Surrogate key**

In [0]:
if init_load_flag == 1:
    max_surrogate_key = 0
else:
    # Check if table exists before querying
    if spark.catalog.tableExists("databricks_cata.gold.DimCustomers"):
        df_maxsur = spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_cata.gold.DimCustomers")
        # Converting df_maxsur to max_surrogate_key variable
        max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key']
        # Handle NULL case (empty table)
        if max_surrogate_key is None:
            max_surrogate_key = 0
    else:
        # Table doesn't exist yet, treat as initial load
        max_surrogate_key = 0

In [0]:
df_new = df_new.withColumn("DimCustomerKey", lit(max_surrogate_key) + col("DimCustomerKey"))

### **union of df_old and df_new**

In [0]:
df_final = df_new.unionByName(df_old)

In [0]:
df_final.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,update_date,create_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,1,2026-02-01T20:09:16.051Z,2026-02-01T20:09:16.051Z
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2,2026-02-01T20:09:16.051Z,2026-02-01T20:09:16.051Z
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,3,2026-02-01T20:09:16.051Z,2026-02-01T20:09:16.051Z
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,4,2026-02-01T20:09:16.051Z,2026-02-01T20:09:16.051Z
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,5,2026-02-01T20:09:16.051Z,2026-02-01T20:09:16.051Z
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,6,2026-02-01T20:09:16.051Z,2026-02-01T20:09:16.051Z
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,7,2026-02-01T20:09:16.051Z,2026-02-01T20:09:16.051Z
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8,2026-02-01T20:09:16.051Z,2026-02-01T20:09:16.051Z
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,9,2026-02-01T20:09:16.051Z,2026-02-01T20:09:16.051Z
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,10,2026-02-01T20:09:16.051Z,2026-02-01T20:09:16.051Z


### **SCD Type 1**

In [0]:
from delta.tables import DeltaTable

In [0]:
if spark.catalog.tableExists("databricks_cata.gold.DimCustomers"):
  dlt_obj = DeltaTable.forPath(spark,"abfss://gold@dedatabricksproject.dfs.core.windows.net/DimCustomers")

  dlt_obj.alias("trg").merge(df_final.alias("src"),"trg.DimCustomerKey = src.DimCustomerKey")\
      .whenMatchedUpdateAll()\
      .whenNotMatchedInsertAll()\
      .execute()
else:
  df_final.write.mode("overwrite")\
  .format("delta")\
  .option("path","abfss://gold@dedatabricksproject.dfs.core.windows.net/DimCustomers")\
  .saveAsTable("databricks_cata.gold.DimCustomers")

In [0]:
%sql
select * from databricks_cata.gold.dimcustomers

customer_id,email,city,state,domains,full_name,DimCustomerKey,update_date,create_date
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,1,2026-02-01T20:09:17.888Z,2026-02-01T20:09:17.888Z
C00045,dkhan@hotmail.com,North Kara,OK,hotmail.com,Matthew Lee,2,2026-02-01T20:09:17.888Z,2026-02-01T20:09:17.888Z
C00049,kristinawalsh@gmail.com,New Heatherside,IA,gmail.com,Mike Harvey,3,2026-02-01T20:09:17.888Z,2026-02-01T20:09:17.888Z
C00056,cruzkristen@hotmail.com,Catherineberg,WA,hotmail.com,Tina Cantu,4,2026-02-01T20:09:17.888Z,2026-02-01T20:09:17.888Z
C00062,kimdennis@yahoo.com,Kaitlynburgh,MI,yahoo.com,Shelley Holland,5,2026-02-01T20:09:17.888Z,2026-02-01T20:09:17.888Z
C00094,rogersrandall@gonzales.org,Harrisonmouth,AK,gonzales.org,Hannah Olson,6,2026-02-01T20:09:17.888Z,2026-02-01T20:09:17.888Z
C00104,gloria15@yahoo.com,New Danielle,VA,yahoo.com,Patrick Meadows,7,2026-02-01T20:09:17.888Z,2026-02-01T20:09:17.888Z
C00105,christine52@kennedy.com,Crystalmouth,AL,kennedy.com,April Wright,8,2026-02-01T20:09:17.888Z,2026-02-01T20:09:17.888Z
C00126,bonniewhite@smith.com,Gregoryton,WA,smith.com,Amanda King,9,2026-02-01T20:09:17.888Z,2026-02-01T20:09:17.888Z
C00152,tara25@guzman-nelson.net,Ashleyside,ID,guzman-nelson.net,David Salazar,10,2026-02-01T20:09:17.888Z,2026-02-01T20:09:17.888Z
