In [0]:
from pyspark.sql.functions import*
from pyspark.sql.types import*

# CREATING PARAMETER

In [0]:
# Declare the notebook parameter; the default "0" is the value the later
# surrogate-key cell treats as an initial (non-incremental) load.
dbutils.widgets.text(
    "incremental_flag",
    "0",
)

In [0]:
# Read the parameter back; it is compared against the string '0' further down.
incremental_flag = dbutils.widgets.get(
    "incremental_flag"
)

# CREATING DIMENSION

In [0]:
# Source side of the dimension: one row per distinct (Dealer_ID, DealerName)
# pair found in the silver-layer parquet data.
dealer_src_query = '''SELECT DISTINCT Dealer_ID, DealerName
FROM parquet.`abfss://silver@carsaleslake.dfs.core.windows.net/carsales`'''
df_src = spark.sql(dealer_src_query)

In [0]:
# Preview the distinct dealer rows selected from the silver layer.
df_src.display()

**dim_dealer sink - Initial and Incremental**

In [0]:
# Build the sink-side frame. When the gold table already exists we read its
# current keys; otherwise we select from the silver data with an always-false
# predicate, which yields zero rows but the same three columns.
if spark.catalog.tableExists('cars_catalog.gold.dim_dealer'):
    sink_query = '''
    SELECT dim_dealer_key,Dealer_ID, DealerName
    FROM cars_catalog.gold.dim_dealer
    '''
else:
    sink_query = '''
    SELECT 1 as dim_dealer_key,Dealer_ID, DealerName
    FROM parquet.`abfss://silver@carsaleslake.dfs.core.windows.net/carsales`
    WHERE 1=0'''

df_sink = spark.sql(sink_query)

### Filtering new and old records

In [0]:
# Left-join source dealers to the sink on Dealer_ID. Dealers with no match in
# the sink come out with a null dim_dealer_key.
dealer_match = df_src['Dealer_ID'] == df_sink['Dealer_ID']
df_filter = (
    df_src
    .join(df_sink, dealer_match, 'left')
    .select(
        df_src['Dealer_ID'],
        df_src['DealerName'],
        df_sink['dim_dealer_key'],
    )
)

In [0]:
# Preview the joined rows; dim_dealer_key is null for dealers missing from the sink.
df_filter.display()

**df_filter_old** 

In [0]:
# Dealers that matched a sink row, i.e. already carry a surrogate key.
has_key = col('dim_dealer_key').isNotNull()
df_filter_old = df_filter.where(has_key)

**df_filter_new**

In [0]:
# Dealers without a surrogate key yet. The (null) key column is dropped here
# so that a fresh one can be attached in a later cell.
missing_key = col('dim_dealer_key').isNull()
df_filter_new = (
    df_filter
    .where(missing_key)
    .select(df_src['Dealer_ID'], df_src['DealerName'])
)

# Create Surrogate Key

**Fetch max Surrogate Key from existing table**

In [0]:
# Work out the offset added to monotonically_increasing_id() when surrogate
# keys are generated for new dealers. Because those generated ids start at 0,
# the offset has to be the first *unused* key, not the highest existing one.
if incremental_flag == '0':
    # Initial load: nothing exists yet, so keys start from 0.
    max_value = 0
else:
    # Incremental load: continue numbering after the highest key in the table.
    max_value_df = spark.sql("select max(dim_dealer_key) from cars_catalog.gold.dim_dealer")
    current_max_key = max_value_df.collect()[0][0]
    # max() over an empty table yields NULL (None in Python); fall back to the
    # initial-load offset instead of failing on None + 1.
    max_value = 0 if current_max_key is None else current_max_key + 1

**Create Surrogate key column and ADD the max surrogate key**

In [0]:
# Attach a surrogate key to each new dealer. monotonically_increasing_id()
# produces unique, increasing (not necessarily consecutive) ids starting at 0,
# so the first new row receives exactly max_value.
surrogate_key = max_value + monotonically_increasing_id()
df_filter_new = df_filter_new.withColumn(
    "dim_dealer_key",
    surrogate_key,
)

## Creating final DF (Old + New)

In [0]:
# Combine newly keyed dealers with the ones that already had a key.
# unionByName matches columns by name rather than by position, so the result
# stays correct even if either side's column order changes.
df_final = df_filter_new.unionByName(df_filter_old)

In [0]:
# Preview the combined (new + existing) dealer rows before writing them out.
df_final.display()

# SCD TYPE 1 (UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
#Incremental Run
# Upsert df_final into the gold dealer dimension.
dim_dealer_table = 'cars_catalog.gold.dim_dealer'
dim_dealer_path = "abfss://gold@carsaleslake.dfs.core.windows.net/dim_dealer"

if not spark.catalog.tableExists(dim_dealer_table):
    # Initial run: create the Delta table at the external path.
    (
        df_final.write
        .format("delta")
        .mode("overwrite")
        .option("path", dim_dealer_path)
        .saveAsTable(dim_dealer_table)
    )
else:
    # Incremental run: merge on the surrogate key, updating matched rows and
    # inserting unmatched ones.
    delta_tbl = DeltaTable.forPath(spark, dim_dealer_path)
    (
        delta_tbl.alias("trg")
        .merge(df_final.alias("src"), "trg.dim_dealer_key = src.dim_dealer_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Inspect the dealer dimension table after the load.
SELECT * FROM cars_catalog.gold.dim_dealer