# SportsBar Incremental Order Fact Loading Pipeline
## Medallion Architecture: Daily Incremental Processing

### 📌 NOTEBOOK OVERVIEW

This notebook processes daily incremental orders from the landing zone through the medallion layers: it applies quality transformations, recalculates monthly aggregates for the affected months, and upserts the results into the parent company fact table.

**Pipeline Purpose:** Daily ETL job that loads new orders, validates and transforms them, recalculates the impacted monthly aggregates, and merges them into the parent fact table.

**Execution Pattern:** Scheduled daily (morning, after orders arrive)

**Output:** `fmcg.gold.fact_orders` - Updated with yesterday's orders aggregated to monthly grain

### 🔄 INCREMENTAL PROCESSING FLOW

| Layer | Scope | Table Name | Pattern | Purpose |
|-------|-------|-----------|---------|---------|
| **Bronze** | New files only | `fmcg.bronze.orders` | Append | Preserve all order history |
| **Silver** | New orders | `fmcg.silver.orders` | Merge | Quality & standardization |
| **Gold (Daily)** | New daily records | `fmcg.gold.sb_fact_orders` | Merge | Daily-grain SportsBar fact records |
| **Gold (Parent)** | Monthly aggregate | `fmcg.gold.fact_orders` | Merge | Recalculate affected months |

### 🎯 KEY DIFFERENCES FROM FULL LOAD

**Full Load (1_full_load_fact.ipynb):**
- Runs once to bootstrap pipeline
- Processes all historical order files
- Loads everything to monthly grain
- Initializes parent fact table

**Incremental Load (This notebook):**
- Runs daily as scheduled job
- Processes only new order files (landing/ folder)
- Archives processed files (processed/ folder)
- Recalculates only affected months
- Merges into existing parent data (preserves history)

### 💡 INCREMENTAL LOAD STRATEGY

1. **Read new files** from `orders/landing/*.csv` and append them to bronze
2. **Archive files** from landing to processed (prevents reprocessing on the next run)
3. **Transform daily records** through silver quality checks
4. **Merge into the daily fact** to capture individual order records
5. **Identify affected months** (months with new data)
6. **Recalculate aggregates** only for affected months
7. **Upsert to parent** maintaining historical monthly data
8. **Cleanup staging** tables (temporary tables dropped)

## STEP 1: Import Required Libraries

PySpark SQL functions and Delta Lake modules for incremental ETL processing

In [0]:
# Import PySpark SQL functions for transformations
from pyspark.sql import functions as F
# Import DeltaTable for MERGE operations (upsert patterns)
from delta.tables import DeltaTable

## STEP 2: Load Project Utilities & Initialize Notebook Widgets

Import configuration and set up pipeline parameters for daily execution

In [0]:
# Load utilities - defines schema name constants
%run /Workspace/Project1/1_setup_catalog/utilities

In [0]:
# Verify schema constants loaded correctly
print(bronze_schema, silver_schema, gold_schema)

bronze silver gold


In [0]:
# Configure notebook widgets
dbutils.widgets.text("catalog", "fmcg", "Catalog")
dbutils.widgets.text("data_source", "orders", "Data Source")

# Get widget values
catalog = dbutils.widgets.get("catalog")
data_source = dbutils.widgets.get("data_source")

# Configure directory paths for incremental processing
base_path = f"abfss://conatiner-de-practice@adlsgen2narayan.dfs.core.windows.net/{data_source}"
landing_path = f"{base_path}/landing/"      # NEW files to process
processed_path = f"{base_path}/processed/"  # Archive of processed files (prevents reprocessing)
print("Base Path: ", base_path)
print("Landing Path: ", landing_path)
print("Processed Path: ", processed_path)

# Define table references for medallion layers
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"
silver_table = f"{catalog}.{silver_schema}.{data_source}"
gold_table = f"{catalog}.{gold_schema}.sb_fact_{data_source}"

Base Path:  abfss://conatiner-de-practice@adlsgen2narayan.dfs.core.windows.net/orders
Landing Path:  abfss://conatiner-de-practice@adlsgen2narayan.dfs.core.windows.net/orders/landing/
Processed Path:  abfss://conatiner-de-practice@adlsgen2narayan.dfs.core.windows.net/orders/processed/


## STEP 3: BRONZE LAYER - Incremental Order Ingestion

**Purpose:** Append new daily orders with metadata tracking

**Medallion Pattern:** Raw data capture with full lineage

**Key Metadata:**
- `read_timestamp` - Processing time
- `file_name` - Source file
- `file_size` - Data volume

**Mode:** APPEND (preserves entire order history)

**Output Table:** `fmcg.bronze.orders` (cumulative)
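To make the bronze metadata concrete, here is a pure-Python sketch of what the read adds to each row. It assumes local CSV files instead of ADLS and plain dicts instead of a Spark DataFrame; `read_landing_with_metadata` is a hypothetical helper. Spark evaluates `current_timestamp()` once per query, so the sketch likewise stamps every row in a batch with the same time.

```python
import csv
import os
from datetime import datetime, timezone
from pathlib import Path


def read_landing_with_metadata(landing_dir):
    """Read every *.csv in landing_dir and tag each row with lineage columns.

    Hypothetical stand-in for the Spark read: the added keys mirror
    read_timestamp, file_name and file_size from the notebook.
    """
    read_ts = datetime.now(timezone.utc)  # one timestamp for the whole batch
    rows = []
    for path in sorted(Path(landing_dir).glob("*.csv")):  # non-CSV files are ignored
        size = os.path.getsize(path)
        with open(path, newline="") as fh:
            for record in csv.DictReader(fh):  # header=True equivalent
                record["read_timestamp"] = read_ts
                record["file_name"] = path.name
                record["file_size"] = size
                rows.append(record)
    return rows
```

Unlike `inferSchema=True`, `csv.DictReader` keeps every value as a string; the sketch only illustrates the lineage columns.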

In [0]:
# Read NEW orders from landing zone (every file not yet archived to processed/)
df = (
    spark.read.options(header=True, inferSchema=True)
    .csv(f"{landing_path}*.csv")                # landing_path already ends with "/"
    .withColumn("read_timestamp", F.current_timestamp())  # Add processing time
    .select("*", "_metadata.file_name", "_metadata.file_size")  # Include file metadata
)

# Report record count for monitoring
print("Total Rows: ", df.count())
df.show(5)

Total Rows:  9854
+-------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|     order_id|order_placement_date|customer_id|product_id|order_qty|      read_timestamp|           file_name|file_size|
+-------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|FDEC818102602|Tuesday, December...|     789102|  25891502|    196.0|2025-11-30 16:21:...|orders_2025_12_16...|    23060|
|FDEC818102602|Tuesday, December...|     789102|  25891503|     NULL|2025-11-30 16:21:...|orders_2025_12_16...|    23060|
|FDEC818102602|Tuesday, December...|     789102|  25891602|    147.0|2025-11-30 16:21:...|orders_2025_12_16...|    23060|
|FDEC818102602|Tuesday, December...|     789102|  25891101|    337.0|2025-11-30 16:21:...|orders_2025_12_16...|    23060|
|FDEC818102602|Tuesday, December...|    INVALID|  25891202|    202.0|2025-11-30 16:21:...|orders_2025_12_16...|    23060|
+-----

In [0]:
# Append new orders to bronze table (cumulative, never overwrite)
(
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("append")  # APPEND mode: preserves history
    .saveAsTable(bronze_table)
)

### Create Staging Table for Just-Arrived Incremental Data

**Purpose:** Isolate today's orders for the transformation pipeline

**Benefit:** Enables tracking of which orders are new vs historical

**Table:** `fmcg.bronze.staging_orders` (OVERWRITE each day with new data)

In [0]:
# Create staging table with ONLY today's arrived orders (not full bronze history)
# This allows us to track exactly which records are new vs which existed before
(
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")  # Overwrite daily (reset staging)
    .saveAsTable(f"{catalog}.{bronze_schema}.staging_{data_source}")
)

### Archive Processed Files (Prevents Reprocessing)

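**Purpose:** Move files from landing → processed after ingestion

**Benefit:** Each file is picked up by only one scheduled run. This is not fully idempotent: if the job fails after the bronze append but before the move, a rerun appends the same rows to bronze again.

**Process:** `dbutils.fs.mv` moves each file into `processed/`, removing it from `landing/`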
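The landing → processed pattern can be sketched on a local filesystem with `pathlib` and `shutil` in place of `dbutils.fs`. `archive_landing_files` is a hypothetical helper, and replacing an already-archived file with the same name is this sketch's own choice, not documented `dbutils.fs.mv` behavior.

```python
import shutil
from pathlib import Path


def archive_landing_files(landing_dir, processed_dir):
    """Move every file from landing_dir into processed_dir; return the moved names."""
    landing = Path(landing_dir)
    processed = Path(processed_dir)
    processed.mkdir(parents=True, exist_ok=True)
    moved = []
    for src in sorted(landing.iterdir()):
        if not src.is_file():
            continue  # this sketch skips sub-directories
        dest = processed / src.name
        if dest.exists():
            dest.unlink()  # replace an earlier archived copy with the same name
        shutil.move(str(src), str(dest))
        moved.append(src.name)
    return moved
```

Calling it a second time returns an empty list, which is the property the notebook relies on to avoid re-ingesting files on the next run.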

In [0]:
# Move ingested files from landing/ to processed/ so the next run does not read them again
files = dbutils.fs.ls(landing_path)
for file_info in files:
    dbutils.fs.mv(
        file_info.path,                         # Source: file in landing/
        f"{processed_path}{file_info.name}",    # Destination: processed/ (processed_path already ends with "/")
        True                                    # recurse=True (also moves directories)
    )

## STEP 4: SILVER LAYER - Daily Order Quality & Standardization

**Purpose:** Apply data quality transformations to incremental orders

**Medallion Pattern:** Clean, validate, and standardize to conformed schema

**Key Quality Steps (6-Step Process):**
1. ✅ **Null Filter** - Keep only orders with quantities
2. ✅ **Customer ID Validation** - Numeric check with fallback
3. ✅ **Date Cleaning** - Remove weekday prefix
4. ✅ **Multi-Format Date Parsing** - Handle 4 date formats
5. ✅ **Deduplication** - Remove exact duplicates
6. ✅ **Product Join** - Add product_code for fact joins

**Output Table:** `fmcg.silver.orders`
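Quality steps 1-5 can be sketched in pure Python on a list of dict rows. The `strptime` patterns are assumed equivalents of the four Spark patterns, and `parse_order_date` / `clean_orders` are hypothetical helpers. As with `try_to_date`, a date that matches no format becomes `None` instead of raising.

```python
import re
from datetime import date, datetime
from typing import Optional

# Assumed strptime equivalents of the Spark patterns used in QUALITY STEP 4
DATE_FORMATS = ["%Y/%m/%d", "%d-%m-%Y", "%d/%m/%Y", "%B %d, %Y"]
FALLBACK_CUSTOMER = "999999"


def parse_order_date(text) -> Optional[date]:
    """Steps 3-4: strip a leading 'Weekday, ' prefix, then try each format in order."""
    if text is None:
        return None
    cleaned = re.sub(r"^[A-Za-z]+,\s*", "", text)
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(cleaned, fmt).date()
        except ValueError:
            continue
    return None  # no format matched (try_to_date would return NULL)


def clean_orders(rows):
    """Apply steps 1-5 and return cleaned rows in input order."""
    seen = set()
    out = []
    for row in rows:
        if row.get("order_qty") is None:           # step 1: drop rows without a quantity
            continue
        cust = str(row["customer_id"])
        if not re.fullmatch(r"[0-9]+", cust):      # step 2: non-numeric -> fallback
            cust = FALLBACK_CUSTOMER
        order_date = parse_order_date(row["order_placement_date"])  # steps 3-4
        product_id = str(row["product_id"])
        key = (row["order_id"], order_date, cust, product_id, row["order_qty"])
        if key in seen:                             # step 5: exact-duplicate removal
            continue
        seen.add(key)
        out.append({"order_id": row["order_id"], "order_placement_date": order_date,
                    "customer_id": cust, "product_id": product_id,
                    "order_qty": row["order_qty"]})
    return out
```

Because deduplication runs after parsing, two rows whose dates were written in different formats but denote the same day collapse into one.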

In [0]:
# Read staged orders (today's new orders only) to begin transformations
df_orders = spark.sql(f"SELECT * FROM {catalog}.{bronze_schema}.staging_{data_source};")
df_orders.show(2)

+------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|    order_id|order_placement_date|customer_id|product_id|order_qty|      read_timestamp|           file_name|file_size|
+------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|FDEC83622503|Monday, December ...|     789622|  25891302|     39.0|2025-11-30 16:22:...|orders_2025_12_01...|    21062|
|FDEC83622503|Monday, December ...|     789622|  25891301|     26.0|2025-11-30 16:22:...|orders_2025_12_01...|    21062|
+------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
only showing top 2 rows


### Silver Layer Transformations (6-Step Quality Framework)

In [0]:
# QUALITY STEP 1️⃣: Keep only rows where order_qty is present
# Filter out records without quantities (incomplete orders)
df_orders = df_orders.filter(F.col("order_qty").isNotNull())

# QUALITY STEP 2️⃣: Clean customer_id → keep numeric, else set to 999999
# Validates that customer_id matches expected format; fallback for invalid values
df_orders = df_orders.withColumn(
    "customer_id",
    F.when(F.col("customer_id").rlike("^[0-9]+$"), F.col("customer_id"))  # Numeric? Keep it
     .otherwise("999999")                                                   # Invalid? Use fallback
     .cast("string")                                                        # Ensure string type
)

# QUALITY STEP 3️⃣: Remove weekday name from the date text
# Pattern: "Tuesday, July 01, 2025" → "July 01, 2025"
# Regex replaces leading weekday followed by comma and space
df_orders = df_orders.withColumn(
    "order_placement_date",
    F.regexp_replace(F.col("order_placement_date"), r"^[A-Za-z]+,\s*", "")
)

# QUALITY STEP 4️⃣: Parse order_placement_date using multiple possible formats
# Try formats sequentially; coalesce returns first non-null result
df_orders = df_orders.withColumn(
    "order_placement_date",
    F.coalesce(
        F.try_to_date("order_placement_date", "yyyy/MM/dd"),   # Format 1
        F.try_to_date("order_placement_date", "dd-MM-yyyy"),   # Format 2
        F.try_to_date("order_placement_date", "dd/MM/yyyy"),   # Format 3
        F.try_to_date("order_placement_date", "MMMM dd, yyyy"), # Format 4 (month name)
    )
)

# QUALITY STEP 5️⃣: Drop exact duplicates by order composition
# Business key: order_id + date + customer + product + quantity (identifies same order)
df_orders = df_orders.dropDuplicates(
    ["order_id", "order_placement_date", "customer_id", "product_id", "order_qty"]
)

# Convert product_id to string for join consistency with products table
df_orders = df_orders.withColumn('product_id', F.col('product_id').cast('string'))

In [0]:
# Verify date parsing worked - check min/max dates in dataset
df_orders.agg(
    F.min("order_placement_date").alias("min_date"),
    F.max("order_placement_date").alias("max_date")
).show()

+----------+----------+
|  min_date|  max_date|
+----------+----------+
|2025-12-01|2025-12-30|
+----------+----------+
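Spark's `min` and `max` ignore NULLs, so the 2025-12-01 to 2025-12-30 range above says nothing about rows where all four formats failed and `try_to_date` returned NULL. A pure-Python sketch of a fuller check follows; `date_parse_report` is hypothetical. In PySpark the extra count could be expressed as `F.count(F.when(F.col("order_placement_date").isNull(), 1))` inside the same `agg`.

```python
from datetime import date
from typing import Dict, List


def date_parse_report(rows: List[dict], col: str = "order_placement_date") -> Dict[str, object]:
    """Min/max over parsed dates plus a count of rows whose date is missing."""
    parsed: List[date] = [r[col] for r in rows if r[col] is not None]
    return {
        "min_date": min(parsed) if parsed else None,  # nulls skipped, like Spark's min
        "max_date": max(parsed) if parsed else None,  # nulls skipped, like Spark's max
        "null_dates": len(rows) - len(parsed),        # what min/max alone would hide
    }
```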



### QUALITY STEP 6️⃣: Join with Products Dimension

**Purpose:** Add product_code (conformed key) for fact table joins

**Method:** Inner join on product_id (ensures valid products only)
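The inner-join semantics can be sketched in pure Python, assuming `product_id` is unique in the products table (with duplicates, a real inner join would multiply order rows). `attach_product_code` is hypothetical. Comparing both sides as strings mirrors why the notebook casts `product_id` to string before joining.

```python
def attach_product_code(orders, products):
    """Inner-join orders to products on product_id, adding product_code.

    Orders whose product_id has no match are dropped, as with how="inner".
    """
    code_by_id = {str(p["product_id"]): p["product_code"] for p in products}
    joined = []
    for order in orders:
        code = code_by_id.get(str(order["product_id"]))
        if code is None:
            continue  # unmatched product -> row is dropped
        joined.append({**order, "product_code": code})
    return joined
```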

In [0]:
# Load products dimension from silver layer (contains validated product_codes)
df_products = spark.table(f"{catalog}.{silver_schema}.products")
# Join orders with products to get standardized product_code
df_joined = df_orders.join(
    df_products,                                 # Right table: products
    on="product_id",                             # Join key: product_id
    how="inner"                                  # Inner join: keep only matched products
).select(df_orders["*"], df_products["product_code"])  # Keep all order columns + product_code

df_joined.show(5)

+-------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+--------------------+
|     order_id|order_placement_date|customer_id|product_id|order_qty|      read_timestamp|           file_name|file_size|        product_code|
+-------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+--------------------+
| FDEC84420202|          2025-12-01|     999999|  25891201|    458.0|2025-11-30 16:22:...|orders_2025_12_01...|    21062|2e387cef1424d6e7b...|
| FDEC87522503|          2025-12-04|     789522|  25891403|    342.0|2025-11-30 16:22:...|orders_2025_12_04...|    22385|77b6f538a9d0e0cf8...|
| FDEC89522601|          2025-12-08|     789522|  25891403|    476.0|2025-11-30 16:22:...|orders_2025_12_08...|    21711|77b6f538a9d0e0cf8...|
|FDEC817203502|          2025-12-15|     789203|  25891203|    300.0|2025-11-30 16:22:...|orders_2025_12_15...|    20287|889c67757ece9c973...|

In [0]:
# Write to silver orders table
# If table doesn't exist, create it; otherwise, merge new records
if not (spark.catalog.tableExists(silver_table)):
    # First time: create new silver table
    df_joined.write.format("delta").option(
        "delta.enableChangeDataFeed", "true"
    ).option("mergeSchema", "true").mode("overwrite").saveAsTable(silver_table)
else:
    # Incremental: merge new orders into existing table (avoid duplicates)
    silver_delta = DeltaTable.forName(spark, silver_table)
    silver_delta.alias("silver").merge(
        df_joined.alias("bronze"),
        # Match condition: same order_id, date, product_code and customer_id (order_qty is not part of the key)
        "silver.order_placement_date = bronze.order_placement_date AND silver.order_id = bronze.order_id AND silver.product_code = bronze.product_code AND silver.customer_id = bronze.customer_id"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
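Here is a pure-Python model of the `whenMatchedUpdateAll().whenNotMatchedInsertAll()` upsert, using a dict keyed by the silver merge key; `merge_upsert` is hypothetical. It shows why rerunning a batch does not duplicate rows. Step 5 deduplicates on five columns including `order_qty`, while the merge key omits it, so two rows differing only in quantity can both reach the MERGE; Delta raises an error when several source rows match the same target row. The sketch is stricter and rejects any batch with duplicate keys up front.

```python
SILVER_KEY = ("order_placement_date", "order_id", "product_code", "customer_id")


def merge_upsert(target, batch, key_cols=SILVER_KEY):
    """Upsert batch rows into target (a dict keyed by the merge key).

    Matched keys are overwritten (like whenMatchedUpdateAll); new keys are
    inserted (like whenNotMatchedInsertAll). Returns (updated, inserted).
    """
    keys = [tuple(row[c] for c in key_cols) for row in batch]
    if len(keys) != len(set(keys)):
        raise ValueError("batch contains duplicate merge keys")
    updated = inserted = 0
    for key, row in zip(keys, batch):
        if key in target:
            updated += 1
        else:
            inserted += 1
        target[key] = dict(row)  # UpdateAll / InsertAll: copy every column
    return updated, inserted
```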

### Create Silver Staging Table (Today's Transformed Orders)

In [0]:
# Create staging table with only today's transformed orders for downstream processing
df_joined.write\
 .format("delta") \
 .option("delta.enableChangeDataFeed", "true") \
 .mode("overwrite") \                             # Overwrite daily (reset staging)
 .saveAsTable(f"{catalog}.{silver_schema}.staging_{data_source}")

## STEP 5: GOLD LAYER - Daily Order Facts (Raw Grain)

**Purpose:** Store individual order records with daily grain (not aggregated)

**Medallion Pattern:** Select business columns, store at transaction level

**Grain:** Daily (one row per order line, i.e. per order_id and product)

**Output Table:** `fmcg.gold.sb_fact_orders`

In [0]:
# Read staged orders and prepare for gold layer
# Select columns needed for fact table; rename for clarity
df_gold = spark.sql(f"""
    SELECT 
        order_id,                                  -- Order identifier
        order_placement_date AS date,              -- Order date (renamed for clarity)
        customer_id AS customer_code,              -- Customer identifier
        product_code,                              -- Conformed product key
        product_id,                                -- Original product ID
        order_qty AS sold_quantity                 -- Order quantity (renamed)
    FROM {catalog}.{silver_schema}.staging_{data_source};
""")

df_gold.show(2)

+------------+----------+-------------+--------------------+----------+-------------+
|    order_id|      date|customer_code|        product_code|product_id|sold_quantity|
+------------+----------+-------------+--------------------+----------+-------------+
|FDEC84420202|2025-12-01|       999999|2e387cef1424d6e7b...|  25891201|        458.0|
|FDEC87522503|2025-12-04|       789522|77b6f538a9d0e0cf8...|  25891403|        342.0|
+------------+----------+-------------+--------------------+----------+-------------+
only showing top 2 rows


In [0]:
# Count orders being processed (monitoring metric)
df_gold.count()

7736

In [0]:
# Write to gold facts table
# If it doesn't exist, create it; otherwise merge to avoid duplicates
if not spark.catalog.tableExists(gold_table):
    print("Creating new table")
    df_gold.write.format("delta").option(
        "delta.enableChangeDataFeed", "true"
    ).option("mergeSchema", "true").mode("overwrite").saveAsTable(gold_table)
else:
    gold_delta = DeltaTable.forName(spark, gold_table)
    gold_delta.alias("target").merge(
        df_gold.alias("source"),
        # Match condition: same order on same date with same customer/product
        "target.date = source.date AND target.order_id = source.order_id AND target.product_code = source.product_code AND target.customer_code = source.customer_code"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

## STEP 6: MERGE with Parent Company Fact Table (CRITICAL - Monthly Aggregation)

**Purpose:** Recalculate monthly aggregates for affected months and merge with parent

**Challenge:** Incremental data arrives daily, but the parent fact table is stored at monthly grain

**Solution:** 
1. Identify affected months (those with new data)
2. Recalculate ONLY those months (not entire history)
3. Merge into parent fact table

**Pattern:** Delta MERGE for efficient incremental aggregation

**Output Table:** `fmcg.gold.fact_orders` (monthly grain, parent company fact table)
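The three-step solution above can be sketched in pure Python, with dicts standing in for the child daily fact and a dict keyed by `(month, product_code, customer_code)` standing in for the parent table. `recalc_affected_months` is hypothetical. Because each affected month is rebuilt from all of its daily rows rather than by adding the new batch to the old total, rerunning the job does not double-count.

```python
from collections import defaultdict
from datetime import date


def month_start(d: date) -> date:
    """Equivalent of trunc(date, 'MM'): first day of the month."""
    return d.replace(day=1)


def recalc_affected_months(child_rows, new_rows, parent):
    """Recompute monthly totals only for months present in new_rows.

    child_rows: every daily row (history + new) with date, product_code,
                customer_code and sold_quantity.
    parent:     dict (month, product_code, customer_code) -> quantity,
                updated in place.
    """
    affected = {month_start(r["date"]) for r in new_rows}   # 1. affected months
    totals = defaultdict(float)
    for r in child_rows:                                      # 2. all rows in those months
        m = month_start(r["date"])
        if m in affected:
            totals[(m, r["product_code"], r["customer_code"])] += r["sold_quantity"]
    parent.update(totals)                                     # 3. upsert into parent
    return affected
```

Like the notebook's MERGE, which has no `whenNotMatchedBySource...` clause, `parent.update` never deletes a parent row, so a product/customer pair that disappears from a recalculated month keeps its old total.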

### Incremental Load Strategy: Calculate Only Affected Months

In [0]:
# Step 1: Identify WHICH MONTHS have new orders
# This lets us recalculate only the affected months instead of re-aggregating the full history

df_child = spark.sql(f"SELECT order_placement_date as date FROM {catalog}.{silver_schema}.staging_{data_source}")

# Extract month boundaries and get distinct months with new data
incremental_month_df = df_child.select(
    F.trunc("date", "MM").alias("start_month")  # Truncate to first of month
).distinct()

incremental_month_df.show()

# Create temp view for use in SQL query
incremental_month_df.createOrReplaceTempView("incremental_months")

+-----------+
|start_month|
+-----------+
| 2025-12-01|
+-----------+



In [0]:
# Step 2: Get ALL orders (historical + new) for affected months only
# This ensures we recalculate complete monthly aggregates (not just deltas)

monthly_table = spark.sql(f"""
    SELECT date, product_code, customer_code, sold_quantity
    FROM {catalog}.{gold_schema}.sb_fact_orders sbf
    INNER JOIN incremental_months m
        ON trunc(sbf.date, 'MM') = m.start_month  -- Match affected month
""")

print("Total Rows: ", monthly_table.count())
monthly_table.show(10)

Total Rows:  7736
+----------+--------------------+-------------+-------------+
|      date|        product_code|customer_code|sold_quantity|
+----------+--------------------+-------------+-------------+
|2025-12-01|2e387cef1424d6e7b...|       999999|        458.0|
|2025-12-04|77b6f538a9d0e0cf8...|       789522|        342.0|
|2025-12-08|77b6f538a9d0e0cf8...|       789522|        476.0|
|2025-12-15|889c67757ece9c973...|       789203|        300.0|
|2025-12-16|ee1f7df9cf660ef02...|       789101|        221.0|
|2025-12-16|889c67757ece9c973...|       789101|        393.0|
|2025-12-21|ee1f7df9cf660ef02...|       789622|        190.0|
|2025-12-05|e91ba9d665f90254d...|       789303|        462.0|
|2025-12-14|889c67757ece9c973...|       789702|        322.0|
|2025-12-23|e91ba9d665f90254d...|       789201|        343.0|
+----------+--------------------+-------------+-------------+
only showing top 10 rows


In [0]:
# Inspect the distinct order dates that fall inside the affected months
monthly_table.select('date').distinct().orderBy('date').show()

+----------+
|      date|
+----------+
|2025-12-01|
|2025-12-02|
|2025-12-03|
|2025-12-04|
|2025-12-05|
|2025-12-06|
|2025-12-07|
|2025-12-08|
|2025-12-09|
|2025-12-10|
|2025-12-11|
|2025-12-12|
|2025-12-13|
|2025-12-14|
|2025-12-15|
|2025-12-16|
|2025-12-17|
|2025-12-18|
|2025-12-19|
|2025-12-20|
+----------+
only showing top 20 rows


In [0]:
# Step 3: Aggregate daily orders to monthly grain
# Group by: month_start, product_code, customer_code
# Aggregate: SUM(sold_quantity) for each group

df_monthly_recalc = (
    monthly_table
    .withColumn("month_start", F.trunc("date", "MM"))  # First day of month
    .groupBy("month_start", "product_code", "customer_code")  # Monthly aggregation key
    .agg(F.sum("sold_quantity").alias("sold_quantity"))  # Aggregate quantities
    .withColumnRenamed("month_start", "date")   # Rename month_start → date (first day of month)
)

df_monthly_recalc.show(10, truncate=False)

+----------+----------------------------------------------------------------+-------------+-------------+
|date      |product_code                                                    |customer_code|sold_quantity|
+----------+----------------------------------------------------------------+-------------+-------------+
|2025-12-01|778c2a7aa27bfdb211fd5ece048de80d00fbf3d6924bd908d91054796ba16ab6|789402       |1096.0       |
|2025-12-01|778c2a7aa27bfdb211fd5ece048de80d00fbf3d6924bd908d91054796ba16ab6|789503       |1839.0       |
|2025-12-01|ee1f7df9cf660ef02c33037d8d6eb94cbefe8e7b84c306e9387f09b0cae0abae|789703       |1759.0       |
|2025-12-01|3cab59f05924285270313afcfe40a08983bb03dd88f432e34fc6336914c14345|789103       |686.0        |
|2025-12-01|889c67757ece9c973791dfbc2d47b026a3342cc7255e47a3170329d158e897c2|789402       |3340.0       |
|2025-12-01|0cb7b2f42657b625f754e833aa1cf6a967be26f17415f5342302ebb0e90c8a28|789321       |3765.0       |
|2025-12-01|2e387cef1424d6e7b162b45622d4b1a788

In [0]:
# Report monthly aggregation result size
df_monthly_recalc.count()

612

In [0]:
# Step 4: MERGE recalculated monthly aggregates into parent fact table
# This upserts the affected months while preserving all historical months

gold_parent_delta = DeltaTable.forName(spark, f"{catalog}.{gold_schema}.fact_orders")

gold_parent_delta.alias("parent_gold").merge(
    df_monthly_recalc.alias("child_gold"),
    # Match on: same month, same product, same customer
    "parent_gold.date = child_gold.date AND parent_gold.product_code = child_gold.product_code AND parent_gold.customer_code = child_gold.customer_code"
).whenMatchedUpdateAll(  # If exists: update with recalculated values
).whenNotMatchedInsertAll(  # If new: insert complete record
).execute()

# Result: Parent fact table now has updated aggregates for affected months
# Historical months (no new data) remain unchanged

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

## STEP 7: CLEANUP - Drop Temporary Staging Tables

**Purpose:** Clean up temporary tables used only during ETL execution

**Benefit:** Prevents confusion about stale staging data on next run

In [0]:
%sql
-- Drop bronze staging table (no longer needed after merge)
DROP TABLE IF EXISTS fmcg.bronze.staging_orders;

In [0]:
%sql
-- Drop silver staging table (no longer needed after merge)
DROP TABLE IF EXISTS fmcg.silver.staging_orders;