In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
init_load_flag = int(dbutils.widgets.get("init_load_flag"))

### **Data Reading From Source**

In [0]:
df = spark.sql("select * from databricks_cata.silver.customers_silver")

### **Remove Duplicates**

In [0]:
df = df.dropDuplicates(subset=['customer_id'])

In [0]:
df.limit(10).display()

customer_id,email,city,state,domains,full_name
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers


## ### **Dividing New Vs Old Records**

In [0]:
if init_load_flag == 0:
    df_old=spark.sql('''select DimCustomerKey, customer_id, create_date, update_date from databricks_cata.gold.DimCustomers''')

else:
    df_old=spark.sql('''select 0 DimCustomerKey, 0 customer_id, 0 create_date, 0 update_date from databricks_cata.silver.customers_silver where 1=0''')


In [0]:
df_old.display()

DimCustomerKey,customer_id,create_date,update_date


### **Renaming Columns of df_old**

In [0]:
df_old = df_old.withColumnRenamed("DimCustomerKey", "old_DimCustomerKey")\
  .withColumnRenamed("customer_id", "old_customer_id")\
  .withColumnRenamed("create_date", "old_create_date")\
  .withColumnRenamed("update_date", "old_update_date")

### **Applying Joins With The Old Records**

In [0]:
df_join = df.join(df_old, df.customer_id == df_old["old_customer_id"], "left")

In [0]:
df_join.display()

customer_id,email,city,state,domains,full_name,old_DimCustomerKey,old_customer_id,old_create_date,old_update_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,,,,
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,,,,
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,,,,
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,,,,
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,,,,
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,,,,
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,,,,
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,,,,
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,,,,
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,,,,


### **Seperating New vs Old Records**

In [0]:
df_new = df_join.filter(df_join['old_DimCustomerKey'].isNull())

In [0]:
df_old = df_join.filter(df_join['old_DimCustomerKey'].isNotNull())

### **Preparing df_old**

In [0]:
# dropping all the columns which are not required

df_old = df_old.drop("old_customer_id", "old_update_date")

# Renaming "old_DimCustomerKey" column to "DimCustomerKey"

df_old = df_old.withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")

# Renaming "old_create_date" column to "create date"

df_old = df_old.withColumnRenamed("old_create_date", "create_date")
df_old = df_old.withColumn("create_date", to_timestamp(col("create_date")))

# Recreating "update_date" column with current timestamp

df_old = df_old.withColumn("update_date", current_timestamp())

In [0]:
df_old.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,create_date,update_date


### **Preparing df_new**

In [0]:
# dropping all the columns which are not required

df_new = df_new.drop("old_DimCustomerKey", "old_customer_id", "old_update_date", "old_create_date")

# Recreating "update_date", "current_date" column with current timestamp

df_new = df_new.withColumn("update_date", current_timestamp())
df_new = df_new.withColumn("create_date", current_timestamp())

In [0]:
df_new.display()

customer_id,email,city,state,domains,full_name,update_date,create_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,2025-12-23T14:59:37.648Z,2025-12-23T14:59:37.648Z
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2025-12-23T14:59:37.648Z,2025-12-23T14:59:37.648Z
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,2025-12-23T14:59:37.648Z,2025-12-23T14:59:37.648Z
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,2025-12-23T14:59:37.648Z,2025-12-23T14:59:37.648Z
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,2025-12-23T14:59:37.648Z,2025-12-23T14:59:37.648Z
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,2025-12-23T14:59:37.648Z,2025-12-23T14:59:37.648Z
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,2025-12-23T14:59:37.648Z,2025-12-23T14:59:37.648Z
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,2025-12-23T14:59:37.648Z,2025-12-23T14:59:37.648Z
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,2025-12-23T14:59:37.648Z,2025-12-23T14:59:37.648Z
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,2025-12-23T14:59:37.648Z,2025-12-23T14:59:37.648Z


### **Surrogate Key - From 1**

In [0]:
df_new = df_new.withColumn("DimCustomerKey", monotonically_increasing_id()+lit(1))

In [0]:
df_new.display()

customer_id,email,city,state,domains,full_name,update_date,create_date,DimCustomerKey
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,2025-12-23T14:59:38.344Z,2025-12-23T14:59:38.344Z,1
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2025-12-23T14:59:38.344Z,2025-12-23T14:59:38.344Z,2
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,2025-12-23T14:59:38.344Z,2025-12-23T14:59:38.344Z,3
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,2025-12-23T14:59:38.344Z,2025-12-23T14:59:38.344Z,4
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,2025-12-23T14:59:38.344Z,2025-12-23T14:59:38.344Z,5
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,2025-12-23T14:59:38.344Z,2025-12-23T14:59:38.344Z,6
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,2025-12-23T14:59:38.344Z,2025-12-23T14:59:38.344Z,7
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,2025-12-23T14:59:38.344Z,2025-12-23T14:59:38.344Z,8
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,2025-12-23T14:59:38.344Z,2025-12-23T14:59:38.344Z,9
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,2025-12-23T14:59:38.344Z,2025-12-23T14:59:38.344Z,10


In [0]:
df.limit(10).display()

customer_id,email,city,state,domains,full_name
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers


### **Adding Max Surrogate Key**

In [0]:
if init_load_flag ==1:
  max_surrogate_key = 0

else:
  df_maxsur = spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_cata.gold.DimCustomers")

  # Converting df_maxsur to max_surrogate_key
  max_surrogate_key = df_maxsur.collect()[0]["max_surrogate_key"]


In [0]:
df_new = df_new.withColumn("DimCustomerKey",lit(max_surrogate_key)+col("DimCustomerKey"))

### **Union of df_old and df_new**

In [0]:
df_final = df_new.unionByName(df_old)

In [0]:
df_final.display()

customer_id,email,city,state,domains,full_name,update_date,create_date,DimCustomerKey
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,2025-12-23T14:59:39.668Z,2025-12-23T14:59:39.668Z,1
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2025-12-23T14:59:39.668Z,2025-12-23T14:59:39.668Z,2
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,2025-12-23T14:59:39.668Z,2025-12-23T14:59:39.668Z,3
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,2025-12-23T14:59:39.668Z,2025-12-23T14:59:39.668Z,4
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,2025-12-23T14:59:39.668Z,2025-12-23T14:59:39.668Z,5
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,2025-12-23T14:59:39.668Z,2025-12-23T14:59:39.668Z,6
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,2025-12-23T14:59:39.668Z,2025-12-23T14:59:39.668Z,7
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,2025-12-23T14:59:39.668Z,2025-12-23T14:59:39.668Z,8
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,2025-12-23T14:59:39.668Z,2025-12-23T14:59:39.668Z,9
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,2025-12-23T14:59:39.668Z,2025-12-23T14:59:39.668Z,10


### **SCD Type - 1**

In [0]:
from delta.tables import DeltaTable

In [0]:
if spark.catalog.tableExists("databricks_cata.gold.DimCustomers"):
    dlt_obj = DeltaTable.forPath(spark,"abfss://gold@sriharsan1databricks.dfs.core.windows.net/DimCustomers")
    dlt_obj.alias("trg").merge(df_final.alias("src"),"trg.DimCustomerKey = src.DimCustomerKey")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()
else:
     df_final.write.mode("overwrite")\
        .format("delta")\
        .option("path","abfss://gold@sriharsan1databricks.dfs.core.windows.net/DimCustomers")\
        .saveAsTable("databricks_cata.gold.DimCustomers")

In [0]:
%sql
select * from databricks_cata.gold.dimcustomers

customer_id,email,city,state,domains,full_name,update_date,create_date,DimCustomerKey
C01220,matthew01@yahoo.com,New Andrewhaven,WA,yahoo.com,Thomas Fitzgerald,2025-12-23T14:59:45.715Z,2025-12-23T14:59:45.715Z,1
C01579,nathancastro@gmail.com,South Amanda,FL,gmail.com,Brittany Schmidt,2025-12-23T14:59:45.715Z,2025-12-23T14:59:45.715Z,2
C01155,cynthia51@lewis-dixon.com,East Theresa,FL,lewis-dixon.com,Latasha Phelps,2025-12-23T14:59:45.715Z,2025-12-23T14:59:45.715Z,3
C01943,rodriguezzachary@hotmail.com,East Timothy,PA,hotmail.com,David Cooper,2025-12-23T14:59:45.715Z,2025-12-23T14:59:45.715Z,4
C01554,sthompson@harding.com,New Leefurt,ME,harding.com,Gavin Lindsey,2025-12-23T14:59:45.715Z,2025-12-23T14:59:45.715Z,5
C00767,rsmith@pruitt-hodges.net,Heatherton,IL,pruitt-hodges.net,Pam Watts,2025-12-23T14:59:45.715Z,2025-12-23T14:59:45.715Z,6
C01097,joel22@stone-holmes.com,Davidhaven,LA,stone-holmes.com,James Martinez,2025-12-23T14:59:45.715Z,2025-12-23T14:59:45.715Z,7
C01674,crystalraymond@keller.com,Hoborough,ME,keller.com,Nicole Johnston,2025-12-23T14:59:45.715Z,2025-12-23T14:59:45.715Z,8
C00215,petersonthomas@yahoo.com,South Andrea,OK,yahoo.com,Danielle Huerta,2025-12-23T14:59:45.715Z,2025-12-23T14:59:45.715Z,9
C00587,phillipsstephanie@gmail.com,New Ronaldmouth,OR,gmail.com,Michael Bailey,2025-12-23T14:59:45.715Z,2025-12-23T14:59:45.715Z,10
