In [0]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import *
# Load-mode flag supplied through the "p_init_load" notebook widget.
# Widget values arrive as strings, so convert to int before the
# comparisons made in later cells.
raw_init_load = dbutils.widgets.get("p_init_load")
p_init_load = int(raw_init_load)

In [0]:
# Load the silver-layer customers table that feeds the dimension.
silver_customers_query = "SELECT * FROM `databricks-catalog`.silver.silver_customers"
df = spark.sql(silver_customers_query)

# Preview only a handful of rows rather than the whole table.
display(df.limit(10))

In [0]:
# Keep a single row per customer_id.
# DataFrames are immutable: dropDuplicates() returns a NEW DataFrame, so the
# result must be assigned back. Without the assignment the de-duplication is
# silently discarded and duplicate customers flow into the join/merge below.
# Note: which of the duplicate rows is kept is not specified by Spark.
df = df.dropDuplicates(subset=['customer_id'])

# Diagnostic: row count per customer_id in the silver source table.
spark.sql("SELECT customer_id, COUNT(*) FROM `databricks-catalog`.silver.silver_customers GROUP BY customer_id").limit(5).display()

## **Dividing New vs Old Records**

In [0]:
# Choose the query that yields the "already loaded" dimension records.
if p_init_load != 0:
    # Not an incremental run: produce an empty result that still exposes the
    # four column names the later cells rename and join on.
    old_records_query = '''
                       SELECT 
                       0 DimCustomerKey, 
                       0 Customer_id, 
                       0 Create_date, 
                       0 Update_date
                       WHERE 1 = 0 
                       '''
else:
    # Incremental run: read the keys and audit dates from the gold dimension.
    old_records_query = '''
                       SELECT 
                       DimCustomerKey, 
                       Customer_id,
                       Create_date, 
                       Update_date 
                       FROM `databricks-catalog`.gold.DimCustomers
                       '''

df_old = spark.sql(old_records_query)

In [0]:
# Inspect the existing-dimension records that were just loaded.
display(df_old)

## **Renaming Columns of df_old**

In [0]:
# Prefix every df_old column with "old" so its names cannot collide with the
# source columns after the join.
for existing_name in ("DimCustomerKey", "Customer_id", "Create_date", "Update_date"):
    df_old = df_old.withColumnRenamed(existing_name, "old" + existing_name)

## **Applying Join with the old records**

In [0]:
# Left join keeps every source customer; the old* columns stay NULL for
# customers with no matching row in df_old.
customer_match = df['customer_id'] == df_old['oldCustomer_id']
df_join = df.join(df_old, on=customer_match, how='left')

In [0]:
# Inspect the joined source + existing-dimension rows.
display(df_join)

## **Separating New vs Old Records**

In [0]:
# A NULL oldDimCustomerKey means the left join found no existing dimension
# row, i.e. the customer is new; a non-NULL key means it already exists.
existing_key = df_join['oldDimCustomerKey']
df_new = df_join.where(existing_key.isNull())
df_old = df_join.where(existing_key.isNotNull())

In [0]:
# Inspect the rows classified as new customers.
display(df_new)

In [0]:
# Inspect the rows classified as already-existing customers.
display(df_old)

## **Preparing df_old**

In [0]:
# Existing customers keep their surrogate key and original creation date;
# only Update_date is refreshed to the current timestamp.
df_old = (
    df_old
    .drop("oldCustomer_id", "oldUpdate_date")
    .withColumnRenamed("oldDimCustomerKey", "DimCustomerKey")
    .withColumnRenamed("oldCreate_date", "Create_date")
    .withColumn("Create_date", to_timestamp(col("Create_date")))
    .withColumn("Update_date", current_timestamp())
)


In [0]:
# Inspect the prepared existing-customer rows.
display(df_old)

## **Preparing df_new**

In [0]:
# New customers carry nothing over from the old dimension: remove the
# (all-NULL) old* columns and stamp both audit dates with the current time.
df_new = (
    df_new
    .drop("oldDimCustomerKey", "oldCustomer_id", "oldUpdate_date", "oldCreate_date")
    .withColumn("Update_date", current_timestamp())
    .withColumn("Create_date", current_timestamp())
)

In [0]:
# Inspect the prepared new-customer rows.
display(df_new)

## **Surrogate Key - From 1**

In [0]:
# Assign consecutive surrogate keys 1..N to the new customers.
#
# monotonically_increasing_id() only guarantees unique, increasing values: the
# partition id is encoded in the upper bits, so ids jump by billions between
# partitions and can change when the plan is re-evaluated. row_number() over a
# window ordered by customer_id yields a dense, deterministic 1..N sequence.
#
# Trade-off: a window without partitionBy pulls the rows into one partition.
# The cast keeps the column a 64-bit integer, the type produced by
# monotonically_increasing_id().
new_key_window = Window.orderBy("customer_id")
df_new = df_new.withColumn(
    "DimCustomerKey",
    row_number().over(new_key_window).cast("long"),
)

In [0]:
# Inspect the new-customer rows with their provisional surrogate keys.
display(df_new)

## **Surrogate Key - Max value**

In [0]:
# Determine the offset added to the surrogate keys of new records.
#
# The branch tests p_init_load == 0 (incremental run), the same condition used
# when df_old is first loaded, so any non-zero flag is treated consistently as
# an initial load in both places.
if p_init_load == 0:
    max_key_row = spark.sql('''
                            SELECT MAX(DimCustomerKey) AS max_surrogate_key
                            FROM `databricks-catalog`.gold.DimCustomers
                            ''').first()
    # MAX() over an empty table is NULL (None in Python). Fall back to 0 so
    # the later `lit(max_surrogate_key) + col(...)` does not turn every new
    # key into NULL.
    max_surrogate_key = max_key_row['max_surrogate_key'] or 0
else:
    # Initial load: nothing exists yet, so new keys start right after 0.
    max_surrogate_key = 0

In [0]:
# Shift the provisional keys by the current maximum key so they continue after
# the keys already present in the dimension.
shifted_key = lit(max_surrogate_key) + col("DimCustomerKey")
df_new = df_new.withColumn("DimCustomerKey", shifted_key)

In [0]:
# Inspect the new-customer rows with their final surrogate keys.
display(df_new)

## **Union df_old and df_new**

In [0]:
# Stack new and existing customers into one frame. Columns are matched by
# name (not position), since the two frames added them in different orders.
df_final = df_new.unionByName(df_old, allowMissingColumns=False)

In [0]:
# Inspect the combined frame that will be written to the gold layer.
display(df_final)

## **SCD Type 1**

## **UPSERT Command**

In [0]:
# Target of the write: catalog table name and its external storage location.
gold_table_name = '`databricks-catalog`.gold.DimCustomers'
gold_table_path = 'abfss://gold@storagetosty.dfs.core.windows.net/DimCustomers'

if not spark.catalog.tableExists(gold_table_name):
    # Table is not registered yet: create it as a Delta table at the path.
    dim_writer = df_final.write.mode("overwrite").format("delta")
    dim_writer.option("path", gold_table_path).saveAsTable(gold_table_name)
else:
    # Table exists: merge on the surrogate key. Matching rows are updated with
    # all source columns; non-matching source rows are inserted.
    dlt_obj = DeltaTable.forPath(spark, gold_table_path)
    merge_builder = dlt_obj.alias("trg").merge(
        df_final.alias("src"),
        "trg.DimCustomerKey = src.DimCustomerKey",
    )
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [0]:
%sql
-- Show every row of the gold DimCustomers table; this cell sits after the
-- upsert cell, so in a top-to-bottom run it reflects the written result.
SELECT * FROM `databricks-catalog`.gold.DimCustomers