In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
# Load mode from the job widget: 1 = initial load (gold table is not read),
# 0 = incremental load against the existing gold table. Later cells branch on
# `== 0` and `== 1` respectively, so any other value would make them disagree.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")

In [0]:
# Silver customers, reduced to one row per customer_id.
# NOTE(review): which duplicate survives is not deterministic.
df = (
    spark.read.table("databricks_cata.silver.customers")
    .dropDuplicates(subset=["customer_id"])
)

In [0]:
# Snapshot of the keys/audit columns already in gold. On the initial load gold
# is not read, so an empty frame with the same columns is substituted. Its
# column types mirror how this notebook populates gold:
#   - dimCustomerKey: bigint
#   - customer_id: the silver type, since it is the join key
#   - create_date / update_date: timestamp
# This avoids implicit casts against int placeholders.
if init_load_flag == 0:
    df_old = spark.sql("select dimCustomerKey,customer_id,create_date,update_date from databricks_cata.gold.DimCustomers")
else:
    old_schema = StructType([
        StructField("dimCustomerKey", LongType()),
        StructField("customer_id", df.schema["customer_id"].dataType),
        StructField("create_date", TimestampType()),
        StructField("update_date", TimestampType()),
    ])
    df_old = spark.createDataFrame([], old_schema)

In [0]:
# Prefix every gold column with "old_" so it cannot clash with silver columns after the join.
old_column_renames = {
    "dimCustomerKey": "old_dimCustomerKey",
    "customer_id": "old_customer_id",
    "create_date": "old_create_date",
    "update_date": "old_update_date",
}
for gold_name, prefixed_name in old_column_renames.items():
    df_old = df_old.withColumnRenamed(gold_name, prefixed_name)

In [0]:
# Left join keeps every silver customer; gold columns are null where no gold row matches.
# NOTE(review): display renders raw customer rows - check for PII before sharing outputs.
customer_match = df["customer_id"] == df_old["old_customer_id"]
df_join = df.join(df_old, on=customer_match, how="left")
display(df_join)

In [0]:
# No gold key after the left join => customer is new; otherwise it already exists in gold.
is_new_customer = col("old_dimCustomerKey").isNull()
df_new = df_join.filter(is_new_customer)
df_old = df_join.filter(~is_new_customer)


In [0]:
# Existing customers: keep their gold surrogate key and original create_date,
# and stamp update_date with the current run time.
df_old = (
    df_old
    .drop("old_customer_id", "old_update_date")
    .withColumnRenamed("old_dimCustomerKey", "dimCustomerKey")
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)
display(df_old)

In [0]:
# New customers: the joined gold columns are all null (no match), so drop them
# and stamp both audit columns with the current run time.
df_new = (
    df_new
    .drop("old_customer_id", "old_dimCustomerKey", "old_update_date", "old_create_date")
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)
display(df_new)

In [0]:

# Give new customers consecutive 1-based keys; the next cell offsets them past
# the current gold maximum.
# monotonically_increasing_id() is unique but not consecutive (the partition id
# occupies the upper bits) and depends on partitioning. Keys would have large
# gaps and could change if this lazy plan is re-evaluated.
# Ordering by customer_id (unique after the dedupe above) makes the assignment
# deterministic. A window with no partitionBy moves all new rows into a single
# partition, which is acceptable for a dimension-sized delta.
new_key_window = Window.orderBy("customer_id")
df_new = df_new.withColumn("dimCustomerKey", row_number().over(new_key_window).cast("long"))
df_new.display()

In [0]:
# Highest surrogate key already in gold (0 on the initial load).
# max() over an existing but empty table returns NULL. Left as None, that would
# turn every new key into NULL in the offset cell, so fall back to 0.
# (Name kept as-is: later cells read `max_surrogte_key`.)
if init_load_flag == 1:
    max_surrogte_key = 0
else:
    df_surr = spark.sql("select max(dimCustomerKey) as max_surrogate_key from databricks_cata.gold.DimCustomers")
    max_surrogte_key = df_surr.first()["max_surrogate_key"] or 0


In [0]:
# Shift the per-run keys past the largest key already present in gold.
df_new = df_new.withColumn("dimCustomerKey", col("dimCustomerKey") + lit(max_surrogte_key))

In [0]:
# Both frames carry the same columns in a different order, so union by name, not position.
df_final = df_new.unionByName(df_old)
df_final.display()

In [0]:
from delta.tables import DeltaTable

# Upsert into gold keyed on the surrogate key:
#   - existing customers carry their original dimCustomerKey, so they are updated;
#   - new customers have fresh keys, so they are inserted.
# When the table is missing, it is created as a Delta table stored at gold_path.
# NOTE(review): running with init_load_flag=1 while the table already exists
# restarts keys at 1 and would overwrite unrelated customers via this merge.
gold_table = "databricks_cata.gold.DimCustomers"
gold_path = "abfss://gold@eklavyadatabricks.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists(gold_table):
    # Resolve by name so the merge targets exactly the table whose existence was checked,
    # instead of a separately hard-coded storage path.
    gold_delta = DeltaTable.forName(spark, gold_table)
    (
        gold_delta.alias("t")
        .merge(df_final.alias("s"), "t.dimCustomerKey=s.dimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        df_final.write.format("delta")
        .mode("overwrite")
        .option("path", gold_path)
        .saveAsTable(gold_table)
    )

In [0]:
%sql
-- Sanity check: inspect the gold dimension after the load.
SELECT * FROM databricks_cata.gold.dimcustomers