In [0]:
#import libraries

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

### Create the incremental-load flag parameter

In [0]:
# Text widget holding the load mode for this run; defaults to "0" (first load).
dbutils.widgets.text(name="incremental_flag", defaultValue="0")

In [0]:
# Load mode for this run: "0" = first (full) load, "1" = incremental load.
# The widget is free text, so trim stray whitespace and reject any other value rather than
# letting a typo (e.g. "0 " or "O") silently fall through to the incremental branch.
incremental_flag = dbutils.widgets.get("incremental_flag").strip()
if incremental_flag not in ("0", "1"):
    raise ValueError(
        f"incremental_flag must be '0' (first load) or '1' (incremental load), got {incremental_flag!r}"
    )

### Build the model dimension (dim_model)

In [0]:
%sql
-- Preview the curated car-sales parquet data that dim_model is built from.
SELECT *
FROM parquet.`abfss://projects@projectstorageaccount1.dfs.core.windows.net/curated/CAR SALES/`;

Branch_ID,Dealer_ID,Model_ID,Revenue,Units_Sold,Date_ID,Day,Month,Year,BranchName,DealerName,Product_Name,Model_Category,ItemPrice
BR9546,DLR0060,Jee-M10,7223451,1,DT01246,28,5,2020,Premier Motors,"Fisker, Karma Motors",Jeep,Jee,7223451.0
BR9666,DLR0062,Jee-M12,22093020,3,DT01246,30,5,2020,Puma Motors,Ford Australia Motors,Jeep,Jee,7364340.0
BR9726,DLR0063,Jee-M13,22372413,3,DT01247,31,5,2020,Power Ranger Motors,Ford do Brasil Motors,Jeep,Jee,7457471.0
XYZ9726,XYZ0063,ZYXM13,22372413,3,DT01247,31,5,2020,DataFam Motors,Datafam Dealers,Surprise,ZYXM13,7457471.0


### Fetch the relevant source columns

In [0]:
# Distinct (Model_ID, Model_Category) pairs in the curated car-sales data: the candidate
# rows for dim_model. No trailing ';' -- spark.sql takes a single statement, and a
# terminator inside the string is only tolerated by newer Spark parsers.
model_src_df = spark.sql('''
    SELECT DISTINCT Model_ID, Model_Category
    FROM parquet.`abfss://projects@projectstorageaccount1.dfs.core.windows.net/curated/CAR SALES/`
''')


In [0]:
# Preview the distinct source models.
display(model_src_df)

Model_ID,Model_Category
Jee-M10,Jee
ZYXM13,ZYXM13
Jee-M12,Jee
Jee-M13,Jee


### Read the dim_model sink (initial and incremental runs)

In [0]:
# Existing dimension rows that source models are matched against.
# On the first run the table does not exist yet, so build an empty frame with the same
# three columns instead: WHERE 1 = 0 borrows the source schema but returns no rows.
# The placeholder key is BIGINT to match the key type persisted in dim_model.
# No trailing ';' inside spark.sql (single statement; terminator not needed).
if spark.catalog.tableExists("car_sales_catalog.refined.dim_model"):
    model_sink_df = spark.sql('''
        SELECT Dim_Model_Key, Model_ID, Model_Category
        FROM car_sales_catalog.refined.dim_model
    ''')
else:
    model_sink_df = spark.sql('''
        SELECT CAST(1 AS BIGINT) AS Dim_Model_Key, Model_ID, Model_Category
        FROM parquet.`abfss://projects@projectstorageaccount1.dfs.core.windows.net/curated/CAR SALES/`
        WHERE 1 = 0
    ''')

In [0]:
# Preview the existing dimension rows (empty on the first run).
display(model_sink_df)

Dim_Model_Key,Model_ID,Model_Category
1,Mah-M167,Mah
2,Che-M47,Che
3,Toy-M205,Toy
4,BMW-M249,BMW
5,Mer-M122,Mer
6,Hon-M215,Hon
7,Nis-M82,Nis
8,Toy-M206,Toy
9,Mar-M139,Mar
10,Ren-M207,Ren


### Split source records into new and existing models

In [0]:
# Left join on the natural key: every source model survives, carrying its existing
# surrogate key when already loaded (Dim_Model_Key is NULL for models not yet loaded).
join_on_model_id = model_src_df["Model_ID"] == model_sink_df["Model_ID"]
df_filter = (
    model_src_df
    .join(model_sink_df, join_on_model_id, how="left")
    .select(
        model_src_df["Model_ID"],
        model_src_df["Model_Category"],
        model_sink_df["Dim_Model_Key"],
    )
)

In [0]:
# Preview: source models with their existing key (NULL when not yet loaded).
display(df_filter)

Model_ID,Model_Category,Dim_Model_Key
Jee-M10,Jee,49.0
ZYXM13,ZYXM13,
Jee-M12,Jee,227.0
Jee-M13,Jee,271.0


**df_filter_old**: source models already present in dim_model (they keep their existing Dim_Model_Key)

In [0]:
# Source models that already own a surrogate key in the dimension.
df_filter_old = df_filter.where(df_filter["Dim_Model_Key"].isNotNull())

In [0]:
# Preview the models already in the dimension.
display(df_filter_old)

Model_ID,Model_Category,Dim_Model_Key
Jee-M10,Jee,49
Jee-M12,Jee,227
Jee-M13,Jee,271


**df_filter_new**: source models not yet in dim_model (no Dim_Model_Key assigned yet)

In [0]:
# Source models with no surrogate key yet; keep only the natural-key columns so a
# fresh key can be attached below.
df_filter_new = df_filter.where(col("Dim_Model_Key").isNull()).select("Model_ID", "Model_Category")

In [0]:
# Preview the models that still need a surrogate key.
display(df_filter_new)

Model_ID,Model_Category
ZYXM13,ZYXM13


### Create surrogate keys

#### Fetch the max surrogate key from the existing table

In [0]:
# First surrogate key to hand out in this run.
# Whether a dimension already exists is decided by the table itself -- the same check the
# sink-read and upsert cells use -- not by incremental_flag alone:
#   * flag '0' against an existing table would restart at 1, giving new models keys that
#     already belong to other models, and the key-based MERGE would then overwrite them;
#   * flag '1' with no table yet would fail querying a missing table.
if spark.catalog.tableExists("car_sales_catalog.refined.dim_model"):
    current_max_key = spark.sql(
        "SELECT max(Dim_Model_Key) FROM car_sales_catalog.refined.dim_model"
    ).collect()[0][0]
    # max() over an empty table is NULL -> start numbering at 1.
    max_value = 1 if current_max_key is None else current_max_key + 1
else:
    max_value = 1

In [0]:
# Number the new models consecutively from max_value: max_value, max_value + 1, ...
# monotonically_increasing_id() is unique but not consecutive (the partition id sits in
# the upper bits), so with more than one partition the keys would jump by ~2**33.
# row_number() over a single ordered window yields gap-free keys, and the total ordering
# makes the assignment deterministic. The window has no partitionBy, so Spark moves these
# rows to one partition -- acceptable because only not-yet-loaded models pass through here.
# Cast to long to keep the BIGINT key type of the persisted table.
df_filter_new = df_filter_new.withColumn(
    'Dim_Model_Key',
    (
        lit(max_value).cast('long')
        + row_number().over(Window.orderBy('Model_ID', 'Model_Category'))
        - 1
    ).cast('long'),
)

In [0]:
# Preview the new models with their freshly assigned surrogate keys.
display(df_filter_new)

Model_ID,Model_Category,Dim_Model_Key
ZYXM13,ZYXM13,278


### Create the final DataFrame: df_filter_old + df_filter_new

In [0]:
# New models (freshly keyed) plus existing models (keys unchanged).
# unionByName aligns the two sides by column name instead of position, so a reordered
# select/withColumn upstream cannot silently shift keys or categories into the wrong column.
df_final = df_filter_new.unionByName(df_filter_old)

In [0]:
# Preview the rows about to be upserted into dim_model.
display(df_final)

Model_ID,Model_Category,Dim_Model_Key
ZYXM13,ZYXM13,278
Jee-M10,Jee,49
Jee-M12,Jee,227
Jee-M13,Jee,271


### SCD Type 1 upsert

In [0]:
# SCD Type 1 load of dim_model: overwrite changed attributes in place, insert new models.
DIM_MODEL_TABLE = "car_sales_catalog.refined.dim_model"
DIM_MODEL_PATH = "abfss://projects@projectstorageaccount1.dfs.core.windows.net/refined/CAR SALES/dim_model"

if spark.catalog.tableExists(DIM_MODEL_TABLE):
    # Incremental run. Open the Delta table through the catalog -- the same name the
    # existence check uses -- rather than a separately hard-coded path that could drift
    # from where the table is actually registered.
    deltaTable = DeltaTable.forName(spark, DIM_MODEL_TABLE)

    (
        deltaTable.alias("target")
        .merge(df_final.alias("source"), "target.Dim_Model_Key == source.Dim_Model_Key")
        # Only rewrite rows whose attributes actually changed, so unchanged models are not
        # recorded as updates. <=> is Spark SQL's NULL-safe equality.
        .whenMatchedUpdateAll(
            condition="NOT (target.Model_ID <=> source.Model_ID) "
                      "OR NOT (target.Model_Category <=> source.Model_Category)"
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial run: write the Delta files to DIM_MODEL_PATH and register the table there.
    (
        df_final.write.format("delta")
        .mode("overwrite")
        .option("path", DIM_MODEL_PATH)
        .saveAsTable(DIM_MODEL_TABLE)
    )