In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

In [0]:
# Load mode from the job widget: 1 = initial load (no existing gold rows are consulted),
# 0 = incremental load against databricks_cat.gold.DimCustomers.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))
# Downstream cells test this flag with both `== 0` and `== 1`; any other value would make them
# disagree (empty "old" lookup but a non-zero key offset => every customer re-inserted), so reject it.
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")

### **Data Reading from Source**

In [0]:
# Full snapshot of the silver customers table; every later step derives from this frame.
source_query = """
SELECT * FROM databricks_cat.silver.customers_silver
"""
df = spark.sql(source_query)
display(df)

### **Removing Duplicates**

In [0]:
# Keep a single row per customer_id (which duplicate survives is not deterministic).
df = df.dropDuplicates(["customer_id"])
display(df.limit(10))
df.count()

# **Divide new vs old records**

In [0]:
# Existing dimension keys, used to classify source rows as new vs. already loaded.
# Uses `== 1` for the initial load, matching the max-surrogate-key cell further down.
if init_load_flag == 1:
    # Initial load: nothing to compare against, so build an empty frame with the same columns as
    # the incremental branch. Explicit types (timestamps for the dates, the source's own type for
    # customer_id) avoid joining the source key against an integer literal column and avoid
    # scanning the silver table just to produce zero rows.
    old_schema = StructType([
        StructField("DimCustomersKey", LongType(), True),
        StructField("customer_id", df.schema["customer_id"].dataType, True),
        StructField("create_date", TimestampType(), True),
        StructField("update_date", TimestampType(), True),
    ])
    df_old = spark.createDataFrame([], old_schema)
else:
    df_old = spark.sql("""
    SELECT DimCustomersKey, customer_id, create_date, update_date FROM databricks_cat.gold.DimCustomers
    """)

In [0]:
# Inspect the existing-key lookup (empty on the initial load).
display(df_old)

**Renaming columns**

In [0]:
# Prefix the lookup columns with "old_" so they stay distinguishable from source columns after the join.
for col_name in ("DimCustomersKey", "customer_id", "create_date", "update_date"):
    df_old = df_old.withColumnRenamed(col_name, "old_" + col_name)

**Apply join with old records**

In [0]:
# Left join keeps every source row; the old_* columns are NULL where the customer has no gold row.
join_cond = df["customer_id"] == df_old["old_customer_id"]
df_join = df.join(df_old, on=join_cond, how="left")
display(df_join)

**Separating new vs. old records**

In [0]:
# A non-NULL old surrogate key means the customer already exists in gold.
matched_existing = df_join["old_DimCustomersKey"].isNotNull()
df_new = df_join.filter(~matched_existing)
df_old = df_join.filter(matched_existing)

**Preparing DF_old**

In [0]:
# Existing customers: keep their gold surrogate key and original create_date, refresh update_date.
df_old = (
    df_old
    .drop("old_customer_id", "old_update_date")
    .withColumnRenamed("old_DimCustomersKey", "DimCustomersKey")
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)

display(df_old)

**Preparing DF_new**

In [0]:
# Peek at the customers not yet present in gold.
display(df_new.limit(10))

In [0]:
# New customers: drop the all-NULL lookup columns and stamp both audit dates with the load time.
load_ts = current_timestamp()
df_new = (
    df_new
    .drop("old_DimCustomersKey", "old_customer_id", "old_create_date", "old_update_date")
    .withColumn("create_date", load_ts)
    .withColumn("update_date", load_ts)
)
display(df_new.limit(10))

**Surrogate key (numbered from 1)**

In [0]:
# Assign provisional surrogate keys 1..n to the new customers (offset by the gold max later).
# monotonically_increasing_id() only guarantees uniqueness: it jumps by 2**33 between partitions,
# so keys were neither contiguous nor "from 1". row_number() over customer_id gives contiguous keys,
# and because the ordering is deterministic the same customer gets the same key every time this
# lazy frame is recomputed (it is displayed before it is written).
# NOTE(review): a window without partitionBy moves all new rows to one partition; fine for a
# presumably small per-run set of new customers — revisit if that volume grows large.
from pyspark.sql.window import Window

df_new = df_new.withColumn(
    "DimCustomersKey",
    # cast keeps the bigint key type the previous implementation produced
    row_number().over(Window.orderBy("customer_id")).cast("long"),
)
df_new.limit(10).display()

**Offsetting new keys by the current max surrogate key**

In [0]:
# Offset for the new keys: 0 on the initial load, otherwise the largest key already in gold.
if init_load_flag == 1:
    max_surrogate_key = 0
else:
    df_maxsur = spark.sql("SELECT MAX(DimCustomersKey) as max_surrogate_key FROM databricks_cat.gold.DimCustomers")
    # MAX() over an empty table yields NULL (None in Python); without the fallback, adding the
    # offset would turn every new surrogate key into NULL.
    max_surrogate_key = df_maxsur.first()['max_surrogate_key'] or 0


In [0]:
# Shift the provisional keys past the largest key already present in gold.
key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn("DimCustomersKey", key_offset + col("DimCustomersKey"))
display(df_new)

**Union of df_old and df_new**

In [0]:
# Combine inserts and updates; unionByName aligns columns by name (df_new's column order wins).
df_final = df_new.unionByName(df_old)
display(df_final.limit(10))
df_final.count()

## **SCD Type 1**

In [0]:
# SCD Type 1 load into gold: rows whose surrogate key already exists are overwritten in place,
# all other rows are inserted. The first run creates the (external) Delta table instead.
DIM_CUSTOMERS_TABLE = "databricks_cat.gold.DimCustomers"
DIM_CUSTOMERS_PATH = "abfss://gold@fmcdatabrickse2e.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists(DIM_CUSTOMERS_TABLE):
    if init_load_flag == 1:
        # An initial load numbers keys from 1 without consulting gold, so merging on
        # DimCustomersKey into an existing table would overwrite unrelated customers.
        raise ValueError(
            f"init_load_flag=1 but {DIM_CUSTOMERS_TABLE} already exists; rerun with init_load_flag=0"
        )
    # Resolve the merge target by name so it is the same table the existence check just found,
    # rather than whatever lives at a separately hard-coded storage path.
    dlt_obj = DeltaTable.forName(spark, DIM_CUSTOMERS_TABLE)
    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.DimCustomersKey = src.DimCustomersKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        df_final.write.mode("overwrite")
        .format("delta")
        .option("path", DIM_CUSTOMERS_PATH)
        .saveAsTable(DIM_CUSTOMERS_TABLE)
    )

In [0]:
%sql
-- Inspect the gold customer dimension after the load.
SELECT *
FROM databricks_cat.gold.dimcustomers