## Creating the incremental-load flag parameter

In [0]:
# Notebook parameter `incremental_flag` (default '0').
# '0' makes the surrogate-key cells start numbering at 1 (initial load); any other
# value makes them continue from the highest key already stored in the gold tables.
dbutils.widgets.text('incremental_flag','0')

In [0]:
# Echo the flag so the cell output records which load mode this run uses.
current_incremental_flag = dbutils.widgets.get('incremental_flag')
current_incremental_flag

In [0]:
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from delta.tables import DeltaTable

Creating the **Model dimension** (`sales.gold.dim_model`)

In [0]:
# One row per distinct (Model_ID, model_category) pair in the silver layer.
# NOTE(review): DISTINCT applies to the whole pair, so a Model_ID seen with two
# categories yields two rows here — confirm the silver data cannot contain that.
model_src_query = '''
    SELECT DISTINCT Model_ID, model_category
    FROM parquet.`abfss://silver@dlsendtoendproject001.dfs.core.windows.net/silver_data`
'''
model_df_src = spark.sql(model_src_query)

In [0]:
# Preview the distinct models pulled from silver.
model_df_src.display()

In [0]:
# Schema-only placeholder for the dim_model sink: `WHERE 1 = 0` returns no rows but keeps
# the (dim_model_key, Model_ID, model_category) column layout. The following section
# reassigns model_df_sink from the real table when it exists.
empty_model_sink_query = '''
    SELECT 1 AS dim_model_key, Model_ID, model_category
    FROM parquet.`abfss://silver@dlsendtoendproject001.dfs.core.windows.net/silver_data`
    WHERE 1 = 0
'''
model_df_sink = spark.sql(empty_model_sink_query)
display(model_df_sink)

**`dim_model` sink: existing table on incremental runs, empty frame on the initial run**

In [0]:
# Choose what to compare the source against: the gold table with its surrogate keys when it
# already exists, otherwise an empty frame with identical columns (very first run).
if not spark.catalog.tableExists('sales.gold.dim_model'):
    model_sink_query = '''
        select 1 as dim_model_key, Model_ID, model_category
        from parquet.`abfss://silver@dlsendtoendproject001.dfs.core.windows.net/silver_data`
        where 1 = 0
    '''
else:
    model_sink_query = '''
        select dim_model_key, Model_ID, model_category
        from sales.gold.dim_model
    '''
model_df_sink = spark.sql(model_sink_query)

In [0]:
# Left-join source to sink on Model_ID: models already in the dimension carry their
# stored dim_model_key, models not yet in the dimension get a NULL key.
dim_model_df = (
    model_df_src.alias('src')
    .join(model_df_sink.alias('sink'), col('src.Model_ID') == col('sink.Model_ID'), how='left')
    .select(col('src.Model_ID'), col('src.model_category'), col('sink.dim_model_key'))
)
display(dim_model_df)

**Create Surrogate Key**

In [0]:
# Surrogate keys for dim_model.
# Rows that matched the sink keep the key they already have; only new models (NULL key
# after the left join) get a fresh key. max_value_model holds the first free key.
# NOTE(review): with incremental_flag == '0' against an existing table, new keys restart
# at 1 and can collide with stored keys — only use '0' for the true initial load.
if dbutils.widgets.get('incremental_flag') == '0' or not spark.catalog.tableExists('sales.gold.dim_model'):
    # Initial load, or the table is missing: numbering starts at 1.
    max_value_model = 1
else:
    current_max_model_key = spark.sql("select max(dim_model_key) from sales.gold.dim_model").collect()[0][0]
    # max() is NULL on an empty table; otherwise start one past the highest key in use
    # so a new model can never reuse an existing key.
    max_value_model = 1 if current_max_model_key is None else current_max_model_key + 1

# monotonically_increasing_id() is unique per row but not consecutive, so new keys may have gaps.
# coalesce() leaves existing keys untouched, which also makes re-running this cell harmless.
dim_model_df = dim_model_df.withColumn(
    'dim_model_key',
    coalesce(col('dim_model_key'), lit(max_value_model) + monotonically_increasing_id()),
)
display(dim_model_df)

**Applying SCD Type 1 - Upsert**

In [0]:
# SCD Type 1 upsert into sales.gold.dim_model.
# Rows are matched on the natural key Model_ID, so a model already in the table is updated
# in place (keeping its surrogate key) instead of being inserted a second time.
# NOTE(review): Delta MERGE fails if dim_model_df holds two rows for the same Model_ID
# (e.g. one model with two categories) — that would indicate dirty silver data.
if spark.catalog.tableExists('sales.gold.dim_model'):
    dim_model_target = DeltaTable.forPath(spark, 'abfss://gold@dlsendtoendproject001.dfs.core.windows.net/dim_model')
    (
        dim_model_target.alias('tgt')
        .merge(dim_model_df.alias('src'), 'tgt.Model_ID = src.Model_ID')
        # Overwrite only the descriptive attribute; dim_model_key of an existing member never changes.
        .whenMatchedUpdate(set={'model_category': 'src.model_category'})
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial run: create sales.gold.dim_model as a Delta table stored at the gold path.
    (
        dim_model_df.write.mode('overwrite').format('delta')
        .option('path', 'abfss://gold@dlsendtoendproject001.dfs.core.windows.net/dim_model')
        .saveAsTable('sales.gold.dim_model')
    )

## Creating the second dimension table: Branch (`dim_branch`)

**Gold branch dimension table (`sales.gold.dim_branch`)**

In [0]:
# One row per distinct (Branch_ID, BranchName) pair in the silver layer.
branch_src_query = '''
    SELECT DISTINCT Branch_ID, BranchName
    FROM parquet.`abfss://silver@dlsendtoendproject001.dfs.core.windows.net/silver_data`
'''
dim_branch_src = spark.sql(branch_src_query)
dim_branch_src.display()

In [0]:
# Schema-only placeholder for the dim_branch sink (`where 1 = 0` keeps no rows).
empty_branch_sink_query = '''
    SELECT 1 AS dim_branch_key, Branch_ID, BranchName
    FROM parquet.`abfss://silver@dlsendtoendproject001.dfs.core.windows.net/silver_data`
    WHERE 1 = 0
'''
dim_branch_sink = spark.sql(empty_branch_sink_query)
dim_branch_sink.display()

**`dim_branch` sink: initial vs. incremental load**

In [0]:
# Existing dim_branch rows (with their keys) when the table exists; otherwise an empty
# frame with the same columns for the first run.
if not spark.catalog.tableExists('sales.gold.dim_branch'):
    branch_sink_query = '''
        select 1 as dim_branch_key, Branch_ID, BranchName
        from parquet.`abfss://silver@dlsendtoendproject001.dfs.core.windows.net/silver_data`
        where 1 = 0
    '''
else:
    branch_sink_query = '''
        select dim_branch_key, Branch_ID, BranchName
        from sales.gold.dim_branch
    '''
dim_branch_sink = spark.sql(branch_sink_query)

In [0]:
# Left-join on Branch_ID: known branches keep their dim_branch_key, new ones get NULL.
dim_branch_df = (
    dim_branch_src.alias('src')
    .join(dim_branch_sink.alias('sink'), col('src.Branch_ID') == col('sink.Branch_ID'), how='left')
    .select(col('src.Branch_ID'), col('src.BranchName'), col('sink.dim_branch_key'))
)
display(dim_branch_df)

In [0]:
# Surrogate keys for dim_branch.
# Existing branches keep the key they got from the sink join; only new branches (NULL key)
# get a fresh one. max_value_branch holds the first free key.
# NOTE(review): with incremental_flag == '0' against an existing table, new keys restart
# at 1 and can collide with stored keys — only use '0' for the true initial load.
if dbutils.widgets.get('incremental_flag') == '0' or not spark.catalog.tableExists('sales.gold.dim_branch'):
    # Initial load, or the table is missing: numbering starts at 1.
    max_value_branch = 1
else:
    current_max_branch_key = spark.sql('''select max(dim_branch_key) from sales.gold.dim_branch''').collect()[0][0]
    # max() is NULL on an empty table; otherwise start one past the highest key in use.
    max_value_branch = 1 if current_max_branch_key is None else current_max_branch_key + 1

# monotonically_increasing_id() is unique per row but not consecutive, so new keys may have gaps.
dim_branch_df = dim_branch_df.withColumn(
    'dim_branch_key',
    coalesce(col('dim_branch_key'), lit(max_value_branch) + monotonically_increasing_id()),
)
display(dim_branch_df)

In [0]:
# SCD Type 1 upsert into sales.gold.dim_branch, matched on the natural key Branch_ID so an
# existing branch is updated in place (keeping its surrogate key) rather than re-inserted.
# NOTE(review): Delta MERGE fails if dim_branch_df holds two rows for the same Branch_ID.
if spark.catalog.tableExists('sales.gold.dim_branch'):
    dim_branch_target = DeltaTable.forPath(spark, 'abfss://gold@dlsendtoendproject001.dfs.core.windows.net/dim_branch')
    (
        dim_branch_target.alias('tgt')
        .merge(dim_branch_df.alias('src'), 'tgt.Branch_ID = src.Branch_ID')
        # Overwrite only the descriptive attribute; dim_branch_key never changes.
        .whenMatchedUpdate(set={'BranchName': 'src.BranchName'})
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial run: create sales.gold.dim_branch as a Delta table stored at the gold path.
    (
        dim_branch_df.write.format('delta')
        .mode('overwrite')
        .option('path', 'abfss://gold@dlsendtoendproject001.dfs.core.windows.net/dim_branch')
        .saveAsTable('sales.gold.dim_branch')
    )

**Gold dealer dimension (`sales.gold.dim_dealer`)**

In [0]:
# One row per distinct (Dealer_ID, DealerName) pair in the silver layer.
dealer_src_query = '''
    SELECT DISTINCT Dealer_ID, DealerName
    FROM parquet.`abfss://silver@dlsendtoendproject001.dfs.core.windows.net/silver_data`
'''
dim_dealer_src = spark.sql(dealer_src_query)
dim_dealer_src.display()

In [0]:
# Schema-only placeholder for the dim_dealer sink (`where 1=0` keeps no rows).
empty_dealer_sink_query = '''
    SELECT 1 AS dim_dealer_key, Dealer_ID, DealerName
    FROM parquet.`abfss://silver@dlsendtoendproject001.dfs.core.windows.net/silver_data`
    WHERE 1 = 0
'''
dim_dealer_sink = spark.sql(empty_dealer_sink_query)
dim_dealer_sink.display()

In [0]:
# Existing dim_dealer rows (with their keys) when the table exists; otherwise an empty
# frame with the same columns for the first run.
if not spark.catalog.tableExists('sales.gold.dim_dealer'):
    dealer_sink_query = '''
        select 1 as dim_dealer_key, Dealer_ID, DealerName
        from parquet.`abfss://silver@dlsendtoendproject001.dfs.core.windows.net/silver_data`
        where 1 = 0
    '''
else:
    dealer_sink_query = '''
        select dim_dealer_key, Dealer_ID, DealerName
        from sales.gold.dim_dealer
    '''
dim_dealer_sink = spark.sql(dealer_sink_query)

In [0]:
# Left-join on Dealer_ID: known dealers keep their dim_dealer_key, new ones get NULL.
dim_dealer_df = (
    dim_dealer_src.alias('src')
    .join(dim_dealer_sink.alias('sink'), col('src.Dealer_ID') == col('sink.Dealer_ID'), how='left')
    .select(col('src.Dealer_ID'), col('src.DealerName'), col('sink.dim_dealer_key'))
)
display(dim_dealer_df)

In [0]:
# Surrogate keys for dim_dealer.
# Existing dealers keep the key they got from the sink join; only new dealers (NULL key)
# get a fresh one. max_value_dealer holds the first free key.
# NOTE(review): with incremental_flag == '0' against an existing table, new keys restart
# at 1 and can collide with stored keys — only use '0' for the true initial load.
if dbutils.widgets.get('incremental_flag') == '0' or not spark.catalog.tableExists('sales.gold.dim_dealer'):
    # Initial load, or the table is missing: numbering starts at 1.
    max_value_dealer = 1
else:
    current_max_dealer_key = spark.sql('''select max(dim_dealer_key) from sales.gold.dim_dealer''').collect()[0][0]
    # max() is NULL on an empty table; otherwise start one past the highest key in use.
    max_value_dealer = 1 if current_max_dealer_key is None else current_max_dealer_key + 1

# monotonically_increasing_id() is unique per row but not consecutive, so new keys may have gaps.
dim_dealer_df = dim_dealer_df.withColumn(
    'dim_dealer_key',
    coalesce(col('dim_dealer_key'), lit(max_value_dealer) + monotonically_increasing_id()),
)
display(dim_dealer_df)

In [0]:
# SCD Type 1 upsert into sales.gold.dim_dealer, matched on the natural key Dealer_ID so an
# existing dealer is updated in place (keeping its surrogate key) rather than re-inserted.
# NOTE(review): Delta MERGE fails if dim_dealer_df holds two rows for the same Dealer_ID.
if spark.catalog.tableExists('sales.gold.dim_dealer'):
    dim_dealer_target = DeltaTable.forPath(spark, 'abfss://gold@dlsendtoendproject001.dfs.core.windows.net/dim_dealer')
    (
        dim_dealer_target.alias('tgt')
        .merge(dim_dealer_df.alias('src'), 'tgt.Dealer_ID = src.Dealer_ID')
        # Overwrite only the descriptive attribute; dim_dealer_key never changes.
        .whenMatchedUpdate(set={'DealerName': 'src.DealerName'})
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial run: create sales.gold.dim_dealer as a Delta table stored at the gold path.
    (
        dim_dealer_df.write.format('delta')
        .mode('overwrite')
        .option('path', 'abfss://gold@dlsendtoendproject001.dfs.core.windows.net/dim_dealer')
        .saveAsTable('sales.gold.dim_dealer')
    )

**Date dimension creation (`sales.gold.dim_date`)**

In [0]:
# Distinct Date_ID values from the silver layer feed the date dimension.
date_src_query = '''
    SELECT DISTINCT Date_ID AS Date_ID
    FROM parquet.`abfss://silver@dlsendtoendproject001.dfs.core.windows.net/silver_data`
'''
dim_date_src = spark.sql(date_src_query)
dim_date_src.display()

In [0]:
# Existing dim_date rows when the table exists; otherwise an empty frame with the same columns.
dim_date_sink_query = (
    ''' select dim_date_key, Date_ID from sales.gold.dim_date'''
    if spark.catalog.tableExists('sales.gold.dim_date')
    else ''' select 1 as dim_date_key, Date_ID from parquet.`abfss://silver@dlsendtoendproject001.dfs.core.windows.net/silver_data` where 1=0'''
)
dim_date_sink = spark.sql(dim_date_sink_query)


In [0]:
# Left-join on Date_ID: known dates keep their dim_date_key, new dates get NULL.
dim_date_df = (
    dim_date_src.alias('src')
    .join(dim_date_sink.alias('sink'), col('src.Date_ID') == col('sink.Date_ID'), how='left')
    .select(col('src.Date_ID'), col('sink.dim_date_key'))
)
display(dim_date_df)

In [0]:
# Surrogate keys for dim_date.
# Existing dates keep the key they got from the sink join; only new dates (NULL key)
# get a fresh one. max_value_date holds the first free key.
# NOTE(review): with incremental_flag == '0' against an existing table, new keys restart
# at 1 and can collide with stored keys — only use '0' for the true initial load.
if dbutils.widgets.get('incremental_flag') == '0' or not spark.catalog.tableExists('sales.gold.dim_date'):
    # Initial load, or the table is missing: numbering starts at 1.
    max_value_date = 1
else:
    current_max_date_key = spark.sql('''select max(dim_date_key) from sales.gold.dim_date''').collect()[0][0]
    # max() is NULL on an empty table; otherwise start one past the highest key in use.
    max_value_date = 1 if current_max_date_key is None else current_max_date_key + 1

# monotonically_increasing_id() is unique per row but not consecutive, so new keys may have gaps.
dim_date_df = dim_date_df.withColumn(
    'dim_date_key',
    coalesce(col('dim_date_key'), lit(max_value_date) + monotonically_increasing_id()),
)
display(dim_date_df)

In [0]:
# SCD Type 1 load of sales.gold.dim_date, matched on the natural key Date_ID so a date that
# is already present is never inserted a second time.
# NOTE(review): Delta MERGE fails if dim_date_df holds two rows for the same Date_ID.
if spark.catalog.tableExists('sales.gold.dim_date'):
    dim_date_target = DeltaTable.forPath(spark, 'abfss://gold@dlsendtoendproject001.dfs.core.windows.net/dim_date')
    (
        dim_date_target.alias('tgt')
        .merge(dim_date_df.alias('src'), 'tgt.Date_ID = src.Date_ID')
        # Date_ID is the only attribute, so an existing date needs no update: insert new dates only.
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial run: create sales.gold.dim_date as a Delta table stored at the gold path.
    (
        dim_date_df.write.format('delta')
        .mode('overwrite')
        .option('path', 'abfss://gold@dlsendtoendproject001.dfs.core.windows.net/dim_date')
        .saveAsTable('sales.gold.dim_date')
    )