In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

## Data Reading from source

## Creating parameter

In [0]:
# Notebook parameter controlling the load mode; the widget default is '1'.
# The raw widget value is a string, so it is converted to int for comparisons.
dbutils.widgets.text('init_load_flag','1')
init_load_flag=int(dbutils.widgets.get('init_load_flag'))
# The downstream cells only define behaviour for 0 and 1; reject anything else
# rather than risk mixing the two load paths.
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")


In [0]:
# Read the complete silver customers table as the source for this load.
silver_customers_query = 'select * from databricks_cata.silver.customers_silver'
df = spark.sql(silver_customers_query)

## Removing duplicates

In [0]:
# Keep a single row per customer_id.
df = df.dropDuplicates(['customer_id'])

## **Dividing new vs old Records**

In [0]:
# Load the keys and audit dates of customers already in the gold dimension so
# incoming rows can be matched against them.
# The initial-load test is `init_load_flag==1`, the same test used by the
# surrogate-key offset and write cells, so every cell picks the same branch
# for any flag value.
if init_load_flag==1:
    # Initial load: the gold table is not read. `where 1=0` returns no rows,
    # giving an empty frame that only carries the expected column names.
    df_old=spark.sql('select  0 DimCustomerKey, 0 customer_id,0 create_date,0 update_date from databricks_cata.silver.customers_silver where 1=0' )
else:
    # Incremental load: read the existing dimension rows.
    df_old=spark.sql('select DimCustomerKey,customer_id,create_date,update_date from databricks_cata.gold.DimCustomers')

## Renaming columns of df_old

In [0]:
# Prefix every column coming from the existing dimension with `old_` so the
# names cannot clash with the incoming columns after the join.
for column_name in ('DimCustomerKey', 'customer_id', 'create_date', 'update_date'):
    df_old = df_old.withColumnRenamed(column_name, 'old_' + column_name)

**Applying the joins with the old records**

In [0]:
# Left join keeps every incoming customer; the old_* columns stay NULL when
# no row in df_old has the same customer_id.
join_condition = df['customer_id'] == df_old['old_customer_id']
df_join = df.join(df_old, on=join_condition, how='left')


**Separating old vs. new records**

In [0]:
# No existing surrogate key was joined in, so these rows are treated as new.
df_new = df_join.where(df_join['old_DimCustomerKey'].isNull())


In [0]:
# An existing surrogate key was joined in, so these rows are already in the
# dimension. Note that df_old is rebound here to the joined rows.
df_old = df_join.where(df_join['old_DimCustomerKey'].isNotNull())

**preparing df_old**

In [0]:
# Shape the already-known customers for the upsert:
#   1. drop the helper columns that were only needed for matching,
#   2. restore the original surrogate key and creation date under their
#      final column names,
#   3. make sure create_date is a timestamp,
#   4. stamp update_date with the current time.
df_old = (
    df_old
    .drop('old_customer_id', 'old_update_date')
    .withColumnRenamed('old_DimCustomerKey', 'DimCustomerKey')
    .withColumnRenamed('old_create_date', 'create_date')
    .withColumn('create_date', to_timestamp(col('create_date')))
    .withColumn('update_date', current_timestamp())
)

**preparing df_new**

In [0]:
# Shape the brand-new customers for the upsert: remove every old_* helper
# column (all NULL for these rows) and stamp both audit dates with the
# current time.
helper_columns = ('old_DimCustomerKey', 'old_customer_id', 'old_update_date', 'old_create_date')
df_new = (
    df_new
    .drop(*helper_columns)
    .withColumn('create_date', current_timestamp())
    .withColumn('update_date', current_timestamp())
)

## Surrogate key - starting from 1

In [0]:
# Number the new customers 1..N in customer_id order.
# row_number() over an ordered window is gap-free and repeatable, whereas
# monotonically_increasing_id() depends on partitioning and only guarantees
# unique, increasing values - not consecutive ones.
# The cast keeps the key a 64-bit integer, the type that
# monotonically_increasing_id() produced.
# NOTE: a window without partitionBy moves all new rows into one partition;
# revisit if the volume of new customers per run becomes large.
df_new=df_new.withColumn(
    'DimCustomerKey',
    row_number().over(Window.orderBy('customer_id')).cast('long')
)

**Adding max surrogate key**

In [0]:
# Offset for the new surrogate keys: 0 on an initial load, otherwise the
# highest key already present in the gold dimension.
if init_load_flag==1:
    max_surrogate_key=0
else:
    df_max_surrogate_key=spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_cata.gold.DimCustomers")
    # Pull the single aggregate value back to the driver as a Python scalar.
    max_surrogate_key=df_max_surrogate_key.collect()[0]['max_surrogate_key']
    # max() over an empty table is NULL (None here); fall back to 0 so the
    # later addition does not turn every new key into NULL.
    if max_surrogate_key is None:
        max_surrogate_key=0


In [0]:
# Shift the freshly generated keys so they continue after the highest key
# already in use.
df_new = df_new.withColumn('DimCustomerKey', df_new['DimCustomerKey'] + lit(max_surrogate_key))

**union of df_old & df_new**

In [0]:
# Stack new and existing customers into one frame; unionByName lines the
# columns up by name rather than by position.
df_final = df_new.unionByName(other=df_old)

**Upsert logic (SCD Type 1)**

In [0]:
from delta.tables import DeltaTable

In [0]:
# Storage location of the gold customer dimension, shared by both load modes.
gold_dim_customers_path = 'abfss://gold2026@storagespotifyproject20.dfs.core.windows.net/DimCustomers'

if init_load_flag != 1:
    # Incremental load: merge on the surrogate key. Matching rows are updated
    # with all source columns, unmatched rows are inserted.
    dlt_obj = DeltaTable.forPath(spark, gold_dim_customers_path)
    (
        dlt_obj.alias('trg')
        .merge(df_final.alias('src'), 'trg.DimCustomerKey=src.DimCustomerKey')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial load: overwrite the table at the external location and register
    # it in the catalog.
    (
        df_final.write
        .mode("overwrite")
        .option('path', gold_dim_customers_path)
        .saveAsTable("databricks_cata.gold.DimCustomers")
    )