# CREATE FLAG PARAMETER

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import DeltaTable

In [0]:
# Declare the 'incremental_flag' text widget (notebook/job parameter); its default value is '0'.
dbutils.widgets.text('incremental_flag', '0')

In [0]:
# Read the current widget value; it is compared below as a string ('0' vs. anything else).
incremental_flag = dbutils.widgets.get('incremental_flag')


# DIM MODEL CREATION


In [0]:
# Source side of the dimension: every distinct Date_ID in the silver car_sales data.
silver_car_sales_path = "abfss://silver@datalakeproject1goks.dfs.core.windows.net/car_sales"
src_dates_query = f"SELECT DISTINCT Date_ID FROM parquet.`{silver_car_sales_path}`"
df_src = spark.sql(src_dates_query)

In [0]:
# Preview the distinct source dates.
df_src.display()

# DF SINK (existing dimension rows)

In [0]:
# Existing dimension rows (surrogate key + natural key). Before the first load
# the gold table does not exist, so an empty frame with the same two columns is
# taken from the silver data instead (WHERE 1=0 yields no rows).
dim_date_exists = spark.catalog.tableExists('cars_catalog.gold.dim_date')
sink_query = (
    ''' SELECT dim_date_key, Date_ID from cars_catalog.gold.dim_date '''
    if dim_date_exists
    else '''SELECT 1 as dim_date_key,Date_ID from parquet.`abfss://silver@datalakeproject1goks.dfs.core.windows.net/car_sales` WHERE 1=0 '''
)
df_sink = spark.sql(sink_query)

# Filtering old and new records

In [0]:
# Left-join the source dates to the existing dimension on the natural key.
# Matched dates keep their current surrogate key; unmatched dates come out with
# a NULL dim_date_key and are treated as new records below.
# eqNullSafe (<=>) lets a NULL Date_ID match an existing NULL row. With a plain
# ==, NULL never equals NULL, so a NULL date would be classified as new and
# re-inserted with a fresh surrogate key on every incremental run.
df_filter = df_src.join(
    df_sink,
    df_src['Date_ID'].eqNullSafe(df_sink['Date_ID']),
    'left',
).select(df_src['Date_ID'], df_sink['dim_date_key'])

In [0]:
# Preview the join result; a NULL dim_date_key marks a date not yet in the dimension.
df_filter.display()

**DF_FILTER OLD** (dates already in the dimension)

In [0]:
# Dates the left join matched to an existing surrogate key.
df_filter_old = df_filter.where(df_filter['dim_date_key'].isNotNull())

In [0]:
# Preview the dates that already have a surrogate key.
df_filter_old.display()

**DF_FILTER NEW** (dates not yet in the dimension)

In [0]:
# Dates the left join could not match, i.e. rows that still need a surrogate key.
df_filter_new = df_filter.where(df_filter['dim_date_key'].isNull())

In [0]:
# Preview the dates that still need a surrogate key.
df_filter_new.display()

# ADD SURROGATE KEY

In [0]:
# First surrogate key to hand out to the new dates.
# The start is derived from the target table itself instead of from
# incremental_flag alone:
#   * flag '0' against an existing table would restart keys at 1, and the
#     MERGE on dim_date_key would then overwrite unrelated existing rows;
#   * flag '1' against an empty table makes max() return NULL (None + 1 fails),
#     and against a missing table the query itself fails.
if spark.catalog.tableExists('cars_catalog.gold.dim_date'):
    max_value_df = spark.sql('select max(dim_date_key) from cars_catalog.gold.dim_date')
    current_max = max_value_df.collect()[0][0]
    # max() over an empty table is NULL -> None in Python.
    max_value = 1 if current_max is None else current_max + 1
else:
    max_value = 1



# Create the surrogate key column for new records

In [0]:
# Assign dense, deterministic surrogate keys max_value, max_value + 1, ...
# monotonically_increasing_id() is unique but not consecutive (the partition id
# occupies the upper bits, so keys jump by 2**33 between partitions), and its
# values can differ when the DataFrame is recomputed, e.g. between the display()
# preview and the MERGE. row_number() ordered by the natural key avoids both.
# The un-partitioned window gathers the new rows into a single partition, which
# is acceptable for the small number of new dates a date dimension receives.
# The cast keeps the column a long, matching tables created by earlier runs.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df_filter_new = df_filter_new.withColumn(
    'dim_date_key',
    (max_value + row_number().over(Window.orderBy('Date_ID')) - 1).cast('long'),
)

In [0]:
# Preview the newly assigned surrogate keys.
df_filter_new.display()

**FINAL DF**

In [0]:
# Rows to write: existing dates (keeping their current surrogate key) plus the
# newly keyed dates.
# unionByName aligns columns by name; plain union() is positional and would
# silently swap Date_ID and dim_date_key if either frame's column order changed.
final_df = df_filter_old.unionByName(df_filter_new)

In [0]:
# Preview the full set of rows that will be written to the dimension.
final_df.display()

# SCD Type 1

In [0]:
# SCD Type 1 load of the date dimension: upsert into the table when it exists,
# otherwise create it at the gold location.
dim_date_table = 'cars_catalog.gold.dim_date'
dim_date_path = 'abfss://gold@datalakeproject1goks.dfs.core.windows.net/dim_date'

# Incremental run
if spark.catalog.tableExists(dim_date_table):
    # Resolve the MERGE target through the same catalog name that was just
    # checked, so the existence test and the write cannot refer to two different
    # locations (forPath would trust the hard-coded path instead).
    delta_table = DeltaTable.forName(spark, dim_date_table)
    (
        delta_table.alias('trg')
        .merge(final_df.alias('src'), "src.dim_date_key == trg.dim_date_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# First run
else:
    (
        final_df.write.format('delta')
        .mode('overwrite')
        .option("path", dim_date_path)
        .saveAsTable(dim_date_table)
    )


In [0]:
%sql
-- Inspect the contents of the gold date dimension after the load.
SELECT * FROM cars_catalog.gold.dim_date