**Create a flag parameter**


In [0]:
# Register a text widget so the load mode can be supplied from outside the
# notebook, then read it back. The default is the string '0', and the value is
# compared against a string further down in the notebook.
flag_widget_name = 'Incremental_flag'
dbutils.widgets.text(flag_widget_name, '0')
incremental_flag = dbutils.widgets.get(flag_widget_name)
print(incremental_flag)

0


## Dimension Dealer

**Import pyspark libs**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

**Dimension = dim_dealer**

In [0]:
# Source side of the dealer dimension: the distinct (Dealer_ID, DealerName)
# pairs found in the silver-layer parquet data.
# NOTE: DISTINCT de-duplicates on the whole select list, not on Dealer_ID alone,
# so a Dealer_ID that appears with two different DealerName values still yields
# two rows. The previous `DISTINCT(Dealer_ID)` spelling returned the same rows;
# the parentheses only made it look like a per-column de-duplication.
df_dim_model_src = spark.sql('''
    SELECT DISTINCT Dealer_ID, DealerName
    FROM PARQUET.`abfss://silver@carssalesprojectdl.dfs.core.windows.net/carssales`
''')

In [0]:
# Preview the dealer rows read from the silver layer.
display(df_dim_model_src)

Branch_ID,BranchName
BR0131,Audi Motors
BR0760,Healey Motors
BR0789,Hillman Motors
BR0938,Isotta Fraschini Motors
BR1040,Lada Motors
BR1693,Saleen Motors
BR1792,Simca do Brasil Motors
BR1799,Simca do Brasil Motors
BR1955,Toyota Motors
BR1978,Turner Motors


**Dimension table for Initial and Incremental load**

In [0]:
# Target side of the comparison. When the gold dimension already exists, read
# its current rows. Otherwise build an empty frame with the same three columns
# (the `WHERE 1=0` predicate returns no rows), so the join below works either way.
if spark.catalog.tableExists('cars_catalog.gold.dim_dealer'):
    tgt_query = '''
                                SELECT Dealer_ID, DealerName, dim_Dealer_Key 
                                FROM cars_catalog.gold.dim_dealer
                                '''
else:
    tgt_query = '''
                                SELECT Dealer_ID, DealerName, 1 AS dim_Dealer_Key 
                                FROM PARQUET.`abfss://silver@carssalesprojectdl.dfs.core.windows.net/carssales`
                                WHERE 1=0
                                '''

df_dim_model_tgt = spark.sql(tgt_query)

In [0]:
# Preview the existing dimension rows; this is empty when the gold table does not exist yet.
display(df_dim_model_tgt)

Dealer_ID,DealerName,dim_Dealer_Key


**DF filters - Old and New**

In [0]:
# Left-join source dealers to the existing dimension on Dealer_ID.
# Dealers already in the dimension pick up their dim_Dealer_Key; dealers that
# are not there yet come through with a NULL key.
src_df = df_dim_model_src
tgt_df = df_dim_model_tgt

df_filter = (
    src_df
    .join(tgt_df, on=src_df['Dealer_ID'] == tgt_df['Dealer_ID'], how='left')
    .select(
        src_df['Dealer_ID'],
        src_df['DealerName'],
        tgt_df['dim_Dealer_Key'],
    )
)

In [0]:
# Preview the joined rows; dim_Dealer_Key is NULL for dealers not yet in the dimension.
display(df_filter)

Dealer_ID,DealerName,dim_Dealer_Key
DLR0058,Fiat do Brasil Motors,
DLR0107,Land Rover Motors,
DLR0129,Mia Motors,
DLR0111,Lotus Motors,
DLR0085,Humber Motors,
DLR0001,AC Cars Motors,
DLR0218,Lagonda Motors,
DLR0082,Honda Motors,
DLR0063,Ford do Brasil Motors,
DLR0193,Tazzari Motors,


In [0]:
# Existing dealers: rows that matched the dimension and therefore already carry a key.
has_existing_key = col('dim_Dealer_key').isNotNull()
df_filter_old = df_filter.where(has_existing_key)

In [0]:
# Preview the dealers that already exist in the dimension.
display(df_filter_old)

Dealer_ID,DealerName,dim_Dealer_Key


In [0]:
# New dealers: rows with no match in the dimension, so the key column is NULL.
lacks_key = col('dim_Dealer_Key').isNull()
df_filter_new = df_filter.where(lacks_key)

In [0]:
# Number of dealers that still need a surrogate key (rows with a NULL dim_Dealer_Key).
df_filter_new.count()

267

**Create Surrogate Key**

In [0]:
# Choose the first surrogate key to hand out to new dealers.
#
# The choice is driven by whether the gold table exists (the same test the
# target-load and write cells use), not by the incremental_flag widget:
#   * flag '1' with no table yet would make the MAX() query fail;
#   * flag '0' with an existing table would restart keys at 1, and the merge
#     below, which matches on dim_Dealer_Key, would then overwrite other dealers.
# For the two consistent combinations (flag '0' + no table, flag '1' + table)
# the result is the same as before.
if spark.catalog.tableExists('cars_catalog.gold.dim_dealer'):
    current_max = spark.sql("SELECT MAX(dim_Dealer_Key) FROM cars_catalog.gold.dim_dealer").collect()[0][0]
    # MAX() over an empty table is NULL (None in Python); treat that as "no keys issued yet".
    max_value = 1 if current_max is None else current_max + 1
else:
    max_value = 1

In [0]:
# Assign surrogate keys to the new dealers, starting at max_value.
# monotonically_increasing_id() yields unique, increasing ids, but they are not
# guaranteed to be consecutive when the data spans several partitions.
surrogate_key = max_value + monotonically_increasing_id()
df_filter_new = df_filter_new.withColumn('dim_dealer_Key', surrogate_key)

In [0]:
# Preview the new dealers together with their freshly assigned keys.
display(df_filter_new)

Dealer_ID,DealerName,dim_dealer_Key
DLR0058,Fiat do Brasil Motors,1
DLR0107,Land Rover Motors,2
DLR0129,Mia Motors,3
DLR0111,Lotus Motors,4
DLR0085,Humber Motors,5
DLR0001,AC Cars Motors,6
DLR0218,Lagonda Motors,7
DLR0082,Honda Motors,8
DLR0063,Ford do Brasil Motors,9
DLR0193,Tazzari Motors,10


In [0]:
# Stack existing and new dealers into one frame for the write step.
# union() matches columns by position, not by name; both inputs come from
# df_filter, so they share the order Dealer_ID, DealerName, key.
df_final = df_filter_old.union(df_filter_new)

In [0]:
# Preview the full set of rows that will be written to the dimension.
display(df_final)

Dealer_ID,DealerName,dim_Dealer_Key
DLR0058,Fiat do Brasil Motors,1
DLR0107,Land Rover Motors,2
DLR0129,Mia Motors,3
DLR0111,Lotus Motors,4
DLR0085,Humber Motors,5
DLR0001,AC Cars Motors,6
DLR0218,Lagonda Motors,7
DLR0082,Honda Motors,8
DLR0063,Ford do Brasil Motors,9
DLR0193,Tazzari Motors,10


# SCD - Type-1 (Upsert)

In [0]:
from delta.tables import DeltaTable

In [0]:
# Incremental Run
# Persist the dealer dimension (SCD Type 1).
# If the gold table does not exist, create it from df_final at the external
# gold location. If it does, merge df_final into it: matching rows are
# overwritten and unmatched rows are inserted.
gold_dim_dealer_path = 'abfss://gold@carssalesprojectdl.dfs.core.windows.net/dim_dealer'

if not spark.catalog.tableExists('cars_catalog.gold.dim_dealer'):
    # Initial Run
    (
        df_final.write
        .format('delta')
        .mode('overwrite')
        .option('path', gold_dim_dealer_path)
        .saveAsTable('cars_catalog.gold.dim_dealer')
    )
else:
    # Incremental Run
    delta_tbl = DeltaTable.forPath(spark, gold_dim_dealer_path)

    # Rows are matched on the surrogate key carried by both sides.
    (
        delta_tbl.alias('trg')
        .merge(df_final.alias('src'), 'trg.dim_Dealer_key == src.dim_Dealer_Key')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Read the dimension back from the gold table to check what was written.
SELECT * FROM cars_catalog.gold.dim_dealer

Dealer_ID,DealerName,dim_Dealer_Key
DLR0058,Fiat do Brasil Motors,1
DLR0107,Land Rover Motors,2
DLR0129,Mia Motors,3
DLR0111,Lotus Motors,4
DLR0085,Humber Motors,5
DLR0001,AC Cars Motors,6
DLR0218,Lagonda Motors,7
DLR0082,Honda Motors,8
DLR0063,Ford do Brasil Motors,9
DLR0193,Tazzari Motors,10
