## Getting Started with Hudi: A Hands-on Guide to CRUD Operations

This notebook is a practical guide to performing CRUD (Create, Read, Update, Delete) operations on an Apache Hudi table using PySpark. We use MinIO as our S3-compatible storage backend, a typical setup for a modern data lake, and work with both Copy-on-Write (COW) and Merge-on-Read (MOR) tables.

### Setting up the Spark Environment

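We begin by running the helper notebooks in utils/. init_spark.ipynb contains the necessary imports and the function that starts a SparkSession. The other notebooks supply the storage and display helpers used later, such as ls().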

In [6]:
%run utils/init_spark.ipynb
%run utils/s3_utils.ipynb
%run utils/s3_utils_updated.ipynb
%run utils/display_utils.ipynb

Now, let's start the SparkSession. The get_spark() helper from init_spark.ipynb gives it the app name 'Hudi-Jupyter' and configures our Hudi and MinIO settings. %%capture hides the cell's startup logs.

In [7]:
%%capture
spark = get_spark()

This is the initial dataset for our Hudi table: a list of ride records with columns for timestamp, a unique ID, rider and driver names, the fare, and the city.

In [4]:
columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
data = [
    ("2025-08-10 08:15:30", "uuid-001", "rider-A", "driver-X", 18.50, "new_york"),
    ("2025-08-10 09:22:10", "uuid-002", "rider-B", "driver-Y", 22.75, "san_francisco"),
    ("2025-08-10 10:05:45", "uuid-003", "rider-C", "driver-Z", 14.60, "chicago"),
    ("2025-08-10 11:40:00", "uuid-004", "rider-D", "driver-W", 31.90, "new_york"),
    ("2025-08-10 12:55:15", "uuid-005", "rider-E", "driver-V", 25.10, "san_francisco"),
    ("2025-08-10 13:20:35", "uuid-006", "rider-F", "driver-U", 19.80, "chicago"),
    ("2025-08-10 14:10:05", "uuid-007", "rider-G", "driver-T", 28.45, "san_francisco"),
    ("2025-08-10 15:00:20", "uuid-008", "rider-H", "driver-S", 16.25, "new_york"),
    ("2025-08-10 15:45:50", "uuid-009", "rider-I", "driver-R", 24.35, "chicago"),
    ("2025-08-10 16:30:00", "uuid-010", "rider-J", "driver-Q", 20.00, "new_york"),
]

First, we create a PySpark DataFrame from our sample data and take a quick look at what we're starting with.

In [5]:
inputDF = spark.createDataFrame(data).toDF(*columns)
#inputDF.show(truncate = False)
display(inputDF)

ts,uuid,rider,driver,fare,city
2025-08-10 08:15:30,uuid-001,rider-A,driver-X,18.5,new_york
2025-08-10 09:22:10,uuid-002,rider-B,driver-Y,22.75,san_francisco
2025-08-10 10:05:45,uuid-003,rider-C,driver-Z,14.6,chicago
2025-08-10 11:40:00,uuid-004,rider-D,driver-W,31.9,new_york
2025-08-10 12:55:15,uuid-005,rider-E,driver-V,25.1,san_francisco
2025-08-10 13:20:35,uuid-006,rider-F,driver-U,19.8,chicago
2025-08-10 14:10:05,uuid-007,rider-G,driver-T,28.45,san_francisco
2025-08-10 15:00:20,uuid-008,rider-H,driver-S,16.25,new_york
2025-08-10 15:45:50,uuid-009,rider-I,driver-R,24.35,chicago
2025-08-10 16:30:00,uuid-010,rider-J,driver-Q,20.0,new_york


## Copy-on-Write (COW) Tables

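We will now explore the Copy-on-Write (COW) table type. In a COW table, whenever records in a data file are updated or deleted, Hudi writes a new version of the entire file with the changes applied. This is the simpler, more traditional approach: writes cost more, but reads are fast because every data file is already up to date.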

### Hudi Configuration

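Next, we set up the configuration for our Hudi table. We use uuid as the unique record key and partition the data by city. The ts (timestamp) field is our precombine key, which Hudi uses to decide which record to keep when it finds several with the same key. We also enable Change Data Capture logging (hoodie.table.cdc.enabled, used in the CDC section later), use direct write markers, and turn on Hive-style partitioning so that partition folders are named city=<value>.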

In [5]:
table_name_cow = "trips_table_cow"
base_path = "s3a://warehouse/hudi-db"

hudi_conf = {
    "hoodie.table.name": table_name_cow,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.partitionpath.field": "city",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.write.markers.type": "DIRECT",
    "hoodie.table.cdc.enabled": "true",
    "hoodie.datasource.write.hive_style_partitioning": "true"
}
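The MOR section later rebuilds this dictionary with only the table name, table type, and CDC setting changed. One way to keep the two in sync is a small factory function. This is a minimal sketch: make_hudi_conf is a hypothetical helper (not part of Hudi), and the option keys are exactly the ones used in this notebook.

```python
from typing import Dict


def make_hudi_conf(table_name: str, table_type: str, cdc_enabled: bool = False) -> Dict[str, str]:
    """Build the write options used in this notebook for either table type (hypothetical helper)."""
    if table_type not in ("COPY_ON_WRITE", "MERGE_ON_READ"):
        raise ValueError(f"unsupported table type: {table_type}")
    conf = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": "uuid",
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.partitionpath.field": "city",
        "hoodie.datasource.write.precombine.field": "ts",
        "hoodie.write.markers.type": "DIRECT",
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }
    if cdc_enabled:
        # The COW config above enables CDC; the MOR config later does not.
        conf["hoodie.table.cdc.enabled"] = "true"
    return conf


# Rebuild the COW configuration from this section
cow_conf = make_hudi_conf("trips_table_cow", "COPY_ON_WRITE", cdc_enabled=True)
print(sorted(cow_conf))
```

The result could then be passed to a writer as `.options(**make_hudi_conf(table_name_cow, "COPY_ON_WRITE", cdc_enabled=True))`.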

### Inserting data

This is the **"Create"** part of our CRUD operations: we write our initial DataFrame to MinIO as a Hudi table. Using mode("overwrite") replaces any existing table at this path, so every run starts from a clean slate.

In [6]:
# Write the DataFrame to a Hudi COW table
inputDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "insert") \
    .options(**hudi_conf) \
    .mode("overwrite") \
    .save(f"{base_path}/{table_name_cow}")



After the write, the table directory contains two main types of files:

- Metadata files located in <base_path>/.hoodie/
- Data files stored within partition paths for partitioned tables, or under the base path for non-partitioned tables

In [7]:
ls(f"{base_path}/{table_name_cow}")

s3a://warehouse/hudi-db/trips_table_cow/.hoodie
s3a://warehouse/hudi-db/trips_table_cow/city=chicago
s3a://warehouse/hudi-db/trips_table_cow/city=new_york
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco


Hudi manages a table's metadata by storing it in a special directory within the base path. This metadata helps ensure that all tools reading and writing to the table follow the same rules.
One of the key files is hoodie.properties, which acts like a configuration file for the table, holding details like table name and version.

In [8]:
ls(f"{base_path}/{table_name_cow}/.hoodie")

s3a://warehouse/hudi-db/trips_table_cow/.hoodie/hoodie.properties
s3a://warehouse/hudi-db/trips_table_cow/.hoodie/.aux
s3a://warehouse/hudi-db/trips_table_cow/.hoodie/.index_defs
s3a://warehouse/hudi-db/trips_table_cow/.hoodie/.schema
s3a://warehouse/hudi-db/trips_table_cow/.hoodie/.temp
s3a://warehouse/hudi-db/trips_table_cow/.hoodie/metadata
s3a://warehouse/hudi-db/trips_table_cow/.hoodie/timeline


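Another crucial part of this metadata is the Hudi Timeline, stored under .hoodie/timeline. It consists of small files that log every action on the table. Pending (requested or inflight) instants follow the naming pattern below:

[action timestamp].[action type].[action state]

Completed instants drop the state and, in the Hudi 1.x layout shown here, also record when the action finished: [action timestamp]_[completion timestamp].[action type]. A bare [action timestamp].inflight file denotes an inflight commit.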

In [9]:
ls(f"{base_path}/{table_name_cow}/.hoodie/timeline")

s3a://warehouse/hudi-db/trips_table_cow/.hoodie/timeline/20250813173932729.commit.requested
s3a://warehouse/hudi-db/trips_table_cow/.hoodie/timeline/20250813173932729.inflight
s3a://warehouse/hudi-db/trips_table_cow/.hoodie/timeline/20250813173932729_20250813173936142.commit
s3a://warehouse/hudi-db/trips_table_cow/.hoodie/timeline/history


- An action timestamp is a unique, chronological identifier for each event, marking when it was scheduled.
- An action type describes the operation that took place. Examples include commit or deltacommit for data changes, compaction or clean for maintenance, and savepoint or restore for recovery.
- An action state shows the current status of the action: **requested** (scheduled but not yet started), **inflight** (in progress), or **completed**. A completed instant has no state suffix, as in 20250813173932729_20250813173936142.commit.
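To make the naming scheme concrete, here is a small parser for timeline file names. It is a sketch, not a Hudi API, and it assumes the Hudi 1.x layout seen in the listing above: completed instants carry a _[completion timestamp] suffix, and a bare [timestamp].inflight file is an inflight commit. Other layouts (for example, older 0.x tables) may not match.

```python
import re
from typing import NamedTuple, Optional


class Instant(NamedTuple):
    requested_time: str
    completion_time: Optional[str]  # only present on completed instants
    action: str
    state: str


# <requested>[_<completion>].<action>[.<state>]
_INSTANT_RE = re.compile(r"^(\d+)(?:_(\d+))?\.([a-z]+)(?:\.(requested|inflight))?$")


def parse_instant(file_name: str) -> Optional[Instant]:
    """Parse a timeline file name; return None for non-instant entries such as 'history'."""
    m = _INSTANT_RE.match(file_name)
    if m is None:
        return None
    requested, completion, action, state = m.groups()
    if action == "inflight" and state is None:
        # Assumed legacy naming: '<ts>.inflight' is an inflight commit.
        return Instant(requested, None, "commit", "INFLIGHT")
    if state is not None:
        return Instant(requested, completion, action, state.upper())
    return Instant(requested, completion, action, "COMPLETED")


timeline = [
    "20250813173932729.commit.requested",
    "20250813173932729.inflight",
    "20250813173932729_20250813173936142.commit",
    "history",
]
for name in timeline:
    print(name, "->", parse_instant(name))
```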

Turning to data management, Hudi organizes physical data files into Base Files and Log Files:

- Base File contains the main stored records in a Hudi table and is optimized for read.
- Log File contains the records' changes on top of its associated Base File and is optimized for write.

Within a partition path of a Hudi table, a single Base File and its associated Log Files (MOR tables only) are grouped together as a File Slice. Successive file slices that share the same file ID form a File Group.

In [10]:
ls(f"{base_path}/{table_name_cow}/city=san_francisco")

s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/.hoodie_partition_metadata
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/24ca20d9-ed19-4507-a3f7-f31e74956271-0_0-24-56_20250813173932729.parquet
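The base file above appears to follow the pattern [file ID]_[write token]_[instant time].parquet: the file ID identifies the file group, and the instant time matches the commit on the timeline (20250813173932729). The parser below is a sketch under that assumption. It treats the write token as an opaque string, since its internal structure is not shown in this notebook.

```python
from typing import NamedTuple


class BaseFile(NamedTuple):
    file_id: str       # identifies the file group
    write_token: str   # treated as opaque here
    instant_time: str  # commit that produced this file slice


def parse_base_file(name: str) -> BaseFile:
    """Split '<fileId>_<writeToken>_<instantTime>.parquet' into its three parts."""
    suffix = ".parquet"
    # Hidden entries (partition metadata, .cdc/.log files) start with '.' and are not base files.
    if not name.endswith(suffix) or name.startswith("."):
        raise ValueError(f"not a base file: {name}")
    parts = name[: -len(suffix)].split("_")
    if len(parts) != 3:
        raise ValueError(f"unexpected base file name: {name}")
    return BaseFile(*parts)


f = parse_base_file("24ca20d9-ed19-4507-a3f7-f31e74956271-0_0-24-56_20250813173932729.parquet")
print(f.file_id, f.instant_time)
```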


To query our newly created Hudi table with SQL, we first register it as a temporary view and then list the tables to confirm it's ready to use.

In [11]:
spark.read.format("hudi").load(f"{base_path}/{table_name_cow}").createOrReplaceTempView(f"{table_name_cow}")

In [12]:
spark.sql("show tables").show(truncate=False)

+---------+---------------+-----------+
|namespace|tableName      |isTemporary|
+---------+---------------+-----------+
|         |trips_table_cow|false      |
+---------+---------------+-----------+



### Upserting Records (Update)

Hudi's upsert inserts new records and updates existing ones in a single operation. We'll update the fare for 'rider-G' by multiplying it by 10, and first display the modified record before writing it back to the table.

In [13]:
from pyspark.sql.functions import col
updatesDF = spark.read.format("hudi").load(f"{base_path}/{table_name_cow}").filter(col("rider") == "rider-G").withColumn("fare", col("fare") * 10)

updatesDF.select("uuid", "rider", "driver", "fare", "city", "ts").show()

+--------+-------+--------+-----+-------------+-------------------+
|    uuid|  rider|  driver| fare|         city|                 ts|
+--------+-------+--------+-----+-------------+-------------------+
|uuid-007|rider-G|driver-T|284.5|san_francisco|2025-08-10 14:10:05|
+--------+-------+--------+-----+-------------+-------------------+



### Upserting the modified record
Now, we'll perform the upsert. Because uuid is our record key, Hudi knows to find the original record and replace it with our new, updated one.

In [14]:
updatesDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_cow}")

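Let's check the files in the **san_francisco** partition to see what happened after the upsert. Since this is a Copy-on-Write table, Hudi didn't modify the existing Parquet file in place. Instead, it wrote a new Parquet file with the same file ID and a new commit time, containing the updated record and all the other records from the original file. The old file remains, so time travel can still read it, until Hudi's cleaner removes it; the hidden .cdc file appears because CDC is enabled for this table.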

In [15]:
ls(f"{base_path}/{table_name_cow}/city=san_francisco")

s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/.24ca20d9-ed19-4507-a3f7-f31e74956271-0_20250813173937705.log.1_0-68-174.cdc
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/.hoodie_partition_metadata
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/24ca20d9-ed19-4507-a3f7-f31e74956271-0_0-24-56_20250813173932729.parquet
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/24ca20d9-ed19-4507-a3f7-f31e74956271-0_0-68-174_20250813173937705.parquet
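After the upsert, the partition holds two base files with the same file ID, that is, two file slices of one file group. A COW snapshot read only needs the newest slice of each group. The sketch below picks it from file names alone; this is a simplification, since Hudi decides which slices are visible from the completed instants on the timeline rather than by comparing names. latest_base_files is a hypothetical helper.

```python
from typing import Dict, List, Tuple


def latest_base_files(paths: List[str]) -> Dict[str, str]:
    """Return {file_id: newest base file name}, i.e. the latest file slice per file group."""
    newest: Dict[str, Tuple[str, str]] = {}
    for path in paths:
        name = path.rsplit("/", 1)[-1]
        # Hidden entries (.hoodie_partition_metadata, .cdc/.log files) are not base files.
        if name.startswith(".") or not name.endswith(".parquet"):
            continue
        file_id, _write_token, instant = name[: -len(".parquet")].split("_")
        # Instant times are fixed-width digit strings, so string order is chronological.
        if file_id not in newest or instant > newest[file_id][0]:
            newest[file_id] = (instant, name)
    return {fid: name for fid, (_, name) in newest.items()}


prefix = "s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/"
listing = [
    prefix + ".24ca20d9-ed19-4507-a3f7-f31e74956271-0_20250813173937705.log.1_0-68-174.cdc",
    prefix + ".hoodie_partition_metadata",
    prefix + "24ca20d9-ed19-4507-a3f7-f31e74956271-0_0-24-56_20250813173932729.parquet",
    prefix + "24ca20d9-ed19-4507-a3f7-f31e74956271-0_0-68-174_20250813173937705.parquet",
]
print(latest_base_files(listing))
```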


### Snapshot Query

This is the default query type when reading Hudi tables. It returns the latest committed state of every record. On a COW table, that simply means reading the newest base files; on a Merge-on-Read (MOR) table, Hudi must merge recent changes from the log files with the base files at query time, which adds read cost.

Let's do a quick snapshot query to see the current state of our table. Notice the new _hoodie_commit_time for the updated record, which shows when the change was made.

In [16]:
snapshotQueryDF = spark.read \
        .format("hudi") \
        .load(f"{base_path}/{table_name_cow}" + "/*/*")

snapshotQueryDF.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts").show()

+-------------------+--------+-------+--------+-----+-------------+-------------------+
|_hoodie_commit_time|    uuid|  rider|  driver| fare|         city|                 ts|
+-------------------+--------+-------+--------+-----+-------------+-------------------+
|  20250813173932729|uuid-002|rider-B|driver-Y|22.75|san_francisco|2025-08-10 09:22:10|
|  20250813173932729|uuid-005|rider-E|driver-V| 25.1|san_francisco|2025-08-10 12:55:15|
|  20250813173937705|uuid-007|rider-G|driver-T|284.5|san_francisco|2025-08-10 14:10:05|
|  20250813173932729|uuid-001|rider-A|driver-X| 18.5|     new_york|2025-08-10 08:15:30|
|  20250813173932729|uuid-004|rider-D|driver-W| 31.9|     new_york|2025-08-10 11:40:00|
|  20250813173932729|uuid-008|rider-H|driver-S|16.25|     new_york|2025-08-10 15:00:20|
|  20250813173932729|uuid-010|rider-J|driver-Q| 20.0|     new_york|2025-08-10 16:30:00|
|  20250813173932729|uuid-003|rider-C|driver-Z| 14.6|      chicago|2025-08-10 10:05:45|
|  20250813173932729|uuid-006|ri

After refreshing the view's cached metadata with REFRESH TABLE, a SQL query shows the updated fare for 'rider-G'.

In [17]:
spark.sql(f"REFRESH TABLE {table_name_cow}")
tripsDF_read = spark.sql(f"select uuid, rider, driver, fare, city, ts from {table_name_cow}")
tripsDF_read.show(truncate=False)

+--------+-------+--------+-----+-------------+-------------------+
|uuid    |rider  |driver  |fare |city         |ts                 |
+--------+-------+--------+-----+-------------+-------------------+
|uuid-002|rider-B|driver-Y|22.75|san_francisco|2025-08-10 09:22:10|
|uuid-005|rider-E|driver-V|25.1 |san_francisco|2025-08-10 12:55:15|
|uuid-007|rider-G|driver-T|284.5|san_francisco|2025-08-10 14:10:05|
|uuid-001|rider-A|driver-X|18.5 |new_york     |2025-08-10 08:15:30|
|uuid-004|rider-D|driver-W|31.9 |new_york     |2025-08-10 11:40:00|
|uuid-008|rider-H|driver-S|16.25|new_york     |2025-08-10 15:00:20|
|uuid-010|rider-J|driver-Q|20.0 |new_york     |2025-08-10 16:30:00|
|uuid-003|rider-C|driver-Z|14.6 |chicago      |2025-08-10 10:05:45|
|uuid-006|rider-F|driver-U|19.8 |chicago      |2025-08-10 13:20:35|
|uuid-009|rider-I|driver-R|24.35|chicago      |2025-08-10 15:45:50|
+--------+-------+--------+-----+-------------+-------------------+



### Incremental Reads

Hudi's incremental query feature lets us efficiently process only the data that has changed since a specific point in time. We'll start by listing all the commit times in our table.

In [18]:
spark.sql(f"select distinct(_hoodie_commit_time) as commitTime from {table_name_cow} order by commitTime").show()

+-----------------+
|       commitTime|
+-----------------+
|20250813173932729|
|20250813173937705|
+-----------------+



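Now, let's configure an incremental read that returns only the data written by our latest commit, the rider-G upsert. First, we fetch the latest commit time from the list above. Judging by the output below, this Hudi version treats the begin instant as inclusive, so passing the latest commit time returns that commit's records.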

In [19]:
# Get distinct commit times in ascending order
commits_df = spark.sql(f"SELECT DISTINCT(_hoodie_commit_time) AS commitTime FROM {table_name_cow} ORDER BY commitTime")

# Collect all commit times (the table is small); take(50) would return the 50 oldest, not the newest
commits = [row['commitTime'] for row in commits_df.collect()]

incrementalTime = commits[-1]  # Latest commit: the rider-G upsert
print(f"Incremental commit time: {incrementalTime}")

Incremental commit time: 20250813173937705


In [20]:
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': incrementalTime,
}

incrementalQueryDF = spark.read.format("hudi"). \
  options(**incremental_read_options). \
  load(f"{base_path}/{table_name_cow}")
incrementalQueryDF.createOrReplaceTempView("trips_incremental")

Querying the temporary incremental view returns only the single record written by that commit: the updated rider-G row.

In [21]:
spark.sql("select _hoodie_commit_time, uuid, rider, driver, fare, city, ts from trips_incremental").show(truncate=False)

+-------------------+--------+-------+--------+-----+-------------+-------------------+
|_hoodie_commit_time|uuid    |rider  |driver  |fare |city         |ts                 |
+-------------------+--------+-------+--------+-----+-------------+-------------------+
|20250813173937705  |uuid-007|rider-G|driver-T|284.5|san_francisco|2025-08-10 14:10:05|
+-------------------+--------+-------+--------+-----+-------------+-------------------+
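Conceptually, the incremental query above filters records by commit time. The sketch below mimics that on plain dictionaries, using rows from the snapshot output earlier. It is a simplification under stated assumptions: it treats the begin time as inclusive, as the output above suggests (earlier Hudi releases, as we understand it, treated it as exclusive); it filters on _hoodie_commit_time directly; and the optional end bound mirrors Hudi's hoodie.datasource.read.end.instanttime option. Real incremental queries use the timeline to read only the files touched by the relevant commits instead of scanning every row.

```python
from typing import Dict, List, Optional

Row = Dict[str, object]


def incremental(rows: List[Row], begin: str, end: Optional[str] = None) -> List[Row]:
    """Return rows whose commit time falls in [begin, end]; end=None means 'up to now'."""
    selected = []
    for row in rows:
        t = str(row["_hoodie_commit_time"])
        # Fixed-width digit strings compare chronologically as plain strings.
        if t >= begin and (end is None or t <= end):
            selected.append(row)
    return selected


rows = [
    {"_hoodie_commit_time": "20250813173932729", "uuid": "uuid-002", "fare": 22.75},
    {"_hoodie_commit_time": "20250813173937705", "uuid": "uuid-007", "fare": 284.5},
    {"_hoodie_commit_time": "20250813173932729", "uuid": "uuid-001", "fare": 18.5},
]
print([r["uuid"] for r in incremental(rows, "20250813173937705")])
```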



In [22]:
# Get distinct commit times in ascending order
commits_df = spark.sql(f"SELECT DISTINCT(_hoodie_commit_time) AS commitTime FROM {table_name_cow} ORDER BY commitTime")

# Collect all commit times as a list (collect() rather than take(50), so the newest are included)
commits = [row['commitTime'] for row in commits_df.collect()]

beginTime = commits[-2]  # Second-latest commit: the initial insert, used for time travel below
print(f"Begin commit time: {beginTime}")

Begin commit time: 20250813173932729


### Time Travel Query
Hudi also allows for time travel, which means we can query the state of our table at a specific point in the past. By specifying the commit time from our initial data insertion, we can view the table's contents before we performed the update.

In [23]:
spark.read.format("hudi") \
  .option("as.of.instant", beginTime) \
  .load(f"{base_path}/{table_name_cow}").createOrReplaceTempView("trips_time_travel")

As you can see, the historical view shows rider-G's original fare of 28.45 from before the update. This is useful for auditing changes or recovering earlier values.

In [24]:
spark.sql("select _hoodie_commit_time, uuid, rider, driver, fare, city, ts from trips_time_travel").show(truncate=False)

+-------------------+--------+-------+--------+-----+-------------+-------------------+
|_hoodie_commit_time|uuid    |rider  |driver  |fare |city         |ts                 |
+-------------------+--------+-------+--------+-----+-------------+-------------------+
|20250813173932729  |uuid-002|rider-B|driver-Y|22.75|san_francisco|2025-08-10 09:22:10|
|20250813173932729  |uuid-005|rider-E|driver-V|25.1 |san_francisco|2025-08-10 12:55:15|
|20250813173932729  |uuid-007|rider-G|driver-T|28.45|san_francisco|2025-08-10 14:10:05|
|20250813173932729  |uuid-001|rider-A|driver-X|18.5 |new_york     |2025-08-10 08:15:30|
|20250813173932729  |uuid-004|rider-D|driver-W|31.9 |new_york     |2025-08-10 11:40:00|
|20250813173932729  |uuid-008|rider-H|driver-S|16.25|new_york     |2025-08-10 15:00:20|
|20250813173932729  |uuid-010|rider-J|driver-Q|20.0 |new_york     |2025-08-10 16:30:00|
|20250813173932729  |uuid-003|rider-C|driver-Z|14.6 |chicago      |2025-08-10 10:05:45|
|20250813173932729  |uuid-006|ri
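A time travel read returns, for each record key, the version from the latest commit at or before the requested instant. The sketch below illustrates that rule on a hypothetical list of record versions taken from this notebook's outputs. It is a simplification: Hudi resolves time travel at the file level, reading the file slices that were current as of the instant, rather than keeping per-record version lists.

```python
from typing import Dict, List, Tuple

# (commit_time, uuid, fare) for each version of a record as it was written
Version = Tuple[str, str, float]


def as_of(versions: List[Version], instant: str) -> Dict[str, float]:
    """Latest fare per uuid among versions committed at or before `instant`."""
    state: Dict[str, Tuple[str, float]] = {}
    for commit_time, uuid, fare in versions:
        if commit_time > instant:
            continue  # written after the instant we are traveling to
        if uuid not in state or commit_time > state[uuid][0]:
            state[uuid] = (commit_time, fare)
    return {uuid: fare for uuid, (_, fare) in state.items()}


versions = [
    ("20250813173932729", "uuid-007", 28.45),
    ("20250813173937705", "uuid-007", 284.5),
    ("20250813173932729", "uuid-002", 22.75),
]
print(as_of(versions, "20250813173932729"))
print(as_of(versions, "20250813173937705"))
```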

## Change Data Capture (CDC)

Hudi's Change Data Capture (CDC) feature lets you read a stream of all the changes (inserts, updates, and deletes) applied to your table, which is useful for downstream systems that need to react to data modifications. It works here because we set hoodie.table.cdc.enabled to true in the table configuration. We'll start by adding a new record and updating an existing one to generate some changes.

In [25]:
# Define a DataFrame with one new record and one updated record
cdc_data = [
    ("2025-08-11 10:00:00", "uuid-011", "rider-K", "driver-P", 10.50, "chicago"), # new record
    ("2025-08-10 09:22:10", "uuid-002", "rider-B", "driver-Y", 50.00, "san_francisco") # updated record
]

cdc_columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
cdcDF = spark.createDataFrame(cdc_data).toDF(*cdc_columns)

Now, we'll perform an upsert with our new data. This will create a new commit with one insert and one update.

In [26]:
cdcDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_cow}")

To see the changes from this specific transaction, we'll first get its commit time. We'll then use this as our starting point for the CDC query to capture all the changes from that moment forward.

In [10]:
latest_commit_time = spark.sql(f"select max(_hoodie_commit_time) from {table_name_cow}").collect()[0][0]
print(f"Latest commit time: {latest_commit_time}")


Now we can run a CDC query, which is an incremental query with a special output format: we set the query type to "incremental" and "hoodie.datasource.query.incremental.format" to "cdc". Starting from the latest_commit_time we just fetched captures the changes made by our last commit. Instead of the table's regular columns, the output has an op column (i for insert, u for update, d for delete), a ts_ms column holding the commit time, and before and after images of each record as JSON.

In [28]:
cdc_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': latest_commit_time,
  'hoodie.datasource.query.incremental.format': 'cdc'
}

cdcQueryDF = spark.read.format("hudi"). \
  options(**cdc_read_options). \
  load(f"{base_path}/{table_name_cow}")

# CDC output has op, ts_ms, before and after columns (not the usual _hoodie_* meta columns)
cdcQueryDF.show(truncate=False)

+---+-----------------+-----------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------+
|op |ts_ms            |before                                                                                                                             |after                                                                                                                             |
+---+-----------------+-----------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------+
|u  |20250813173940238|{"ts": "2025-08-10 09:22:10", "uuid": "uuid-002", "rider": "rider-B", "driver": "driver-Y", "fare": 22.75, "city": "

Let's look at the above output to see what happened:

**Update:** We have a record where **op is u**. This corresponds to the update we made to uuid-002. The before column shows the original fare of 22.75, and the after column shows the new fare of 50.0.

**Insert:** We also have a record where **op is i**. This is the new record for uuid-011. The before column is null because it didn't exist before this commit, while the after column contains all the new record's data.
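Hudi computes the op column for us, but the relationship between op and the before/after images can be summarized in a few lines. The sketch below is illustrative only: cdc_op and changed_fields are hypothetical helpers a downstream consumer might write, and the JSON images are abbreviated versions of the uuid-002 change described above (only ts, uuid, and fare are kept).

```python
import json
from typing import Dict, Optional, Tuple


def cdc_op(before: Optional[str], after: Optional[str]) -> str:
    """Classify a change from its images: 'i' insert, 'u' update, 'd' delete."""
    if before is None and after is None:
        raise ValueError("a change needs at least one image")
    if before is None:
        return "i"  # nothing existed before this commit
    if after is None:
        return "d"  # nothing remains after this commit
    return "u"


def changed_fields(before: str, after: str) -> Dict[str, Tuple[object, object]]:
    """Map each field whose value differs between the JSON images to (old, new)."""
    b, a = json.loads(before), json.loads(after)
    return {k: (b.get(k), a.get(k)) for k in sorted(set(b) | set(a)) if b.get(k) != a.get(k)}


before = '{"ts": "2025-08-10 09:22:10", "uuid": "uuid-002", "fare": 22.75}'
after = '{"ts": "2025-08-10 09:22:10", "uuid": "uuid-002", "fare": 50.0}'
print(cdc_op(before, after), changed_fields(before, after))
print(cdc_op(None, after))
```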

### Delete a record
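Finally, let's demonstrate how to delete a record. We'll remove the record for 'rider-G' by writing updatesDF, which still holds rider-G's row, with the delete operation; Hudi only needs each row's record key and partition path to locate the record to delete.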

In [29]:
updatesDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "delete") \
    .options(**hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_cow}")

Since this is a Copy-on-Write table, Hudi does not remove the record in place. Instead, it rewrites the Parquet file in the city=san_francisco partition, creating a new file that contains all the current records of that file except the one we deleted. We can confirm this by checking the file system and seeing a new Parquet file with a fresh commit timestamp.

In [30]:
ls(f"{base_path}/{table_name_cow}/city=san_francisco")

s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/.24ca20d9-ed19-4507-a3f7-f31e74956271-0_20250813173937705.log.1_0-68-174.cdc
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/.24ca20d9-ed19-4507-a3f7-f31e74956271-0_20250813173940238.log.1_0-117-305.cdc
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/.24ca20d9-ed19-4507-a3f7-f31e74956271-0_20250813173941637.log.1_0-161-388.cdc
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/.hoodie_partition_metadata
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/24ca20d9-ed19-4507-a3f7-f31e74956271-0_0-117-305_20250813173940238.parquet
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/24ca20d9-ed19-4507-a3f7-f31e74956271-0_0-161-388_20250813173941637.parquet
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/24ca20d9-ed19-4507-a3f7-f31e74956271-0_0-24-56_20250813173932729.parquet
s3a://warehouse/hudi-db/trips_table_cow/city=san_francisco/24ca20d9-ed19-4507-a3f7-f31e74956271-0_0-68-1

Running a snapshot query now confirms that the record for **'rider-G' (uuid-007)** is no longer present in our table.

In [31]:
snapshotQueryDF = spark.read \
        .format("hudi") \
        .load(f"{base_path}/{table_name_cow}" + "/*/*")

snapshotQueryDF.show(truncate=False)

+-------------------+---------------------+------------------+----------------------+--------------------------------------------------------------------------+-------------------+--------+-------+--------+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                         |ts                 |uuid    |rider  |driver  |fare |city         |
+-------------------+---------------------+------------------+----------------------+--------------------------------------------------------------------------+-------------------+--------+-------+--------+-----+-------------+
|20250813173932729  |20250813173932729_2_0|uuid-001          |city=new_york         |e1096794-e164-4c38-81db-737ab3c3738f-0_2-24-58_20250813173932729.parquet  |2025-08-10 08:15:30|uuid-001|rider-A|driver-X|18.5 |new_york     |
|20250813173932729  |20250813173932729_2_1|uuid-004          |city=new_york         |e109679

In [32]:
spark.sql(f"REFRESH TABLE {table_name_cow}")
tripsDF_read = spark.sql(f"select uuid, rider, driver, fare, city, ts from {table_name_cow}")
tripsDF_read.show(truncate=False)

+--------+-------+--------+-----+-------------+-------------------+
|uuid    |rider  |driver  |fare |city         |ts                 |
+--------+-------+--------+-----+-------------+-------------------+
|uuid-001|rider-A|driver-X|18.5 |new_york     |2025-08-10 08:15:30|
|uuid-004|rider-D|driver-W|31.9 |new_york     |2025-08-10 11:40:00|
|uuid-008|rider-H|driver-S|16.25|new_york     |2025-08-10 15:00:20|
|uuid-010|rider-J|driver-Q|20.0 |new_york     |2025-08-10 16:30:00|
|uuid-003|rider-C|driver-Z|14.6 |chicago      |2025-08-10 10:05:45|
|uuid-006|rider-F|driver-U|19.8 |chicago      |2025-08-10 13:20:35|
|uuid-009|rider-I|driver-R|24.35|chicago      |2025-08-10 15:45:50|
|uuid-011|rider-K|driver-P|10.5 |chicago      |2025-08-11 10:00:00|
|uuid-002|rider-B|driver-Y|50.0 |san_francisco|2025-08-10 09:22:10|
|uuid-005|rider-E|driver-V|25.1 |san_francisco|2025-08-10 12:55:15|
+--------+-------+--------+-----+-------------+-------------------+
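The copy-on-write rewrite of a single file can be modeled as: start from the old file's rows, apply upserts by record key, drop deleted keys, and write the result as a new file. This is a simplified sketch on plain dictionaries (rewrite_file is a hypothetical helper). The sample rows are the san_francisco records from this notebook's outputs, and the result matches the two san_francisco rows remaining in the table above.

```python
from typing import Dict, Iterable, List

Row = Dict[str, object]


def rewrite_file(old_rows: List[Row], upserts: Iterable[Row] = (), delete_keys: Iterable[str] = ()) -> List[Row]:
    """Return the rows of the new base file; the old file's list is left untouched."""
    merged = {row["uuid"]: row for row in old_rows}  # index the old file by record key
    for row in upserts:
        merged[row["uuid"]] = row                     # update an existing key or add a new one
    for key in delete_keys:
        merged.pop(key, None)                         # ignore keys that are not present
    return list(merged.values())


# san_francisco rows before the delete (after the uuid-002 and uuid-007 updates)
sf_file = [
    {"uuid": "uuid-002", "rider": "rider-B", "fare": 50.0},
    {"uuid": "uuid-005", "rider": "rider-E", "fare": 25.1},
    {"uuid": "uuid-007", "rider": "rider-G", "fare": 284.5},
]
new_file = rewrite_file(sf_file, delete_keys=["uuid-007"])
print([row["uuid"] for row in new_file])
```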



In [33]:
spark.sql("show tables").show(truncate=False)

+---------+-----------------+-----------+
|namespace|tableName        |isTemporary|
+---------+-----------------+-----------+
|         |trips_incremental|false      |
|         |trips_table_cow  |false      |
|         |trips_time_travel|false      |
+---------+-----------------+-----------+



## Merge-on-Read (MOR) Tables
For comparison, let's explore the Merge-on-Read (MOR) table type. In a MOR table, updates are written to separate log files, which are merged with the base data files when you read the table, or permanently when a compaction runs.
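The difference between the two MOR read modes used in this section can be modeled with a toy example: a read-optimized query returns only base-file rows, while a snapshot query overlays log-file rows on them by record key. This is a simplified sketch with hypothetical helpers on plain dictionaries. Here later log entries simply replace earlier values, whereas Hudi's real merge is governed by its configured payload or merge mode, which, as we understand it, can compare precombine values. The sample rows mirror the driver-W update performed later in this section.

```python
from typing import Dict, List

Row = Dict[str, object]


def read_optimized(base_rows: List[Row]) -> List[Row]:
    """Read-optimized view: base-file rows only; pending log updates are not visible."""
    return list(base_rows)


def snapshot(base_rows: List[Row], log_rows: List[Row]) -> List[Row]:
    """Snapshot view: apply log rows on top of base rows, keyed by record key."""
    merged = {row["uuid"]: row for row in base_rows}
    for row in log_rows:  # later log entries replace earlier values for the same key
        merged[row["uuid"]] = row
    return list(merged.values())


def fare_of(rows: List[Row], uuid: str) -> object:
    """Look up the fare for one record key."""
    return next(row["fare"] for row in rows if row["uuid"] == uuid)


base = [
    {"uuid": "uuid-001", "driver": "driver-X", "fare": 18.5},
    {"uuid": "uuid-004", "driver": "driver-W", "fare": 31.9},
]
log = [{"uuid": "uuid-004", "driver": "driver-W", "fare": 63.8}]

print(fare_of(read_optimized(base), "uuid-004"), fare_of(snapshot(base, log), "uuid-004"))
```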

Here's the configuration for our MOR table.

In [34]:
table_name_mor = "trips_table_mor"
base_path = "s3a://warehouse/hudi-db"

hudi_conf = {
    "hoodie.table.name": table_name_mor,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.partitionpath.field": "city",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.write.markers.type": "DIRECT",
    "hoodie.datasource.write.hive_style_partitioning": "true"
}

### Inserting data

Let's insert the same initial dataset into our new MOR table.

In [35]:
# Write the DataFrame to a Hudi MOR table
inputDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "insert") \
    .options(**hudi_conf) \
    .mode("overwrite") \
    .save(f"{base_path}/{table_name_mor}")

### Reading in Real-time Mode (Default)

When you perform a standard (snapshot) read on a MOR table, Hudi automatically merges the base and log files for you. This gives you the most up-to-date view of your data.

In [36]:
spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").show()

+-------------------+--------------------+------------------+----------------------+--------------------+-------------------+--------+-------+--------+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                 ts|    uuid|  rider|  driver| fare|         city|
+-------------------+--------------------+------------------+----------------------+--------------------+-------------------+--------+-------+--------+-----+-------------+
|  20250813173942960|20250813173942960...|          uuid-002|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 09:22:10|uuid-002|rider-B|driver-Y|22.75|san_francisco|
|  20250813173942960|20250813173942960...|          uuid-005|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 12:55:15|uuid-005|rider-E|driver-V| 25.1|san_francisco|
|  20250813173942960|20250813173942960...|          uuid-007|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 14:10:05|uuid-007|rider-

### Reading in Read-Optimized Mode
Now, let's read the same table in read-optimized mode. This mode is faster because it only reads the base files, but it won't show any recent updates that are still in the log files.

In [37]:
spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load(f"{base_path}/{table_name_mor}") \
    .show()

+-------------------+--------------------+------------------+----------------------+--------------------+-------------------+--------+-------+--------+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                 ts|    uuid|  rider|  driver| fare|         city|
+-------------------+--------------------+------------------+----------------------+--------------------+-------------------+--------+-------+--------+-----+-------------+
|  20250813173942960|20250813173942960...|          uuid-002|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 09:22:10|uuid-002|rider-B|driver-Y|22.75|san_francisco|
|  20250813173942960|20250813173942960...|          uuid-005|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 12:55:15|uuid-005|rider-E|driver-V| 25.1|san_francisco|
|  20250813173942960|20250813173942960...|          uuid-007|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 14:10:05|uuid-007|rider-

### Updating a Record in the MOR table
Let's update a record in our MOR table to see how it affects our read modes. We'll find the record for 'driver-W' and double its fare.

In [38]:
from pyspark.sql.functions import col
updatesDF = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").filter(col("driver") == "driver-W").withColumn("fare", col("fare") * 2)

updatesDF.select("uuid", "rider", "driver", "fare", "city", "ts").show()

+--------+-------+--------+----+--------+-------------------+
|    uuid|  rider|  driver|fare|    city|                 ts|
+--------+-------+--------+----+--------+-------------------+
|uuid-004|rider-D|driver-W|63.8|new_york|2025-08-10 11:40:00|
+--------+-------+--------+----+--------+-------------------+



Now we perform the upsert. In a MOR table, this update will be written to a log file, separate from the main Parquet data files.

In [39]:
updatesDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_mor}")

After the update, a real-time read correctly shows the new fare for 'driver-W'. This is because the log files containing our update were merged with the base files during this read operation.

In [40]:
spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").show()

+-------------------+--------------------+------------------+----------------------+--------------------+-------------------+--------+-------+--------+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                 ts|    uuid|  rider|  driver| fare|         city|
+-------------------+--------------------+------------------+----------------------+--------------------+-------------------+--------+-------+--------+-----+-------------+
|  20250813173942960|20250813173942960...|          uuid-002|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 09:22:10|uuid-002|rider-B|driver-Y|22.75|san_francisco|
|  20250813173942960|20250813173942960...|          uuid-005|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 12:55:15|uuid-005|rider-E|driver-V| 25.1|san_francisco|
|  20250813173942960|20250813173942960...|          uuid-007|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 14:10:05|uuid-007|rider-

Finally, a read-optimized query of the same table still shows the old fare for 'driver-W'. A read-optimized query reads only the base (Parquet) files and skips the log, so the update stays invisible to it until compaction merges the log into a new base file.

In [41]:
spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load(f"{base_path}/{table_name_mor}") \
    .show()

+-------------------+--------------------+------------------+----------------------+--------------------+-------------------+--------+-------+--------+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                 ts|    uuid|  rider|  driver| fare|         city|
+-------------------+--------------------+------------------+----------------------+--------------------+-------------------+--------+-------+--------+-----+-------------+
|  20250813173942960|20250813173942960...|          uuid-002|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 09:22:10|uuid-002|rider-B|driver-Y|22.75|san_francisco|
|  20250813173942960|20250813173942960...|          uuid-005|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 12:55:15|uuid-005|rider-E|driver-V| 25.1|san_francisco|
|  20250813173942960|20250813173942960...|          uuid-007|    city=san_francisco|f4c57bc4-500f-48f...|2025-08-10 14:10:05|uuid-007|rider-
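The gap between the two query types closes only when compaction folds the log into a new base file. The toy functions read_optimized and compact below are hypothetical and ignore everything Hudi does around compaction (scheduling, file versioning, cleaning); they only show why the read-optimized view changes after compaction.

```python
def read_optimized(base: dict) -> dict:
    """Toy read-optimized query: return only what is in the base file."""
    return dict(base)


def compact(base: dict, log: list, key: str = "uuid") -> tuple:
    """Toy compaction: fold log upserts into a new base file and clear the log."""
    new_base = dict(base)
    for op, record in log:
        if op == "upsert":
            new_base[record[key]] = record
    return new_base, []


base = {"uuid-004": {"uuid": "uuid-004", "driver": "driver-W", "fare": 31.9}}
log = [("upsert", {"uuid": "uuid-004", "driver": "driver-W", "fare": 63.8})]

before = read_optimized(base)["uuid-004"]["fare"]  # log is ignored
base, log = compact(base, log)
after = read_optimized(base)["uuid-004"]["fare"]   # update now lives in the base file
print(before, after)  # 31.9 63.8
```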

## Deduplication with Precombine Field

A key feature of Hudi is its ability to deduplicate records automatically. When a single write operation contains two or more records with the same record key (hoodie.datasource.write.recordkey.field), Hudi uses the precombine field (hoodie.datasource.write.precombine.field) to decide which one to keep. In our configuration the precombine field is ts, so Hudi keeps the record with the latest timestamp.
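The precombine rule can be sketched in plain Python as keeping, for each record key, the record with the largest ts. This is a simplified model of within-batch deduplication, not Hudi's implementation; the precombine function and its tie-breaking (the first record seen wins) are assumptions of the sketch. Comparing ts as strings works here only because every timestamp uses the same zero-padded YYYY-MM-DD HH:MM:SS layout, so lexicographic order matches chronological order.

```python
def precombine(records: list, key: str = "uuid", ordering: str = "ts") -> list:
    """Keep one record per key: the one with the largest ordering value.

    Toy model of precombine deduplication within one incoming batch.
    On ties, the record seen first is kept (an assumption of this sketch).
    """
    winners = {}
    for record in records:
        current = winners.get(record[key])
        if current is None or record[ordering] > current[ordering]:
            winners[record[key]] = record
    return list(winners.values())


batch = [
    {"ts": "2025-08-10 08:15:30", "uuid": "uuid-001", "driver": "driver-Z", "fare": 25.50},
    {"ts": "2025-08-10 17:00:00", "uuid": "uuid-001", "driver": "driver-A", "fare": 30.00},
    {"ts": "2025-08-11 07:45:00", "uuid": "uuid-012", "driver": "driver-T", "fare": 12.25},
]
for row in precombine(batch):
    print(row["uuid"], row["driver"], row["fare"])
# uuid-001 driver-A 30.0
# uuid-012 driver-T 12.25
```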

Let's create a new DataFrame with duplicate data to see this in action. It contains two records with the same uuid (uuid-001) but different ts values, where the second has the later timestamp and a higher fare, plus one brand-new record (uuid-012).

In [42]:
duplicate_data = [
    ("2025-08-10 08:15:30", "uuid-001", "rider-A", "driver-Z", 25.50, "new_york"), # A duplicate record with an old timestamp
    ("2025-08-10 17:00:00", "uuid-001", "rider-A", "driver-A", 30.00, "new_york"), # Another duplicate record with a new timestamp
    ("2025-08-11 07:45:00", "uuid-012", "rider-L", "driver-T", 12.25, "chicago")   # A new record
]
duplicate_columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
duplicatesDF = spark.createDataFrame(duplicate_data).toDF(*duplicate_columns)

duplicatesDF.show(truncate=False)

+-------------------+--------+-------+--------+-----+--------+
|ts                 |uuid    |rider  |driver  |fare |city    |
+-------------------+--------+-------+--------+-----+--------+
|2025-08-10 08:15:30|uuid-001|rider-A|driver-Z|25.5 |new_york|
|2025-08-10 17:00:00|uuid-001|rider-A|driver-A|30.0 |new_york|
|2025-08-11 07:45:00|uuid-012|rider-L|driver-T|12.25|chicago |
+-------------------+--------+-------+--------+-----+--------+



Now, let's upsert this data into our COW table. Since hudi_conf currently holds the settings of the MOR table, we first redefine it for the COW table. Hudi will then deduplicate the two uuid-001 records on the precombine field (ts) and keep only the one with the later timestamp.

In [43]:
table_name_cow = "trips_table_cow"
base_path = "s3a://warehouse/hudi-db"

hudi_conf = {
    "hoodie.table.name": table_name_cow,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.partitionpath.field": "city",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.write.markers.type": "DIRECT",
    "hoodie.table.cdc.enabled": "true",
    "hoodie.datasource.write.hive_style_partitioning": "true"
}
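Since hudi_conf is reassigned whenever we switch tables, it is easy to write with a stale dictionary. One way to avoid that is a small factory that builds the options per table. make_hudi_conf is a hypothetical helper: it reuses exactly the option keys from the cell above and varies only the table name and the table type (COPY_ON_WRITE or MERGE_ON_READ). The MOR table name below is a placeholder.

```python
def make_hudi_conf(table_name: str, table_type: str = "COPY_ON_WRITE") -> dict:
    """Build Hudi write options for one table (hypothetical helper).

    The option keys mirror the hudi_conf cell in this notebook; only the
    table name and table type vary between the COW and MOR tables.
    """
    if table_type not in ("COPY_ON_WRITE", "MERGE_ON_READ"):
        raise ValueError(f"unsupported table type: {table_type}")
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": "uuid",
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.partitionpath.field": "city",
        "hoodie.datasource.write.precombine.field": "ts",
        "hoodie.write.markers.type": "DIRECT",
        "hoodie.table.cdc.enabled": "true",
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }


cow_conf = make_hudi_conf("trips_table_cow")
mor_conf = make_hudi_conf("trips_table_mor", "MERGE_ON_READ")  # placeholder MOR name
```

Each write would then pass its own dictionary, for example `.options(**cow_conf)`, so a MOR write cannot pick up the COW table name by accident.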

In [44]:
duplicatesDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_cow}")

Finally, we query the table to check the result. We run REFRESH TABLE first so that Spark invalidates any cached metadata for the table and picks up the new commit. The output shows a single record for uuid-001, the one with the latest timestamp (2025-08-10 17:00:00): the older duplicate was discarded, and the new record for uuid-012 was inserted.

In [45]:
spark.sql(f"REFRESH TABLE {table_name_cow}")
spark.sql(f"select uuid, rider, driver, fare, city, ts from {table_name_cow} where uuid = 'uuid-001' or uuid = 'uuid-012'").show(truncate=False)

+--------+-------+--------+-----+--------+-------------------+
|uuid    |rider  |driver  |fare |city    |ts                 |
+--------+-------+--------+-----+--------+-------------------+
|uuid-012|rider-L|driver-T|12.25|chicago |2025-08-11 07:45:00|
|uuid-001|rider-A|driver-A|30.0 |new_york|2025-08-10 17:00:00|
+--------+-------+--------+-----+--------+-------------------+
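The end-to-end effect of this upsert on the COW table can be modeled as two steps: deduplicate the incoming batch on ts, then merge the survivors into the existing records by key, updating matching keys and inserting new ones. This is a hypothetical sketch with made-up starting rows; in particular, it assumes the deduplicated incoming record simply replaces the stored one. Hudi's actual merge with stored records depends on the configured payload or merge mode, which the sketch does not model (in this sketch's data the incoming uuid-001 also has the later ts, so it wins under either rule).

```python
def upsert(table: dict, batch: list, key: str = "uuid", ordering: str = "ts") -> dict:
    """Toy COW upsert: dedupe the batch on `ordering`, then merge by key.

    Assumption: after deduplication, the incoming record replaces the stored
    one outright. Hudi's real merge depends on its payload / merge mode.
    """
    latest = {}
    for record in batch:
        seen = latest.get(record[key])
        if seen is None or record[ordering] > seen[ordering]:
            latest[record[key]] = record
    merged = dict(table)   # build a new version instead of mutating the old one
    merged.update(latest)  # update existing keys, insert new ones
    return merged


table = {
    "uuid-001": {"ts": "2025-08-10 08:15:30", "uuid": "uuid-001", "driver": "driver-X", "fare": 18.5},
    "uuid-002": {"ts": "2025-08-10 09:22:10", "uuid": "uuid-002", "driver": "driver-Y", "fare": 22.75},
}
batch = [
    {"ts": "2025-08-10 08:15:30", "uuid": "uuid-001", "driver": "driver-Z", "fare": 25.5},
    {"ts": "2025-08-10 17:00:00", "uuid": "uuid-001", "driver": "driver-A", "fare": 30.0},
    {"ts": "2025-08-11 07:45:00", "uuid": "uuid-012", "driver": "driver-T", "fare": 12.25},
]
result = upsert(table, batch)
print(sorted(result))  # ['uuid-001', 'uuid-002', 'uuid-012']
```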



In [50]:
%run utils/display_utils.ipynb

In [51]:
display(inputDF)

+-------------------+--------+-------+--------+-----+-------------+
|                 ts|    uuid|  rider|  driver| fare|         city|
+-------------------+--------+-------+--------+-----+-------------+
|2025-08-10 08:15:30|uuid-001|rider-A|driver-X| 18.5|     new_york|
|2025-08-10 09:22:10|uuid-002|rider-B|driver-Y|22.75|san_francisco|
|2025-08-10 10:05:45|uuid-003|rider-C|driver-Z| 14.6|      chicago|
|2025-08-10 11:40:00|uuid-004|rider-D|driver-W| 31.9|     new_york|
|2025-08-10 12:55:15|uuid-005|rider-E|driver-V| 25.1|san_francisco|
|2025-08-10 13:20:35|uuid-006|rider-F|driver-U| 19.8|      chicago|
|2025-08-10 14:10:05|uuid-007|rider-G|driver-T|28.45|san_francisco|
|2025-08-10 15:00:20|uuid-008|rider-H|driver-S|16.25|     new_york|
|2025-08-10 15:45:50|uuid-009|rider-I|driver-R|24.35|      chicago|
|2025-08-10 16:30:00|uuid-010|rider-J|driver-Q| 20.0|     new_york|
+-------------------+--------+-------+--------+-----+-------------+


### Deleting a Record in a MOR Table
Just like an upsert, a delete in a Merge-on-Read table is handled by writing to a log file rather than rewriting the base file. Hudi records the delete in the log, so the record disappears from snapshot (real-time) queries, while a read-optimized query, which reads only base files, keeps returning it until compaction.
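A delete in the log can be modeled as an entry that hides the key when the log is replayed. In the sketch below (hypothetical names, using a toy representation of a base file as a dict and the log as an ordered list), the snapshot view drops uuid-005 while a read-optimized view, which looks only at the base file, still contains it.

```python
def apply_log(base: dict, log: list, key: str = "uuid") -> dict:
    """Toy snapshot read that honours upsert and delete entries in the log."""
    view = dict(base)
    for op, record in log:
        if op == "upsert":
            view[record[key]] = record
        elif op == "delete":
            view.pop(record[key], None)  # the delete entry hides the base record
    return view


base = {
    "uuid-005": {"uuid": "uuid-005", "rider": "rider-E", "fare": 25.1},
    "uuid-007": {"uuid": "uuid-007", "rider": "rider-G", "fare": 28.45},
}
log = [("delete", {"uuid": "uuid-005"})]

snapshot = apply_log(base, log)  # uuid-005 is gone
read_optimized = dict(base)      # the base file still holds uuid-005
print(sorted(snapshot), sorted(read_optimized))
# ['uuid-007'] ['uuid-005', 'uuid-007']
```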

First, let's create a DataFrame that contains the record we want to delete. We'll delete the record for rider-E (uuid-005).