In [0]:
# Notebook/job parameter selecting the load mode: "1" = full (initial) load, "0" = incremental load.
dbutils.widgets.dropdown("init_load_flag", "1", ["0", "1"], "Init Load Flag (1=Full, 0=Incremental)")


# Parameterize the notebook with a flag selecting an initial (full) or incremental load.

- The `init_load_flag` parameter accepts 1 or 0: 1 runs an initial (full) load, 0 runs an incremental load.

In [0]:
# Widget values are always strings; convert once so later cells can compare against ints.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))
# Later cells branch on this value, some testing for 0 and some for 1. Reject anything else
# (e.g. a bad job parameter) so those branches can never disagree about the load mode.
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")

### **Data Reading**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Read the silver customers table, keeping a single row per customer_id.
# NOTE(review): Spark does not guarantee which duplicate row is kept — confirm that is acceptable.
df = (
    spark.sql('SELECT * FROM databricks_cat.silver.customers')
    .dropDuplicates(['customer_id'])
)

### **Add a surrogate key**

In [0]:
# Provisional surrogate key: monotonically_increasing_id() yields unique, increasing
# (but not consecutive) ids; adding 1 keeps the smallest key at 1 instead of 0.
df = df.withColumn('DimCustomerKey', lit(1) + monotonically_increasing_id())

# **Split the data into new vs. existing records (initial or incremental load)**

In [0]:
# Existing dimension rows to compare the incoming data against.
# On an incremental load (flag 0) they come from the gold table. Otherwise there is nothing
# to compare against, so an empty frame with the same column names is built instead.
if init_load_flag == 0:
    df_old = spark.sql('''
                     SELECT 
                     DimCustomerKey, 
                     customer_id, 
                     create_date, 
                     update_date 
                     FROM databricks_cat.gold.DimCustomers
                     ''')
else:
    # Use typed NULLs rather than integer 0 placeholders:
    #   - customer_id is taken from silver itself, so the join compares like-typed columns;
    #   - DimCustomerKey is BIGINT like monotonically_increasing_id() + 1;
    #   - the dates are TIMESTAMP like current_timestamp(). A later cell passes create_date
    #     to to_timestamp(), which an INT column may not support (e.g. under ANSI mode).
    # WHERE 1=0 keeps the frame empty.
    df_old = spark.sql('''
        SELECT
            CAST(NULL AS BIGINT) AS DimCustomerKey,
            customer_id,
            CAST(NULL AS TIMESTAMP) AS create_date,
            CAST(NULL AS TIMESTAMP) AS update_date
        FROM databricks_cat.silver.customers
        WHERE 1=0
    ''')

In [0]:
# Prefix every existing-record column with "old_" so it cannot clash with the
# same-named column of the incoming data after the join.
for _old_col_name in ('DimCustomerKey', 'customer_id', 'create_date', 'update_date'):
    df_old = df_old.withColumnRenamed(_old_col_name, 'old_' + _old_col_name)


### **Join with the existing records**

In [0]:
# Left join: every incoming customer is kept; the old_* columns are NULL when the
# customer has no matching existing record.
df_join = df.join(df_old, on=df.customer_id == df_old.old_customer_id, how='left')

### **Separating new vs old records**

In [0]:
# Customers with no existing record: discard the (all-NULL) old_* columns and stamp
# both create_date and update_date with the current load time.
df_new = (
    df_join
    .where(df_join['old_DimCustomerKey'].isNull())
    .drop('old_DimCustomerKey', 'old_customer_id', 'old_create_date', 'old_update_date')
    .withColumn('create_date', current_timestamp())
    .withColumn('update_date', col('create_date'))
)

In [0]:
# Customers that already exist: reuse their existing surrogate key and original
# create_date, refresh update_date, then discard the helper old_* columns.
df_old = (
    df_join
    .where(df_join['old_DimCustomerKey'].isNotNull())
    .withColumn('DimCustomerKey', col('old_DimCustomerKey'))
    .withColumn('create_date', to_timestamp(col('old_create_date')))
    .withColumn('update_date', current_timestamp())
    .drop('old_DimCustomerKey', 'old_customer_id', 'old_create_date', 'old_update_date')
)

### **Adding Max Surrogate Key**
- On an incremental load, the `DimCustomerKey` of new records must be offset by the current maximum key in the gold table so it cannot collide with the keys of existing records.

In [0]:
# Offset added to the provisional keys of new customers.
# An incremental load continues after the largest key already in gold; a full load keeps
# the provisional keys as they are (offset 0).
# Branch on `== 0` (incremental), the same test used when the existing rows are read,
# so both cells always agree on the load mode.
if init_load_flag == 0:
    # COALESCE: MAX() over an empty gold table is NULL, and adding a NULL offset
    # would make every new surrogate key NULL.
    df_maxsur = spark.sql('''SELECT COALESCE(MAX(DimCustomerKey), 0) AS max_surrogate_key FROM databricks_cat.gold.DimCustomers''').collect()[0]

    # Extract the scalar from the single result Row.
    max_surrogate_key = df_maxsur['max_surrogate_key']
else:
    max_surrogate_key = 0

### **Adding the max surrogate key to the df_new**

In [0]:
# Shift the provisional keys of new customers by the max-key offset computed above.
df_new = df_new.withColumn('DimCustomerKey', col('DimCustomerKey') + lit(max_surrogate_key))

In [0]:
# Stack new and existing customers into one frame, matching columns by name;
# both frames must have exactly the same set of columns.
df_final = df_new.unionByName(df_old, allowMissingColumns=False)

### **Upsert the data into the gold table**

In [0]:
from delta.tables import DeltaTable

In [0]:
from delta.tables import DeltaTable

table_name = "databricks_cat.gold.DimCustomers"
table_path = "abfss://gold@databrickseterjed.dfs.core.windows.net/DimCustomers"

# MERGE only on an incremental run (flag 0, the same test used when the existing rows were
# read). There, existing customers carry their key from gold and new customers get keys
# above the gold maximum, so matching on DimCustomerKey is safe.
# A full load renumbers keys starting at 1 without looking at gold, so merging by key could
# overwrite unrelated existing rows. In that case, rebuild the table instead.
if init_load_flag == 0 and spark.catalog.tableExists(table_name):
    dlt_obj = DeltaTable.forName(spark, table_name)

    (dlt_obj.alias("trg")
        .merge(
            df_final.alias("src"),
            "trg.DimCustomerKey = src.DimCustomerKey"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run or full reload: write df_final as Delta data at table_path and
    # register/replace it as table_name.
    (df_final.write
        .format("delta")
        .mode("overwrite")
        .option("path", table_path)
        .saveAsTable(table_name)
    )


  