# Project 2: Incremental ETL with CDC & SCD in Databricks  

In **Project 1**, we implemented a baseline medallion architecture (Bronze → Silver → Gold) on static CSV files of e-commerce data, focusing on cleaning and structuring. Those original CSVs remain available in Project 1 for reference.  

This follow-up project extends that work into a more realistic **incremental pipeline**, introducing advanced data engineering concepts:  
- **Incremental loads (daily feeds)** instead of static files.  
- **CDC (Change Data Capture)** for customers → keep only the latest state.  
- **SCD Type-2 (Slowly Changing Dimension)** for products → preserve history with valid date ranges.  
- **Fact tables with point-in-time joins** to attach the correct product version to each order.  
- **Incremental aggregates (Gold layer)** for KPIs such as revenue, order counts, and average order value.  

The pipeline is built using the **Databricks Lakehouse (PySpark + Delta Lake)** and simulates daily e-commerce data feeds.  


## Step 1 – Environment Setup
We start by selecting our working **catalog** (`workspace`) and **schema** (`default`) so all new tables are created in a consistent namespace.  
We also create a **`gold` schema** to hold final reporting tables.  

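Finally, we set Spark's **time parser policy** to `LEGACY`, which restores the pre-Spark-3.0 (`SimpleDateFormat`-based) parser. It is more lenient with some raw strings that the Spark 3 parser rejects. It does not detect formats on its own, though: mixed formats such as `yyyy-MM-dd` and `MM/dd/yyyy` still need one explicit pattern each, which is what Step 6 does.  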

In [0]:
# STEP 1: Setup
spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA default")
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

# Safer date parsing across mixed formats
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

## Step 2 – Simulate Day 1 Incoming Data  

Because this project runs on the free Databricks edition without an automated ingestion source, we **simulate daily feeds** by manually creating small DataFrames for:  
- Customers  
- Products  
- Orders  
- Order Items  

This represents the **Day 1 incremental load**.  

Each DataFrame is written into a **Bronze staging table** (Delta format, `*_inc` suffix) with `mode("overwrite")`, so each table holds only the latest day's feed. These tables act as the raw incoming data for that day, ready to be merged into the Silver layer.  

👉 Expected outcome: 4 new Bronze tables are created (`bronze_customers_inc`, `bronze_products_inc`, `bronze_orders_inc`, `bronze_order_items_inc`).  

In [0]:
# STEP 2: Simulate Day 1 incoming data
from pyspark.sql import functions as F

cust_day1 = spark.createDataFrame([
    (1, "Lucas Garcia",  "WA",  "2022-05-23"),
    (2, "Oliver Martin", "SA",  "2022-06-13"),
    (3, "Leo Brown",     "ACT", "2023-07-24"),
], ["customer_id","customer_name","region","signup_date"])

prod_day1 = spark.createDataFrame([
    (1, "Sports Item 1", "SPORTS",          60.11),
    (2, "Sports Item 2", "SPORTS",         141.14),
    (3, "Home Item 3",   "HOME & KITCHEN",  74.07),
], ["product_id","product_name","category","list_price"])

orders_day1 = spark.createDataFrame([
    (1001, 1, "2025-03-29", "SHIPPED"),
    (1002, 2, "2025-03-29", "SHIPPED"),
], ["order_id","customer_id","order_date","status"])

items_day1 = spark.createDataFrame([
    (1,1001,1,2, 60.11),
    (2,1002,2,1,141.14),
], ["order_item_id","order_id","product_id","quantity","unit_price"])

# Write to Bronze incremental staging
cust_day1.write.mode("overwrite").format("delta").saveAsTable("bronze_customers_inc")
prod_day1.write.mode("overwrite").format("delta").saveAsTable("bronze_products_inc")
orders_day1.write.mode("overwrite").format("delta").saveAsTable("bronze_orders_inc")
items_day1.write.mode("overwrite").format("delta").saveAsTable("bronze_order_items_inc")

## Step 3 – Silver Customers (CDC Type-1 Upsert)

**What this does:**  
We keep a **latest snapshot** of each customer. If a customer already exists, we **update** their details; if not, we **insert** them. This is the classic **Type-1** upsert (as in SCD Type-1): changes overwrite the old values and no history is kept.

**How it works:**  
- Create the target Delta table `silver_customers_cdc` (once).  
- Read today’s increment (`bronze_customers_inc`) and do light cleaning:
  - cast `customer_id` to INT  
  - `TRIM` names, `UPPER(TRIM(region))`  
- Use `MERGE INTO` on `customer_id`:
  - **MATCHED** → `UPDATE` (refresh fields)
  - **NOT MATCHED** → `INSERT` (new customer)

**Why Type-1 here:**  
We only need the **current** customer attributes (no history). It keeps the table simple for joining in Gold.

**Expected outcome:**  
`silver_customers_cdc` contains **one row per `customer_id`**, reflecting the latest name/region/signup_date. The `display(...)` shows the cleaned, up-to-date snapshot.
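The Type-1 semantics of this `MERGE` can be sketched in plain Python. A dict keyed by `customer_id` stands in for the Delta table. This is an illustration under stated assumptions, not the pipeline code, and `clean_customer` and `type1_upsert` are hypothetical names.

One behavioral difference matters. Delta's `MERGE` raises an error when several source rows match and try to update the same target row, whereas the dict simply keeps the last one. A real feed should therefore be deduplicated per `customer_id` before merging.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def clean_customer(raw: Row) -> Row:
    """Mirror the MERGE source query: cast the id, trim the name, trim and upper-case the region."""
    return {
        "customer_id": int(raw["customer_id"]),
        "customer_name": str(raw["customer_name"]).strip(),
        "region": str(raw["region"]).strip().upper(),
        "signup_date": raw["signup_date"],
    }


def type1_upsert(target: Dict[int, Row], increment: List[Row]) -> Tuple[int, int]:
    """Apply a Type-1 upsert in place and return (updated, inserted) counts."""
    updated = inserted = 0
    for raw in increment:
        row = clean_customer(raw)
        if row["customer_id"] in target:
            updated += 1   # WHEN MATCHED THEN UPDATE
        else:
            inserted += 1  # WHEN NOT MATCHED THEN INSERT
        target[row["customer_id"]] = row  # latest values overwrite; no history is kept
    return updated, inserted


# Day 1 then Day 2 for customer 2 (region SA -> VIC), plus new customer 4
silver: Dict[int, Row] = {}
type1_upsert(silver, [{"customer_id": "2", "customer_name": " Oliver Martin ", "region": " sa ", "signup_date": "2022-06-13"}])
type1_upsert(silver, [
    {"customer_id": 2, "customer_name": "Oliver Martin", "region": "VIC", "signup_date": "2022-06-13"},
    {"customer_id": 4, "customer_name": "Zoe Kumar", "region": "QLD", "signup_date": "2024-01-29"},
])
```

After both loads the dict holds one entry per customer, and customer 2 carries only the latest region.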

In [0]:
# STEP 3: Silver Customers CDC (Type-1)
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_customers_cdc (
  customer_id INT,
  customer_name STRING,
  region STRING,
  signup_date STRING
) USING delta
""")

# Upsert (insert new, update existing)
spark.sql("""
MERGE INTO silver_customers_cdc AS tgt
USING (
  SELECT
    CAST(customer_id AS INT) AS customer_id,
    TRIM(customer_name) AS customer_name,
    UPPER(TRIM(region)) AS region,
    signup_date
  FROM bronze_customers_inc
) AS src
ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN UPDATE SET
  tgt.customer_name = src.customer_name,
  tgt.region        = src.region,
  tgt.signup_date   = src.signup_date
WHEN NOT MATCHED THEN INSERT *
""")

#check customer CDC
display(
  spark.table("silver_customers_cdc")
       .orderBy("customer_id")
)

| customer_id | customer_name | region | signup_date |
|---|---|---|---|
| 1 | Lucas Garcia | WA | 2022-05-23 |
| 2 | Oliver Martin | SA | 2022-06-13 |
| 3 | Leo Brown | ACT | 2023-07-24 |


## Step 4 – Silver Products (SCD Type-2 with History)

**What this does:**  
We track **product changes over time** (name/category/price). Instead of overwriting, we **keep history** by closing the old row and inserting a new “current” row. This is **SCD Type-2**.

**How it works:**  
1) Create `silver_products_scd` with versioning columns:  
   - `valid_from`, `valid_to`, `is_current` (true = active version).  
2) Build two helper views:  
   - `current_products` → only active versions (`is_current = true`).  
   - `incoming_products` → today’s cleaned feed from Bronze.  
3) **Close changed rows (4a):**  
   - If an incoming product differs from the current one, update the current row: set `valid_to = current_date()` and `is_current = false`.  
4) **Insert new versions (4b):**  
   - Insert a row for any **new product** or **changed product** with `valid_from = current_date()`, `valid_to = 9999-12-31`, `is_current = true`.

**Why SCD2 here:**  
Product attributes (especially **price**) change, and we need correct values **as of the order date** later in Gold (point-in-time joins).

**Expected outcome:**  
`silver_products_scd` shows one **current** row per product plus older **historical** rows. Sorting by `product_id, valid_from` reveals the version timeline.  
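Here is a plain-Python sketch of steps 4a and 4b. It assumes the SCD2 table is a list of dicts and that the load date is passed in explicitly (`load_date`, a hypothetical parameter) instead of calling `current_date()`. Comparing with Python's `!=` treats two `None` values as equal, which matches SQL's null-safe `<=>` rather than plain `<>`.

```python
from datetime import date
from typing import Dict, List

OPEN_END = date(9999, 12, 31)
TRACKED = ("product_name", "category", "list_price")


def apply_scd2(history: List[Dict], incoming: List[Dict], load_date: date) -> List[Dict]:
    """Close changed current versions (4a) and append new current versions (4b), in place.

    Returns the rows that were inserted.
    """
    current = {r["product_id"]: r for r in history if r["is_current"]}
    inserted = []
    for inc in incoming:
        cur = current.get(inc["product_id"])
        # != treats None == None as equal, like SQL's null-safe <=>
        changed = cur is not None and any(cur[col] != inc[col] for col in TRACKED)
        if changed:
            cur["valid_to"] = load_date   # 4a: close the old version
            cur["is_current"] = False
        if cur is None or changed:        # 4b: brand-new product or new version
            row = {**inc, "valid_from": load_date, "valid_to": OPEN_END, "is_current": True}
            history.append(row)
            current[inc["product_id"]] = row
            inserted.append(row)
    return inserted


day1 = [{"product_id": 2, "product_name": "Sports Item 2", "category": "SPORTS", "list_price": 141.14}]
day2 = [
    {"product_id": 2, "product_name": "Sports Item 2", "category": "SPORTS", "list_price": 149.99},
    {"product_id": 4, "product_name": "Electronics 4", "category": "ELECTRONICS", "list_price": 339.97},
]
products: List[Dict] = []
apply_scd2(products, day1, date(2025, 3, 29))
apply_scd2(products, day2, date(2025, 3, 30))
```

Passing the load date explicitly makes re-runs reproducible. Replaying the same feed with the same date inserts nothing and leaves the history unchanged.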

In [0]:
# STEP 4: Silver Products SCD2

# Create SCD2 table if not exists
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_products_scd (
  product_id INT,
  product_name STRING,
  category STRING,
  list_price DOUBLE,
  valid_from DATE,
  valid_to   DATE,
  is_current BOOLEAN
) USING delta
""")

# Helper: current rows
spark.sql("""
CREATE OR REPLACE TEMP VIEW current_products AS
SELECT * FROM silver_products_scd WHERE is_current = true
""")

# Incoming cleaned
spark.sql("""
CREATE OR REPLACE TEMP VIEW incoming_products AS
SELECT
  CAST(product_id AS INT) AS product_id,
  TRIM(product_name) AS product_name,
  UPPER(TRIM(category)) AS category,
  CAST(list_price AS DOUBLE) AS list_price
FROM bronze_products_inc
""")

# 4a) Close changed rows
spark.sql("""
MERGE INTO silver_products_scd AS tgt
USING (
  SELECT c.product_id
  FROM current_products c
  JOIN incoming_products i ON c.product_id = i.product_id
  WHERE c.is_current = true AND (
    -- <=> is null-safe equality, so a change to or from NULL is also detected
    NOT (c.product_name <=> i.product_name) OR
    NOT (c.category     <=> i.category)     OR
    NOT (c.list_price   <=> i.list_price)
  )
) AS chg
ON tgt.product_id = chg.product_id AND tgt.is_current = true
WHEN MATCHED THEN UPDATE SET
  tgt.valid_to   = current_date(),
  tgt.is_current = false
""")

# 4b) Insert new versions (new or changed)
spark.sql("""
INSERT INTO silver_products_scd
SELECT
  i.product_id, i.product_name, i.category, i.list_price,
  current_date() AS valid_from,
  DATE '9999-12-31' AS valid_to,
  true AS is_current
FROM incoming_products i
LEFT JOIN current_products c ON i.product_id = c.product_id
WHERE c.product_id IS NULL
   OR NOT (c.product_name <=> i.product_name)
   OR NOT (c.category     <=> i.category)
   OR NOT (c.list_price   <=> i.list_price)
""")

#check products SCD2
display(
  spark.table("silver_products_scd")
       .orderBy("product_id","valid_from")
)
# Tip: also filter current only:
# display(spark.table("silver_products_scd").where("is_current = true"))

| product_id | product_name | category | list_price | valid_from | valid_to | is_current |
|---|---|---|---|---|---|---|
| 1 | Sports Item 1 | SPORTS | 60.11 | 2025-09-29 | 9999-12-31 | True |
| 2 | Sports Item 2 | SPORTS | 141.14 | 2025-09-29 | 9999-12-31 | True |
| 3 | Home Item 3 | HOME & KITCHEN | 74.07 | 2025-09-29 | 9999-12-31 | True |


## Step 5 – Silver Orders & Order Items (Append + Clean)

**What this does:**  
We lightly **clean** today’s orders and line items, then **append** them into Silver tables. This keeps facts simple and fast: new transactions are treated as **new rows**.

**How it works:**  
- Cast IDs and numerics to the right types.  
- Trim and normalize strings (upper-case `status`).  
- Drop invalid line items (keep only rows with `quantity > 0`).  
- **Append** to `silver_orders` and `silver_order_items` (no MERGE, no dedupe).

**Why append-only here:**  
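Orders/items are modeled as **events**. For this project, we're not tracking order status history or overwriting prior rows. Because there is no dedupe, re-running this step on the same increment appends the same orders again.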

**Expected outcome:**  
New rows are added to `silver_orders` and `silver_order_items`, ready to be joined in Gold.
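Step 5 appends without deduplication, so running it twice on the same increment stores the same orders twice. The plain-Python sketch below contrasts that with an insert-only merge keyed on `order_id`. In Delta the equivalent would be a `MERGE ... WHEN NOT MATCHED THEN INSERT *` on the key. Both functions are hypothetical illustrations, not part of the pipeline.

```python
from typing import Dict, List


def append_only(target: List[Dict], batch: List[Dict]) -> None:
    """What Step 5 does: every run adds the whole batch again."""
    target.extend(batch)


def insert_new_keys(target: List[Dict], batch: List[Dict], key: str) -> int:
    """Insert-only merge: add rows whose key is not yet present; return how many were added."""
    seen = {row[key] for row in target}
    added = 0
    for row in batch:
        if row[key] not in seen:
            target.append(row)
            seen.add(row[key])
            added += 1
    return added


day1_orders = [{"order_id": 1001, "customer_id": 1}, {"order_id": 1002, "customer_id": 2}]

appended: List[Dict] = []
append_only(appended, day1_orders)
append_only(appended, day1_orders)                # accidental re-run: duplicates appear

merged: List[Dict] = []
insert_new_keys(merged, day1_orders, "order_id")
insert_new_keys(merged, day1_orders, "order_id")  # re-run adds nothing
```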

In [0]:
# STEP 5: Silver Orders & Items (simple append-clean)

from pyspark.sql import functions as F

# Create target if not exists
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_orders (
  order_id INT, customer_id INT, order_date STRING, status STRING
) USING delta
""")
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_order_items (
  order_item_id INT, order_id INT, product_id INT, quantity INT, unit_price DOUBLE
) USING delta
""")

# Clean incoming and append to targets
o_inc = (spark.table("bronze_orders_inc")
          .withColumn("order_id", F.col("order_id").cast("int"))
          .withColumn("customer_id", F.col("customer_id").cast("int"))
          .withColumn("order_date", F.trim(F.col("order_date").cast("string")))
          .withColumn("status", F.upper(F.trim(F.col("status"))))
        )

i_inc = (spark.table("bronze_order_items_inc")
          .withColumn("order_item_id", F.col("order_item_id").cast("int"))
          .withColumn("order_id",      F.col("order_id").cast("int"))
          .withColumn("product_id",    F.col("product_id").cast("int"))
          .withColumn("quantity",      F.col("quantity").cast("int"))
          .withColumn("unit_price",    F.col("unit_price").cast("double"))
          .filter(F.col("quantity") > 0)
        )

# Append (no dedupe here)
(o_inc.write.mode("append").format("delta").saveAsTable("silver_orders"))
(i_inc.write.mode("append").format("delta").saveAsTable("silver_order_items"))

## Step 6 – Gold Fact (Point-in-Time Join to Product SCD)

**What this does:**  
Builds the **line-item fact** table and attaches the **correct product version as of the order date** (PIT = point-in-time).

**How it works:**  
- Load Silver tables: items (`i`), orders (`o`), customers (`c`), and product SCD2 (`p_scd`).  
- **Parse `order_date` safely** with regex-guarded formats → `order_date_parsed` (a true `DATE`).  
- Join `i`→`o`→`c` normally, then **PIT-join to products** using:
  - `i.product_id = p_scd.product_id` **and**
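  - `p_scd.valid_from <= order_date_parsed < p_scd.valid_to`. This is a half-open window: `valid_to` is exclusive, so on the day a version is closed and its successor starts, only the successor matches.  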
- Select metrics and attributes (e.g., `line_amount = quantity * unit_price`) and **write** to `gold.fact_order_item_pti`.

**Why this matters:**  
Because products can change (price/category), a plain join on `product_id` would match every version of a product, duplicating line items and mixing old and new attributes. The **PIT window** selects only the **version that was active on the order date**.

**Expected outcome:**  
`gold.fact_order_item_pti` with one row per order item, enriched with **time-correct product attributes**, ready for daily KPI aggregation.
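The date parsing and the point-in-time lookup can be sketched in plain Python. The regex/format pairs mirror the `when` chain in the Step 6 code, and `parse_order_date` and `pit_lookup` are hypothetical helpers. The sketch returns `None` when no shape fits, and also when a string has the right shape but is not a real date. The two product versions assume they were stamped with the feed dates 2025-03-29 and 2025-03-30 rather than the run date.

```python
import re
from datetime import date, datetime
from typing import Dict, List, Optional

# (shape regex, strptime pattern) pairs mirroring the when/otherwise chain in Step 6
DATE_FORMATS = [
    (r"^\d{4}-\d{2}-\d{2}$", "%Y-%m-%d"),
    (r"^\d{4}/\d{2}/\d{2}$", "%Y/%m/%d"),
    (r"^\d{2}/\d{2}/\d{4}$", "%m/%d/%Y"),
    (r"^\d{2}-\d{2}-\d{4}$", "%d-%m-%Y"),
]


def parse_order_date(text: str) -> Optional[date]:
    """Return a date for the first matching shape, or None if no shape fits or the date is impossible."""
    text = text.strip()
    for shape, fmt in DATE_FORMATS:
        if re.match(shape, text):
            try:
                return datetime.strptime(text, fmt).date()
            except ValueError:
                return None  # right shape but not a real date, e.g. month 13
    return None


def pit_lookup(versions: List[Dict], product_id: int, as_of: Optional[date]) -> Optional[Dict]:
    """Return the version whose half-open window [valid_from, valid_to) contains as_of."""
    if as_of is None:
        return None
    for v in versions:
        if v["product_id"] == product_id and v["valid_from"] <= as_of < v["valid_to"]:
            return v
    return None


# Two versions of product 2, assuming they were stamped with feed dates (not the run date)
versions = [
    {"product_id": 2, "list_price": 141.14, "valid_from": date(2025, 3, 29), "valid_to": date(2025, 3, 30)},
    {"product_id": 2, "list_price": 149.99, "valid_from": date(2025, 3, 30), "valid_to": date(9999, 12, 31)},
]
```

With these windows, an order dated 2025-03-30 picks only the 149.99 version. An inclusive `BETWEEN` would match both versions on that boundary day and duplicate the line item.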

In [0]:
# STEP 6: Gold fact_order_item_pti (point-in-time product attributes)
from pyspark.sql import functions as F

i = spark.table("silver_order_items")
o = spark.table("silver_orders") \
    .withColumn("order_date", F.trim(F.col("order_date").cast("string"))) \
    .withColumn(
        "order_date_parsed",
        F.when(F.col("order_date").rlike(r'^\d{4}-\d{2}-\d{2}$'),
               F.to_date("order_date","yyyy-MM-dd"))
         .when(F.col("order_date").rlike(r'^\d{4}/\d{2}/\d{2}$'),
               F.to_date("order_date","yyyy/MM/dd"))
         .when(F.col("order_date").rlike(r'^\d{2}/\d{2}/\d{4}$'),
               F.to_date("order_date","MM/dd/yyyy"))
         .when(F.col("order_date").rlike(r'^\d{2}-\d{2}-\d{4}$'),
               F.to_date("order_date","dd-MM-yyyy"))
         .otherwise(F.lit(None).cast("date"))
    )
c = spark.table("silver_customers_cdc")
p_scd = spark.table("silver_products_scd")

fact_pti = (
    i.join(o, "order_id")
     .join(c, "customer_id")
     .join(
        p_scd,
        (i.product_id == p_scd.product_id) &
        (o.order_date_parsed >= p_scd.valid_from) &
        (o.order_date_parsed <  p_scd.valid_to),
        "left"
     )
     .select(
        "order_item_id","order_id","customer_id", i.product_id.alias("product_id"),
        o.order_date_parsed.alias("order_date"),
        "quantity","unit_price",
        (F.col("quantity")*F.col("unit_price")).alias("line_amount"),
        "product_name","category","list_price"
     )
)

(fact_pti.write.mode("overwrite").option("overwriteSchema","true")
 .format("delta").saveAsTable("gold.fact_order_item_pti"))

# check gold fact table with point-in-time product attributes
display(
  spark.table("gold.fact_order_item_pti")
       .select("order_item_id","order_id","customer_id","product_id",
               "order_date","quantity","unit_price","line_amount",
               "product_name","category","list_price")
       .orderBy("order_id","order_item_id")
       .limit(20)
)

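| order_item_id | order_id | customer_id | product_id | order_date | quantity | unit_price | line_amount | product_name | category | list_price |
|---|---|---|---|---|---|---|---|---|---|---|
| 117 | 46 | 1 | 3 | 2024-12-07 | 2 | 74.07 | 148.14 | | | |
| 118 | 46 | 1 | 9 | 2024-12-07 | 5 | 103.42 | 517.1 | | | |
| 119 | 46 | 1 | 22 | 2024-12-07 | 2 | 66.79 | 133.58 | | | |
| 120 | 46 | 1 | 14 | 2024-12-07 | 1 | 84.03 | 84.03 | | | |
| 124 | 49 | 1 | 20 | 2024-04-24 | 5 | 135.94 | 679.7 | | | |
| 125 | 49 | 1 | 10 | 2024-04-24 | 4 | 352.16 | 1408.64 | | | |
| 126 | 49 | 1 | 4 | 2024-04-24 | 4 | 339.97 | 1359.88 | | | |
| 153 | 61 | 3 | 14 | 2025-04-26 | 1 | 84.03 | 84.03 | | | |
| 154 | 61 | 3 | 7 | 2025-04-26 | 5 | 113.23 | 566.15 | | | |
| 155 | 61 | 3 | 12 | 2025-04-26 | 5 | 142.7 | 713.5 | | | |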


## Step 7 – Gold Daily Aggregate (Incremental Upsert)

**What this does:**  
Aggregates the line-item fact into **daily KPIs** (orders, items, revenue, distinct customers, AOV) and **upserts** results by `order_date` so we can re-run for new days without rebuilding everything.

**How it works:**  
- Group `gold.fact_order_item_pti` by `order_date` to compute metrics and `aov = revenue / orders_cnt`.  
- Create the target table `gold.fact_daily_sales_inc` if it doesn’t exist.  
- Use `MERGE` on `order_date`:
  - **MATCHED** → `UPDATE` that day’s row (idempotent re-runs).  
  - **NOT MATCHED** → `INSERT` a new day.

**Why this matters:**  
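This demonstrates an **incremental pattern**: `MERGE` inserts new days and refreshes existing ones instead of rewriting the table. In this simple version, `daily_new` is recomputed from the whole fact table, so every day is re-merged on each run. A larger pipeline would restrict it to the dates touched by the latest increment.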

**Expected outcome:**  
`gold.fact_daily_sales_inc` contains one row per day with stable KPIs. The display shows a clean time series we can chart (e.g., revenue by date).
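The daily KPI logic and the `MERGE` on `order_date` can be mirrored in plain Python. `daily_kpis` and `merge_daily` are hypothetical names, and a dict keyed by date stands in for `gold.fact_daily_sales_inc`. Matched days are replaced rather than added to, so merging the same results twice leaves the target unchanged. That is what makes re-runs idempotent.

```python
from collections import defaultdict
from datetime import date
from typing import Dict, List


def daily_kpis(fact_rows: List[Dict]) -> Dict[date, Dict]:
    """Group line items by order_date and compute the Step 7 KPIs."""
    by_day: Dict[date, List[Dict]] = defaultdict(list)
    for r in fact_rows:
        if r["order_date"] is not None:  # same null filter as the Spark code
            by_day[r["order_date"]].append(r)
    out = {}
    for day, rows in by_day.items():
        orders = len({r["order_id"] for r in rows})
        revenue = sum(r["quantity"] * r["unit_price"] for r in rows)
        out[day] = {
            "orders_cnt": orders,
            "items_cnt": len(rows),
            "revenue": revenue,
            "distinct_customers": len({r["customer_id"] for r in rows}),
            "aov": round(revenue / orders, 2) if orders else None,
        }
    return out


def merge_daily(target: Dict[date, Dict], new: Dict[date, Dict]) -> None:
    """MERGE ON order_date: matched days are replaced (UPDATE SET *), new days inserted (INSERT *)."""
    for day, kpis in new.items():
        target[day] = dict(kpis)


# Day 1 line items from Step 2, plus one row with an unparseable (None) date
fact = [
    {"order_id": 1001, "customer_id": 1, "order_date": date(2025, 3, 29), "quantity": 2, "unit_price": 60.11},
    {"order_id": 1002, "customer_id": 2, "order_date": date(2025, 3, 29), "quantity": 1, "unit_price": 141.14},
    {"order_id": 9999, "customer_id": 3, "order_date": None, "quantity": 1, "unit_price": 5.0},
]
gold_daily: Dict[date, Dict] = {}
merge_daily(gold_daily, daily_kpis(fact))
merge_daily(gold_daily, daily_kpis(fact))  # re-run: same single row, same values
```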

In [0]:
# STEP 7: Incremental daily aggregate with MERGE
from pyspark.sql import functions as F

f = spark.table("gold.fact_order_item_pti")

daily_new = (
  f.where(F.col("order_date").isNotNull())
   .groupBy("order_date")
   .agg(
     F.countDistinct("order_id").alias("orders_cnt"),
     F.count("*").alias("items_cnt"),
     F.sum("line_amount").alias("revenue"),
     F.countDistinct("customer_id").alias("distinct_customers"),
   )
   .withColumn("aov", F.round(F.col("revenue")/F.when(F.col("orders_cnt")!=0, F.col("orders_cnt")), 2))
)

spark.sql("""
CREATE TABLE IF NOT EXISTS gold.fact_daily_sales_inc (
  order_date DATE,
  orders_cnt BIGINT,
  items_cnt BIGINT,
  revenue DOUBLE,
  distinct_customers BIGINT,
  aov DOUBLE
) USING delta
""")

daily_new.createOrReplaceTempView("daily_new")
spark.sql("""
MERGE INTO gold.fact_daily_sales_inc AS tgt
USING daily_new AS src
ON tgt.order_date = src.order_date
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

# check gold daily aggregate table
display(
  spark.table("gold.fact_daily_sales_inc")
       .orderBy("order_date")
)
# Optional: in the output, click Chart → Line (X=order_date, Y=revenue)

| order_date | orders_cnt | items_cnt | revenue | distinct_customers | aov |
|---|---|---|---|---|---|
| 2024-04-24 | 1 | 3 | 3448.2200000000003 | 1 | 3448.22 |
| 2024-04-28 | 1 | 2 | 1995.41 | 1 | 1995.41 |
| 2024-07-16 | 1 | 1 | 772.24 | 1 | 772.24 |
| 2024-09-07 | 1 | 2 | 1004.54 | 1 | 1004.54 |
| 2024-12-07 | 1 | 4 | 882.85 | 1 | 882.85 |
| 2025-03-22 | 1 | 1 | 173.42 | 1 | 173.42 |
| 2025-03-29 | 2 | 8 | 1045.44 | 2 | 522.72 |
| 2025-04-26 | 1 | 4 | 3136.88 | 1 | 3136.88 |


## Step 8 – Data Quality (DQ) Checks  

**What this does:**  
We run simple **data quality checks** on the pipeline outputs to confirm that Silver and Gold tables are consistent and reliable.  

**How it works:**  
- **Duplicate customers** → check if any `customer_id` appears more than once in `silver_customers_cdc`.  
- **Missing products** → check if any fact rows have a `product_id` that does not appear in `silver_products_scd` at all. The check matches on `product_id` only and ignores the validity window.  
- **Bad quantities** → check if any fact rows have `quantity <= 0`.  

**Expected results:**  
- Duplicate customers should be `0` (CDC logic keeps only one row per customer).  
- Missing products should normally be `0` (every fact row should match to a valid product).  
- Bad quantities should be `0` (we filtered invalid rows in Silver).

**Why this matters:**  
By comparing expected vs observed results, we can quickly spot mismatches and highlight potential data alignment or quality issues that need attention in real-world pipelines. 
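The three checks can be expressed in plain Python over lists of dicts. The fourth count, `pit_misses`, is a hypothetical extra check that is not in the SQL of this step. It counts fact rows whose point-in-time join found no product version, which `missing_products` cannot see because it matches on `product_id` only.

```python
from collections import Counter
from typing import Dict, List


def dq_checks(customers: List[Dict], products: List[Dict], fact: List[Dict]) -> Dict[str, int]:
    """Return the Step 8 counts plus a hypothetical point-in-time miss count."""
    per_customer = Counter(c["customer_id"] for c in customers)
    known_products = {p["product_id"] for p in products}
    return {
        # customer_ids appearing more than once (GROUP BY ... HAVING COUNT(*) > 1)
        "dup_customers": sum(1 for n in per_customer.values() if n > 1),
        # fact rows whose product_id has no row at all in the product table
        "missing_products": sum(1 for f in fact if f["product_id"] not in known_products),
        "bad_qty": sum(1 for f in fact if f["quantity"] <= 0),
        # hypothetical extra: rows where the PIT join attached no product attributes
        "pit_misses": sum(1 for f in fact if f.get("product_name") is None),
    }


customers = [{"customer_id": 1}, {"customer_id": 2}]
products = [{"product_id": 1}, {"product_id": 1}]  # two SCD2 versions of product 1
fact = [
    {"product_id": 1, "quantity": 2, "product_name": "Sports Item 1"},
    {"product_id": 1, "quantity": 1, "product_name": None},  # product exists, no version on order date
    {"product_id": 9, "quantity": 0, "product_name": None},  # unknown product, bad quantity
]
report = dq_checks(customers, products, fact)
```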


In [0]:
# STEP 8: DQ checks (quick)
spark.sql("SELECT COUNT(*) AS dup_customers FROM (SELECT customer_id FROM silver_customers_cdc GROUP BY customer_id HAVING COUNT(*)>1)").show()
spark.sql("SELECT COUNT(*) AS missing_products FROM gold.fact_order_item_pti f LEFT JOIN silver_products_scd p ON f.product_id=p.product_id WHERE p.product_id IS NULL").show()
spark.sql("SELECT COUNT(*) AS bad_qty FROM gold.fact_order_item_pti WHERE quantity <= 0").show()

+-------------+
|dup_customers|
+-------------+
|            0|
+-------------+

+----------------+
|missing_products|
+----------------+
|              15|
+----------------+

+-------+
|bad_qty|
+-------+
|      0|
+-------+



**Observed results:**  
- `dup_customers = 0` ✅  
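- `missing_products = 15` ⚠️ → 15 fact rows have a `product_id` with no row at all in `silver_products_scd`. Only products 1–3 were loaded on Day 1, while `silver_order_items` also holds line items for other products (for example 9, 14 and 22 in the Step 6 output). In practice, such rows would be flagged for review or mapped to a default "unknown" product.

  A related issue shows up in the Step 6 output. `product_name`, `category` and `list_price` are empty even for product 3, which does exist. Its only version has `valid_from = 2025-09-29` (the run date from `current_date()`), which is later than every order date shown, so the point-in-time window never matches.  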
- `bad_qty = 0` ✅  
 

## Step 9 – Delta Lake History  

**What this does:**  
We use Delta Lake’s built-in **time travel and audit history** feature to review all operations on a table.  

**How it works:**  
`DESCRIBE HISTORY silver_products_scd` shows a log of changes including:  
- Operation type (e.g., `CREATE TABLE`, `MERGE`, and `WRITE` for appends such as `INSERT INTO`).  
- User, timestamp, and environment details.  
- Version numbers, which can be queried directly with `VERSION AS OF` or `TIMESTAMP AS OF`.  

**Expected results:**  
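A table of history entries for `silver_products_scd`. In the run below:

- Version 0 is `CREATE TABLE`.
- Version 1 is the Step 4a `MERGE`. It updated 0 rows because the table was still empty.
- Version 2 is the Step 4b `INSERT INTO`, logged as `WRITE` with 3 output rows.
- Versions 3 and 4 come from re-running Step 4 on the same feed. The `MERGE` updates nothing and the `WRITE` appends 0 rows, because no product changed.  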

**Why this matters:**  
Delta’s history provides an **audit trail** and supports **time travel queries**, making debugging and reproducibility easier in production pipelines.  
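Delta's `VERSION AS OF` and `TIMESTAMP AS OF` clauses let you query an earlier version or compare two versions. The helpers below only build SQL strings. They are hypothetical, not a Databricks API, and in the notebook their output would be passed to `spark.sql(...)`.

```python
from typing import Optional


def time_travel_sql(table: str, version: Optional[int] = None, timestamp: Optional[str] = None) -> str:
    """Build a Delta time-travel SELECT for exactly one of version / timestamp."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        return f"SELECT * FROM {table} VERSION AS OF {int(version)}"
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{timestamp}'"


def diff_versions_sql(table: str, old: int, new: int) -> str:
    """Rows present in version `new` but not in version `old`."""
    return (f"SELECT * FROM {table} VERSION AS OF {int(new)} "
            f"EXCEPT SELECT * FROM {table} VERSION AS OF {int(old)}")


query = diff_versions_sql("silver_products_scd", 1, 2)
```

In the notebook, `display(spark.sql(query))` should return the three rows appended at version 2 in this step's history output.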

In [0]:
%sql
-- STEP 9: Delta history (run as SQL cell or wrap with spark.sql)
DESCRIBE HISTORY silver_products_scd;

Abridged `DESCRIBE HISTORY` output (all entries: same user, engine `Databricks-Runtime/17.1.x-aarch64-photon-scala2.13`):

| version | timestamp | operation | readVersion | key metrics |
|---|---|---|---|---|
| 4 | 2025-09-29T07:13:50.000Z | WRITE (mode Append) | 3 | numFiles 0, numOutputRows 0 |
| 3 | 2025-09-29T07:13:48.000Z | MERGE (predicate: product_id match AND is_current; update when matched) | 2 | numSourceRows 0, numTargetRowsUpdated 0 |
| 2 | 2025-09-29T06:59:08.000Z | WRITE (mode Append) | 1 | numFiles 1, numOutputRows 3, numOutputBytes 2008 |
| 1 | 2025-09-29T06:59:06.000Z | MERGE (same predicate) | 0 | numSourceRows 0, numTargetRowsUpdated 0 |
| 0 | 2025-09-29T06:59:03.000Z | CREATE TABLE (delta.enableDeletionVectors = true) | | |


## Step 10 (Optional) – Simulate Day 2 Incremental Data  

**Objective:**  
This step demonstrates how a **second daily feed** (Day 2) introduces both **changes** and **new records**, which will trigger CDC and SCD logic when we re-run Steps 3–7. It shows how the pipeline handles evolving data over time.  

**How it works:**  
- **Customers**:  
  - `customer_id=2` → region changes from `SA` to `VIC` (CDC Type-1 update).  
  - `customer_id=4` → brand new customer.  
- **Products**:  
  - `product_id=2` → price changes from `141.14` to `149.99` (new SCD2 version).  
  - `product_id=4` → brand new product.  
- **Orders and Order Items**:  
  - Two new orders (`1003`, `1004`) and their items are added, referencing the changed/new customers and products.  
- These are written to the Bronze `_inc` tables, overwriting the previous Day 1 increment.  

**Next steps:**  
To apply Day 2, we re-run **Steps 3–7**:  
- Step 3 → merge customers (CDC).  
- Step 4 → merge products (SCD2).  
- Step 5 → append orders and items.  
- Step 6 → rebuild fact with point-in-time product join.  
- Step 7 → update daily aggregates.  

**Expected outcome:**  
After re-running, we should see:  
- `silver_customers_cdc` updated so `customer_id=2` region = VIC, and `customer_id=4` inserted.  
- `silver_products_scd` with a closed version of product 2 (old price) and a new row (new price), plus `product_id=4` inserted as current.  
- `gold.fact_order_item_pti` and `gold.fact_daily_sales_inc` reflecting the new Day 2 orders.  
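Step 4 stamps every version with `current_date()`. If Day 2 is replayed on the same calendar day as Day 1, the old version of product 2 gets an empty window, and the new version starts on the run date, after both order dates.

The plain-Python sketch below compares that with stamping each version with its feed date. The `effective` parameter is a hypothetical stand-in for replacing `current_date()` in Step 4 with the feed's business date. The date 2025-09-29 is the run date seen in the Step 4 output.

```python
from datetime import date
from typing import Dict, List, Optional

OPEN_END = date(9999, 12, 31)


def add_version(history: List[Dict], product_id: int, price: float, effective: date) -> None:
    """Close the open version of product_id if its price changed, then open a new one at `effective`."""
    current = next((v for v in history
                    if v["product_id"] == product_id and v["valid_to"] == OPEN_END), None)
    if current is not None and current["list_price"] == price:
        return                          # unchanged: nothing to do
    if current is not None:
        current["valid_to"] = effective  # close the old version
    history.append({"product_id": product_id, "list_price": price,
                    "valid_from": effective, "valid_to": OPEN_END})


def price_as_of(history: List[Dict], product_id: int, as_of: date) -> Optional[float]:
    """Point-in-time lookup with a half-open [valid_from, valid_to) window."""
    for v in history:
        if v["product_id"] == product_id and v["valid_from"] <= as_of < v["valid_to"]:
            return v["list_price"]
    return None


run_day = date(2025, 9, 29)
by_run: List[Dict] = []       # both feeds stamped with the run date
add_version(by_run, 2, 141.14, run_day)
add_version(by_run, 2, 149.99, run_day)

by_feed: List[Dict] = []      # each feed stamped with its own date
add_version(by_feed, 2, 141.14, date(2025, 3, 29))
add_version(by_feed, 2, 149.99, date(2025, 3, 30))
```

With run-date stamping, neither Day 1 nor Day 2 orders find a price. With feed-date stamping, the 2025-03-29 order sees 141.14 and the 2025-03-30 order sees 149.99.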

In [0]:
# STEP 10: Simulate Day 2 (changes + new data)
cust_day2 = spark.createDataFrame([
    (2, "Oliver Martin", "VIC", "2022-06-13"),  # region changed SA -> VIC
    (4, "Zoe Kumar",     "QLD", "2024-01-29"),  # new customer
], ["customer_id","customer_name","region","signup_date"])

prod_day2 = spark.createDataFrame([
    (2, "Sports Item 2", "SPORTS", 149.99),     # price changed 141.14 -> 149.99
    (4, "Electronics 4", "ELECTRONICS", 339.97) # new product
], ["product_id","product_name","category","list_price"])

orders_day2 = spark.createDataFrame([
    (1003, 2, "2025-03-30", "SHIPPED"),
    (1004, 4, "2025-03-30", "SHIPPED"),
], ["order_id","customer_id","order_date","status"])

items_day2 = spark.createDataFrame([
    (3,1003,2,1,149.99),   # uses new product price
    (4,1004,4,1,339.97),   # brand new product
], ["order_item_id","order_id","product_id","quantity","unit_price"])

# Overwrite Bronze incrementals with Day 2
cust_day2.write.mode("overwrite").format("delta").saveAsTable("bronze_customers_inc")
prod_day2.write.mode("overwrite").format("delta").saveAsTable("bronze_products_inc")
orders_day2.write.mode("overwrite").format("delta").saveAsTable("bronze_orders_inc")
items_day2.write.mode("overwrite").format("delta").saveAsTable("bronze_order_items_inc")

## Conclusion & Learnings  

In this project, we extended the basic medallion architecture from Project 1 into a more **realistic incremental pipeline** using Databricks, PySpark, and Delta Lake.  

**Key learnings and skills demonstrated:**  
- **Incremental ingestion**: simulated daily feeds with Bronze `_inc` tables.  
- **CDC (Type-1)**: kept customers up-to-date by overwriting changed attributes.  
- **SCD Type-2**: tracked product attribute changes over time with `valid_from`, `valid_to`, and `is_current`.  
- **Point-in-time joins**: ensured orders were linked to the correct product version valid on the order date.  
- **Incremental aggregation with MERGE**: refreshed daily KPIs without rebuilding the whole dataset.  
- **Data quality checks**: verified duplicates, missing joins, and invalid values.  
- **Delta Lake features**: used `overwriteSchema` when rebuilding the Gold fact, and table history (`DESCRIBE HISTORY`) for auditability and reproducibility.  

This showcases the transition from **analyst-style batch cleaning** (Project 1) to **data engineering skills** such as incremental ETL, change data capture, and dimension versioning.  

**Outcome:**  
We now have a complete pipeline that ingests daily e-commerce data, applies CDC/SCD logic, and produces Gold-level fact and aggregate tables ready for analytics.  