compute: store correction v2 chunks in a columnar chunk region #36577
def-
left a comment
The update count is double counted:
diff --git a/src/compute/src/sink/correction_v2.rs b/src/compute/src/sink/correction_v2.rs
index 23ef00ebf9..a243558737 100644
--- a/src/compute/src/sink/correction_v2.rs
+++ b/src/compute/src/sink/correction_v2.rs
@@ -1519,3 +1519,20 @@ impl<D: Data> Ord for MergeCursor<D> {
(t1, d1).cmp(&(t2, d2)).reverse()
}
}
+
+#[cfg(test)]
+mod tests {
+ use mz_repr::{Diff, Timestamp};
+
+ use super::ChainBuilder;
+
+ #[mz_ore::test]
+ fn chain_builder_update_count_matches_items() {
+ let mut builder = ChainBuilder::<i64>::default();
+ for i in 0..10_i64 {
+ builder.push_owned(&(i, Timestamp::new(i as u64), Diff::ONE));
+ }
+ let chain = builder.finish();
+ assert_eq!(chain.update_count, chain.iter().count());
+ }
+}
fails:
---- sink::correction_v2::tests::chain_builder_update_count_matches_items stdout ----
thread 'sink::correction_v2::tests::chain_builder_update_count_matches_items' (1838738) panicked at src/compute/src/sink/correction_v2.rs:1536:9:
assertion `left == right` failed
left: 20
e91476b to f9c0ff8
ReviewRefactor replacing Blocking
Strong suggestions
Nits
Good
Bottom line: solid refactor, but the spill path, which is the point of the PR, has no test. Add one test that crosses a mint boundary and I'd consider it mergeable. 🤖 Generated with Claude Code
The MV sink's spillable correction buffer (PR #30083) stored each chunk in a `StackWrapper`/`ColumnationStack`, which has two issues:

* the outer `Vec<T>` is heap-allocated (not lgalloc-backed), so it does not spill;
* the inner columnation regions over-allocate (no convenient way to pre-size the storage), so per-chunk memory is hard to predict.

Switch the chunk storage to a `ChunkData` enum (`Typed` + `Align`) backed by the `columnar` crate, and add a `ChunkBuilder` that mints chunks at the same ~2 MiB serialized-size boundary used by the merge-batcher's `ColumnBuilder`. Each minted chunk is a single aligned `Vec<u64>`, so chunks are fixed-size and per-chunk memory tracks usage closely.

The `Bytes` variant of `Column` is omitted because it wraps `Arc<dyn Any>` and is `!Sync`, which would prevent `updates_before` from returning a `+ Send` iterator.
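The minting rule described above can be sketched with a toy builder. This is a minimal illustration, not the PR's `ChunkBuilder`: `ToyChunkBuilder` and its methods are hypothetical names, updates are modelled as raw `u64` words, and the threshold is passed in so the behavior can be exercised with small values. Only `SHIP_WORDS = 1 << 18` comes from the PR description.

```rust
/// Ship threshold in 64-bit words: (1 << 18) words * 8 bytes = 2 MiB.
const SHIP_WORDS: usize = 1 << 18;

/// Hypothetical stand-in for a chunk builder: buffers words and mints a
/// chunk whenever the buffer reaches `ship_words`.
struct ToyChunkBuilder {
    ship_words: usize,
    pending: Vec<u64>,
    minted: Vec<Vec<u64>>,
}

impl ToyChunkBuilder {
    fn new(ship_words: usize) -> Self {
        Self { ship_words, pending: Vec::new(), minted: Vec::new() }
    }

    /// Append the words of one update; mint once the boundary is reached.
    fn push(&mut self, words: &[u64]) {
        self.pending.extend_from_slice(words);
        if self.pending.len() >= self.ship_words {
            self.mint();
        }
    }

    /// Copy the pending words into an allocation sized to their length
    /// and emit it as a finished chunk.
    fn mint(&mut self) {
        let mut chunk = Vec::with_capacity(self.pending.len());
        chunk.extend_from_slice(&self.pending);
        self.pending.clear();
        self.minted.push(chunk);
    }

    /// Flush any remainder and return all chunks in order.
    fn finish(mut self) -> Vec<Vec<u64>> {
        if !self.pending.is_empty() {
            self.mint();
        }
        self.minted
    }
}
```

With `ToyChunkBuilder::new(SHIP_WORDS)`, every chunk except possibly the last is minted at roughly the 2 MiB mark, which is the property the commit relies on for predictable per-chunk memory.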
- Drop the local `TripleContainer`/`TripleBorrowed`/`UpdateRef` aliases in favor of columnar's `ContainerOf`/`BorrowedOf`/`Ref`.
- Bound `D::Container`'s `Ref<'_>` by `Eq + Ord` so the merge code (`merge_2`, `merge_many`'s `MergeCursor::cmp`, `Chain::can_accept`) can compare refs directly instead of cloning through `into_owned`.
- Add `#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]` to `DataflowErrorSer` so its auto-generated `Reference` satisfies the new `Ref`-level `Ord` bound.
- `MergeHeap::pop_equal` still falls back to `into_owned` because the heap-top `Ref` (lifetime tied to `self.0`) and the parameter `Ref` have unrelated lifetimes that can't be unified through `reborrow_ref`; using `into_owned` both ends the immutable borrow before `self.pop()` and avoids the lifetime gymnastics. Comparison is guarded behind a time check so the clone only fires when timestamps match.
Use a `refs_eq` helper that explicitly reborrows both columnar refs to a fresh local lifetime via `Columnar::reborrow` before comparing. The projected `<C as Borrow>::Ref<'a>` is invariant in `'a`, so the compiler won't shorten the inputs by variance on its own — the helper bridges that gap and lets the `for<'a> Ref<'a>: Eq` bound on `Data` do the rest.
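The reborrow trick can be illustrated with a toy trait. This is a sketch under stated assumptions, not the code in `correction_v2.rs`: the `Container` trait, `get_ref`, and the `Vec<(i64, u64)>` impl are hypothetical stand-ins for the columnar traits, and only the shape of `reborrow` and the `for<'a> Ref<'a>: Eq`-style bound follow the commit message.

```rust
/// Hypothetical stand-in for a columnar container whose reference type
/// is a generic associated type parameterized by a lifetime.
trait Container {
    type Ref<'a>
    where
        Self: 'a;

    /// Borrow the item at `index`.
    fn get_ref(&self, index: usize) -> Self::Ref<'_>;

    /// Explicitly shorten the lifetime of a ref from `'a` to `'b`.
    fn reborrow<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
    where
        Self: 'a;
}

impl Container for Vec<(i64, u64)> {
    type Ref<'a> = (&'a i64, &'a u64) where Self: 'a;

    fn get_ref(&self, index: usize) -> Self::Ref<'_> {
        let (data, time) = &self[index];
        (data, time)
    }

    fn reborrow<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
    where
        Self: 'a,
    {
        // The concrete tuple of references is covariant, so this is a no-op.
        item
    }
}

/// Compare two refs whose lifetimes are unrelated. In generic code the
/// projection `C::Ref<'a>` is treated as invariant in `'a`, so both inputs
/// are first reborrowed to a common, shorter lifetime chosen by inference.
fn refs_eq<C>(a: C::Ref<'_>, b: C::Ref<'_>) -> bool
where
    C: Container,
    for<'a> C::Ref<'a>: PartialEq,
{
    let a = C::reborrow(a);
    let b = C::reborrow(b);
    a == b
}
```

A call such as `refs_eq::<Vec<(i64, u64)>>(left.get_ref(0), right.get_ref(0))` then compares items from two independently borrowed containers without cloning either side.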
`ChainBuilder::push_{ref,owned}` were incrementing `chain.update_count`
by 1 per call, while `Chain::push_chunk` (invoked from `drain`/`finish`)
also adds `chunk.len()` when each minted chunk lands in the chain. Drop
the per-push increment and let `push_chunk` be the sole accounting
point. Add a regression test from the reviewer that asserts
`chain.update_count == chain.iter().count()`.
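The single-accounting-point fix can be sketched with simplified types. The `Chain` and `ChainBuilder` below are toy versions (updates are plain `i64` values and chunks are `Vec<i64>`), so the field layout and the `chunk_len` parameter are assumptions for illustration; only the rule that `push_chunk` alone updates `update_count` comes from the commit message.

```rust
/// Toy chain: a list of chunks plus a cached number of updates.
struct Chain {
    chunks: Vec<Vec<i64>>,
    update_count: usize,
}

impl Chain {
    /// The only place where `update_count` changes.
    fn push_chunk(&mut self, chunk: Vec<i64>) {
        self.update_count += chunk.len();
        self.chunks.push(chunk);
    }

    fn iter(&self) -> impl Iterator<Item = &i64> + '_ {
        self.chunks.iter().flatten()
    }
}

/// Toy builder: buffers updates and hands full chunks to the chain.
struct ChainBuilder {
    chain: Chain,
    pending: Vec<i64>,
    chunk_len: usize,
}

impl ChainBuilder {
    fn new(chunk_len: usize) -> Self {
        let chain = Chain { chunks: Vec::new(), update_count: 0 };
        Self { chain, pending: Vec::new(), chunk_len }
    }

    fn push(&mut self, update: i64) {
        // Deliberately no `update_count += 1` here: counting on push and
        // again in `push_chunk` is the double count the reviewer reported.
        self.pending.push(update);
        if self.pending.len() >= self.chunk_len {
            let chunk = std::mem::take(&mut self.pending);
            self.chain.push_chunk(chunk);
        }
    }

    fn finish(mut self) -> Chain {
        if !self.pending.is_empty() {
            let chunk = std::mem::take(&mut self.pending);
            self.chain.push_chunk(chunk);
        }
        self.chain
    }
}
```

Pushing ten updates through this builder yields `update_count == 10`, matching `iter().count()`, which is the property the regression test asserts.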
The `workflow_materialized_view_correction_pruning` test asserted that the cumulative correction-buffer insertions metric exceeds 1000 after a 1000-row snapshot has been consolidated away. The metric is wired to *net length deltas*, not raw record insertions, so the right floor is `>= 1000`: the desired side grows the buffer by 1000 and the persist-retraction side then shrinks it back to 0, contributing one insertion-delta of 1000 and one deletion-delta of 1000 — never a brief 2000. Before the columnar chunk-region refactor, each new chunk allocation added a stray `+1` to `Chain::update_count` due to a double-count in `Chain::push`, which inflated the cumulative insertions just enough for `> 1000` to hold. The new chunk-builder counts accurately and lands at exactly 1000, which is the correct value; tighten the test to that.
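The arithmetic behind the `>= 1000` floor can be sketched as follows. This assumes, as the commit message states, that the metric accumulates net changes in buffer length; the `LenMetrics` type and `report_len` method are hypothetical and do not mirror the real metrics code.

```rust
/// Hypothetical metrics that only observe the buffer length over time.
#[derive(Default)]
struct LenMetrics {
    insertions: u64,
    deletions: u64,
    last_len: u64,
}

impl LenMetrics {
    /// Record the new buffer length and attribute the net change to
    /// either the cumulative insertions or the cumulative deletions.
    fn report_len(&mut self, new_len: u64) {
        if new_len >= self.last_len {
            self.insertions += new_len - self.last_len;
        } else {
            self.deletions += self.last_len - new_len;
        }
        self.last_len = new_len;
    }
}
```

Reporting a length of 1000 (the desired side of the snapshot) followed by 0 (after the persist retractions consolidate it away) leaves `insertions == 1000` and `deletions == 1000`, so a strict `> 1000` assertion can only pass if something over-counts.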
The correction v2 chain invariant kept each chain at least `chain_proportionality` times as large as the next, comparing chains by `Chain::len()` (chunk count). That was a faithful proxy under the old columnation storage, where each chunk held a fixed number of updates. With byte-bounded chunks, chunk count is no longer proportional to update count: any chain below the ~2 MiB spill boundary is a single chunk regardless of how many updates it holds. With the default 8 KiB `chunk_size` and the 2 MiB boundary, that covers the common case, so the comparison degenerated to 1-vs-1, the geometric invariant collapsed, and every insert cascaded a full-buffer merge -- O(N^2) inserts up to the 2 MiB boundary instead of O(log N) amortized.

Compare on `update_count` instead, which is what merge cost is actually proportional to and which restores the pre-refactor amortization. Remove the now-unused `Chain::len()`.

Also add a test that pushes enough updates to cross at least one `mint()` boundary and asserts `iter()` roundtrips values, order, and diffs across the `Align` encode/decode spill path, which previously had no coverage.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
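The effect of the size measure on merge cost can be sketched with a small simulation. It is a toy model, not the real `CorrectionV2` logic: every insert adds one single-update chain, every chain is assumed to stay below the byte boundary (so it is always one chunk), the proportionality factor is a parameter, and cost is counted as the number of updates written by merges. All names below are hypothetical.

```rust
/// Size of one chain under the two candidate measures.
#[derive(Debug, Clone, Copy)]
struct ChainSize {
    chunks: usize,
    updates: usize,
}

fn by_chunks(c: &ChainSize) -> usize {
    c.chunks
}

fn by_updates(c: &ChainSize) -> usize {
    c.updates
}

/// Insert `inserts` single-update chains, restoring the invariant
/// "each chain is at least `proportionality` times as large as the next"
/// after every insert. Returns the total number of updates written by merges.
fn simulate(
    inserts: usize,
    proportionality: usize,
    size: fn(&ChainSize) -> usize,
) -> usize {
    let mut chains: Vec<ChainSize> = Vec::new();
    let mut merged_updates = 0;
    for _ in 0..inserts {
        chains.push(ChainSize { chunks: 1, updates: 1 });
        while chains.len() >= 2 {
            let last = chains[chains.len() - 1];
            let prev = chains[chains.len() - 2];
            if size(&prev) >= proportionality * size(&last) {
                break;
            }
            // Merge the two smallest chains. Below the byte boundary the
            // result is still a single chunk.
            chains.pop();
            let merged = ChainSize { chunks: 1, updates: prev.updates + last.updates };
            merged_updates += merged.updates;
            *chains.last_mut().unwrap() = merged;
        }
    }
    merged_updates
}
```

In this model with a factor of 2, `simulate(1024, 2, by_chunks)` writes 524799 updates (every insert re-merges the whole buffer), while `simulate(1024, 2, by_updates)` writes 10240, i.e. about `log2(1024) = 10` writes per update.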
`timely_bytes` 0.30 marked `Bytes` as `unsafe impl Send + Sync` (the `Arc<dyn Any>` is only ever read by reference), so `Column<C>` no longer needs the custom `ChunkData` enum that was carved out to dodge the `!Sync` `Bytes` variant. Drop the wrapper and store the column directly; the `ChunkBuilder` now delegates to the existing `mz_timely_util::columnar::builder::ColumnBuilder`, which already implements the ~2 MiB serialized-size minting logic this file was duplicating.

Note in the `Chunk` doc that the `ColumnPager` from #36552 is the next spill layer to wire in. The integration needs a deeper rework of the cursor sharing model (`Cursor` holds `Rc<Chunk<D>>` and accesses chunks through `&self`, while `pager.take` consumes its `PagedColumn`) so it belongs in its own PR.
e9b42f9 to a96d476
a96d476 to b1680dc
/// [`Columnar::reborrow`] before letting the inner `==` pick up the `for<'a> Ref<'a>: Eq`
/// bound on [`Data`].
#[inline]
fn refs_eq<D: Data>(a: Ref<'_, D>, b: Ref<'_, D>) -> bool {
I tried to change the `Data` trait to make equality work without this reborrow helper, but it quickly turned into a monster.
DAlperin
left a comment
LGTM. Probably wants a feature-benchmark run for memory usage.
Alphadelta14
left a comment
providing a stamp from console and cloud.
I didn't see obvious fallacies in my skim through.
Ran nightly, no performance regressions (https://buildkite.com/materialize/nightly/builds/16697)
### Motivation

#30083 added a spillable MV correction buffer that uses `StackWrapper`/`ColumnationStack` for the chunk type. That storage has two issues for the spilling use case:

1. The outer `Vec<T>` lives on the regular heap (not lgalloc-backed), so it does not spill.
2. The columnation regions inside grow without a convenient way to pre-size them, so per-chunk memory is hard to predict.

This PR replaces the chunk storage with a columnar chunk region: each chunk becomes a single, fixed-size allocation, so the correction buffer's memory behavior is predictable and spillable.

Part of MaterializeInc/database-issues#8464.

### Changes

**`Chunk` storage.** Each chunk now holds a `ChunkData<D>` enum with two variants:

- `Typed(ContainerOf<(D, Timestamp, Diff)>)`: typed columnar container, used while a chunk is still being filled.
- `Align(Vec<u64>)`: finished chunk encoded into a single aligned allocation.

This mirrors `mz_timely_util::columnar::Column` minus its `Bytes` variant (which wraps `Arc<dyn Any>` and is `!Sync`, breaking the `+ Send` iterator returned by `updates_before`).

**`ChunkBuilder`.** New helper that mints chunks at the same ~2 MiB serialized-size boundary used by `mz_timely_util::columnar::builder::ColumnBuilder` (`SHIP_WORDS = 1 << 18`). When the in-progress container reaches that boundary, its bytes are encoded into a `Vec<u64>` of exactly the required capacity and emitted as a `ChunkData::Align`. Each minted chunk is therefore a single predictably-sized allocation, and remainder data is flushed as a `ChunkData::Typed` on finalize.

**`ChainBuilder`.** Wraps a `ChunkBuilder` and drains minted chunks into a `Chain<D>`. All the chain-producing paths (`Stage::{insert, flush}`, `Cursor::{into_chain, try_unwrap}`, `CorrectionV2::{merge_2, merge_many}`) go through it; `Chain` itself no longer owns a partial chunk or carries a `chunk_capacity`.

**`Data` trait.** Now requires `Columnar` (with `Container: Send + Sync + Clone`) instead of `Columnation`, plus `for<'a> Borrow<Ref<'a>: Eq + Ord>` so the merge/heap code can compare update refs directly without `into_owned` on the hot path. Both `Row` and `DataflowErrorSer` already implement `Columnar`; the latter gained `#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]` so its auto-generated `Reference` satisfies the new ref-level bound.

**Lifetime helper for `pop_equal`.** `<C as Borrow>::Ref<'a>` is invariant in `'a` through trait projection, so the compiler can't shorten two refs with unrelated lifetimes by variance on its own. A small `refs_eq` helper explicitly reborrows both inputs to a fresh local lifetime via `Columnar::reborrow` before delegating to `Eq`.

**Chain invariant.** The `chain_proportionality` invariant now compares chains by update count rather than chunk count. Chunk count was a faithful proxy under columnation (fixed updates per chunk) but is not anymore: any chain below the ~2 MiB boundary is a single chunk regardless of update count, so the comparison would degenerate to 1-vs-1, collapse the geometric invariant, and turn inserts into O(N^2) up to the boundary. Comparing update counts restores the pre-refactor O(log N) amortization.

### Notes for reviewers

- `compute_correction_v2_chunk_size` changed meaning: it now only sizes the in-memory `Stage` staging buffer and the ship cadence. On-chain chunk byte size is fixed by the builder's ~2 MiB boundary (`SHIP_WORDS`) and no longer follows this knob.
- Correction-size metrics (`mz_persist_sink_correction_*`) now report serialized bytes (`indexed::length_in_bytes`) instead of columnation heap bytes. Balance-based assertions (`correction-buffer-metrics.td`) are unaffected, but absolute byte values shift. Heads-up for anyone dashboarding them.
- Added `chain_builder_roundtrips_across_mint_boundary`, exercising the `Align` encode/decode spill path (crossing at least one `mint()`), which previously had no coverage.

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered.
- [ ] This PR has an associated up-to-date design doc, is a design doc, or is sufficiently small to not require a design.
- [ ] If this PR evolves an existing `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR.
- [ ] If this PR includes major user-facing behavior changes, I have pinged the relevant PM.

---------

Co-authored-by: Claude <noreply@anthropic.com>