feat(parquet): make PushBuffers boundary-agnostic for prefetch IO #9697

HippoBaro wants to merge 1 commit into apache:main from boundary_agnostic_pushbuffers
Force-pushed 5eab5b9 to 5d60935
@alamb This PR changes code you previously wrote, and I'd value your take on the direction. Beyond fixing the quadratic complexity issue, it also pushes

I will review it -- I am currently working on reviewing. Thanks @HippoBaro
Force-pushed 504035a to 2f891e7

Rebased onto
The `PushDecoder` (introduced in apache#7997, apache#8080) is designed to decouple IO and CPU. It holds non-contiguous byte ranges, with a `NeedsData`/`push_range` protocol. However, it requires each logical read to be satisfied in full by a single physical buffer: `has_range`, `get_bytes`, and `Read::read` all searched for one buffer that entirely covered the requested range.

This assumption conflates two orthogonal IO strategies:

- Coalescing: the IO layer merges adjacent requested ranges into fewer, larger fetches.
- Prefetching: the IO layer pushes data ahead of what the decoder has requested. This is an inversion of control: the IO layer speculatively fills buffers at offsets not yet requested and for arbitrary buffer sizes.

These two strategies interact poorly with the current release mechanism (`clear_ranges`), which matches buffers by exact range equality:

- Coalescing is both rewarded and punished. It is load bearing because without it, the number of physical buffers scales with the ranges requested, and `clear_ranges` performs an O(N×M) scan to remove consumed ranges, producing quadratic overhead on wide schemas. But it is also punished because a coalesced buffer never exactly matches any individual requested range, so `clear_ranges` silently skips it: the buffer leaks in `PushBuffers` until the decoder finishes or the caller manually calls `release_all_ranges` (apache#9624). This increases peak RSS proportionally to the amount of data coalesced ahead of the current row group.
- Prefetching is structurally impossible: speculatively pushed buffers will straddle future read boundaries, so the decoder cannot consume them, and `clear_ranges` cannot release them.

This commit makes `PushBuffers` boundary-agnostic, completing the prefetching story, and changes the internals to scale with buffer count instead of range count:

- Buffer stitching: `has_range`, `get_bytes`, and `Read::read` resolve logical ranges across multiple contiguous physical buffers via binary search, so the IO layer is free to push arbitrarily-sized parts without knowing future read boundaries. This is a nice improvement, because some IO layers can be made much more efficient when using uniform buffers and vectorized reads.
- Incremental release (`release_through`): replaces `clear_ranges` with a watermark-based release that drops all buffers below a byte offset, trimming straddling buffers via zero-copy `Bytes::slice`. The decoder calls this automatically at row-group boundaries.

Benchmark results (vs baseline):

```
push_decoder/1buf/1000ranges     321.9 µs  (was 323.5 µs,  −1%)
push_decoder/1buf/10000ranges    3.26 ms   (was 3.25 ms,   +0%)
push_decoder/1buf/100000ranges   34.9 ms   (was 34.6 ms,   +1%)
push_decoder/1buf/500000ranges   192.2 ms  (was 185.3 ms,  +4%)
push_decoder/Nbuf/1000ranges     363.9 µs  (was 437.2 µs, −17%)
push_decoder/Nbuf/10000ranges    3.82 ms   (was 10.7 ms,  −64%)
push_decoder/Nbuf/100000ranges   42.1 ms   (was 711.6 ms, −94%)
```

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
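For intuition, the buffer-stitching lookup described above can be sketched as a std-only simplification (this is an illustration, not the actual `PushBuffers` code: it tracks only the ranges, and uses `partition_point` for the binary search):

```rust
use std::ops::Range;

/// Sketch: is `want` fully covered by a run of contiguous buffers?
/// `ranges` must be sorted by start and non-overlapping, mirroring the
/// sorted invariant described in the PR. (Illustration only.)
fn has_range(ranges: &[Range<u64>], want: &Range<u64>) -> bool {
    // Binary search for the first buffer whose end is past `want.start`.
    let idx = ranges.partition_point(|r| r.end <= want.start);
    let mut pos = want.start;
    for r in &ranges[idx..] {
        if r.start > pos {
            return false; // gap between buffers: coverage is broken
        }
        pos = r.end;
        if pos >= want.end {
            return true; // stitched across one or more buffers
        }
    }
    pos >= want.end
}

fn main() {
    let ranges = vec![0..10u64, 10..20, 20..30];
    assert!(has_range(&ranges, &(5..25)));   // spans three buffers
    assert!(!has_range(&ranges, &(25..35))); // runs past available data
    let gappy = vec![0..10u64, 15..30];
    assert!(!has_range(&gappy, &(5..20)));   // gap at 10..15
}
```

Per the PR description, `get_bytes` and `Read::read` resolve ranges the same way, consuming from each physical buffer in turn.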
2f891e7 to
d2ea6c4
Compare
etseidl
left a comment
Thanks @HippoBaro. This looks interesting. I still have to do a deep dive on #9653 before diving into this.
```rust
/// single range). Coalescing at this level would require copying the data but
/// the caller may already have the needed data in a single buffer which would
/// require no copying.
/// This buffer does not coalesce (merging adjacent ranges of bytes into a ingle
```

Suggested change:

```diff
-/// This buffer does not coalesce (merging adjacent ranges of bytes into a ingle
+/// This buffer does not coalesce (merging adjacent ranges of bytes into a single
```
Nice, @HippoBaro. One finding: this change introduces a regression in reverse row-group scans, which surfaced to me in a Datafusion test:

```rust
// Test reverse scan
let opener = make_opener(true);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let reverse_values = collect_int32_values(stream).await;
// The forward scan should return data in the order written
assert_eq!(forward_values, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
// With reverse scan, row groups are reversed, so we expect:
// Row group 3 (7,8,9), then row group 2 (4,5,6), then row group 1 (1,2,3)
assert_eq!(reverse_values, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
```

Basically, the incremental release watermark invariant conflicts with reverse row-group traversal. From what I understand, incremental release decodes a row group, marks its ending byte offset as the watermark, and then expects subsequent ranges to stay at or above that offset. So we are not able to do reverse traversal and safely release in this way. Could we potentially support a symmetric reverse version of the same monotonic watermark tracking you've introduced here?
```rust
/// Use [`Self::release_all`] instead.
#[deprecated(since = "58.1.0", note = "Use `release_all` instead")]
pub fn clear_all_ranges(&mut self) {
```
Honestly, we can keep the symbol—it’s just a rename. I’m trying not to expose the concept of “ranges” on the release side, since the watermark mechanism supersedes it.
```rust
///
/// Because IO completions are expected to generally arrive in-order,
/// `push_range` appends without sorting. We instead delay sorting until
/// conumption to amortize its cost, if necessary.
```

Suggested change:

```diff
-/// conumption to amortize its cost, if necessary.
+/// consumption to amortize its cost, if necessary.
```
run benchmark push_decoder

🤖 Arrow criterion benchmark running (GKE). Comparing boundary_agnostic_pushbuffers (d2ea6c4) to 711fac8 (merge-base).
🤖 Arrow criterion benchmark completed (GKE).
@nathanb9 Thank you for the feedback; I'll see what I can do there

That is certainly nice looking
run benchmark arrow_reader

🤖 Arrow criterion benchmark running (GKE). Comparing boundary_agnostic_pushbuffers (d2ea6c4) to 711fac8 (merge-base).
alamb
left a comment
Thank you @HippoBaro and @nathanb9 -- this is a great start.
In general I have two major concerns:
- The assumption of range usage in the parquet decoder
- The manual management of the sorted flag / parallel buffers
Let me know what you think
```rust
/// Clear any staged ranges currently buffered for future decode work.
pub fn clear_all_ranges(&mut self) {
    self.buffers.clear_all_ranges();
/// Release all staged ranges currently buffered for future decode work.
```
```rust
// All data for this row group has been extracted into the
// InMemoryRowGroup. Release physical buffers up to the end
// of this row group so streaming IO can reclaim memory.
self.buffers
```
You can configure the parquet reader to read row groups in some arbitrary order with `with_row_groups`.
Also technically there is no reason that row groups have to be written in order (though most writers will do that) -- for example, you could have a file where the bytes for row group 0 are after the bytes for row group 1.
So I think assuming that the reader will never want any bytes prior to the current row group should be reconsidered.
Can we instead perhaps release data for the start/end of the row group? rather than just a one sided range?
> Also technically there is no reason that row groups have to be written in order (though most writers will do that) -- for example, you could have a file where the bytes for row group 0 are after the bytes for row group 1.
Indeed, coworkers of mine are using this property as a means to do deletions from parquet files. Rewrite a single row group, tack it onto the end of the file, and then modify the footer to point to the new row group and ignore the original.
```rust
    self.file_offset
}

/// Returns the byte offset just past the last column chunk in this row group.
```
I think practically speaking most parquet files will have the column chunks for one row group written contiguously in the file, but I am not sure the spec requires this. I do think it effectively requires all pages for a column to be in a contiguous range.
```rust
///
/// Thus, the implementation defers to the caller to coalesce subsequent requests
/// if desired.
/// # No Speculative Prefetching
```
```rust
/// The buffers of data that can be used to decode the Parquet file
buffers: Vec<Bytes>,
/// High-water mark set by [`Self::release_through`]. After a release,
/// no push, has_range, or read may target offsets below this value.
```
it is probably good to point out what "may not" means (like, does the code panic if it is tried?)
```rust
/// binary search (`has_range`, `get_bytes`, `release_through`,
/// `Read::read`). Callers that hold `&mut PushBuffers` should call this
/// once before lending `&PushBuffers` to read-side code.
pub fn ensure_sorted(&mut self) {
```
```rust
        return;
    }

    // Insertion sort: zero-allocation and linear on nearly-sorted input
```
this is n^2 on reverse sorted input though, right?
```rust
watermark: u64,
/// Whether `ranges`/`buffers` are sorted by range start.
/// Set to `false` on every `push_range`, restored lazily before reads.
sorted: bool,
```
What if we encoded the sort invariant in the type system rather than relying on the flag to be set correctly? Something like

```rust
enum Buffers {
    Sorted {
        ranges: Vec<Range<u64>>,
        buffers: Vec<Bytes>,
    },
    UnSorted {
        ranges: Vec<Range<u64>>,
        buffers: Vec<Bytes>,
    },
}
```

Maybe it is overly complicated, but it makes it much clearer that all paths correctly update the sorting.
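A quick std-only sketch of how this type-state idea could work in practice (hypothetical code, with `Vec<u8>` standing in for `Bytes`): any push demotes to the unsorted variant, and `ensure_sorted` is the only way to produce the sorted one, so read paths written against `Sorted` can never observe unsorted data.

```rust
use std::ops::Range;

// Hypothetical type-state sketch: the sorted invariant lives in the variant.
enum Buffers {
    Sorted { ranges: Vec<Range<u64>>, buffers: Vec<Vec<u8>> },
    Unsorted { ranges: Vec<Range<u64>>, buffers: Vec<Vec<u8>> },
}

impl Buffers {
    /// Any push demotes to `Unsorted`; no flag to forget.
    fn push(&mut self, range: Range<u64>, buf: Vec<u8>) {
        let this = std::mem::replace(
            self,
            Buffers::Unsorted { ranges: vec![], buffers: vec![] },
        );
        let (mut ranges, mut buffers) = match this {
            Buffers::Sorted { ranges, buffers }
            | Buffers::Unsorted { ranges, buffers } => (ranges, buffers),
        };
        ranges.push(range);
        buffers.push(buf);
        *self = Buffers::Unsorted { ranges, buffers };
    }

    /// Sorting is the only way to obtain the `Sorted` variant.
    fn ensure_sorted(&mut self) {
        if let Buffers::Unsorted { ranges, buffers } = self {
            let mut pairs: Vec<_> = ranges.drain(..).zip(buffers.drain(..)).collect();
            pairs.sort_by_key(|(r, _)| r.start);
            let (ranges, buffers): (Vec<_>, Vec<_>) = pairs.into_iter().unzip();
            *self = Buffers::Sorted { ranges, buffers };
        }
    }
}

fn main() {
    let mut b = Buffers::Sorted { ranges: vec![], buffers: vec![] };
    b.push(10..20, vec![0u8; 10]);
    b.push(0..10, vec![0u8; 10]);
    b.ensure_sorted();
    assert!(matches!(&b, Buffers::Sorted { ranges, .. } if ranges[0] == (0..10)));
}
```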
```diff
@@ -48,6 +59,13 @@ pub(crate) struct PushBuffers {
    ranges: Vec<Range<u64>>,
```
If the goal is to keep a list sorted by start range, did you consider using a BTreeSet? You could then define some sort of wrapper over Range/Bytes like

```rust
struct RangeAndData {
    range: Range<u64>,
    buffer: Bytes,
}

impl PartialOrd for RangeAndData {
    // define comparison from start range
}

pub(crate) struct PushBuffers {
    ...
    buffers: BTreeSet<RangeAndData>,
}
```

That would probably simplify the accounting significantly.
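For evaluating this idea, here is a compilable std-only sketch of the ordering for such a wrapper (hypothetical code; `Vec<u8>` stands in for `Bytes`). Note that `Eq`/`PartialEq` must agree with `Ord` for `BTreeSet` to behave correctly, and keying equality on `range.start` relies on the buffers being non-overlapping:

```rust
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::ops::Range;

// Hypothetical wrapper from the suggestion above.
struct RangeAndData {
    range: Range<u64>,
    buffer: Vec<u8>,
}

// Order entries by range start so the set iterates in offset order.
impl Ord for RangeAndData {
    fn cmp(&self, other: &Self) -> Ordering {
        self.range.start.cmp(&other.range.start)
    }
}
impl PartialOrd for RangeAndData {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
// Equality must be consistent with Ord: keyed on range start,
// which assumes non-overlapping buffers.
impl PartialEq for RangeAndData {
    fn eq(&self, other: &Self) -> bool {
        self.range.start == other.range.start
    }
}
impl Eq for RangeAndData {}

fn main() {
    let mut buffers = BTreeSet::new();
    // Insert out of order; iteration yields entries sorted by start offset.
    buffers.insert(RangeAndData { range: 20..30, buffer: vec![0u8; 10] });
    buffers.insert(RangeAndData { range: 0..10, buffer: vec![0u8; 10] });
    let starts: Vec<u64> = buffers.iter().map(|b| b.range.start).collect();
    assert_eq!(starts, vec![0, 20]);
    let total: usize = buffers.iter().map(|b| b.buffer.len()).sum();
    assert_eq!(total, 20);
}
```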
I was thinking more about this change and I thought maybe we could take a step back and figure out what you are trying to accomplish. For example, if your usecase is to clear all IO after reading a row group's data, I wonder if you could just call

Recently, I have been working on something downstream in DataFusion where I would like a similar API.

One solution could be to make a way to split off RemainingRowGroups, perhaps into a different decoder 🤔
Which issue does this PR close?

Closes `PushBuffers::clear_ranges` is quadratic and leaks #9695.

Rationale for this change
The `PushDecoder` (introduced in #7997, #8080) is designed to decouple IO and CPU. It holds non-contiguous byte ranges, with a `NeedsData`/`push_range` protocol. However, it requires each logical read to be satisfied in full by a single physical buffer: `has_range`, `get_bytes`, and `Read::read` all searched for one buffer that entirely covered the requested range.

This assumption conflates two orthogonal IO strategies:

- Coalescing: the IO layer merges adjacent requested ranges into fewer, larger fetches.
- Prefetching: the IO layer pushes data ahead of what the decoder has requested. This is an inversion of control: the IO layer speculatively fills buffers at offsets not yet requested and for arbitrary buffer sizes.

These two strategies interact poorly with the current release mechanism (`clear_ranges`), which matches buffers by exact range equality:

- Coalescing is both rewarded and punished. It is load bearing because without it, the number of physical buffers scales with the ranges requested, and `clear_ranges` performs an O(N×M) scan to remove consumed ranges, producing quadratic overhead on wide schemas. But it is also punished because a coalesced buffer never exactly matches any individual requested range, so `clear_ranges` silently skips it: the buffer leaks in `PushBuffers` until the decoder finishes or the caller manually calls `release_all_ranges` (ParquetPushDecoder API to clear all buffered ranges #9624). This increases peak RSS proportionally to the amount of data coalesced ahead of the current row group.
- Prefetching is structurally impossible: speculatively pushed buffers will straddle future read boundaries, so the decoder cannot consume them, and `clear_ranges` cannot release them.

What changes are included in this PR?
This commit makes `PushBuffers` boundary-agnostic, completing the prefetching story, and changes the internals to scale with buffer count instead of range count:

- Buffer stitching: `has_range`, `get_bytes`, and `Read::read` resolve logical ranges across multiple contiguous physical buffers via binary search, so the IO layer is free to push arbitrarily-sized parts without knowing future read boundaries. This is a nice improvement, because some IO layers can be made much more efficient when using uniform buffers and vectorized reads.
- Incremental release (`release_through`): replaces `clear_ranges` with a watermark-based release that drops all buffers below a byte offset, trimming straddling buffers via zero-copy `Bytes::slice`. The decoder calls this automatically at row-group boundaries.

Are these changes tested?
Significant coverage added, all tests passing. Benchmark results (vs baseline in #9696) are listed in the commit message above.
Are there any user-facing changes?

`PushBuffers::clear_all_ranges` marked as deprecated in favor of the newer `PushBuffers::clear_all`.
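For intuition, the watermark-based release this PR describes (`release_through`) might be sketched std-only as follows. This is a simplified illustration, not the actual implementation: `Vec<u8>` stands in for `Bytes`, so the trim copies here where `Bytes::slice` would be zero-copy.

```rust
use std::ops::Range;

/// Sketch: drop every buffer entirely below `watermark` and trim buffers
/// that straddle it, keeping only bytes at or above the watermark.
fn release_through(entries: &mut Vec<(Range<u64>, Vec<u8>)>, watermark: u64) {
    entries.retain_mut(|(range, buf)| {
        if range.end <= watermark {
            false // entirely below the watermark: release
        } else {
            if range.start < watermark {
                // Straddling buffer: trim the prefix below the watermark.
                let cut = (watermark - range.start) as usize;
                buf.drain(..cut);
                range.start = watermark;
            }
            true
        }
    });
}

fn main() {
    let mut entries = vec![
        (0..10u64, vec![1u8; 10]),
        (10..20, vec![2u8; 10]),
        (20..30, vec![3u8; 10]),
    ];
    // Releasing through offset 12 drops 0..10 and trims 10..20 to 12..20.
    release_through(&mut entries, 12);
    assert_eq!(entries.len(), 2);
    assert_eq!(entries[0].0, 12..20);
    assert_eq!(entries[0].1.len(), 8);
    assert_eq!(entries[1].0, 20..30);
}
```

This one-sided shape is also why the monotonic watermark conflicts with reverse row-group traversal, as discussed in the review above.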