Pluggable page spilling for the Parquet ArrowWriter (PageStore) #10020
adriangb wants to merge 5 commits
Introduce a "dumb" key/value page store that the ArrowWriter uses to buffer completed, serialized pages while a row group is being written. The store maps an opaque, store-allocated PageKey to a blob of bytes and knows nothing about pages, dictionaries, ordering, or offsets; the caller keeps the handles and decides what they mean.

The default InMemoryPageStore keeps blobs in a Vec<Bytes>, byte-for-byte equivalent to the previous buffering with zero overhead. A PageStoreFactory is threaded through ArrowWriterOptions -> ArrowRowGroupWriterFactory -> ArrowColumnWriterFactory so users can plug in a backend (temp file, object storage) to bound peak write memory independently of row group size. ArrowColumnChunkData now holds (store, keys) and materializes blobs in write order at splice time, preserving the existing append_column path.

Tests:
- column::page_store unit tests for the in-memory backend contract.
- A byte-identical round-trip test using a custom HashMap-backed store with sparse, non-contiguous handles, proving the writer relies only on the opaque-handle contract.
- An always-on dhat integration test capturing the in-memory peak-heap baseline (memory grows with the row group), against which a spilling backend will be measured.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
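The opaque-handle contract described in this commit can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: the `put`/`take` names come from the commit messages, but the signatures, the `InMemoryStore` type, and the `concat_in_write_order` helper are assumptions made for the example, and `Vec<u8>` stands in for `Bytes`.

```rust
use std::io;

/// Opaque handle allocated by the store. The inner representation is an
/// assumption; the PR only states that keys are store-allocated and opaque.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageKey(u64);

/// Hypothetical trait shape: the store maps keys to blobs and nothing more.
pub trait PageStore {
    /// Store a blob and return the handle that identifies it.
    fn put(&mut self, blob: Vec<u8>) -> io::Result<PageKey>;
    /// Remove and return the blob for `key`.
    fn take(&mut self, key: PageKey) -> io::Result<Vec<u8>>;
}

/// Illustrative in-memory backend: a vector indexed by the key.
#[derive(Default)]
pub struct InMemoryStore {
    blobs: Vec<Option<Vec<u8>>>,
}

impl PageStore for InMemoryStore {
    fn put(&mut self, blob: Vec<u8>) -> io::Result<PageKey> {
        self.blobs.push(Some(blob));
        Ok(PageKey(self.blobs.len() as u64 - 1))
    }

    fn take(&mut self, key: PageKey) -> io::Result<Vec<u8>> {
        self.blobs
            .get_mut(key.0 as usize)
            .and_then(Option::take)
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "unknown or already-taken key"))
    }
}

/// The caller, not the store, remembers the write order of the keys.
pub fn concat_in_write_order(store: &mut dyn PageStore, keys: &[PageKey]) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    for &key in keys {
        out.extend_from_slice(&store.take(key)?);
    }
    Ok(out)
}
```

Keeping ordering on the caller's side is what lets a backend stay "dumb": it never needs to know which blob is a dictionary page or where a page lands in the file.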
Previously the splice materialized an entire column chunk back into a Vec<Bytes> before copying it into the output file, so peak memory during the splice phase was bounded by the largest column chunk, defeating a spilling backend for skewed schemas.

Replace the materialize-then-copy path with StreamingColumnChunkReader, a Read that takes each page blob back out of the store in write order as it is consumed and releases it immediately, so the splice holds at most one page in memory at a time. SerializedRowGroupWriter::append_column is refactored to delegate to a new append_column_from_read that consumes an owned Read (append_column itself is unchanged for external ChunkReader callers). For the default in-memory store this is behavior-preserving (it already holds the bytes); for a spilling store it keeps the splice within the memory bound.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
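The one-page-at-a-time splice can be illustrated with a small `std::io::Read` adapter. This is a sketch under assumptions, not the PR's `StreamingColumnChunkReader`: `ToyStore`, `StreamingChunkReader`, and their methods are hypothetical names invented for the example, and keys are plain `usize` indices.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

/// Hypothetical stand-in for a page store: `take` hands back a blob and forgets it.
#[derive(Default)]
pub struct ToyStore {
    blobs: Vec<Option<Vec<u8>>>,
}

impl ToyStore {
    pub fn put(&mut self, blob: Vec<u8>) -> usize {
        self.blobs.push(Some(blob));
        self.blobs.len() - 1
    }

    pub fn take(&mut self, key: usize) -> io::Result<Vec<u8>> {
        self.blobs
            .get_mut(key)
            .and_then(Option::take)
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "unknown or already-taken key"))
    }

    /// Bytes the store still holds.
    pub fn resident_bytes(&self) -> usize {
        self.blobs.iter().flatten().map(Vec::len).sum()
    }
}

/// Reads pages back in write order, holding at most one page at a time.
pub struct StreamingChunkReader<'a> {
    store: &'a mut ToyStore,
    keys: VecDeque<usize>,
    current: Vec<u8>,
    pos: usize,
}

impl<'a> StreamingChunkReader<'a> {
    pub fn new(store: &'a mut ToyStore, keys: Vec<usize>) -> Self {
        Self { store, keys: keys.into(), current: Vec::new(), pos: 0 }
    }
}

impl Read for StreamingChunkReader<'_> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Refill from the next page once the current one is fully consumed.
        while self.pos == self.current.len() {
            match self.keys.pop_front() {
                Some(key) => {
                    // Assigning drops the previous page, releasing its memory.
                    self.current = self.store.take(key)?;
                    self.pos = 0;
                }
                None => return Ok(0), // all pages consumed: end of stream
            }
        }
        let n = buf.len().min(self.current.len() - self.pos);
        buf[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}
```

Passing such a reader to something like `std::io::copy` streams the chunk into the output without ever rebuilding the whole chunk in memory.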
…pilling backend

PageKey's field was private, so an external PageStore implementor could not mint the handle it must return from put(); the trait was unusable outside the crate. Add public PageKey::new/get so any backend can allocate its own opaque, dense handles.

Extend the dhat integration test with a temp-file PageStore backend (one unlinked temp file per column chunk; put appends, take seeks+reads) and assert the headline invariant: writing a skewed ~16 MiB single row group, peak heap drops from ~18 MiB with the in-memory store to ~3 MiB with the spilling store, bounded by the in-flight encoder/dictionary buffers rather than the row group size.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
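A temp-file backend of the kind this commit describes ("put appends, take seeks+reads") might look roughly like the following. This is a sketch, not the test's actual backend: `TempFileStore`, its method signatures, and the `usize` keys are assumptions, and the early unlink relies on Unix semantics (on other platforms the removal may fail and is simply ignored here).

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};

/// Hypothetical spilling store: blobs live in one temp file, and only the
/// (offset, length) bookkeeping stays on the heap.
pub struct TempFileStore {
    file: File,
    extents: Vec<(u64, usize)>, // key = index into this vector
    end: u64,                   // current append position
}

impl TempFileStore {
    pub fn create(name: &str) -> io::Result<Self> {
        let path = std::env::temp_dir().join(name);
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create_new(true)
            .open(&path)?;
        // Best-effort unlink: on Unix the open handle remains usable.
        let _ = std::fs::remove_file(&path);
        Ok(Self { file, extents: Vec::new(), end: 0 })
    }

    /// Append the blob to the file and return its key.
    pub fn put(&mut self, blob: &[u8]) -> io::Result<usize> {
        self.file.seek(SeekFrom::Start(self.end))?;
        self.file.write_all(blob)?;
        self.extents.push((self.end, blob.len()));
        self.end += blob.len() as u64;
        Ok(self.extents.len() - 1)
    }

    /// Seek to the blob's extent and read it back into memory.
    pub fn take(&mut self, key: usize) -> io::Result<Vec<u8>> {
        let &(offset, len) = self
            .extents
            .get(key)
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "unknown key"))?;
        let mut buf = vec![0u8; len];
        self.file.seek(SeekFrom::Start(offset))?;
        self.file.read_exact(&mut buf)?;
        Ok(buf)
    }

    /// Heap bytes held for bookkeeping; the page bytes themselves are on disk.
    pub fn memory_size(&self) -> usize {
        self.extents.len() * std::mem::size_of::<(u64, usize)>()
    }
}
```

Resident memory for such a store scales with the number of pages rather than their total size, which is the property the dhat test is described as asserting.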
…oint #2)

Dictionary-encoded columns buffered every completed data page in GenericColumnWriter.data_pages until close(), because the dictionary page must be written first but isn't final until all values are seen. Those pages never reached the PageStore, so spilling couldn't bound them: a low-cardinality 4.2M-row column peaked ~2.5 MiB regardless of backend.

Add PageWriter::defers_dictionary_ordering(): a writer that buffers the whole chunk and splices it later (the Arrow path) can accept data pages before the dictionary page and order them itself. When set, the column writer streams dictionary-column data pages straight through instead of buffering them. ArrowPageWriter returns true, holds the (bounded) dictionary page in memory since it now arrives last, and at splice emits it first; the buffer-relative page offsets recorded in production order are rewritten to the dictionary-first layout there. The column-at-a-time SerializedFileWriter path is unchanged (defaults to false).

Also fix memory_size() accounting: instead of counting bytes written (which over-reports once pages are spilled off-heap), ask the page writer how much it actually holds resident via PageWriter::buffered_memory_size() and PageStore::memory_size(). For the in-memory store this is unchanged; for a spilling store it drops to ~0 plus the retained dictionary page.

Result: the dict-column case drops from ~2.69 MiB to ~0.48 MiB peak heap with a spilling backend. Adds an offset-index-disabled dictionary round-trip test and store memory-size unit tests; extends the dhat test with the dictionary-column scenario.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
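The offset rewrite from production order to the dictionary-first layout amounts to shifting every data page by the dictionary page's length. The sketch below shows that arithmetic in isolation; `ChunkLayout` and `dictionary_first_layout` are hypothetical names, and offsets are taken relative to the start of the column chunk, which is an assumption about how the real writer records them.

```rust
/// Page offsets of one column chunk after reordering, relative to the chunk start.
#[derive(Debug, PartialEq, Eq)]
pub struct ChunkLayout {
    pub dictionary_page_offset: Option<u64>,
    pub data_page_offsets: Vec<u64>,
    pub total_len: u64,
}

/// Data pages were produced first, so their recorded offsets start at 0.
/// At splice the dictionary page is emitted first, so each data page moves
/// forward by the dictionary page's length.
pub fn dictionary_first_layout(data_page_lens: &[u64], dict_page_len: Option<u64>) -> ChunkLayout {
    let shift = dict_page_len.unwrap_or(0);
    let mut production_offset = 0u64;
    let mut data_page_offsets = Vec::with_capacity(data_page_lens.len());
    for &len in data_page_lens {
        data_page_offsets.push(shift + production_offset);
        production_offset += len;
    }
    ChunkLayout {
        // The dictionary page, when present, sits at the start of the chunk.
        dictionary_page_offset: dict_page_len.map(|_| 0),
        data_page_offsets,
        total_len: shift + production_offset,
    }
}
```

For example, data pages of 100, 50, and 25 bytes followed by a 10-byte dictionary page end up at offsets 10, 110, and 160, with the dictionary page at offset 0.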
The test uses parquet::arrow, so without a required-features entry it was auto-discovered and compiled under --all-targets --no-default-features, breaking that CI compilation check. Mirror the other arrow integration tests with required-features = ["arrow"].

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
run benchmark arrow_writer
run benchmark writer_overhead
run benchmark parquet_round_trip

🤖 Arrow criterion benchmark (GKE): comparing parquet-page-spill (36db7ea) to fd1c5b3 (merge-base)
Problem description
We currently buffer entire row groups in memory. From our own docs:
For our production workload, which has ~400 columns with heavy size skew (some columns are much larger than others), this causes ≥12 GB of memory to be consumed just to write Parquet.
When `ArrowWriter` writes a row group, record batches arrive with all columns interleaved, but each Parquet column chunk must be contiguous on disk. So every column's compressed pages are buffered for the whole row group and only spliced into the output at flush. Peak `ArrowWriter` memory is therefore ≈ Σ(compressed bytes of every column chunk) for one row group, and it grows with the row group size.
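As a rough formalization of that estimate, with symbols introduced here only for illustration ($C$ is the set of columns and $b_c$ is the compressed bytes buffered for column $c$ in the current row group):

$$
M_{\text{peak}} \approx \sum_{c \in C} b_c
$$

Every $b_c$ grows with the number of rows buffered, so $M_{\text{peak}}$ grows with the row group size, and in a skewed schema a few large columns can dominate the sum.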
Today the only lever against this is [`ArrowWriter::in_progress_size()`](https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowWriter.html#method.in_progress_size) plus flushing smaller row groups, which trades away compression and read-time (page/row-group) pruning. This has negative consequences for encoding efficiency, read performance, etc. Parquet already has pages; we don't need one column to force the layout of another. Ideally, in a case like this we'd want a large (let's say 1M-row) row group with ~4 pages of ~1 MB each for the `id: i32` column and N pages of ~1 MB each for the `large_text` column. Reading the small `id` column then has no data fragmentation penalty, no page index bloat penalty, etc.

Related issues
Some of the issues I could find:
ArrowWriter #5484

Proposed solution
Introduce a trait for pluggable buffering. In particular we would like to implement spilling (spill buffered completed pages to disk). If this works well it can be upstreamed / made easily configurable and usable for all arrow users. I am not adding an implementation here to avoid discussing those APIs (is it a temp dir, how does it get configured, etc.).
What changes are included in this PR?
A small, intentionally "dumb" key/value store trait and its wiring, in four stacked commits:

1. `PageStore` + `PageKey` + `PageStoreFactory` + `InMemoryPageStore`, wired into `ArrowWriter`. The store maps an opaque, store-allocated `PageKey` to a blob of bytes and knows nothing about pages, dictionaries, ordering, or offsets; the caller keeps the handles and decides what they mean. The default `InMemoryPageStore` (a `Vec<Bytes>`) is byte-for-byte equivalent to the previous buffering with zero overhead. A `PageStoreFactory` is threaded through `ArrowWriterOptions::with_page_store_factory` → `ArrowRowGroupWriterFactory` → `ArrowColumnWriterFactory`.
2. Replace the materialize-then-copy splice with a `Read` that takes each page blob back out of the store in write order as it is consumed and releases it immediately, so the splice never holds more than one page in memory at a time (essential for a spilling backend on skewed schemas). `append_column` is unchanged for external `ChunkReader` callers.
3. `PageKey::new`/`get` (so external backends can mint their own handles) + a dhat memory regression test with a temp-file backend.
4. Dictionary-encoded columns buffered every completed data page in `GenericColumnWriter.data_pages` until `close()` (the dictionary page must be written first but isn't final until all values are seen), so those pages never reached the store. A new `PageWriter::defers_dictionary_ordering()` lets a writer that buffers the whole chunk and splices later (the Arrow path) accept data pages before the dictionary page and order them itself; the column writer then streams dictionary-column data pages straight through. `ArrowPageWriter` holds the (bounded, ≤ `dict_page_size_limit`) dictionary page in memory, since it now arrives last, and emits it first at splice, where the production-order page offsets are rewritten to the dictionary-first layout. The column-at-a-time `SerializedFileWriter` path is unchanged (it commits bytes live and still buffers, which is inherent there). This commit also fixes `memory_size()` to report bytes the writer actually holds resident (via `PageStore::memory_size` / `PageWriter::buffered_memory_size`) rather than bytes written, so it drops to ~0 once pages are spilled off-heap.
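The difference between "bytes written" and "bytes held resident" in the `memory_size()` fix can be made concrete with a toy model. Everything below is hypothetical (the `Store` trait, `HeapStore`, `SpillingStore`, and `WriterSketch` are invented for the example and do not mirror the PR's types); it only shows why a spilling backend should report close to zero plus the retained dictionary page.

```rust
/// Hypothetical store interface reduced to what the accounting needs.
pub trait Store {
    fn put(&mut self, blob: Vec<u8>);
    /// Bytes this store currently holds on the heap.
    fn memory_size(&self) -> usize;
}

/// Keeps every blob resident.
#[derive(Default)]
pub struct HeapStore {
    pub blobs: Vec<Vec<u8>>,
}

impl Store for HeapStore {
    fn put(&mut self, blob: Vec<u8>) {
        self.blobs.push(blob);
    }
    fn memory_size(&self) -> usize {
        self.blobs.iter().map(Vec::len).sum()
    }
}

/// Pretends to write blobs off-heap: it only counts them, then drops them.
#[derive(Default)]
pub struct SpillingStore {
    pub spilled_bytes: usize,
}

impl Store for SpillingStore {
    fn put(&mut self, blob: Vec<u8>) {
        self.spilled_bytes += blob.len(); // blob is dropped at the end of this call
    }
    fn memory_size(&self) -> usize {
        0
    }
}

/// Toy page writer: data pages go to the store, the dictionary page stays resident.
pub struct WriterSketch<S: Store> {
    store: S,
    dictionary_page: Option<Vec<u8>>,
    bytes_written: usize,
}

impl<S: Store> WriterSketch<S> {
    pub fn new(store: S) -> Self {
        Self { store, dictionary_page: None, bytes_written: 0 }
    }

    pub fn write_data_page(&mut self, page: Vec<u8>) {
        self.bytes_written += page.len();
        self.store.put(page);
    }

    pub fn set_dictionary_page(&mut self, page: Vec<u8>) {
        self.bytes_written += page.len();
        self.dictionary_page = Some(page);
    }

    /// Total bytes produced so far, regardless of where they live.
    pub fn bytes_written(&self) -> usize {
        self.bytes_written
    }

    /// Resident bytes: what the store holds plus the retained dictionary page.
    pub fn memory_size(&self) -> usize {
        self.store.memory_size() + self.dictionary_page.as_ref().map_or(0, Vec::len)
    }
}
```

With the heap-backed store the two numbers agree; with the spilling store `bytes_written()` keeps growing while `memory_size()` stays at the size of the dictionary page.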
Are these changes tested?
Yes:

- A byte-identical round-trip test using a custom `PageStore` with sparse, non-contiguous, `HashMap`-backed handles, proving the writer relies only on the opaque-handle contract across dictionary and non-dictionary columns and multiple row groups.
- A dictionary round-trip test with the offset index disabled, covering the path where only the chunk-level dictionary/data page offsets are rewritten.
- Unit tests for the in-memory backend contract and its resident-byte reporting.
- An always-on `dhat` integration test (`parquet/tests/page_spill_memory.rs`) measuring peak heap, for both a skewed wide row group (~16 MiB) and a low-cardinality dictionary column (~4.2M rows).

In short, the spilling backend bounds peak write memory by the in-flight encoder/dictionary buffers rather than the row group size, for both the page buffer and the dictionary-column data pages.
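The idea behind the sparse-handle round-trip test can be sketched as follows. This is not the PR's test: `SparseStore`, the key sequence (starting at 1000, stepping by 37), and `round_trip` are made up for illustration. The point is that a caller which only replays the keys it was given cannot depend on them being dense or contiguous.

```rust
use std::collections::HashMap;

/// Hypothetical store whose keys are deliberately sparse and non-contiguous.
pub struct SparseStore {
    next_key: u64,
    blobs: HashMap<u64, Vec<u8>>,
}

impl SparseStore {
    pub fn new() -> Self {
        Self { next_key: 1000, blobs: HashMap::new() }
    }

    /// Store a blob under the next key, then skip ahead so keys are not dense.
    pub fn put(&mut self, blob: Vec<u8>) -> u64 {
        let key = self.next_key;
        self.next_key += 37;
        self.blobs.insert(key, blob);
        key
    }

    /// Remove and return the blob, if the key is known.
    pub fn take(&mut self, key: u64) -> Option<Vec<u8>> {
        self.blobs.remove(&key)
    }
}

/// Put every page, then read them back using only the returned keys,
/// in the order they were written.
pub fn round_trip(pages: &[&[u8]]) -> (Vec<u64>, Vec<u8>) {
    let mut store = SparseStore::new();
    let keys: Vec<u64> = pages.iter().map(|page| store.put(page.to_vec())).collect();
    let mut out = Vec::new();
    for key in &keys {
        out.extend(store.take(*key).expect("key was returned by put"));
    }
    (keys, out)
}
```

A `HashMap` has no inherent order, so the output is byte-identical to the input only if the caller's own key list carries the ordering.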
Are there any user-facing changes?
New, additive public API (default behavior unchanged):

- `ArrowWriterOptions::with_page_store_factory`
- `PageStore`, `PageKey`, `PageStoreFactory`, `InMemoryPageStore`, `InMemoryPageStoreFactory` (re-exported from `parquet::arrow::arrow_writer`, defined in `parquet::column::page_store`).
- `PageWriter` trait methods `defers_dictionary_ordering()` and `buffered_memory_size()` (both default to the previous behavior), and a defaulted `PageStore::memory_size()`.

Not covered (by design)
- The `SerializedFileWriter` path still buffers dictionary-column data pages: it commits bytes to the file live, so the dictionary-first ordering must be resolved during encoding. That path already has minimal memory otherwise.
- … (already bounded by the page/dict size limits), as do bloom filters.
🤖 Generated with Claude Code