In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Declare the "init_load_flag" text widget with a default value of "0".
# Cells further down read this value and compare it against 0 / 1 to decide
# whether existing rows are loaded from gold.DimCustomers.
dbutils.widgets.text("init_load_flag", "0")

In [0]:
# The parameter is declared as a text widget, so parse it to an int once here;
# later cells compare init_load_flag against integer literals.
flag_text = dbutils.widgets.get("init_load_flag")
init_load_flag = int(flag_text)

### **Data Reading From Source**

In [0]:
# Source of this load: every column of the silver-layer customers table.
silver_customers_sql = "select * from databrickseteproject167_catalog.silver.customers"
df = spark.sql(silver_customers_sql)

In [0]:
# Disabled sample fixture: the entire cell is a single string literal, so none
# of it executes and `df` is NOT reassigned. It sketches five customers
# (ids 1-5) that could stand in for the silver table during manual testing.
'''data = [
    Row(customer_id=1, full_name="Alice Johnson"),
    Row(customer_id=2, full_name="Bob Smith"),
    Row(customer_id=3, full_name="Charlie Brown"),
    Row(customer_id=4, full_name="Diana Prince"),
    Row(customer_id=5, full_name="Ethan Hunt")
]
df = spark.createDataFrame(data)'''

In [0]:
# Disabled sample fixture for a follow-up run: the entire cell is a single
# string literal, so none of it executes and `df` is NOT reassigned. It
# sketches one changed name (id 3), one new id (6) and one unchanged row (id 2).
'''data = [
    Row(customer_id=3, full_name="Charles Brown"),  # same ID, updated name
    Row(customer_id=6, full_name="Fiona Gallagher"), # new customer
    Row(customer_id=2, full_name="Bob Smith")        # duplicate, unchanged
]
df = spark.createDataFrame(data)'''

In [0]:
# Render the source rows as loaded, before de-duplication.
df.display()

### Removing Duplicates

In [0]:
# Keep one row per customer_id. No ordering is specified here, so which of
# several duplicate rows is retained is not controlled by this cell.
business_key = ['customer_id']
df = df.dropDuplicates(business_key)
df.display()

### Dividing New Vs Old Records

In [0]:
# Existing dimension rows to compare against:
#   flag == 0 -> keys and audit dates currently stored in gold.DimCustomers
#   otherwise -> an empty frame (where 1=0) that only supplies the same
#                four column names, so later cells run without the gold table.
if init_load_flag == 0:
    existing_keys_sql = "select DimCustomerKey, customer_id, create_date, update_date from databrickseteproject167_catalog.gold.DimCustomers"
else:
    existing_keys_sql = "select 0 DimCustomerKey, 0 customer_id, 0 create_date, 0 update_date from databrickseteproject167_catalog.silver.customers where 1=0"

df_old = spark.sql(existing_keys_sql)
df_old.display()

### Renaming Columns of df_old

In [0]:
# Prefix each df_old column with "old_" so the join below can tell the stored
# values apart from the source columns (customer_id exists on both sides).
for column_name in ("DimCustomerKey", "customer_id", "create_date", "update_date"):
    df_old = df_old.withColumnRenamed(column_name, "old_" + column_name)

df_old.display()

### Applying Join With The Old Records

In [0]:
# Left join: every source row is kept; rows without a match in df_old get
# nulls in the old_* columns, which the next cell uses to detect new records.
match_on_customer = df['customer_id'] == df_old['old_customer_id']
df_join = df.join(df_old, on=match_on_customer, how='left')

### Separating New Vs Old Records

In [0]:
# A source row already exists in the dimension when the left join found a
# stored surrogate key for it.
has_stored_key = df_join["old_DimCustomerKey"].isNotNull()

# Rows with no stored key: to be inserted.
df_new = df_join.filter(~has_stored_key)
df_new.display()

# Rows with a stored key: to be updated in place.
df_old = df_join.filter(has_stored_key)
df_old.display()

### Preparing df_old

In [0]:
# Matched rows keep their stored surrogate key and create_date; the stored
# customer_id and update_date copies are not needed any more.
df_old = df_old.drop(*["old_customer_id", "old_update_date"])
df_old.display()

# Strip the "old_" prefix from the two columns that are kept.
for prefixed_name, final_name in (
    ("old_DimCustomerKey", "DimCustomerKey"),
    ("old_create_date", "create_date"),
):
    df_old = df_old.withColumnRenamed(prefixed_name, final_name)
df_old.display()

# create_date is converted with to_timestamp; update_date is stamped with the
# current time for every matched row.
df_old = (
    df_old
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)
df_old.display()

### Preparing df_new

In [0]:
# New rows had no match, so all old_* columns are null and can be dropped.
unmatched_columns = ["old_DimCustomerKey", "old_customer_id", "old_create_date", "old_update_date"]
df_new = df_new.drop(*unmatched_columns)
df_new.display()

# Stamp both audit columns with the current time, displaying after each step.
for audit_column in ("create_date", "update_date"):
    df_new = df_new.withColumn(audit_column, current_timestamp())
    df_new.display()

### Surrogate Key - From 1

In [0]:
# Number the new rows 1..n so the surrogate key really starts "from 1".
#
# monotonically_increasing_id() was used here before; it only guarantees
# unique, increasing values - the ids jump by large amounts between partitions
# and may differ each time this lazily evaluated frame is recomputed (every
# display() and the final write re-run it). row_number() over customer_id is
# dense and repeatable because customer_id is unique after dropDuplicates.
#
# NOTE: a window without partitionBy processes all new rows in one partition;
# this assumes the number of new customers per load is moderate - confirm.
from pyspark.sql.window import Window

new_rows_by_customer = Window.orderBy("customer_id")
df_new = df_new.withColumn(
    "DimCustomerKey",
    # row_number() is an int; cast to long to keep the column type that
    # monotonically_increasing_id() produced.
    row_number().over(new_rows_by_customer).cast("long"),
)
df_new.display()

### Adding Max Surrogate Key

In [0]:
# Offset added to the keys of new rows so they continue after the highest
# key already stored in gold.DimCustomers.
#
# The gold table is read only when init_load_flag == 0, the same test the
# cell that builds df_old uses; any other flag value starts from 0 instead of
# querying a table that may not exist yet.
if init_load_flag != 0:
    max_surrogate_key = 0
else:
    df_maxsur = spark.sql("select max(DimCustomerKey) as max_surrogate_key from databrickseteproject167_catalog.gold.DimCustomers")
    df_maxsur.display()
    # max() over an empty table returns NULL (None in Python). Adding
    # lit(None) to the key column would turn every new key into NULL, so
    # fall back to 0 in that case.
    max_surrogate_key = df_maxsur.first()['max_surrogate_key'] or 0

In [0]:
# Shift the per-load key by the highest key already in the dimension.
shifted_key = col("DimCustomerKey") + lit(max_surrogate_key)
df_new = df_new.withColumn("DimCustomerKey", shifted_key)
df_new.display()

### Union Of df_old And df_new

In [0]:
# Combine rows to insert (df_new) with rows to update (df_old). unionByName
# lines columns up by name, so the differing column order of the two frames
# does not matter.
df_final = df_new.unionByName(df_old)
df_final.display()

### SCD Type - **1**

In [0]:
from delta.tables import DeltaTable

In [0]:
# Print the catalog and schema the session currently resolves names against.
session_context = spark.sql("SELECT current_catalog(), current_schema()")
session_context.show()

In [0]:
# SCD Type 1 write: upsert into the gold dimension when it already exists,
# otherwise create it from df_final.
gold_table_name = "databrickseteproject167_catalog.gold.DimCustomers"
gold_table_path = "abfss://gold@databrickseteproject167.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists(gold_table_name):
    # Rows whose surrogate key is already stored are overwritten with the
    # incoming values; rows with an unseen key are inserted.
    deltaTable_obj = DeltaTable.forPath(spark, gold_table_path)
    merge_builder = deltaTable_obj.alias("trg").merge(
        df_final.alias("src"),
        "trg.DimCustomerKey = src.DimCustomerKey",
    )
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
else:
    # First write: register the table in the catalog with its data stored at
    # the explicit abfss path.
    table_writer = (
        df_final.write
        .mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .option("path", gold_table_path)
    )
    table_writer.saveAsTable(gold_table_name)

In [0]:
# Read the dimension back, ordered by surrogate key, to inspect the result of
# the write above. Note that this reuses the name `df`.
gold_check_sql = "select * from databrickseteproject167_catalog.gold.DimCustomers order by DimCustomerKey"
df = spark.sql(gold_check_sql)
df.display()