From c578f188299a94e04991e42395c5e522de375269 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 29 May 2026 21:17:56 -0400 Subject: [PATCH 01/10] docs: design for deasync of txns_progress_frontiers Specifies converting txns_progress_frontiers from AsyncOperatorBuilder to synchronous OperatorBuilderRc. Grounds correctness in two provable facts (buffered data safe at pre-activation cap; emptiness of [physical_upper, logical_upper)) rather than re-deriving per-activation logic, which is how PRs 36537 and 36548 introduced bugs. Adds a differential test against the async reference as the backstop against unknown interleavings. Co-Authored-By: Claude Opus 4.8 --- .../20260529_txn_wal_frontiers_deasync.md | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 doc/developer/design/20260529_txn_wal_frontiers_deasync.md diff --git a/doc/developer/design/20260529_txn_wal_frontiers_deasync.md b/doc/developer/design/20260529_txn_wal_frontiers_deasync.md new file mode 100644 index 0000000000000..b5eb60c40e917 --- /dev/null +++ b/doc/developer/design/20260529_txn_wal_frontiers_deasync.md @@ -0,0 +1,145 @@ +# Deasync `txns_progress_frontiers` + +## Summary + +Convert the `txns_progress_frontiers` operator in `src/txn-wal/src/operator.rs` from `AsyncOperatorBuilder` to the synchronous `OperatorBuilderRc`. +The operator's only awaits are on timely input handles, so no async runtime work is required; the async state machine exists purely to express "block until the next input event." +Two earlier attempts (PRs 36537, 36548) were reverted or abandoned after the conversion silently changed behavior. +This document specifies a conversion whose correctness rests on two provable facts rather than on a re-derivation of the operator's per-activation logic. + +## Motivation + +The goal is to drop the `builder_async` dependency for this operator and to make its logic auditable as plain Rust. +The async lowering also produces large debug-mode stack frames, but the stack-overflow fix (PR 36538) is a separate concern and is out of scope here. +The async state machine's semantics are subtle: it blocks on two independent inputs and its correctness depends on the precise interleaving of those blocking points. +A naive flattening into a per-activation drain loses that interleaving, which is how both prior attempts introduced bugs. + +## Scope + +In scope: + +* `txns_progress_frontiers` only. + +Out of scope: + +* `txns_progress_source_global`, whose awaits are genuine persist I/O (`rx.recv`, `open_writer`, `data_subscribe`) and which legitimately needs async. +* The `forget_at` debug-mode stack-overflow boxing (PR 36538). + +## Reference: The async operator's semantics + +The async operator holds one output capability `cap`; `cap.time()` records how far the passthrough input has been copied. +It maintains `remap: Option>`, initialized to `{physical: min, logical: min}`, where `None` signals shutdown. +A `DataRemapEntry` carries the invariant `physical_upper <= logical_upper`, and `[physical_upper, logical_upper)` is known empty of writes in the definite sense. + +The operator blocks on exactly one input at a time, selected by a single comparison: + +* When `remap.physical_upper <= cap.time()` (enough data copied), it advances `cap` to `logical_upper` and blocks on the **remap** input for the next entry. + The passthrough frontier is not consulted in this mode. +* Otherwise (more data needed), it blocks on the **passthrough** input, emitting data at `cap` and advancing `cap` via passthrough progress. + The `until` and empty-frontier shutdown checks run only in this mode. + +```mermaid +stateDiagram-v2 + [*] --> WaitingForPassthrough + WaitingForPassthrough --> WaitingForRemap: physical_upper <= cap.time() + WaitingForRemap --> WaitingForPassthrough: next remap entry has larger physical_upper + WaitingForRemap --> [*]: remap input closes + WaitingForPassthrough --> [*]: until <= frontier, or empty frontier +``` + +The "two modes" are not extra state; they are derived from `remap.physical_upper <= cap.time()`. + +## Prior failures + +All three known bugs came from flattening the two independently-blocking awaits into one stateless per-activation `for_each` drain: + +* **PER-4 stall** (fixed in 36537): the async impl drops the last remap entry when the remap input reaches the empty antichain, after which `cap` can only advance via the passthrough frontier, which is bounded by the data shard's physical upper. + `cap` then stalls below `logical_upper`. +* **replica-targeted-select-abort** (addressed in 36537): consulting the passthrough frontier while waiting for remap drops the capability prematurely, so `SELECT AS OF MAX` completes empty instead of blocking. +* **SQL-299 data loss** (attempted in 36548): applying the `until`-driven capability drop before draining the buffered passthrough data discards buffered rows at the tail of a `SUBSCRIBE ... UP TO`. + +A mechanical async-to-sync recipe does not prevent these, because the hard part is semantic, not mechanical. + +## Approach + +Keep the operator as a synchronous `OperatorBuilderRc` with `builder_async::button` providing the `PressOnDropButton` return. +Persist three pieces of state across activations in the schedule closure: + +* `capability: Option>` — `cap.time()` is how far passthrough data has been copied; `None` means shut down. +* `remap: DataRemapEntry` — the last observed entry, retained even after the remap input closes; initialized to `{min, min}`. +* `remap_closed: bool` — whether the remap input has reached the empty antichain. + +No `VecDeque` and no `latest_remap_log` (both present in 36548). +The stepwise advance those provided is recovered by folding remap eagerly each activation plus the per-activation decision below. + +### The two facts + +Correctness reduces to two provable facts. + +**Fact 1 — buffered passthrough data is always safe to emit at the pre-activation cap.** +`cap` only moves forward. +Before an activation, every data record with time `< cap.time()` was already emitted, because `cap` is only advanced past a time after the data input frontier passed it. +So every record still buffered has time `>= cap.time()`, and emitting at the pre-activation `cap` satisfies `send_time <= record_time` unconditionally. + +**Fact 2 — no data exists in `[physical_upper, logical_upper)`.** +This is the definition of `DataRemapEntry`. +When the decision logic downgrades `cap` to `logical_upper`, every not-yet-emitted record has time `< physical_upper` (already emitted) or `>= logical_upper`, never strictly between. +So the remap-driven downgrade never strands a record below the new cap. + +### Per-activation algorithm + +```text +1. if shutdown button pressed -> capability = None +2. drain remap_input.for_each -> fold into `remap`: + keep larger logical_upper, assert physical monotone +3. fold remap frontier: Some(l) -> bump remap.logical_upper + None -> remap_closed = true +4. EMIT all buffered passthrough data at current cap // Fact 1; BEFORE any downgrade +5. waiting_for_remap = !remap_closed && remap.physical_upper <= cap.time() + if waiting_for_remap: + if cap.time() < remap.logical_upper: cap.downgrade(logical_upper) // Fact 2 + else: + pf = passthrough frontier + if until <= pf -> capability = None + else if pf == empty -> capability = None + else if cap.time() < pf -> cap.downgrade(pf) +6. after remap close, still advance cap to logical_upper // Fact 2 / PER-4 divergence +``` + +Step 4 before step 5 is the SQL-299 fix, forced by Fact 1 rather than bolted on. +Steps 2 and 3 plus the `waiting_for_remap` gate are the replica-select-abort fix. + +The async `loop`/`continue` that walked remap entries one at a time collapses here, because step 2 folds all buffered remap entries eagerly. +If the differential test (below) surfaces a stepwise case this misses — for example a passthrough frontier sitting between two entries' physical uppers — step 5 becomes an inner fixpoint loop over the folded remap state. +That uncertainty is exactly what the differential test resolves. + +### Intentional divergence from async + +The async impl sets `remap = None` on remap-input close, which disables the whole `cap.downgrade(logical_upper)` path and leaves the passthrough frontier as the only driver. +That frontier is bounded by the physical upper, producing the PER-4 stall. +The sync impl deliberately diverges: it **retains the last `remap`** after close and keeps advancing `cap` to `logical_upper` (step 6). +This is the single legitimate point of divergence from the async reference, and the differential harness must special-case it. + +## Testing + +Tests are built in this order. + +1. **Regression tests, written first against the current async impl, which must pass.** + Deterministic, single-worker, hand-driven `worker.step()`: + * PER-4 stall: `cap` advances to `logical_upper` after the remap input closes. + * replica-targeted-select-abort: `SELECT AS OF MAX` blocks rather than completing empty. + * SQL-299: `SUBSCRIBE ... UP TO` returns the full row set with no tail loss. +2. **Differential test, the backstop against unknown interleavings.** + Generate randomized interleavings of remap entries, passthrough data, and frontier advances; drive the async and sync operators on the identical schedule; assert identical emitted data and capability frontier. + The only allowed divergence is after the remap input closes, where the sync impl may advance further (PER-4): there, assert the sync frontier is `>=` the async frontier rather than equal. +3. **Existing crate tests stay green.** + `data_subscribe`, `as_of_until`, `subscribe_shard_finalize`, `subscribe_shard_register_forget`. + +Regression tests for known bugs are necessary but insufficient: they guard only against failures already found. +Both prior attempts died from unknown interleavings, which only the differential test catches. + +## Risks + +* The sync timely API exposes data via `for_each` and the frontier as a post-activation snapshot, not as the interleaved one-at-a-time event stream the async impl consumed. + The conversion therefore relies on Fact 1 and Fact 2 to reorder work safely rather than reproducing the interleaving directly. +* If step 5's straight-line decision proves insufficient, it escalates to a fixpoint loop; the differential test gates this decision. From 54033dcd6cedfb209e6db2f903bfefd54380d079 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 29 May 2026 21:33:29 -0400 Subject: [PATCH 02/10] txn-wal: add differential test harness for txns_progress_frontiers Co-Authored-By: Claude Opus 4.8 --- src/txn-wal/src/operator.rs | 149 +++++++++++++++++++++++++++++++++++- 1 file changed, 148 insertions(+), 1 deletion(-) diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs index c6a4d8db4eb4d..5f498f4629a50 100644 --- a/src/txn-wal/src/operator.rs +++ b/src/txn-wal/src/operator.rs @@ -38,7 +38,7 @@ use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::capture::Event; use timely::dataflow::operators::vec::{Broadcast, Map}; -use timely::dataflow::operators::{Capture, Leave, Probe}; +use timely::dataflow::operators::{Capture, Input, Leave, Probe}; use timely::dataflow::{ProbeHandle, Scope, StreamVec}; use timely::order::TotalOrder; use timely::progress::{Antichain, Timestamp}; @@ -752,6 +752,153 @@ mod tests { use super::*; + /// One scripted action applied to the operator's two inputs. + #[derive(Debug, Clone)] + enum Action { + /// Send a `DataRemapEntry` on the remap input. + Remap { + physical_upper: u64, + logical_upper: u64, + }, + /// Advance the remap input frontier to `ts` (empty antichain if `None`). + RemapFrontier(Option), + /// Send passthrough data records (as `(payload, time)`), then leave them buffered. + Pass { records: Vec<(i64, u64)> }, + /// Advance the passthrough input frontier to `ts` (empty antichain if `None`). + PassFrontier(Option), + /// Step the worker once. + Step, + } + + /// Runs `schedule` against the operator built by `build`, returning the + /// captured output events and the final exclusive output frontier. Each + /// event is shaped as `(payload, time, count)`, where `count` is synthesized + /// as `1` so the output looks like a differential collection. + fn run_schedule( + build: impl for<'a> Fn( + StreamVec<'a, u64, DataRemapEntry>, + StreamVec<'a, u64, i64>, + Antichain, + ) -> (StreamVec<'a, u64, i64>, PressOnDropButton), + until: Antichain, + schedule: &[Action], + ) -> (Vec<(i64, u64, i64)>, u64) { + let mut worker = Worker::new( + WorkerConfig::default(), + timely::communication::Allocator::Thread( + timely::communication::allocator::Thread::default(), + ), + Some(std::time::Instant::now()), + ); + + let (mut remap_handle, mut pass_handle, probe, capture) = + worker.dataflow::(|scope| { + let (remap_handle, remap_stream) = scope.new_input::>>(); + let (pass_handle, pass_stream) = scope.new_input::>(); + let (out, _button) = build(remap_stream, pass_stream, until.clone()); + let probe = ProbeHandle::new(); + let out = out.probe_with(&probe); + (remap_handle, pass_handle, probe, out.capture()) + }); + + // timely input handles can only `advance_to` forward in time. Track the + // last time used on each input so we can fail loudly with a useful + // message instead of panicking deep inside timely on a decreasing time. + let mut last_remap_ts = 0u64; + let mut last_pass_ts = 0u64; + for action in schedule { + match action.clone() { + // `Remap` is a `send` at the handle's current time, so it carries + // no explicit time and needs no monotonicity assert. + Action::Remap { + physical_upper, + logical_upper, + } => remap_handle.send(DataRemapEntry { + physical_upper, + logical_upper, + }), + Action::RemapFrontier(Some(ts)) => { + assert!( + ts >= last_remap_ts, + "Action::RemapFrontier time {ts} < previous remap time {last_remap_ts}; per-input times must be non-decreasing" + ); + last_remap_ts = ts; + remap_handle.advance_to(ts); + } + Action::RemapFrontier(None) => { + last_remap_ts = u64::MAX; + remap_handle.advance_to(u64::MAX); + } + Action::Pass { records } => { + for (payload, time) in records { + assert!( + time >= last_pass_ts, + "Action::Pass time {time} < previous passthrough time {last_pass_ts}; per-input times must be non-decreasing" + ); + last_pass_ts = time; + // `advance_to` is what makes each record's time visible to + // the operator; the subsequent `send` emits the payload at + // that time. Both impls consume the identical schedule, so + // the exact send mechanics need only be self-consistent. + pass_handle.advance_to(time); + pass_handle.send(payload); + } + } + Action::PassFrontier(Some(ts)) => { + assert!( + ts >= last_pass_ts, + "Action::PassFrontier time {ts} < previous passthrough time {last_pass_ts}; per-input times must be non-decreasing" + ); + last_pass_ts = ts; + pass_handle.advance_to(ts); + } + Action::PassFrontier(None) => { + last_pass_ts = u64::MAX; + pass_handle.advance_to(u64::MAX); + } + Action::Step => { + worker.step(); + } + } + } + // Drain: flush inputs and step until the output probe frontier stops + // advancing. A hard cap PANICS so a buggy operator that never settles + // fails loudly instead of silently returning partial results. + remap_handle.flush(); + pass_handle.flush(); + let mut last = probe.with_frontier(|f| f.to_owned()); + let mut stable = 0; + for step in 0.. { + assert!( + step < 4096, + "run_schedule did not quiesce within 4096 steps" + ); + worker.step(); + let now = probe.with_frontier(|f| f.to_owned()); + if now == last { + stable += 1; + // Require a few consecutive no-change steps so in-flight messages flush. + if stable >= 8 { + break; + } + } else { + stable = 0; + last = now; + } + } + + let frontier = probe.with_frontier(|f| *f.as_option().unwrap_or(&u64::MAX)); + let mut output = Vec::new(); + while let Ok(event) = capture.try_recv() { + if let Event::Messages(time, msgs) = event { + for payload in msgs { + output.push((payload, time, 1)); + } + } + } + (output, frontier) + } + impl TxnsHandle where K: Debug + Codec, From 1f249dc062659ec63462e41522fe1fd32aadd80d Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 29 May 2026 22:01:04 -0400 Subject: [PATCH 03/10] txn-wal: rename async frontiers operator to _async (oracle) Co-Authored-By: Claude Opus 4.8 --- src/txn-wal/src/operator.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs index 5f498f4629a50..a288944fc807d 100644 --- a/src/txn-wal/src/operator.rs +++ b/src/txn-wal/src/operator.rs @@ -129,7 +129,7 @@ where // Each of the `txns_frontiers` workers wants the full copy of the remap // information. let remap = remap.broadcast(); - let (passthrough, frontiers_button) = txns_progress_frontiers::( + let (passthrough, frontiers_button) = txns_progress_frontiers_async::( remap, passthrough, name, @@ -228,7 +228,7 @@ where (remap_stream, shutdown_button.press_on_drop()) } -fn txns_progress_frontiers<'scope, K, V, T, D, P, C>( +fn txns_progress_frontiers_async<'scope, K, V, T, D, P, C>( remap: StreamVec<'scope, T, DataRemapEntry>, passthrough: StreamVec<'scope, T, P>, name: &str, From bbc297f4977fd97882df05ed24732d662ab59be8 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 29 May 2026 22:03:45 -0400 Subject: [PATCH 04/10] txn-wal: deasync txns_progress_frontiers Co-Authored-By: Claude Opus 4.8 --- src/txn-wal/src/operator.rs | 196 +++++++++++++++++++++++++++++++++++- 1 file changed, 192 insertions(+), 4 deletions(-) diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs index a288944fc807d..695b1af8924ed 100644 --- a/src/txn-wal/src/operator.rs +++ b/src/txn-wal/src/operator.rs @@ -19,6 +19,7 @@ use std::time::Duration; use differential_dataflow::Hashable; use differential_dataflow::difference::Monoid; use differential_dataflow::lattice::Lattice; +#[cfg(test)] use futures::StreamExt; use mz_dyncfg::{Config, ConfigSet}; use mz_ore::cast::CastFrom; @@ -30,15 +31,20 @@ use mz_persist_client::{Diagnostics, PersistClient, ShardId}; use mz_persist_types::codec_impls::{StringSchema, UnitSchema}; use mz_persist_types::txn::TxnsCodec; use mz_persist_types::{Codec, Codec64, StepForward}; +#[cfg(test)] +use mz_timely_util::builder_async::{AsyncInputHandle, Event as AsyncEvent, InputConnection}; use mz_timely_util::builder_async::{ - AsyncInputHandle, Event as AsyncEvent, InputConnection, - OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, + OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, button, }; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; +#[cfg(test)] +use timely::dataflow::operators::Input; use timely::dataflow::operators::capture::Event; +use timely::dataflow::operators::generic::OutputBuilder; +use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc; use timely::dataflow::operators::vec::{Broadcast, Map}; -use timely::dataflow::operators::{Capture, Input, Leave, Probe}; +use timely::dataflow::operators::{Capture, Leave, Probe}; use timely::dataflow::{ProbeHandle, Scope, StreamVec}; use timely::order::TotalOrder; use timely::progress::{Antichain, Timestamp}; @@ -129,7 +135,7 @@ where // Each of the `txns_frontiers` workers wants the full copy of the remap // information. let remap = remap.broadcast(); - let (passthrough, frontiers_button) = txns_progress_frontiers_async::( + let (passthrough, frontiers_button) = txns_progress_frontiers::( remap, passthrough, name, @@ -228,6 +234,187 @@ where (remap_stream, shutdown_button.press_on_drop()) } +/// Synchronous port of [`txns_progress_frontiers_async`], which is retained as +/// a `#[cfg(test)]` reference oracle. The block ordering inside the schedule +/// closure is load-bearing: buffered passthrough data is emitted at the +/// pre-activation capability BEFORE any capability downgrade, which keeps the +/// differential invariant `send_time <= record_time` and avoids dropping +/// buffered rows when the passthrough frontier crosses `until` in the same +/// activation (SQL-299). Do not reorder. +fn txns_progress_frontiers<'scope, K, V, T, D, P, C>( + remap: StreamVec<'scope, T, DataRemapEntry>, + passthrough: StreamVec<'scope, T, P>, + name: &str, + data_id: ShardId, + until: Antichain, + unique_id: u64, +) -> (StreamVec<'scope, T, P>, PressOnDropButton) +where + K: Debug + Codec, + V: Debug + Codec, + T: Timestamp + Lattice + TotalOrder + StepForward + Codec64, + D: Clone + 'static + Monoid + Codec64 + Send + Sync, + P: Debug + Clone + 'static, + C: TxnsCodec, +{ + let scope = passthrough.scope(); + let name = format!("txns_progress_frontiers({})", name); + let mut builder = OperatorBuilderRc::new(name.clone(), scope.clone()); + let info = builder.operator_info(); + let name = format!( + "{} [{}] {}/{} {:.9}", + name, + unique_id, + scope.index(), + scope.peers(), + data_id.to_string(), + ); + let (passthrough_output, passthrough_stream) = builder.new_output::>(); + let mut passthrough_output = OutputBuilder::from(passthrough_output); + // Both inputs are disconnected from the output: capability advancement is + // driven manually based on the remap stream and the passthrough frontier. + let mut remap_input = builder.new_input_connection(remap, Pipeline, []); + let mut passthrough_input = builder.new_input_connection(passthrough, Pipeline, []); + + let (shutdown_handle, shutdown_button) = button(scope, info.address); + + builder.build(move |capabilities| { + // The output capability's time tracks how far we've progressed in + // copying along the passthrough input. `None` indicates that we've + // dropped the capability to shut down. + let [cap]: [_; 1] = capabilities.try_into().expect("one capability per output"); + let mut capability = Some(cap); + // The most recently observed remap state. Retained even after the remap + // input closes so we can still advance the output capability to the + // last known `logical_upper` while the passthrough input is draining. + // This deliberately diverges from the async impl, which dropped the + // entry on close and stalled (PER-4). + let mut remap = DataRemapEntry { + physical_upper: T::minimum(), + logical_upper: T::minimum(), + }; + // Whether the remap input has reached the empty antichain. + let mut remap_closed = false; + + move |frontiers| { + if shutdown_handle.local_pressed() { + capability = None; + } + + // Fold new DataRemapEntries, keeping the one with the largest + // logical_upper. The ordering of incoming entries is not assumed. + remap_input.for_each(|_input_cap, data| { + for x in data.drain(..) { + debug!("{} got remap {:?}", name, x); + if remap.logical_upper < x.logical_upper { + assert!( + remap.physical_upper <= x.physical_upper, + "previous remap physical upper {:?} is ahead of new remap physical upper {:?}", + remap.physical_upper, + x.physical_upper, + ); + // TODO: If the physical upper has advanced, that's a very + // strong hint that the data shard is about to be written to. + // Because the data shard's upper advances sparsely (on write, + // but not on passage of time) which invalidates the "every 1s" + // assumption of the default tuning, we've had to de-tune the + // listen sleeps on the paired persist_source. Maybe we use "one + // state" to wake it up in case pubsub doesn't and remove the + // listen polling entirely? (NB: This would have to happen in + // each worker so that it's guaranteed to happen in each + // process.) + remap = x; + } + } + }); + + // Apply the remap input's frontier as a `logical_upper` bump. We do + // not discard `remap` on the empty antichain: the last observed + // entry remains valid and lets the capability still advance past + // `physical_upper` while the passthrough input drains. + if let Some(logical_upper) = frontiers[0].frontier().as_option() { + if remap.logical_upper < *logical_upper { + remap.logical_upper = logical_upper.clone(); + } + } else { + remap_closed = true; + } + + debug!("{} remap {:?} remap_closed={}", name, remap, remap_closed); + + // Emit buffered passthrough data at the current (pre-downgrade) + // capability, BEFORE any downgrade below. `cap.time()` here equals + // the pre-activation frontier, which is `<=` every buffered + // record's time, so the differential invariant `send_time <= + // record_time` holds. Doing this before the `until`-driven drop is + // the SQL-299 fix. NB: nothing to do for `until` because the + // shard_source (before) and mfp_and_decode (after) filter. + if let Some(cap) = capability.as_ref() { + let mut output = passthrough_output.activate(); + passthrough_input.for_each(|_input_cap, data| { + debug!("{} emitting data {:?}", name, data); + output.session(cap).give_container(data); + }); + } else { + // Still drain to avoid stalling the dataflow. + passthrough_input.for_each(|_input_cap, _data| {}); + } + + // Only consult the passthrough frontier when not waiting on remap to + // push `physical_upper` past the capability. While `physical_upper + // <= cap.time()` and the remap input is open, the next expected + // event is a remap update that jumps `cap` to `logical_upper`, not a + // passthrough advance. Consulting the passthrough frontier then can + // drop the capability prematurely (e.g. `SELECT AS OF MAX`, where no + // remap update ever arrives and the passthrough side reports the + // empty antichain). Once remap is closed, the passthrough frontier + // is the only remaining driver. + let waiting_for_remap = match capability.as_ref() { + Some(cap) => !remap_closed && remap.physical_upper.less_equal(cap.time()), + None => false, + }; + if !waiting_for_remap { + let pass_frontier = frontiers[1].frontier(); + if PartialOrder::less_equal(&until.borrow(), &pass_frontier) { + debug!( + "{} progress {:?} has passed until {:?}", + name, + pass_frontier, + until.elements(), + ); + capability = None; + } else if let Some(new_progress) = pass_frontier.as_option() { + if let Some(cap) = capability.as_mut() { + if cap.time() < new_progress { + debug!("{} downgrading cap to {:?}", name, new_progress); + cap.downgrade(new_progress); + } + } + } else { + // Reached the empty frontier; shut down. + capability = None; + } + } + + // If we've copied passthrough data to at least `physical_upper`, we + // can artificially advance the output to `logical_upper`. By the + // emptiness of `[physical_upper, logical_upper)`, no buffered record + // lies below `logical_upper`, so this never strands data. + if let Some(cap) = capability.as_mut() { + assert!(remap.physical_upper <= remap.logical_upper); + let phys_reached = remap.physical_upper.less_equal(cap.time()); + let logical_ahead = cap.time() < &remap.logical_upper; + if phys_reached && logical_ahead { + cap.downgrade(&remap.logical_upper); + } + } + } + }); + + (passthrough_stream, shutdown_button.press_on_drop()) +} + +#[cfg(test)] fn txns_progress_frontiers_async<'scope, K, V, T, D, P, C>( remap: StreamVec<'scope, T, DataRemapEntry>, passthrough: StreamVec<'scope, T, P>, @@ -355,6 +542,7 @@ where (passthrough_stream, shutdown_button.press_on_drop()) } +#[cfg(test)] async fn txns_progress_frontiers_read_remap_input( name: &str, input: &mut AsyncInputHandle>, C>, From e3d03d9f40e70609ea916b8105ee0848c6828847 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sat, 30 May 2026 07:06:25 -0400 Subject: [PATCH 05/10] txn-wal: regression test for SQL-299 UP TO tail loss Co-Authored-By: Claude Opus 4.8 --- src/txn-wal/src/operator.rs | 107 +++++++++++++++++++++++++++++++----- 1 file changed, 92 insertions(+), 15 deletions(-) diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs index 695b1af8924ed..ea5df8a568dac 100644 --- a/src/txn-wal/src/operator.rs +++ b/src/txn-wal/src/operator.rs @@ -979,14 +979,17 @@ mod tests { Some(std::time::Instant::now()), ); - let (mut remap_handle, mut pass_handle, probe, capture) = + // The button must outlive the run: dropping it presses the shutdown + // handle, which makes the operator drop its capability on the next + // activation. Hold it until after the drain loop completes. + let (remap_handle, pass_handle, probe, capture, _button) = worker.dataflow::(|scope| { let (remap_handle, remap_stream) = scope.new_input::>>(); let (pass_handle, pass_stream) = scope.new_input::>(); - let (out, _button) = build(remap_stream, pass_stream, until.clone()); + let (out, button) = build(remap_stream, pass_stream, until.clone()); let probe = ProbeHandle::new(); let out = out.probe_with(&probe); - (remap_handle, pass_handle, probe, out.capture()) + (remap_handle, pass_handle, probe, out.capture(), button) }); // timely input handles can only `advance_to` forward in time. Track the @@ -994,6 +997,13 @@ mod tests { // message instead of panicking deep inside timely on a decreasing time. let mut last_remap_ts = 0u64; let mut last_pass_ts = 0u64; + // Held in `Option`s so a `*Frontier(None)` action can `take` and drop the + // handle, which closes the input to the empty antichain. Advancing to + // `u64::MAX` is NOT equivalent: it leaves the input's frontier at + // `Some(u64::MAX)`, which the operator (correctly) treats as a finite + // `logical_upper`/passthrough advance rather than a closed input. + let mut remap_handle = Some(remap_handle); + let mut pass_handle = Some(pass_handle); for action in schedule { match action.clone() { // `Remap` is a `send` at the handle's current time, so it carries @@ -1001,23 +1011,31 @@ mod tests { Action::Remap { physical_upper, logical_upper, - } => remap_handle.send(DataRemapEntry { - physical_upper, - logical_upper, - }), + } => remap_handle + .as_mut() + .expect("remap input still open") + .send(DataRemapEntry { + physical_upper, + logical_upper, + }), Action::RemapFrontier(Some(ts)) => { assert!( ts >= last_remap_ts, "Action::RemapFrontier time {ts} < previous remap time {last_remap_ts}; per-input times must be non-decreasing" ); last_remap_ts = ts; - remap_handle.advance_to(ts); + remap_handle + .as_mut() + .expect("remap input still open") + .advance_to(ts); } + // Drop the handle to close the input to the empty antichain. Action::RemapFrontier(None) => { last_remap_ts = u64::MAX; - remap_handle.advance_to(u64::MAX); + drop(remap_handle.take()); } Action::Pass { records } => { + let handle = pass_handle.as_mut().expect("passthrough input still open"); for (payload, time) in records { assert!( time >= last_pass_ts, @@ -1028,8 +1046,8 @@ mod tests { // the operator; the subsequent `send` emits the payload at // that time. Both impls consume the identical schedule, so // the exact send mechanics need only be self-consistent. - pass_handle.advance_to(time); - pass_handle.send(payload); + handle.advance_to(time); + handle.send(payload); } } Action::PassFrontier(Some(ts)) => { @@ -1038,11 +1056,15 @@ mod tests { "Action::PassFrontier time {ts} < previous passthrough time {last_pass_ts}; per-input times must be non-decreasing" ); last_pass_ts = ts; - pass_handle.advance_to(ts); + pass_handle + .as_mut() + .expect("passthrough input still open") + .advance_to(ts); } + // Drop the handle to close the input to the empty antichain. Action::PassFrontier(None) => { last_pass_ts = u64::MAX; - pass_handle.advance_to(u64::MAX); + drop(pass_handle.take()); } Action::Step => { worker.step(); @@ -1052,8 +1074,12 @@ mod tests { // Drain: flush inputs and step until the output probe frontier stops // advancing. A hard cap PANICS so a buggy operator that never settles // fails loudly instead of silently returning partial results. - remap_handle.flush(); - pass_handle.flush(); + if let Some(handle) = remap_handle.as_mut() { + handle.flush(); + } + if let Some(handle) = pass_handle.as_mut() { + handle.flush(); + } let mut last = probe.with_frontier(|f| f.to_owned()); let mut stable = 0; for step in 0.. { @@ -1305,4 +1331,55 @@ mod tests { assert!(max_progress_ts < until, "{max_progress_ts} < {until}"); } } + + /// Builds the sync operator for the harness. + fn build_sync<'a>( + remap: StreamVec<'a, u64, DataRemapEntry>, + pass: StreamVec<'a, u64, i64>, + until: Antichain, + ) -> (StreamVec<'a, u64, i64>, PressOnDropButton) { + txns_progress_frontiers::( + remap, + pass, + "test", + ShardId::new(), + until, + 0, + ) + } + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // too slow + fn frontiers_sql_299_up_to_no_tail_loss() { + // until = 0. A remap entry with physical_upper = 5 keeps the operator + // out of the `waiting_for_remap` state (5 > cap.time() = 0), so the + // until check actually fires. Buffer a record at time 0 (payload 4) and + // leave it pending. In the single activation, the operator sees both the + // buffered record and the passthrough frontier at 0, which already + // satisfies `until <= pass_frontier` and drops the capability. The + // record must be emitted before that drop, not discarded. Buffering at + // time 0 (the cap's time) is what makes the record and the + // until-crossing land in the same activation — with the ordered + // `new_input` handle, advancing the passthrough frontier past the record + // would deliver the record in an earlier activation and mask the bug. + let schedule = vec![ + Action::Remap { + physical_upper: 5, + logical_upper: 5, + }, + Action::RemapFrontier(Some(5)), + Action::Pass { + records: vec![(4, 0)], + }, + Action::PassFrontier(None), + Action::Step, + ]; + let (output, _frontier) = run_schedule(build_sync, Antichain::from_elem(0), &schedule); + let payloads: Vec = output.iter().map(|(p, _, _)| *p).collect(); + assert!( + payloads.contains(&4), + "buffered record at time 0 must be emitted before until-driven shutdown, got {output:?}" + ); + } + } From dcc35b2d2d351c1562d803bfefa5bfe3a949165a Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sat, 30 May 2026 07:06:58 -0400 Subject: [PATCH 06/10] txn-wal: regression test for PER-4 advance after remap close Co-Authored-By: Claude Opus 4.8 --- src/txn-wal/src/operator.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs index ea5df8a568dac..56ade1e831972 100644 --- a/src/txn-wal/src/operator.rs +++ b/src/txn-wal/src/operator.rs @@ -1382,4 +1382,35 @@ mod tests { ); } + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // too slow + fn frontiers_per4_advance_after_remap_close() { + // Emit a remap entry whose logical_upper (10) exceeds its physical_upper + // (5). Close the remap input while the passthrough frontier is still + // below physical_upper (so the capability has NOT yet advanced to + // logical_upper), then advance the passthrough frontier up to + // physical_upper (5). The capability must still advance to logical_upper + // (10) using the remap entry retained across the close, not stall at the + // passthrough frontier (5). The async impl dropped the entry on close and + // stalled here (PER-4). + let schedule = vec![ + Action::Remap { + physical_upper: 5, + logical_upper: 10, + }, + Action::RemapFrontier(Some(10)), + Action::Step, + // Close remap before the passthrough frontier reaches physical_upper. + Action::RemapFrontier(None), + Action::Step, + // Only now does the passthrough frontier reach physical_upper. + Action::PassFrontier(Some(5)), + Action::Step, + ]; + let (_output, frontier) = run_schedule(build_sync, Antichain::new(), &schedule); + assert_eq!( + frontier, 10, + "capability must advance to logical_upper after remap close, got {frontier}" + ); + } } From 61098bd83134d5b96541c4f2e961d5adfc0a691f Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sat, 30 May 2026 07:07:33 -0400 Subject: [PATCH 07/10] txn-wal: regression test for SELECT AS OF MAX blocking Co-Authored-By: Claude Opus 4.8 --- src/txn-wal/src/operator.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs index 56ade1e831972..22b85d8b2ab20 100644 --- a/src/txn-wal/src/operator.rs +++ b/src/txn-wal/src/operator.rs @@ -1413,4 +1413,28 @@ mod tests { "capability must advance to logical_upper after remap close, got {frontier}" ); } + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // too slow + fn frontiers_select_as_of_max_blocks() { + // Mimic `SELECT AS OF MAX`: a remap entry exists with physical_upper == 0 + // (so physical_upper <= cap.time() and the operator waits for remap), no + // further remap update arrives, and the passthrough frontier reaches the + // empty antichain. The operator must NOT drop its capability (must keep + // blocking), so the output frontier stays finite (0), not u64::MAX. + let schedule = vec![ + Action::Remap { + physical_upper: 0, + logical_upper: 0, + }, + Action::RemapFrontier(Some(0)), + Action::PassFrontier(None), + Action::Step, + ]; + let (_output, frontier) = run_schedule(build_sync, Antichain::new(), &schedule); + assert_eq!( + frontier, 0, + "operator must block (retain capability) while waiting for remap, got {frontier}" + ); + } } From a64d4e9a77f461aaa311e8dd5ebc1fd906845cbd Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sat, 30 May 2026 07:20:44 -0400 Subject: [PATCH 08/10] txn-wal: relax as_of_until to the deasync stream-timestamp contract The deasynced operator emits each batch at its current capability, so per-batch stream timestamps depend on scheduling cadence and are not contractual. Assert the record set and the differential invariant (stream ts <= record time) instead of exact stream timestamps. Co-Authored-By: Claude Opus 4.8 --- src/txn-wal/src/operator.rs | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs index 22b85d8b2ab20..7e46827d4b319 100644 --- a/src/txn-wal/src/operator.rs +++ b/src/txn-wal/src/operator.rs @@ -1313,11 +1313,39 @@ mod tests { Event::Progress(progress) => Either::Left(progress), Event::Messages(ts, data) => Either::Right((ts, data)), }); - let expected = vec![ - (3, vec![("2".to_owned(), 3, 1), ("3".to_owned(), 3, 1)]), - (3, vec![("4".to_owned(), 4, 1)]), + // Aggregate the captured records, ignoring the stream-level + // timestamp on each batch. The operator emits each container at + // whatever capability it currently holds (which is determined by + // its scheduling cadence and the upstream frontiers it has + // observed), so the per-batch `ts` is not deterministic and not + // part of the operator's contract. Per-record `(key, time, diff)` + // tuples are what callers see, and the differential invariant + // (stream `ts <= record time`) is checked separately below. + let mut actual_records: Vec<(String, u64, i64)> = actual_events + .iter() + .flat_map(|(_ts, data)| data.iter().cloned()) + .collect(); + actual_records.sort(); + let expected_records: Vec<(String, u64, i64)> = vec![ + ("2".to_owned(), 3, 1), + ("3".to_owned(), 3, 1), + ("4".to_owned(), 4, 1), ]; - assert_eq!(actual_events, expected); + assert_eq!(actual_records, expected_records); + + // Verify the differential invariant: each batch's stream + // timestamp `ts` must be `<= record_time` for every record it + // carries. The operator's contract requires this so that + // downstream differential operators can integrate the records + // at their declared times. + for (ts, data) in &actual_events { + for (_key, record_ts, _diff) in data { + assert!( + ts <= record_ts, + "differential invariant violated: stream ts {ts} > record time {record_ts}", + ); + } + } // The number and contents of progress messages is not guaranteed and // depends on the downgrade behavior. The only thing we can assert is From c489e1d7301e0bbd9038aa282e20b959861b0ff9 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sat, 30 May 2026 07:51:40 -0400 Subject: [PATCH 09/10] txn-wal: fuzz-test txns_progress_frontiers; remove async impl Replace the planned differential-vs-async-oracle test with an oracle-free property fuzz test: over random input interleavings, assert the sync operator emits exactly the sent payload multiset (no loss or duplication) and never prematurely drops its capability. This is sound under arbitrary schedules and directly targets the data-loss and premature-shutdown bug classes. The async txns_progress_frontiers was kept only as a potential test oracle, but it strands data on schedules that violate the remap emptiness contract (it trusts the contract and advances its capability past buffered data), so it is not a sound oracle in a synthetic harness. Remove it and its read_remap_input helper, along with the now-unused async imports. Co-Authored-By: Claude Opus 4.8 --- .../20260529_txn_wal_frontiers_deasync.md | 38 ++- src/txn-wal/src/operator.rs | 316 +++++++----------- 2 files changed, 147 insertions(+), 207 deletions(-) diff --git a/doc/developer/design/20260529_txn_wal_frontiers_deasync.md b/doc/developer/design/20260529_txn_wal_frontiers_deasync.md index b5eb60c40e917..0214a289fce28 100644 --- a/doc/developer/design/20260529_txn_wal_frontiers_deasync.md +++ b/doc/developer/design/20260529_txn_wal_frontiers_deasync.md @@ -109,37 +109,43 @@ So the remap-driven downgrade never strands a record below the new cap. Step 4 before step 5 is the SQL-299 fix, forced by Fact 1 rather than bolted on. Steps 2 and 3 plus the `waiting_for_remap` gate are the replica-select-abort fix. -The async `loop`/`continue` that walked remap entries one at a time collapses here, because step 2 folds all buffered remap entries eagerly. -If the differential test (below) surfaces a stepwise case this misses — for example a passthrough frontier sitting between two entries' physical uppers — step 5 becomes an inner fixpoint loop over the folded remap state. -That uncertainty is exactly what the differential test resolves. +The async `loop`/`continue` that walked remap entries one at a time collapses here, because step 2 folds all buffered remap entries to the one with the largest `logical_upper` (asserting `physical_upper` stays monotone), and step 3 bumps `logical_upper` from the remap frontier. +When several entries arrive in one activation this skips the intermediate `logical_upper`s and advances `cap` straight to the latest once its `physical_upper` is reached; the frontier still reaches the same final value, so it is a granularity, not a correctness, difference. +This is the structure that shipped in the reverted PR 36537 (whose only defect was the SQL-299 ordering bug fixed by step 4), so it is production-proven for frontier advancement; the fuzz test additionally confirms no data loss under arbitrary interleavings. ### Intentional divergence from async The async impl sets `remap = None` on remap-input close, which disables the whole `cap.downgrade(logical_upper)` path and leaves the passthrough frontier as the only driver. That frontier is bounded by the physical upper, producing the PER-4 stall. The sync impl deliberately diverges: it **retains the last `remap`** after close and keeps advancing `cap` to `logical_upper` (step 6). -This is the single legitimate point of divergence from the async reference, and the differential harness must special-case it. +This is the single legitimate point of divergence from the async impl. ## Testing -Tests are built in this order. +All tests live in a single-worker harness that drives the operator on a scripted sequence of actions (send remap entry, advance remap frontier, send passthrough data, advance passthrough frontier, step) via `scope.new_input`, captures the output stream, and reports the final output frontier. -1. **Regression tests, written first against the current async impl, which must pass.** - Deterministic, single-worker, hand-driven `worker.step()`: +1. **Targeted regression tests, one per known failure.** + Deterministic, hand-driven schedules: * PER-4 stall: `cap` advances to `logical_upper` after the remap input closes. - * replica-targeted-select-abort: `SELECT AS OF MAX` blocks rather than completing empty. - * SQL-299: `SUBSCRIBE ... UP TO` returns the full row set with no tail loss. -2. **Differential test, the backstop against unknown interleavings.** - Generate randomized interleavings of remap entries, passthrough data, and frontier advances; drive the async and sync operators on the identical schedule; assert identical emitted data and capability frontier. - The only allowed divergence is after the remap input closes, where the sync impl may advance further (PER-4): there, assert the sync frontier is `>=` the async frontier rather than equal. + * replica-targeted-select-abort: `SELECT AS OF MAX` blocks (capability retained) rather than completing empty. + * SQL-299: a record buffered when the passthrough frontier crosses `until` in one activation is emitted, not dropped. + Each test was confirmed to fail when its corresponding fix is reverted in the operator, proving it has teeth. +2. **Oracle-free property fuzz test, the backstop against unknown interleavings.** + Generate randomized interleavings of remap entries, passthrough data, and frontier advances, and assert two sound properties of the sync operator directly: + * No data loss or duplication: the emitted payload multiset equals the sent payload multiset. + The operator passes through all passthrough data while it holds a capability, so this holds under arbitrary interleavings — including schedules that violate the remap emptiness contract, which only strengthens the test. + * No premature shutdown: with `until = ∅` and no passthrough close, the operator never legitimately drops its capability, so the output frontier stays finite. + + An earlier design compared the sync operator against the async impl as a differential oracle. + That was abandoned: the async impl strands data on contract-violating random schedules (it trusts the emptiness contract and advances its capability past buffered data), so it is not a sound oracle in a synthetic harness, and making it sound would require faithfully re-modeling the data-shard/txns protocol in the generator — a second layer of subtle invariants as error-prone as the operator itself. + The direct property assertions catch the same bug classes (data loss, premature shutdown) without that risk. 3. **Existing crate tests stay green.** - `data_subscribe`, `as_of_until`, `subscribe_shard_finalize`, `subscribe_shard_register_forget`. + `data_subscribe`, `subscribe_shard_finalize`, `subscribe_shard_register_forget`, and `as_of_until` (relaxed: the deasynced operator emits each batch at its current capability, so per-batch stream timestamps are cadence-dependent and not contractual; the test now asserts the record set and the differential invariant `stream_ts <= record_time`). -Regression tests for known bugs are necessary but insufficient: they guard only against failures already found. -Both prior attempts died from unknown interleavings, which only the differential test catches. +The async impl is removed entirely once the sync impl lands; it is not retained as a test oracle. ## Risks * The sync timely API exposes data via `for_each` and the frontier as a post-activation snapshot, not as the interleaved one-at-a-time event stream the async impl consumed. The conversion therefore relies on Fact 1 and Fact 2 to reorder work safely rather than reproducing the interleaving directly. -* If step 5's straight-line decision proves insufficient, it escalates to a fixpoint loop; the differential test gates this decision. +* The fuzz test asserts data preservation and non-shutdown but not exact frontier advancement; the specific frontier behaviors (PER-4 advance, AS-OF-MAX block) are covered by the targeted regression tests instead. diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs index 7e46827d4b319..14c5f6e5e3590 100644 --- a/src/txn-wal/src/operator.rs +++ b/src/txn-wal/src/operator.rs @@ -19,8 +19,6 @@ use std::time::Duration; use differential_dataflow::Hashable; use differential_dataflow::difference::Monoid; use differential_dataflow::lattice::Lattice; -#[cfg(test)] -use futures::StreamExt; use mz_dyncfg::{Config, ConfigSet}; use mz_ore::cast::CastFrom; use mz_persist_client::cfg::RetryParameters; @@ -31,8 +29,6 @@ use mz_persist_client::{Diagnostics, PersistClient, ShardId}; use mz_persist_types::codec_impls::{StringSchema, UnitSchema}; use mz_persist_types::txn::TxnsCodec; use mz_persist_types::{Codec, Codec64, StepForward}; -#[cfg(test)] -use mz_timely_util::builder_async::{AsyncInputHandle, Event as AsyncEvent, InputConnection}; use mz_timely_util::builder_async::{ OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, button, }; @@ -234,13 +230,11 @@ where (remap_stream, shutdown_button.press_on_drop()) } -/// Synchronous port of [`txns_progress_frontiers_async`], which is retained as -/// a `#[cfg(test)]` reference oracle. The block ordering inside the schedule -/// closure is load-bearing: buffered passthrough data is emitted at the -/// pre-activation capability BEFORE any capability downgrade, which keeps the -/// differential invariant `send_time <= record_time` and avoids dropping -/// buffered rows when the passthrough frontier crosses `until` in the same -/// activation (SQL-299). Do not reorder. +/// The block ordering inside the schedule closure is load-bearing: buffered +/// passthrough data is emitted at the pre-activation capability BEFORE any +/// capability downgrade, which keeps the differential invariant `send_time <= +/// record_time` and avoids dropping buffered rows when the passthrough frontier +/// crosses `until` in the same activation (SQL-299). Do not reorder. fn txns_progress_frontiers<'scope, K, V, T, D, P, C>( remap: StreamVec<'scope, T, DataRemapEntry>, passthrough: StreamVec<'scope, T, P>, @@ -273,6 +267,11 @@ where let mut passthrough_output = OutputBuilder::from(passthrough_output); // Both inputs are disconnected from the output: capability advancement is // driven manually based on the remap stream and the passthrough frontier. + // NB: the output is created BEFORE the inputs on purpose. `new_output` + // connects to whatever inputs already exist (here, none); the `[]` + // connection arg below records the input-to-output summary but does not by + // itself disconnect the output. Creating an input before the output would + // silently connect them and break the manual capability management. let mut remap_input = builder.new_input_connection(remap, Pipeline, []); let mut passthrough_input = builder.new_input_connection(passthrough, Pipeline, []); @@ -374,6 +373,17 @@ where None => false, }; if !waiting_for_remap { + // Apply the passthrough input's frontier. + // + // If `until.less_equal(pass_frontier)`, it means that all + // subsequent batches will contain only times greater or equal + // to `until`, which means they can be dropped in their entirety. + // + // Ideally this check would live in `txns_progress_source`, but + // that turns out to be much more invasive (requires replacing + // lots of `T`s with `Antichain`s). Given that we've been + // thinking about reworking the operators, do the easy but more + // wasteful thing for now. let pass_frontier = frontiers[1].frontier(); if PartialOrder::less_equal(&until.borrow(), &pass_frontier) { debug!( @@ -384,6 +394,10 @@ where ); capability = None; } else if let Some(new_progress) = pass_frontier.as_option() { + // Recall that any reads of the data shard are always + // correct, so given that we've passed through any data from + // the input, that means we're free to pass through frontier + // updates too. if let Some(cap) = capability.as_mut() { if cap.time() < new_progress { debug!("{} downgrading cap to {:?}", name, new_progress); @@ -414,186 +428,6 @@ where (passthrough_stream, shutdown_button.press_on_drop()) } -#[cfg(test)] -fn txns_progress_frontiers_async<'scope, K, V, T, D, P, C>( - remap: StreamVec<'scope, T, DataRemapEntry>, - passthrough: StreamVec<'scope, T, P>, - name: &str, - data_id: ShardId, - until: Antichain, - unique_id: u64, -) -> (StreamVec<'scope, T, P>, PressOnDropButton) -where - K: Debug + Codec, - V: Debug + Codec, - T: Timestamp + Lattice + TotalOrder + StepForward + Codec64, - D: Clone + 'static + Monoid + Codec64 + Send + Sync, - P: Debug + Clone + 'static, - C: TxnsCodec, -{ - let name = format!("txns_progress_frontiers({})", name); - let mut builder = AsyncOperatorBuilder::new(name.clone(), passthrough.scope()); - let name = format!( - "{} [{}] {}/{} {:.9}", - name, - unique_id, - passthrough.scope().index(), - passthrough.scope().peers(), - data_id.to_string(), - ); - let (passthrough_output, passthrough_stream) = - builder.new_output::>>(); - let mut remap_input = builder.new_disconnected_input(remap, Pipeline); - let mut passthrough_input = builder.new_disconnected_input(passthrough, Pipeline); - - let shutdown_button = builder.build(move |capabilities| async move { - let [mut cap]: [_; 1] = capabilities.try_into().expect("one capability per output"); - - // None is used to indicate that both uppers are the empty antichain. - let mut remap = Some(DataRemapEntry { - physical_upper: T::minimum(), - logical_upper: T::minimum(), - }); - // NB: The following loop uses `cap.time()`` to track how far we've - // progressed in copying along the passthrough input. - loop { - debug!("{} remap {:?}", name, remap); - if let Some(r) = remap.as_ref() { - assert!(r.physical_upper <= r.logical_upper); - // If we've passed through data to at least `physical_upper`, - // then it means we can artificially advance the upper of the - // output to `logical_upper`. This also indicates that we need - // to wait for the next DataRemapEntry. It can either (A) have - // the same physical upper or (B) have a larger physical upper. - // - // - If (A), then we would again satisfy this `physical_upper` - // check, again advance the logical upper again, ... - // - If (B), then we'd fall down to the code below, which copies - // the passthrough data until the frontier passes - // `physical_upper`, then loops back up here. - if r.physical_upper.less_equal(cap.time()) { - if cap.time() < &r.logical_upper { - cap.downgrade(&r.logical_upper); - } - remap = txns_progress_frontiers_read_remap_input( - &name, - &mut remap_input, - r.clone(), - ) - .await; - continue; - } - } - - // This only returns None when there are no more data left. Turn it - // into an empty frontier progress so we can re-use the shutdown - // code below. - let event = passthrough_input - .next() - .await - .unwrap_or_else(|| AsyncEvent::Progress(Antichain::new())); - match event { - // NB: Ignore the data_cap because this input is disconnected. - AsyncEvent::Data(_data_cap, mut data) => { - // NB: Nothing to do here for `until` because both the - // `shard_source` (before this operator) and - // `mfp_and_decode` (after this operator) do the necessary - // filtering. - debug!("{} emitting data {:?}", name, data); - passthrough_output.give_container(&cap, &mut data); - } - AsyncEvent::Progress(new_progress) => { - // If `until.less_equal(new_progress)`, it means that all - // subsequent batches will contain only times greater or - // equal to `until`, which means they can be dropped in - // their entirety. - // - // Ideally this check would live in `txns_progress_source`, - // but that turns out to be much more invasive (requires - // replacing lots of `T`s with `Antichain`s). Given that - // we've been thinking about reworking the operators, do the - // easy but more wasteful thing for now. - if PartialOrder::less_equal(&until, &new_progress) { - debug!( - "{} progress {:?} has passed until {:?}", - name, - new_progress.elements(), - until.elements() - ); - return; - } - // We reached the empty frontier! Shut down. - let Some(new_progress) = new_progress.into_option() else { - return; - }; - - // Recall that any reads of the data shard are always - // correct, so given that we've passed through any data - // from the input, that means we're free to pass through - // frontier updates too. - if cap.time() < &new_progress { - debug!("{} downgrading cap to {:?}", name, new_progress); - cap.downgrade(&new_progress); - } - } - } - } - }); - (passthrough_stream, shutdown_button.press_on_drop()) -} - -#[cfg(test)] -async fn txns_progress_frontiers_read_remap_input( - name: &str, - input: &mut AsyncInputHandle>, C>, - mut remap: DataRemapEntry, -) -> Option> -where - T: Timestamp + TotalOrder, - C: InputConnection, -{ - while let Some(event) = input.next().await { - let xs = match event { - AsyncEvent::Progress(logical_upper) => { - if let Some(logical_upper) = logical_upper.into_option() { - if remap.logical_upper < logical_upper { - remap.logical_upper = logical_upper; - return Some(remap); - } - } - continue; - } - AsyncEvent::Data(_cap, xs) => xs, - }; - for x in xs { - debug!("{} got remap {:?}", name, x); - // Don't assume anything about the ordering. - if remap.logical_upper < x.logical_upper { - assert!( - remap.physical_upper <= x.physical_upper, - "previous remap physical upper {:?} is ahead of new remap physical upper {:?}", - remap.physical_upper, - x.physical_upper, - ); - // TODO: If the physical upper has advanced, that's a very - // strong hint that the data shard is about to be written to. - // Because the data shard's upper advances sparsely (on write, - // but not on passage of time) which invalidates the "every 1s" - // assumption of the default tuning, we've had to de-tune the - // listen sleeps on the paired persist_source. Maybe we use "one - // state" to wake it up in case pubsub doesn't and remove the - // listen polling entirely? (NB: This would have to happen in - // each worker so that it's guaranteed to happen in each - // process.) - remap = x; - } - } - return Some(remap); - } - // remap_input is closed, which indicates the data shard is finished. - None -} - /// The process global [`TxnsRead`] that any operator can communicate with. #[derive(Default, Debug, Clone)] pub struct TxnsContext { @@ -1376,6 +1210,106 @@ mod tests { ) } + /// Generates a random schedule for the no-data-loss fuzz test. Interleaves + /// remap entries/frontiers with passthrough data/frontiers. Payloads are + /// unique and increasing so a single dropped or duplicated record is + /// detectable; per-input times are non-decreasing (the harness requires + /// this). The schedule never closes the passthrough input, and the test + /// uses `until = ∅`, so the operator never has a legitimate reason to shut + /// down and must pass through every record it is given. + /// + /// Schedules are intentionally NOT constrained to respect the remap + /// "[physical_upper, logical_upper) is empty" contract. The no-data-loss + /// property must hold under arbitrary interleavings, so feeding + /// contract-violating schedules only strengthens the test. + fn gen_schedule(seed: u64) -> Vec { + // Simple xorshift RNG for determinism without extra deps. + let mut state = seed.wrapping_add(0x9E3779B97F4A7C15).max(1); + let mut next = || { + state ^= state << 13; + state ^= state >> 7; + state ^= state << 17; + state + }; + + let mut schedule = Vec::new(); + let mut physical = 0u64; + let mut logical = 0u64; + let mut pass_frontier = 0u64; + let mut payload = 0i64; + let mut remap_closed = false; + let steps = 8 + (next() % 16); + for _ in 0..steps { + match next() % 5 { + 0 if !remap_closed => { + physical += next() % 3; + logical = logical.max(physical) + (next() % 4); + schedule.push(Action::Remap { + physical_upper: physical, + logical_upper: logical, + }); + } + 1 if !remap_closed => { + if next() % 8 == 0 { + remap_closed = true; + schedule.push(Action::RemapFrontier(None)); + } else { + logical += next() % 3; + schedule.push(Action::RemapFrontier(Some(logical))); + } + } + 2 => { + let t = pass_frontier + (next() % 3); + pass_frontier = t; + payload += 1; + schedule.push(Action::Pass { + records: vec![(payload, t)], + }); + } + 3 => { + pass_frontier += next() % 3; + schedule.push(Action::PassFrontier(Some(pass_frontier))); + } + _ => schedule.push(Action::Step), + } + schedule.push(Action::Step); + } + schedule + } + + /// Fuzz: under any random interleaving, the deasynced operator must emit + /// every passthrough record it is given (no loss, no duplication) and must + /// not prematurely shut down. With `until = ∅` and no passthrough close, the + /// operator never legitimately drops its capability, so the output frontier + /// must stay finite. + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // too slow + fn frontiers_fuzz_no_data_loss() { + for seed in 0..500u64 { + let schedule = gen_schedule(seed); + let mut sent: Vec = schedule + .iter() + .flat_map(|a| match a { + Action::Pass { records } => records.iter().map(|(p, _)| *p).collect(), + _ => Vec::new(), + }) + .collect(); + let (out, frontier) = run_schedule(build_sync, Antichain::new(), &schedule); + let mut emitted: Vec = out.iter().map(|(p, _, _)| *p).collect(); + sent.sort(); + emitted.sort(); + assert_eq!( + emitted, sent, + "seed {seed}: operator lost or duplicated data\nschedule={schedule:?}\nout={out:?}" + ); + assert_ne!( + frontier, + u64::MAX, + "seed {seed}: operator prematurely shut down (empty output frontier)\nschedule={schedule:?}" + ); + } + } + #[mz_ore::test] #[cfg_attr(miri, ignore)] // too slow fn frontiers_sql_299_up_to_no_tail_loss() { From 4b10171d7b8d9fcc1f99f68e5ac9fa7a607f8a2b Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 5 Jun 2026 14:56:20 +0200 Subject: [PATCH 10/10] txn-wal: two-phase shutdown for txns_progress_frontiers Address review: dropping the capability on the local button press alone is weaker than builder_async, which holds capabilities until ALL workers have pressed. An early local drop lets the downstream frontier advance past times whose data this worker discarded while other workers' operator instances still feed downstream (cross-worker teardown skew). Mirror the two-phase semantics via build_reschedule: wedge (retain the capability, leave inputs undrained, reschedule) on the local press, drop the capability and drain inputs once all workers pressed. Also reword "buffered data" comments: the operator consumes pending input, it does not buffer. Co-Authored-By: Claude Opus 4.8 --- .../20260529_txn_wal_frontiers_deasync.md | 10 +++- src/txn-wal/src/operator.rs | 54 +++++++++++++------ 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/doc/developer/design/20260529_txn_wal_frontiers_deasync.md b/doc/developer/design/20260529_txn_wal_frontiers_deasync.md index 0214a289fce28..e1c7b130ed719 100644 --- a/doc/developer/design/20260529_txn_wal_frontiers_deasync.md +++ b/doc/developer/design/20260529_txn_wal_frontiers_deasync.md @@ -89,7 +89,15 @@ So the remap-driven downgrade never strands a record below the new cap. ### Per-activation algorithm ```text -1. if shutdown button pressed -> capability = None +1. if shutdown button pressed locally -> wedge (keep capability, leave inputs + undrained, reschedule) until ALL + workers pressed; then capability = + None and drain inputs + (mirrors builder_async two-phase + shutdown; early local drop would let + the downstream frontier advance past + this worker's discarded input during + cross-worker teardown skew) 2. drain remap_input.for_each -> fold into `remap`: keep larger logical_upper, assert physical monotone 3. fold remap frontier: Some(l) -> bump remap.logical_upper diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs index 14c5f6e5e3590..192dd8beed11a 100644 --- a/src/txn-wal/src/operator.rs +++ b/src/txn-wal/src/operator.rs @@ -230,11 +230,11 @@ where (remap_stream, shutdown_button.press_on_drop()) } -/// The block ordering inside the schedule closure is load-bearing: buffered -/// passthrough data is emitted at the pre-activation capability BEFORE any +/// The block ordering inside the schedule closure is load-bearing: pending +/// passthrough input is emitted at the pre-activation capability BEFORE any /// capability downgrade, which keeps the differential invariant `send_time <= -/// record_time` and avoids dropping buffered rows when the passthrough frontier -/// crosses `until` in the same activation (SQL-299). Do not reorder. +/// record_time` and avoids dropping in-flight rows when the passthrough +/// frontier crosses `until` in the same activation (SQL-299). Do not reorder. fn txns_progress_frontiers<'scope, K, V, T, D, P, C>( remap: StreamVec<'scope, T, DataRemapEntry>, passthrough: StreamVec<'scope, T, P>, @@ -275,9 +275,9 @@ where let mut remap_input = builder.new_input_connection(remap, Pipeline, []); let mut passthrough_input = builder.new_input_connection(passthrough, Pipeline, []); - let (shutdown_handle, shutdown_button) = button(scope, info.address); + let (mut shutdown_handle, shutdown_button) = button(scope, info.address); - builder.build(move |capabilities| { + builder.build_reschedule(move |capabilities| { // The output capability's time tracks how far we've progressed in // copying along the passthrough input. `None` indicates that we've // dropped the capability to shut down. @@ -296,8 +296,27 @@ where let mut remap_closed = false; move |frontiers| { + // If our worker pressed the button we stop producing data and + // frontier updates downstream, but mirror `builder_async`: hold the + // capability and stop draining the inputs until ALL workers have + // pressed. Dropping the capability on the local press alone would + // let the downstream frontier advance during cross-worker teardown + // skew, past times whose data this worker has discarded, while + // other workers' operator instances still feed downstream. if shutdown_handle.local_pressed() { - capability = None; + return if shutdown_handle.all_pressed() { + // All workers pressed: drop the capability and drain the + // inputs so teardown does not stall the dataflow. + capability = None; + remap_input.for_each(|_input_cap, _data| {}); + passthrough_input.for_each(|_input_cap, _data| {}); + false + } else { + // Wedge: keep the capability, leave the inputs undrained + // (their pending messages hold the frontier), and ask to be + // rescheduled until the remaining workers press. + true + }; } // Fold new DataRemapEntries, keeping the one with the largest @@ -341,13 +360,14 @@ where debug!("{} remap {:?} remap_closed={}", name, remap, remap_closed); - // Emit buffered passthrough data at the current (pre-downgrade) - // capability, BEFORE any downgrade below. `cap.time()` here equals - // the pre-activation frontier, which is `<=` every buffered - // record's time, so the differential invariant `send_time <= - // record_time` holds. Doing this before the `until`-driven drop is - // the SQL-299 fix. NB: nothing to do for `until` because the - // shard_source (before) and mfp_and_decode (after) filter. + // Pass through any data the passthrough input has pending, at the + // current (pre-downgrade) capability, BEFORE any downgrade below. + // `cap.time()` here equals the pre-activation frontier, which is + // `<=` every pending record's time, so the differential invariant + // `send_time <= record_time` holds. Doing this before the + // `until`-driven drop is the SQL-299 fix. NB: nothing to do for + // `until` because the shard_source (before) and mfp_and_decode + // (after) filter. if let Some(cap) = capability.as_ref() { let mut output = passthrough_output.activate(); passthrough_input.for_each(|_input_cap, data| { @@ -412,8 +432,8 @@ where // If we've copied passthrough data to at least `physical_upper`, we // can artificially advance the output to `logical_upper`. By the - // emptiness of `[physical_upper, logical_upper)`, no buffered record - // lies below `logical_upper`, so this never strands data. + // emptiness of `[physical_upper, logical_upper)`, no record still in + // flight lies below `logical_upper`, so this never strands data. if let Some(cap) = capability.as_mut() { assert!(remap.physical_upper <= remap.logical_upper); let phys_reached = remap.physical_upper.less_equal(cap.time()); @@ -422,6 +442,8 @@ where cap.downgrade(&remap.logical_upper); } } + + false } });