In [1]:
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.

# Deep Dive into Hudi Table & Query Types: Snapshot, RO, Incremental, Time Travel, CDC
This notebook is your guide to mastering Hudi's advanced query capabilities. We'll explore hands-on examples of the different read modes like Snapshot, Read-Optimized, Incremental, Time Travel, and Change Data Capture to help you understand when to use each for building efficient data pipelines.

## Setting up the Environment
We begin by loading the utils.py file, which contains the necessary imports and functions to start a SparkSession.

In [32]:
%run utils.py

SparkSession started with app name: Hudi-Notebooks


Before we can start querying, we need to create our Hudi tables. Hudi offers two primary table types to choose from:

- **Copy-on-Write (COW)**
- **Merge-on-Read (MOR)**

For this deep dive, we'll create one table for each of Hudi's main storage types:

- **trips_table_cow:** Our Copy-on-Write (COW) table, which we'll use to demonstrate how Hudi rewrites files on updates.
- **trips_table_mor:** Our Merge-on-Read (MOR) table, which will help us understand how Hudi uses log files for faster updates and different read views.

After creating both tables, we'll be ready to explore all the query types.

This is the sample ride data we will use to populate both Hudi tables. It includes details like the timestamp, a unique ID, rider, driver, fare, and city.

In [3]:
columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
data = [
    ("2025-08-10 08:15:30", "uuid-001", "rider-A", "driver-X", 18.50, "new_york"),
    ("2025-08-10 09:22:10", "uuid-002", "rider-B", "driver-Y", 22.75, "san_francisco"),
    ("2025-08-10 10:05:45", "uuid-003", "rider-C", "driver-Z", 14.60, "chicago"),
    ("2025-08-10 11:40:00", "uuid-004", "rider-D", "driver-W", 31.90, "new_york"),
    ("2025-08-10 12:55:15", "uuid-005", "rider-E", "driver-V", 25.10, "san_francisco"),
    ("2025-08-10 13:20:35", "uuid-006", "rider-F", "driver-U", 19.80, "chicago"),
    ("2025-08-10 14:10:05", "uuid-007", "rider-G", "driver-T", 28.45, "san_francisco"),
    ("2025-08-10 15:00:20", "uuid-008", "rider-H", "driver-S", 16.25, "new_york"),
    ("2025-08-10 15:45:50", "uuid-009", "rider-I", "driver-R", 24.35, "chicago"),
    ("2025-08-10 16:30:00", "uuid-010", "rider-J", "driver-Q", 20.00, "new_york"),
]

In [33]:
inputDF = spark.createDataFrame(data).toDF(*columns)
display(inputDF)

ts,uuid,rider,driver,fare,city
2025-08-10 08:15:30,uuid-001,rider-A,driver-X,18.5,new_york
2025-08-10 09:22:10,uuid-002,rider-B,driver-Y,22.75,san_francisco
2025-08-10 10:05:45,uuid-003,rider-C,driver-Z,14.6,chicago
2025-08-10 11:40:00,uuid-004,rider-D,driver-W,31.9,new_york
2025-08-10 12:55:15,uuid-005,rider-E,driver-V,25.1,san_francisco
2025-08-10 13:20:35,uuid-006,rider-F,driver-U,19.8,chicago
2025-08-10 14:10:05,uuid-007,rider-G,driver-T,28.45,san_francisco
2025-08-10 15:00:20,uuid-008,rider-H,driver-S,16.25,new_york
2025-08-10 15:45:50,uuid-009,rider-I,driver-R,24.35,chicago
2025-08-10 16:30:00,uuid-010,rider-J,driver-Q,20.0,new_york


### Hudi Configuration for a COW Table

In [5]:
table_name_cow = "trips_table_cow"
base_path = "s3a://warehouse/hudi-db"

cow_hudi_conf = {
    "hoodie.table.name": table_name_cow, # The name of our Hudi table.
    "hoodie.datasource.write.recordkey.field": "uuid", # The column that acts as the unique identifier for each record.
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
    "hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
    "hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
    "hoodie.write.markers.type": "DIRECT",
    "hoodie.table.cdc.enabled": "true",
    "hoodie.datasource.write.hive_style_partitioning": "true" # This ensures partition directories are named like `city=new_york`.
}
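
To make the role of `hoodie.datasource.write.precombine.field` more concrete, here is a minimal pure-Python sketch of the idea: when a batch contains several rows with the same record key, keep the row with the largest precombine value. The function name `precombine` and the duplicate row for `uuid-001` are hypothetical and exist only for illustration. Hudi's actual merge logic is more involved than this toy.

```python
def precombine(rows, key_field="uuid", order_field="ts"):
    """Keep one row per key: the row with the largest ordering value.

    Toy illustration only; this is not Hudi's implementation.
    """
    latest = {}
    for row in rows:
        key = row[key_field]
        # Replace the stored row only if this one has a strictly larger ordering value.
        # The ts strings share one fixed format, so string order matches time order.
        if key not in latest or row[order_field] > latest[key][order_field]:
            latest[key] = row
    return list(latest.values())


batch = [
    {"uuid": "uuid-001", "ts": "2025-08-10 08:15:30", "fare": 18.50},
    {"uuid": "uuid-001", "ts": "2025-08-10 08:20:00", "fare": 21.00},  # hypothetical later duplicate
    {"uuid": "uuid-002", "ts": "2025-08-10 09:22:10", "fare": 22.75},
]

deduped = precombine(batch)
print(deduped)
```

With this batch, only the later `uuid-001` row survives alongside the single `uuid-002` row.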

### Inserting data in a COW Table

In [6]:
# Write the DataFrame to a Hudi COW table
# The default operation is "upsert" if this is not specified.
inputDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "insert") \
    .options(**cow_hudi_conf) \
    .mode("overwrite") \
    .save(f"{base_path}/{table_name_cow}")



Now that the COW table is set up, we can begin our deep dive into Hudi's powerful query types. In this section, we will discuss:

- **Snapshot Query:** The default read mode for viewing the latest state of the table.
- **Incremental Query:** A way to get only the new or changed data written since a specific point in time.
- **Time Travel Query:** How to view the table as it existed at a past moment.
- **Change Data Capture (CDC) Query:** How to retrieve a detailed stream of changes (updates, inserts, and deletes).

## Querying COW Tables

### 1. Snapshot Query
This is the default query type when reading Hudi tables. It returns the latest committed state of every record in the table.

Let's do a quick snapshot query to see the current state of our table.

In [7]:
cowSnapshotQueryDF = spark.read \
        .format("hudi") \
        .load(f"{base_path}/{table_name_cow}" + "/*/*")

display(cowSnapshotQueryDF.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts"))

_hoodie_commit_time,uuid,rider,driver,fare,city,ts
20250926092128379,uuid-002,rider-B,driver-Y,22.75,san_francisco,2025-08-10 09:22:10
20250926092128379,uuid-005,rider-E,driver-V,25.1,san_francisco,2025-08-10 12:55:15
20250926092128379,uuid-007,rider-G,driver-T,28.45,san_francisco,2025-08-10 14:10:05
20250926092128379,uuid-001,rider-A,driver-X,18.5,new_york,2025-08-10 08:15:30
20250926092128379,uuid-004,rider-D,driver-W,31.9,new_york,2025-08-10 11:40:00
20250926092128379,uuid-008,rider-H,driver-S,16.25,new_york,2025-08-10 15:00:20
20250926092128379,uuid-010,rider-J,driver-Q,20.0,new_york,2025-08-10 16:30:00
20250926092128379,uuid-003,rider-C,driver-Z,14.6,chicago,2025-08-10 10:05:45
20250926092128379,uuid-006,rider-F,driver-U,19.8,chicago,2025-08-10 13:20:35
20250926092128379,uuid-009,rider-I,driver-R,24.35,chicago,2025-08-10 15:45:50


Now let's update one record in the COW table: we multiply the fare for 'rider-G' by 10.

In [8]:
from pyspark.sql.functions import col
updatesDF = spark.read.format("hudi").load(f"{base_path}/{table_name_cow}").filter(col("rider") == "rider-G").withColumn("fare", col("fare") * 10)

display(updatesDF.select("uuid", "rider", "driver", "fare", "city", "ts"))

uuid,rider,driver,fare,city,ts
uuid-007,rider-G,driver-T,284.5,san_francisco,2025-08-10 14:10:05


In [9]:
updatesDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**cow_hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_cow}")

Let's run the snapshot query again to confirm that it returns the latest view of the table. It does: the query now fetches the updated fare.

In [10]:
cowSnapshotQueryDF = spark.read \
        .format("hudi") \
        .load(f"{base_path}/{table_name_cow}" + "/*/*")

display(cowSnapshotQueryDF.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts").filter(col("rider") == "rider-G"))

_hoodie_commit_time,uuid,rider,driver,fare,city,ts
20250926092134564,uuid-007,rider-G,driver-T,284.5,san_francisco,2025-08-10 14:10:05


### 2. Incremental Reads
Hudi's incremental query feature lets us efficiently process only the data that has changed since a specific point in time. We'll start by listing all the commit times in our table and picking the most recent one, which is the commit created by our update. We'll then use that commit time as the starting point of the incremental read.

In [11]:
# Get distinct commit times ordered
all_commits_df = spark.read.format("hudi").load(f"{base_path}/{table_name_cow}") \
    .select("_hoodie_commit_time") \
    .distinct() \
    .orderBy("_hoodie_commit_time")

# Collect up to 50 commit times (oldest first) as a list
all_commits = [row['_hoodie_commit_time'] for row in all_commits_df.take(50)]

incrementalTime = all_commits[-1]  # Most recent commit in the list (our update)
display(all_commits_df)
print(f"Incremental commit time: {incrementalTime}")

_hoodie_commit_time
20250926092128379
20250926092134564


Incremental commit time: 20250926092134564
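
The commit times printed above are 17-digit strings that read as `yyyyMMddHHmmssSSS` (date, time, and milliseconds). As a small sketch, the helper below converts such a string into a Python `datetime`, which is handy for working out how far apart two commits are. The name `parse_instant` is our own and is not part of any Hudi API, and the sketch makes no assumption about the time zone the timestamps were written in.

```python
from datetime import datetime


def parse_instant(instant: str) -> datetime:
    """Parse a 17-digit instant string (yyyyMMddHHmmssSSS) into a naive datetime."""
    if len(instant) != 17 or not instant.isdigit():
        raise ValueError(f"unexpected instant format: {instant!r}")
    # First 14 digits are date and time to the second; the last 3 are milliseconds.
    base = datetime.strptime(instant[:14], "%Y%m%d%H%M%S")
    millis = int(instant[14:])
    return base.replace(microsecond=millis * 1000)


first = parse_instant("20250926092128379")
second = parse_instant("20250926092134564")
gap_seconds = (second - first).total_seconds()
print(first.isoformat(), second.isoformat(), gap_seconds)
```

For the two commits in this notebook, the gap works out to roughly six seconds between the initial insert and the update.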


In [12]:
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': incrementalTime,
}

incrementalQueryDF = spark.read.format("hudi") \
  .options(**incremental_read_options) \
  .load(f"{base_path}/{table_name_cow}")

incrementalQueryDF.createOrReplaceTempView("trips_incremental")

When we query the temporary `trips_incremental` view, it returns only the single record written by our update commit.

In [13]:
display(spark.sql("select _hoodie_commit_time, uuid, rider, driver, fare, city, ts from trips_incremental"))

_hoodie_commit_time,uuid,rider,driver,fare,city,ts
20250926092134564,uuid-007,rider-G,driver-T,284.5,san_francisco,2025-08-10 14:10:05
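
Conceptually, an incremental query is a filter on commit time. The sketch below is a plain-Python toy model (not Hudi code) over rows shaped like the output above; `incremental_read` and its `inclusive` flag are hypothetical names. The boundary is treated as inclusive by default to mirror what this notebook's output shows. How the begin time is actually interpreted is an assumption here and may differ across Hudi versions.

```python
def incremental_read(rows, begin_instant, inclusive=True):
    """Return rows committed at or after begin_instant (or strictly after it).

    Instant strings have a fixed width, so string comparison matches time order.
    """
    if inclusive:
        return [r for r in rows if r["_hoodie_commit_time"] >= begin_instant]
    return [r for r in rows if r["_hoodie_commit_time"] > begin_instant]


table = [
    {"_hoodie_commit_time": "20250926092128379", "uuid": "uuid-002", "fare": 22.75},
    {"_hoodie_commit_time": "20250926092134564", "uuid": "uuid-007", "fare": 284.5},
]

changed = incremental_read(table, "20250926092134564")
print(changed)
```

A downstream job would typically remember the last commit time it processed and pass it as the begin instant on its next run.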


### 3. Time Travel Query
Hudi also allows for time travel, which means we can query the state of our table at a specific point in the past. By specifying the commit time from our initial data insertion, we can view the table's contents before we performed the update.

In [14]:
beginTime = all_commits[-2]  # Commit time we are interested in
print(f"Begin/Initial commit time: {beginTime}")

Begin/Initial commit time: 20250926092128379


In [15]:
spark.read.format("hudi") \
  .option("as.of.instant", beginTime) \
  .load(f"{base_path}/{table_name_cow}").createOrReplaceTempView("trips_time_travel")

In [16]:
display(spark.sql("select _hoodie_commit_time, uuid, rider, driver, fare, city, ts from trips_time_travel"))

_hoodie_commit_time,uuid,rider,driver,fare,city,ts
20250926092128379,uuid-002,rider-B,driver-Y,22.75,san_francisco,2025-08-10 09:22:10
20250926092128379,uuid-005,rider-E,driver-V,25.1,san_francisco,2025-08-10 12:55:15
20250926092128379,uuid-007,rider-G,driver-T,28.45,san_francisco,2025-08-10 14:10:05
20250926092128379,uuid-001,rider-A,driver-X,18.5,new_york,2025-08-10 08:15:30
20250926092128379,uuid-004,rider-D,driver-W,31.9,new_york,2025-08-10 11:40:00
20250926092128379,uuid-008,rider-H,driver-S,16.25,new_york,2025-08-10 15:00:20
20250926092128379,uuid-010,rider-J,driver-Q,20.0,new_york,2025-08-10 16:30:00
20250926092128379,uuid-003,rider-C,driver-Z,14.6,chicago,2025-08-10 10:05:45
20250926092128379,uuid-006,rider-F,driver-U,19.8,chicago,2025-08-10 13:20:35
20250926092128379,uuid-009,rider-I,driver-R,24.35,chicago,2025-08-10 15:45:50


As you can see, querying the historical view shows the original fare for 'rider-G' before we updated it. This is a great way to audit or restore data from the past.
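
One way to picture time travel is as a replay of record versions up to a chosen commit. The following is a toy model in plain Python, not Hudi's implementation; the function name `as_of` and the `history` list are hypothetical, and the values are taken from the 'rider-G' record in this notebook.

```python
def as_of(versions, instant):
    """Return the latest version of each record committed at or before `instant`."""
    state = {}
    # Walk versions in commit order so later commits overwrite earlier ones.
    for commit_time, record in sorted(versions, key=lambda v: v[0]):
        if commit_time <= instant:
            state[record["uuid"]] = record
    return state


history = [
    ("20250926092128379", {"uuid": "uuid-007", "rider": "rider-G", "fare": 28.45}),
    ("20250926092134564", {"uuid": "uuid-007", "rider": "rider-G", "fare": 284.5}),
]

before_update = as_of(history, "20250926092128379")
after_update = as_of(history, "20250926092134564")
print(before_update["uuid-007"]["fare"], after_update["uuid-007"]["fare"])
```

Querying as of the first commit yields the original fare, while querying as of the second commit yields the updated one.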

### 4. Change Data Capture (CDC)
Hudi's Change Data Capture (CDC) feature lets you read a stream of all the changes (inserts, updates, and deletes) that have been applied to your table. This is useful for downstream systems that need to react to data modifications as they happen. We'll start by adding a new record and updating an existing one to generate some changes.

In [17]:
from pyspark.sql.functions import lit
from pyspark.sql import Row

# Define a DataFrame with one new record and one updated record
cdc_data = [
    ("2025-08-11 10:00:00", "uuid-011", "rider-K", "driver-P", 10.50, "chicago"), # new record
    ("2025-08-10 09:22:10", "uuid-002", "rider-B", "driver-Y", 50.00, "san_francisco") # updated record
]

cdc_columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
cdcDF = spark.createDataFrame(cdc_data).toDF(*cdc_columns)

Now, we'll perform an upsert with our new data. This will create a new commit with one insert and one update.

In [18]:
# Perform the upsert to introduce the new changes
cdcDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**cow_hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_cow}")

To isolate the changes from this specific transaction, we first look up the commit that came just before it (the second-to-last commit on the table). We then use that commit time as the starting point for the CDC query, so the query picks up the changes made after it.

In [19]:
# Read the Hudi table to get a DataFrame of all records.
hudi_df = spark.read.format("hudi").load(f"{base_path}/{table_name_cow}")

# Get all distinct commit times and sort them in chronological order.
# We then collect this list to the driver.
all_commits = [
    row[0]
    for row in hudi_df.select("_hoodie_commit_time")
    .distinct()
    .orderBy("_hoodie_commit_time")
    .collect()
]

if len(all_commits) >= 2:
    # Use index -2 to get the second-to-last commit time from the sorted list.
    previous_commit_time = all_commits[-2]
    print(f"Second-to-last commit time: {previous_commit_time}")
else:
    print("There are not enough commits to determine a previous commit time.")
    previous_commit_time = 0

Second-to-last commit time: 20250926092134564


Now we can perform a CDC query using a special incremental format. We set the query type to "incremental" and specify "hoodie.datasource.query.incremental.format": "cdc". Using the `previous_commit_time` we just fetched as the begin instant, we capture the changes from our last commit. The output includes an op column, which tells us whether a record was inserted, updated, or deleted.

In [20]:
cdc_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': previous_commit_time,
  'hoodie.datasource.query.incremental.format': 'cdc'
}

cdcQueryDF = spark.read.format("hudi"). \
  options(**cdc_read_options). \
  load(f"{base_path}/{table_name_cow}")

display(cdcQueryDF)

op,ts_ms,before,after
u,20250926092136853,"{""ts"": ""2025-08-10 09:22:10"", ""uuid"": ""uuid-002"", ""rider"": ""rider-B"", ""driver"": ""driver-Y"", ""fare"": 22.75, ""city"": ""san_francisco""}","{""ts"": ""2025-08-10 09:22:10"", ""uuid"": ""uuid-002"", ""rider"": ""rider-B"", ""driver"": ""driver-Y"", ""fare"": 50.0, ""city"": ""san_francisco""}"
i,20250926092136853,,"{""ts"": ""2025-08-11 10:00:00"", ""uuid"": ""uuid-011"", ""rider"": ""rider-K"", ""driver"": ""driver-P"", ""fare"": 10.5, ""city"": ""chicago""}"


Let's look at the above output to see what happened:

**Update:** We have a record where **op is u**. This corresponds to the update we made to uuid-002. The before column shows the original fare of 22.75, and the after column shows the new fare of 50.0.

**Insert:** We also have a record where **op is i**. This is the new record for uuid-011. The before column is null because it didn't exist before this commit, while the after column contains all the new record's data.
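
The shape of these CDC rows can be reproduced with a small snapshot diff. The sketch below is a plain-Python illustration, not how Hudi computes CDC internally; `diff_snapshots` is a hypothetical helper. The `d` code for deletes follows the same single-letter convention as the `i` and `u` codes in the output and is included for completeness.

```python
def diff_snapshots(before, after):
    """Compare two {key: record} snapshots and emit CDC-style change rows."""
    changes = []
    for key in sorted(before.keys() | after.keys()):
        old, new = before.get(key), after.get(key)
        if old is None:
            # Key exists only in the new snapshot: insert.
            changes.append({"op": "i", "before": None, "after": new})
        elif new is None:
            # Key exists only in the old snapshot: delete.
            changes.append({"op": "d", "before": old, "after": None})
        elif old != new:
            # Key exists in both snapshots with different content: update.
            changes.append({"op": "u", "before": old, "after": new})
    return changes


snapshot_before = {"uuid-002": {"uuid": "uuid-002", "fare": 22.75}}
snapshot_after = {
    "uuid-002": {"uuid": "uuid-002", "fare": 50.0},
    "uuid-011": {"uuid": "uuid-011", "fare": 10.5},
}

cdc_rows = diff_snapshots(snapshot_before, snapshot_after)
for row in cdc_rows:
    print(row["op"], row["before"], row["after"])
```

As in the notebook output, the update carries both a before and an after image, while the insert has no before image.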

### Hudi Configuration for a MOR Table

In [21]:
table_name_mor = "trips_table_mor"
base_path = "s3a://warehouse/hudi-db"

mor_hudi_conf = {
    "hoodie.table.name": table_name_mor,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.partitionpath.field": "city",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.write.markers.type": "DIRECT",
    "hoodie.datasource.write.hive_style_partitioning": "true"
}

### Inserting Data into a MOR Table

In [22]:
# Write the DataFrame to a Hudi MOR table
inputDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "insert") \
    .options(**mor_hudi_conf) \
    .mode("overwrite") \
    .save(f"{base_path}/{table_name_mor}")

Now that the MOR table is ready, we can look at the query types that behave differently on it. In this section, we will discuss:

- **Snapshot Query:** The default read mode for viewing the latest state of the table.
- **Read-Optimized (RO) Query:** A specialized mode for faster reads on MOR tables.

## Querying MOR Tables

Just as we did with the COW table, let's query the MOR table. Since we have not yet made any updates to the MOR table, it contains only base data files. We can confirm this by inspecting the filesystem.

In [23]:
ls(f"{base_path}/{table_name_mor}")

s3a://warehouse/hudi-db/trips_table_mor/.hoodie
s3a://warehouse/hudi-db/trips_table_mor/city=chicago
s3a://warehouse/hudi-db/trips_table_mor/city=new_york
s3a://warehouse/hudi-db/trips_table_mor/city=san_francisco


In [24]:
ls(f"{base_path}/{table_name_mor}/city=new_york")

s3a://warehouse/hudi-db/trips_table_mor/city=new_york/.hoodie_partition_metadata
s3a://warehouse/hudi-db/trips_table_mor/city=new_york/6162e656-529e-475e-9848-add94ca9429a-0_2-148-385_20250926092138358.parquet


### 1. Snapshot Query
When you run this query on a Merge-on-Read (MOR) table, Hudi merges the recent changes from the log files with the base data files at read time to present the latest records. This merge adds some read cost compared with reading the base files alone.

In [25]:
morSnapshotQueryDF = spark.read \
        .format("hudi") \
        .load(f"{base_path}/{table_name_mor}" + "/*/*")

display(morSnapshotQueryDF.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts"))

_hoodie_commit_time,uuid,rider,driver,fare,city,ts
20250926092138358,uuid-002,rider-B,driver-Y,22.75,san_francisco,2025-08-10 09:22:10
20250926092138358,uuid-005,rider-E,driver-V,25.1,san_francisco,2025-08-10 12:55:15
20250926092138358,uuid-007,rider-G,driver-T,28.45,san_francisco,2025-08-10 14:10:05
20250926092138358,uuid-003,rider-C,driver-Z,14.6,chicago,2025-08-10 10:05:45
20250926092138358,uuid-006,rider-F,driver-U,19.8,chicago,2025-08-10 13:20:35
20250926092138358,uuid-009,rider-I,driver-R,24.35,chicago,2025-08-10 15:45:50
20250926092138358,uuid-001,rider-A,driver-X,18.5,new_york,2025-08-10 08:15:30
20250926092138358,uuid-004,rider-D,driver-W,31.9,new_york,2025-08-10 11:40:00
20250926092138358,uuid-008,rider-H,driver-S,16.25,new_york,2025-08-10 15:00:20
20250926092138358,uuid-010,rider-J,driver-Q,20.0,new_york,2025-08-10 16:30:00


### 2. Reading in Read-Optimized Mode
Now, let's read the same table in read-optimized mode. This mode is faster because it only reads the base files, but it won't show any recent updates that are still in the log files.

In [26]:
mor_ro_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load(f"{base_path}/{table_name_mor}")

display(mor_ro_df.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts"))

_hoodie_commit_time,uuid,rider,driver,fare,city,ts
20250926092138358,uuid-002,rider-B,driver-Y,22.75,san_francisco,2025-08-10 09:22:10
20250926092138358,uuid-005,rider-E,driver-V,25.1,san_francisco,2025-08-10 12:55:15
20250926092138358,uuid-007,rider-G,driver-T,28.45,san_francisco,2025-08-10 14:10:05
20250926092138358,uuid-001,rider-A,driver-X,18.5,new_york,2025-08-10 08:15:30
20250926092138358,uuid-004,rider-D,driver-W,31.9,new_york,2025-08-10 11:40:00
20250926092138358,uuid-008,rider-H,driver-S,16.25,new_york,2025-08-10 15:00:20
20250926092138358,uuid-010,rider-J,driver-Q,20.0,new_york,2025-08-10 16:30:00
20250926092138358,uuid-003,rider-C,driver-Z,14.6,chicago,2025-08-10 10:05:45
20250926092138358,uuid-006,rider-F,driver-U,19.8,chicago,2025-08-10 13:20:35
20250926092138358,uuid-009,rider-I,driver-R,24.35,chicago,2025-08-10 15:45:50


### Updating a Record in the MOR table
Let's update a record in our MOR table to see how it affects our read modes. We'll find the record for 'driver-W' and double its fare.

In [27]:
from pyspark.sql.functions import col
updatesDF = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").filter(col("driver") == "driver-W").withColumn("fare", col("fare") * 2)

display(updatesDF.select("uuid", "rider", "driver", "fare", "city", "ts"))

uuid,rider,driver,fare,city,ts
uuid-004,rider-D,driver-W,63.8,new_york,2025-08-10 11:40:00


Now we perform the upsert. In a MOR table, this update will be written to a log file, separate from the main Parquet data files.

In [28]:
updatesDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**mor_hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_mor}")

In [29]:
ls(f"{base_path}/{table_name_mor}/city=new_york")

s3a://warehouse/hudi-db/trips_table_mor/city=new_york/.6162e656-529e-475e-9848-add94ca9429a-0_20250926092140565.log.1_0-194-504
s3a://warehouse/hudi-db/trips_table_mor/city=new_york/.hoodie_partition_metadata
s3a://warehouse/hudi-db/trips_table_mor/city=new_york/6162e656-529e-475e-9848-add94ca9429a-0_2-148-385_20250926092138358.parquet
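
The two data file names in this listing share the same leading identifier, which is how we can tell that the new log file belongs to the same file group as the existing Parquet base file. The sketch below pulls the names apart with regular expressions. The patterns and the field names (`file_id`, `write_token`, `instant`, `version`) are our own reading of the names shown in this listing, not an official parser, and may not cover every naming variant.

```python
import re

# Assumed layout, inferred from the listing above:
#   base file: <file_id>_<write_token>_<instant>.<ext>
#   log file:  .<file_id>_<instant>.log.<version>_<write_token>
BASE_FILE = re.compile(
    r"^(?P<file_id>[^_]+)_(?P<write_token>[^_]+)_(?P<instant>\d+)\.(?P<ext>\w+)$"
)
LOG_FILE = re.compile(
    r"^\.(?P<file_id>[^_]+)_(?P<instant>\d+)\.log\.(?P<version>\d+)_(?P<write_token>.+)$"
)


def describe(name):
    """Classify a file name as 'log' or 'base' and return its parts, or None if neither."""
    for kind, pattern in (("log", LOG_FILE), ("base", BASE_FILE)):
        match = pattern.match(name)
        if match:
            return {"kind": kind, **match.groupdict()}
    return None


base = describe("6162e656-529e-475e-9848-add94ca9429a-0_2-148-385_20250926092138358.parquet")
log = describe(".6162e656-529e-475e-9848-add94ca9429a-0_20250926092140565.log.1_0-194-504")
print(base)
print(log)
print("same file group:", base["file_id"] == log["file_id"])
```

The instant embedded in the log file name is later than the one in the base file name, which matches the order of the insert and the update in this notebook.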


After the update, a snapshot query correctly shows the new fare for 'driver-W'. This is because the log files containing our update were merged with the base files during this read operation.

In [30]:
morSnapshotQueryDF = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}")
display(morSnapshotQueryDF.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts").filter(col("driver") == "driver-W"))

_hoodie_commit_time,uuid,rider,driver,fare,city,ts
20250926092140565,uuid-004,rider-D,driver-W,63.8,new_york,2025-08-10 11:40:00


Finally, a read-optimized query of the same table still shows the old fare for 'driver-W'. This is because the read-optimized query only looks at the base data files and ignores the unmerged update in the log file.

In [31]:
mor_ro_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load(f"{base_path}/{table_name_mor}")

display(mor_ro_df.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts").filter(col("driver") == "driver-W"))

_hoodie_commit_time,uuid,rider,driver,fare,city,ts
20250926092138358,uuid-004,rider-D,driver-W,31.9,new_york,2025-08-10 11:40:00
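
To summarize the difference between the two read modes, here is a toy model of a single MOR file group in plain Python. It is a sketch of the idea only, not Hudi's implementation: the class `ToyMorFileGroup` and its methods are hypothetical, the dictionary stands in for the Parquet base file, and the list stands in for the log file. The `compact` method models compaction, the step that folds logged updates into a new base file, which this notebook does not run.

```python
class ToyMorFileGroup:
    """Toy model of one MOR file group: a base file plus an append-only log."""

    def __init__(self, base_records):
        self.base = dict(base_records)  # stands in for the Parquet base file
        self.log = []                   # stands in for log files holding updates

    def upsert(self, key, record):
        # Updates are appended to the log; the base file is left untouched.
        self.log.append((key, record))

    def read_optimized(self):
        # Base file only: cheap, but blind to updates still sitting in the log.
        return dict(self.base)

    def snapshot(self):
        # Merge log entries over the base at read time, in write order.
        merged = dict(self.base)
        for key, record in self.log:
            merged[key] = record
        return merged

    def compact(self):
        # Fold the log into a new base and clear the log.
        self.base = self.snapshot()
        self.log = []


group = ToyMorFileGroup({"uuid-004": {"driver": "driver-W", "fare": 31.9}})
group.upsert("uuid-004", {"driver": "driver-W", "fare": 63.8})

snapshot_fare = group.snapshot()["uuid-004"]["fare"]
ro_fare = group.read_optimized()["uuid-004"]["fare"]
print(snapshot_fare, ro_fare)

group.compact()
ro_fare_after_compaction = group.read_optimized()["uuid-004"]["fare"]
print(ro_fare_after_compaction)
```

In this model the read-optimized view lags behind the snapshot view until the log is folded into the base, which mirrors the two results shown above.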
