In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# CREATE FLAG PARAMETER

In [0]:
dbutils.widgets.text("incremental_flag",'0')

In [0]:
incremental_flag = dbutils.widgets.get('incremental_flag')
print(incremental_flag)

0


# CREATING DIMENSION Model

## # Fetch Relative Columns

In [0]:
df_src = spark.sql('''
select DISTINCT(Model_ID) as Model_ID, model_category 
from parquet.`abfss://silver@cararundatalake.dfs.core.windows.net/carsales`
''')

In [0]:
df_src.display()

Model_ID,model_category
Hon-M220,Hon
Hon-M215,Hon
Vol-M110,Vol
Vol-M260,Vol
BMW-M2,BMW
Acu-M60,Acu
Bui-M31,Bui
Hyu-M157,Hyu
Ren-M130,Ren
Tat-M179,Tat


### ## dim_model Sink - Initial and Incremental

In [0]:
if spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    df_sink = spark.sql('''
    select dim_model_key, Model_ID, model_category
    from cars_catalog.gold.dim_model
    ''')
else :
    df_sink = spark.sql('''
    select 1 as dim_model_key, Model_ID, model_category
    from parquet.`abfss://silver@cararundatalake.dfs.core.windows.net/carsales`
    where 1=0''')    


### Filtering new records and old records

In [0]:
df_filter = df_src.join(df_sink, df_src['Model_ID'] == df_sink['Model_ID'], 'left').select(df_src['Model_ID'], df_src
['model_category'], df_sink['dim_model_key'])


### df_filter_old

In [0]:
df_filter_old = df_filter.filter(col('dim_model_key').isNotNull())

### df_filter_new

In [0]:
df_filter_new = df_filter.filter(col('dim_model_key').isNull())

### Create Surrogate Key

**Fetch the max Surrogate Key from existing table**

In [0]:
if incremental_flag == '0':
    max_value = 1
else :
     max_value_df = spark.sql("select max(dim_model_key) from cars_catalog.gold.dim_model")
     max_value = max_value_df.collect()[0][0]+1

**Create Surrogate key column and ADD the max surrogate key**

In [0]:
df_filter_new = df_filter_new.withColumn('dim_model_key', max_value+monotonically_increasing_id())

In [0]:
df_filter_new.display()

Model_ID,model_category,dim_model_key
Hon-M220,Hon,1
Hon-M215,Hon,2
Vol-M110,Vol,3
Vol-M260,Vol,4
BMW-M2,BMW,5
Acu-M60,Acu,6
Bui-M31,Bui,7
Hyu-M157,Hyu,8
Ren-M130,Ren,9
Tat-M179,Tat,10


**Create Final DF - df_filter_old + df_filter_new**

In [0]:
df_final = df_filter_new.union(df_filter_old)

In [0]:
df_final.display()

Model_ID,model_category,dim_model_key
Hon-M220,Hon,1
Hon-M215,Hon,2
Vol-M110,Vol,3
Vol-M260,Vol,4
BMW-M2,BMW,5
Acu-M60,Acu,6
Bui-M31,Bui,7
Hyu-M157,Hyu,8
Ren-M130,Ren,9
Tat-M179,Tat,10


### SCD TYPE - 1 (UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
if spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    delta_tbl = DeltaTable.forPath(spark, "abfss://gold@cararundatalake.dfs.core.windows.net/dim_model")
    delta_tbl.alias("trg").merge(df_final.alias("src"), "trg.Model_ID = src.Model_ID")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()
#Initial Run
else :
    df_final.write.format("delta")\
        .mode("overwrite")\
        .option("path","abfss://gold@cararundatalake.dfs.core.windows.net/dim_model")\
        .saveAsTable("cars_catalog.gold.dim_model")

    


In [0]:
%sql
select * from cars_catalog.gold.dim_model

Model_ID,model_category,dim_model_key
Hon-M220,Hon,1
Hon-M215,Hon,2
Vol-M110,Vol,3
Vol-M260,Vol,4
BMW-M2,BMW,5
Acu-M60,Acu,6
Bui-M31,Bui,7
Hyu-M157,Hyu,8
Ren-M130,Ren,9
Tat-M179,Tat,10
