## Silver to gold: build the Model dimension (dim_model)

In [0]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable

In [0]:
# Text widget 'incremental_flag', defaulting to '0'. Later cells compare it to '0':
# '0' starts surrogate keys at 1, any other value continues from the table's max key.
dbutils.widgets.text('incremental_flag', '0')

In [0]:
# Widget values come back as strings, so downstream checks compare against '0', not 0.
incremental_flag = dbutils.widgets.get('incremental_flag')

In [0]:
# Distinct (Model_ID, Model_Category) pairs from the silver layer.
# DISTINCT applies to the whole select list, so this yields one row per distinct
# pair -- not one row per Model_ID.
# NOTE(review): if a Model_ID can appear with several categories, it will receive
# several dimension rows / surrogate keys downstream -- confirm the source is 1:1.
df_src = spark.sql('''
                      select distinct Model_ID, Model_Category from parquet.`abfss://silver@carmpdatalake.dfs.core.windows.net/transformed_data`
                      ''')

In [0]:
# Current contents of the gold dimension (surrogate key + business columns).
# On the first run the table does not exist yet, so an empty frame with the same
# columns is derived from the silver data instead (`where 1 = 0` returns no rows).
dim_model_exists = spark.catalog.tableExists('cars_catalog.gold.dim_model')
sink_query = ('''
                      select dim_model_key, Model_ID, Model_Category from cars_catalog.gold.dim_model
                      ''' if dim_model_exists else '''
                      select 1 as dim_model_key, Model_ID, Model_Category from parquet.`abfss://silver@carmpdatalake.dfs.core.windows.net/transformed_data` 
                      where 1 = 0
                      ''')
df_sink = spark.sql(sink_query)

In [0]:
# Attach the existing surrogate key (if any) to every source model; unmatched
# models come through with a NULL dim_model_key.
model_id_match = df_src['Model_ID'] == df_sink['Model_ID']
df_filter = (
    df_src.join(df_sink, on=model_id_match, how='left')
          .select(
              df_src['Model_ID'],
              df_src['Model_Category'],
              df_sink['dim_model_key'],
          )
)

In [0]:
# Source models that already own a surrogate key in the dimension.
df_filter_old = df_filter.where(df_filter['dim_model_key'].isNotNull())

In [0]:
# Source models with no match in the dimension; they get new keys in a later cell.
df_filter_new = (
    df_filter
    .where(col('dim_model_key').isNull())
    .select('Model_ID', 'Model_Category')
)

In [0]:
# Highest surrogate key already present; new keys are generated strictly above it.
# The seed is read from the table whenever it exists rather than trusting
# incremental_flag alone: with the default flag '0' on a non-empty table, new keys
# would restart at 1, collide with existing keys, and make the MERGE below match
# several source rows to one target row. On a true first load (no table) the seed
# is 0, exactly as with incremental_flag == '0'.
if spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    max_value_df = spark.sql(" select max(dim_model_key) from cars_catalog.gold.dim_model")
    max_value = max_value_df.collect()[0][0]
else:
    max_value = None

# max() over an empty table is NULL; without this fallback every new key would be NULL.
if max_value is None:
    max_value = 0

In [0]:
# Surrogate keys for new models, starting above max_value.
# monotonically_increasing_id() is unique but not consecutive (ids jump between
# partitions), so the generated keys are unique but may contain gaps.
new_model_key = max_value + monotonically_increasing_id() + lit(1)
df_filter_new = df_filter_new.withColumn('dim_model_key', new_model_key)

In [0]:
# Combine newly keyed models with the already-keyed ones. unionByName aligns
# columns by name, so a change in either frame's column order cannot silently shift
# values into the wrong column, as a positional union() would.
df_final = df_filter_new.unionByName(df_filter_old)

## Upsert (merge) into cars_catalog.gold.dim_model

In [0]:
# Upsert into the gold dimension. If the table exists, merge on the surrogate key
# (update matched rows, insert new ones). Otherwise create it on the first run as
# a Delta table stored at the gold path.
if spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    # Resolve the MERGE target by the same catalog name used in the existence check,
    # so the check and the write cannot refer to different tables.
    delta_tbl = DeltaTable.forName(spark, 'cars_catalog.gold.dim_model')
    (
        delta_tbl.alias('t')
        .merge(df_final.alias('s'), 't.dim_model_key = s.dim_model_key')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        df_final.write.format('delta')
        .mode('overwrite')
        .option('path', 'abfss://gold@carmpdatalake.dfs.core.windows.net/dim_model')
        .saveAsTable('cars_catalog.gold.dim_model')
    )