
fix(streaming-write): use rolling ParquetWriter + OutputStream.tell() for spec-correct file sizes and bounded memory #3388

@paultmathew

Description


Background

PR #3335 added pa.RecordBatchReader as a valid input to Table.append/Table.overwrite using a buffered bin-pack approach (bin_pack_record_batches). That implementation has two acknowledged caveats called out in its docstrings:

  1. Memory bound: peak memory is N_workers × write.target-file-size-bytes (~4 GiB at defaults) — better than materialising everything, but not constant.
  2. Byte semantics: write.target-file-size-bytes is interpreted as uncompressed in-memory Arrow bytes, not on-disk compressed Parquet bytes. Resulting files are typically 3–10× smaller than the property suggests — diverging from the Java/Spark/Flink writers.

Proposed fix

Replace the bin-pack approach with a rolling pq.ParquetWriter driven by OutputStream.tell() (added in #2998 specifically for this purpose):

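```python
import pyarrow.parquet as pq

# Write one data file; the caller repeats this for the remaining batches.
with output_file.create(overwrite=True) as fos:
    with pq.ParquetWriter(fos, schema=...) as writer:  # other writer options elided
        writer.write_batch(first_batch)
        while fos.tell() < target_file_size:   # ← compressed on-disk bytes
            batch = next(batches, None)
            if batch is None:  # input exhausted: this is the last file
                break
            writer.write_batch(batch)
```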

This delivers:

  • Spec-correct file sizes: tell() reports compressed on-disk bytes, so write.target-file-size-bytes finally means what the Iceberg spec intends — consistent with the Java/Spark/Flink writers.
  • Truly bounded memory: peak RSS is bounded by one input batch + Parquet page buffer (~1 MiB × columns) + S3 multipart pool (~5 MiB × ~8 parts), regardless of target_file_size, dataset size, or number of files produced.
  • No public API change: same tbl.append(reader) / tbl.overwrite(reader) interface.
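The two memory bounds can be compared with a back-of-the-envelope calculation. This sketch only encodes the figures quoted above. `bin_pack_peak` and `rolling_writer_peak` are hypothetical helpers. The inputs are illustrative assumptions, not measurements: 8 workers and PyIceberg's default 512 MiB `write.target-file-size-bytes` (chosen to reproduce the ~4 GiB figure), a 64 MiB input batch, and 20 columns.

```python
MiB = 1024 * 1024
GiB = 1024 * MiB


def bin_pack_peak(n_workers: int, target_file_size: int) -> int:
    """Caveat 1: each worker may buffer a full target-sized bin in memory."""
    return n_workers * target_file_size


def rolling_writer_peak(
    batch_bytes: int,
    n_columns: int,
    page_buffer_per_column: int = 1 * MiB,  # ~1 MiB Parquet page buffer per column
    multipart_part_size: int = 5 * MiB,     # ~5 MiB S3 multipart part
    multipart_parts_in_flight: int = 8,     # ~8 parts in the upload pool
) -> int:
    """Rough bound quoted above: one input batch + page buffers + multipart pool.
    Note that target_file_size does not appear anywhere in this formula."""
    return (
        batch_bytes
        + n_columns * page_buffer_per_column
        + multipart_part_size * multipart_parts_in_flight
    )


print(bin_pack_peak(8, 512 * MiB) / GiB)        # 4.0
print(rolling_writer_peak(64 * MiB, 20) / MiB)  # 124.0
```

The bin-pack bound grows with both the worker count and `target_file_size`. The rolling-writer bound depends only on batch size and schema width, which is why it stays flat regardless of dataset size or the number of files produced.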

Fix

#3336
