In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
# Read the "init_load_flag" notebook widget and convert its value to an integer.
# Later cells compare this flag against 0 and 1 to pick the load path.
init_load_flag_raw = dbutils.widgets.get("init_load_flag")
init_load_flag = int(init_load_flag_raw)

### **Data Reading**

In [0]:
# Load every column of the silver customers table into `df`.
customers_silver_query = "select * from databricks_cata.silver.customers_silver"
df = spark.sql(customers_silver_query)


In [0]:
# Keep a single row per customer_id.
# NOTE(review): no ordering is applied before de-duplicating, so which of several
# duplicate rows survives is not controlled here.
df = df.dropDuplicates(["customer_id"])



### **Dividing New vs. Old Records**

In [0]:
# Pick the lookup query for customers that already have a surrogate key.
#   * flag == 0   -> read the key and audit columns from the gold dimension.
#   * any other   -> a query with the same four column names that returns no
#                    rows (the `where 1=0` predicate), so every customer is
#                    treated as new after the left join below.
if init_load_flag == 0:
    existing_keys_query = '''select DimCustomerKey, customer_id, create_date, update_date
                        from databricks_cata.gold.DimCustomers'''
else:
    existing_keys_query = '''select 0 DimCustomerKey, 0 customer_id,0 create_date,0 update_date
                        from databricks_cata.silver.customers_silver where 1=0'''

df_old = spark.sql(existing_keys_query)

    

### **Joining With Old Records**

In [0]:
# Prefix every lookup column with "old_" so the names cannot clash with the
# incoming silver columns once the two frames are joined.
for lookup_column in ("DimCustomerKey", "customer_id", "create_date", "update_date"):
    df_old = df_old.withColumnRenamed(lookup_column, f"old_{lookup_column}")

In [0]:
# Left join: every incoming customer is kept; the old_* columns stay NULL when
# no row in df_old carries the same customer id.
same_customer = df.customer_id == df_old.old_customer_id
df_join = df.join(df_old, on=same_customer, how='left')


In [0]:
# Split the joined rows on whether the lookup supplied a surrogate key:
# a NULL key means the customer was not found in df_old (new), a non-NULL key
# means it was (existing).
previous_key = df_join["old_DimCustomerKey"]
df_new = df_join.where(previous_key.isNull())
df_old = df_join.where(previous_key.isNotNull())


### **Preparing df_old**

In [0]:
# Shape the existing-customer rows for the final union:
#   1. discard the lookup columns that are no longer needed,
#   2. restore the original names of the carried-over key and creation date,
#   3. cast the carried-over creation date to a timestamp,
#   4. stamp update_date with the current timestamp.
df_old = (
    df_old
    .drop('old_customer_id', 'old_update_date')
    .withColumnRenamed("old_create_date", "create_date")
    .withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)

In [0]:
# Show at most ten of the prepared existing-customer rows.
df_old_preview = df_old.limit(10)
df_old_preview.display()

### **Preparing df_new**

In [0]:
# New customers matched nothing in the lookup, so all old_* columns are dropped
# and both audit columns are set to the current timestamp.
lookup_only_columns = (
    'old_DimCustomerKey',
    'old_customer_id',
    'old_create_date',
    'old_update_date',
)
df_new = (
    df_new
    .drop(*lookup_only_columns)
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)


### **Surrogate Key - From 1**

In [0]:
# Give each new customer a provisional surrogate key: the generated id plus 1.
# NOTE(review): monotonically_increasing_id() guarantees unique, increasing ids
# but not consecutive ones (the partition id is encoded in the upper bits), so
# keys can contain large gaps once the data spans more than one partition —
# confirm that this is acceptable for DimCustomerKey.
df_new = df_new.withColumn("DimCustomerKey", monotonically_increasing_id() + lit(1))

# Preview the frame that just received the key. This previously displayed `df`
# (the de-duplicated silver input), which is not the frame modified above.
df_new.limit(10).display()

In [0]:
# Determine the offset that is added to the provisional keys of new customers.
if init_load_flag == 1:
    # Initial load: there are no existing keys to continue from.
    max_surrogate_key = 0
else:
    # Incremental load: continue after the highest key already in the gold table.
    df_maxsur = spark.sql("select max(DimCustomerKey) as max_key from databricks_cata.gold.DimCustomers")
    max_surrogate_key = df_maxsur.collect()[0]['max_key']
    # max() over a table with no rows (or only NULL keys) yields NULL, which
    # arrives here as None. Fall back to 0 so the later addition does not turn
    # every new DimCustomerKey into NULL.
    if max_surrogate_key is None:
        max_surrogate_key = 0


In [0]:
# Shift the provisional keys past the highest key found earlier so that new
# customers do not reuse a key that is already taken.
shifted_key = col("DimCustomerKey") + lit(max_surrogate_key)
df_new = df_new.withColumn("DimCustomerKey", shifted_key)

### **Union of Old and New Records**

In [0]:
# Combine new and existing customers into one frame. unionByName matches
# columns by name, so the differing column order of the two inputs is fine.
df_final = df_new.unionByName(
    df_old
)

# Render the combined result in the notebook.
df_final.display()

### **SCD Type 1**

In [0]:
from delta.tables import DeltaTable


In [0]:
# Persist df_final to the gold customer dimension.
gold_table_name = "databricks_cata.gold.DimCustomers"
gold_table_path = "abfss://gold@databrickssk.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists(gold_table_name):
    # Table already registered: upsert on the surrogate key. Matching rows are
    # overwritten with the incoming values, unmatched rows are inserted.
    dlt_obj = DeltaTable.forPath(spark, gold_table_path)
    merge_builder = dlt_obj.alias("trg").merge(
        df_final.alias("src"),
        "trg.DimCustomerKey = src.DimCustomerKey",
    )
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
else:
    # Table not registered yet: create it from df_final at the given abfss path.
    table_writer = df_final.write.mode("overwrite").option("path", gold_table_path)
    table_writer.saveAsTable(gold_table_name)



In [0]:
%sql
-- Inspect the gold customer dimension after the load above.
SELECT *
FROM databricks_cata.gold.dimcustomers