
<div  style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://raw.githubusercontent.com/derar-alhussein/Databricks-Certified-Data-Engineer-Professional/main/Includes/images/customers.png" width="60%">
</div>

In [0]:
%run ../Includes/Copy-Datasets

Data catalog: workspace
Schema: bookstore_eng_pro


In [0]:
from pyspark.sql import functions as F

schema = "customer_id STRING, email STRING, first_name STRING, last_name STRING, gender STRING, street STRING, city STRING, country_code STRING, row_status STRING, row_time timestamp"

customers_df = (spark.table("bronze")
                 .filter("topic = 'customers'")
                 .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
                 .select("v.*")
                 .filter(F.col("row_status").isin(["insert", "update"])))

display(customers_df)

customer_id,email,first_name,last_name,gender,street,city,country_code,row_status,row_time
C00335,scoulmanpn@topsy.com,Silvanus,Coulman,Agender,4319 Prairieview Crossing,Hengdian,CN,insert,2022-01-31T23:06:00.000Z
C00327,scoskerryei@marketwatch.com,Sula,Coskerry,Female,7027 Karstens Avenue,Nanqiao,CN,update,2022-01-31T15:14:00.000Z
C00315,cconellyff@amazonaws.com,Cahra,Conelly,Female,104 East Street,Klakeh,ID,update,2022-01-31T03:30:00.000Z
C00334,scoughand5@dion.ne.jp,Salvatore,Coughan,Male,452 Westport Way,Hexi,CN,insert,2022-01-30T22:05:00.000Z
C00333,,Etheline,Cottey,Female,,Jambesari,ID,insert,2022-01-30T21:06:00.000Z
C00332,acottage5i@google.ca,Albertine,Cottage,Genderqueer,82 Morningstar Hill,Diffa,NE,insert,2022-01-30T20:07:00.000Z
C00282,,Josias,Claffey,Polygender,42 La Follette Junction,Nidek,PL,update,2022-01-30T20:02:00.000Z
C00331,tcosson59@etsy.com,Trixi,Cosson,Female,2 Surrey Court,Hilversum,NL,insert,2022-01-30T19:10:00.000Z
C00330,,Donalt,Cosslett,Male,,Kalajoki,FI,insert,2022-01-30T18:07:00.000Z
C00288,mcleminshawaq@oaic.gov.au,Melli,Cleminshaw,Female,16223 Golf Park,Usatove,UA,update,2022-01-30T01:41:00.000Z


In [0]:
from pyspark.sql.window import Window

window = Window.partitionBy("customer_id").orderBy(F.col("row_time").desc())

ranked_df = (customers_df.withColumn("rank", F.rank().over(window))
                          .filter("rank == 1")
                          .drop("rank"))
display(ranked_df)

customer_id,email,first_name,last_name,gender,street,city,country_code,row_status,row_time
C00001,dabby2y@japanpost.jp,Dniren,Abby,Female,768 Mesta Terrace,Annecy,FR,insert,2021-11-12T01:09:00.000Z
C00002,eabbysc1@github.com,Etti,Abbys,Female,1748 Vidon Plaza,Varge Mondar,PT,insert,2021-11-12T02:10:00.000Z
C00003,rabelovd1@wikispaces.com,Ronnie,Abelov,Male,366 Randy Park,San Celestio,PH,update,2021-11-26T05:15:00.000Z
C00004,rabels9g@behance.net,Ray,Abels,Female,613 Lyons Way,Oudtshoorn,ZA,insert,2021-11-12T04:05:00.000Z
C00005,sabendrothin@cargocollective.com,Shanon,Abendroth,Female,30292 Manufacturers Junction,Ani-e,PH,insert,2021-11-13T05:05:00.000Z
C00006,norman.abernethy@gmail.com,Norman,Abernethy,Male,9294 Oxford Center,Gibara,CU,update,2021-11-25T08:02:00.000Z
C00007,sabrahmson3h@blinklist.com,Skell,Abrahmson,Male,90941 Hallows Park,Huarong Chengguanzhen,CN,insert,2021-11-13T07:02:00.000Z
C00008,dacheson2h@mapy.cz,Darsey,Acheson,Non-binary,29579 Grim Plaza,Dārayyā,SY,insert,2021-11-13T08:10:00.000Z
C00009,fackwoodji@gravatar.com,Fredrick,Ackwood,Male,67 Dunning Plaza,Santo Domingo,CU,update,2021-11-17T09:37:00.000Z
C00010,doralynne.adamkiewicz@gmail.com,Doralynne,Adamkiewicz,Female,84126 Glendale Center,Ugep,NG,insert,2021-11-14T10:09:00.000Z


In [0]:
# This will throw an exception because non-time-based window operations are not supported on streaming DataFrames.
ranked_df = (spark.readStream
                   .table("bronze")
                   .filter("topic = 'customers'")
                   .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
                   .select("v.*")
                   .filter(F.col("row_status").isin(["insert", "update"]))
                   .withColumn("rank", F.rank().over(window))
                   .filter("rank == 1")
                   .drop("rank")
             )

(ranked_df.writeStream
            .option("checkpointLocation", f"{bookstore.checkpoint_path}/ranked")
            .trigger(availableNow=True)
            .format("console")
            .start()
)


# Cell 5 applies a ranking window function directly to a streaming DataFrame, which Spark Structured Streaming does not support. The code reads customer events from the bronze table as a stream, parses the JSON payload, keeps inserts and updates, and then tries to rank each customer's records by timestamp so that only the latest version remains.

# The F.rank().over(window) step makes the query fail with an AnalysisException when it is started, so nothing is ever written to the console sink. Spark rejects non-time-based window functions (such as rank, dense_rank, or row_number over a partition) on streaming DataFrames. The usual workaround is to run this logic once per micro-batch with foreachBatch.

# The restriction exists because these functions need a complete view of each partition to compute correct results. A stream is unbounded, so Spark can never know that it has seen all records for a given partition (for example, all events for one customer).

# Time-based windows (tumbling or sliding) work because they group rows by event time, so each window covers a bounded time range and can be finalized as data arrives. A rank over a partition with no time boundary would force Spark to wait indefinitely for more data before it could emit a stable result.



---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-6119484106182275>, line 17
      1 # This will throw an exception because non-time-based window operations are not supported on streaming DataFrames.
      2 ranked_df = (spark.readStream
      3                    .table("bronze")
      4                    .filter("topic = 'customers'")
   (...)
     10                    .drop("rank")
     11              )
     13 (ranked_df.writeStream

In [0]:
from pyspark.sql.window import Window

def batch_upsert(microBatchDF, batchId):
    window = Window.partitionBy("customer_id").orderBy(F.col("row_time").desc())
    
    (microBatchDF.filter(F.col("row_status").isin(["insert", "update"]))
                 .withColumn("rank", F.rank().over(window))
                 .filter("rank == 1")
                 .drop("rank")
                 .createOrReplaceTempView("ranked_updates"))
    
    query = """
        MERGE INTO customers_silver c
        USING ranked_updates r
        ON c.customer_id=r.customer_id
            WHEN MATCHED AND c.row_time < r.row_time
              THEN UPDATE SET *
            WHEN NOT MATCHED
              THEN INSERT *
    """
    
    microBatchDF.sparkSession.sql(query)

In [0]:
%sql
CREATE TABLE IF NOT EXISTS customers_silver
(customer_id STRING, email STRING, first_name STRING, last_name STRING, gender STRING, street STRING, city STRING, country STRING, row_time TIMESTAMP)

In [0]:
df_country_lookup = spark.read.json(f"{bookstore.dataset_path}/country_lookup")
display(df_country_lookup)

calling_code,code,country
376,AD,Andorra
971,AE,United Arab Emirates
93,AF,Afghanistan
+1-268,AG,Antigua and Barbuda
+1-264,AI,Anguilla
355,AL,Albania
374,AM,Armenia
244,AO,Angola
,AQ,Antarctica
54,AR,Argentina


In [0]:
query = (spark.readStream
                  .table("bronze")
                  .filter("topic = 'customers'")
                  .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
                  .select("v.*")
                  .join(F.broadcast(df_country_lookup), F.col("country_code") == F.col("code") , "inner")
               .writeStream
                  .foreachBatch(batch_upsert)
                  .option("checkpointLocation", f"{bookstore.checkpoint_path}/customers_silver")
                  .trigger(availableNow=True)
                  .start()
          )

query.awaitTermination()

Here's a line-by-line explanation of the streaming query above:

**Line 1:**  `query = (spark.readStream`

-   Starts a streaming read through Spark's Structured Streaming API
-   `spark.readStream` returns a `DataStreamReader`; no data is processed until the query is started

**Line 2:**  `.table("bronze")`

-   Reads from the  `bronze`  Delta table as a streaming source
-   This table contains raw event data that needs to be processed

**Line 3:**  `.filter("topic = 'customers'")`

-   Filters the stream to only process records where the  `topic`  column equals 'customers'
-   Isolates customer-related events from potentially multiple topics in the bronze table

**Line 4:**  `.select(F.from_json(F.col("value").cast("string"), schema).alias("v"))`

-   Casts the  `value`  column to string (it's likely stored as binary)
-   Parses the JSON string using  `from_json()`  with the predefined  `schema`
-   Aliases the parsed JSON structure as "v" for easier reference

**Line 5:**  `.select("v.*")`

-   Expands all fields from the nested "v" structure into top-level columns
-   After this, you have columns like  `customer_id`,  `email`,  `first_name`, etc. instead of a nested structure

**Line 6:**  `.join(F.broadcast(df_country_lookup), F.col("country_code") == F.col("code") , "inner")`

-   Performs an inner join with the  `df_country_lookup`  static DataFrame
-   Uses  `broadcast()`  to optimize the join by sending the small lookup table to all executors
-   Joins on  `country_code`  from the stream matching  `code`  from the lookup table
-   This enriches customer records with full country names

**Line 7:**  `.writeStream`

-   Transitions from read operations to write operations for the streaming query

**Line 8:**  `.foreachBatch(batch_upsert)`

-   Processes each micro-batch using the custom  `batch_upsert`  function
-   This allows complex operations (like ranking and MERGE) that aren't supported in streaming mode
-   Each micro-batch is processed as a regular DataFrame in the function

**Line 9:**  `.option("checkpointLocation", f"{bookstore.checkpoint_path}/customers_silver")`

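-   Sets the checkpoint directory, where Spark records which source offsets each micro-batch covered
-   After a failure the query resumes from this metadata; with `foreachBatch` a micro-batch can be re-run, so the write must be safe to repeat (the conditional MERGE is)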

**Line 10:**  `.trigger(availableNow=True)`

-   Processes all data available when the query starts, possibly across several micro-batches, then stops
-   Behaves like an incremental batch job while still using the streaming API and its checkpoint

**Line 11:**  `.start()`

-   Starts the streaming query execution
-   Returns a  `StreamingQuery`  object that can be monitored and controlled

**Line 13:**  `query.awaitTermination()`

-   Blocks execution until the streaming query completes or fails
-   Needed here so that later cells, such as the validation, do not run before the `availableNow=True` query has finished writing

This pattern combines streaming ingestion with complex batch processing (via  `foreachBatch`) to implement Change Data Capture with deduplication and upserts into the silver layer.

In [0]:
count = spark.table("customers_silver").count()
expected_count = spark.table("customers_silver").select("customer_id").distinct().count()

assert count == expected_count, "Unit test failed"
print("Unit test passed")

# Notebook Summary: Change Data Capture (CDC) with Spark Structured Streaming

## Overview
This notebook demonstrates implementing Change Data Capture (CDC) in a medallion architecture using Spark Structured Streaming. It processes customer event data from a bronze layer, applies deduplication and enrichment, and loads the results into a silver layer table using upsert operations.

## Detailed Workflow

### 1. Setup and Data Preparation (Cell 2)
```python
%run ../Includes/Copy-Datasets
```
- Executes an external setup notebook to initialize the environment
- Creates the `bookstore` object with properties like `dataset_path` and `checkpoint_path`
- Populates the bronze table with raw customer event data

---

### 2. Batch Processing - Proof of Concept (Cell 3)
```python
from pyspark.sql import functions as F

schema = ("customer_id STRING, email STRING, first_name STRING, last_name STRING, "
          "gender STRING, street STRING, city STRING, country_code STRING, "
          "row_status STRING, row_time timestamp")

customers_df = (spark.table("bronze")
                 .filter("topic = 'customers'")
                 .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
                 .select("v.*")
                 .filter(F.col("row_status").isin(["insert", "update"])))
```
**Code Breakdown:**
- **Import F**: Imports PySpark SQL functions for data transformations
- **Schema Definition**: Defines the structure of customer JSON data with 10 fields including metadata fields (`row_status`, `row_time`)
- **Read Bronze Table**: Loads the bronze Delta table containing raw event data
- **Filter Topic**: Isolates customer events from potentially multiple event types (could be orders, products, etc.)
- **Parse JSON**: Uses `from_json()` to convert the string value column into structured columns based on the schema
- **Flatten Structure**: `select("v.*")` expands the nested "v" structure into top-level columns
- **Filter Operations**: Keeps only insert and update operations, excluding deletes

**Why This Matters:** This establishes the pattern for extracting and parsing customer events before implementing streaming logic.
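
The parse-then-filter step can be pictured without Spark. The block below is a rough plain-Python analogy, not how `from_json` is implemented: `parse_event` and `keep_upserts` are hypothetical helper names, and the sample payloads (including the `C99999` delete event) are made up for illustration.

```python
import json

FIELDS = ["customer_id", "email", "first_name", "last_name", "gender",
          "street", "city", "country_code", "row_status", "row_time"]

def parse_event(value: bytes):
    """Decode one raw payload; return None if it is not a JSON object."""
    try:
        payload = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(payload, dict):
        return None
    # Fields missing from the payload become None, similar to nulls in a parsed struct.
    return {name: payload.get(name) for name in FIELDS}

def keep_upserts(events):
    """Keep parsed rows whose row_status is 'insert' or 'update'."""
    rows = (parse_event(value) for value in events)
    return [r for r in rows if r is not None and r["row_status"] in ("insert", "update")]

# Hypothetical payloads: one insert, one delete, one malformed record.
raw = [
    b'{"customer_id": "C00335", "country_code": "CN", "row_status": "insert"}',
    b'{"customer_id": "C99999", "row_status": "delete"}',
    b'not json',
]
kept = keep_upserts(raw)
print([r["customer_id"] for r in kept])  # ['C00335']
```

Only the insert survives: the delete is removed by the status filter, and the malformed payload never produces a row.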

---

### 3. Deduplication Logic - Batch Version (Cell 4)
```python
from pyspark.sql.window import Window

window = Window.partitionBy("customer_id").orderBy(F.col("row_time").desc())

ranked_df = (customers_df.withColumn("rank", F.rank().over(window))
                          .filter("rank == 1")
                          .drop("rank"))
```
**Code Breakdown:**
- **Window Specification**: Partitions data by `customer_id` and orders by `row_time` descending (newest first)
- **Ranking**: Assigns a rank to each row within its partition (rank 1 = most recent record for each customer)
- **Filter Rank 1**: Keeps only the latest record for each customer (more than one row if several tie on the newest `row_time`)
- **Drop Rank Column**: Removes the temporary ranking column

**Why This Matters:** When a customer updates their profile multiple times, we only want the most recent version. This deduplication logic ensures data quality.
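
One detail worth checking is how ties behave: `rank()` gives the same rank to rows with equal `row_time`, whereas `row_number()` always numbers them 1, 2, 3. The plain-Python model below illustrates the difference; the function names and the sample rows are hypothetical, and the course data may never contain such ties.

```python
def latest_by_rank(rows):
    """Model of rank() == 1: every row tied for the newest row_time is kept."""
    newest = {}
    for r in rows:
        cid = r["customer_id"]
        newest[cid] = max(newest.get(cid, r["row_time"]), r["row_time"])
    return [r for r in rows if r["row_time"] == newest[r["customer_id"]]]

def latest_by_row_number(rows):
    """Model of row_number() == 1: exactly one row per customer.

    Ties are broken arbitrarily; this sketch keeps the first row seen.
    """
    latest = {}
    for r in rows:
        cid = r["customer_id"]
        if cid not in latest or r["row_time"] > latest[cid]["row_time"]:
            latest[cid] = r
    return list(latest.values())

# Hypothetical events: customer C1 has two updates with the same timestamp.
rows = [
    {"customer_id": "C1", "row_time": "2022-01-30T20:02:00", "email": "old@example.com"},
    {"customer_id": "C1", "row_time": "2022-01-31T15:14:00", "email": "a@example.com"},
    {"customer_id": "C1", "row_time": "2022-01-31T15:14:00", "email": "b@example.com"},
    {"customer_id": "C2", "row_time": "2022-01-30T18:07:00", "email": "c@example.com"},
]
print(len(latest_by_rank(rows)))        # 3
print(len(latest_by_row_number(rows)))  # 2
```

If ties are possible in the source, switching to `F.row_number()` with a deterministic tie-breaker in the `orderBy` is one way to guarantee a single row per customer.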

---

### 4. Streaming Limitations Demonstration (Cell 5)
```python
# This will throw an exception
ranked_df = (spark.readStream
                   .table("bronze")
                   .filter("topic = 'customers'")
                   .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
                   .select("v.*")
                   .filter(F.col("row_status").isin(["insert", "update"]))
                   .withColumn("rank", F.rank().over(window))  # ❌ NOT SUPPORTED
                   .filter("rank == 1")
                   .drop("rank")
             )
```
**Why It Fails:**
- Spark Structured Streaming cannot apply ranking operations directly on streaming DataFrames
- Ranking requires seeing ALL records in a partition to assign correct ranks
- In streaming, data is unbounded and continuously arriving, so Spark can't guarantee it has seen all customer records
- Only time-based window aggregations (for example tumbling or sliding windows) are supported on streaming DataFrames

**Key Learning:** Complex window operations like ranking, row_number, and dense_rank require the `foreachBatch` pattern.
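
To see why time-based windows are different, note that each event maps to a bounded bucket determined only by its own timestamp. The following is a simplified plain-Python model of a tumbling window and of closing it against a watermark; `tumbling_window` and `is_final` are illustrative names, and Spark's actual watermark handling is more involved.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumbling_window(event_time, width):
    """Return the [start, end) bucket of fixed width that contains event_time."""
    start = event_time - (event_time - EPOCH) % width
    return start, start + width

def is_final(window_end, watermark):
    """A window can be emitted once the watermark has passed its end."""
    return watermark >= window_end

start, end = tumbling_window(datetime(2022, 1, 31, 23, 6), timedelta(minutes=10))
print(start.time(), end.time())  # 23:00:00 23:10:00
print(is_final(end, watermark=datetime(2022, 1, 31, 23, 15)))  # True
```

A partition such as "all events for customer C00335" has no comparable end point, which is why a rank over it can never be declared final on a stream.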

---

### 5. Solution: foreachBatch Pattern (Cell 6)
```python
def batch_upsert(microBatchDF, batchId):
    window = Window.partitionBy("customer_id").orderBy(F.col("row_time").desc())
    
    # Deduplicate within the micro-batch
    (microBatchDF.filter(F.col("row_status").isin(["insert", "update"]))
                 .withColumn("rank", F.rank().over(window))
                 .filter("rank == 1")
                 .drop("rank")
                 .createOrReplaceTempView("ranked_updates"))
    
    # Perform upsert using MERGE
    query = """
        MERGE INTO customers_silver c
        USING ranked_updates r
        ON c.customer_id=r.customer_id
            WHEN MATCHED AND c.row_time < r.row_time
              THEN UPDATE SET *
            WHEN NOT MATCHED
              THEN INSERT *
    """
    
    microBatchDF.sparkSession.sql(query)
```
**Code Breakdown:**
- **Function Signature**: `microBatchDF` contains one batch of streaming data; `batchId` is a unique batch identifier
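- **Window Function**: Same deduplication logic from Cell 4, now applied to each micro-batch; it only removes duplicates that arrive within the same micro-batch, and the MERGE condition handles older records from other batches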
- **Temp View Creation**: Creates a temporary view of deduplicated records for use in SQL
- **MERGE Statement**:
  - **ON Clause**: Matches records by `customer_id`
  - **WHEN MATCHED**: Updates existing customer if the new record is newer (`c.row_time < r.row_time`)
  - **WHEN NOT MATCHED**: Inserts new customers
  - **UPDATE SET ***: Updates all columns with values from the source

**Why This Works:** Each micro-batch is processed as a regular DataFrame, allowing complex operations. The MERGE ensures we don't create duplicates and only update when data is fresher.
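
The effect of the three MERGE branches can be modeled with a dictionary keyed by `customer_id`. This is only a sketch of the semantics, assuming the micro-batch has already been deduplicated; `merge_batch` and the sample rows are hypothetical.

```python
def merge_batch(silver, ranked_updates):
    """Apply one deduplicated micro-batch to a dict keyed by customer_id."""
    for r in ranked_updates:
        current = silver.get(r["customer_id"])
        if current is None:
            # WHEN NOT MATCHED THEN INSERT *
            silver[r["customer_id"]] = r
        elif current["row_time"] < r["row_time"]:
            # WHEN MATCHED AND c.row_time < r.row_time THEN UPDATE SET *
            silver[r["customer_id"]] = r
        # Otherwise the row matched but is not newer, so the target stays untouched.
    return silver

silver = {}
merge_batch(silver, [{"customer_id": "C1", "row_time": "2022-01-31T15:14:00", "city": "Nanqiao"}])
# A later micro-batch delivers an older event for the same customer.
merge_batch(silver, [{"customer_id": "C1", "row_time": "2022-01-30T20:02:00", "city": "Nidek"}])
print(silver["C1"]["city"])  # Nanqiao
```

The second call shows why the `c.row_time < r.row_time` condition matters: ranking inside a micro-batch cannot see rows from earlier batches, so the comparison against the target table is what keeps stale events from overwriting newer ones.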

---

### 6. Target Table Creation (Cell 7)
```sql
CREATE TABLE IF NOT EXISTS customers_silver
(customer_id STRING, email STRING, first_name STRING, last_name STRING, 
 gender STRING, street STRING, city STRING, country STRING, row_time TIMESTAMP)
```
**Code Breakdown:**
- Creates a Delta table with the target schema
- Note: `country` column (not `country_code`) - this will be enriched with full country names
- `IF NOT EXISTS` ensures idempotency - safe to run multiple times

---

### 7. Data Enrichment Setup (Cell 8)
```python
df_country_lookup = spark.read.json(f"{bookstore.dataset_path}/country_lookup")
```
**Code Breakdown:**
- Loads a static lookup table mapping country codes to country names
- This is a small, static dimension table, read as a regular (non-streaming) DataFrame
- Will be used in a broadcast join for efficient enrichment

---

### 8. Complete Streaming Pipeline (Cell 9)
```python
query = (spark.readStream
                  .table("bronze")
                  .filter("topic = 'customers'")
                  .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
                  .select("v.*")
                  .join(F.broadcast(df_country_lookup), 
                        F.col("country_code") == F.col("code"), "inner")
               .writeStream
                  .foreachBatch(batch_upsert)
                  .option("checkpointLocation", f"{bookstore.checkpoint_path}/customers_silver")
                  .trigger(availableNow=True)
                  .start()
          )

query.awaitTermination()
```
**Code Breakdown:**

**Read Phase:**
1. `spark.readStream.table("bronze")`: Reads bronze table as a stream
2. `.filter("topic = 'customers'")`: Isolates customer events
3. `.select(F.from_json(...))`: Parses JSON payload using the schema
4. `.select("v.*")`: Flattens nested structure
5. `.join(F.broadcast(df_country_lookup), ...)`: Enriches with country names
   - `F.broadcast()`: Sends the small lookup table to all executors for efficient joining
   - Inner join: Only keeps records with valid country codes

**Write Phase:**
6. `.writeStream`: Initiates streaming write operation
7. `.foreachBatch(batch_upsert)`: Applies custom upsert logic to each micro-batch
8. `.option("checkpointLocation", ...)`: Enables fault tolerance
   - Stores offset tracking and state information
   - Allows recovery after failures
   - With `foreachBatch`, delivery is at-least-once by default; the conditional MERGE makes a re-run micro-batch harmless
9. `.trigger(availableNow=True)`: Processes all data available at start, in one or more micro-batches, then stops
10. `.start()`: Starts the streaming query
11. `query.awaitTermination()`: Blocks until the query completes, so later cells only run after the write has finished

**Data Flow:** Bronze (raw JSON) → Filter topic → Parse → Enrich → Filter `row_status` → Deduplicate → Upsert → Silver (clean, current state)
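
The enrichment step has one consequence that is easy to miss: an inner join drops every customer whose `country_code` has no match in the lookup. A minimal plain-Python model follows; `enrich` is a hypothetical helper, the customers are made up, and `ZZ` stands for a code that is absent from the lookup.

```python
def enrich(customers, country_lookup):
    """Model of an inner join on country_code == code that adds the country name."""
    names = {row["code"]: row["country"] for row in country_lookup}
    enriched = []
    for c in customers:
        country = names.get(c["country_code"])
        if country is None:
            continue  # inner join: no matching code, so the row is dropped
        enriched.append({**c, "country": country})
    return enriched

lookup = [{"code": "AD", "country": "Andorra"}, {"code": "AR", "country": "Argentina"}]
customers = [
    {"customer_id": "C1", "country_code": "AR"},
    {"customer_id": "C2", "country_code": "ZZ"},  # unknown code (hypothetical)
]
print(enrich(customers, lookup))
# [{'customer_id': 'C1', 'country_code': 'AR', 'country': 'Argentina'}]
```

If unmatched customers should be kept with a null `country` instead, a left join would be the alternative to consider.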

---

### 9. Validation (Cell 11)
```python
count = spark.table("customers_silver").count()
expected_count = spark.table("customers_silver").select("customer_id").distinct().count()

assert count == expected_count, "Unit test failed"
```
**Code Breakdown:**
- **count**: Total number of rows in the silver table
- **expected_count**: Number of distinct customers
- **Assertion**: Verifies that total rows = distinct customers (no duplicates)
- If duplicates exist, the assertion fails, indicating a problem with the CDC logic

**Why This Matters:** Validates that our deduplication and upsert logic is working correctly - each customer should appear exactly once.
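
The assertion only reports pass or fail. When it fails, it helps to know which IDs are duplicated; in Spark that is typically a `groupBy("customer_id").count()` followed by a filter on `count > 1`. The same idea in plain Python, with a hypothetical helper name and made-up IDs:

```python
from collections import Counter

def duplicated_ids(customer_ids):
    """Return the IDs that occur more than once, sorted for stable output."""
    counts = Counter(customer_ids)
    return sorted(cid for cid, n in counts.items() if n > 1)

print(duplicated_ids(["C00001", "C00002", "C00002", "C00003"]))  # ['C00002']
```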

---

## Key Technical Concepts

### 1. Change Data Capture (CDC)
CDC tracks changes (inserts, updates, deletes) in source systems. This notebook handles:
- **Inserts**: New customer records
- **Updates**: Modified customer information
- **Deduplication**: Keeping only the latest version when multiple updates exist

Delete events are removed by the `row_status` filter, so they are not propagated to the silver table.

### 2. foreachBatch Pattern
Bridges the gap between streaming and batch operations:
- Streaming engine handles data ingestion and micro-batching
- Each micro-batch processed as a regular DataFrame with full DataFrame API support
- Enables complex operations (ranking, MERGE statements) not supported in streaming mode

### 3. Broadcast Join
Optimization technique for joining large streaming data with small static data:
- Small lookup table copied to all executors
- Avoids shuffling the large side of the join
- Each executor joins its partitions locally against its own copy of the lookup table

### 4. MERGE Statement (Upsert)
SQL operation that combines UPDATE and INSERT:
- Updates existing records if they match
- Inserts new records if they don't match
- Atomic operation ensuring data consistency

### 5. Exactly-Once Semantics
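The checkpoint location provides:
- Offset tracking (which records have been processed)
- State management for stateful queries (this query keeps no streaming state)
- Failure recovery (resume from the last committed offset)

On its own this gives `foreachBatch` at-least-once delivery, because a micro-batch can be re-run after a failure. The result is still effectively exactly-once because the write is idempotent: when a batch is re-applied, existing rows are not older than the incoming ones, so the conditional MERGE changes nothing.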
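
Safety under replay comes from the write logic rather than from the checkpoint alone. The sketch below models that property in plain Python, assuming the same "insert if new, update if strictly newer" rule as the MERGE; `apply_batch` and the sample batch are hypothetical.

```python
def apply_batch(state, batch):
    """Upsert rows into state, keeping a row only if it is new or strictly newer."""
    for row in batch:
        current = state.get(row["customer_id"])
        if current is None or current["row_time"] < row["row_time"]:
            state[row["customer_id"]] = row
    return state

batch = [
    {"customer_id": "C1", "row_time": "2022-01-31T15:14:00"},
    {"customer_id": "C2", "row_time": "2022-01-30T18:07:00"},
]

once = apply_batch({}, batch)
twice = apply_batch(apply_batch({}, batch), batch)  # the batch is delivered again
print(once == twice)  # True

# A plain append, by contrast, would store every replayed row a second time.
appended = batch + batch
print(len(appended))  # 4
```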

### 6. Medallion Architecture
Multi-layered approach to data organization:
- **Bronze**: Raw, unprocessed data (as-is from source)
- **Silver**: Cleaned, validated, enriched data (this notebook's output)
- **Gold**: Business-level aggregates and analytics-ready datasets

---

## Performance Considerations

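1. **Broadcast Join**: Suited to small lookup tables; Spark's automatic broadcast threshold (`spark.sql.autoBroadcastJoinThreshold`) defaults to 10 MB, and the explicit `F.broadcast()` hint requests a broadcast regardless of it
2. **Partitioning**: Window operations partition by `customer_id` for parallel processing
3. **availableNow Trigger**: Processes everything available at start and then stops, which suits scheduled, batch-like runs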
4. **MERGE Optimization**: Delta Lake optimizes MERGE operations with file pruning and statistics

---

## Common Pitfalls Avoided

1. ❌ **Don't use ranking directly on streams** → ✅ Use foreachBatch
2. ❌ **Don't forget checkpointing** → ✅ Always specify checkpoint location
3. ❌ **Don't ignore row_time for updates** → ✅ Only update if new data is fresher
4. ❌ **Don't skip deduplication** → ✅ Rank and filter to keep latest version