# Chapter 11 — GlobalMart: Apache Hudi CDC Pipeline (Hudi section only)

This notebook contains the **ready-to-run code** for the GlobalMart Hudi CDC pipeline described in Chapter 11.

It focuses on:
- Parsing **Debezium** change events (`before`/`after`/`op`)
- Transforming events into a **Hudi-friendly** shape
- Applying **upserts + deletes** using Hudi’s supported delete marker: **`_hoodie_is_deleted`**
- Writing to a **Merge-on-Read** Hudi table


## 0) Prerequisites (runtime)

You need a Spark runtime that includes the Hudi Spark bundle compatible with your Spark version.

Typical options:
- Run this notebook **inside the chapter’s Docker environment** (recommended).
- Or configure Spark with the Hudi bundle (e.g., `--packages org.apache.hudi:hudi-spark3.4-bundle_2.12:<HUDI_VERSION>`), depending on your Spark/Hudi versions.
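
The bundle coordinate in the second option can be assembled from the versions you run. The helper below is a minimal sketch and not part of the chapter repo: `hudi_bundle_coordinate` is a hypothetical name, and it assumes the Spark 3.x naming pattern shown in the example above (`hudi-spark<major.minor>-bundle_<scala>`). The Hudi version is left as a placeholder for you to fill in.

```python
def hudi_bundle_coordinate(spark_version: str, scala_version: str, hudi_version: str) -> str:
    """Build a Maven coordinate for a Hudi Spark 3.x bundle.

    Assumes the pattern hudi-spark<major.minor>-bundle_<scala>:<hudi_version>.
    """
    # Keep only major.minor, e.g. "3.4.1" -> "3.4"
    major_minor = ".".join(spark_version.split(".")[:2])
    return f"org.apache.hudi:hudi-spark{major_minor}-bundle_{scala_version}:{hudi_version}"


# <HUDI_VERSION> stays a placeholder; substitute the release you actually use.
coordinate = hudi_bundle_coordinate("3.4.1", "2.12", "<HUDI_VERSION>")
print(coordinate)
```

The resulting string can be passed to `--packages` on the command line or set through the `spark.jars.packages` Spark configuration key.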

> Note: The code below is written to be **copy/paste friendly** into your Chapter 11 repo under `ch11/` and used as the canonical reference for the Hudi section.


### Choose your path
- **Quick demo (no Kafka needed):** run the seed and batch cells below, then the snapshot and changelog validation cells.
- **Full CDC (Debezium→Kafka→Spark):** start the stack, run the streaming cells, optionally generate MySQL changes with the helper script, then rerun the snapshot and changelog validation cells.

In [1]:
# --- Imports used throughout the notebook ---
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, from_json, current_timestamp, lit, when, coalesce
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, LongType
)


## 1) Create / reuse SparkSession

If you are running inside a pre-built image, you may already have a SparkSession (`spark`) available.
If not, create one here. Adjust configs to match your environment.


In [2]:
# Create SparkSession only if not already present (common in managed notebook runtimes)
try:
    spark
except NameError:
    spark = (
        SparkSession.builder
            .appName("Ch11-GlobalMart-Hudi-CDC")
            # If your runtime requires Hudi SQL extensions, uncomment and tune these:
            # .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
            # .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
            .getOrCreate()
    )

spark.sparkContext.setLogLevel("WARN")


## 2) Define the Debezium CDC schema + Hudi table schema/options

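`from_json(...)` requires an explicit schema, so this section defines:
- **`cdc_schema`**, the Debezium envelope schema that the streaming cells pass to `from_json(...)`
- **`hudi_options`**, the write options shared by the batch and streaming writes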


In [3]:
# --- Debezium envelope schema (simplified) ---
# Debezium emits an envelope with before/after/op/ts_ms.
# For deletes: `after` is null and `before` contains the deleted row.

row_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("description", StringType(), True),
    StructField("updated_at", TimestampType(), True)
])

cdc_schema = StructType([
    StructField("before", row_schema, True),
    StructField("after", row_schema, True),
    StructField("op", StringType(), True),
    StructField("ts_ms", LongType(), True)
])

# --- Hudi table options ---
# This matches the Chapter 11 narrative: MERGE_ON_READ + precombine field + partitioning.
hudi_options = {
    "hoodie.table.name": "products_hudi",
    # Some Hudi/Spark configurations also expect this explicit table-name key:
    "hoodie.datasource.write.table.name": "products_hudi",

    "hoodie.datasource.write.recordkey.field": "product_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
    "hoodie.datasource.write.partitionpath.field": "category",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",

    "hoodie.bulkinsert.shuffle.parallelism": "4",
    "hoodie.upsert.shuffle.parallelism": "4",
    "hoodie.datasource.write.operation": "upsert",

    "hoodie.compact.async.enable": "true",
    "hoodie.clustering.async.enabled": "true"
}
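
To make the envelope shape concrete, the plain-Python sketch below builds two hypothetical messages that match `cdc_schema` and inspects them with the standard `json` module. The field values are made up for illustration; only the `before`/`after`/`op`/`ts_ms` layout and the Debezium op codes (`c` create, `u` update, `d` delete, `r` snapshot read) come from Debezium itself.

```python
import json

# Hypothetical sample messages shaped like cdc_schema (all values are made up).
row_v1 = {"product_id": 1, "name": "Widget", "category": "Tools",
          "price": 10.0, "description": "v1", "updated_at": "2025-01-01T00:00:00Z"}

create_event = json.dumps({"before": None, "after": row_v1, "op": "c", "ts_ms": 1735689600000})
delete_event = json.dumps({"before": row_v1, "after": None, "op": "d", "ts_ms": 1735689660000})


def describe(raw: str):
    """Return (op, product_id, after_is_null) for one envelope."""
    event = json.loads(raw)
    # For deletes `after` is null, so the key must come from `before`.
    row = event["after"] if event["after"] is not None else event["before"]
    return event["op"], row["product_id"], event["after"] is None


print(describe(create_event))  # ('c', 1, False)
print(describe(delete_event))  # ('d', 1, True)
```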


## Quick demo (no Kafka required)
Run these to populate Hudi locally and validate immediately.

### Seed if empty

In [4]:
# current_timestamp is already imported in the first cell.
# Reuse paths already defined in this notebook session; otherwise fall back to the chapter defaults.
HUDI_BASE_PATH = globals().get('HUDI_BASE_PATH', '/data/hudi/products_hudi')
CHECKPOINT_PATH = globals().get('CHECKPOINT_PATH', '/data/checkpoints/products')


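from datetime import datetime, timedelta


def seed_demo_if_empty():
    """Seed a few demo rows unless the Hudi table already contains data."""
    try:
        existing = spark.read.format('hudi').load(HUDI_BASE_PATH)
        if existing.limit(1).count() > 0:
            print('Hudi table already has data; skipping seed.')
            return
    except Exception:
        # The table does not exist yet (or cannot be read); fall through and seed it.
        pass

    # Give the delete a later updated_at than the insert so the precombine field
    # orders the two events for product 301 deterministically.
    base_ts = datetime.now()
    seed_rows = [
        (301, 'Seed Widget', 'Tools', 40.0, 'seed v1', False, base_ts),
        (302, 'Seed Gadget', 'Electronics', 99.0, 'seed v1', False, base_ts),
        (301, 'Seed Widget', 'Tools', None, 'tombstone', True, base_ts + timedelta(seconds=1)),  # delete to show tombstone
    ]
    seed_df = spark.createDataFrame(seed_rows, ['product_id','name','category','price','description','_hoodie_is_deleted','updated_at'])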
    (seed_df
        .write
        .format('hudi')
        .options(**hudi_options)
        .mode('append')
        .save(HUDI_BASE_PATH)
    )
    print('Seeded demo rows into', HUDI_BASE_PATH)

seed_demo_if_empty()


Hudi table already has data; skipping seed.


### Optional batch demo (insert→update→delete)

In [5]:
# Write a small batch of CDC-like events directly to Hudi (for demo visibility)
from datetime import datetime, timedelta

# Give each event a distinct, increasing updated_at so the precombine field
# resolves insert -> update -> delete in that order for product 201.
base_ts = datetime.now()
batch_events = [
    (201, 'Batch Widget', 'Tools', 30.0, 'batch v1', False, base_ts),
    (201, 'Batch Widget', 'Tools', 32.5, 'batch v2', False, base_ts + timedelta(seconds=1)),
    (201, 'Batch Widget', 'Tools', None, 'batch tombstone', True, base_ts + timedelta(seconds=2)),  # delete
]
batch_df = spark.createDataFrame(batch_events, ['product_id','name','category','price','description','_hoodie_is_deleted','updated_at'])
(batch_df
    .write
    .format('hudi')
    .options(**hudi_options)
    .mode('append')
    .save(HUDI_BASE_PATH)
)
print('Wrote demo batch rows to Hudi at', HUDI_BASE_PATH)


Wrote demo batch rows to Hudi at /data/hudi/products_hudi
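
The insert→update→delete sequence in this batch collapses to a single outcome per key. The plain-Python sketch below is a toy model of that behavior, not Hudi's actual merge code: it assumes that within one batch the event with the highest `updated_at` wins for each key, and that a winning event flagged `_hoodie_is_deleted` removes the key. `apply_batch` and the sample values are hypothetical.

```python
from datetime import datetime, timedelta


def apply_batch(table: dict, events: list) -> dict:
    """Toy model of an upsert batch keyed by product_id (not Hudi's actual code)."""
    winners = {}
    for event in events:
        key = event["product_id"]
        best = winners.get(key)
        # Keep the event with the highest ordering value; later events win ties.
        if best is None or event["updated_at"] >= best["updated_at"]:
            winners[key] = event
    for key, event in winners.items():
        if event["_hoodie_is_deleted"]:
            table.pop(key, None)   # a winning delete removes the key
        else:
            table[key] = event     # otherwise insert or overwrite
    return table


t0 = datetime(2025, 1, 1)
table = {302: {"product_id": 302, "price": 99.0, "updated_at": t0, "_hoodie_is_deleted": False}}
events = [
    {"product_id": 201, "price": 30.0, "updated_at": t0, "_hoodie_is_deleted": False},
    {"product_id": 201, "price": 32.5, "updated_at": t0 + timedelta(seconds=1), "_hoodie_is_deleted": False},
    {"product_id": 201, "price": None, "updated_at": t0 + timedelta(seconds=2), "_hoodie_is_deleted": True},
]
apply_batch(table, events)
print(sorted(table))  # [302]
```

In this model, product 201 never becomes visible because its delete carries the highest ordering value in the batch. If all three events shared the same `updated_at`, the winner would depend on tie-breaking, which is why distinct ordering values matter.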


### Validate snapshot (latest state)

In [6]:
# Snapshot read: latest state per key (deleted keys such as 201 and 301 no longer appear)
snapshot_df = (
    spark.read.format('hudi')
    .load(HUDI_BASE_PATH)
    .orderBy('product_id', '_hoodie_commit_time')
)
snapshot_df.show(50, truncate=False)


+-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+----------+-----------+-----------+-----+-----------+------------------+--------------------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                        |product_id|name       |category   |price|description|_hoodie_is_deleted|updated_at                |
+-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+----------+-----------+-----------+-----+-----------+------------------+--------------------------+
|20251212194018880  |20251212194018880_1_0|302               |Electronics           |49249cc2-d50b-4a83-af7f-11c89b9af9ac-0_1-54-125_20251212194018880.parquet|302       |Seed Gadget|Electronics|99.0 |seed v1    |false      

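### Validate changelog (incremental query)

In [7]:
# Incremental read from the earliest instant: returns the latest committed version of each
# record changed since then. Keys whose latest change is a delete do not appear (see the output below).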
changelog_df = (
    spark.read.format('hudi')
    .option('hoodie.datasource.query.type', 'incremental')
    .option('hoodie.datasource.read.begin.instanttime', '000')
    .load(HUDI_BASE_PATH)
    .orderBy('_hoodie_commit_time', 'product_id')
)
changelog_df.select('product_id','name','category','price','_hoodie_is_deleted','_hoodie_commit_time').show(50, truncate=False)


+----------+-----------+-----------+-----+------------------+-------------------+
|product_id|name       |category   |price|_hoodie_is_deleted|_hoodie_commit_time|
+----------+-----------+-----------+-----+------------------+-------------------+
|302       |Seed Gadget|Electronics|99.0 |false             |20251212194018880  |
+----------+-----------+-----------+-----+------------------+-------------------+
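
The `_hoodie_commit_time` values in this output, such as `20251212194018880`, appear to follow a 17-digit `yyyyMMddHHmmssSSS` layout. Assuming that layout, the hypothetical helpers below convert between commit-time strings and `datetime` objects, which can help when choosing a value for `hoodie.datasource.read.begin.instanttime` other than `'000'`.

```python
from datetime import datetime, timedelta


def parse_commit_time(instant: str) -> datetime:
    """Parse a commit time, assuming the yyyyMMddHHmmssSSS layout."""
    base = datetime.strptime(instant[:14], "%Y%m%d%H%M%S")
    # Older 14-digit instants carry no millisecond part.
    millis = int(instant[14:17]) if len(instant) >= 17 else 0
    return base + timedelta(milliseconds=millis)


def format_commit_time(ts: datetime) -> str:
    """Format a datetime into the same 17-digit layout."""
    return ts.strftime("%Y%m%d%H%M%S") + f"{ts.microsecond // 1000:03d}"


parsed = parse_commit_time("20251212194018880")
print(parsed)                      # 2025-12-12 19:40:18.880000
print(format_commit_time(parsed))  # 20251212194018880
```

For example, passing `format_commit_time(parsed - timedelta(hours=1))` as the begin instant would limit the incremental read to commits from roughly the last hour before that commit. Which timezone the timeline uses depends on your Hudi configuration, so treat the parsed value as a naive timestamp.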



## Full CDC path (Debezium → Kafka → Spark Streaming → Hudi)

In [8]:
# --- Kafka source configuration (edit for your environment) ---
import os

KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP", "kafka:29092")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "globalmart.retail.products")

HUDI_BASE_PATH = os.environ.get("HUDI_BASE_PATH", "/data/hudi/products_hudi")
CHECKPOINT_PATH = os.environ.get("CHECKPOINT_PATH", "/data/checkpoints/products")

# Read CDC events from Kafka
cdc_stream = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .load()
)

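# Parse the CDC event payload.
# By default Debezium follows each delete with a Kafka tombstone (a message with a null value);
# drop those so from_json never yields an all-null row with a null record key.
parsed_stream = (
    cdc_stream
        .filter(col("value").isNotNull())
        .select(from_json(col("value").cast("string"), cdc_schema).alias("data"))
        .select("data.*")
)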

# Transform to a Hudi-ready shape.
# IMPORTANT: Hudi interprets deletes via the `_hoodie_is_deleted` flag (true for deletes, false/null otherwise).
transformed_stream = (
    parsed_stream
        .select(
            # record key must be present even for deletes (where after is null)
            coalesce(col("after.product_id"), col("before.product_id")).alias("product_id"),

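            # For deletes `after` is null, so fall back to the `before` image. With a
            # non-global index, Hudi needs the partition field (category) to locate the record.
            coalesce(col("after.name"), col("before.name")).alias("name"),
            coalesce(col("after.category"), col("before.category")).alias("category"),
            coalesce(col("after.price"), col("before.price")).alias("price"),
            coalesce(col("after.description"), col("before.description")).alias("description"),

            # Order changes by Debezium's ts_ms (epoch milliseconds). current_timestamp() is
            # constant within a micro-batch, so it cannot order two changes to the same key.
            coalesce((col("ts_ms") / 1000).cast("timestamp"), current_timestamp()).alias("updated_at"),
            when(col("op") == "d", lit(True)).otherwise(lit(False)).alias("_hoodie_is_deleted")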
        )
)
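
The column logic above can be easier to follow on a single event. The plain-Python sketch below mirrors the `coalesce(after.x, before.x)` pattern and the delete flag for one parsed envelope. `to_hudi_row` and the sample event are hypothetical, and the ordering field (`updated_at`) is left out to keep the example short.

```python
FIELDS = ["product_id", "name", "category", "price", "description"]


def to_hudi_row(event: dict) -> dict:
    """Mirror coalesce(after.x, before.x) plus the _hoodie_is_deleted flag."""
    before = event.get("before") or {}
    after = event.get("after") or {}
    row = {}
    for field in FIELDS:
        value = after.get(field)
        # Fall back to the `before` image when `after` has no value (deletes).
        row[field] = value if value is not None else before.get(field)
    row["_hoodie_is_deleted"] = event.get("op") == "d"
    return row


delete_event = {
    "before": {"product_id": 7, "name": "Lamp", "category": "Home",
               "price": 19.5, "description": "v3"},
    "after": None,
    "op": "d",
}
row = to_hudi_row(delete_event)
print(row["product_id"], row["category"], row["_hoodie_is_deleted"])  # 7 Home True
```

The delete still carries its record key and partition value, which is what lets the write target the right record.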


In [9]:
# Start the streaming write to Hudi (runs ~60s then stops)
query = (
    transformed_stream
        .writeStream
        .format("hudi")
        .options(**hudi_options)
        .outputMode("append")
        .option("path", HUDI_BASE_PATH)
        .option("checkpointLocation", CHECKPOINT_PATH)
        .trigger(processingTime="30 seconds")
        .start()
)

# Let it run for ~60 seconds to ingest CDC events, then stop.
query.awaitTermination(60)
query.stop()
print("Stopped streaming query after 60s; rerun this cell to ingest more CDC.")


Stopped streaming query after 60s; rerun this cell to ingest more CDC.


## Notes for Chapter 11 readers

- If the Debezium JSON converter runs with schemas enabled (`schemas.enable=true`), each message wraps the envelope as `{"schema": ..., "payload": ...}`. Extract `payload` before applying `from_json(...)` with `cdc_schema`.
- For real pipelines, you may also want:
  - a dead-letter queue for malformed CDC events
  - metrics on lag and commit throughput
  - explicit Hudi write tuning (parallelism, compaction cadence, clustering strategy)
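
As an illustration of the first note, the plain-Python sketch below unwraps a schema+payload message before the envelope fields are read. `unwrap_envelope` and the sample messages are hypothetical; the function only assumes that wrapped messages carry top-level `schema` and `payload` keys.

```python
import json


def unwrap_envelope(raw):
    """Return the Debezium envelope, unwrapping {"schema", "payload"} if present."""
    if raw is None:
        return None  # Kafka tombstone: nothing to parse
    message = json.loads(raw)
    if isinstance(message, dict) and "schema" in message and "payload" in message:
        return message["payload"]
    return message


wrapped = json.dumps({
    "schema": {"type": "struct"},  # schema details elided for brevity
    "payload": {"before": None, "after": {"product_id": 1}, "op": "c", "ts_ms": 1},
})
plain = json.dumps({"before": None, "after": {"product_id": 1}, "op": "c", "ts_ms": 1})

print(unwrap_envelope(wrapped)["op"])  # c
print(unwrap_envelope(plain)["op"])    # c
```

In Spark, one option is to extract the payload as a JSON string with `get_json_object(col("value").cast("string"), "$.payload")` and pass that result to `from_json(...)` instead of the raw value.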


### Validate CDC output (summary)
Run after snapshot/changelog to see counts and latest commit times.

In [10]:
# Quick CDC validation summary
HUDI_BASE_PATH = globals().get('HUDI_BASE_PATH', '/data/hudi/products_hudi')
snap = spark.read.format('hudi').load(HUDI_BASE_PATH)
print('Snapshot rows:', snap.count())
snap.select('product_id','name','category','price','_hoodie_is_deleted','_hoodie_commit_time').orderBy('_hoodie_commit_time').show(20, truncate=False)

chg = (spark.read.format('hudi')
    .option('hoodie.datasource.query.type','incremental')
    .option('hoodie.datasource.read.begin.instanttime','000')
    .load(HUDI_BASE_PATH))
print('Changelog rows:', chg.count())
chg.select('product_id','name','category','price','_hoodie_is_deleted','_hoodie_commit_time').orderBy('_hoodie_commit_time').show(50, truncate=False)


Snapshot rows: 1
+----------+-----------+-----------+-----+------------------+-------------------+
|product_id|name       |category   |price|_hoodie_is_deleted|_hoodie_commit_time|
+----------+-----------+-----------+-----+------------------+-------------------+
|302       |Seed Gadget|Electronics|99.0 |false             |20251212194018880  |
+----------+-----------+-----------+-----+------------------+-------------------+

Changelog rows: 1
+----------+-----------+-----------+-----+------------------+-------------------+
|product_id|name       |category   |price|_hoodie_is_deleted|_hoodie_commit_time|
+----------+-----------+-----------+-----+------------------+-------------------+
|302       |Seed Gadget|Electronics|99.0 |false             |20251212194018880  |
+----------+-----------+-----------+-----+------------------+-------------------+

