In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
%sql
-- Preview every column of the silver-layer data, read directly from its Parquet files.
SELECT *
FROM parquet.`abfss://silver@cardata1.dfs.core.windows.net/data`

#### Separate new and existing records from the silver layer

In [0]:
# Fetch the columns needed for the model dimension from the silver layer.
# DISTINCT applies to the whole (model_id, model_category) row, not to
# model_id alone, so a model_id that appears with more than one category
# still yields more than one row.
df_silver = spark.sql('''
                select distinct model_id, model_category
                from parquet.`abfss://silver@cardata1.dfs.core.windows.net/data`
                ''')

In [0]:
# Render the distinct silver rows for visual inspection.
df_silver.display()

In [0]:
# Load the current gold-layer model table. When it has not been created yet,
# build an empty DataFrame (the `where 1=0` filter keeps no rows) exposing the
# same three column names, so the following join works on the first run too.
gold_model_exists = spark.catalog.tableExists('cars_catalog.gold.model')

if not gold_model_exists:
  df_gold = spark.sql('''
                      select 1 as model_key, model_id, model_category 
                      from parquet.`abfss://silver@cardata1.dfs.core.windows.net/data`
                      where 1=0
        ''')
else:
  df_gold = spark.sql('''
              select model_key, model_id, model_category 
              from cars_catalog.gold.model
              ''')

In [0]:
# Render the gold-side rows (no rows when the gold table does not exist yet).
df_gold.display()

In [0]:
# Left-join silver onto gold by model_id. Every silver row is kept; rows with
# no gold counterpart come out with a NULL model_key.
df_filter = (
    df_silver
    .join(df_gold, on=(df_silver['model_id'] == df_gold['model_id']), how='left')
    .select(
        df_silver['model_id'],
        df_silver['model_category'],
        df_gold['model_key'],
    )
)

In [0]:
# Render the join result; model_key is NULL for rows without a gold match.
df_filter.display()

In [0]:
# Existing records: rows that matched a gold row and therefore already carry a model_key.
df_filter_existing = df_filter.where(df_filter.model_key.isNotNull())

df_filter_existing.display()

In [0]:
# New records: rows with no gold match (NULL model_key). Only the two
# business columns are kept; the surrogate key is assigned in a later step.
df_filter_new = (
    df_filter
    .where(df_filter.model_key.isNull())
    .select(df_filter.model_id, df_filter.model_category)
)

df_filter_new.display()

#### Assign surrogate key values to new records

In [0]:
# Find the current maximum model_key so that new keys continue after it.
# max() over an empty table returns NULL, which would reach Python as None and
# break the arithmetic in the key-assignment step; coalesce it to 0 instead.
if spark.catalog.tableExists('cars_catalog.gold.model'):
  max_value_df = spark.sql('''
              select coalesce(max(model_key), 0) as max_model_key
              from cars_catalog.gold.model
              ''')
  max_value = max_value_df.collect()[0][0]
else:
  # First run: no gold table yet, so key numbering starts from 1.
  max_value = 0

In [0]:
# Assign model_key to the new records, continuing right after the current max.
# row_number() over a model_id ordering gives consecutive keys that are stable
# across re-evaluations of the DataFrame, whereas monotonically_increasing_id()
# is unique but neither consecutive nor deterministic, which is unsafe for a
# DataFrame later used as a MERGE source.
# NOTE(review): the ordering is only fully deterministic if model_id is unique
# among the new records - confirm against the silver data.
# An un-partitioned window pulls all rows into one partition; acceptable here
# assuming the set of new dimension rows is small.
key_window = Window.orderBy('model_id')
df_filter_new = df_filter_new.withColumn(
    'model_key',
    # Cast to long so the column type matches the original LongType keys.
    (lit(max_value) + row_number().over(key_window)).cast('long')
)

In [0]:
# Render the new records together with their assigned model_key.
df_filter_new.display()

In [0]:
# Combine existing and new records into the final dimension rows.
# unionByName matches columns by name rather than by position, so a change in
# the select order of either input cannot silently put values in the wrong column.
df_final = df_filter_existing.unionByName(df_filter_new)
df_final.display()

#### SCD - Type 1 (Upsert)

In [0]:
from delta.tables import DeltaTable

In [0]:
if spark.catalog.tableExists('cars_catalog.gold.model'):

    # Incremental run: upsert into the existing table (SCD Type 1 - matched
    # rows are overwritten with the source values, unmatched rows are inserted).
    # The table is resolved by its catalog name, the same identifier used in
    # the existence check, instead of a separately hard-coded storage path.
    delta_tbl = DeltaTable.forName(spark, 'cars_catalog.gold.model')

    (
        delta_tbl.alias("t")
        .merge(df_final.alias("s"), "s.model_id = t.model_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

else:

    # Initial run: write the Delta table at the gold path and register it
    # in the catalog as cars_catalog.gold.model.
    df_final.write.format('delta')\
        .mode('overwrite')\
        .option('path', 'abfss://gold@cardata1.dfs.core.windows.net/model')\
        .saveAsTable('cars_catalog.gold.model')



In [0]:
%sql
-- Show the resulting contents of the gold model table.
SELECT *
FROM cars_catalog.gold.model