### 4. Date

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Parameter Creation

In [0]:
dbutils.widgets.text('incremental_flag','0')

In [0]:
incremental_flag = dbutils.widgets.get('incremental_flag')

In [0]:
df_src = spark.sql('''select distinct(Date_ID) as Date_ID
                        from parquet.`abfss://silver@salesetldatalake.dfs.core.windows.net/silver_data/`
                   ''')

In [0]:
## Sink for intial and incremental

if spark.catalog.tableExists('sales_catalog.gold.dim_date'):

    df_sink = spark.sql('''select date_key, Date_ID
                        from sales_catalog.gold.dim_date
                        ''')
else :
    df_sink = spark.sql('''select 1 as date_key, Date_ID
                        from parquet.`abfss://silver@salesetldatalake.dfs.core.windows.net/silver_data/`
                        where 1=0
                        ''')

### Filtering new and old records

In [0]:
df_filter = df_src.join(df_sink, df_src.Date_ID == df_sink.Date_ID, 'left').select(df_src.Date_ID, df_sink.date_key)
df_filter.display()

In [0]:
df_filter_old = df_filter.filter(col('date_key').isNotNull())

In [0]:
df_filter_new = df_filter.filter(col('date_key').isNull()).select(df_src.Date_ID)

### Surrogate Key

In [0]:
if (incremental_flag == '0'):
    max_value = 1
else:
    max_value = spark.sql("select max(date_key) from sales_catalog.gold.dim_date").collect()[0][0]

In [0]:
df_filter_new = df_filter_new.withColumn('date_key', max_value + monotonically_increasing_id())

In [0]:
#df_filter_new.display()

In [0]:
final_df = df_filter_new.union(df_filter_old)

### Slowly Changing Dimenion 1

In [0]:
from delta.tables import *

In [0]:
if spark.catalog.tableExists('sales_catalog.gold.dim_date'): #Icremental Load/Run
    delta_table = DeltaTable.forPath(spark, 'abfss://gold@salesetldatalake.dfs.core.windows.net/dim_date')
    delta_table.alias('tgt').merge(
        final_df.alias('src'), "tgt.date_key = src.date_key")\
            .whenMatchedUpdateAll()\
             .whenNotMatchedInsertAll()

else: #Initial Load/Run
    final_df.write.format('delta')\
        .mode('overwrite')\
            .option('path','abfss://gold@salesetldatalake.dfs.core.windows.net/dim_date')\
                .saveAsTable('sales_catalog.gold.dim_date')

In [0]:
%sql
-- select * from sales_catalog.gold.dim_date