## README
* In this notebook, we are going to explore the behavior of Merge on Read table.
* This is outside of the Hudi quick start guide.
### Goal
* See the definition of MoR.
* Will the result be written back to the parquet file after read?

### Clean up first

In [1]:
import sys.process._
// Start from an empty scratch directory: remove whatever a previous run left, then recreate it.
// Each command is passed as an explicit argument vector.
Seq("rm", "-rf", "/tmp/hudimor/").!!
Seq("mkdir", "-p", "/tmp/hudimor").!!
// Show the (expected to be empty) directory tree.
lazy val result3 = Seq("tree", "-a", "/tmp/hudimor/").!!
println(result3)

/tmp/hudimor/

0 directories, 0 files



### Setup project

In [2]:
import $ivy.`org.apache.hudi:hudi-spark3.3-bundle_2.12:1.0.0`
// import $ivy.`org.apache.hudi:hudi-common:1.0.0`
import $ivy.`org.apache.spark:spark-sql_2.12:3.3.2`
import $ivy.`org.apache.spark:spark-avro_2.12:3.3.2`
import $ivy.`org.apache.spark:spark-hive_2.12:3.3.2`
import $cp.`CustomMergeIntoConnector.jar`

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.hudi.QuickstartUtils._
import org.apache.spark.sql.hive.HiveExternalCatalog
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import com.payloads.CustomMergeIntoConnector

// Session settings, applied in this order: Kryo serialization, Hudi's SQL extension, Hudi's
// catalog, Hudi's Kryo registrar, and a REPL print-length option for the driver.
val hudiSessionSettings = Seq(
  "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
  "spark.sql.extensions" -> "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
  "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
  "spark.kryo.registrator" -> "org.apache.spark.HoodieSparkKryoRegistrar",
  "spark.driver.extraJavaOptions" -> "-Dscala.repl.maxprintstring=0"
)

// local[1]: run on the local machine with a single worker thread.
val spark = hudiSessionSettings
  .foldLeft(SparkSession.builder().appName("HudiLocalSession").master("local[1]")) {
    case (builder, (key, value)) => builder.config(key, value)
  }
  .getOrCreate()
// Only errors are logged from here on, which keeps the cell outputs readable.
spark.sparkContext.setLogLevel("ERROR")

// http://localhost:4040/jobs/
println("Spark with Hudi is ready!")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties


25/03/05 20:27:59 WARN Utils: Your hostname, DESKTOP-M94RUSC resolves to a loopback address: 127.0.1.1; using 172.17.75.227 instead (on interface eth0)
25/03/05 20:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/03/05 20:28:00 INFO SparkContext: Running Spark version 3.3.2
25/03/05 20:28:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/05 20:28:00 INFO ResourceUtils: No custom resources configured for spark.driver.
25/03/05 20:28:00 INFO SparkContext: Submitted application: HudiLocalSession
25/03/05 20:28:00 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/03/05 20:28:00 INFO ResourceProfile: Limiting resource is cpu
25/0

[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36m$cp.$[39m
[32mimport [39m[36morg.apache.spark.sql._[39m
[32mimport [39m[36morg.apache.spark.sql.functions._[39m
[32mimport [39m[36morg.apache.hudi.QuickstartUtils._[39m
[32mimport [39m[36morg.apache.spark.sql.hive.HiveExternalCatalog[39m
[32mimport [39m[36mscala.collection.JavaConversions._[39m
[32mimport [39m[36morg.apache.spark.sql.SaveMode._[39m
[32mimport [39m[36morg.apache.hudi.DataSourceReadOptions._[39m
[32mimport [39m[36morg.apache.hudi.DataSourceWriteOptions._[39m
[32mimport [39m[36morg.apache.hudi.common.table.HoodieTableConfig._[39m
[32mimport [39m[36morg.apache.hudi.config.HoodieWriteConfig._[39m
[32mimport [39m[36morg.apache.hudi.keygen.constant.KeyGeneratorOptions._[39m
[32mimport [39m[36morg.apache.hudi.common.model.HoodieRecord[39m
[32mimport [39m[36mcom.payloads.

In [3]:
// Directory tree (hidden entries included) before anything has been written.
lazy val result = Seq("tree", "-a", "/tmp/hudimor/").!!
println(result)

/tmp/hudimor/

0 directories, 0 files



### Create and Insert Table

In [4]:
// Sample trips, one tuple per row in the order given by `columns`.
// `city` is used as the partition column by the write below.
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
  Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
    (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
    (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
    (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),
    (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"))

// Table name and location; both are reused by the cells that follow.
val tableName = "trips_table"
val basePath = "file:///tmp/hudimor/trips_table"
// Left as `var` in case a cell outside this excerpt reassigns it.
var inserts = spark.createDataFrame(data).toDF(columns:_*)

// Create the table (Overwrite) as MERGE_ON_READ, partitioned by city.
inserts.write.format("hudi").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.datasource.write.hive_style_partitioning", "true"). // Force hive style dir naming for partitions.
  // `table.type` is the current option name; `storage.type` is its deprecated alias.
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.table.name", tableName).
  mode(Overwrite).
  save(basePath)





In [5]:
// Directory tree (hidden entries included) after the initial insert.
lazy val result = Seq("tree", "-a", "/tmp/hudimor/").!!
println(result)

/tmp/hudimor/
└── trips_table
    ├── .hoodie
    │   ├── .aux
    │   │   └── .bootstrap
    │   │       ├── .fileids
    │   │       └── .partitions
    │   ├── .hoodie.properties.crc
    │   ├── .schema
    │   ├── .temp
    │   ├── hoodie.properties
    │   ├── metadata
    │   │   ├── .hoodie
    │   │   │   ├── .aux
    │   │   │   │   └── .bootstrap
    │   │   │   │       ├── .fileids
    │   │   │   │       └── .partitions
    │   │   │   ├── .hoodie.properties.crc
    │   │   │   ├── .schema
    │   │   │   ├── .temp
    │   │   │   ├── hoodie.properties
    │   │   │   └── timeline
    │   │   │       ├── .00000000000000000.deltacommit.inflight.crc
    │   │   │       ├── .00000000000000000.deltacommit.requested.crc
    │   │   │       ├── .00000000000000000_20250305202805121.deltacommit.crc
    │   │   │       ├── .20250305202803543.deltacommit.inflight.crc
    │   │   │       ├── .20250305202803543.deltacommit.requested.crc
    │   │   │       ├── .20250305202803543_202503

* If we check the Hudi table dir, we can find several parquet files.
* Here we have 1 parquet file for each city/partition.
* This is because we set the number of workers to 1 and the files are tiny.

In [6]:
// List every parquet file of the table as "<mtime> <size> <path>", sorted as text
// (each line starts with the timestamp). `find` is piped into `sort` directly.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-name", "*.parquet",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
println(result)

2025-03-05 20:28:06.922601843 +0900 436116 /tmp/hudimor/trips_table/city=san_francisco/57c4470e-e641-4289-a64e-f46601c1dfaa-0_0-6-0_20250305202803543.parquet
2025-03-05 20:28:06.932652197 +0900 435920 /tmp/hudimor/trips_table/city=chennai/57c4470e-e641-4289-a64e-f46601c1dfaa-2_0-6-0_20250305202803543.parquet
2025-03-05 20:28:06.932652197 +0900 435948 /tmp/hudimor/trips_table/city=sao_paulo/57c4470e-e641-4289-a64e-f46601c1dfaa-1_0-6-0_20250305202803543.parquet



* Let's observe the current state of the commit files.
* Unlike the CoW table, this MoR table has no `.commit` files at this point.
    * MoR tables do still have deltacommit files.
    * The timeline dir has .deltacommit files instead of .commit.

In [7]:
// List every file under .hoodie whose name ends in "commit" as "<mtime> <size> <path>",
// sorted as text (each line starts with the timestamp).
// The arguments go straight to `find`. The earlier `bash -c "eval ..."` form parsed the
// command twice, so the "*commit" pattern lost its quotes and could be glob-expanded
// against the kernel's working directory before `find` ever saw it.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table/.hoodie", "-name", "*commit",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
println(result)

2025-03-05 20:28:05.114264103 +0900 9913 /tmp/hudimor/trips_table/.hoodie/metadata/.hoodie/timeline/00000000000000000_20250305202805121.deltacommit
2025-03-05 20:28:07.224268429 +0900 9985 /tmp/hudimor/trips_table/.hoodie/metadata/.hoodie/timeline/20250305202803543_20250305202807229.deltacommit
2025-03-05 20:28:07.254440790 +0900 3584 /tmp/hudimor/trips_table/.hoodie/timeline/20250305202803543_20250305202807266.deltacommit



### Query data

In [8]:
// Load the Hudi table and expose it to SQL under the name trips_table.
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")

// Run both queries in order: trips with a fare above 20, then the Hudi metadata columns
// next to rider/driver/fare.
Seq(
  "SELECT uuid, fare, ts, rider, driver, city FROM  trips_table WHERE fare > 20.0",
  "SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM  trips_table"
).foreach(query => spark.sql(query).show())

+--------------------+-----+-------------+-------+--------+-------------+
|                uuid| fare|           ts|  rider|  driver|         city|
+--------------------+-----+-------------+-------+--------+-------------+
|e96c4396-3fad-413...| 27.7|1695091554788|rider-C|driver-M|san_francisco|
|9909a8b1-2d15-4d3...| 33.9|1695046462179|rider-D|driver-L|san_francisco|
|e3cf430c-889d-401...|34.15|1695516137016|rider-F|driver-P|    sao_paulo|
+--------------------+-----+-------------+-------+--------+-------------+

+-------------------+--------------------+----------------------+-------+--------+-----+
|_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|  rider|  driver| fare|
+-------------------+--------------------+----------------------+-------+--------+-----+
|  20250305202803543|20250305202803543...|    city=san_francisco|rider-A|driver-K| 19.1|
|  20250305202803543|20250305202803543...|    city=san_francisco|rider-C|driver-M| 27.7|
|  20250305202803543|202503052028035

[36mtripsDF[39m: [32mDataFrame[39m = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]

In [9]:
// Re-list the parquet files and the "*commit" timeline files after the read query, each as
// "<mtime> <size> <path>" sorted as text, so they can be compared with the earlier listings.
// Patterns are handed to `find` as plain arguments. The earlier `bash -c "eval ..."` form
// re-parsed the command and could glob-expand "*commit" in the working directory.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-name", "*.parquet",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
println(result)
lazy val result2 = (Seq("find", "/tmp/hudimor/trips_table/.hoodie", "-name", "*commit",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
println(result2)

2025-03-05 20:28:06.922601843 +0900 436116 /tmp/hudimor/trips_table/city=san_francisco/57c4470e-e641-4289-a64e-f46601c1dfaa-0_0-6-0_20250305202803543.parquet
2025-03-05 20:28:06.932652197 +0900 435920 /tmp/hudimor/trips_table/city=chennai/57c4470e-e641-4289-a64e-f46601c1dfaa-2_0-6-0_20250305202803543.parquet
2025-03-05 20:28:06.932652197 +0900 435948 /tmp/hudimor/trips_table/city=sao_paulo/57c4470e-e641-4289-a64e-f46601c1dfaa-1_0-6-0_20250305202803543.parquet

2025-03-05 20:28:05.114264103 +0900 9913 /tmp/hudimor/trips_table/.hoodie/metadata/.hoodie/timeline/00000000000000000_20250305202805121.deltacommit
2025-03-05 20:28:07.224268429 +0900 9985 /tmp/hudimor/trips_table/.hoodie/metadata/.hoodie/timeline/20250305202803543_20250305202807229.deltacommit
2025-03-05 20:28:07.254440790 +0900 3584 /tmp/hudimor/trips_table/.hoodie/timeline/20250305202803543_20250305202807266.deltacommit



* Nothing has changed after this read query.

### Update data

In [10]:
// Read rider-D's row back from the Hudi table, multiply its fare by 10 and upsert it.
import spark.implicits._
val updatesDf = spark.read.format("hudi").load(basePath)
  .where($"rider" === "rider-D")
  .withColumn("fare", $"fare" * 10)

// Write options for the upsert; partitioning and table name match the initial insert.
val upsertOptions = Map(
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.partitionpath.field" -> "city",
  "hoodie.table.name" -> tableName
)
updatesDf.write.format("hudi").options(upsertOptions).mode(Append).save(basePath)

[32mimport [39m[36mspark.implicits._[39m
[36mupdatesDf[39m: [32mDataFrame[39m = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]

* Notice that after the update we have 2 deltacommit files in the table timeline and 3 deltacommit files in the metadata timeline (1 of those is the initial state for table creation, labeled all 0); no `.commit` file is created.

In [11]:
// Directory tree of the table (hidden entries included) after the update.
lazy val result = Seq("tree", "-a", "/tmp/hudimor/trips_table").!!
println(result)

/tmp/hudimor/trips_table
├── .hoodie
│   ├── .aux
│   │   └── .bootstrap
│   │       ├── .fileids
│   │       └── .partitions
│   ├── .hoodie.properties.crc
│   ├── .schema
│   ├── .temp
│   ├── hoodie.properties
│   ├── metadata
│   │   ├── .hoodie
│   │   │   ├── .aux
│   │   │   │   └── .bootstrap
│   │   │   │       ├── .fileids
│   │   │   │       └── .partitions
│   │   │   ├── .hoodie.properties.crc
│   │   │   ├── .schema
│   │   │   ├── .temp
│   │   │   ├── hoodie.properties
│   │   │   └── timeline
│   │   │       ├── .00000000000000000.deltacommit.inflight.crc
│   │   │       ├── .00000000000000000.deltacommit.requested.crc
│   │   │       ├── .00000000000000000_20250305202805121.deltacommit.crc
│   │   │       ├── .20250305202803543.deltacommit.inflight.crc
│   │   │       ├── .20250305202803543.deltacommit.requested.crc
│   │   │       ├── .20250305202803543_20250305202807229.deltacommit.crc
│   │   │       ├── .20250305202809755.deltacommit.inflight.crc
│   │   │       ├

* Now we got 2 extra deltacommit files.
* Compared to CoW commit file, the deltacommit file for MoR for this insertion is a tiny bit larger (3024 vs 3099 bytes).
* I assume that this is because the deltacommit file is the delta file for the target parquet file. 

In [12]:
// List the "*commit" timeline files after the update as "<mtime> <size> <path>", sorted as text.
// `find` receives its arguments directly. The earlier `bash -c "eval ..."` form stripped the
// quotes around "*commit", so the shell could expand the pattern in the working directory.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table/.hoodie", "-name", "*commit",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
println(result)

2025-03-05 20:28:05.114264103 +0900 9913 /tmp/hudimor/trips_table/.hoodie/metadata/.hoodie/timeline/00000000000000000_20250305202805121.deltacommit
2025-03-05 20:28:07.224268429 +0900 9985 /tmp/hudimor/trips_table/.hoodie/metadata/.hoodie/timeline/20250305202803543_20250305202807229.deltacommit
2025-03-05 20:28:07.254440790 +0900 3584 /tmp/hudimor/trips_table/.hoodie/timeline/20250305202803543_20250305202807266.deltacommit
2025-03-05 20:28:10.445133245 +0900 9985 /tmp/hudimor/trips_table/.hoodie/metadata/.hoodie/timeline/20250305202809755_20250305202810457.deltacommit
2025-03-05 20:28:10.475354810 +0900 3099 /tmp/hudimor/trips_table/.hoodie/timeline/20250305202809755_20250305202810482.deltacommit



* Lets observe the parquet files in the update.
* They still have the same data and timestamp as before the update.

In [13]:
// List the parquet files after the update as "<mtime> <size> <path>", sorted as text.
// `find` receives its arguments directly. The earlier `bash -c "eval ..."` form stripped the
// quotes around "*.parquet", so the shell could expand the pattern in the working directory.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-name", "*.parquet",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
println(result)

2025-03-05 20:28:06.922601843 +0900 436116 /tmp/hudimor/trips_table/city=san_francisco/57c4470e-e641-4289-a64e-f46601c1dfaa-0_0-6-0_20250305202803543.parquet
2025-03-05 20:28:06.932652197 +0900 435920 /tmp/hudimor/trips_table/city=chennai/57c4470e-e641-4289-a64e-f46601c1dfaa-2_0-6-0_20250305202803543.parquet
2025-03-05 20:28:06.932652197 +0900 435948 /tmp/hudimor/trips_table/city=sao_paulo/57c4470e-e641-4289-a64e-f46601c1dfaa-1_0-6-0_20250305202803543.parquet



#### Observing updated partition (san_francisco)
* With COPY_ON_WRITE (CoW), modified row forces the recreation of the affected file (not the whole partition).
* In our case we have only 1 file, so we recreate the entire partition.
* If we have more than 1 file, only the affected file is recreated (try with local[*]).
* This means, for every write, Hudi must find the affected file and rewrite the entire file.
    * This is why CoW is more expensive than MERGE_ON_READ for writes.

In [14]:
// Print each parquet file of the san_francisco partition (stat line first), then its rows.
// `find` receives its patterns as plain arguments, so no shell can glob-expand them.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-path", "*san_francisco*", "-name", "*.parquet",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
// Blank lines are dropped so an empty listing produces no iterations instead of an index error.
lazy val parquetPaths = result.split("\n").filter(_.trim.nonEmpty)
for (parquetPath <- parquetPaths) {
  // `%y` prints three space-separated tokens and `%s` one, so the path is everything after
  // the fourth space. Limiting the split to 5 keeps paths that contain spaces in one piece.
  parquetPath.split(" ", 5) match {
    case Array(_, _, _, _, path) =>
      println(parquetPath)
      spark.read.format("parquet").load(path).show(truncate = false)
      println()
    case _ =>
      println(s"Skipping unparsable stat line: $parquetPath")
  }
}

2025-03-05 20:28:06.922601843 +0900 436116 /tmp/hudimor/trips_table/city=san_francisco/57c4470e-e641-4289-a64e-f46601c1dfaa-0_0-6-0_20250305202803543.parquet
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key   |_hoodie_partition_path|_hoodie_file_name                                                     |ts           |uuid                                |rider  |driver  |fare|city         |
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|20250305202803543  |20250305202803543_0_1|20250305202803543_0_0|city=san_francisco    |57c4470e-e641

#### Inspecting commit files.
* There are two commit files: deltacommit and commit.
    * deltacommit contains the partition write stats (files and their metadata).
    * commit contains the updated partitions and their files plus schema for the file.
* https://github.com/apache/hudi/issues/2288
    * Each file is created with extra extensions instead of renamed to improve compatibility with different DFSs.
    * Might be an issue for some DFSs because it might create too many tiny files.
    * Can be expensive for metadata server.

In [15]:
// For every *.deltacommit file of the table, print its path and then selected fields of its
// Avro content.
// `find` receives its pattern as a plain argument, so no shell can glob-expand it.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-name", "*.deltacommit",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
// Blank lines are dropped so an empty listing produces no iterations instead of an index error.
lazy val hudiMetas = result.split("\n").filter(_.trim.nonEmpty)
for (hudiMeta <- hudiMetas) {
  // `%y` prints three space-separated tokens and `%s` one, so the path is everything after
  // the fourth space. Limiting the split to 5 keeps paths that contain spaces in one piece.
  hudiMeta.split(" ", 5) match {
    case Array(_, _, _, _, path) =>
      println(path)
      spark.read.format("avro").load(path)
        .select("operationType", "partitionToWriteStats", "extraMetadata", "version")
        .show(truncate = false)
      println()
    case _ =>
      println(s"Skipping unparsable stat line: $hudiMeta")
  }
}

/tmp/hudimor/trips_table/.hoodie/metadata/.hoodie/timeline/00000000000000000_20250305202805121.deltacommit
+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

* Several things that I notice here for MoR vs CoW:
    * The timeline deltacommit is having .log* file instead of .parquet.
    * The number of rows is 1 instead of 3 in the CoW table.
        * This is because we only have a single row change.
    * If we inspect the .log* file in the partition dir, we can see the updated row there.

In [16]:
// 1) List all log files (names matching "*.log.*", checksum files excluded) as
//    "<mtime> <size> <path>", sorted as text.
// 2) Dump the raw content of the log files that are not under a ".hoodie" path, i.e. the
//    ones sitting in the data partitions.
// The original passed `-name *.log.*` unquoted through `bash -c "eval ..."`, so the shell
// could expand the pattern against the working directory before `find` received it.
// Passing the arguments directly avoids that.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-name", "*.log.*", "-not", "-name", "*.crc",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
lazy val result2 = Seq("find", "/tmp/hudimor/trips_table", "-name", "*.log.*",
  "-not", "-path", "*.hoodie*", "-not", "-name", "*.crc", "-exec", "cat", "{}", "+").!!
println(result + "\n")
println(result2)

2025-03-05 20:28:04.361697952 +0900 80 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_00000000000000000.log.1_0-0-0
2025-03-05 20:28:07.163923707 +0900 14235 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202803543.log.1_0-11-10
2025-03-05 20:28:10.243656160 +0900 1079 /tmp/hudimor/trips_table/city=san_francisco/.57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202809755.log.1_0-20-22
2025-03-05 20:28:10.414911681 +0900 14180 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202809755.log.1_0-26-27


#HUDI#      )              	{"type":"record","name":"trips_table_record","namespace":"hoodie.trips_table","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default"

* Note that in the deltacommit files, the initial 00000000 file is BULK_INSERT and the other writes are UPSERT_PREPPED.

In [17]:
// Read every "*commit" file of the table timeline as Avro and show the fields of interest.
val hudiMetadata2 = "/tmp/hudimor/trips_table/.hoodie/timeline/*commit"
val dfHudiMeta2 = spark.read.format("avro").load(hudiMetadata2)
val commitSummaryColumns = Seq("operationType", "partitionToWriteStats", "extraMetadata", "version").map(col)
dfHudiMeta2.select(commitSummaryColumns: _*).show(truncate = false)

+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

[36mhudiMetadata2[39m: [32mString[39m = [32m"/tmp/hudimor/trips_table/.hoodie/timeline/*commit"[39m
[36mdfHudiMeta2[39m: [32mDataFrame[39m = [partitionToWriteStats: map<string,array<struct<fileId:string,path:string,prevCommit:string,numWrites:bigint,numDeletes:bigint,numUpdateWrites:bigint,totalWriteBytes:bigint,totalWriteErrors:bigint,partitionPath:string,totalLogRecords:bigint,totalLogFiles:bigint,totalUpdatedRecordsCompacted:bigint,numInserts:bigint,totalLogBlocks:bigint,totalCorruptLogBlock:bigint,totalRollbackBlocks:bigint,fileSizeInBytes:bigint,logVersion:int,logOffset:bigint,baseFile:string,logFiles:array<string>,cdcStats:map<string,bigint>>>>, extraMetadata: map<string,string> ... 2 more fields]

* The initial write is BULK_INSERT.
* The update is UPSERT_PREPPED.

### MoR before and after read.
* We basically already recorded the state before the read above.
* Now lets see if we read the table and see the resulting data.

In [18]:
// Reload the table, refresh the temp view and show commit time, fare and rider for every row.
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")
spark
  .sql("SELECT _hoodie_commit_time, fare, rider FROM trips_table;")
  .show()

+-------------------+-----+-------+
|_hoodie_commit_time| fare|  rider|
+-------------------+-----+-------+
|  20250305202803543| 19.1|rider-A|
|  20250305202803543| 27.7|rider-C|
|  20250305202809755|339.0|rider-D|
|  20250305202803543|34.15|rider-F|
|  20250305202803543|17.85|rider-J|
+-------------------+-----+-------+



[36mtripsDF[39m: [32mDataFrame[39m = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]

* As expected, the fare increased for rider-D.
* But what about the log file and parquet file, are they merged?

In [19]:
// List all log files (names matching "*.log.*", checksum files excluded) after the read,
// as "<mtime> <size> <path>", sorted as text.
// The original passed `-name *.log.*` unquoted through `bash -c "eval ..."`, so the shell
// could expand the pattern in the working directory. Arguments now go straight to `find`.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-name", "*.log.*", "-not", "-name", "*.crc",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
println(result)

2025-03-05 20:28:04.361697952 +0900 80 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_00000000000000000.log.1_0-0-0
2025-03-05 20:28:07.163923707 +0900 14235 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202803543.log.1_0-11-10
2025-03-05 20:28:10.243656160 +0900 1079 /tmp/hudimor/trips_table/city=san_francisco/.57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202809755.log.1_0-20-22
2025-03-05 20:28:10.414911681 +0900 14180 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202809755.log.1_0-26-27



* The log file is there.
* But what about the parquet file?
    * Are they modified?

In [20]:
// After the read: print each parquet file of the san_francisco partition (stat line first),
// then its rows, to check whether the base file was rewritten.
// `find` receives its patterns as plain arguments, so no shell can glob-expand them.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-path", "*san_francisco*", "-name", "*.parquet",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
// Blank lines are dropped so an empty listing produces no iterations instead of an index error.
lazy val parquetPaths = result.split("\n").filter(_.trim.nonEmpty)
for (parquetPath <- parquetPaths) {
  // `%y` prints three space-separated tokens and `%s` one, so the path is everything after
  // the fourth space. Limiting the split to 5 keeps paths that contain spaces in one piece.
  parquetPath.split(" ", 5) match {
    case Array(_, _, _, _, path) =>
      println(parquetPath)
      spark.read.format("parquet").load(path).show(truncate = false)
      println()
    case _ =>
      println(s"Skipping unparsable stat line: $parquetPath")
  }
}

2025-03-05 20:28:06.922601843 +0900 436116 /tmp/hudimor/trips_table/city=san_francisco/57c4470e-e641-4289-a64e-f46601c1dfaa-0_0-6-0_20250305202803543.parquet
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key   |_hoodie_partition_path|_hoodie_file_name                                                     |ts           |uuid                                |rider  |driver  |fare|city         |
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|20250305202803543  |20250305202803543_0_1|20250305202803543_0_0|city=san_francisco    |57c4470e-e641

* Nope, it's still the same.

### Merging Data

In [21]:
// Take two rows of the table and multiply their fare by 10.
// NOTE(review): `limit(2)` has no ordering in front of it, so which two rows are picked is
// not guaranteed. Confirm that this is acceptable for the experiment.
val adjustedFareDF = spark.read.format("hudi").load(basePath)
  .limit(2)
  .withColumn("fare", col("fare") * 10)

// Append the adjusted rows using the custom payload class.
adjustedFareDF.write
  .format("hudi")
  .options(Map("hoodie.datasource.write.payload.class" -> "com.payloads.CustomMergeIntoConnector"))
  .mode(Append)
  .save(basePath)
// Notice Fare column has been updated but all other columns remain intact.
// spark.read.format("hudi").load(basePath).show()

[36madjustedFareDF[39m: [32mDataFrame[39m = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]

* After this operation, we will see some new log files in the partition dirs.

In [22]:
// List all log files (names matching "*.log.*", checksum files excluded) after the merge
// write, as "<mtime> <size> <path>", sorted as text.
// The original passed `-name *.log.*` unquoted through `bash -c "eval ..."`, so the shell
// could expand the pattern in the working directory. Arguments now go straight to `find`.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-name", "*.log.*", "-not", "-name", "*.crc",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
println(result)

2025-03-05 20:28:04.361697952 +0900 80 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_00000000000000000.log.1_0-0-0
2025-03-05 20:28:07.163923707 +0900 14235 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202803543.log.1_0-11-10
2025-03-05 20:28:10.243656160 +0900 1079 /tmp/hudimor/trips_table/city=san_francisco/.57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202809755.log.1_0-20-22
2025-03-05 20:28:10.414911681 +0900 14180 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202809755.log.1_0-26-27
2025-03-05 20:28:14.215505366 +0900 1294 /tmp/hudimor/trips_table/city=san_francisco/.57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202813772.log.1_0-47-47
2025-03-05 20:28:14.387010038 +0900 14180 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202813772.log.1_0-54-52



In [23]:
// After the merge write: print every parquet file of the table (stat line first), then its rows.
// `find` receives its pattern as a plain argument, so no shell can glob-expand it.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-name", "*.parquet",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
// Blank lines are dropped so an empty listing produces no iterations instead of an index error.
lazy val parquetPaths = result.split("\n").filter(_.trim.nonEmpty)
for (parquetPath <- parquetPaths) {
  // `%y` prints three space-separated tokens and `%s` one, so the path is everything after
  // the fourth space. Limiting the split to 5 keeps paths that contain spaces in one piece.
  parquetPath.split(" ", 5) match {
    case Array(_, _, _, _, path) =>
      println(parquetPath)
      spark.read.format("parquet").load(path).show(truncate = false)
      println()
    case _ =>
      println(s"Skipping unparsable stat line: $parquetPath")
  }
}

2025-03-05 20:28:06.922601843 +0900 436116 /tmp/hudimor/trips_table/city=san_francisco/57c4470e-e641-4289-a64e-f46601c1dfaa-0_0-6-0_20250305202803543.parquet
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key   |_hoodie_partition_path|_hoodie_file_name                                                     |ts           |uuid                                |rider  |driver  |fare|city         |
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|20250305202803543  |20250305202803543_0_1|20250305202803543_0_0|city=san_francisco    |57c4470e-e641

* As expected, based on the result from the update, the parquet files are not touched even after read.
* But, what about the commit files?

In [24]:
// After the merge write: for every *.deltacommit file of the table, print its path and then
// selected fields of its Avro content.
// `find` receives its pattern as a plain argument, so no shell can glob-expand it.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-name", "*.deltacommit",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
// Blank lines are dropped so an empty listing produces no iterations instead of an index error.
lazy val hudiMetas = result.split("\n").filter(_.trim.nonEmpty)
for (hudiMeta <- hudiMetas) {
  // `%y` prints three space-separated tokens and `%s` one, so the path is everything after
  // the fourth space. Limiting the split to 5 keeps paths that contain spaces in one piece.
  hudiMeta.split(" ", 5) match {
    case Array(_, _, _, _, path) =>
      println(path)
      spark.read.format("avro").load(path)
        .select("operationType", "partitionToWriteStats", "extraMetadata", "version")
        .show(truncate = false)
      println()
    case _ =>
      println(s"Skipping unparsable stat line: $hudiMeta")
  }
}

/tmp/hudimor/trips_table/.hoodie/metadata/.hoodie/timeline/00000000000000000_20250305202805121.deltacommit
+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

* We get commit files similar to the ones in the CoW table.

#### Now lets try to read the data and see if there is any file merge on the parquet files.

In [25]:
// Reload the table, refresh the temp view and show every column of every row.
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")
spark
  .sql("SELECT * FROM trips_table;")
  .show()

+-------------------+--------------------+--------------------+----------------------+--------------------+-------------+--------------------+-------+--------+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|           ts|                uuid|  rider|  driver| fare|         city|
+-------------------+--------------------+--------------------+----------------------+--------------------+-------------+--------------------+-------+--------+-----+-------------+
|  20250305202813772|20250305202813772...|20250305202803543...|    city=san_francisco|57c4470e-e641-428...|1695159649087|334e26e9-8355-45c...|rider-A|driver-K|191.0|san_francisco|
|  20250305202813772|20250305202813772...|20250305202803543...|    city=san_francisco|57c4470e-e641-428...|1695091554788|e96c4396-3fad-413...|rider-C|driver-M|277.0|san_francisco|
|  20250305202809755|20250305202809755...|20250305202803543...|    city=san_francisco|57c4470e-e641-

[36mtripsDF[39m: [32mDataFrame[39m = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]

In [26]:
// List all log files (names matching "*.log.*", checksum files excluded) after the second
// read, as "<mtime> <size> <path>", sorted as text.
// The original passed `-name *.log.*` unquoted through `bash -c "eval ..."`, so the shell
// could expand the pattern in the working directory. Arguments now go straight to `find`.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-name", "*.log.*", "-not", "-name", "*.crc",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
println(result)

2025-03-05 20:28:04.361697952 +0900 80 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_00000000000000000.log.1_0-0-0
2025-03-05 20:28:07.163923707 +0900 14235 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202803543.log.1_0-11-10
2025-03-05 20:28:10.243656160 +0900 1079 /tmp/hudimor/trips_table/city=san_francisco/.57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202809755.log.1_0-20-22
2025-03-05 20:28:10.414911681 +0900 14180 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202809755.log.1_0-26-27
2025-03-05 20:28:14.215505366 +0900 1294 /tmp/hudimor/trips_table/city=san_francisco/.57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202813772.log.1_0-47-47
2025-03-05 20:28:14.387010038 +0900 14180 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202813772.log.1_0-54-52



In [27]:
// After the second read: print each parquet file of the san_francisco partition (stat line
// first), then its rows, to check whether the read caused the base file to be rewritten.
// `find` receives its patterns as plain arguments, so no shell can glob-expand them.
lazy val result = (Seq("find", "/tmp/hudimor/trips_table", "-path", "*san_francisco*", "-name", "*.parquet",
  "-exec", "stat", "-c", "%y %s %n", "{}", "+") #| "sort").!!
// Blank lines are dropped so an empty listing produces no iterations instead of an index error.
lazy val parquetPaths = result.split("\n").filter(_.trim.nonEmpty)
for (parquetPath <- parquetPaths) {
  // `%y` prints three space-separated tokens and `%s` one, so the path is everything after
  // the fourth space. Limiting the split to 5 keeps paths that contain spaces in one piece.
  parquetPath.split(" ", 5) match {
    case Array(_, _, _, _, path) =>
      println(parquetPath)
      spark.read.format("parquet").load(path).show(truncate = false)
      println()
    case _ =>
      println(s"Skipping unparsable stat line: $parquetPath")
  }
}

2025-03-05 20:28:06.922601843 +0900 436116 /tmp/hudimor/trips_table/city=san_francisco/57c4470e-e641-4289-a64e-f46601c1dfaa-0_0-6-0_20250305202803543.parquet
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key   |_hoodie_partition_path|_hoodie_file_name                                                     |ts           |uuid                                |rider  |driver  |fare|city         |
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|20250305202803543  |20250305202803543_0_1|20250305202803543_0_0|city=san_francisco    |57c4470e-e641

* Now we understand what MoR (Merge on Read) means:
    * Read from parquet.
    * Read from the .log* file.
    * Merge the results.
    * The results won't be written back to the original table just yet.
    * Perhaps will be compacted later automatically.

### Delete data

In [28]:
// Delete the trips of rider-F: select the matching rows from the current table view ...
val deletesDF = spark.read
  .format("hudi")
  .load(basePath)
  .filter($"rider" === "rider-F")

// ... and write them back with the Hudi "delete" operation, appending to the existing table.
deletesDF.write
  .format("hudi")
  .option("hoodie.datasource.write.operation", "delete")
  .option("hoodie.datasource.write.partitionpath.field", "city")
  .option("hoodie.table.name", tableName)
  .mode(Append)
  .save(basePath)

[36mdeletesDF[39m: [32mDataset[39m[[32mRow[39m] = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]

In [29]:
// List every file whose name matches *.log.* under the table directory, skipping the
// ".crc" companion files, as "<mtime> <size> <path>" lines sorted by their leading timestamp.
// Patterns and the stat format are single-quoted and no `eval` is involved, so bash never
// glob-expands them against the notebook's working directory.
val command = """find /tmp/hudimor/trips_table -name '*.log.*' -not -name '*.crc' -exec stat -c '%y %s %n' {} + | sort"""
lazy val result = Seq("bash", "-c", command).!!
println(result)

2025-03-05 20:28:04.361697952 +0900 80 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_00000000000000000.log.1_0-0-0
2025-03-05 20:28:07.163923707 +0900 14235 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202803543.log.1_0-11-10
2025-03-05 20:28:10.243656160 +0900 1079 /tmp/hudimor/trips_table/city=san_francisco/.57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202809755.log.1_0-20-22
2025-03-05 20:28:10.414911681 +0900 14180 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202809755.log.1_0-26-27
2025-03-05 20:28:14.215505366 +0900 1294 /tmp/hudimor/trips_table/city=san_francisco/.57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202813772.log.1_0-47-47
2025-03-05 20:28:14.387010038 +0900 14180 /tmp/hudimor/trips_table/.hoodie/metadata/files/.files-0000-0_20250305202813772.log.1_0-54-52
2025-03-05 20:28:16.929976223 +0900 907 /tmp/hudimor/trips_table/city=sao_paulo/.57c4470e-e641-4289-a64e-f46601c1dfaa-1_20250305202816548.log.1_0-7

* After the delete, we got one extra log file, just as with the two earlier operations.
* I think this log file holds the tombstone(s) for the deleted row(s).

In [30]:
// List every parquet file under the table directory as "<mtime> <size> <path>" (lines
// sorted by their leading timestamp) and print each file's raw contents using the plain
// parquet reader.
// The find pattern and the stat format are single-quoted and no `eval` is involved, so bash
// hands them to find/stat verbatim instead of glob-expanding them against the notebook's
// working directory.
val command = """find /tmp/hudimor/trips_table -name '*.parquet' -exec stat -c '%y %s %n' {} + | sort"""
lazy val result = Seq("bash", "-c", command).!!
// Drop blank lines: an empty find result would otherwise produce a single "" entry and
// fail on the field lookup below.
lazy val parquetPaths = result.split("\n").filter(_.trim.nonEmpty)
for (parquetPath <- parquetPaths) {
    // Line layout is "<date> <time> <tz> <size> <path>"; limiting the split to 5 fields keeps
    // a path that contains spaces in one piece.
    val path = parquetPath.split(" ", 5)(4)
    println(parquetPath)
    spark.read.format("parquet").load(path).show(truncate=false)
    println()
}

2025-03-05 20:28:06.922601843 +0900 436116 /tmp/hudimor/trips_table/city=san_francisco/57c4470e-e641-4289-a64e-f46601c1dfaa-0_0-6-0_20250305202803543.parquet
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key   |_hoodie_partition_path|_hoodie_file_name                                                     |ts           |uuid                                |rider  |driver  |fare|city         |
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|20250305202803543  |20250305202803543_0_1|20250305202803543_0_0|city=san_francisco    |57c4470e-e641

* We can still see the original parquet file, unchanged from before.
* Let's look at the deltacommit files.

In [31]:
// Locate every *.deltacommit file under the table directory (lines sorted by their leading
// timestamp) and print selected fields of each one, reading the file with the avro format.
// The find pattern and the stat format are single-quoted and no `eval` is involved, so bash
// hands them to find/stat verbatim instead of glob-expanding them against the notebook's
// working directory.
val command = """find /tmp/hudimor/trips_table -name '*.deltacommit' -exec stat -c '%y %s %n' {} + | sort"""
lazy val result = Seq("bash", "-c", command).!!
// Drop blank lines: an empty find result would otherwise produce a single "" entry and
// fail on the field lookup below.
lazy val hudiMetas = result.split("\n").filter(_.trim.nonEmpty)
for (hudiMeta <- hudiMetas) {
    // Line layout is "<date> <time> <tz> <size> <path>"; limiting the split to 5 fields keeps
    // a path that contains spaces in one piece.
    val path = hudiMeta.split(" ", 5)(4)
    println(path)
    spark.read.format("avro").load(path).select("operationType", "partitionToWriteStats", "extraMetadata", "version").show(truncate=false)
    println()
}

/tmp/hudimor/trips_table/.hoodie/metadata/.hoodie/timeline/00000000000000000_20250305202805121.deltacommit
+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

* Now we can see DELETE_PREPPED like in CoW.

In [32]:
// Read the table at basePath through the Hudi data source and print it with show()'s
// default settings (long cell values are truncated in the output).
spark.read
  .format("hudi")
  .load(basePath)
  .show()

+-------------------+--------------------+--------------------+----------------------+--------------------+-------------+--------------------+-------+--------+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|           ts|                uuid|  rider|  driver| fare|         city|
+-------------------+--------------------+--------------------+----------------------+--------------------+-------------+--------------------+-------+--------+-----+-------------+
|  20250305202813772|20250305202813772...|20250305202803543...|    city=san_francisco|57c4470e-e641-428...|1695159649087|334e26e9-8355-45c...|rider-A|driver-K|191.0|san_francisco|
|  20250305202813772|20250305202813772...|20250305202803543...|    city=san_francisco|57c4470e-e641-428...|1695091554788|e96c4396-3fad-413...|rider-C|driver-M|277.0|san_francisco|
|  20250305202809755|20250305202809755...|20250305202803543...|    city=san_francisco|57c4470e-e641-

* For MoR, the parquet file is left untouched even though we have already performed 3 write operations.
* Instead, the changes are recorded in the .log* files.

In [33]:
// List the parquet files of the sao_paulo partition as "<mtime> <size> <path>" lines sorted
// by their leading timestamp.
// The -name pattern has no leading space (a literal " *.parquet" would match nothing once
// passed to find unmodified), and all patterns plus the stat format are single-quoted with
// no `eval`, so bash never glob-expands them against the notebook's working directory.
val command = """find /tmp/hudimor/trips_table -path '*sao_paulo*' -name '*.parquet' -exec stat -c '%y %s %n' {} + | sort"""
lazy val result = Seq("bash", "-c", command).!!
println(result)

2025-03-05 20:28:06.932652197 +0900 435948 /tmp/hudimor/trips_table/city=sao_paulo/57c4470e-e641-4289-a64e-f46601c1dfaa-1_0-6-0_20250305202803543.parquet



In [38]:
// List every file matching *.log* outside of any ".hoodie/" directory (this includes the
// ".crc" companion files) as "<mtime> <size> <path>" lines sorted by their leading timestamp.
// The -name pattern has no leading space (a literal " *.log*" would match nothing once
// passed to find unmodified), and all patterns plus the stat format are single-quoted with
// no `eval`, so bash never glob-expands them against the notebook's working directory.
val command = """find /tmp/hudimor/trips_table -not -path '*.hoodie/*' -name '*.log*' -exec stat -c '%y %s %n' {} + | sort"""
lazy val result = Seq("bash", "-c", command).!!
println(result)

2025-03-05 20:28:10.243656160 +0900 1079 /tmp/hudimor/trips_table/city=san_francisco/.57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202809755.log.1_0-20-22
2025-03-05 20:28:10.243656160 +0900 20 /tmp/hudimor/trips_table/city=san_francisco/..57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202809755.log.1_0-20-22.crc
2025-03-05 20:28:14.215505366 +0900 1294 /tmp/hudimor/trips_table/city=san_francisco/.57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202813772.log.1_0-47-47
2025-03-05 20:28:14.215505366 +0900 20 /tmp/hudimor/trips_table/city=san_francisco/..57c4470e-e641-4289-a64e-f46601c1dfaa-0_20250305202813772.log.1_0-47-47.crc
2025-03-05 20:28:16.929976223 +0900 16 /tmp/hudimor/trips_table/city=sao_paulo/..57c4470e-e641-4289-a64e-f46601c1dfaa-1_20250305202816548.log.1_0-77-76.crc
2025-03-05 20:28:16.929976223 +0900 907 /tmp/hudimor/trips_table/city=sao_paulo/.57c4470e-e641-4289-a64e-f46601c1dfaa-1_20250305202816548.log.1_0-77-76



In [39]:
// Final check: list every parquet file under the table directory as "<mtime> <size> <path>"
// (lines sorted by their leading timestamp) and print each file's raw contents using the
// plain parquet reader.
// The find pattern and the stat format are single-quoted and no `eval` is involved, so bash
// hands them to find/stat verbatim instead of glob-expanding them against the notebook's
// working directory.
val command = """find /tmp/hudimor/trips_table -name '*.parquet' -exec stat -c '%y %s %n' {} + | sort"""
lazy val result = Seq("bash", "-c", command).!!
// Drop blank lines: an empty find result would otherwise produce a single "" entry and
// fail on the field lookup below.
lazy val parquetPaths = result.split("\n").filter(_.trim.nonEmpty)
for (parquetPath <- parquetPaths) {
    // Line layout is "<date> <time> <tz> <size> <path>"; limiting the split to 5 fields keeps
    // a path that contains spaces in one piece.
    val path = parquetPath.split(" ", 5)(4)
    println(parquetPath)
    spark.read.format("parquet").load(path).show(truncate=false)
    println()
}

2025-03-05 20:28:06.922601843 +0900 436116 /tmp/hudimor/trips_table/city=san_francisco/57c4470e-e641-4289-a64e-f46601c1dfaa-0_0-6-0_20250305202803543.parquet
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key   |_hoodie_partition_path|_hoodie_file_name                                                     |ts           |uuid                                |rider  |driver  |fare|city         |
+-------------------+---------------------+---------------------+----------------------+----------------------------------------------------------------------+-------------+------------------------------------+-------+--------+----+-------------+
|20250305202803543  |20250305202803543_0_1|20250305202803543_0_0|city=san_francisco    |57c4470e-e641

## Conclusion
* MoR writes changes to small log files instead of recreating the .parquet file.
* During read, there is an extra cost for merging the parquet file with the log files (read amplification).
* During write, the write amplification is smaller than for CoW tables.
    * There is no need to rewrite the whole data file.

In [40]:
// Shut down the SparkSession now that the walkthrough is finished.
spark.stop()