#### Importing Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
%sql
-- Make sure the gold schema exists before the DimCustomers table is written into it.
CREATE SCHEMA IF NOT EXISTS databricks_catalog.gold

#### **Initial Load Flag Handling:**

The initial_load_flag should be set to 1 (TRUE) during the first ingestion of the data to indicate an initial load. Once the initial data load is successfully completed, this flag must be updated to 0 (FALSE) to reflect that subsequent ingestions are part of incremental or regular data loads. This logic should be consistently applied across both development and production environments to maintain uniformity in pipeline behavior.

In [0]:
# Read the widget value and validate it up front. The cells below branch on
# `== 0` in one place and on `== 1` in others, so any other value would take an
# inconsistent mix of the initial-load and incremental-load paths.
initial_load_flag_value = int(dbutils.widgets.get('initial_load_flag'))
if initial_load_flag_value not in (0, 1):
    raise ValueError(
        f"initial_load_flag must be 0 (incremental load) or 1 (initial load), got {initial_load_flag_value}"
    )

#### Data Reading From Silver

In [0]:
# Load the customer records from the silver-layer table.
# Equivalent SQL: SELECT * FROM databricks_catalog.silver.ext_silver_customers
df_customers = spark.table(
    "databricks_catalog.silver.ext_silver_customers"
)

In [0]:
# Preview a handful of rows only, like the other preview cells in this notebook,
# instead of rendering the whole customer table.
df_customers.limit(5).display()

#### Remove Duplicates in Primary Key

In [0]:
# Keep a single row per customer_id so each customer appears once in the join further down.
df_customers = df_customers.dropDuplicates(['customer_id'])

## Dividing New Vs Old Records

In [0]:
if initial_load_flag_value == 0:
    # Incremental load: the gold dimension already exists, so read the keys and
    # audit dates of the records that are currently stored.
    df_old = spark.sql(''' SELECT DimCustomerKey, customer_id, created_date, updated_date FROM databricks_catalog.gold.DimCustomers ''')
else:
    # Initial load: gold.DimCustomers does not exist yet, so build an empty
    # placeholder with the same four columns (WHERE 1=0 returns no rows).
    # The columns are typed explicitly, and customer_id is taken from the silver
    # table itself, so the join and the later to_timestamp()/union steps do not
    # depend on implicit casts from INT literals.
    df_old = spark.sql('''
        SELECT CAST(NULL AS BIGINT)    AS DimCustomerKey,
               customer_id,
               CAST(NULL AS TIMESTAMP) AS created_date,
               CAST(NULL AS TIMESTAMP) AS updated_date
        FROM databricks_catalog.silver.ext_silver_customers
        WHERE 1=0
    ''')

In [0]:
# Show the existing-dimension lookup (no rows when initial_load_flag is 1).
df_old.display()

#### Renaming Columns for df_old

We rename the `df_old` columns by adding the prefix `old_` so they can be distinguished from the columns of the new records.

In [0]:
# Prefix every column read from the existing dimension with "old_" so it can be
# told apart from the incoming records' columns once the two frames are joined.
for _column in ('DimCustomerKey', 'customer_id', 'created_date', 'updated_date'):
    df_old = df_old.withColumnRenamed(_column, 'old_' + _column)
df_old.display()

#### Applying the joins


In [0]:
# Left-join the incoming customers to the existing dimension on customer_id.
# A null old_DimCustomerKey after the join marks a brand-new customer; a non-null
# one marks a customer that already exists in the dimension (a modified record).
join_condition = df_customers['customer_id'] == df_old['old_customer_id']
df_join = df_customers.join(df_old, on=join_condition, how='left')
df_join.limit(5).display()

#### Separating New and Old Records

In [0]:
# Split the joined frame on the existing surrogate key. The column name is
# spelled exactly as it was created ('old_DimCustomerKey'), so the lookup does
# not rely on case-insensitive column resolution.
df_new = df_join.filter(df_join['old_DimCustomerKey'].isNull())     # no match in the dimension -> new records
df_old = df_join.filter(df_join['old_DimCustomerKey'].isNotNull())  # matched an existing key -> modified records

In [0]:
# Preview a few of the rows classified as new records.
df_new.limit(5).display()

In [0]:
# Show the rows classified as modified (already present in the dimension).
df_old.display()

#### Preparing df_old

In [0]:
# Shape the modified records so they match the dimension's column names:
#   * old_customer_id  - dropped; for matched rows it equals customer_id.
#   * old_updated_date - dropped; updated_date is re-created as the current timestamp.
#   * old_DimCustomerKey / old_created_date - kept under their original names, so an
#     existing record retains its surrogate key and its original creation date.
# On an initial load this frame has no rows, so these steps only fix up the schema.
df_old = (
    df_old
    .drop('old_customer_id', 'old_updated_date')
    .withColumnRenamed('old_DimCustomerKey', 'DimCustomerKey')
    .withColumnRenamed('old_created_date', 'created_date')
    # Make sure created_date is a timestamp column.
    .withColumn('created_date', to_timestamp('created_date'))
    # Stamp the modification time.
    .withColumn('updated_date', current_timestamp())
)

In [0]:
# Show the modified records after their columns were renamed and re-stamped.
df_old.display()

#### Preparing df_new

In [0]:
# New records have no counterpart in the dimension, so every "old_" column is
# dropped. created_date and updated_date are both set to the current timestamp,
# for initial and incremental loads alike.
df_new = (
    df_new
    .drop('old_DimCustomerKey', 'old_customer_id', 'old_created_date', 'old_updated_date')
    .withColumn('created_date', current_timestamp())
    .withColumn('updated_date', current_timestamp())
)


In [0]:
# Preview the new records with their audit columns added.
df_new.limit(5).display()

## Creating Surrogate Key 
We want to start the values from 1

In [0]:
# Assign a dense 1..N sequence to the new records, ordered by customer_id.
# monotonically_increasing_id() only guarantees unique, increasing values: it
# encodes the partition id in the upper bits, so the keys are not consecutive
# and change with the partitioning of the data.
# NOTE: a window without partitionBy moves all new rows into a single partition;
# fine for a dimension-sized load, revisit if the volume of new records is large.
df_new = df_new.withColumn(
    'DimCustomerKey',
    row_number().over(Window.orderBy('customer_id')).cast('long')
)

In [0]:
# Preview the new records with their provisional surrogate keys.
df_new.limit(5).display()

#### Adding Max Surrogate Key

In [0]:
if initial_load_flag_value == 1:
    # Initial load: nothing is stored yet, so no offset is needed.
    max_surrogate_value = 0
else:
    # Incremental load: look up the highest surrogate key already in the dimension.
    df_max_surrogate_value = spark.sql(''' SELECT max(DimCustomerKey) AS max_surrogate_key FROM databricks_catalog.gold.DimCustomers''')
    max_surrogate_value = df_max_surrogate_value.first()['max_surrogate_key']
    # max() over an empty table yields NULL (None here). Fall back to 0 so the
    # offset added to the new keys stays numeric instead of turning them NULL.
    if max_surrogate_value is None:
        max_surrogate_value = 0

In [0]:
# Shift the provisional keys past the highest key already stored in the dimension.
df_new = df_new.withColumn(
    'DimCustomerKey',
    col('DimCustomerKey') + lit(max_surrogate_value)
)

#### Union the df_old and df_customers(df_new)
This union will bring new records and modified records together

In [0]:
# Stack new and modified records into one frame; unionByName matches columns by
# name, so the differing column order of the two frames does not matter.
df_final = df_new.unionByName(df_old)

In [0]:
# Preview the combined frame that will be written to the gold layer.
df_final.limit(5).display()

## SCD Type 1

In [0]:
from delta.tables import DeltaTable

In [0]:
# Storage location of the DimCustomers Delta table in the gold container.
dim_customers_path = 'abfss://gold@dldatabricks.dfs.core.windows.net/DimCustomers'

if initial_load_flag_value != 1:
    # Incremental load: upsert into the existing Delta table on the surrogate key.
    # Matched keys are overwritten in place, unmatched keys are inserted.
    exist_df = DeltaTable.forPath(spark, dim_customers_path)
    (
        exist_df.alias('trg')
        .merge(df_final.alias('src'), "trg.DimCustomerKey==src.DimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial load: write the table at the external path and register it in the catalog.
    (
        df_final.write
        .format('delta')
        .mode('overwrite')
        .option('path', dim_customers_path)
        .saveAsTable('databricks_catalog.gold.DimCustomers')
    )


In [0]:
# NOTE: disabled alternative to the write cell above. It chooses between merge and
# initial write with spark.catalog.tableExists(...) instead of initial_load_flag.
# Kept for reference only; nothing in this cell executes.
# NOTE(review): the write path below ends in 'Dimcustomers' (lower-case "c") while the
# active cell uses 'DimCustomers' - align the casing before re-enabling this variant.

# if spark.catalog.tableExists("databricks_catalog.gold.DimCustomers"):

#     exist_df = DeltaTable.forPath(spark, 'abfss://gold@dldatabricks.dfs.core.windows.net/DimCustomers')

#     exist_df.alias('trg').merge(df_final.alias('src'), "trg.DimCustomerKey==src.DimCustomerKey") \
#             .whenMatchedUpdateAll()\
#             .whenNotMatchedInsertAll()\
#             .execute()

# else:

#     df_final.write.format('delta') \
#         .mode('overwrite') \
#         .option('path','abfss://gold@dldatabricks.dfs.core.windows.net/Dimcustomers') \
#         .saveAsTable('databricks_catalog.gold.DimCustomers')



In [0]:
# Read the gold dimension back to check the result of the load.
df_test = spark.read.table("databricks_catalog.gold.DimCustomers")
df_test.display()
