##  Reading

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Notebook parameter. "0" (the default) selects the incremental path that reads
# the existing gold dimension; "1" is treated as the initial load further down.
dbutils.widgets.text("init_load_flag", "0")

In [0]:
# Read the widget and convert it to an int so the later `== 0` / `== 1`
# comparisons work on a number.
flag_text = dbutils.widgets.get("init_load_flag")
init_load_flag = int(flag_text)
display(init_load_flag)

In [0]:
# Source of this notebook: every column of the silver customers table.
silver_customers_sql = " select * from databricks_cata.silver.customers_silver"
df = spark.sql(silver_customers_sql)

In [0]:
# Preview the silver customers as loaded, before any transformation.
df.display()

In [0]:
# Keep a single row per customer_id so the dimension has one row per customer.
dedup_columns = ["customer_id"]
df = df.dropDuplicates(dedup_columns)
df.display()

## dividing new and old records

In [0]:
# Pick the query that supplies the already-loaded keys:
#   * flag == 0 -> read the keys and audit dates from the gold dimension;
#   * otherwise -> an empty result with the same four column names
#     (`where 1=0` returns no rows), so the later join finds no matches.
if init_load_flag == 0:
    existing_keys_sql = '''
                     select Dimcustomerkey, customer_id, create_date, update_date
                     from databricks_cata.gold.Dimcustomers'''
else:
    existing_keys_sql = '''
                     select 0 Dimcustomerkey, 0 customer_id, 0 create_date, 0 update_date
                     from databricks_cata.silver.customers_silver where 1=0'''

df_old = spark.sql(existing_keys_sql)


In [0]:
# Preview the keys/dates fetched for already-loaded customers (empty on the
# non-zero flag path).
df_old.display()

## Renaming columns of df_old

In [0]:
# Prefix every lookup column with "old_" so it cannot clash with the silver
# columns once the two frames are joined.
for column_name in ("Dimcustomerkey", "customer_id", "create_date", "update_date"):
    df_old = df_old.withColumnRenamed(column_name, f"old_{column_name}")

## applying join with old records

In [0]:
# Left join keeps every silver customer; customers without a match in the
# lookup end up with NULL in all old_* columns.
same_customer = df["customer_id"] == df_old["old_customer_id"]
df_join = df.join(df_old, on=same_customer, how="left")

In [0]:
# Preview the joined frame: silver columns plus the old_* lookup columns.
df_join.display()

## surrogate key on all the values

In [0]:
# Preview of a surrogate key generated over every silver row.
# The result goes into its own variable instead of overwriting `df`: df_join
# was already built from `df`, so nothing downstream reads this column, and
# overwriting `df` would make a re-run of the join cell carry an extra
# "Dimcustomerkey" column that later collides with the renamed
# old_Dimcustomerkey.
# Note: monotonically_increasing_id() yields unique, increasing values that
# are not guaranteed to be consecutive.
df_key_preview = df.withColumn("Dimcustomerkey", monotonically_increasing_id() + lit(1))
df_key_preview.limit(10).display()


## separating new and old records

In [0]:
# New customers: no key came back from the lookup side of the left join.
df_new = df_join.where(col("old_Dimcustomerkey").isNull())

In [0]:
# Existing customers: the lookup returned a key. Note that df_old is reused
# here - from this point it holds full customer rows, not just the lookup.
df_old = df_join.where(col("old_Dimcustomerkey").isNotNull())

## preparing old_df

In [0]:
# dropping all the columns which are not required
df_old = df_old.drop( "old_customer_id", "old_update_date")

# renaming old_Dimcustomerkey to Dimcustomerkey
df_old = df_old.withColumnRenamed("old_Dimcustomerkey", "Dimcustomerkey")

# Renaming old_create_date column to create date
df_old = df_old.withColumnRenamed("old_create_date", "create_date")
df_old = df_old.withColumn("create_date", to_timestamp(col("create_date")))

# recreating the update column to current time stamp
df_old = df_old.withColumn("update_date", current_timestamp())


In [0]:
# Preview the existing-customer rows after the key and dates were restored.
df_old.display()

## preparing new_df

In [0]:
# dropping all the columns which are not required
df_new= df_new.drop("old_Dimcustomerkey", "old_customer_id", "old_update_date", "old_create_date")

# recreating the update_date , current_date column to current time stamp
df_new = df_new.withColumn("update_date", current_timestamp())
df_new = df_new.withColumn("create_date", current_timestamp()) 

In [0]:
# Preview the new-customer rows with their audit columns.
df_new.display()

## surrogate key from 1

In [0]:
# Number the new customers 1..N.
# row_number() over an ordered window gives consecutive keys that do not
# depend on how the data happens to be partitioned. The previous
# monotonically_increasing_id() is partition-dependent and leaves large gaps,
# and it can yield different values if the plan is evaluated more than once
# (which a Delta merge may do with its source).
# customer_id is unique after the earlier dropDuplicates, so the ordering is
# total. The cast keeps the column a 64-bit integer as before.
# Trade-off: a window without partitionBy processes the rows in one partition;
# that is acceptable for the new-customer delta but worth knowing.
new_key_window = Window.orderBy("customer_id")
df_new = df_new.withColumn(
    "Dimcustomerkey",
    row_number().over(new_key_window).cast("long"),
)

In [0]:
 df_new.display()

## add max surrogate key

In [0]:
# Determine the highest surrogate key already present in the gold dimension.
# On the initial load there is nothing to read, so the offset is 0.
if init_load_flag == 1:
    max_surrogate_key = 0
else:
    df_maxsur = spark.sql("select max(Dimcustomerkey) as max_surrogate_key from databricks_cata.gold.Dimcustomers")

    # Pull the single aggregated value back to the driver.
    max_surrogate_key = df_maxsur.collect()[0]["max_surrogate_key"]

    # max() over an empty table is NULL, which arrives here as None. Adding
    # None to the key column would turn every new key into NULL, so fall back
    # to 0 just like the initial load.
    if max_surrogate_key is None:
        max_surrogate_key = 0


In [0]:
# Shift the locally generated keys past the highest key already in gold.
# Not idempotent: running this cell twice adds the offset twice.
key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn("Dimcustomerkey", key_offset + col("Dimcustomerkey"))

In [0]:
# Preview the new customers with their final, offset surrogate keys.
df_new.display()

**union of df_new and df_old**

In [0]:
# Combine new and existing customers into one frame. unionByName matches
# columns by name, so the two frames may list their columns in different order.
df_final = df_new.unionByName(df_old)

In [0]:
# Preview the complete set of rows that will be written to the gold dimension.
df_final.display()

## SCD Type 1

In [0]:
from delta.tables import DeltaTable


In [0]:
# Write df_final to the gold dimension.
#   * Table missing -> create it as a Delta table at the gold storage path.
#   * Table present -> upsert on the surrogate key: matching rows are
#     overwritten with the incoming values, unmatched rows are inserted.
gold_table_name = "databricks_cata.gold.Dimcustomers"
gold_table_path = "abfss://gold@databricksstorageadls.dfs.core.windows.net/Dimcustomers"

if not spark.catalog.tableExists(gold_table_name):
    (
        df_final.write
        .mode("overwrite")
        .format("delta")
        .option("path", gold_table_path)
        .saveAsTable(gold_table_name)
    )
else:
    dlt_obj = DeltaTable.forPath(spark, gold_table_path)
    upsert = (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.Dimcustomerkey ==src.Dimcustomerkey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
    )
    upsert.execute()

In [0]:
%sql
-- Inspect the gold customer dimension after the load.
select * from databricks_cata.gold.dimcustomers