In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create Flag Parameter 

In [0]:
# Declare a text widget named 'incremental_flag' whose default value is the string '0'.
dbutils.widgets.text('incremental_flag','0')

In [0]:
# Read the current widget value; it is compared against the string '0' further down,
# so it is handled as text rather than as a number.
incremental_flag = dbutils.widgets.get('incremental_flag')

# Creating Dimension Model  

### Fetch Relevant Columns

In [0]:
# Dealer attributes read from the silver-layer car sales parquet data.
# Note: DISTINCT applies to the whole select list, so rows are de-duplicated per
# (Dealer_ID, DealerName) pair -- the parentheses around Dealer_ID do not restrict
# the de-duplication to that column alone.
dealer_source_query = '''
SELECT DISTINCT(Dealer_ID), DealerName FROM parquet.`abfss://silver@storazuredatabricksproj.dfs.core.windows.net/carsales`
'''
df_src = spark.sql(dealer_source_query)

In [0]:
# Preview the dealer rows selected from the silver layer.
display(df_src)

### Dim Model Sink - Initial and Incremental (load only the schema if the table does not exist)

In [0]:
# Build df_sink: the current contents of the gold dealer dimension, or -- when the
# table has not been created yet -- an empty frame with the same three columns.
dim_dealer_exists = spark.catalog.tableExists('data_catalog.gold.dim_dealer')

if not dim_dealer_exists:
    # No dimension table yet: WHERE 1=0 returns zero rows, so only the column
    # layout (dim_dealer_key, Dealer_ID, DealerName) is taken from the silver data.
    sink_query = '''
    SELECT 1 as dim_dealer_key, Dealer_ID, DealerName
    FROM PARQUET.`abfss://silver@storazuredatabricksproj.dfs.core.windows.net/carsales`
    WHERE 1=0
    '''
else:
    # Dimension table already exists: read its keys and attributes.
    sink_query = '''
    SELECT dim_dealer_key, Dealer_ID, DealerName
    FROM data_catalog.gold.dim_dealer
    '''

df_sink = spark.sql(sink_query)

### Filtering new records and old records 

In [0]:
# Left-join the source dealers to the sink on Dealer_ID. Every source row is kept;
# dim_dealer_key is NULL for dealers that have no match in the sink.
same_dealer = df_src.Dealer_ID == df_sink.Dealer_ID
df_filter = (
    df_src
    .join(df_sink, same_dealer, 'left')
    .select(df_src.Dealer_ID, df_src.DealerName, df_sink.dim_dealer_key)
)

**df_filter_old**

In [0]:
# Dealers that already carry a surrogate key, i.e. rows that matched the sink.
has_surrogate_key = df_filter['dim_dealer_key'].isNotNull()
df_filter_old = df_filter.where(has_surrogate_key)

**df_filter_new** 

In [0]:
# Dealers without a surrogate key, i.e. rows that did not match the sink.
# The (all-NULL) key column is dropped here by selecting only the two attributes.
lacks_surrogate_key = df_filter.dim_dealer_key.isNull()
df_filter_new = (
    df_filter
    .where(lacks_surrogate_key)
    .select(df_src.Dealer_ID, df_src.DealerName)
)

In [0]:
# Preview the dealers that still need a surrogate key.
display(df_filter_new)

###  Create Surrogate Key

**Fetch the max surrogate key from the existing table**

In [0]:
# Determine the first surrogate key (max_value) to assign to new dealers.
#
# The branch is driven by whether the gold table exists -- the same check used when
# building df_sink and when writing the result -- instead of by incremental_flag alone:
#   * flag '0' with an existing table used to restart keys at 1, which collides with
#     keys already in the table and makes the MERGE on dim_dealer_key overwrite
#     unrelated dealers;
#   * a non-'0' flag with no table used to fail on the SELECT below.
dim_dealer_exists = spark.catalog.tableExists('data_catalog.gold.dim_dealer')

if dim_dealer_exists:
    max_value_df = spark.sql("SELECT max(dim_dealer_key) FROM data_catalog.gold.dim_dealer")
    current_max_key = max_value_df.collect()[0][0]
    # max() returns NULL (None in Python) when the table has no rows; adding 1 to
    # None would raise TypeError, so start numbering at 1 in that case.
    max_value = 1 if current_max_key is None else current_max_key + 1
else:
    # No dimension table yet: numbering starts at 1.
    max_value = 1

**Create surrogate key column and add the max surrogate key**

In [0]:
# Assign a surrogate key to every new dealer: the starting value plus a generated id.
# monotonically_increasing_id() guarantees unique, increasing values but not
# consecutive ones, so the resulting keys may contain gaps.
surrogate_key = max_value + monotonically_increasing_id()
df_filter_new = df_filter_new.withColumn('dim_dealer_key', surrogate_key)

### Create Final DF - df_filter_old + df_filter_new 

In [0]:
# Stack new and existing dealers into one frame. union() matches columns by position,
# not by name; both inputs are laid out as (Dealer_ID, DealerName, dim_dealer_key).
df_final = df_filter_new.union(df_filter_old)

# SCD TYPE - 1 (UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 load of the dealer dimension: create the table on the first run,
# otherwise upsert df_final into it keyed on the surrogate key.
dim_dealer_table = 'data_catalog.gold.dim_dealer'
dim_dealer_path = 'abfss://gold@storazuredatabricksproj.dfs.core.windows.net/dim_dealer'

if not spark.catalog.tableExists(dim_dealer_table):
    # Initial run: write df_final as a Delta table at the gold path and register it.
    initial_writer = (
        df_final.write
        .format('delta')
        .mode('overwrite')
        .option("path", dim_dealer_path)
    )
    initial_writer.saveAsTable(dim_dealer_table)
else:
    # Incremental run: rows whose dim_dealer_key matches are updated in place,
    # all other rows are inserted.
    delta_table = DeltaTable.forPath(spark, dim_dealer_path)
    upsert = delta_table.alias("trg").merge(
        df_final.alias("src"),
        "trg.dim_dealer_key = src.dim_dealer_key",
    )
    upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [0]:
%sql
-- Show every row of the gold dealer dimension.
SELECT * FROM data_catalog.gold.dim_dealer