In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

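# Create the Incremental-Load Flag Parameter

`incremental_flag = '0'` starts surrogate keys at 1 (initial load); any other value continues numbering after the current maximum key in `cars_catalog.gold.dim_date`.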

In [0]:
# Register the text widget that selects the load mode; '0' is its default value.
dbutils.widgets.text("incremental_flag", "0")

In [0]:
# Fetch the widget's current value as text; the surrogate-key step compares it with '0'.
incremental_flag = dbutils.widgets.get("incremental_flag")

# Creating Dimension Model

In [0]:
# Distinct dates present in the silver car-sales data: the candidate members of the
# date dimension.
# DISTINCT is a SELECT modifier, not a function. The previous `DISTINCT(Date_ID) as Date_ID`
# only worked because the parentheses wrapped a single column; the plain form yields
# the same single `Date_ID` column without implying per-column semantics.
df_source = spark.sql('''
SELECT DISTINCT Date_ID
FROM parquet.`abfss://silver@sacarsdatalake.dfs.core.windows.net/carsales`''')

In [0]:
# Sink side of the lookup. When the gold date dimension already exists, read its keys.
# Otherwise, query the silver data with an always-false predicate: this returns no rows
# but yields the same (dim_date_key, Date_ID) columns, with Date_ID typed as in the source.
if spark.catalog.tableExists('cars_catalog.gold.dim_date'):
    sink_query = '''
 SELECT dim_date_key, Date_ID
 from cars_catalog.gold.dim_date
'''
else:
    sink_query = '''
 SELECT 1 as dim_date_key, Date_ID
 from parquet.`abfss://silver@sacarsdatalake.dfs.core.windows.net/carsales/`
 where 1 = 0 '''

df_sink = spark.sql(sink_query)

In [0]:
# Preview the sink side of the lookup: existing dimension rows, or an empty frame on the first run.
display(df_sink)

### Filtering new records and old records

In [0]:
# Look up every distinct source date in the existing dimension. After the left join,
# rows with a NULL dim_date_key are dates the dimension has not seen yet.
# eqNullSafe (SQL `<=>`) lets a NULL Date_ID match a NULL Date_ID already stored in the
# dimension. With plain `==`, a NULL date never matches. It would then be classified as
# new on every run and inserted again under a fresh surrogate key.
df_filter = (
    df_source
    .join(df_sink, df_source['Date_ID'].eqNullSafe(df_sink['Date_ID']), 'left')
    .select(df_source['Date_ID'], df_sink['dim_date_key'])
)

***df_filter_old*** — dates that already exist in the dimension and keep their current `dim_date_key`

In [0]:
# Existing dates: the left join found a dimension row, so they already carry a surrogate key.
df_filter_old = df_filter.where(df_filter['dim_date_key'].isNotNull())

***df_filter_new*** — dates not yet in the dimension; they receive a new surrogate key below

In [0]:
# New dates: no dimension row matched, so keep only the date and assign a key later.
df_filter_new = (
    df_filter
    .where(df_filter['dim_date_key'].isNull())
    .select('Date_ID')
)

In [0]:
# Preview the dates that are not yet in the dimension (no surrogate key assigned yet).
display(df_filter_new)

# Create Surrogate Key

In [0]:
# Starting value for the surrogate keys assigned to new dates in this run.
# - incremental_flag == '0' is treated as an initial load: numbering starts at 1.
# - Otherwise numbering continues after the largest key already in the dimension.
# max() yields NULL (None) on an empty table. The table may also not exist yet if an
# incremental run happens before the first load. Both cases now fall back to 1
# instead of failing with a TypeError (None + 1) or a missing-table error.
# NOTE(review): running with '0' against an already-populated table restarts keys at 1,
# which can collide with existing keys in the MERGE on dim_date_key. Confirm that the
# flag is only '0' for the very first load.
if incremental_flag == '0' or not spark.catalog.tableExists('cars_catalog.gold.dim_date'):
    max_value = 1
else:
    current_max_key = spark.sql("select max(dim_date_key) from cars_catalog.gold.dim_date").collect()[0][0]
    max_value = 1 if current_max_key is None else current_max_key + 1

**Create the surrogate-key column so that new keys start at `max_value`**

In [0]:
# Assign surrogate keys max_value, max_value + 1, ... to the new dates, ordered by Date_ID.
#
# Why not monotonically_increasing_id()? It only guarantees unique, increasing ids. It
# packs the partition index into the upper bits, so keys jump by roughly 2**33 between
# partitions and depend on how the data happens to be partitioned.
#
# row_number() over a global window yields dense, deterministic keys instead. The window
# has no partitionBy, so Spark moves these rows into a single partition (and logs a
# warning). That is acceptable here because the row count is bounded by the number of
# distinct Date_ID values.
#
# The cast to long keeps dim_date_key a 64-bit column, as monotonically_increasing_id()
# produced, and avoids 32-bit overflow when max_value is large.
new_key_window = Window.orderBy('Date_ID')
df_filter_new = df_filter_new.withColumn(
    'dim_date_key',
    row_number().over(new_key_window).cast('long') + (max_value - 1),
)

In [0]:
# Preview the new dates together with their newly assigned dim_date_key values.
display(df_filter_new)

### Create Final DF -> df_filter_old + df_filter_new

In [0]:
# Final input for the upsert: newly keyed dates plus the dates already in the dimension.
# unionByName aligns columns by name rather than position, so the result cannot be
# silently scrambled if either side's column order changes. The output keeps
# df_filter_new's column order (Date_ID, dim_date_key).
df_final = df_filter_new.unionByName(df_filter_old)

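# SCD Type 1 (Upsert)

Rows whose `dim_date_key` already exists in the target are overwritten; all other rows are inserted. On the first run, the table is created instead.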

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 upsert of df_final into the gold date dimension.
DIM_DATE_TABLE = 'cars_catalog.gold.dim_date'
DIM_DATE_PATH = 'abfss://gold@sacarsdatalake.dfs.core.windows.net/dim_date'

if spark.catalog.tableExists(DIM_DATE_TABLE):
    # Open the Delta table by the same name whose existence was just checked. A separately
    # hard-coded path could drift from the table's registered location.
    delta_tbl = DeltaTable.forName(spark, DIM_DATE_TABLE)

    # Match on the surrogate key: matched rows are overwritten column-for-column,
    # unmatched source rows are inserted.
    (
        delta_tbl.alias("target")
        .merge(df_final.alias("source"), "target.dim_date_key = source.dim_date_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the table as a Delta table stored at DIM_DATE_PATH.
    (
        df_final.write.format("delta")
        .mode('overwrite')
        .option('path', DIM_DATE_PATH)
        .saveAsTable(DIM_DATE_TABLE)
    )

In [0]:
%sql
-- Inspect the contents of the gold date dimension after the upsert.
SELECT *
FROM cars_catalog.gold.dim_date