In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *

In [0]:
# Declare the widget before reading it: on a fresh interactive run no widget
# exists yet and dbutils.widgets.get would fail. "0" is the default value used
# when no value has been supplied for the widget.
dbutils.widgets.text("init_load_flag", "0", "Init Load Flag")

# Widget values are strings; convert once so the flag can be compared with the
# integers 0 (incremental load) and 1 (initial load) used further down.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))

### **Data Reading**

In [0]:
# Load every column of the silver-layer customers table; this frame is the
# input for the rest of the notebook.
customers_silver_query = "select * from `databricks-catas`.silver.customers_silver"
df = spark.sql(customers_silver_query)

display(df)

### Removing Duplicates

In [0]:
# Keep a single row per customer_id. No ordering is applied beforehand, so
# which of several duplicate rows survives is not controlled here.
df = df.dropDuplicates(["customer_id"])

display(df)

### **Loading Existing (Old) Records**

In [0]:
# Widget values come back as strings, so the flag has to be converted before it
# is compared with an integer. Comparing the raw string with 0 is always False,
# which would send every run (including incremental ones) down the
# "no existing records" branch below.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))

if init_load_flag == 0:
    # Incremental load: fetch the keys and audit dates already stored in the
    # gold dimension so incoming customers can be matched against them.
    df_old = spark.sql('''select dimCustomerKey, customer_id, create_date, update_date
                   from `databricks-catas`.gold.DimCustomers''')
else:
    # Initial load: the dimension is not read. "where 1 = 0" returns no rows, so
    # this only produces an empty frame with the four expected column names.
    df_old = spark.sql('''select 0 dimCustomerKey, 0 customer_id, 0 create_date, 0 update_date
                   from `databricks-catas`.silver.customers_silver
                   where 1 = 0''')

df_old.display()

### **Renaming Columns of df_old**

In [0]:
# Prefix the columns taken from the existing records with "old_" so they can be
# told apart from the incoming silver columns once the two frames are joined.
old_column_names = {
    "dimCustomerKey": "old_dimCustomerKey",
    "customer_id": "old_customer_id",
    "create_date": "old_create_date",
    "update_date": "old_update_date",
}
for current_name, prefixed_name in old_column_names.items():
    df_old = df_old.withColumnRenamed(current_name, prefixed_name)

### **Applying join with the old records**

In [0]:
# Left join keeps every incoming customer; customers without a match among the
# existing records end up with nulls in the old_* columns.
customer_match = df["customer_id"] == df_old["old_customer_id"]
df_join = df.join(df_old, on=customer_match, how="left")

display(df_join)

### **Separating New Vs Old Records**

In [0]:
# Rows that picked up no existing surrogate key in the join are new customers.
df_new = df_join.where(col("old_dimCustomerKey").isNull())
            

In [0]:
# Rows that did pick up an existing surrogate key are customers already known.
df_old = df_join.where(col("old_dimCustomerKey").isNotNull())

### **Preparing df_old**

In [0]:
# Shape the already-known customers for the upsert:
#   - drop the helper columns that are no longer needed,
#   - rename "old_dimCustomerKey" back to "dimCustomerKey" (the existing key is kept),
#   - rename "old_create_date" back to "create_date" and convert it to a timestamp,
#   - set "update_date" to the current timestamp.
df_old = (
    df_old
    .drop("old_customer_id", "old_update_date")
    .withColumnRenamed("old_dimCustomerKey", "dimCustomerKey")
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)

display(df_old)


### **Preparing df_new**

In [0]:
# New customers take nothing from the existing records, so all old_* helper
# columns are removed; both audit columns are then set to the current timestamp.
obsolete_columns = ["old_dimCustomerKey", "old_customer_id", "old_create_date", "old_update_date"]
df_new = (
    df_new
    .drop(*obsolete_columns)
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)


### **Surrogate Key - From 1**

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *

In [0]:
# Number the new customers 1, 2, 3, ... without gaps.
# monotonically_increasing_id() only guarantees unique, increasing values - not
# consecutive ones - so keys would jump by very large amounts between
# partitions. row_number() over an ordered window yields a dense sequence.
# Ordering by customer_id (one row per customer_id after the de-duplication
# step) makes the numbering repeatable across re-evaluations of the frame.
# NOTE: a window without partitionBy moves all rows into a single partition;
# this is applied only to the new customers of the current run.
surrogate_key_window = Window.orderBy("customer_id")

# Cast to long so the column keeps the 64-bit type the previous expression produced.
df_new = df_new.withColumn(
    "dimCustomerKey",
    row_number().over(surrogate_key_window).cast("long"),
)
df_new.display()

### **Adding Max Surrogate key**

In [0]:
# Create widget
dbutils.widgets.text("init_load_flag", "0", "Init Load Flag")

# Get the value as an integer (widget values are returned as strings)
init_load_flag = int(dbutils.widgets.get("init_load_flag"))

# Determine the offset that is added to the run-local keys of new customers.
if init_load_flag == 1:
    # Initial load: nothing is read from the dimension, keys start right after 0.
    max_surrogate_key = 0
else:
    # Incremental load: continue after the highest key stored in the dimension.
    df_maxsur = spark.sql("SELECT max(dimCustomerKey) AS max_surrogate_key FROM `databricks-catas`.gold.DimCustomers")

    # converting max_sur to max_surrogate_key variable
    max_surrogate_key = df_maxsur.collect()[0]["max_surrogate_key"]

    # max() over an empty table yields NULL (None here); adding None to the key
    # column would turn every new surrogate key into NULL, so fall back to 0.
    if max_surrogate_key is None:
        max_surrogate_key = 0


In [0]:
# Shift the run-local keys by the highest key taken from the dimension (0 on an
# initial load) so new customers are numbered after the existing ones.
key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn("dimCustomerKey", col("dimCustomerKey") + key_offset)

display(df_new)

### **Union of df_old and df_new**



In [0]:
# Stack the new and the already-known customers into one frame. Columns are
# matched by name, so the differing column order of the two frames is irrelevant.
df_final = df_new.unionByName(df_old)

display(df_final)

### **SCD - Type 1**

In [0]:
from delta.tables import DeltaTable

In [0]:
# Write df_final into the gold customer dimension.
gold_table_name = "`databricks-catas`.gold.DimCustomers"
gold_table_path = "abfss://gold@datalakeadbete.dfs.core.windows.net/DimCustomers"

if not spark.catalog.tableExists(gold_table_name):
    # The table is not registered yet: create it as a Delta table at the gold
    # path and register it in the catalog in one step.
    (
        df_final.write
        .format("delta")
        .mode("overwrite")
        .option("path", gold_table_path)
        .saveAsTable(gold_table_name)
    )
else:
    # The table exists: upsert on the surrogate key. Matching rows have all
    # columns overwritten with the incoming values; the rest are inserted.
    dlt_obj = DeltaTable.forPath(spark, gold_table_path)
    (
        dlt_obj.alias("target")
        .merge(df_final.alias("source"), "target.dimCustomerKey = source.dimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )



In [0]:
%sql
-- Show the current contents of the gold customer dimension.
SELECT *
FROM `databricks-catas`.gold.DimCustomers