# **CREATING THE DATE DIMENSION (dim_date)**

In [0]:

from pyspark.sql.functions import *
from pyspark.sql.types import *


# **CREATING FLAG PARAMETER**

In [0]:
# Notebook parameter that switches between the initial load ('0') and an incremental load.
dbutils.widgets.text(name='incremental_flag', defaultValue='0')

In [0]:
# Read the current widget value; it is compared against the string '0' further down.
incremental_flag = dbutils.widgets.get("incremental_flag")

display(incremental_flag)

'0'

## **Creating Dimension (Date)**

### **Loading the first 10 rows in the silver table**

In [0]:
%sql
-- Preview ten rows of the silver-layer car_sales data.
SELECT *
FROM parquet.`abfss://silver@oacardatalake.dfs.core.windows.net/car_sales`
LIMIT 10;

Branch_ID,Dealer_ID,Model_ID,Revenue,Units_Sold,Date_ID,Day,Month,Year,BranchName,DealerName,Product_Name,model_category,revenue_per_unit
BR0001,DLR0001,BMW-M1,13363978,2,DT00001,1,1,2017,AC Cars Motors,AC Cars Motors,BMW,BMW,6681989.0
BR0003,DLR0228,Hon-M218,17376468,3,DT00001,10,5,2017,AC Cars Motors,Deccan Motors,Honda,Hon,5792156.0
BR0004,DLR0208,Tat-M188,9664767,3,DT00002,12,1,2017,AC Cars Motors,Wiesmann Motors,Tata,Tat,3221589.0
BR0005,DLR0188,Hyu-M158,5525304,3,DT00002,16,9,2017,AC Cars Motors,Subaru Motors,Hyundai,Hyu,1841768.0
BR0006,DLR0168,Ren-M128,12971088,3,DT00003,20,5,2017,AC Cars Motors,Saab Motors,Renault,Ren,4323696.0
BR0008,DLR0128,Hon-M68,7321228,1,DT00004,28,4,2017,AC Cars Motors,Messerschmitt Motors,Honda,Hon,7321228.0
BR0009,DLR0108,Cad-M38,11379294,2,DT00004,31,12,2017,AC Cars Motors,Lexus Motors,Cadillac,Cad,5689647.0
BR0010,DLR0088,Mer-M8,11611234,2,DT00005,4,9,2017,AC Cars Motors,"IFA (including Trabant, Wartburg, Barkas) Motors",Mercedes-Benz,Mer,5805617.0
BR0011,DLR0002,BMW-M2,19979446,2,DT00005,2,1,2017,Acura Motors,Acura Motors,BMW,BMW,9989723.0
BR0011,DLR0069,Vol-M256,14181510,3,DT00006,9,5,2017,Acura Motors,Geo Motors,Volkswagen,Vol,4727170.0


## **Selecting the needed columns for the Dim (Date)**

In [0]:
# Source side of the load: the distinct date attributes found in the silver car_sales data.
date_source_query = '''
SELECT DISTINCT Date_ID,Day,Month,Year  FROM 
parquet.`abfss://silver@oacardatalake.dfs.core.windows.net/car_sales`
'''
cs_sq = spark.sql(date_source_query)
cs_sq.display()

Date_ID,Day,Month,Year
DT00032,17,1,2017
DT00052,8,1,2017
DT00305,7,3,2017
DT00357,19,11,2017
DT00380,11,11,2018
DT00565,23,8,2018
DT00769,24,2,2019
DT00806,20,2,2019
DT00829,30,10,2019
DT00845,2,11,2019


## **New and incremental loading**

### Bringing the schema if the table doesn't exist

In [0]:
# Fully-qualified name of the gold dimension table. The existence check must use
# the same name the table is written to and read from (cars_catalog.gold.dim_date);
# checking a different schema makes the check always false, so the existing
# dimension would never be loaded on incremental runs.
dim_date_table = 'cars_catalog.gold.dim_date'

if not spark.catalog.tableExists(dim_date_table):
    # First run: the dimension does not exist yet, so build an empty DataFrame
    # that only carries the target schema (WHERE 1=0 returns no rows).
    cs_sink = spark.sql('''
    SELECT 1 as dim_date_key,Date_ID,Day,Month,Year FROM 
    parquet.`abfss://silver@oacardatalake.dfs.core.windows.net/car_sales`
    WHERE 1= 0 ''')
else:
    # Later runs: load the rows already present in the dimension so they can be
    # matched against the incoming source rows.
    cs_sink = spark.sql(f'''
    SELECT  dim_date_key,Date_ID,Day,Month,Year FROM 
    {dim_date_table}
    ''')


In [0]:
# Show the sink side (empty on the very first run).
cs_sink.display()

dim_date_key,Date_ID,Day,Month,Year


## _Filtering new records and old records_

In [0]:
# Left-join source dates to the existing dimension on Date_ID. Rows that already
# exist pick up their dim_date_key; rows not yet in the dimension get NULL.
date_id_match = cs_sq['Date_ID'] == cs_sink['Date_ID']
cs_filter = (
    cs_sq
    .join(cs_sink, date_id_match, 'left')
    .select(
        cs_sq['Date_ID'],
        cs_sq['Day'],
        cs_sq['Month'],
        cs_sq['Year'],
        cs_sink['dim_date_key'],
    )
)

In [0]:
# Inspect the joined result; a NULL dim_date_key marks a new record.
display(cs_filter)

Date_ID,Day,Month,Year,dim_date_key
DT00032,17,1,2017,
DT00052,8,1,2017,
DT00305,7,3,2017,
DT00357,19,11,2017,
DT00380,11,11,2018,
DT00565,23,8,2018,
DT00769,24,2,2019,
DT00806,20,2,2019,
DT00829,30,10,2019,
DT00845,2,11,2019,


### **old_filter**

In [0]:
# Records that already have a surrogate key, i.e. are already in the dimension.
has_key = cs_filter['dim_date_key'].isNotNull()
cs_filter_old = cs_filter.where(has_key)


### **new_filter**

In [0]:
# Records without a surrogate key are new; keep only the source columns so a
# fresh dim_date_key can be attached afterwards.
missing_key = cs_filter['dim_date_key'].isNull()
cs_filter_new = (
    cs_filter
    .where(missing_key)
    .select('Date_ID', 'Day', 'Month', 'Year')
)


In [0]:
# Show the new records that still need a surrogate key.
cs_filter_new.display()

Date_ID,Day,Month,Year
DT00032,17,1,2017
DT00052,8,1,2017
DT00305,7,3,2017
DT00357,19,11,2017
DT00380,11,11,2018
DT00565,23,8,2018
DT00769,24,2,2019
DT00806,20,2,2019
DT00829,30,10,2019
DT00845,2,11,2019


## **Creating a surrogate key**

### fetching the max surrogate key from an existing table

In [0]:
# Starting value for newly generated surrogate keys. The key column is built as
# max_value + monotonically_increasing_id(), and that id starts at 0, so
# max_value itself becomes the first new key.
if incremental_flag == '0':
    # Initial load: keys start at 1.
    max_value = 1
else:
    max_value_df = spark.sql('''
    SELECT MAX(dim_date_key) FROM cars_catalog.gold.dim_date
    ''')
    current_max = max_value_df.collect()[0][0]
    # Start one past the highest existing key; using the maximum itself would
    # give the first new row a key that is already taken. MAX() returns NULL
    # for an empty table, so fall back to 0 (first key becomes 1).
    max_value = (current_max or 0) + 1


### Creating the surrogate key column, offset by the max surrogate key

In [0]:
# Attach a surrogate key to every new record: the starting offset plus a
# generated id. NOTE(review): monotonically_increasing_id() yields unique,
# increasing values but they are not guaranteed to be consecutive across partitions.
surrogate_key = lit(max_value) + monotonically_increasing_id()
cs_filter_new = cs_filter_new.withColumn('dim_date_key', surrogate_key)
cs_filter_new.display()

Date_ID,Day,Month,Year,dim_date_key
DT00032,17,1,2017,1
DT00052,8,1,2017,2
DT00305,7,3,2017,3
DT00357,19,11,2017,4
DT00380,11,11,2018,5
DT00565,23,8,2018,6
DT00769,24,2,2019,7
DT00806,20,2,2019,8
DT00829,30,10,2019,9
DT00845,2,11,2019,10


### Creating a final dataframe (cs_final) using the union of cs_filter_old and cs_filter_new

In [0]:
# Combine existing and new records into the frame that will be upserted.
# unionByName aligns columns by name rather than by position, so the result
# stays correct even if the column order of either side changes upstream.
cs_final = cs_filter_old.unionByName(cs_filter_new)


In [0]:
cs_final.display()

Date_ID,Day,Month,Year,dim_date_key
DT00032,17,1,2017,1
DT00052,8,1,2017,2
DT00305,7,3,2017,3
DT00357,19,11,2017,4
DT00380,11,11,2018,5
DT00565,23,8,2018,6
DT00769,24,2,2019,7
DT00806,20,2,2019,8
DT00829,30,10,2019,9
DT00845,2,11,2019,10


# SCD TYPE 1 (UPSERT - UPDATE AND INSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
#incremental RUN
# Target table and its storage location. The existence check must use the same
# name that saveAsTable registers; checking a different schema name makes the
# check always false, so every run would overwrite the table instead of merging.
dim_date_table = "cars_catalog.gold.dim_date"
dim_date_path = 'abfss://gold@oacardatalake.dfs.core.windows.net/dim_date'

if spark.catalog.tableExists(dim_date_table):
    # Incremental run: upsert into the existing Delta table, matching on the
    # surrogate key. Matched rows are updated, unmatched rows are inserted.
    delta_table = DeltaTable.forPath(spark, dim_date_path)
    (
        delta_table.alias("t")
        .merge(cs_final.alias("s"), "t.dim_date_key = s.dim_date_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial run: create the Delta table at the gold location and register it.
    (
        cs_final.write.format("delta")
        .mode('overwrite')
        .option('path', dim_date_path)
        .saveAsTable(dim_date_table)
    )

In [0]:
%sql
-- Inspect the gold date dimension after the load.
SELECT *
FROM cars_catalog.gold.dim_date

Date_ID,Day,Month,Year,dim_date_key
DT00032,17,1,2017,1
DT00052,8,1,2017,2
DT00305,7,3,2017,3
DT00357,19,11,2017,4
DT00380,11,11,2018,5
DT00565,23,8,2018,6
DT00769,24,2,2019,7
DT00806,20,2,2019,8
DT00829,30,10,2019,9
DT00845,2,11,2019,10
