
Parquet vectored I/O hardcoded ON + on-heap allocator causes executor OOM on S3FileIO reads of high-compression tables (regression in 1.11.0) #16600

@andyguwc


Summary

Iceberg 1.11.0 made two changes in iceberg-parquet that, in combination, force every S3FileIO-backed Parquet read through parquet-hadoop's vectored I/O code path, which stages column chunks through parquet-hadoop's default on-heap HeapByteBufferAllocator (Iceberg never overrides it). For tables with high compression ratios (common for JSON / wide-text columns under zstd), this leads to executor java.lang.OutOfMemoryError: Java heap space during BatchScan reads, especially on the target side of a MERGE. No user-facing configuration disables the behavior; avoiding it requires patching Iceberg source.

This regression did not exist in 1.10.x: the prerequisite adapter class was not present, and the read-options builder did not hardcode vectored I/O on.

Environment

  • Apache Iceberg: 1.11.0 (iceberg-spark-runtime-3.5_2.12-1.11.0.jar)
  • Spark 3.5 (AWS Glue 5.1 runtime), Java 17 (Corretto 17.0.19)
  • FileIO: S3FileIO, Glue Data Catalog
  • Affected workers: G.1X (4 vCPU / 16 GB), R.2X (8 vCPU / 64 GB), various counts
  • Affected table characteristics (observed across ≥5 production CDC tables):
    • write.parquet.compression-codec=zstd
    • write.target-file-size-bytes=536870912 (512 MB)
    • default write.parquet.row-group-size-bytes (128 MB)
    • observed zstd compression ratios: 7.4×, 7.6×, 13.2×, 14.3×

Workload shape

The typical CDC MERGE that triggers this (target is bucketed by _id):

MERGE INTO catalog.db.target t
USING staging s
ON t._id = s._id
WHEN MATCHED AND s._op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

The OOM fires on the target-side BatchScan (reading existing Parquet files) regardless of join strategy. Any sufficiently large BatchScan on this data shape would hit it; MERGE is the typical trigger because COW MERGE reads the entire target.

Symptom

java.lang.OutOfMemoryError: Java heap space is thrown during the target-side BatchScan of a MERGE INTO. It cascades into executor losses (Remote RPC client disassociated), and the job aborts after 4 task failures on the same stage.

Representative stack trace:

java.lang.OutOfMemoryError: Java heap space
  at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
  at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
  at org.apache.iceberg.shaded.org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:34)
  at org.apache.iceberg.parquet.ParquetIO$ParquetRangeReadableInputStreamAdapter$$Lambda$.../apply(Unknown Source)
  at org.apache.iceberg.io.RangeReadable.readVectored(RangeReadable.java:108)
  at org.apache.iceberg.parquet.ParquetIO$ParquetRangeReadableInputStreamAdapter.readVectored(ParquetIO.java:180)
  at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readVectored(ParquetFileReader.java:1357)
  at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readAllPartsVectoredOrNormal(ParquetFileReader.java:1274)
  at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.internalReadRowGroup(ParquetFileReader.java:1185)
  at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:1135)
  at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:161)
  ...
  at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:139)

Root cause (line-cited against apache-iceberg-1.11.0)

Two changes landed together in 1.11.0:

1. ParquetIO introduced a new adapter that exposes parquet-hadoop's multi-range readVectored to any SeekableInputStream that also implements RangeReadable (which includes S3InputStream):

parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java#L141-L185

static class ParquetRangeReadableInputStreamAdapter<
        T extends org.apache.iceberg.io.SeekableInputStream & RangeReadable>
    extends DelegatingSeekableInputStream implements RangeReadable {
  ...
  @Override
  public void readVectored(List<ParquetFileRange> ranges, ByteBufferAllocator allocate)
      throws IOException {
    List<FileRange> delegateRange = convertRanges(ranges);
    delegate.readVectored(delegateRange, allocate::allocate);
  }
}

2. Parquet.ReadBuilder unconditionally enables vectored I/O when building ParquetReadOptions for the reader path:

parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java#L1486

optionsBuilder.withUseHadoopVectoredIo(true);
ParquetReadOptions options = optionsBuilder.build();

There is no config lookup, no if branch, and no honoring of parquet.hadoop.vectored.io.enabled. Even when a user explicitly sets spark.hadoop.parquet.hadoop.vectored.io.enabled=false (which flows into the Hadoop Configuration), line 1486 overrides it back to true. We confirmed this empirically: with the property set to false, the OOM stack trace is identical.
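A minimal model of why the conf value is lost, assuming a builder whose constructor seeds the flag from configuration and whose withUseHadoopVectoredIo setter simply overwrites it (last call wins). VectoredFlagOverride and its OptionsBuilder are hypothetical stand-ins, not parquet-hadoop's ParquetReadOptions.Builder, and the constructor's fallback default is arbitrary because the demo sets the key explicitly.

```java
import java.util.Map;

public class VectoredFlagOverride {

  static final String KEY = "parquet.hadoop.vectored.io.enabled";

  // Hypothetical stand-in for a conf-backed read-options builder.
  static final class OptionsBuilder {
    private boolean useVectoredIo;

    // Seeds the flag from configuration. The "false" fallback is arbitrary;
    // the demo below always sets KEY explicitly.
    OptionsBuilder(Map<String, String> conf) {
      this.useVectoredIo = Boolean.parseBoolean(conf.getOrDefault(KEY, "false"));
    }

    // Last call wins: overwrites whatever configuration supplied.
    OptionsBuilder withUseHadoopVectoredIo(boolean enabled) {
      this.useVectoredIo = enabled;
      return this;
    }

    boolean useVectoredIo() {
      return useVectoredIo;
    }
  }

  public static void main(String[] args) {
    // Operator opts out, as with spark.hadoop.parquet.hadoop.vectored.io.enabled=false.
    OptionsBuilder builder = new OptionsBuilder(Map.of(KEY, "false"));
    System.out.println("from conf:      " + builder.useVectoredIo()); // false

    // Equivalent of the unconditional call at Parquet.java#L1486.
    builder.withUseHadoopVectoredIo(true);
    System.out.println("after override: " + builder.useVectoredIo()); // true
  }
}
```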

In combination, every S3FileIO-backed read going through Iceberg's batch/row reader path now uses parquet-hadoop's vectored read codepath. The Parquet vectored read allocates one ByteBuffer per column chunk via the allocator returned by ParquetReadOptions.getAllocator(). That allocator defaults to HeapByteBufferAllocator.getInstance() because Iceberg never calls .withAllocator(...) on the builder.

For a 128 MB compressed row group on a table with a 14× compression ratio, these per-chunk allocations together stage ~128 MB on-heap, which then decompresses to a ~1.8 GB working set per row group (128 MB × 14 ≈ 1.8 GB). With 4 concurrent tasks per executor on a G.1X-class worker (~10 GB heap), the cumulative on-heap pressure overruns the JVM.
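The arithmetic above can be sketched as a back-of-the-envelope upper bound. It assumes, as the paragraph does, that each task holds a full row group's compressed chunks plus its decompressed working set at the same time; real readers decompress page by page, so treat the result as a worst case rather than a measurement. The class and method names are illustrative.

```java
public class HeapPressureEstimate {

  // Upper bound for one task: compressed chunks staged on-heap
  // plus the fully decompressed working set of the same row group.
  static long perTaskPeakMb(long rowGroupMb, double compressionRatio) {
    long decompressedMb = Math.round(rowGroupMb * compressionRatio);
    return rowGroupMb + decompressedMb;
  }

  // Every concurrent task on the executor can hold its own row group at once.
  static long perExecutorPeakMb(long rowGroupMb, double compressionRatio, int concurrentTasks) {
    return perTaskPeakMb(rowGroupMb, compressionRatio) * concurrentTasks;
  }

  public static void main(String[] args) {
    // 128 MB row groups, 14x zstd, 4 task slots (G.1X: 4 vCPU).
    System.out.println(perTaskPeakMb(128, 14.0));        // 1920
    System.out.println(perExecutorPeakMb(128, 14.0, 4)); // 7680
    // Workarounds: halve the row-group size, or halve the task slots.
    System.out.println(perExecutorPeakMb(64, 14.0, 4));  // 3840
    System.out.println(perExecutorPeakMb(128, 14.0, 2)); // 3840
  }
}
```

Halving either the row-group size or the number of task slots halves the bound, which is consistent with why the production workarounds listed in this issue help. The model ignores Spark's own execution and storage memory, so the real headroom is smaller than a ~10 GB heap suggests.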

Why this is a regression

apache-iceberg-1.10.2 for comparison:

  • ParquetIO.java does NOT contain ParquetRangeReadableInputStreamAdapter. Only ParquetInputStreamAdapter exists, which does not implement RangeReadable. parquet-hadoop's f instanceof RangeReadable check fails and readAllPartsVectoredOrNormal takes the "Normal" branch.
  • Parquet.java does NOT call optionsBuilder.withUseHadoopVectoredIo(true).

Net effect on 1.10.2 and earlier: vectored I/O was effectively inert for S3FileIO-backed reads, the on-heap allocator was never invoked for bulk column-chunk reads, and the OOM described above does not occur.
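The branch choice described above can be modeled as a conjunction of two conditions. This is a simplified sketch of the behavior as this issue describes it, not parquet-hadoop's actual readAllPartsVectoredOrNormal; ReadPathModel, choosePath, and stageRanges are hypothetical names, and stageRanges takes a plain IntFunction<ByteBuffer> in place of the real allocator types.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

public class ReadPathModel {

  enum Path { VECTORED, NORMAL }

  // Vectored only when the option is on AND the stream exposes range reads.
  static Path choosePath(boolean useVectoredIo, boolean streamSupportsRanges) {
    return (useVectoredIo && streamSupportsRanges) ? Path.VECTORED : Path.NORMAL;
  }

  // Vectored branch: one buffer per range, each obtained from the supplied allocator.
  static List<ByteBuffer> stageRanges(int[] rangeLengths, IntFunction<ByteBuffer> allocate) {
    List<ByteBuffer> buffers = new ArrayList<>();
    for (int length : rangeLengths) {
      buffers.add(allocate.apply(length));
    }
    return buffers;
  }

  public static void main(String[] args) {
    // 1.10.2: no range-capable adapter, so the flag cannot matter.
    System.out.println("1.10.2: " + choosePath(true, false)); // NORMAL
    // 1.11.0: adapter present and flag hardcoded to true.
    System.out.println("1.11.0: " + choosePath(true, true));  // VECTORED

    // With a heap allocator, every staged buffer lives on the Java heap.
    List<ByteBuffer> staged = stageRanges(new int[] {1024, 2048, 4096}, ByteBuffer::allocate);
    System.out.println("any direct? " + staged.stream().anyMatch(ByteBuffer::isDirect)); // false
  }
}
```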

Why this is impactful

  • No outside-source mitigation exists. Setting spark.hadoop.parquet.hadoop.vectored.io.enabled=false is silently overridden by line 1486. There is no Spark / Iceberg configuration that disables this code path.
  • No allocator configuration is exposed. ParquetReadOptions.Builder supports .withAllocator(...), but Iceberg does not call it, so users cannot supply DirectByteBufferAllocator to move these buffers off-heap.
  • The user-visible message is misleading. Java heap space + executor losses suggests "grow your workers," but doing so just works around a misallocation rather than fixing it. The buffers in question are bulk I/O staging — they belong off-heap (alongside Spark shuffle / unified memory), not on-heap.
  • Production workarounds we've had to apply per affected table:
    • spark.executor.cores=2 (reduce concurrent on-heap allocations)
    • Worker class bump (G.1X → R.1X / R.2X)
    • Reduce write.parquet.row-group-size-bytes from 128 MB to 64 MB or 32 MB, then run rewrite_data_files with rewrite-all=true to migrate existing files

Each carries a cost: extra compute (worker bump), a runtime tax (cores=2 halves the slot count), or a one-time data rewrite (row-group reduction).

Proposed fixes

Two small, complementary changes resolve the regression:

Fix A — honor the existing Parquet/Hadoop conf for vectored I/O. Change Parquet.java#L1486 from:

optionsBuilder.withUseHadoopVectoredIo(true);

to either honor the standard Parquet property explicitly (default still true):

optionsBuilder.withUseHadoopVectoredIo(
    PropertyUtil.propertyAsBoolean(properties,
        ParquetReader.HADOOP_VECTORED_IO_ENABLED,  // or a new constant
        true));

…or simply drop the line and let ParquetReadOptions resolve from the underlying ParquetConfiguration it was built with (the HadoopReadOptions.builder(conf) branch at line 1469 already passes the Hadoop conf through).

This gives operators an escape hatch matching standard Parquet semantics.
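A sketch of Fix A's resolution logic, assuming the read properties arrive as a Map<String, String>. propertyAsBoolean here is a local stand-in that mirrors the shape of Iceberg's PropertyUtil.propertyAsBoolean, and reusing the standard parquet.hadoop.vectored.io.enabled key (rather than a new Iceberg constant) is only one of the options the fix leaves open.

```java
import java.util.Map;

public class VectoredIoFlag {

  // Standard Parquet key named in this issue; the constant itself is a local stand-in.
  static final String VECTORED_IO_ENABLED = "parquet.hadoop.vectored.io.enabled";

  // Local stand-in shaped like PropertyUtil.propertyAsBoolean(properties, key, default).
  static boolean propertyAsBoolean(Map<String, String> props, String key, boolean defaultValue) {
    String value = props.get(key);
    return value == null ? defaultValue : Boolean.parseBoolean(value);
  }

  // Default stays true, so behavior only changes when an operator opts out.
  static boolean useVectoredIo(Map<String, String> readProperties) {
    return propertyAsBoolean(readProperties, VECTORED_IO_ENABLED, true);
  }

  public static void main(String[] args) {
    System.out.println(useVectoredIo(Map.of()));                             // true
    System.out.println(useVectoredIo(Map.of(VECTORED_IO_ENABLED, "false"))); // false
  }
}
```

The resolved boolean would then be passed to optionsBuilder.withUseHadoopVectoredIo(...) in place of the literal true.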

Fix B — make the allocator configurable, default to off-heap for the vectored read path. Expose a Parquet-allocator selection via an Iceberg read property (e.g. read.parquet.bytes-allocator=heap|direct) and route it into the builder:

ByteBufferAllocator allocator = selectAllocator(properties);
optionsBuilder.withAllocator(allocator);
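A sketch of what selectAllocator could look like for Fix B. The BufferAllocator interface is a local stand-in shaped like parquet's ByteBufferAllocator (allocate, release, isDirect) so the example compiles on its own; read.parquet.bytes-allocator is the example property name proposed above, and defaulting to direct follows the proposal rather than any existing Iceberg behavior.

```java
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.Map;

public class AllocatorSelection {

  // Local stand-in shaped like parquet's ByteBufferAllocator.
  interface BufferAllocator {
    ByteBuffer allocate(int size);
    void release(ByteBuffer buffer);
    boolean isDirect();
  }

  static final BufferAllocator HEAP = new BufferAllocator() {
    public ByteBuffer allocate(int size) { return ByteBuffer.allocate(size); }
    public void release(ByteBuffer buffer) { } // reclaimed by the garbage collector
    public boolean isDirect() { return false; }
  };

  static final BufferAllocator DIRECT = new BufferAllocator() {
    public ByteBuffer allocate(int size) { return ByteBuffer.allocateDirect(size); }
    public void release(ByteBuffer buffer) { } // native memory freed once the buffer is collected
    public boolean isDirect() { return true; }
  };

  // Hypothetical property name from the proposal.
  static final String ALLOCATOR_PROPERTY = "read.parquet.bytes-allocator";

  static BufferAllocator selectAllocator(Map<String, String> properties) {
    String choice = properties.getOrDefault(ALLOCATOR_PROPERTY, "direct").toLowerCase(Locale.ROOT);
    switch (choice) {
      case "heap":
        return HEAP;
      case "direct":
        return DIRECT;
      default:
        throw new IllegalArgumentException("Unknown " + ALLOCATOR_PROPERTY + ": " + choice);
    }
  }

  public static void main(String[] args) {
    BufferAllocator chosen = selectAllocator(Map.of());
    ByteBuffer buffer = chosen.allocate(4096);
    System.out.println(buffer.isDirect()); // true
    chosen.release(buffer);
    System.out.println(selectAllocator(Map.of(ALLOCATOR_PROPERTY, "heap")).isDirect()); // false
  }
}
```

One trade-off to weigh: direct buffers move the pressure off the Java heap, but they still count against the JVM's direct-memory limit and the container's memory overhead, so the off-heap budget has to be sized for them.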

Fix A alone is sufficient to unblock production users immediately. Fix B is the correct architectural answer — bulk Parquet column-chunk staging buffers belong off-heap, not on the JVM execution heap, regardless of whether vectored I/O is opted into.

Reproduction

Any Iceberg table satisfying:

  • Backed by S3FileIO
  • write.parquet.compression-codec=zstd (or another high-ratio codec)
  • Wide rows / JSON-ish columns (so the uncompressed working set is large)
  • ≥128 MB row groups (the default)

Run a MERGE INTO or BatchScan workload on Iceberg 1.11.0 with the default executor configuration on a worker with ≤16 GB of memory. The heap OOM appears in HeapByteBufferAllocator.allocate inside ParquetRangeReadableInputStreamAdapter.readVectored.

Setting spark.hadoop.parquet.hadoop.vectored.io.enabled=false does not prevent the OOM; the resulting stack trace is identical.

Versions tested

| Version | Vectored I/O code path | OOM observed |
|---|---|---|
| 1.7.2 | inert (no RangeReadable adapter) | no |
| 1.10.0 | inert | no (separate "Connection pool shut down" issue) |
| 1.10.2 | inert | no |
| 1.11.0 | active, hardcoded on, on-heap allocator | yes, persistent |

Happy to provide additional event-log metrics or a minimal reproducible Spark job if useful.
