In [0]:

from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import *

In [0]:
# Notebook/job parameter selecting the load mode. A later cell reads the existing
# gold rows when the value is '0' (incremental run); other values start from an
# empty set of existing rows (initial load).
# NOTE(review): the default '' is neither '0' nor '1' — confirm the intended default.
dbutils.widgets.text('incremental_load', '')

In [0]:
# Read the load-mode flag. Later cells branch on '0' / '1' literally, so reject any
# other value (e.g. the empty default or stray whitespace) instead of letting each
# branch cell silently pick its own fallback path.
incremental_load = dbutils.widgets.get('incremental_load').strip()
if incremental_load not in ('0', '1'):
    raise ValueError(f"incremental_load widget must be '0' or '1', got {incremental_load!r}")

In [0]:
# Source: the customers Delta table in the silver container.
# ('header' is a CSV reader option with no effect on Delta, so it is not set.)
df = spark.read.format('delta').load('abfss://silver@adls5485.dfs.core.windows.net/customers')
df.display()

In [0]:
# One row per customer_id. dropDuplicates does not specify which duplicate survives —
# TODO confirm whether the most recent silver record should win.
df_src = df.dropDuplicates(['customer_id'])


In [0]:
# Preview the current gold customer dimension. On the initial load the table does
# not exist yet, so skip the preview rather than failing the whole run.
if spark.catalog.tableExists('sales_prj.gold.customers'):
    display(spark.table('sales_prj.gold.customers'))

In [0]:
# Existing surrogate keys and dates from gold, used to tell existing customers from new ones.
#   '0'  -> read them from the gold table (incremental run)
#   else -> start from an empty frame with the same columns (initial load)
# The empty frame is derived from df_src rather than from the gold table, so it also
# works before the gold table has been created. customer_id keeps the source type.
# NOTE(review): '0' selecting the incremental path reads inverted relative to the
# widget name — confirm intended.
if incremental_load == '0':
    df_old = spark.sql("select dimcustomerkey,customer_id,create_date,update_date from sales_prj.gold.customers")
else:
    df_old = df_src.select(
        lit(0).cast('long').alias('dimcustomerkey'),
        col('customer_id'),
        lit(None).cast('timestamp').alias('create_date'),
        lit(None).cast('timestamp').alias('update_date'),
    ).limit(0)


In [0]:
# Inspect the existing-key frame selected above.
display(df_old)


In [0]:
# Suffix the gold columns with _old so they do not clash with source columns in the join.
# create_date becomes 'created_date_old' (not 'create_date_old'); later cells use that exact name.
old_column_names = {
    'dimcustomerkey': 'dimcustomerkey_old',
    'customer_id': 'customer_id_old',
    'create_date': 'created_date_old',
    'update_date': 'update_date_old',
}
for current_name, renamed in old_column_names.items():
    df_old = df_old.withColumnRenamed(current_name, renamed)
df_old.display()


In [0]:
# Provisional 1-based surrogate key for every deduplicated source row.
# Rows classified as new are re-keyed in a later cell.
df = df_src.withColumn('dimcustomerkey', lit(1) + monotonically_increasing_id())
df.display()


In [0]:
# Left join keeps every source row; the *_old columns are null where gold has no match.
df_join = df.join(df_old, on=df['customer_id'] == df_old['customer_id_old'], how='left')
df_join.display()


In [0]:
# Rows whose old key is non-null are treated as existing customers; all others as new.
has_old_key = df_join['dimcustomerkey_old'].isNotNull()
df_old = df_join.where(has_old_key)
df_new = df_join.where(df_join['dimcustomerkey_old'].isNull())
df_old.display()
df_new.display()


In [0]:
# Existing customers must keep the surrogate key already stored in gold. The merge
# later matches on dimcustomerkey, so replacing it with the provisional key generated
# for every source row would update the wrong target rows or insert duplicates.
# The original create_date is preserved; update_date is stamped with the run time.
df_old = (
    df_old
    .drop('dimcustomerkey', 'customer_id_old', 'update_date_old')
    .withColumnRenamed('dimcustomerkey_old', 'dimcustomerkey')
    .withColumnRenamed('created_date_old', 'create_date')
    .withColumn('create_date', to_timestamp(col('create_date')))
    .withColumn('update_date', current_timestamp())
)



In [0]:
# Existing customers after key/date preparation.
display(df_old)


In [0]:
# New customers, still carrying the joined *_old columns.
display(df_new)

In [0]:
# New customers: discard the (null) gold columns from the join and stamp both
# create_date and update_date with the current run time.
df_new = (
    df_new
    .drop(*['dimcustomerkey_old', 'customer_id_old', 'created_date_old', 'update_date_old'])
    .withColumn('create_date', current_timestamp())
    .withColumn('update_date', current_timestamp())
)
df_new.display()

In [0]:
# Number the new customers 1..n (offset by the current max key in a later cell).
# row_number over a window ordered by the business key gives consecutive keys that do
# not change if the frame is recomputed. monotonically_increasing_id() is only unique:
# it encodes the partition id in its upper bits, leaving large gaps, and it is
# nondeterministic. customer_id is unique here because of the earlier dropDuplicates.
# NOTE: an unpartitioned window moves all new rows to a single partition — presumably
# fine for a customer-dimension batch; revisit if per-run volume grows large.
new_key_window = Window.orderBy('customer_id')
df_new = df_new.withColumn('dimcustomerkey', row_number().over(new_key_window).cast('long'))
df_new.display()

In [0]:
# Offset for new surrogate keys: the largest key already in gold, so new keys never
# collide with existing rows (the merge matches on dimcustomerkey, and a collision
# would overwrite an unrelated customer). The decision depends on whether the table
# exists, not on the widget: an existing table always needs the offset.
# max() returns NULL for an empty table, so fall back to 0.
if spark.catalog.tableExists('sales_prj.gold.customers'):
    max_value_df = spark.sql('select max(dimcustomerkey) as max_surrogate_key from sales_prj.gold.customers')
    max_value = max_value_df.collect()[0]['max_surrogate_key'] or 0
else:
    max_value = 0



In [0]:
# Shift the new customers' keys past max_value so they continue after the existing keys.
df_new = df_new.withColumn('dimcustomerkey', col('dimcustomerkey') + lit(max_value))
df_new.display()

In [0]:
# Stack new and existing customers into one frame, aligning columns by name.
df_final = df_new.unionByName(df_old, allowMissingColumns=False)
df_final.display()

In [0]:
GOLD_TABLE = 'sales_prj.gold.customers'
GOLD_PATH = 'abfss://gold@adls5485.dfs.core.windows.net/customers'

# Upsert into gold keyed on the surrogate key: matched rows are overwritten, unmatched
# rows are inserted. On the first run, create the table at GOLD_PATH instead.
if spark.catalog.tableExists(GOLD_TABLE):
    # Resolve the merge target by the same name whose existence was just checked,
    # rather than a separately hard-coded path that could drift out of sync.
    dlt_table = DeltaTable.forName(spark, GOLD_TABLE)

    (
        dlt_table.alias('t')
        .merge(df_final.alias('s'), 't.dimcustomerkey = s.dimcustomerkey')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        df_final.write.format('delta')
        .mode('append')
        .option('path', GOLD_PATH)
        .saveAsTable(GOLD_TABLE)
    )


In [0]:
%sql
-- Contents of the gold customer dimension after this run's merge or initial write.
SELECT *
FROM sales_prj.gold.customers