# **Slowly Changing Dimension Builder**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

In [0]:
# ## Catalog Name
# catalog = "workspace"

# ## Key cols list
# key_cols = "['flight_id']"
# key_cols_list = eval(key_cols)


# ## CDC column
# cdc_col = "modifiedDate"

# ## Backdated Refresh
# backdated_refresh = ""

# ## Source Object
# source_object = "silver_flights"

# ## Source Schema
# source_schema = "silver"

# ## Target Schema
# target_schema = "gold"

# ## Target object
# target_object = "DimFlights"

# ## Surrogate Key
# surrogate_key = "DimFlightsKey"


In [0]:
import ast

# Run parameters for the dimension build. Every later cell reads these
# names, so switching to another dimension only requires editing this cell.

## Catalog Name
catalog = "workspace"

## Key cols list
# Business key column(s), supplied as the string form of a Python list.
key_cols = "['passenger_id']"
# literal_eval only accepts Python literals, so a malformed or malicious
# parameter value raises instead of being executed the way eval() would.
key_cols_list = ast.literal_eval(key_cols)

## CDC column
# Column compared against the last load watermark to find changed rows.
cdc_col = "modifiedDate"

## Backdated Refresh
# Leave empty to use the watermark stored in the target; set a timestamp
# string to reprocess from that point instead.
backdated_refresh = ""

## Source Object
source_object = "silver_customers"

## Source Schema
source_schema = "silver"

## Target Schema
target_schema = "gold"

## Target object
target_object = "DimCustomers"

## Surrogate Key
surrogate_key = "DimCustomersKey"


In [0]:
# ## Catalog Name
# catalog = "workspace"

# ## Key cols list
# key_cols = "['airport_id']"
# key_cols_list = eval(key_cols)


# ## CDC column
# cdc_col = "modifiedDate"

# ## Backdated Refresh
# backdated_refresh = ""

# ## Source Object
# source_object = "silver_airports"

# ## Source Schema
# source_schema = "silver"

# ## Target Schema
# target_schema = "gold"

# ## Target object
# target_object = "DimAirports"

# ## Surrogate Key
# surrogate_key = "DimAirportsKey"


## **Incremental Data Ingestion**

In [0]:
# Resolve the incremental watermark `last_load`:
#   1. an explicit backdated refresh value wins;
#   2. otherwise the newest CDC value already stored in the target;
#   3. otherwise (no target yet, or an empty one) a floor date that selects
#      every source row.
target_table = f"{catalog}.{target_schema}.{target_object}"
default_watermark = "1900-01-01 00:00:00"

if backdated_refresh:
    last_load = backdated_refresh
elif spark.catalog.tableExists(target_table):
    # spark.sql() returns a DataFrame; collect() pulls the single MAX() value
    # out so `last_load` is a plain value that can be embedded in a query.
    max_cdc_value = spark.sql(
        f"SELECT MAX({cdc_col}) FROM {target_table}"
    ).collect()[0][0]
    # MAX() is NULL when the target table exists but holds no rows.
    last_load = str(max_cdc_value) if max_cdc_value is not None else default_watermark
else:
    last_load = default_watermark

# Display the resolved watermark
last_load

'1900-01-01 00:00:00'

In [0]:
# Echo the current value of `last_load` as the cell output.
last_load

'1900-01-01 00:00:00'

## **Read only Changed Source Data**

In [0]:
# Read only the source rows that changed after the last load.

# `last_load` may be a one-row DataFrame (result of spark.sql) rather than a
# scalar, and MAX() is NULL for an empty target; reduce it to a plain value
# before embedding it in the query.
watermark = last_load.collect()[0][0] if hasattr(last_load, "collect") else last_load
if watermark is None:
    watermark = "1900-01-01 00:00:00"

# The CDC column must be referenced as an identifier: wrapping it in single
# quotes compares two string literals, which is always true and disables the
# incremental filter. The catalog comes from the `catalog` parameter instead
# of being hard-coded.
df_src = spark.sql(
    f"SELECT * FROM {catalog}.{source_schema}.{source_object} "
    f"WHERE {cdc_col} > '{watermark}'"
)

## **Read Existing target or simulate empty for first time run**

In [0]:
# Build `df_trg`: the key columns, surrogate key and audit dates of the
# target. When the target does not exist yet, an empty frame with the same
# column names is produced (WHERE 1 = 0) so the later join still works.
if not spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    # First run: one empty-string placeholder per business key column
    key_cols_string_init = ", ".join(f" '' AS {key_col}" for key_col in key_cols_list)
    trg_query = f"""SELECT {key_cols_string_init}, CAST('0' AS INT) AS {surrogate_key}, CAST('1900-01-01 00:00:00' AS TIMESTAMP) AS create_date, CAST    ('1900-01-01 00:00:00' AS TIMESTAMP) AS update_date WHERE 1 = 0"""
else:
    # Incremental run: select the real key columns from the existing target
    key_cols_string_incremental = ", ".join(key_cols_list)
    trg_query = f"""SELECT {key_cols_string_incremental}, {surrogate_key}, create_date, update_date FROM {catalog}.{target_schema}.{target_object}"""

df_trg = spark.sql(trg_query)
df_trg.display()

passenger_id,DimCustomersKey,create_date,update_date


## **Applying and Performing Left Join**

In [0]:
# One equality predicate per business key column, AND-ed together; the
# `src` / `trg` prefixes are the aliases the join query uses.
join_condition = " AND ".join(
    [f"src.{key_col} = trg.{key_col}" for key_col in key_cols_list]
)

In [0]:
# Register both frames as temp views so the join can be written in SQL
# using the `src` / `trg` names that `join_condition` refers to.
df_trg.createOrReplaceTempView("trg")
df_src.createOrReplaceTempView("src")

# LEFT JOIN keeps every source row; rows with no match in the target come
# back with NULL surrogate key and audit dates.
join_query = f"""
                    SELECT
                      src.*,
                      trg.{surrogate_key} AS {surrogate_key},
                      trg.create_date,
                      trg.update_date
                    FROM
                      src
                    LEFT JOIN
                      trg
                    ON
                      {join_condition}"""
df_join = spark.sql(join_query)

**Split into old records and new records**

In [0]:
# A non-NULL surrogate key means the row matched an existing target row;
# a NULL one means the left join found no match, i.e. the row is new.
df_new_records = df_join.where(col(surrogate_key).isNull())
df_old_records = df_join.where(col(surrogate_key).isNotNull())


## **Updating old records with new update timestamp**

In [0]:
# Rows that already exist in the target keep their surrogate key and
# create_date; only update_date is overwritten with the current timestamp.
refreshed_at = current_timestamp()
df_old_enr = df_old_records.withColumn("update_date", refreshed_at)
df_old_enr.display()

passenger_id,name,gender,nationality,modifiedDate,DimCustomersKey,create_date,update_date


## **Assigning Surrogate key and timestamps to new records**

In [0]:
# Assign surrogate keys and audit timestamps to rows that are new to the
# target. Keys continue from the highest key already stored (0 on first run).
from pyspark.sql.window import Window

target_table = f"{catalog}.{target_schema}.{target_object}"

if spark.catalog.tableExists(target_table):
    max_surrogate_key = spark.sql(
        f"SELECT MAX({surrogate_key}) FROM {target_table}"
    ).collect()[0][0]
    # MAX() is NULL when the target exists but is empty; without this guard
    # every new row would receive a NULL surrogate key.
    if max_surrogate_key is None:
        max_surrogate_key = 0
else:
    max_surrogate_key = 0

# row_number() over the business keys yields consecutive values 1..N, so
# keys are dense and repeatable. monotonically_increasing_id() only
# guarantees uniqueness: its values depend on partition ids and leave large
# gaps between partitions.
# NOTE(review): an un-partitioned window sorts the new rows in a single
# partition; this assumes each batch of new dimension rows is modest in
# size — confirm for large loads.
key_order = Window.orderBy(*key_cols_list)

df_new_enr = (
    df_new_records
    # Cast to long to keep the key column type that the previous
    # monotonically_increasing_id()-based expression produced.
    .withColumn(
        surrogate_key,
        (lit(max_surrogate_key) + row_number().over(key_order)).cast("long"),
    )
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)
  

## **Union Old destination records with new and merge to delta table**

In [0]:
# Combine refreshed existing rows with newly keyed rows. Both frames derive
# from df_join and therefore share column names; matching by name rather
# than by position keeps values in the right columns even if one branch's
# column order changes later.
df_union = df_old_enr.unionByName(df_new_enr)

In [0]:
# Persist the result: create the Delta table on the first run, otherwise
# upsert into it keyed on the surrogate key.
target_table = f"{catalog}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(target_table):
    # First load: writing the frame creates the table
    first_load_writer = df_union.write.format("delta").mode("append")
    first_load_writer.saveAsTable(target_table)
else:
    dlt_obj = DeltaTable.forName(spark, target_table)
    merge_on = f"trg.{surrogate_key} = src.{surrogate_key}"
    # Matched rows are overwritten only when the incoming CDC value is at
    # least as recent as the stored one; unmatched rows are inserted.
    is_not_stale = f"src.{cdc_col} >= trg.{cdc_col}"
    (
        dlt_obj.alias("trg")
        .merge(df_union.alias("src"), merge_on)
        .whenMatchedUpdateAll(condition=is_not_stale)
        .whenNotMatchedInsertAll()
        .execute()
    )