Loading Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Load-mode widget. Later cells treat 1 as the initial load (empty placeholder instead of the Gold table,
# surrogate keys starting at 0) and 0 as an incremental load against the existing Gold DimCustomers table.
dbutils.widgets.text("init_load_flag", "0")
init_load_flag = int(dbutils.widgets.get("init_load_flag"))

# Several cells branch on this flag; reject anything other than 0/1 so they cannot disagree about the load mode.
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")

#### Data Reading

## SOURCE FILE -> Silver_Customers

In [0]:
# Source for the dimension: every column of the Silver customer table.
silver_customers_query = "select * from databricks_etl_catalog.silver.customer_silver"
df = spark.sql(silver_customers_query)
df.display()

customer_id,email,city,state,domains,full_name
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers


### Removing Duplicates based on Primary key.
#### Primary Key: customer_id (unique key)

In [0]:
# DataFrames are immutable: dropDuplicates returns a NEW DataFrame, so the result must be reassigned,
# otherwise duplicate customer_ids silently flow into the dimension.
# NOTE: which row survives for a duplicated customer_id is not deterministic.
df = df.dropDuplicates(subset=['customer_id'])
df.limit(10).display()

customer_id,email,city,state,domains,full_name
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers


## **Dividing Old Vs. New Records**

In [0]:
# Existing dimension rows (surrogate key + audit dates), used to classify Silver rows as new vs. already loaded.
if init_load_flag == 1:
    # Initial load: the Gold table does not exist yet, so build an empty frame with the same columns.
    # "where 1=0" returns no rows; the typed NULL casts keep customer_id a string and the dates timestamps,
    # so the later join/union never compare or combine an int placeholder with string/timestamp columns.
    df_old = spark.sql('''select cast(null as bigint)    as DimCustomerKey,
                                 cast(null as string)    as customer_id,
                                 cast(null as timestamp) as create_date,
                                 cast(null as timestamp) as update_date
                          from databricks_etl_catalog.silver.customer_silver where 1=0''')
else:
    # Incremental load: read the keys already assigned in the Gold dimension (created by the merge cell below).
    df_old = spark.sql('''select DimCustomerKey, customer_id, create_date, update_date
                          from databricks_etl_catalog.gold.DimCustomers''')


In [0]:
# Inspect the existing dimension keys and audit dates read above (no rows on an initial load).
df_old.display()

DimCustomerKey,customer_id,create_date,update_date
8589934593,C00001,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
8589934594,C00002,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
8589934595,C00003,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
8589934596,C00004,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
8589934597,C00005,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
8589934598,C00006,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
8589934599,C00007,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
8589934600,C00008,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
8589934601,C00009,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
8589934602,C00010,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z


#### Renaming Columns of Old Records

In [0]:
# Prefix the Gold columns with "old_" so they cannot collide with the Silver columns after the join.
old_column_renames = {
    "DimCustomerKey": "old_DimCustomerKey",
    "customer_id": "old_customer_id",
    "create_date": "old_create_date",
    "update_date": "old_update_date",
}
for source_name, target_name in old_column_renames.items():
    df_old = df_old.withColumnRenamed(source_name, target_name)

## **Applying Join with Old Records**

In [0]:
# Left join keeps every Silver row; rows without a Gold match get NULL in all old_* columns.
join_condition = df["customer_id"] == df_old["old_customer_id"]
df_join = df.join(df_old, on=join_condition, how="left")
df_join.display()

customer_id,email,city,state,domains,full_name,old_DimCustomerKey,old_customer_id,old_create_date,old_update_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,8589934593,C00001,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,8589934594,C00002,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,8589934595,C00003,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,8589934596,C00004,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,8589934597,C00005,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,8589934598,C00006,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,8589934599,C00007,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8589934600,C00008,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,8589934601,C00009,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,8589934602,C00010,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z


## **Separating New vs. Old Records**

In [0]:
# A NULL old surrogate key means the customer is not yet in the Gold dimension.
is_new_customer = df_join["old_DimCustomerKey"].isNull()
df_new = df_join.where(is_new_customer)
df_new.limit(10).display()

customer_id,email,city,state,domains,full_name,old_DimCustomerKey,old_customer_id,old_create_date,old_update_date


In [0]:
# A non-NULL old surrogate key means the customer already exists in the Gold dimension.
is_existing_customer = df_join["old_DimCustomerKey"].isNotNull()
df_old = df_join.where(is_existing_customer)
df_old.limit(10).display()

customer_id,email,city,state,domains,full_name,old_DimCustomerKey,old_customer_id,old_create_date,old_update_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,8589934593,C00001,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,8589934594,C00002,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,8589934595,C00003,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,8589934596,C00004,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,8589934597,C00005,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,8589934598,C00006,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,8589934599,C00007,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8589934600,C00008,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,8589934601,C00009,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,8589934602,C00010,2025-07-18T22:53:34.216Z,2025-07-18T22:53:34.216Z


## **Preparing df_old Records**

In [0]:
# Existing customers keep their surrogate key and original create_date;
# update_date is re-stamped with the time of this run.
df_old = (
    df_old
    .drop('old_customer_id', 'old_update_date')
    .withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)


In [0]:
# Inspect the prepared existing records: DimCustomerKey and create_date kept, update_date refreshed.
df_old.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,create_date,update_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,8589934593,2025-07-18T22:53:34.216Z,2025-07-18T23:00:34.285Z
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,8589934594,2025-07-18T22:53:34.216Z,2025-07-18T23:00:34.285Z
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,8589934595,2025-07-18T22:53:34.216Z,2025-07-18T23:00:34.285Z
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,8589934596,2025-07-18T22:53:34.216Z,2025-07-18T23:00:34.285Z
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,8589934597,2025-07-18T22:53:34.216Z,2025-07-18T23:00:34.285Z
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,8589934598,2025-07-18T22:53:34.216Z,2025-07-18T23:00:34.285Z
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,8589934599,2025-07-18T22:53:34.216Z,2025-07-18T23:00:34.285Z
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8589934600,2025-07-18T22:53:34.216Z,2025-07-18T23:00:34.285Z
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,8589934601,2025-07-18T22:53:34.216Z,2025-07-18T23:00:34.285Z
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,8589934602,2025-07-18T22:53:34.216Z,2025-07-18T23:00:34.285Z


## **Preparing df_new Records**

In [0]:
# Preview up to 10 new customers while they still carry the (NULL) old_* join columns.
df_new.limit(10).display()

customer_id,email,city,state,domains,full_name,old_DimCustomerKey,old_customer_id,old_create_date,old_update_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,,,,
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,,,,
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,,,,
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,,,,
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,,,,
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,,,,
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,,,,
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,,,,
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,,,,
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,,,,


##### Removing the columns old_DimCustomerKey, old_customer_id, old_create_date and old_update_date, because they are all NULL for new records, and creating fresh columns instead

In df_new, the 'create_date' column is set with current_timestamp() to record the current date and time, because these records are being processed for the first time in this run — unlike the df_old records, which already exist in the Gold layer and keep their original create_date.

In [0]:
# The old_* columns are all NULL for new customers, so they carry no information.
df_new = df_new.drop('old_DimCustomerKey', 'old_customer_id', 'old_create_date', 'old_update_date')

# New customers are both created and last updated in this run (update_date first, then create_date).
for audit_column in ("update_date", "create_date"):
    df_new = df_new.withColumn(audit_column, current_timestamp())

df_new.display()


customer_id,email,city,state,domains,full_name,update_date,create_date


#### Assigning surrogate key column to Data Frame

Surrogate Key: a system-generated key created in the dimension table, used to join fact and dimension tables easily.

DimCustomerKey: the surrogate key that stands in for our business (primary) key [customer_id]

## **Surrogate Key – Numbering New Records Starting from 1**

Assigning dim surrogate key column to df_new

In [0]:
# Number the new customers 1..N in a stable order (by customer_id).
# monotonically_increasing_id() is unique but NOT consecutive: it encodes the partition id in the upper bits
# (hence existing keys such as 8589934593), so "max key + id" would leave huge gaps. row_number() yields the
# gap-free 1, 2, 3, ... sequence that the max-key offset below relies on.
# A window without partitionBy moves all new rows into one partition; acceptable for a dimension-sized delta.
new_key_window = Window.orderBy("customer_id")
df_new = df_new.withColumn("DimCustomerKey", row_number().over(new_key_window).cast("bigint"))
df_new.display()


customer_id,email,city,state,domains,full_name,update_date,create_date,DimCustomerKey


## **Adding Max Surrogate Key**

In [0]:
# Highest surrogate key already in the Gold dimension; new keys are offset by this value.
if init_load_flag == 1:
    # Initial load: no existing keys, so numbering starts right after 0.
    max_surrogate_key = 0
else:
    df_maxsur = spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_etl_catalog.gold.DimCustomers")
    # collect() brings the single aggregate row to the driver as a Python value.
    max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key']
    # max() over an empty table is NULL -> None; without this guard every new key would become NULL.
    if max_surrogate_key is None:
        max_surrogate_key = 0
  

Offsetting the new records' surrogate keys by the current maximum key, so that new keys continue after the keys already in the Gold table.

In [0]:
# Shift the new keys past the current maximum, e.g. 2000+1, 2000+2, and so on.
key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn("DimCustomerKey", key_offset + col("DimCustomerKey"))

## **Union of df_old & df_new**

In [0]:
# unionByName aligns columns by name (df_new and df_old list them in different orders);
# the result follows df_new's column order.
df_final = df_new.unionByName(other=df_old, allowMissingColumns=False)

In [0]:
# Inspect the combined new + existing records that are upserted into the Gold table below.
df_final.display()

customer_id,email,city,state,domains,full_name,update_date,create_date,DimCustomerKey
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,2025-07-18T23:02:56.16Z,2025-07-18T22:53:34.216Z,8589934593
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2025-07-18T23:02:56.16Z,2025-07-18T22:53:34.216Z,8589934594
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,2025-07-18T23:02:56.16Z,2025-07-18T22:53:34.216Z,8589934595
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,2025-07-18T23:02:56.16Z,2025-07-18T22:53:34.216Z,8589934596
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,2025-07-18T23:02:56.16Z,2025-07-18T22:53:34.216Z,8589934597
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,2025-07-18T23:02:56.16Z,2025-07-18T22:53:34.216Z,8589934598
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,2025-07-18T23:02:56.16Z,2025-07-18T22:53:34.216Z,8589934599
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,2025-07-18T23:02:56.16Z,2025-07-18T22:53:34.216Z,8589934600
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,2025-07-18T23:02:56.16Z,2025-07-18T22:53:34.216Z,8589934601
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,2025-07-18T23:02:56.16Z,2025-07-18T22:53:34.216Z,8589934602


## **Upsert = Update + Insert**

### SCD Type 1 : Overwrite

In [0]:
from delta.tables import DeltaTable

### **Merging into the Gold Table**

In [0]:

# SCD Type 1 upsert into the Gold dimension: rows whose DimCustomerKey already exists are overwritten,
# rows with new keys are inserted.
GOLD_DIM_CUSTOMERS_TABLE = "databricks_etl_catalog.gold.DimCustomers"
GOLD_DIM_CUSTOMERS_PATH = "abfss://gold@databricksetldatastorage.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists(GOLD_DIM_CUSTOMERS_TABLE):
    # Open the Delta table by the same catalog name used in the existence check, so the merge always
    # targets the registered table rather than whatever lives at a separately hard-coded storage path.
    dlt_obj = DeltaTable.forName(spark, GOLD_DIM_CUSTOMERS_TABLE)

    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.DimCustomerKey = src.DimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the table, storing its Delta files at the Gold ADLS Gen2 location.
    (
        df_final.write.mode("overwrite")
        .format("delta")
        .option("path", GOLD_DIM_CUSTOMERS_PATH)
        .saveAsTable(GOLD_DIM_CUSTOMERS_TABLE)
    )


In [0]:
%sql
SELECT *
FROM databricks_etl_catalog.gold.dimcustomers

customer_id,email,city,state,domains,full_name,DimCustomerKey,update_date,create_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,8589934593,2025-07-18T23:03:56.003Z,2025-07-18T22:53:34.216Z
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,8589934594,2025-07-18T23:03:56.003Z,2025-07-18T22:53:34.216Z
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,8589934595,2025-07-18T23:03:56.003Z,2025-07-18T22:53:34.216Z
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,8589934596,2025-07-18T23:03:56.003Z,2025-07-18T22:53:34.216Z
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,8589934597,2025-07-18T23:03:56.003Z,2025-07-18T22:53:34.216Z
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,8589934598,2025-07-18T23:03:56.003Z,2025-07-18T22:53:34.216Z
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,8589934599,2025-07-18T23:03:56.003Z,2025-07-18T22:53:34.216Z
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8589934600,2025-07-18T23:03:56.003Z,2025-07-18T22:53:34.216Z
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,8589934601,2025-07-18T23:03:56.003Z,2025-07-18T22:53:34.216Z
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,8589934602,2025-07-18T23:03:56.003Z,2025-07-18T22:53:34.216Z
