### **Imports and Parameters**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Job parameter: 1 => initial load (no gold table to compare against), 0 => incremental
# load against the existing gold table. Widget values are read as strings, hence int().
init_load_flag = int(dbutils.widgets.get("init_load_flag"))

# Later cells test `== 0` and `== 1` independently; any other value would send them down
# inconsistent paths (e.g. treat every row as new while still offsetting keys from gold).
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag!r}")

### **Data Reading from Source**

In [0]:
# Load the customers table from the silver layer as the incoming batch.
silver_customers_query = "select * from databrciks_catalog.silver.customers_silver"
df = spark.sql(silver_customers_query)

### **Removing Duplicates**

In [0]:
# Keep one row per customer_id before matching against gold.
# Note: which of several duplicate rows survives is not specified by Spark.
df = df.dropDuplicates(["customer_id"])

### **Reading Existing (Old) Records from Gold**

In [0]:
# Fetch the keys and audit dates already stored in gold so each incoming row can later be
# classified as an update (customer already in gold) or an insert (new customer).
# Branch on `== 1` so this cell agrees with the surrogate-key cell, which also treats
# 1 as "initial load".
if init_load_flag == 1:
    # Initial load: presumably there is no gold table to read yet, so build an empty frame
    # (WHERE 1=0 returns no rows) with the same four column names to keep the join uniform.
    df_old = spark.sql('''SELECT 0 AS DimCustomerKey, 0 AS customer_id,
                                 0 AS create_date, 0 AS update_date
                           FROM databrciks_catalog.silver.customers_silver
                           WHERE 1=0''')
else:
    df_old = spark.sql('''SELECT DimCustomerKey, customer_id, create_date, update_date
                           FROM databrciks_catalog.gold.DimCustomers''')


In [0]:
# Preview the existing gold records (empty on an initial load).
display(df_old)

### **Renaming Columns of df_old**

In [0]:
# Prefix every existing-record column with "old_" so nothing collides with the silver
# columns (e.g. customer_id) once the two frames are joined.
old_column_renames = {
    "DimCustomerKey": "old_DimCustomerKey",
    "customer_id": "old_customer_id",
    "create_date": "old_create_date",
    "update_date": "old_update_date",
}
for existing_name, prefixed_name in old_column_renames.items():
    df_old = df_old.withColumnRenamed(existing_name, prefixed_name)

### **Apply Join with the Old Records**

In [0]:
# Left join keeps every silver row; the old_* columns are NULL where no gold row matches.
customer_match = df["customer_id"] == df_old["old_customer_id"]
df_join = df.join(df_old, on=customer_match, how="left")


In [0]:
# Preview silver rows alongside any matching gold record.
display(df_join)

### **Separating New vs Old Records**

In [0]:
# No gold surrogate key matched: these customers are being loaded for the first time.
df_new = df_join.where(col("old_DimCustomerKey").isNull())


In [0]:
# A gold surrogate key matched: these customers already exist and will be updated.
df_old = df_join.where(col("old_DimCustomerKey").isNotNull())


### **Preparing df_old**

In [0]:
# Shape existing customers for the upsert:
#   - drop the join helper columns no longer needed,
#   - reuse the gold surrogate key and the original create_date (as a timestamp),
#   - stamp update_date with the current load time.
df_old = (
    df_old
    .drop("old_customer_id", "old_update_date")
    .withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)


In [0]:
# Preview the prepared updates.
display(df_old)

### **Preparing df_new**

In [0]:
# Shape brand-new customers: drop the unmatched old_* join columns, then stamp both
# "update_date" and "create_date" with the current load time.
df_new = (
    df_new
    .drop("old_DimCustomerKey", "old_customer_id", "old_update_date", "old_create_date")
    .withColumn("update_date", current_timestamp())
    .withColumn("create_date", current_timestamp())
)


### **Surrogate Key — Starting from 1**

In [0]:
# Assign consecutive keys 1..N to the new customers.
# monotonically_increasing_id() is unique but NOT consecutive: it packs the partition id
# into the upper bits, so keys jump by ~2^33 between partitions and change whenever the
# data is partitioned differently. row_number() over a deterministic ordering yields
# dense, reproducible keys; the cast keeps the column LongType as before.
# Note: a window without partitionBy moves all rows to one partition — fine for the
# new-customer slice of a dimension, but revisit if this batch becomes very large.
new_key_window = Window.orderBy(col("customer_id"))
df_new = df_new.withColumn("DimCustomerKey", row_number().over(new_key_window).cast("long"))

In [0]:
# Preview new customers with their 1-based keys.
display(df_new)

### **Offsetting New Keys by the Current Max Surrogate Key**

In [0]:
# Largest surrogate key already in gold; new keys are shifted past it so they never collide.
if init_load_flag == 1:
    # Initial load: nothing to offset from.
    max_surrogate_key = 0
else:
    df_maxsur = spark.sql("SELECT max(DimCustomerKey) AS max_surrogate_key FROM databrciks_catalog.gold.DimCustomers")
    # max() over an empty table returns NULL (None in Python); lit(None) + key would make
    # every new DimCustomerKey NULL, so fall back to 0.
    max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key'] or 0


In [0]:
# Shift the 1-based keys past the largest key already present in gold.
df_new = df_new.withColumn("DimCustomerKey", col("DimCustomerKey") + lit(max_surrogate_key))
display(df_new)

### **Union of df_old and df_new**

In [0]:
# Stack inserts and updates. Columns are matched by name rather than position because the
# two frames add DimCustomerKey / create_date / update_date in different orders.
df_final = df_new.unionByName(df_old, allowMissingColumns=False)

In [0]:
# Preview the combined batch that will be upserted into gold.
display(df_final)

### **SCD Type-1**

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type-1 upsert into the gold customer dimension: rows whose DimCustomerKey already
# exists are overwritten in place, all other rows are inserted.
gold_table_name = "databrciks_catalog.gold.DimCustomers"

# Use one fully-qualified name for the existence check, the merge target and the initial
# write so they cannot drift apart. If the check looks at a different name it fails, and
# the overwrite branch would replace gold with only this batch (dropping every customer
# not present in it).
if spark.catalog.tableExists(gold_table_name):
    # Resolve the table by name rather than by a hard-coded path, so the merge always hits
    # the same storage location the table was created at.
    dlt_obj = DeltaTable.forName(spark, gold_table_name)

    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.DimCustomerKey = src.DimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the table at the external gold location.
    (
        df_final.write.mode("overwrite")
        .format("delta")
        .option("path", "abfss://gold@avidatabricksete.dfs.core.windows.net/DimCustomers")
        .saveAsTable(gold_table_name)
    )



In [0]:
%sql
-- Inspect the gold customer dimension after the upsert.
SELECT *
FROM databrciks_catalog.gold.dimcustomers