In [0]:
# spark.sql("""
# INSERT INTO workspace.silver.silver_products
# (product_sk, product_id, product_name, category, brand, origin_location, modified_date)
# VALUES
#  (220, 'PROD_00220', 'Sports Bottle', 'Sports & Outdoors', 'Reebok', 'Denver', current_timestamp())
# """)

# 🥇 Incremental Dimension Load (SCD) with PySpark and Delta Lake

## Objective
This notebook implements an **incremental load** process to keep the dimensional tables in the `gold` schema up to date, based on the data from the tables in the `silver` schema.  

The flow follows the **Slowly Changing Dimension** (SCD) pattern in its overwrite form (commonly called Type 1, with no history kept): only the records that have changed since the last load are updated and new ones are added, which avoids unnecessary processing.

---


In [0]:
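# Import only the functions this notebook uses; a wildcard import would shadow Python built-ins such as max, min, and sum.
from pyspark.sql.functions import col, current_timestamp, lit, monotonically_increasing_id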

### Process Parameters Configuration

This section defines the configuration variables required by the incremental load process.  
Each parameter determines where the data is read from, how it is read, and where it is written in the target.

- **`catalog`**  
  - Working catalog where the tables are registered.  
  - Allows fully qualifying tables using the form `catalog.schema.table`.

- **`key_cols_list`**  
  - List of **natural key columns** that uniquely identify a record in the dimension.  

- **`cdc_col`** (*Change Data Capture column*)  
  - Column used for **change control**.  
  - Holds the date and time each record was last modified, and is used to select only new or updated data.  

- **`backdate_refresh`**  
  - Optional field to force a reload from a specific date.  
  - If empty (`""`), the last load date detected in the target will be used.  
  - If a date is provided (`"YYYY-MM-DD HH:MM:SS"`), it will be used as the starting point to reload data.

- **`source_object`**  
  - Name of the source table in the `silver` schema containing the data.  

- **`source_schema`**  
  - Name of the schema where the source table resides (`silver`).  

- **`target_schema`**  
  - Name of the schema where the dimension will be stored (`gold`).  

- **`target_object`**  
  - Name of the target table that will hold the dimension.  

- **`surrogate_key`**  
  - Name of the **surrogate key** column used in the dimension to assign a unique internal identifier.  
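
The parameters above can also be grouped into a single object, which is convenient when they arrive as one JSON string (as in the commented widget cell further down). The following is a minimal sketch, not part of the original notebook: the `DimensionConfig` class and its `source_table` / `target_table` helpers are hypothetical names introduced for illustration.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class DimensionConfig:
    """Hypothetical container for the parameters described above."""
    catalog: str
    key_cols_list: List[str]
    cdc_col: str
    backdate_refresh: str
    source_object: str
    source_schema: str
    target_schema: str
    target_object: str
    surrogate_key: str

    @classmethod
    def from_json(cls, raw: str) -> "DimensionConfig":
        # json.loads already returns key_cols_list as a real list, so no eval is needed.
        return cls(**json.loads(raw))

    @property
    def source_table(self) -> str:
        # Fully qualified name in the form catalog.schema.table
        return f"{self.catalog}.{self.source_schema}.{self.source_object}"

    @property
    def target_table(self) -> str:
        return f"{self.catalog}.{self.target_schema}.{self.target_object}"


example_input = json.dumps({
    "catalog": "workspace",
    "key_cols_list": ["product_id"],
    "cdc_col": "modified_date",
    "backdate_refresh": "",
    "source_object": "silver_products",
    "source_schema": "silver",
    "target_schema": "gold",
    "target_object": "DimProducts",
    "surrogate_key": "product_sk",
})

config = DimensionConfig.from_json(example_input)
print(config.source_table)
print(config.target_table)
```

Building the qualified names in one place keeps every later query consistent with the `catalog.schema.table` form.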


In [0]:
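import ast

catalog = "workspace"
key_cols = "['product_id']"
# ast.literal_eval parses the list literal without executing arbitrary code, unlike eval.
key_cols_list = ast.literal_eval(key_cols)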
cdc_col  = "modified_date"
backdate_refresh = ""
source_object = "silver_products"
source_schema = "silver"
target_schema = "gold"
target_object = "DimProducts"
surrogate_key = "product_sk"

In [0]:
# import json

# raw_input = dbutils.widgets.get("input")
# dim = json.loads(raw_input)

# catalog = dim["catalog"]
# key_cols_list = dim["key_cols_list"]
# cdc_col = dim["cdc_col"]
# backdate_refresh = dim["backdate_refresh"]
# source_object = dim["source_object"]
# source_schema = dim["source_schema"]
# target_schema = dim["target_schema"]
# target_object = dim["target_object"]
# surrogate_key = dim["surrogate_key"]

`last_load`, the timestamp from which changes are read, is calculated as follows:

- If `backdate_refresh` is empty:
  - If the target table exists → the maximum value of `cdc_col` (`modified_date`) in the target is used.
  - If it does not exist → `"1900-01-01 00:00:00"` is used to force a full load.
- If `backdate_refresh` has a value → that date is used directly.

This controls whether the load is **incremental** or **forced**.
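
The decision can be isolated in a small function that is testable without Spark. This is a sketch under assumptions: `resolve_last_load` and `DEFAULT_START` are hypothetical names, and the fallback for a target that exists but is empty (where the maximum is `None`) is an addition that the notebook cell itself does not make.

```python
from datetime import datetime
from typing import Optional, Union

DEFAULT_START = "1900-01-01 00:00:00"  # forces a full load


def resolve_last_load(
    backdate_refresh: str,
    target_exists: bool,
    target_max: Optional[datetime],
) -> Union[str, datetime]:
    """Return the point in time from which changes should be read."""
    if backdate_refresh:
        # Forced reload from an explicit date.
        return backdate_refresh
    if target_exists and target_max is not None:
        # Incremental load from the latest change already in the target.
        return target_max
    # No target yet (or, by assumption, an empty one): full load.
    return DEFAULT_START


print(resolve_last_load("", False, None))
print(resolve_last_load("", True, datetime(2025, 8, 19, 18, 16, 31, 107000)))
print(resolve_last_load("2025-01-01 00:00:00", True, datetime(2025, 8, 19)))
```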


In [0]:
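if len(backdate_refresh) == 0:
    # Use the fully qualified name, consistent with the other existence checks in this notebook.
    if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
        # Latest change timestamp already loaded into the target dimension.
        last_load = spark.sql(f"SELECT max({cdc_col}) FROM {catalog}.{target_schema}.{target_object}").collect()[0][0]
    else:
        # First run: start far enough back to force a full load.
        last_load = "1900-01-01 00:00:00"
else:
    last_load = backdate_refresh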

In [0]:
last_load

datetime.datetime(2025, 8, 19, 18, 16, 31, 107000)

The source table in the `silver` schema is queried,  
keeping only the records whose `modified_date` is strictly greater than `last_load`.  

The result (`df_src`) represents **only the new or modified data** since the last load,  
and is displayed on screen for validation.
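
Because `last_load` may be a `datetime` object (as in the output above) rather than a string, it helps to see how it is rendered once interpolated into the query text. The snippet below is a plain-Python illustration that only builds the string; it uses the parameter values from this notebook and does not call Spark.

```python
from datetime import datetime

catalog, source_schema, source_object = "workspace", "silver", "silver_products"
cdc_col = "modified_date"
last_load = datetime(2025, 8, 19, 18, 16, 31, 107000)

# Inside an f-string, a datetime is formatted with str(), i.e. "YYYY-MM-DD HH:MM:SS.ffffff".
query = (
    f"SELECT * FROM {catalog}.{source_schema}.{source_object} "
    f"WHERE {cdc_col} > '{last_load}'"
)
print(query)
# SELECT * FROM workspace.silver.silver_products WHERE modified_date > '2025-08-19 18:16:31.107000'
```

Since the comparison is strict (`>`), a source record whose timestamp equals `last_load` exactly is not read again.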


In [0]:
df_src = spark.sql(f"SELECT * FROM {catalog}.{source_schema}.{source_object} WHERE {cdc_col} > '{last_load}'")

df_src.display()


product_sk,product_id,product_name,category,brand,origin_location,modified_date
214,PROD_00215,Golf Clubs Set,Sports & Outdoors,Callaway,Phoenix,2025-08-19T21:02:10.523Z
208,PROD_00208,Training Pants,Sports & Outdoors,Reebok,San Francisco,2025-08-19T21:02:10.523Z
78,PROD_00078,Ultraboost 22,Clothing,Adidas,San Francisco,2025-08-19T21:02:10.523Z
212,PROD_00212,Cycling Helmet,Sports & Outdoors,Giro,Denver,2025-08-19T21:02:10.523Z
3,PROD_00003,PlayStation 5,Electronics,Sony,San Francisco,2025-08-19T21:02:10.523Z
211,PROD_00211,Boxing Gloves,Sports & Outdoors,Everlast,Chicago,2025-08-19T21:02:10.523Z
10,PROD_00010,QLED 4K TV,Electronics,Samsung,San Francisco,2025-08-19T21:02:10.523Z
213,PROD_00213,Climbing Rope,Sports & Outdoors,Black Diamond,Boulder,2025-08-19T21:02:10.523Z
216,PROD_00217,Skateboard,Sports & Outdoors,Element,Santa Cruz,2025-08-19T21:02:10.523Z
215,PROD_00216,Hiking Boots,Sports & Outdoors,Merrell,Portland,2025-08-19T21:02:10.523Z



The current dimension data is loaded into `df_target`:

- **If the target table exists**  
  → The natural key columns, surrogate key, and creation/update dates are selected.

- **If it does not exist**  
  → An empty DataFrame with the same structure is created. The placeholder values (empty keys, surrogate key `0`, dates `"1900-01-01"`) only define the columns and their types, and `WHERE 1=0` guarantees that no rows are returned.  
  This allows the flow to work even on the **first load**.
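
To make the first-load branch concrete, the sketch below builds the placeholder query for a composite natural key and prints the resulting SQL. The second key column, `region_id`, is a hypothetical example and not part of this notebook's data.

```python
key_cols_list = ["product_id", "region_id"]  # region_id is hypothetical
surrogate_key = "product_sk"

# One empty-string placeholder per natural key column.
key_placeholders = ", ".join(f"'' AS {c}" for c in key_cols_list)

empty_target_sql = (
    f"SELECT {key_placeholders}, 0 AS {surrogate_key}, "
    "CAST('1900-01-01 00:00:00' AS timestamp) AS create_date, "
    "CAST('1900-01-01 00:00:00' AS timestamp) AS update_date "
    "WHERE 1=0"  # never true, so the result has columns but no rows
)
print(empty_target_sql)
```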


In [0]:
if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    key_cols_string_incremental = ", ".join(key_cols_list)
    df_target = spark.sql(f"SELECT {key_cols_string_incremental}, {surrogate_key}, create_date, update_date FROM {catalog}.{target_schema}.{target_object}")
else:
    key_cols_string_init = ", ".join(f"'' AS {i}" for i in key_cols_list)
    df_target = spark.sql(f"SELECT {key_cols_string_init}, 0 AS {surrogate_key}, CAST('1900-01-01 00:00:00' AS timestamp) AS create_date, CAST('1900-01-01 00:00:00' AS timestamp) AS update_date WHERE 1=0")


In [0]:
df_target.display()

product_id,product_sk,create_date,update_date
PROD_00001,1,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
PROD_00002,2,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
PROD_00003,3,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
PROD_00004,4,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
PROD_00005,5,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
PROD_00006,6,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
PROD_00007,7,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
PROD_00008,8,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
PROD_00009,9,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
PROD_00010,10,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z


1. **Creation of temporary views**  
   - **`src`** → new or modified data obtained from the source table.  
   - **`target`** → current dimension records in the target table.

2. **Construction of the join condition**  
   - The natural key columns (`key_cols_list`) are used to link source and target records.

3. **LEFT JOIN** between `src` and `target`  
   - Returns **all** records from `src`.  
   - If a `src` record also exists in `target` (natural keys match), its `create_date` and `update_date` values are retrieved.  
   - If it does not exist in `target`, these fields remain null.
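
The join condition of step 2 is plain string assembly, so it can be checked on its own. A minimal sketch, where `build_join_condition` is a hypothetical helper name and `region_id` is a hypothetical second key column:

```python
from typing import Iterable


def build_join_condition(key_cols: Iterable[str], left: str = "src", right: str = "target") -> str:
    """Join one equality per natural key column with AND."""
    return " AND ".join(f"{left}.{c} = {right}.{c}" for c in key_cols)


print(build_join_condition(["product_id"]))
# src.product_id = target.product_id
print(build_join_condition(["product_id", "region_id"]))
# src.product_id = target.product_id AND src.region_id = target.region_id
```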


In [0]:
df_src.createOrReplaceTempView("src")
df_target.createOrReplaceTempView("target")

In [0]:
join_condition = ' AND '.join([f"src.{i} = target.{i}" for i in key_cols_list])

df_join = spark.sql(f"""
    SELECT src.*, target.create_date, target.update_date
    FROM src
    LEFT JOIN target
    ON ({join_condition})
""")

In [0]:
df_join.display()

product_sk,product_id,product_name,category,brand,origin_location,modified_date,create_date,update_date
10,PROD_00010,QLED 4K TV,Electronics,Samsung,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
208,PROD_00208,Training Pants,Sports & Outdoors,Reebok,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
3,PROD_00003,PlayStation 5,Electronics,Sony,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
186,PROD_00186,Yoga Mat,Sports & Outdoors,Nike,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
78,PROD_00078,Ultraboost 22,Clothing,Adidas,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
214,PROD_00215,Golf Clubs Set,Sports & Outdoors,Callaway,Phoenix,2025-08-19T21:02:10.523Z,,
215,PROD_00216,Hiking Boots,Sports & Outdoors,Merrell,Portland,2025-08-19T21:02:10.523Z,,
211,PROD_00211,Boxing Gloves,Sports & Outdoors,Everlast,Chicago,2025-08-19T21:02:10.523Z,,
216,PROD_00217,Skateboard,Sports & Outdoors,Element,Santa Cruz,2025-08-19T21:02:10.523Z,,
212,PROD_00212,Cycling Helmet,Sports & Outdoors,Giro,Denver,2025-08-19T21:02:10.523Z,,



4. **Classification after the join**  
   - **`df_old`** → Records found in the target (`create_date` not null) → will be **updated**.  
   - **`df_new`** → Records not existing in the target (`create_date` null) → will be **inserted**.
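
The same join-then-classify logic can be traced by hand on two rows. The sketch below uses plain Python dictionaries instead of DataFrames, with values taken from the outputs shown in this notebook; the variable names are illustrative only.

```python
# Rows read from the source.
src_rows = [
    {"product_id": "PROD_00003", "product_name": "PlayStation 5"},
    {"product_id": "PROD_00211", "product_name": "Boxing Gloves"},
]

# Current dimension, indexed by natural key for the lookup.
target_by_key = {
    "PROD_00003": {
        "create_date": "2025-08-19T19:36:46.402Z",
        "update_date": "2025-08-19T19:36:46.402Z",
    },
}

# LEFT JOIN: every source row is kept; the dates are None when there is no match.
joined = []
for row in src_rows:
    match = target_by_key.get(row["product_id"], {})
    joined.append({
        **row,
        "create_date": match.get("create_date"),
        "update_date": match.get("update_date"),
    })

# Classification by whether create_date was found in the target.
old_rows = [r for r in joined if r["create_date"] is not None]  # to be updated
new_rows = [r for r in joined if r["create_date"] is None]      # to be inserted

print([r["product_id"] for r in old_rows])  # ['PROD_00003']
print([r["product_id"] for r in new_rows])  # ['PROD_00211']
```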


In [0]:
df_old = df_join.filter(col("create_date").isNotNull())
df_new = df_join.filter(col("create_date").isNull())

In [0]:
df_old.display()

product_sk,product_id,product_name,category,brand,origin_location,modified_date,create_date,update_date
3,PROD_00003,PlayStation 5,Electronics,Sony,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
10,PROD_00010,QLED 4K TV,Electronics,Samsung,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
78,PROD_00078,Ultraboost 22,Clothing,Adidas,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
186,PROD_00186,Yoga Mat,Sports & Outdoors,Nike,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z
208,PROD_00208,Training Pants,Sports & Outdoors,Reebok,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T19:36:46.402Z


In [0]:
df_new.display()

product_sk,product_id,product_name,category,brand,origin_location,modified_date,create_date,update_date
214,PROD_00215,Golf Clubs Set,Sports & Outdoors,Callaway,Phoenix,2025-08-19T21:02:10.523Z,,
215,PROD_00216,Hiking Boots,Sports & Outdoors,Merrell,Portland,2025-08-19T21:02:10.523Z,,
211,PROD_00211,Boxing Gloves,Sports & Outdoors,Everlast,Chicago,2025-08-19T21:02:10.523Z,,
216,PROD_00217,Skateboard,Sports & Outdoors,Element,Santa Cruz,2025-08-19T21:02:10.523Z,,
212,PROD_00212,Cycling Helmet,Sports & Outdoors,Giro,Denver,2025-08-19T21:02:10.523Z,,
213,PROD_00213,Climbing Rope,Sports & Outdoors,Black Diamond,Boulder,2025-08-19T21:02:10.523Z,,


- **`df_old_enr`** → Existing records, only `update_date` is updated with the current date/time.  
- **`df_new_enr`** → New records, both `create_date` and `update_date` are set to the current date/time.  

Finally, **`df_union`** combines both sets to prepare the load into the target table.


In [0]:
df_old_enr = df_old.withColumn("update_date", current_timestamp())

In [0]:
df_old_enr.display()

product_sk,product_id,product_name,category,brand,origin_location,modified_date,create_date,update_date
208,PROD_00208,Training Pants,Sports & Outdoors,Reebok,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T21:16:06.109Z
78,PROD_00078,Ultraboost 22,Clothing,Adidas,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T21:16:06.109Z
3,PROD_00003,PlayStation 5,Electronics,Sony,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T21:16:06.109Z
10,PROD_00010,QLED 4K TV,Electronics,Samsung,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T21:16:06.109Z
186,PROD_00186,Yoga Mat,Sports & Outdoors,Nike,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T21:16:06.109Z


Surrogate keys for the new records can be handled in one of two ways:

- **Option A:** Reuse the surrogate keys already present in the Silver table (our case).  
- **Option B:** Generate surrogate keys in Gold when Silver does not provide them, starting from the current `max(surrogate_key)` plus an increment.
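
For Option B, note that Spark documents `monotonically_increasing_id()` as unique and increasing but not consecutive: the current implementation places the partition ID in the upper bits and the record number within the partition in the lower 33 bits. The plain-Python sketch below imitates that layout to show the effect on the generated keys. `simulated_monotonic_id` is a hypothetical stand-in, not a Spark function, and the starting maximum of 216 is an assumed value.

```python
def simulated_monotonic_id(partition_id: int, row_in_partition: int) -> int:
    """Imitates the documented bit layout: partition ID above the lower 33 bits."""
    return (partition_id << 33) + row_in_partition


max_surrogate_key = 216  # assumed current maximum in the dimension

# (partition_id, row_in_partition) for three new rows spread over two partitions
new_rows = [(0, 0), (0, 1), (1, 0)]

option_b_keys = [max_surrogate_key + 1 + simulated_monotonic_id(p, r) for p, r in new_rows]
print(option_b_keys)     # [217, 218, 8589934809]

# Consecutive numbering, as a row_number()-style window would produce
consecutive_keys = [max_surrogate_key + i for i, _ in enumerate(new_rows, start=1)]
print(consecutive_keys)  # [217, 218, 219]
```

The keys stay unique either way; the difference only matters if gap-free surrogate keys are expected, in which case a `row_number()` window over the new rows is a common alternative.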


In [0]:
# Option A: the surrogate key is already generated in Silver
df_new_enr = (
    df_new.withColumn("create_date", current_timestamp())
          .withColumn("update_date", current_timestamp())
)

In [0]:
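# Option B: the surrogate key is not generated in Silver.
# Note: monotonically_increasing_id() returns unique, increasing values that are not guaranteed to be consecutive.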
# if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
#     max_surrogate_key = spark.sql(
#         f"SELECT max({surrogate_key}) FROM {catalog}.{target_schema}.{target_object}"
#     ).collect()[0][0]
#     if max_surrogate_key is None:
#         max_surrogate_key = 0
# else:
#     max_surrogate_key = 0

# df_new_enr = (
#     df_new.withColumn(
#         surrogate_key,
#         lit(max_surrogate_key) + lit(1) + monotonically_increasing_id()
#     )
#     .withColumn("create_date", current_timestamp())
#     .withColumn("update_date", current_timestamp())
# )

In [0]:
df_new_enr.display()

product_sk,product_id,product_name,category,brand,origin_location,modified_date,create_date,update_date
214,PROD_00215,Golf Clubs Set,Sports & Outdoors,Callaway,Phoenix,2025-08-19T21:02:10.523Z,2025-08-19T21:16:11.107Z,2025-08-19T21:16:11.107Z
215,PROD_00216,Hiking Boots,Sports & Outdoors,Merrell,Portland,2025-08-19T21:02:10.523Z,2025-08-19T21:16:11.107Z,2025-08-19T21:16:11.107Z
211,PROD_00211,Boxing Gloves,Sports & Outdoors,Everlast,Chicago,2025-08-19T21:02:10.523Z,2025-08-19T21:16:11.107Z,2025-08-19T21:16:11.107Z
216,PROD_00217,Skateboard,Sports & Outdoors,Element,Santa Cruz,2025-08-19T21:02:10.523Z,2025-08-19T21:16:11.107Z,2025-08-19T21:16:11.107Z
212,PROD_00212,Cycling Helmet,Sports & Outdoors,Giro,Denver,2025-08-19T21:02:10.523Z,2025-08-19T21:16:11.107Z,2025-08-19T21:16:11.107Z
213,PROD_00213,Climbing Rope,Sports & Outdoors,Black Diamond,Boulder,2025-08-19T21:02:10.523Z,2025-08-19T21:16:11.107Z,2025-08-19T21:16:11.107Z


In [0]:
df_union = df_old_enr.unionByName(df_new_enr)

In [0]:
df_union.display()

product_sk,product_id,product_name,category,brand,origin_location,modified_date,create_date,update_date
208,PROD_00208,Training Pants,Sports & Outdoors,Reebok,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T21:16:15.150Z
78,PROD_00078,Ultraboost 22,Clothing,Adidas,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T21:16:15.150Z
3,PROD_00003,PlayStation 5,Electronics,Sony,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T21:16:15.150Z
10,PROD_00010,QLED 4K TV,Electronics,Samsung,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T21:16:15.150Z
186,PROD_00186,Yoga Mat,Sports & Outdoors,Nike,San Francisco,2025-08-19T21:02:10.523Z,2025-08-19T19:36:46.402Z,2025-08-19T21:16:15.150Z
214,PROD_00215,Golf Clubs Set,Sports & Outdoors,Callaway,Phoenix,2025-08-19T21:02:10.523Z,2025-08-19T21:16:15.150Z,2025-08-19T21:16:15.150Z
215,PROD_00216,Hiking Boots,Sports & Outdoors,Merrell,Portland,2025-08-19T21:02:10.523Z,2025-08-19T21:16:15.150Z,2025-08-19T21:16:15.150Z
211,PROD_00211,Boxing Gloves,Sports & Outdoors,Everlast,Chicago,2025-08-19T21:02:10.523Z,2025-08-19T21:16:15.150Z,2025-08-19T21:16:15.150Z
216,PROD_00217,Skateboard,Sports & Outdoors,Element,Santa Cruz,2025-08-19T21:02:10.523Z,2025-08-19T21:16:15.150Z,2025-08-19T21:16:15.150Z
212,PROD_00212,Cycling Helmet,Sports & Outdoors,Giro,Denver,2025-08-19T21:02:10.523Z,2025-08-19T21:16:15.150Z,2025-08-19T21:16:15.150Z


### Writing to the Target Table with Delta Lake (Upsert)

- **If the target table exists**  
  1. The target table is loaded as a `DeltaTable` object (`dlt_object`).  
  2. A **MERGE** is executed between:
     - **`src`** → combined data (`df_union`).
     - **`trg`** → target table.
  3. MERGE conditions:
     - **Match by surrogate key** (`product_sk`).  
     - **When matched** → update all fields if `modified_date` from `src` is greater than or equal to that of `trg`.  
     - **When not matched** → insert the full record.

- **If the target table does not exist**  
  → It is created by writing `df_union` in Delta format, in `append` mode.

This step ensures that the dimension is updated incrementally.
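
The MERGE rules above can be imitated in plain Python to see which rows end up updated, inserted, or left alone. This is an illustration of the semantics only, not Delta Lake itself: `merge_upsert` is a hypothetical helper, the sample rows are invented for the example, and each key is assumed to appear at most once in the source.

```python
from typing import Dict, List


def merge_upsert(target: List[Dict], source: List[Dict], key: str, cdc_col: str) -> List[Dict]:
    """Apply the matched / not-matched rules described above to lists of dicts."""
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        current = merged.get(row[key])
        if current is None:
            merged[row[key]] = dict(row)   # not matched -> insert the full record
        elif row[cdc_col] >= current[cdc_col]:
            merged[row[key]] = dict(row)   # matched and at least as recent -> update all fields
        # matched but older -> the target row is left untouched
    return list(merged.values())


target = [
    {"product_sk": 3, "product_name": "PlayStation 4", "modified_date": "2025-08-19 18:16:31"},
    {"product_sk": 10, "product_name": "QLED 4K TV", "modified_date": "2025-08-19 18:16:31"},
]
source = [
    {"product_sk": 3, "product_name": "PlayStation 5", "modified_date": "2025-08-19 21:02:10"},
    {"product_sk": 10, "product_name": "Old TV name", "modified_date": "2025-08-01 00:00:00"},
    {"product_sk": 211, "product_name": "Boxing Gloves", "modified_date": "2025-08-19 21:02:10"},
]

result = merge_upsert(target, source, key="product_sk", cdc_col="modified_date")
for row in result:
    print(row)
```

Key 3 is updated, key 10 keeps its target values because the source row is older, and key 211 is inserted.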


In [0]:
from delta.tables import DeltaTable

In [0]:
target_table = f"{catalog}.{target_schema}.{target_object}"

if spark.catalog.tableExists(target_table):
    # Upsert: update matched rows whose source change timestamp is at least as recent, insert the rest.
    dlt_object = DeltaTable.forName(spark, target_table)
    (
        dlt_object.alias("trg")
        .merge(df_union.alias("src"), f"trg.{surrogate_key} = src.{surrogate_key}")
        .whenMatchedUpdateAll(condition=f"src.{cdc_col} >= trg.{cdc_col}")
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First load: create the dimension table from the combined DataFrame.
    (
        df_union.write.format("delta")
        .mode("append")
        .saveAsTable(target_table)
    )
