## Create parameter

In [0]:
# Register a text widget named 'incremental_flag' whose default value is the string '0'.
dbutils.widgets.text('incremental_flag', '0')

In [0]:
# Read the current value of the 'incremental_flag' widget into a notebook variable.
incremental_flag = dbutils.widgets.get('incremental_flag')

# Create dimensional tables

## Dimensional Model

### Fetch Data

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Pull the distinct Date_ID values from the silver-layer carsales parquet data.
# These are the candidate rows for the Dim_Date dimension.
query = '''Select distinct(Date_ID) as Date_ID
           From parquet.`abfss://silver@carsalesdatdpdatalake.dfs.core.windows.net/carsales`
'''
df_src = spark.sql(query)
# Render the source frame in the notebook for inspection.
df_src.display()

Date_ID
DT00976
DT00192
DT01028
DT00029
DT00140
DT00444
DT00947
DT00475
DT01099
DT01208


### Dim_Date Sink

In [0]:
# Build df_sink: the current state of the target dimension.
if spark.catalog.tableExists('car_catalogs.gold.Dim_Date'):
    # The dimension already exists: read its key / business-key pairs.
    new_query = '''
        Select Dim_Date_Key, Date_ID
        From car_catalogs.gold.Dim_Date
    '''
    df_sink = spark.sql(new_query)
else:
    # No dimension yet: "Where 1=0" returns zero rows, so this yields an empty
    # frame that only carries the (Dim_Date_Key, Date_ID) column layout.
    query = '''
    Select 1 as Dim_Date_Key, Date_ID
    From parquet.`abfss://silver@carsalesdatdpdatalake.dfs.core.windows.net/carsales`
    Where 1=0
    '''
    df_sink = spark.sql(query)

In [0]:
# Left-join the source Date_IDs onto the sink so that every source row is kept;
# rows without a match in the sink come back with a null Dim_Date_Key.
date_id_match = df_src['Date_ID'] == df_sink['Date_ID']
src_with_sink = df_src.join(df_sink, on=date_id_match, how='left')
df_filter = src_with_sink.select(df_src['Date_ID'], df_sink['Dim_Date_Key'])
df_filter.display()

Date_ID,Dim_Date_Key
DT00976,1
DT00192,2
DT01028,3
DT00029,4
DT00140,5
DT00444,6
DT00947,7
DT00475,8
DT01099,9
DT01208,10


In [0]:
# Rows that matched the sink in the join above, i.e. those that already carry a key.
already_keyed = col('Dim_Date_Key').isNotNull()
df_filter_old = df_filter.where(already_keyed)
df_filter_old.display()

Date_ID,Dim_Date_Key
DT00976,1
DT00192,2
DT01028,3
DT00029,4
DT00140,5
DT00444,6
DT00947,7
DT00475,8
DT01099,9
DT01208,10


In [0]:
# Rows with no key after the join are new to the dimension. Keep only Date_ID;
# the key column is added in a later cell.
missing_key = col('Dim_Date_Key').isNull()
df_filter_new = df_filter.where(missing_key).select(df_src['Date_ID'])
df_filter_new.display()

Date_ID


## Create Surrogate Key

In [0]:
# Decide the first surrogate key to hand out to rows that are new to Dim_Date.
#
# The seed is derived from the target table's actual state rather than from the
# incremental_flag widget, because trusting the flag alone fails in three ways:
#   * flag != '0' but the table does not exist  -> the max() lookup raises;
#   * flag != '0' but the table is empty        -> max() is NULL and None + 1 raises TypeError;
#   * flag == '0' but the table already exists  -> numbering restarts at 1 and the new
#     keys collide with existing ones in the merge on Dim_Date_Key.
if spark.catalog.tableExists('car_catalogs.gold.Dim_Date'):
    df_max_val = spark.sql('Select max(Dim_Date_Key) as Dim_Date_Key From car_catalogs.gold.Dim_Date')
    current_max_key = df_max_val.collect()[0][0]
    # max() over zero rows yields NULL (None in Python): start at 1 in that case.
    max_value = 1 if current_max_key is None else current_max_key + 1
else:
    # No dimension table yet: this is the very first load.
    max_value = 1

In [0]:
# Give each new Date_ID a dense, consecutive surrogate key starting at max_value.
#
# monotonically_increasing_id() guarantees unique, increasing values but NOT
# consecutive ones: the partition index is stored in the upper bits, so keys jump
# by billions as soon as the frame spans more than one partition. row_number()
# over a deterministic ordering yields max_value, max_value + 1, ... with no gaps
# and gives the same result when the cell is re-run.
#
# NOTE: a window without partitionBy moves all rows into a single task; that is
# acceptable for a small dimension such as this one.
key_order = Window.orderBy('Date_ID')
df_filter_new = df_filter_new.withColumn(
    'Dim_Date_Key',
    # row_number() starts at 1, hence the "- 1". The cast keeps the column as a
    # 64-bit integer, matching the type the previous expression produced.
    (row_number().over(key_order) + lit(max_value - 1)).cast('long'),
)
df_filter_new.display()

Date_ID,Dim_Date_Key


## Combine old and new DataFrames

In [0]:
# Stack the newly keyed rows on top of the rows that already had a key.
# unionByName matches columns by name; plain union() matches by position and
# would silently swap Date_ID and Dim_Date_Key if either input's column order changed.
df_final = df_filter_new.unionByName(df_filter_old)
df_final.display()

Date_ID,Dim_Date_Key
DT00976,1
DT00192,2
DT01028,3
DT00029,4
DT00140,5
DT00444,6
DT00947,7
DT00475,8
DT01099,9
DT01208,10


## SCD Type 1

In [0]:
from delta.tables import *

In [0]:
# SCD Type 1 load of Dim_Date: merge into the table if it exists, otherwise create it.
dim_date_table = 'car_catalogs.gold.Dim_Date'
dim_date_path = 'abfss://gold@carsalesdatdpdatalake.dfs.core.windows.net/Dim_Date'

if spark.catalog.tableExists(dim_date_table):
    # Incremental run: upsert df_final into the Delta table at the gold path,
    # matching on the surrogate key. Matched rows are overwritten, unmatched rows inserted.
    delta_tbl = DeltaTable.forPath(spark, dim_date_path)
    merge_builder = delta_tbl.alias('trg').merge(
        df_final.alias('src'),
        'trg.Dim_Date_Key == src.Dim_Date_Key',
    )
    merge_builder = merge_builder.withSchemaEvolution()
    merge_builder = merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll()
    merge_builder.execute()
else:
    # Initial run: write df_final as a Delta table at the gold path and register it.
    delta_writer = df_final.write.format('delta').mode('overwrite').option('path', dim_date_path)
    delta_writer.saveAsTable(dim_date_table)


In [0]:
%sql
-- Show the current contents of the Dim_Date dimension table.
SELECT * FROM car_catalogs.gold.dim_date

Date_ID,Dim_Date_Key
DT00976,1
DT00192,2
DT01028,3
DT00029,4
DT00140,5
DT00444,6
DT00947,7
DT00475,8
DT01099,9
DT01208,10
