# Creating Flag Parameter

In [0]:
dbutils.widgets.text('incremental_flag','0')

In [0]:
incremental_flag = dbutils.widgets.get('incremental_flag')
print(incremental_flag)

0


In [0]:
%sql
select * from parquet.`abfss://silver@thadiyalcardatalake.dfs.core.windows.net/carsales`
limit 10

Branch_ID,Dealer_ID,Model_ID,Revenue,Units_Sold,Date_ID,Day,Month,Year,BranchName,DealerName,Product_Name,model_category,RevPerUnit
BR0001,DLR0001,BMW-M1,13363978,2,DT00001,1,1,2017,AC Cars Motors,AC Cars Motors,BMW,BMW,6681989.0
BR0003,DLR0228,Hon-M218,17376468,3,DT00001,10,5,2017,AC Cars Motors,Deccan Motors,Honda,Hon,5792156.0
BR0004,DLR0208,Tat-M188,9664767,3,DT00002,12,1,2017,AC Cars Motors,Wiesmann Motors,Tata,Tat,3221589.0
BR0005,DLR0188,Hyu-M158,5525304,3,DT00002,16,9,2017,AC Cars Motors,Subaru Motors,Hyundai,Hyu,1841768.0
BR0006,DLR0168,Ren-M128,12971088,3,DT00003,20,5,2017,AC Cars Motors,Saab Motors,Renault,Ren,4323696.0
BR0008,DLR0128,Hon-M68,7321228,1,DT00004,28,4,2017,AC Cars Motors,Messerschmitt Motors,Honda,Hon,7321228.0
BR0009,DLR0108,Cad-M38,11379294,2,DT00004,31,12,2017,AC Cars Motors,Lexus Motors,Cadillac,Cad,5689647.0
BR0010,DLR0088,Mer-M8,11611234,2,DT00005,4,9,2017,AC Cars Motors,"IFA (including Trabant, Wartburg, Barkas) Motors",Mercedes-Benz,Mer,5805617.0
BR0011,DLR0002,BMW-M2,19979446,2,DT00005,2,1,2017,Acura Motors,Acura Motors,BMW,BMW,9989723.0
BR0011,DLR0069,Vol-M256,14181510,3,DT00006,9,5,2017,Acura Motors,Geo Motors,Volkswagen,Vol,4727170.0


#Creating Dimension Model

In [0]:
df_src = spark.sql('''
select Distinct(Branch_ID) as Branch_ID, BranchName from parquet.`abfss://silver@thadiyalcardatalake.dfs.core.windows.net/carsales`
''')


In [0]:
df_src.limit(10).display()

Branch_ID,BranchName
BR0131,Audi Motors
BR0760,Healey Motors
BR0789,Hillman Motors
BR0938,Isotta Fraschini Motors
BR1040,Lada Motors
BR1693,Saleen Motors
BR1792,Simca do Brasil Motors
BR1799,Simca do Brasil Motors
BR1955,Toyota Motors
BR1978,Turner Motors


### dim_branch sink - initial and incremental

In [0]:
if spark.catalog.tableExists('CarProject.gold.dim_branch'):
    df_sink = spark.sql(
                    "select dim_branch_key, Branch_ID, BranchName from carproject.gold.dim_branch"
                    )
else:
    df_sink = spark.sql('''
                    select 1 as dim_branch_key, Branch_ID, BranchName from parquet.`abfss://silver@thadiyalcardatalake.dfs.core.windows.net/carsales`
                    where 1=0
                    ''')


In [0]:
df_sink.display()

dim_branch_key,Branch_ID,BranchName


In [0]:
df_filter = df_src.join(df_sink, df_src.Branch_ID == df_sink.Branch_ID,'left').select(df_src.Branch_ID, df_src.BranchName, df_sink.dim_branch_key)
df_filter.display()

Branch_ID,BranchName,dim_branch_key
BR0131,Audi Motors,
BR0760,Healey Motors,
BR0789,Hillman Motors,
BR0938,Isotta Fraschini Motors,
BR1040,Lada Motors,
BR1693,Saleen Motors,
BR1792,Simca do Brasil Motors,
BR1799,Simca do Brasil Motors,
BR1955,Toyota Motors,
BR1978,Turner Motors,


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

df_filter_old = df_filter.filter(col('dim_branch_key').isNotNull())
df_filter_old.display()

Branch_ID,BranchName,dim_branch_key


In [0]:
df_filter_new = df_filter.filter(col('dim_branch_key').isNull()).select(df_filter.Branch_ID, df_filter.BranchName)
df_filter_new.limit(10).display()


Branch_ID,BranchName
BR0131,Audi Motors
BR0760,Healey Motors
BR0789,Hillman Motors
BR0938,Isotta Fraschini Motors
BR1040,Lada Motors
BR1693,Saleen Motors
BR1792,Simca do Brasil Motors
BR1799,Simca do Brasil Motors
BR1955,Toyota Motors
BR1978,Turner Motors


### Creating Surrogate key

In [0]:
if (incremental_flag == '0'):
    max_value = 1
else:
    max_value_df = spark.sql("select max('dim_branch_key') from carproject.gold.dim_branch")
    max_value = max_value_df.collect()[0][0]+1

In [0]:
df_filter_new = df_filter_new.withColumn('dim_branch_key',max_value + monotonically_increasing_id())


In [0]:
df_filter_new.limit(10).display()

Branch_ID,BranchName,dim_branch_key
BR0131,Audi Motors,1
BR0760,Healey Motors,2
BR0789,Hillman Motors,3
BR0938,Isotta Fraschini Motors,4
BR1040,Lada Motors,5
BR1693,Saleen Motors,6
BR1792,Simca do Brasil Motors,7
BR1799,Simca do Brasil Motors,8
BR1955,Toyota Motors,9
BR1978,Turner Motors,10


### Create Final DF - df_filter_old + df_filter_new

In [0]:
df_final = df_filter_new.union(df_filter_old)
df_final.limit(10).display()

Branch_ID,BranchName,dim_branch_key
BR0131,Audi Motors,1
BR0760,Healey Motors,2
BR0789,Hillman Motors,3
BR0938,Isotta Fraschini Motors,4
BR1040,Lada Motors,5
BR1693,Saleen Motors,6
BR1792,Simca do Brasil Motors,7
BR1799,Simca do Brasil Motors,8
BR1955,Toyota Motors,9
BR1978,Turner Motors,10


### SCD TYPE -1 (UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
# Incremental Run
if spark.catalog.tableExists('CarProject.gold.dim_branch'):
    delta_tbl = DeltaTable.forPath(spark,'abfss://gold@thadiyalcardatalake.dfs.core.windows.net/dim_branch')
    delta_tbl.alias('trg').merge(df_final.alias('src'), 'trg.dim_branch_key = src.dim_branch_key') \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()


# Initial Run
else:
    df_final.write.format('delta')\
        .mode('overwrite')\
            .option("path", 'abfss://gold@thadiyalcardatalake.dfs.core.windows.net/dim_branch')\
                .saveAsTable('CarProject.gold.dim_branch')    

In [0]:
%sql
select * from carproject.gold.dim_branch 
limit 10

Branch_ID,BranchName,dim_branch_key
BR0131,Audi Motors,1
BR0760,Healey Motors,2
BR0789,Hillman Motors,3
BR0938,Isotta Fraschini Motors,4
BR1040,Lada Motors,5
BR1693,Saleen Motors,6
BR1792,Simca do Brasil Motors,7
BR1799,Simca do Brasil Motors,8
BR1955,Toyota Motors,9
BR1978,Turner Motors,10
