#Create flag parameter

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
dbutils.widgets.text('incremental_flag','0')

In [0]:
incremental_flag = dbutils.widgets.get('incremental_flag')
print(incremental_flag)

0


# Creating Dimension model

In [0]:
# Read the Parquet file into a DataFrame
df = spark.read.parquet("abfss://silver@carprojectadlsgen2.dfs.core.windows.net/carsales")

# Create a temporary view
df.createOrReplaceTempView("carsales_view")

# Now you can query the data using SQL
query = """
SELECT distinct(Date_ID) as Date_ID FROM carsales_view
"""
df_src = spark.sql(query)

# Display the result
display(df_src)

Date_ID
DT00029
DT00140
DT00192
DT00444
DT00475
DT00947
DT00976
DT01028
DT01099
DT00657


# just bring schema if table doesn't exists

In [0]:
if spark.catalog.tableExists('cars_catalog.gold.dim_date'):
    df_sink = spark.sql(
        '''
        select dim_date_key , Date_ID
        from  cars_catalog.gold.dim_date
        '''
    )

else :    
    df_sink = spark.sql(
        '''
        select 1 as dim_date_key , Date_ID
        from  carsales_view
        where 1 = 0
        '''
    )

# bring existing records

In [0]:
df_filter = df_src.join(df_sink,df_src.Date_ID == df_sink.Date_ID,'left').select(df_src.Date_ID,df_sink.dim_date_key)

# `df_filter_old`

In [0]:
df_filter_old = df_filter.filter(df_filter.dim_date_key.isNotNull())

# `df_filter_new`

In [0]:
df_filter_new = df_filter.filter(df_filter.dim_date_key.isNull()).select(df_filter.Date_ID)
display(df_filter_new)

Date_ID
DT00029
DT00140
DT00192
DT00444
DT00475
DT00947
DT00976
DT01028
DT01099
DT00657


## fetch max surrogate key

In [0]:
if(incremental_flag == '0'):
    max_value = 1
else:
    max_value_df = spark.sql("select max(dim_date_key) from cars_catalog.gold.dim_date") 
    max_value = max_value_df.collect()[0][0]   

### Create surrogate key

In [0]:
df_filter_new = df_filter_new.withColumn('dim_date_key',max_value + monotonically_increasing_id())

In [0]:
df_filter_new.display()

Branch_ID,BranchName,dim_branch_key
BR0131,Audi Motors,1
BR0760,Healey Motors,2
BR0789,Hillman Motors,3
BR0938,Isotta Fraschini Motors,4
BR1040,Lada Motors,5
BR1693,Saleen Motors,6
BR1792,Simca do Brasil Motors,7
BR1799,Simca do Brasil Motors,8
BR1955,Toyota Motors,9
BR1978,Turner Motors,10


### Create final df - df_fildter_old + new

In [0]:
df_final = df_filter_new.union(df_filter_old)

In [0]:
display(df_final)

Branch_ID,BranchName,dim_branch_key
BR0131,Audi Motors,1
BR0760,Healey Motors,2
BR0789,Hillman Motors,3
BR0938,Isotta Fraschini Motors,4
BR1040,Lada Motors,5
BR1693,Saleen Motors,6
BR1792,Simca do Brasil Motors,7
BR1799,Simca do Brasil Motors,8
BR1955,Toyota Motors,9
BR1978,Turner Motors,10


### SCD Type 1 (Upsert)

In [0]:
from delta.tables import DeltaTable

In [0]:
# Incremental Run
if spark.catalog.tableExists('cars_catalog.gold.dim_date'):
    delta_tbl = DeltaTable.forPath(spark,"abfss://gold@carprojectadlsgen2.dfs.core.windows.net/dim_date")
    delta_tbl.alias("trg").merge(df_final.alias("src"),"trg.dim_date_key = src.dim_date_key")\
             .whenMatchedUpdateAll()\
             .whenNotMatchedInsertAll()\
             .execute()     
# Intial Run
else:
   df_final.write.format("delta")\
       .mode('overwrite')\
       .option("path","abfss://gold@carprojectadlsgen2.dfs.core.windows.net/dim_date")\
       .saveAsTable('cars_catalog.gold.dim_date')            

In [0]:
%sql
select * from cars_catalog.gold.dim_date

Date_ID,dim_date_key
DT00029,1
DT00140,2
DT00192,3
DT00444,4
DT00475,5
DT00947,6
DT00976,7
DT01028,8
DT01099,9
DT00657,10
