Is your feature request related to a problem or challenge?
PR #4591 adds native support for Spark's in-memory cache via ArrowCachedBatchSerializer and CometInMemoryTableScanExec. The goal is to land the functional version first. This issue tracks performance optimizations identified during review that can follow in later PRs.
The benchmark from #4591 (CometInMemoryCacheBenchmark, 5M-row cached table, Apple M3 Ultra, JDK 17, Spark 3.5) showed the native path is about 1.5x faster on a full repeated scan but only about 1.1x faster on a selective filter, which suggests there is headroom, especially on the read path.
Describe the potential solution
Read path (runs on every cached scan, highest impact)
- Hoist UnsafeProjection.create out of the per-batch loop. In ArrowCachedBatchSerializer.convertCachedBatchToInternalRow, UnsafeProjection.create(...) is currently constructed inside the flatMap over decoded batches, so it recompiles Janino codegen once per cached batch. It should be built once per partition (inside mapPartitions, outside flatMap).
- Reduce the per-scan deep copy. CometInMemoryTableScanExec sets arrow_ffi_safe = false, so scan.rs deep-copies every column of every surviving batch on each scan. This is correct today because ArrowReaderIterator owns and closes each decoded batch, but it is the main structural cost on the read path and is the likely reason the selective-filter benchmark only reached ~1.1x. Removing it requires reworking the decode path to transfer ownership to native rather than closing the batch on the JVM side. This is a larger, riskier change and should be scoped carefully.
- Optional uncompressed cache format. Utils.decodeBatches decompresses (LZ4 by default) on every scan. A knob to store the Arrow bytes uncompressed would trade memory for faster repeated reads.
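
To illustrate the first item, here is a minimal, Spark-free sketch of the hoisting pattern. Everything in it is a hypothetical stand-in: ProjectionFactory.create plays the role of UnsafeProjection.create (costly to build, cheap to apply), Batch stands in for a decoded batch, and the Iterator argument represents what the body of mapPartitions would receive. It is not the code from #4591.

```scala
// Hypothetical stand-in for UnsafeProjection.create:
// building the projection is the expensive step, applying it is cheap.
object ProjectionFactory {
  var builds = 0 // counts how often the expensive construction ran

  def create(): Int => String = {
    builds += 1
    (row: Int) => s"row-$row"
  }
}

object HoistSketch {
  type Batch = Seq[Int] // stand-in for one decoded cached batch

  // Current shape: the projection is constructed inside flatMap,
  // so construction runs once per batch.
  def perBatch(batches: Iterator[Batch]): Iterator[String] =
    batches.flatMap { batch =>
      val proj = ProjectionFactory.create()
      batch.iterator.map(proj)
    }

  // Proposed shape: construct once per partition (outside flatMap)
  // and reuse the same projection for every batch.
  def perPartition(batches: Iterator[Batch]): Iterator[String] = {
    val proj = ProjectionFactory.create()
    batches.flatMap(batch => batch.iterator.map(proj))
  }
}
```

Both variants yield the same rows; only the number of constructions differs (one per batch versus one per partition).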
Write path (runs once per cache(), but slow for large caches)
- Specialize computeStats per column and stop boxing. readValue returns Any, so every primitive read boxes, and both compare(dt, ...) and tracksBounds(dt) re-dispatch on DataType per row even though the type is invariant per column. Dispatch on the type once per column, then run a specialized primitive loop that keeps min/max in primitive locals.
- Copy-on-update for string stats. readValue does col.getUTF8String(rowId).copy() for every non-null string, but only the min and max are kept. Compare against the current bound using the borrowed UTF8String and copy only when it becomes the new lower or upper bound.
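
The two write-path ideas could look roughly like the sketch below. It is Spark-free and illustrative only: IntColumn, StringColumn, and the copies counter are hypothetical, byte arrays stand in for borrowed UTF8String values, and unsigned byte-wise comparison is assumed as the ordering. The real computeStats reads from Spark column vectors and covers more types.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

object StatsSketch {
  // Hypothetical column model, not the types used in #4591.
  sealed trait Column
  final case class IntColumn(values: Array[Int], isNull: Array[Boolean]) extends Column
  final case class StringColumn(values: Array[Array[Byte]]) extends Column // null entry = SQL NULL

  var copies = 0 // instrumentation only: counts defensive copies made

  // Specialized primitive loop: min/max live in Int locals, nothing boxes per row.
  def intBounds(c: IntColumn): Option[(Int, Int)] = {
    var min = Int.MaxValue
    var max = Int.MinValue
    var seen = false
    var i = 0
    while (i < c.values.length) {
      if (!c.isNull(i)) {
        val v = c.values(i)
        if (v < min) min = v
        if (v > max) max = v
        seen = true
      }
      i += 1
    }
    if (seen) Some((min, max)) else None
  }

  // Copy-on-update: compare the borrowed bytes against the current bound
  // and copy only when the value becomes a new lower or upper bound.
  def stringBounds(c: StringColumn): Option[(String, String)] = {
    var min: Array[Byte] = null
    var max: Array[Byte] = null
    var i = 0
    while (i < c.values.length) {
      val v = c.values(i)
      if (v != null) {
        if (min == null || Arrays.compareUnsigned(v, min) < 0) { min = v.clone(); copies += 1 }
        if (max == null || Arrays.compareUnsigned(v, max) > 0) { max = v.clone(); copies += 1 }
      }
      i += 1
    }
    if (min == null) None else Some((new String(min, UTF_8), new String(max, UTF_8)))
  }

  // Type dispatch happens once per column, outside the row loops.
  def bounds(c: Column): Option[(Any, Any)] = c match {
    case ic: IntColumn    => intBounds(ic)
    case sc: StringColumn => stringBounds(sc)
  }
}
```

In this sketch the number of copies depends on how often a bound changes rather than on the row count, which is the effect the copy-on-update item is after.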
Suggested priority
- Hoist UnsafeProjection.create (trivial, clear win).
- Specialize computeStats and add copy-on-update for strings (biggest build-time win).
- Investigate reducing the per-scan deep copy, with a scan-isolating benchmark variant to confirm the pruning benefit.
Additional context
Follow-on to #4591. See the review discussion there for full detail.