txn-wal: deasync txns_progress_frontiers #36810
Lessons from the txn-wal txns_progress_frontiers deasync (MaterializeInc#36810; prior attempts MaterializeInc#36537/MaterializeInc#36548 reverted): the input-await-only conversion case is semantic, not mechanical. Adds: derive the blocking structure first, the emit-before-downgrade and emptiness-contract ordering rules, intentional divergence documentation, scripted-harness pitfalls (button lifetime, close-vs-advance_to(MAX), send buffering, output-before-input, quiescence), teeth-proven regression tests, oracle-free fuzz properties, and why the old async impl must not be used as a differential oracle. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
aljoscha left a comment
I read the doc, to get the reasoning and facts and the operator impl. They look good! Only skimmed the tests.
    move |frontiers| {
        if shutdown_handle.local_pressed() {
            capability = None;
I was wondering if this is correct? Don't we get spurious frontier downgrades from this, downstream?
Good catch — you were right. builder_async holds capabilities until all workers have pressed (on the local press it only stops scheduling the logic future and draining inputs); dropping on local_pressed alone could let the downstream frontier advance past this worker's discarded input while other workers' instances still feed downstream. Fixed in 4b10171: the operator now mirrors the two-phase shutdown via build_reschedule — wedge (retain capability, leave inputs undrained, reschedule) on the local press, drop and drain once all_pressed().
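The two-phase shutdown described in this reply can be sketched as a small decision function. This is a toy model only, not the operator's code: `ShutdownAction`, `OperatorState`, and `on_activation` are hypothetical names, the capability is a plain `u64`, and the two booleans stand in for whatever `local_pressed()` and `all_pressed()` report.

```rust
/// What one activation should do with respect to shutdown (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum ShutdownAction {
    /// No press observed: run the normal operator logic.
    Run,
    /// This worker pressed but not all workers have: keep the capability,
    /// leave inputs undrained, and ask to be rescheduled.
    Wedge,
    /// Every worker pressed: drop the capability and drain inputs.
    DropAndDrain,
}

/// Hypothetical stand-in for the operator's capability slot.
struct OperatorState {
    capability: Option<u64>,
}

/// Decide the shutdown behaviour for one activation.
/// The capability is released only once all workers have pressed.
fn on_activation(
    state: &mut OperatorState,
    local_pressed: bool,
    all_pressed: bool,
) -> ShutdownAction {
    if all_pressed {
        state.capability = None;
        ShutdownAction::DropAndDrain
    } else if local_pressed {
        // Capability is deliberately left untouched here.
        ShutdownAction::Wedge
    } else {
        ShutdownAction::Run
    }
}
```

The point of the middle branch is that a local press alone never changes `capability`, so the downstream frontier cannot advance on this worker's account while other workers are still running.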
    // filtering.
    debug!("{} remap {:?} remap_closed={}", name, remap, remap_closed);

    // Emit buffered passthrough data at the current (pre-downgrade)
I found "buffered data" slightly confusing here because the operator itself is not buffering it, it's just consuming its input. Which yes, is buffered and yada yada, but still.
Fair — reworded to "pending input" / "in-flight" in 4b10171.
Specifies converting txns_progress_frontiers from AsyncOperatorBuilder to synchronous OperatorBuilderRc. Grounds correctness in two provable facts (buffered data safe at pre-activation cap; emptiness of [physical_upper, logical_upper)) rather than re-deriving per-activation logic, which is how PRs 36537 and 36548 introduced bugs. Adds a differential test against the async reference as the backstop against unknown interleavings. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The deasynced operator emits each batch at its current capability, so per-batch stream timestamps depend on scheduling cadence and are not contractual. Assert the record set and the differential invariant (stream ts <= record time) instead of exact stream timestamps. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
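A test assertion in this style might look like the sketch below. It is an illustration under assumptions, not the test from the PR: `Emitted` and `check_emitted` are hypothetical names, timestamps are plain `u64`s, and payloads are string slices.

```rust
/// One emitted update: the timestamp of the batch it arrived in, the
/// record's own time, and its payload (hypothetical shape).
struct Emitted<'a> {
    stream_ts: u64,
    record_time: u64,
    payload: &'a str,
}

/// Check the two things that are contractual: every stream timestamp is
/// at most the record time, and the emitted records match the expected
/// ones regardless of order or batching.
fn check_emitted(emitted: &[Emitted<'_>], expected: &[&str]) -> Result<(), String> {
    for e in emitted {
        if e.stream_ts > e.record_time {
            return Err(format!(
                "{}: stream ts {} > record time {}",
                e.payload, e.stream_ts, e.record_time
            ));
        }
    }
    // Compare as sorted lists so batching and arrival order do not matter.
    let mut got: Vec<&str> = emitted.iter().map(|e| e.payload).collect();
    let mut want: Vec<&str> = expected.to_vec();
    got.sort_unstable();
    want.sort_unstable();
    if got != want {
        return Err(format!("record mismatch: got {:?}, want {:?}", got, want));
    }
    Ok(())
}
```

Exact stream timestamps are never compared, so the check stays stable when scheduling cadence changes which capability a batch is emitted at.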
Replace the planned differential-vs-async-oracle test with an oracle-free property fuzz test: over random input interleavings, assert the sync operator emits exactly the sent payload multiset (no loss or duplication) and never prematurely drops its capability. This is sound under arbitrary schedules and directly targets the data-loss and premature-shutdown bug classes. The async txns_progress_frontiers was kept only as a potential test oracle, but it strands data on schedules that violate the remap emptiness contract (it trusts the contract and advances its capability past buffered data), so it is not a sound oracle in a synthetic harness. Remove it and its read_remap_input helper, along with the now-unused async imports. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
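The shape of such an oracle-free property can be sketched against a toy passthrough operator. Everything here is an assumption made for illustration: `Toy`, `Rng`, `counts`, and `run_schedule` are hypothetical, the generator is a hand-rolled xorshift so the sketch needs no crates, and the real test drives the actual operator through a scripted harness.

```rust
use std::collections::HashMap;

/// Tiny deterministic xorshift generator (avoids external crates).
struct Rng(u64);
impl Rng {
    fn next(&mut self) -> u64 {
        self.0 ^= self.0 << 13;
        self.0 ^= self.0 >> 7;
        self.0 ^= self.0 << 17;
        self.0
    }
}

/// Toy operator: pending input is emitted on activation, and the
/// capability is dropped only on an activation that sees the input closed.
struct Toy {
    pending: Vec<u32>,
    closed: bool,
    capability: Option<u64>,
    out: Vec<u32>,
}

impl Toy {
    fn activate(&mut self) {
        if self.capability.is_none() {
            return;
        }
        self.out.append(&mut self.pending); // emit before any capability change
        if self.closed {
            self.capability = None;
        }
    }
}

/// Multiset view of a payload list.
fn counts(xs: &[u32]) -> HashMap<u32, usize> {
    let mut m = HashMap::new();
    for &x in xs {
        *m.entry(x).or_insert(0) += 1;
    }
    m
}

/// Run one random interleaving of sends and activations, then close.
fn run_schedule(seed: u64, steps: usize) -> Result<(), String> {
    let mut rng = Rng(seed | 1); // xorshift state must be nonzero
    let mut toy = Toy { pending: vec![], closed: false, capability: Some(0), out: vec![] };
    let mut sent = Vec::new();
    for _ in 0..steps {
        if rng.next() % 3 == 0 {
            let p = (rng.next() % 8) as u32;
            sent.push(p);
            toy.pending.push(p);
        } else {
            toy.activate();
        }
        // Property 1: no premature shutdown while the input is still open.
        if toy.capability.is_none() {
            return Err("capability dropped before input closed".to_string());
        }
    }
    toy.closed = true;
    toy.activate();
    // Property 2: exactly the sent payload multiset comes out.
    if counts(&sent) != counts(&toy.out) {
        return Err(format!("sent {:?} but emitted {:?}", sent, toy.out));
    }
    Ok(())
}
```

Both properties are stated purely in terms of what was sent and what came out, so no reference implementation is needed and the check holds for any schedule the generator produces.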
Address review: dropping the capability on the local button press alone is weaker than builder_async, which holds capabilities until ALL workers have pressed. An early local drop lets the downstream frontier advance past times whose data this worker discarded while other workers' operator instances still feed downstream (cross-worker teardown skew). Mirror the two-phase semantics via build_reschedule: wedge (retain the capability, leave inputs undrained, reschedule) on the local press, drop the capability and drain inputs once all workers pressed. Also reword "buffered data" comments: the operator consumes pending input, it does not buffer. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
0a11a3b to 4b10171
Thanks for the review!
Convert txns_progress_frontiers from an async AsyncOperatorBuilder to a synchronous OperatorBuilderRc, and remove the async implementation entirely.

Third attempt: #36537/#36538 were reverted and #36548 never landed. The SQL-299 data-loss bug is fixed by construction: buffered passthrough data is emitted at the pre-activation capability, before any capability downgrade. The PER-4 stall fix (retain the last remap entry after the remap input closes) and the replica-targeted-select-abort fix (gate the passthrough frontier on waiting_for_remap) are preserved.

Tests: three regression tests (SQL-299, PER-4, SELECT AS OF MAX), each confirmed to fail when its fix is reverted, plus an oracle-free fuzz test asserting no data loss and no premature shutdown over random interleavings. as_of_until is relaxed to the deasync stream-timestamp contract.

Design: doc/developer/design/20260529_txn_wal_frontiers_deasync.md.

🤖 Generated with Claude Code
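Why the emit-before-downgrade ordering matters can be seen in a toy model of a single activation. This is a sketch under assumptions, not the operator's code: both function names are hypothetical, times are plain `u64`s, `cap` stands in for the held capability, and `pending` holds the record times of input waiting to be passed through.

```rust
/// Emit pending records at the capability held on entry, then downgrade.
/// Returns the (stream_ts, record_time) pairs and the capability afterwards.
fn emit_then_downgrade(cap: u64, pending: &[u64], target: u64) -> (Vec<(u64, u64)>, u64) {
    let emitted = pending.iter().map(|&t| (cap, t)).collect();
    (emitted, cap.max(target))
}

/// The unsafe ordering, shown for contrast: downgrade first, then emit at
/// the new capability, which may already be past the records' times.
fn downgrade_then_emit(cap: u64, pending: &[u64], target: u64) -> (Vec<(u64, u64)>, u64) {
    let new_cap = cap.max(target);
    let emitted = pending.iter().map(|&t| (new_cap, t)).collect();
    (emitted, new_cap)
}
```

With `cap = 2`, pending record times `[3, 4]`, and a downgrade target of `5`, the first ordering emits at stream timestamp 2 and keeps stream ts <= record time, while the second emits at 5, past both records.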