In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import DeltaTable

In [0]:

# Job parameter: 1 = initial load (existing gold rows are not read, keys start at 1),
# 0 = incremental load (existing gold.DimCustomers rows are read and merged into).
init_load_flag = int(dbutils.widgets.get("init_load_flag"))
# Later cells test `== 0` in one place and `== 1` in another; any other value would
# take the "initial" path when reading old rows but the "incremental" path when
# allocating surrogate keys, so reject it up front.
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")


In [0]:

# Source data: the full silver-layer customers table.
df = spark.table("databricks_catalog.silver.customers_silver")
display(df)

In [0]:
# Keep a single row per customer_id.
# NOTE(review): dropDuplicates does not define which duplicate survives — confirm
# that is acceptable, or dedupe by a recency column instead.
df = df.dropDuplicates(["customer_id"])
df.count()

In [0]:

if init_load_flag == 0:
    df_old = spark.sql('''select DimCustomerKey, customer_id, create_date, update_date from databricks_catalog.gold.DimCustomers''')
else:
    df_old = spark.sql('''
                            select 0 DimCustomersKey , 0 customer_id, 0 create_date, 0 update_date
                            from databricks_catalog.silver.customers_silver where 1=0
                       ''')
display(df_old)

In [0]:
df_old = df_old.withColumnRenamed("DimCustomersKey", "old_DimCustomerKey")\
    .withColumnRenamed("customer_id", "old_customer_id")\
    .withColumnRenamed("create_date", "old_create_date")\
    .withColumnRenamed("update_date", "old_update_date")


In [0]:
# Left join silver onto the existing dimension: matched customers carry their old_*
# values, unmatched customers get NULL in every old_* column.
join_condition = df["customer_id"] == df_old["old_customer_id"]
df_join = df.join(df_old, on=join_condition, how="left")
display(df_join)

**Separating new vs. existing records**

In [0]:
# A surrogate key from the left join means the customer already exists in gold.
has_existing_key = df_join["old_DimCustomerKey"].isNotNull()
df_old = df_join.filter(has_existing_key)
df_new = df_join.filter(~has_existing_key)


**Preparing df_old**

In [0]:
# Existing customers: keep the surrogate key and original create_date from gold,
# restamp update_date with the current time, and discard the other old_* columns.
# Only columns that actually exist after the join are listed: DataFrame.drop silently
# ignores unknown names, so a stale/typo'd name here would go unnoticed.
df_old = df_old.drop('old_customer_id', 'old_update_date')

df_old = df_old.withColumnRenamed("old_create_date", "create_date")\
            .withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")

df_old = df_old.withColumn("update_date", current_timestamp())
df_old = df_old.withColumn("create_date", to_timestamp(col("create_date")))

df_old.display()

**Preparing df_new**

In [0]:
# New customers: these rows had no match in the left join, so every old_* column is
# NULL — drop all of them (only names that exist; drop() ignores unknown ones silently)
# and stamp both audit dates with the current time.
df_new = df_new.drop('old_customer_id', 'old_update_date', "old_create_date", "old_DimCustomerKey")

df_new = df_new.withColumn("update_date", current_timestamp())\
            .withColumn("create_date", current_timestamp())

df_new.display()

In [0]:
# Offset added to monotonically_increasing_id() when keying new rows (next cell).
# monotonically_increasing_id() starts at 0, so this must be the FIRST key to assign
# (current max + 1), not the current max — otherwise the first new row would reuse the
# existing maximum key and collide with that row on MERGE.
if init_load_flag == 1:
    # Initial load: nothing exists yet, keys start at 1.
    max_surrogate_key = 1
else:
    df_maxsur = spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_catalog.gold.DimCustomers")
    current_max_key = df_maxsur.collect()[0]['max_surrogate_key']
    # max() over an empty table is NULL; without this guard lit(None) would yield NULL keys.
    max_surrogate_key = 1 if current_max_key is None else current_max_key + 1

In [0]:
# Surrogate key for new customers: a per-row id from monotonically_increasing_id()
# offset by max_surrogate_key from the previous cell. The generated ids are unique and
# increasing but not consecutive (they jump between partitions).
key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn("DimCustomerKey", key_offset + monotonically_increasing_id())
display(df_new)

In [0]:

# Existing (updated) rows followed by new rows; columns are aligned by name, not position.
df_final = df_old.unionByName(other=df_new)
display(df_final)

In [0]:
# Upsert into gold. If the table is registered, MERGE on the surrogate key (matched rows
# are overwritten with all source columns, unmatched rows are inserted); otherwise create
# it as a Delta table registered in the catalog with an explicit storage path.
gold_table_name = "databricks_catalog.gold.DimCustomers"
gold_table_path = "abfss://gold@etedatabricks01.dfs.core.windows.net/DimCustomers"

if not spark.catalog.tableExists(gold_table_name):
    (
        df_final.write
        .format("delta")
        .mode("overwrite")
        .option("path", gold_table_path)
        .saveAsTable(gold_table_name)
    )
else:
    target_table = DeltaTable.forPath(spark, gold_table_path)
    (
        target_table.alias("trg")
        .merge(df_final.alias("src"), "trg.DimCustomerKey = src.DimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )