In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Notebook parameters, supplied through Databricks widgets.
#   incremental_flag : '0' makes surrogate-key numbering start at 1; any other
#                      value continues after the keys already in dim_device.
#   load_date        : folder name interpolated into the silver (read) and
#                      gold (write) abfss paths used further down.
dbutils.widgets.text("incremental_flag", '0')
dbutils.widgets.text("load_date", "")
load_date = dbutils.widgets.get("load_date").strip()

# Fail fast: the widget defaults to an empty string, which would otherwise be
# spliced into the storage paths and yield a malformed ".../<empty>/..." location.
if not load_date:
    raise ValueError(
        "Widget 'load_date' is empty; set it to the load folder to process before running this notebook."
    )

In [0]:
# Run-mode switch read as a string; the surrogate-key cell compares it with '0'.
flag_widget_name = "incremental_flag"
incremental_flag = dbutils.widgets.get(flag_widget_name)


## Fetching the columns needed to build the device dimension

In [0]:
# Candidate device-dimension rows read from the silver credit-check data of this load.
#
# The previous `select distinct(device_code) AS device_code, ...` did NOT make
# device_code unique: DISTINCT always applies to the whole select list, so a
# device_code appearing with two different names/prices produced two rows.
# Later cells join on device_code and merge on the surrogate key derived from
# it, so the business key must be unique here.
# dropDuplicates(['device_code']) keeps exactly one row per device_code.
# NOTE(review): which of several conflicting rows survives is arbitrary -
# confirm whether a "latest record wins" ordering column should be used instead.
source_df = spark.sql(f'''
select device_code, device_name, brand, device_segment, device_price from parquet.`abfss://silver@azurestorageaccnt.dfs.core.windows.net/{load_date}/creditcheck`
''').dropDuplicates(['device_code'])

In [0]:
# Preview the source rows ordered by device_code.
display(source_df.orderBy('device_code'))

In [0]:
# sink_df holds what is already stored in the device dimension.
# When the gold table does not exist yet, an empty frame with the same column
# names is built instead (the `where 1=0` filter removes every row), so the
# downstream join works unchanged on the very first run.
dim_device_exists = spark.catalog.tableExists('credcheck_catalog.gold.dim_device')

if not dim_device_exists:
    sink_df = spark.sql(f'''
                        select 1 as dimdevice_key, device_code, device_name, brand, device_segment, device_price
                        from parquet.`abfss://silver@azurestorageaccnt.dfs.core.windows.net/{load_date}/creditcheck`
                        where 1=0
                        ''')
else:
    sink_df = spark.sql('''
                        select dimdevice_key, device_code, device_name, brand, device_segment, device_price from credcheck_catalog.gold.dim_device
                        ''')

In [0]:
# Preview the rows currently in the dimension (empty on the first run).
display(sink_df)

## Joining source records with the existing dimension

In [0]:
# Left-join the source rows to the existing dimension on device_code.
# Attribute values come from the source side; dimdevice_key comes from the sink
# side and is NULL for any device_code that is not in the dimension yet.
device_code_match = source_df.device_code == sink_df.device_code
output_columns = [
    source_df.device_code,
    source_df.device_name,
    source_df.brand,
    source_df.device_segment,
    source_df.device_price,
    sink_df.dimdevice_key,
]

filter_df = source_df.join(sink_df, on=device_code_match, how='left').select(*output_columns)

display(filter_df)



### Splitting into existing (old) and new records

In [0]:
# Rows whose device_code already has a surrogate key in the dimension.
has_existing_key = filter_df['dimdevice_key'].isNotNull()
old_df = filter_df.where(has_existing_key)

In [0]:
# Rows with no surrogate key yet. The NULL dimdevice_key column is dropped here
# because the key is generated in a later cell.
attribute_columns = ['device_code', 'device_name', 'brand', 'device_segment', 'device_price']
lacks_key = filter_df['dimdevice_key'].isNull()
new_df = filter_df.where(lacks_key).select(*attribute_columns)
display(new_df)


## Creating the surrogate key

In [0]:
# Decide the first surrogate key value to hand out in this run.
#   incremental_flag == '0' : numbering starts at 1.
#   anything else           : numbering continues after the highest
#                             dimdevice_key already stored in dim_device.
# NOTE(review): running with incremental_flag '0' while dim_device already
# holds rows would restart at 1 and could collide with existing keys - confirm
# that '0' is only ever used for the very first load.
if incremental_flag == '0':
    max_value = 1
else:
    # Catalog name corrected from the misspelled 'credchcheck_catalog' so it
    # matches the 'credcheck_catalog.gold.dim_device' table used everywhere
    # else in this notebook.
    max_value_df = spark.sql("select max(dimdevice_key) from credcheck_catalog.gold.dim_device")
    current_max_key = max_value_df.collect()[0][0]
    # max() over an empty table comes back as None; fall back to 1 instead of
    # failing on `None + 1`.
    max_value = 1 if current_max_key is None else current_max_key + 1

In [0]:
# Attach a surrogate key to every new device: the starting value chosen above
# plus Spark's per-row generated id. The generated ids are unique and
# increasing but are not guaranteed to be consecutive.
generated_key = monotonically_increasing_id() + max_value
new_df = new_df.withColumn('dimdevice_key', generated_key)
display(new_df)


### Creating the final DataFrame: old_df + new_df

In [0]:
# Combine the already-keyed rows with the freshly keyed new rows.
# unionByName matches columns by name instead of by position, so a future
# change in the column order of either frame cannot silently put values into
# the wrong columns (plain union() is purely positional).
final_df = old_df.unionByName(new_df)
display(final_df)


### SCD Type 1 implementation

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 load of the device dimension.
#   - Table missing : write final_df as a Delta table at the gold abfss path
#                     and register it as credcheck_catalog.gold.dim_device.
#   - Table present : merge final_df into it on dimdevice_key; matched rows are
#                     overwritten with the incoming values, unmatched rows are
#                     inserted.
if not spark.catalog.tableExists('credcheck_catalog.gold.dim_device'):
    initial_writer = final_df.write.format('delta').mode('overwrite')
    initial_writer = initial_writer.option(
        "path", f"abfss://gold@azurestorageaccnt.dfs.core.windows.net/{load_date}/dim_device"
    )
    initial_writer.saveAsTable("credcheck_catalog.gold.dim_device")
else:
    delta_table = DeltaTable.forName(spark, "credcheck_catalog.gold.dim_device")
    (
        delta_table.alias("trg")
        .merge(final_df.alias("src"), "trg.dimdevice_key = src.dimdevice_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Sanity check: number of rows in the device dimension after the load.
SELECT COUNT(*)
FROM credcheck_catalog.gold.dim_device