In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

### **Data Reading From Source**

In [0]:
df = spark.sql("SELECT * FROM databricks_catalog.silver.customers")
df.display()

customer_id,email,city,state,domains,full_name
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers


**Removing Duplicates**

In [0]:
df = df.dropDuplicates(['customer_id'])
df.display()

customer_id,email,city,state,domains,full_name
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers


**Dividing Old Vs New Records**

In [0]:
# incremental load
# We have table created in the gold layer
if spark.catalog.tableExists("databricks_catalog.gold.dim_customers"):
  df_old = spark.sql('''SELECT dim_customer_key, customer_id, create_date, update_date 
                     FROM databricks_catalog.gold.dim_customers''')
else:
  df_old = spark.sql('''SELECT 0 dim_customer_key, 0 customer_id, 0 create_date, 0 update_date 
                     FROM databricks_catalog.silver.customers WHERE 1=0''')

df_old.display()

dim_customer_key,customer_id,create_date,update_date
1,C01220,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
2,C01579,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
3,C01155,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
4,C01943,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
5,C01554,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
6,C00767,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
7,C01097,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
8,C01674,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
9,C00215,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
10,C00587,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z


**Renaming Columns of df_old**

In [0]:
df_old = df_old.withColumnRenamed("dim_customer_key", "old_dim_customer_key")\
                .withColumnRenamed("customer_id", "old_customer_id")\
                .withColumnRenamed("create_date", "old_create_date")\
                .withColumnRenamed("update_date", "old_update_date")

df_old.display()

old_dim_customer_key,old_customer_id,old_create_date,old_update_date
1,C01220,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
2,C01579,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
3,C01155,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
4,C01943,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
5,C01554,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
6,C00767,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
7,C01097,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
8,C01674,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
9,C00215,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
10,C00587,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z


**Join of Old And New Records**

In [0]:
df_join = df.join(df_old, df.customer_id == df_old.old_customer_id, 'left')
df_join.display()

customer_id,email,city,state,domains,full_name,old_dim_customer_key,old_customer_id,old_create_date,old_update_date
C01220,matthew01@yahoo.com,New Andrewhaven,WA,yahoo.com,Thomas Fitzgerald,1,C01220,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01579,nathancastro@gmail.com,South Amanda,FL,gmail.com,Brittany Schmidt,2,C01579,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01155,cynthia51@lewis-dixon.com,East Theresa,FL,lewis-dixon.com,Latasha Phelps,3,C01155,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01943,rodriguezzachary@hotmail.com,East Timothy,PA,hotmail.com,David Cooper,4,C01943,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01554,sthompson@harding.com,New Leefurt,ME,harding.com,Gavin Lindsey,5,C01554,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C00767,rsmith@pruitt-hodges.net,Heatherton,IL,pruitt-hodges.net,Pam Watts,6,C00767,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01097,joel22@stone-holmes.com,Davidhaven,LA,stone-holmes.com,James Martinez,7,C01097,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01674,crystalraymond@keller.com,Hoborough,ME,keller.com,Nicole Johnston,8,C01674,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C00215,petersonthomas@yahoo.com,South Andrea,OK,yahoo.com,Danielle Huerta,9,C00215,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C00587,phillipsstephanie@gmail.com,New Ronaldmouth,OR,gmail.com,Michael Bailey,10,C00587,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z


**Separating Old Vs New Records**

In [0]:
df_new = df_join.filter(col("old_dim_customer_key").isNull())
df_old = df_join.filter(col("old_dim_customer_key").isNotNull())
df_new.display()
df_old.display()

customer_id,email,city,state,domains,full_name,old_dim_customer_key,old_customer_id,old_create_date,old_update_date


customer_id,email,city,state,domains,full_name,old_dim_customer_key,old_customer_id,old_create_date,old_update_date
C01220,matthew01@yahoo.com,New Andrewhaven,WA,yahoo.com,Thomas Fitzgerald,1,C01220,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01579,nathancastro@gmail.com,South Amanda,FL,gmail.com,Brittany Schmidt,2,C01579,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01155,cynthia51@lewis-dixon.com,East Theresa,FL,lewis-dixon.com,Latasha Phelps,3,C01155,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01943,rodriguezzachary@hotmail.com,East Timothy,PA,hotmail.com,David Cooper,4,C01943,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01554,sthompson@harding.com,New Leefurt,ME,harding.com,Gavin Lindsey,5,C01554,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C00767,rsmith@pruitt-hodges.net,Heatherton,IL,pruitt-hodges.net,Pam Watts,6,C00767,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01097,joel22@stone-holmes.com,Davidhaven,LA,stone-holmes.com,James Martinez,7,C01097,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C01674,crystalraymond@keller.com,Hoborough,ME,keller.com,Nicole Johnston,8,C01674,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C00215,petersonthomas@yahoo.com,South Andrea,OK,yahoo.com,Danielle Huerta,9,C00215,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z
C00587,phillipsstephanie@gmail.com,New Ronaldmouth,OR,gmail.com,Michael Bailey,10,C00587,2025-11-05T22:30:26.176Z,2025-11-05T22:34:12.335Z


**Preparing df_old**

In [0]:
# Dropping all the records which are not required
df_old = df_old.drop("old_customer_id", "old_update_date")

# Renaming "old_create_date" column to "create_date"
df_old = df_old.withColumnRenamed("old_create_date", "create_date")\
                .withColumnRenamed("old_dim_customer_key", "dim_customer_key")
df_old = df_old.withColumn("create_date", to_timestamp("create_date"))

# Recreating "update_date" column with current timestamp
df_old = df_old.withColumn("update_date", current_timestamp())
df_old.display()

customer_id,email,city,state,domains,full_name,dim_customer_key,create_date,update_date
C01220,matthew01@yahoo.com,New Andrewhaven,WA,yahoo.com,Thomas Fitzgerald,1,2025-11-05T22:30:26.176Z,2025-11-05T22:36:22.704Z
C01579,nathancastro@gmail.com,South Amanda,FL,gmail.com,Brittany Schmidt,2,2025-11-05T22:30:26.176Z,2025-11-05T22:36:22.704Z
C01155,cynthia51@lewis-dixon.com,East Theresa,FL,lewis-dixon.com,Latasha Phelps,3,2025-11-05T22:30:26.176Z,2025-11-05T22:36:22.704Z
C01943,rodriguezzachary@hotmail.com,East Timothy,PA,hotmail.com,David Cooper,4,2025-11-05T22:30:26.176Z,2025-11-05T22:36:22.704Z
C01554,sthompson@harding.com,New Leefurt,ME,harding.com,Gavin Lindsey,5,2025-11-05T22:30:26.176Z,2025-11-05T22:36:22.704Z
C00767,rsmith@pruitt-hodges.net,Heatherton,IL,pruitt-hodges.net,Pam Watts,6,2025-11-05T22:30:26.176Z,2025-11-05T22:36:22.704Z
C01097,joel22@stone-holmes.com,Davidhaven,LA,stone-holmes.com,James Martinez,7,2025-11-05T22:30:26.176Z,2025-11-05T22:36:22.704Z
C01674,crystalraymond@keller.com,Hoborough,ME,keller.com,Nicole Johnston,8,2025-11-05T22:30:26.176Z,2025-11-05T22:36:22.704Z
C00215,petersonthomas@yahoo.com,South Andrea,OK,yahoo.com,Danielle Huerta,9,2025-11-05T22:30:26.176Z,2025-11-05T22:36:22.704Z
C00587,phillipsstephanie@gmail.com,New Ronaldmouth,OR,gmail.com,Michael Bailey,10,2025-11-05T22:30:26.176Z,2025-11-05T22:36:22.704Z


**Preparing df_new**

In [0]:
# Dropping all the records which are not required
df_new = df_new.drop("old_customer_id", "old_dim_customer_key", "old_update_date", "old_create_date")

# Recreating "create_date" & "update_date" column with current timestamp
df_new = df_new.withColumn("create_date", current_timestamp())\
                .withColumn("update_date", current_timestamp())

df_new.display()

customer_id,email,city,state,domains,full_name,create_date,update_date


**Adding max surrogate key**

In [0]:
if spark.catalog.tableExists("databricks_catalog.gold.dim_customers"):
  df_max = spark.sql("SELECT MAX(dim_customer_key) FROM databricks_catalog.gold.dim_customers")
  max_surrogate_key = df_max.collect()[0][0]
else:
  max_surrogate_key = 0
  
display(max_surrogate_key)

2000

In [0]:
df_new = df_new.withColumn("dim_customer_key", monotonically_increasing_id() + max_surrogate_key + 1)
df_new.display()

customer_id,email,city,state,domains,full_name,create_date,update_date,dim_customer_key


**Union of df_old and df_new**

In [0]:
df_final = df_new.unionByName(df_old)
df_final.display()

customer_id,email,city,state,domains,full_name,create_date,update_date,dim_customer_key
C01220,matthew01@yahoo.com,New Andrewhaven,WA,yahoo.com,Thomas Fitzgerald,2025-11-05T22:30:26.176Z,2025-11-05T22:36:25.244Z,1
C01579,nathancastro@gmail.com,South Amanda,FL,gmail.com,Brittany Schmidt,2025-11-05T22:30:26.176Z,2025-11-05T22:36:25.244Z,2
C01155,cynthia51@lewis-dixon.com,East Theresa,FL,lewis-dixon.com,Latasha Phelps,2025-11-05T22:30:26.176Z,2025-11-05T22:36:25.244Z,3
C01943,rodriguezzachary@hotmail.com,East Timothy,PA,hotmail.com,David Cooper,2025-11-05T22:30:26.176Z,2025-11-05T22:36:25.244Z,4
C01554,sthompson@harding.com,New Leefurt,ME,harding.com,Gavin Lindsey,2025-11-05T22:30:26.176Z,2025-11-05T22:36:25.244Z,5
C00767,rsmith@pruitt-hodges.net,Heatherton,IL,pruitt-hodges.net,Pam Watts,2025-11-05T22:30:26.176Z,2025-11-05T22:36:25.244Z,6
C01097,joel22@stone-holmes.com,Davidhaven,LA,stone-holmes.com,James Martinez,2025-11-05T22:30:26.176Z,2025-11-05T22:36:25.244Z,7
C01674,crystalraymond@keller.com,Hoborough,ME,keller.com,Nicole Johnston,2025-11-05T22:30:26.176Z,2025-11-05T22:36:25.244Z,8
C00215,petersonthomas@yahoo.com,South Andrea,OK,yahoo.com,Danielle Huerta,2025-11-05T22:30:26.176Z,2025-11-05T22:36:25.244Z,9
C00587,phillipsstephanie@gmail.com,New Ronaldmouth,OR,gmail.com,Michael Bailey,2025-11-05T22:30:26.176Z,2025-11-05T22:36:25.244Z,10


In [0]:
if spark.catalog.tableExists("databricks_catalog.gold.dim_customers"):
    dlt_obj = DeltaTable.forPath(spark, "abfss://gold@databricksete3.dfs.core.windows.net/dim_customers")
    dlt_obj.alias("target").merge(df_final.alias("source"), "target.dim_customer_key = source.dim_customer_key")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()
else:
    df_final.write.format("delta")\
                    .mode("overwrite")\
                    .option("path", "abfss://gold@databricksete3.dfs.core.windows.net/dim_customers")\
                    .saveAsTable("databricks_catalog.gold.dim_customers")

In [0]:
%sql
SELECT * FROM databricks_catalog.gold.dim_customers

customer_id,email,city,state,domains,full_name,create_date,update_date,dim_customer_key
C01220,matthew01@yahoo.com,New Andrewhaven,WA,yahoo.com,Thomas Fitzgerald,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,1
C01579,nathancastro@gmail.com,South Amanda,FL,gmail.com,Brittany Schmidt,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,2
C01155,cynthia51@lewis-dixon.com,East Theresa,FL,lewis-dixon.com,Latasha Phelps,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,3
C01943,rodriguezzachary@hotmail.com,East Timothy,PA,hotmail.com,David Cooper,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,4
C01554,sthompson@harding.com,New Leefurt,ME,harding.com,Gavin Lindsey,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,5
C00767,rsmith@pruitt-hodges.net,Heatherton,IL,pruitt-hodges.net,Pam Watts,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,6
C01097,joel22@stone-holmes.com,Davidhaven,LA,stone-holmes.com,James Martinez,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,7
C01674,crystalraymond@keller.com,Hoborough,ME,keller.com,Nicole Johnston,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,8
C00215,petersonthomas@yahoo.com,South Andrea,OK,yahoo.com,Danielle Huerta,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,9
C00587,phillipsstephanie@gmail.com,New Ronaldmouth,OR,gmail.com,Michael Bailey,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,10


In [0]:
spark.read.format("delta")\
            .load("abfss://gold@databricksete3.dfs.core.windows.net/dim_customers").display()

customer_id,email,city,state,domains,full_name,create_date,update_date,dim_customer_key
C01220,matthew01@yahoo.com,New Andrewhaven,WA,yahoo.com,Thomas Fitzgerald,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,1
C01579,nathancastro@gmail.com,South Amanda,FL,gmail.com,Brittany Schmidt,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,2
C01155,cynthia51@lewis-dixon.com,East Theresa,FL,lewis-dixon.com,Latasha Phelps,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,3
C01943,rodriguezzachary@hotmail.com,East Timothy,PA,hotmail.com,David Cooper,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,4
C01554,sthompson@harding.com,New Leefurt,ME,harding.com,Gavin Lindsey,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,5
C00767,rsmith@pruitt-hodges.net,Heatherton,IL,pruitt-hodges.net,Pam Watts,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,6
C01097,joel22@stone-holmes.com,Davidhaven,LA,stone-holmes.com,James Martinez,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,7
C01674,crystalraymond@keller.com,Hoborough,ME,keller.com,Nicole Johnston,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,8
C00215,petersonthomas@yahoo.com,South Andrea,OK,yahoo.com,Danielle Huerta,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,9
C00587,phillipsstephanie@gmail.com,New Ronaldmouth,OR,gmail.com,Michael Bailey,2025-11-05T22:30:26.176Z,2025-11-05T22:36:26.918Z,10
