Acceleration: Iceberg table compaction #3519

Shekharrajak wants to merge 14 commits into apache:main
Conversation
// under the License.

//! Iceberg Parquet writer operator for writing RecordBatches to Parquet files
//! with Iceberg-compatible metadata (DataFile structures).
DataFusion execution operator that writes Arrow RecordBatches to Parquet files with Iceberg-compatible metadata. It enables native Rust code to produce files that Iceberg's Java API can directly commit. The metadata is serialized as JSON and passed back to the JVM via JNI for the commit.
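To make the hand-off concrete, here is a minimal sketch of the JVM-side commit step, assuming the native writer returns entries with a path, file size, and record count; the field names and the WrittenFile helper are illustrative, not the actual wire format used in this PR:

```scala
import org.apache.iceberg.{DataFile, DataFiles, FileFormat, Table}
import scala.jdk.CollectionConverters._

object IcebergCommitSketch {
  // Hypothetical shape of one entry in the JSON returned over JNI.
  case class WrittenFile(path: String, sizeBytes: Long, recordCount: Long)

  def commitRewrite(table: Table, replaced: Set[DataFile], written: Seq[WrittenFile]): Unit = {
    // Rebuild Iceberg DataFile objects from the native writer's metadata
    // (assumes an unpartitioned table for simplicity).
    val added: Set[DataFile] = written.map { f =>
      DataFiles.builder(table.spec())
        .withPath(f.path)
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(f.sizeBytes)
        .withRecordCount(f.recordCount)
        .build()
    }.toSet

    // Atomically replace the small input files with the compacted output files.
    table.newRewrite()
      .rewriteFiles(replaced.asJava, added.asJava)
      .commit()
  }
}
```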
//! JNI bridge for Iceberg compaction operations.
//!
//! This module provides JNI functions for native Iceberg compaction (scan + write).
JNI bridge that exposes native Rust compaction to the Scala/JVM side.
executeIcebergCompaction(): JNI entry point that reads Parquet files via DataFusion and writes the compacted output.
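For orientation, a sketch of what the JVM-facing side of this bridge can look like; the class name here is illustrative, and the actual Comet Native class declares more methods and handles library loading through Comet's own loader:

```scala
// Illustrative JVM-side JNI declarations; the real class in Comet is `Native`.
class NativeCompaction {
  // Config goes in as JSON; metadata for the written files comes back as JSON.
  @native def executeIcebergCompaction(configJson: String): String

  // Used by isAvailable-style checks to probe the native library.
  @native def getIcebergCompactionVersion(): String
}
```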
/// Configuration for Iceberg table metadata passed from JVM
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IcebergTableConfig {
Table metadata passed from the JVM (identifier, warehouse, snapshot ID, file IO properties).
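As an illustration of the JVM side of this hand-off, a sketch that assembles the config JSON; the field names simply mirror the summary above and are not necessarily the exact keys the Rust struct deserializes:

```scala
object CompactionConfigSketch {
  // Sketch only: a real implementation would use a JSON library and the exact
  // field names expected by IcebergTableConfig on the Rust side.
  def buildConfigJson(
      tableIdentifier: String,
      warehouse: String,
      snapshotId: Long,
      fileIoProperties: Map[String, String]): String = {
    val props = fileIoProperties
      .map { case (k, v) => "\"" + k + "\": \"" + v + "\"" }
      .mkString("{", ", ", "}")
    s"""{
       |  "table_identifier": "$tableIdentifier",
       |  "warehouse": "$warehouse",
       |  "snapshot_id": $snapshotId,
       |  "file_io_properties": $props
       |}""".stripMargin
  }
}
```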
logDebug(s"Executing native compaction with config: $configJson")

val resultJson = native.executeIcebergCompaction(configJson)
JNI entry point: reads Parquet files via DataFusion and writes the compacted output.
def isAvailable: Boolean = {
  try {
    val version = new Native().getIcebergCompactionVersion()
Returns native library version for compatibility checks
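A rough sketch of the fail-soft availability check the snippet above suggests, using the Native class already referenced there; the exact exception handling and version validation are assumptions:

```scala
// Sketch of a fail-soft availability probe; callers fall back to Spark's
// default rewriteDataFiles() action when this returns false.
def isAvailable: Boolean = {
  try {
    val version = new Native().getIcebergCompactionVersion()
    version != null && version.nonEmpty
  } catch {
    case _: UnsatisfiedLinkError | _: NoClassDefFoundError => false
    case scala.util.control.NonFatal(_) => false
  }
}
```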
"Iceberg reflection failure: Failed to get filter expressions from SparkScan: " +
  s"${e.getMessage}")
None
findMethodInHierarchy(scan.getClass, "filterExpressions").flatMap { filterExpressionsMethod =>
Previously we assumed a fixed Iceberg class hierarchy; findMethodInHierarchy instead walks up the class tree, which is a more robust approach (see the reflection sketch after this list).
For compaction to work, we need to extract FileScanTask objects from the scan. Different Iceberg scan types expose tasks differently:
SparkBatchQueryScan -> tasks() method
SparkStagedScan -> taskGroups() method (returns groups, and tasks need to be extracted from each group)
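A minimal sketch of the reflection helper described above; the exact signature in the PR may differ, but the idea is to search declared methods up the superclass chain rather than assuming one specific scan class:

```scala
import java.lang.reflect.Method

object ReflectionSketch {
  // Walk the class hierarchy until a declared method with the given name is
  // found, so the code does not depend on a fixed Iceberg scan class layout.
  def findMethodInHierarchy(cls: Class[_], name: String): Option[Method] = {
    var current: Class[_] = cls
    while (current != null) {
      current.getDeclaredMethods.find(_.getName == name) match {
        case Some(m) =>
          m.setAccessible(true)
          return Some(m)
        case None =>
          current = current.getSuperclass
      }
    }
    None
  }
}
```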
Interesting PR, thanks @Shekharrajak! To help start the review process, could you:
The rewrite commit API reference is apache/iceberg-rust#2106. In this PR the commit still happens in the JVM; in future PRs it can be made native as well.
Compaction is essentially reading small files and writing them back as larger files, so it is I/O-intensive work. Doing the read and write in Rust improves performance: the entire I/O pipeline (Parquet read -> Arrow RecordBatch -> Parquet write) happens in Rust, reading and writing Parquet through the same Arrow memory layout, which eliminates the Spark orchestration layer entirely rather than just replacing individual operators within it.
// Measure Spark compaction (single run - compaction is destructive)
val sparkStart = System.nanoTime()
val sparkTable = Spark3Util.loadIcebergTable(spark, icebergTableName)
SparkActions.get(spark).rewriteDataFiles(sparkTable).binPack().execute()
This is the default Spark action API, used as the baseline measurement.
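For reference, a sketch of how the two runs could be paired in the benchmark; recreateSmallFiles() is a hypothetical helper, and triggering the native path via the spark.comet.iceberg.compaction.enabled config plus the rewrite_data_files procedure is an assumption based on the PR description, not the exact benchmark code:

```scala
// Spark baseline (single run - compaction is destructive).
val sparkStart = System.nanoTime()
SparkActions.get(spark).rewriteDataFiles(sparkTable).binPack().execute()
val sparkMs = (System.nanoTime() - sparkStart) / 1e6

// Regenerate the small input files before measuring the native path
// (recreateSmallFiles() is a placeholder for the benchmark's setup code).
recreateSmallFiles()

spark.conf.set("spark.comet.iceberg.compaction.enabled", "true")
val nativeStart = System.nanoTime()
spark.sql(s"CALL spark_catalog.system.rewrite_data_files(table => '$icebergTableName')")
val nativeMs = (System.nanoTime() - nativeStart) / 1e6

println(f"native speedup: ${sparkMs / nativeMs}%.2fx")
```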
file_io_properties = fileIOProperties)
}

/** Plan file groups using bin-pack strategy. */
Right now we use the bin-pack runner strategy to group small files (a sketch of the idea is below); other runners and planners can be added later.
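A simplified sketch of the bin-pack idea: greedily pack small files into groups whose total size stays under a target, so each group can be rewritten into roughly target-sized output. Names and the exact grouping policy are illustrative, not the planner code in this PR:

```scala
object BinPackSketch {
  case class FileEntry(path: String, sizeBytes: Long)

  // Greedy bin-pack grouping of small files (illustrative only).
  def binPack(files: Seq[FileEntry], targetGroupBytes: Long): Seq[Seq[FileEntry]] = {
    val groups = scala.collection.mutable.ArrayBuffer.empty[Seq[FileEntry]]
    var current = Vector.empty[FileEntry]
    var currentBytes = 0L
    for (f <- files.sortBy(_.sizeBytes)) {
      if (current.nonEmpty && currentBytes + f.sizeBytes > targetGroupBytes) {
        groups += current
        current = Vector.empty
        currentBytes = 0L
      }
      current :+= f
      currentBytes += f.sizeBytes
    }
    if (current.nonEmpty) groups += current
    groups.toSeq
  }
}
```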
* Integration tests for CALL rewrite_data_files() procedure intercepted by CometCompactionRule.
* Verifies that the SQL procedure path routes through native compaction when enabled.
*/
class CometIcebergCompactionProcedureSuite extends CometTestBase {
Validates calling the rewrite API through the SQL procedure for Iceberg table compaction.
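A rough sketch of what such a test can look like inside a suite like the one above; only the spark.comet.iceberg.compaction.enabled key comes from the PR description, while the catalog name, table name, and countDataFiles helper are illustrative:

```scala
test("rewrite_data_files procedure routes through native compaction") {
  withSQLConf("spark.comet.iceberg.compaction.enabled" -> "true") {
    val table = "spark_catalog.db.small_files" // illustrative table name
    val before = countDataFiles(table)         // hypothetical helper

    spark.sql(s"CALL spark_catalog.system.rewrite_data_files(table => '$table')")

    // Compaction should leave fewer, larger data files behind.
    assert(countDataFiles(table) < before)
  }
}
```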

Which issue does this PR close?

Ref #3371

PR Description

Rationale for this change

Iceberg table compaction using Spark's default rewriteDataFiles() action is slow due to Spark shuffle and task scheduling overhead. This PR adds native Rust-based compaction using DataFusion for direct Parquet read/write, achieving 1.5-1.8x speedup over Spark's default compaction.

What changes are included in this PR?

- CometNativeCompaction class that executes native compaction (native scan + write via JNI) and commits via the Iceberg Java API
- spark.comet.iceberg.compaction.enabled config option

How are these changes tested?

- CometIcebergCompactionSuite covering the compaction paths
- Benchmark (CometIcebergTPCCompactionBenchmark) measuring performance on the lineitem, orders, and customer tables