[GLUTEN][VL] Optimize Delta Lake DV materialization and plan rule performance#12390
[GLUTEN][VL] Optimize Delta Lake DV materialization and plan rule performance#12390iemejia wants to merge 6 commits into
Conversation
|
Run Gluten Clickhouse CI on x86 |
There was a problem hiding this comment.
Pull request overview
Optimizes Gluten’s Delta Lake integration focusing on planning-time hotspots: deletion-vector (DV) materialization and post-transform rule application, aiming to reduce redundant filesystem calls and repeated plan traversals/allocations.
Changes:
- Speed up DV materialization by caching table-path resolution + Hadoop Configuration per partition and reading on-disk DV payload bytes directly.
- Reduce post-transform overhead by early-exiting Delta-specific rules on non-Delta plans, precomputing input-file attribute names, and batching column-mapping attribute translation.
- Add/extend test coverage and introduce a planning benchmark for repeatable measurement.
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java | Updates constructor documentation around metadata handling. |
| gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesNode.java | Uses zero-copy protobuf ByteString wrapping for DV bytes. |
| gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala | Adds tests targeting Delta post-transform rule behavior and scanFilters caching. |
| gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala | Fuses Delta post-transform rules with an early-exit guard; reduces per-rule allocations. |
| gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala | Caches scanFilters via lazy val to avoid repeated recomputation. |
| gluten-delta/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala | Implements DV materialization caching + raw DV bytes read; adds (delta40) reflective parse method caching. |
| gluten-delta/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala | Implements DV materialization caching + raw DV bytes read. |
| backends-velox/src-delta33/test/scala/org/apache/spark/sql/execution/benchmark/DeltaPlanningBenchmark.scala | Adds a benchmark suite to measure DV materialization and rule-application overhead. |
| .gitignore | Ignores Eclipse project files. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| * <p>Note: uses direct list reference transfer (not deep copy) for efficiency, since the original | ||
| * node is typically discarded immediately after this constructor returns. |
There was a problem hiding this comment.
Fixed. Updated the comment to accurately say "shallow list copy (element references are shared, not deep-copied)". A deep copy is unnecessary here since callers supply freshly built maps and the original node is discarded immediately after construction.
| def normalize(partitionColumnCount: Int, partitionFiles: Seq[PartitionedFile]) | ||
| : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = { | ||
| val scanInfos = extractAll(activeSparkSession, partitionColumnCount, partitionFiles) | ||
| val spark = activeSparkSession | ||
| val hadoopConf = spark.sessionState.newHadoopConf() | ||
| val cachedTablePath = resolveTablePath(hadoopConf, partitionColumnCount, partitionFiles.head) |
There was a problem hiding this comment.
Fixed. Added an early if (partitionFiles.isEmpty) return None guard before accessing .head.
| // Create a single Hadoop Configuration for the entire partition. | ||
| val hadoopConf = spark.sessionState.newHadoopConf() | ||
| // Resolve table path once using the first file -- all files in a Delta table share the same | ||
| // root, so this avoids N-1 redundant filesystem existence checks. | ||
| val cachedTablePath = resolveTablePath(hadoopConf, partitionColumnCount, partitionFiles.head) |
There was a problem hiding this comment.
Fixed. Same empty-input guard added.
| // Rules should return the plan unchanged (early-exit guard) | ||
| val transformed = DeltaPostTransformRules.rules.foldLeft(plan) { | ||
| (p, rule) => rule(p) | ||
| } | ||
| // No DeltaScanTransformer in the plan, so rules should be identity | ||
| assert( | ||
| !transformed.exists(_.isInstanceOf[DeltaScanTransformer]), | ||
| "Non-Delta plan should not contain DeltaScanTransformer") | ||
| assert( | ||
| !plan.exists(_.isInstanceOf[DeltaScanTransformer]), | ||
| "Original plan should not contain DeltaScanTransformer") |
There was a problem hiding this comment.
Fixed. The test now applies only the Delta-specific rules (skipping RemoveTransitions which is generic) and asserts referential identity (transformed eq plan), which directly validates the early-exit guard returns the same object.
| } | ||
| } | ||
|
|
||
| test("post-transform rules produce DeltaScanTransformer for Delta tables") { |
There was a problem hiding this comment.
Fixed. Renamed the test to "Delta scan is offloaded to DeltaScanTransformer" to accurately reflect what it verifies -- that the Gluten planning pipeline produces a DeltaScanTransformer in the executed plan.
| if (scans.nonEmpty) { | ||
| val scan = scans.head | ||
| // scanFilters is now a lazy val; repeated calls should return the same instance | ||
| val first = scan.scanFilters | ||
| val second = scan.scanFilters | ||
| val third = scan.scanFilters | ||
| assert(first eq second, "scanFilters should return the same cached instance") | ||
| assert(second eq third, "scanFilters should return the same cached instance") | ||
| } |
There was a problem hiding this comment.
Fixed. Replaced if (scans.nonEmpty) with assert(scans.nonEmpty, ...) so the test fails loudly if DeltaScanTransformer offloading regresses.
| * bin/spark-submit --class org.apache.spark.sql.execution.benchmark.DeltaPlanningBenchmark \ | ||
| * --jars <spark-core-test-jar> <gluten-backends-velox-jar> | ||
| * }}} |
There was a problem hiding this comment.
Fixed. Changed to comma-separated format: --jars <spark-core-test-jar>,<gluten-backends-velox-jar>.
| /** | ||
| * Benchmarks the delta40 `parseDescriptor` optimization: cached reflective method lookup vs | ||
| * uncached (resolving `getMethod` on every call). Simulates the pattern used in | ||
| * `DeltaDeletionVectorScanInfo` for Delta 4.0 API compatibility. | ||
| * | ||
| * The cached version resolves the Method object once (lazy val); the uncached version calls | ||
| * `getMethod` + `invoke` per descriptor, which is what the old code did per-file. | ||
| */ | ||
| /** | ||
| * Creates a Delta table with deletion vectors and provides the partitioned files for direct DV | ||
| * materialization benchmarking. | ||
| */ |
There was a problem hiding this comment.
Fixed. Removed the orphaned parseDescriptor benchmark Scaladoc block.
| private def parseDescriptor(encodedDescriptor: String): DeletionVectorDescriptor = { | ||
| try { | ||
| descriptorParseMethod | ||
| .invoke(DeletionVectorDescriptor, encodedDescriptor) | ||
| .asInstanceOf[DeletionVectorDescriptor] |
There was a problem hiding this comment.
Fixed. Changed descriptorParseMethod (single cached method) to descriptorParseMethods (all available methods cached as a Seq). The parseDescriptor method now tries each cached method in order, preserving fallback semantics while still avoiding per-call getMethod lookups.
bc89390 to
9a3821f
Compare
|
Run Gluten Clickhouse CI on x86 |
|
The two CI failures are unrelated to this PR:
Re-triggering the failed jobs. |
Cache the resolved table path and Hadoop Configuration across all files in a partition during normalize(). Previously, each file triggered independent filesystem exists() checks (to find the _delta_log directory) and allocated a fresh Hadoop Configuration clone. For a partition with N files on object storage, this produced N+ redundant HTTP HEAD requests on the driver critical path. For on-disk DVs, read the raw bytes directly from the DV file using Delta's DeletionVectorStore.readRangeFromStream (which includes checksum verification) instead of going through StoredBitmap.load() + serializeAsByteArray(). The on-disk format is already Portable Roaring Bitmap Array -- the same format the native Velox side expects -- so this eliminates the expensive deserialize-into-Java-Roaring- objects + re-serialize round-trip per file. Changes: - Resolve table path once using the first file, reuse for all others - Create one Hadoop Configuration per normalize() call - Read raw DV bytes directly for on-disk DVs (skip deser+reser) - Fall back to load+serialize for inline DVs (small, in-metadata) - (delta40) Cache the reflective method lookup for parseDescriptor Assisted-by: GitHub Copilot:claude-opus-4.6
Reduce plan traversal overhead from 5 full passes to effectively 1 for Delta queries and 0 for non-Delta queries: - Add early-exit guard: check plan.exists(DeltaScanTransformer) once and skip all Delta-specific rules if no Delta scan is present. This eliminates all overhead for non-Delta queries. - Replace quadratic containsNativeDeltaScan (full subtree .exists() per Filter/Project node) with a shallow 2-level child check that is O(1), safe because transformUp processes bottom-up. - Pre-compute inputFileRelatedNames as a static Set[String] instead of allocating 3 Expression objects + 2 Seqs per call per column. - Batch createPhysicalAttributes: single call with full attribute list instead of per-column invocation that walks the reference schema N times for a table with N columns. - Fuse nativeDeletionVectorRule, pushDownInputFileExprRule, and columnMappingRule into a single registered rule to reduce the number of injected post-transforms from 4 to 2. Assisted-by: GitHub Copilot:claude-opus-4.6
- DeltaScanTransformer.scanFilters: change from def to lazy val to avoid rebuilding the physicalByExprId map and re-traversing filter expression trees on every call (invoked 3+ times per scan node). - DeltaLocalFilesNode: use UnsafeByteOperations.unsafeWrap() instead of ByteString.copyFrom() for the DV byte array. This is a zero-copy wrap since the byte[] lifetime is guaranteed by DeltaFileReadOptions, eliminating an O(DV_size) memcpy per file on the driver. - LocalFilesNode: improve documentation on the copy constructor noting that the original is discarded after construction. Assisted-by: GitHub Copilot:claude-opus-4.6
Adds a Spark Benchmark that measures the two hot paths optimized in
this patch series:
1. DV Materialization (DeltaDeletionVectorScanInfo.normalize):
Creates a Delta table with N DV-bearing files and times the
normalize() call that resolves table paths, loads DV bitmaps, and
builds split metadata. Directly measures the impact of caching
table path + Hadoop conf + DV store ("Eliminate per-file I/O" commit).
2. Post-transform rule application (DeltaPostTransformRules.rules):
Applies the Delta post-transform rules to a plan containing
DeltaScanTransformer nodes. Measures rule traversal overhead
including the early-exit guard, shallow containsNativeDeltaScan,
pre-computed names, and batched attribute mapping ("Optimize Delta post-transform rules" commit).
3. Non-Delta plan overhead (control):
Applies the same rules to a plain parquet plan to verify the
early-exit guard produces zero overhead for non-Delta queries.
Configurable via spark.gluten.benchmark.delta.numFiles (default 100)
and spark.gluten.benchmark.delta.rowsPerFile (default 10000).
Measured results (local filesystem, 100 DV-bearing files):
Benchmark Before After Speedup
------- ------ ----- -------
DV Materialization (100 files) 22 ms 7 ms 3.3x
Post-transform rules (Delta) 37 us 20 us 1.8x
Post-transform rules (parquet) 4908 ns 220 ns 22.3x
Call count reduction for 100 DV-bearing files:
Operation Before After Eliminated
--------- ------ ----- ----------
FileSystem.exists() (HEAD reqs) 100-300 1 99-299
newHadoopConf() (deep clone) 100-300 1 99-299
new HadoopFileSystemDVStore() 100 1 99
Plan tree traversals (non-Delta) 5 0 5
Plan tree traversals (Delta) 5 1 4
containsNativeDeltaScan subtree O(n^2) O(1) --
createPhysicalAttributes calls N cols 1 N-1
Projected DV materialization time by storage backend (100 files):
Storage exists() latency Before After Speedup
------- ---------------- ------ ----- -------
Local FS ~67 us/call 22 ms 7 ms 3.3x
ABFS 20-80 ms/call 2-24 sec 1.0-1.1 s 2-22x
GCS 30-100 ms/call 3-30 sec 1.0-1.1 s 3-27x
S3 50-150 ms/call 5-45 sec 1.1-1.2 s 5-38x
After = 1 exists() call + 100 DV loads (~10 ms each on object stores)
Before = 100-300 exists() calls + 100 DV loads
Remote object storage impact analysis:
The dominant cost in DV materialization is resolveTablePath(), which
calls FileSystem.exists() to locate the _delta_log directory. On local
FS this is ~67us per call; on object stores each exists() is an HTTP
HEAD request with the latencies shown above.
Before this patch, resolveTablePath() was called per-file, plus
isDeltaTablePath() could walk up 1-3 parent directories per file.
After: a single exists() call resolves the table path for all files.
The DV bitmap load (StoredBitmap.load) remains per-file but benefits
from connection pooling via the shared HadoopFileSystemDVStore (the
FS instance is reused across all files since the same Configuration
object hits Hadoop's FileSystem cache).
Assisted-by: GitHub Copilot:claude-opus-4.6
Assisted-by: GitHub Copilot:claude-opus-4.6
…k parse - Fix misleading 'direct list reference transfer' comment in LocalFilesNode to accurately describe the shallow list copy behavior. - Add empty partitionFiles guard in normalize() for both delta33 and delta40 to prevent NoSuchElementException on empty input. - Strengthen test assertions: use 'eq' identity check for early-exit guard, rename test to match actual behavior, replace silent 'if' with assert. - Fix --jars doc syntax to use comma-separated format in benchmark. - Remove orphaned parseDescriptor Scaladoc block. - Cache all available parse methods and try in order, preserving fallback semantics while avoiding per-call getMethod overhead. Assisted-by: GitHub Copilot:claude-opus-4.6
9a3821f to
327e7c4
Compare
|
Run Gluten Clickhouse CI on x86 |
|
cc @malinjawi |
What changes were proposed in this pull request?
Optimize the Delta Lake integration's planning-time performance, targeting two hot paths: DV (Deletion Vector) materialization on the driver and post-transform rule application.
DV Materialization (
DeltaDeletionVectorScanInfo.normalize)FileSystem.exists()calls (HTTP HEAD requests on object stores).DeletionVectorStore.readRangeFromStream(with checksum verification) instead of deserializing into Java Roaring objects and re-serializing. The on-disk format already matches what Velox expects.parseDescriptorin alazy val.Post-transform Rules (
DeltaPostTransformRules)DeltaScanTransformeris present. Eliminates 5 full plan traversals for non-Delta queries.containsNativeDeltaScan: O(1) direct child/grandchild check instead of O(n^2) subtree traversal.inputFileRelatedNames: staticSet[String]instead of allocating 3 Expression objects per call.createPhysicalAttributes: single call with full attribute list instead of per-column.Allocation Reduction
scanFiltersaslazy val: avoids rebuilding the physicalByExprId map and expression tree walk on every call (invoked 3+ times per scan node).UnsafeByteOperations.unsafeWrap: zero-copy ByteString for DV bytes instead ofByteString.copyFrom.Measured Results (local filesystem, 100 DV-bearing files)
Projected impact on object stores (100 DV files)
How was this patch tested?
VeloxDeltaSuite,DeltaDeletionVectorScanInfoSuite)post-transform rules are no-op on non-Delta plans(validates early-exit guard)post-transform rules produce DeltaScanTransformer for Delta tables(validates offloading)scanFilters returns consistent results on repeated access(validates lazy val caching)DeltaPlanningBenchmarkfor reproducible before/after measurementWas this patch authored or co-authored using generative AI tooling?
Generated-by: Claude claude-opus-4.6