In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Load mode from the notebook/job widget: 1 = initial load (gold table not yet
# created), 0 = incremental load against the existing gold table.
# Widgets always return strings, so convert to int before comparing with the
# integer literals used by the later cells.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))
if init_load_flag not in (0, 1):
    # Later cells test `== 0` in one place and `== 1` in another, so any other
    # value would silently mix initial-load and incremental logic.
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")
print(init_load_flag)

### **Data Reading from source**

In [0]:
# Read the full silver customers table.
df = spark.table("databricks_cat.silver.customers_silver")

In [0]:
# Preview the raw silver data.
display(df)

### **Removing Duplicates**

In [0]:
# Keep one row per customer_id. NOTE(review): Spark does not specify which
# duplicate survives; if silver can hold several versions of a customer,
# consider selecting the latest one explicitly.
df = df.dropDuplicates(["customer_id"])

In [0]:
# Quick look at the first rows after de-duplication.
display(df.limit(10))

**Loading existing records for the New vs Old split**

In [0]:
# Lookup of customers that already exist in the gold dimension.
# Incremental run (flag 0): read surrogate key and audit dates from gold.
# Otherwise (initial load) gold does not exist yet, so build an EMPTY frame with
# the same four columns. Column types follow the incoming data: customer_id is
# taken from the silver frame itself, the key is bigint and the dates are
# timestamps. This keeps the later join and union free of implicit int casts,
# which the previous `select 0 ... where 1=0` placeholder forced.
if init_load_flag == 0:
    df_old = spark.sql('''select DimCustomerKey, customer_id, create_date, update_date 
                     from databricks_cat.gold.DimCustomers''')
else:
    df_old = df.select(
        lit(None).cast("bigint").alias("DimCustomerKey"),
        col("customer_id"),
        lit(None).cast("timestamp").alias("create_date"),
        lit(None).cast("timestamp").alias("update_date"),
    ).limit(0)

In [0]:
# Inspect the existing-record lookup (empty on the initial load).
display(df_old)

**Renaming Columns of df_old**

In [0]:
 df_old = df_old.withColumnRenamed("DimCustomerKey","old_DimCustomerKey")\
                  .withColumnRenamed("customer_id","old_customer_id")\
                  .withColumnRenamed("create_date","old_create_date")\
                  .withColumnRenamed("update_date","old_update_date")

In [0]:
# NOTE(review): disabled experiment. It casts old_customer_id to string,
# presumably to work around a type mismatch in the join below. Kept for
# reference only.
# df_old = df_old.withColumn("old_customer_id", col("old_customer_id").cast("string"))
# df_old.display()

**Surrogate Key for all rows (disabled)**

In [0]:
# Disabled: surrogate keys are assigned only to the new rows (df_new) further down.
# df = df.withColumn("DimCustomerKey",monotonically_increasing_id()+lit(1))

In [0]:
# Disabled preview belonging to the commented-out key assignment above.
# df.limit(10).display()

**Left-joining incoming records with existing records**

In [0]:

# Left join: every incoming silver row is kept; the old_* columns are NULL
# when the customer is not present in the lookup.
df_join = df.join(
    df_old,
    on=df["customer_id"] == df_old["old_customer_id"],
    how="left",
)

In [0]:
# Inspect the joined frame.
display(df_join)

**Separating New vs Old Records**

In [0]:
# No match in the lookup -> brand-new customer.
df_new = df_join.where(col("old_DimCustomerKey").isNull())

In [0]:
# Matched in the lookup -> existing customer (replaces the lookup frame).
df_old = df_join.where(col("old_DimCustomerKey").isNotNull())

In [0]:
# Side-by-side sample of the new and the existing records.
display(df_new.limit(10))
display(df_old.limit(10))

**Preparing df_old**

In [0]:
# Reshape the existing (matched) rows to the gold schema: keep the stored
# surrogate key and creation timestamp, refresh update_date.
df_old = (
    df_old
    # Helper columns no longer needed after the join.
    .drop("old_customer_id", "old_update_date")
    # Reuse the surrogate key already stored in gold.
    .withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")
    # Keep the original creation date and make sure it is a timestamp.
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    # The row is being rewritten now, so stamp update_date with the current time.
    .withColumn("update_date", current_timestamp())
)


In [0]:
# Inspect the prepared existing records.
display(df_old)

**Preparing df_new**

In [0]:
# Disabled preview of df_new before the helper columns are dropped.
# df_new.limit(10).display()

In [0]:
# Reshape the brand-new rows: drop the old_* helper columns (all NULL for
# unmatched rows) and stamp both audit timestamps with the current time.
df_new = (
    df_new
    .drop("old_DimCustomerKey", "old_customer_id", "old_update_date", "old_create_date")
    .withColumn("update_date", current_timestamp())
    .withColumn("create_date", current_timestamp())
)


In [0]:
# Sample of the prepared new records.
display(df_new.limit(10))

**Note:** new `DimCustomerKey` values must continue from the current maximum `DimCustomerKey` in the gold `DimCustomers` table. On the initial load that table does not exist yet, so numbering starts from 1.

**Surrogate Key – starting from 1**

In [0]:
# Assign dense, deterministic surrogate keys 1..N to the new customers.
# monotonically_increasing_id() is only unique and increasing: it stores the
# partition id in the upper bits, so keys jump by 2^33 between partitions
# instead of counting up from 1. row_number() over customer_id (unique after
# dropDuplicates) gives consecutive keys that are stable when the plan is
# re-evaluated. Cast to bigint to keep the column type the original produced.
# NOTE(review): a global orderBy window moves all rows into one partition;
# fine for a dimension-sized batch, revisit if df_new becomes very large.
key_window = Window.orderBy("customer_id")
df_new = df_new.withColumn("DimCustomerKey", row_number().over(key_window).cast("bigint"))

In [0]:
# Inspect the new rows with their provisional keys.
display(df_new)

**Offsetting new keys by the current Max Surrogate Key**

In [0]:
# Offset that makes new surrogate keys continue after those already in gold.
# Initial load (flag 1): gold does not exist yet, so the offset is 0.
# Otherwise read the current maximum. It is coalesced to 0 because max() over
# an empty table returns NULL, and adding a NULL literal later would turn every
# new key into NULL.
if init_load_flag == 1:
    max_surrogate_key = 0
else:
    df_maxsur = spark.sql(
        "select coalesce(max(DimCustomerKey), 0) as max_surrogate_key "
        "from databricks_cat.gold.DimCustomers"
    )
    # Bring the single aggregated value back to the driver as a Python scalar.
    max_surrogate_key = df_maxsur.collect()[0]["max_surrogate_key"]

In [0]:
# Sanity check: offset that will be added to the new keys.
print(f"{max_surrogate_key}")

In [0]:
# Shift the per-row keys past the existing maximum so they cannot collide with
# keys already in gold. NOTE: df_new is reassigned from itself, so re-running
# only this cell adds the offset a second time.
df_new = df_new.withColumn("DimCustomerKey", col("DimCustomerKey") + lit(max_surrogate_key))
display(df_new)

**Union of df_old and df_new**

In [0]:
# Stack new and existing rows, matching columns by name rather than position.
df_final = df_new.unionByName(other=df_old)

In [0]:
# Final frame that will be written or merged into gold.
display(df_final)

## **SCD TYPE - 1**

In [0]:
# Disabled probe: an alternative to the init_load_flag widget is to detect the
# initial load by checking whether the gold table already exists (the SCD cell
# below uses this check).
# if spark.catalog.tableExists("databricks_cat.gold.DimCustomers"): # this is the different way to do this not like a flag
#     print("Hello")
# else:
#     print("Bro") 

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 load into the gold customer dimension.
# - Table missing: create it as a Delta table at the external ABFSS location.
# - Table present: MERGE on the surrogate key. Matched rows are overwritten in
#   place (no history is kept) and unmatched rows are inserted.
GOLD_TABLE = "databricks_cat.gold.DimCustomers"
GOLD_PATH = "abfss://gold@databricksetesg.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists(GOLD_TABLE):
    # Resolve the MERGE target by name so that the existence check and the
    # target refer to the same table, instead of a separately hard-coded path.
    dlt_obj = DeltaTable.forName(spark, GOLD_TABLE)
    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.DimCustomerKey = src.DimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        df_final.write.mode("overwrite")
        .format("delta")
        .option("path", GOLD_PATH)
        .saveAsTable(GOLD_TABLE)
    )

    

In [0]:
%sql
-- Inspect the gold customer dimension after the load.
SELECT *
FROM databricks_cat.gold.dimcustomers

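After the initial load succeeds, change `init_load_flag` from 1 to 0 so that subsequent runs read the existing gold table and perform incremental (merge) loads.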