
compute: move MV sink persist I/O off Timely thread#35328

Open
antiguru wants to merge 6 commits into MaterializeInc:main from antiguru:deasync


@antiguru
Member

@antiguru antiguru commented Mar 5, 2026

Convert the MV sink's mint, write, and append operators from AsyncOperatorBuilder to OperatorBuilderRc (sync) with dedicated Tokio tasks for persist I/O.
This removes async overhead from the Timely scheduling loop while keeping persist latency isolated from dataflow scheduling.

Key changes:

  • mint: Tokio task watches persist upper via SyncActivator; operator manages capabilities and batch description minting
  • write: Tokio task owns WriteHandle and CorrectionBuffer; operator sends commands via channel; ArcActivator coalesces redundant cross-thread activations
  • append: Tokio task performs compare_and_append; operator forwards descriptions, batches, and frontier updates
  • CorrectionLogger tracks net batch/record/size counts and retracts outstanding logging state on drop, fixing stale introspection rows after dataflow cancellation
  • ChannelLogging bridges Correction events from Tokio task to Timely thread for introspection logging
  • All tasks use abort_on_drop for prompt cleanup on dataflow shutdown
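
The ownership split described for the write operator (a background task owns the state, the operator only sends commands over a channel) can be sketched roughly as follows. This is a minimal illustration and not the code in this PR: the `Command` enum, `spawn_worker`, and `command_channel_demo` are hypothetical names, a std thread stands in for the Tokio task, and a plain `Vec` stands in for `WriteHandle` and `CorrectionBuffer`, so the sketch has no external dependencies.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical commands the operator sends to the I/O worker.
enum Command {
    /// Buffer an update `(data, diff)`.
    Update(String, i64),
    /// Drain everything buffered so far and report the net diff.
    Flush(mpsc::Sender<i64>),
}

/// Spawns a worker that exclusively owns the buffer. A std thread stands in
/// for a Tokio task so the sketch needs no external crates.
fn spawn_worker() -> (mpsc::Sender<Command>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut buffered: Vec<(String, i64)> = Vec::new();
        // The loop ends once every sender is dropped, i.e. on shutdown.
        for cmd in rx {
            match cmd {
                Command::Update(data, diff) => buffered.push((data, diff)),
                Command::Flush(reply) => {
                    let net: i64 = buffered.drain(..).map(|(_, d)| d).sum();
                    // Ignore send errors: the operator may already be gone.
                    let _ = reply.send(net);
                }
            }
        }
    });
    (tx, handle)
}

/// Drives the worker the way an operator might: send commands, collect a reply.
fn command_channel_demo() -> i64 {
    let (tx, handle) = spawn_worker();
    tx.send(Command::Update("a".into(), 1)).unwrap();
    tx.send(Command::Update("b".into(), 1)).unwrap();
    tx.send(Command::Update("a".into(), -1)).unwrap();
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Command::Flush(reply_tx)).unwrap();
    let net = reply_rx.recv().unwrap();
    drop(tx); // closing the channel lets the worker exit
    handle.join().unwrap();
    net
}
```

Calling `command_channel_demo()` from a `main` function exercises the round trip. The demo blocks on `recv` for brevity; an operator on a Timely thread would presumably poll with `try_recv` when activated rather than block.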

Shares the infra commit (compute/storage: add Send-safe infrastructure for sync Timely operators) with #35450.
Rebase before merging to deduplicate.

🤖 Generated with Claude Code

@github-actions

github-actions bot commented Mar 5, 2026

Thanks for opening this PR! Here are a few tips to help make the review process smooth for everyone.

PR title guidelines

  • Use imperative mood: "Fix X" not "Fixed X" or "Fixes X"
  • Be specific: "Fix panic in catalog sync when controller restarts" not "Fix bug" or "Update catalog code"
  • Prefix with area if helpful: compute: , storage: , adapter: , sql:

Pre-merge checklist

  • The PR title is descriptive and will make sense in the git log.
  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).

Contributor

@teskje teskje left a comment


The intention checks out to me. I'll note that for me this makes it noticeably harder to follow the logic and be sure that it's still correct, so we should only make this change if we have measured performance improvements. Aiui, the thinking is that Timely activations are expensive enough to justify the overhead of the additional tokio tasks and channels. It'd be great to have some numbers!

@antiguru antiguru force-pushed the deasync branch 7 times, most recently from f1e5e14 to c5b5a72 Compare March 12, 2026 08:06
Prepare shared infrastructure needed for converting async Timely operators
to sync operators with Tokio tasks:

* Add `StartSignal::into_send_future()` to extract a Send-safe future
* Change `InternalCommandSender` to use `SyncActivator` (Send-safe)
  instead of `Activator` (thread-local)
* Add `+ Send` bounds to `persist_source` params (`listen_sleep`,
  `start_signal`) so callers can pass them to Tokio tasks

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@antiguru
Member Author

bugbot run

@cursor

cursor bot commented Mar 17, 2026

PR Summary

Medium Risk
Reworks MV sink operators’ execution model and cross-thread coordination (Tokio tasks, channels, activators), which could affect sink correctness, progress, or shutdown behavior despite being largely a refactor.

Overview
Moves Materialized View sink persist interactions off Timely worker threads by refactoring the mint, write, and append operators to run persist I/O/state machines in spawned Tokio tasks, communicating with Timely via mpsc channels and SyncActivator-based wakeups.

Introduces StartSignal::into_send_future and tightens persist_source start-signal/listen hooks to be Send, plus updates correction buffering/logging to be task-safe via ChannelLogging/CorrectionLogger (including cleanup/retraction on async abort).
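
One way to picture how cross-thread wakeups can be coalesced is a shared "pending" flag that only lets the first activation through until the operator acknowledges it. The sketch below is an assumption about the general technique, not the `ArcActivator` from this PR: `CoalescingActivator`, `ack`, and the `delivered` counter are hypothetical, and the counter stands in for the call to the underlying activator.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical activator that drops redundant wakeups.
#[derive(Clone)]
struct CoalescingActivator {
    /// True while a wakeup is pending and not yet acknowledged.
    pending: Arc<AtomicBool>,
    /// Counts wakeups that were actually delivered (illustration only).
    delivered: Arc<AtomicUsize>,
}

impl CoalescingActivator {
    fn new() -> Self {
        Self {
            pending: Arc::new(AtomicBool::new(false)),
            delivered: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Called from background tasks. Only the first call after an `ack`
    /// delivers a wakeup; later calls see the flag already set and return.
    fn activate(&self) {
        if !self.pending.swap(true, Ordering::AcqRel) {
            // A real implementation would wake the operator here.
            self.delivered.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Called by the operator when scheduled, before it drains its channels.
    fn ack(&self) {
        self.pending.store(false, Ordering::Release);
    }

    fn delivered(&self) -> usize {
        self.delivered.load(Ordering::Relaxed)
    }
}

/// Four threads fire 100 activations each; only one wakeup gets through
/// until the operator acknowledges it.
fn coalescing_demo() -> (usize, usize) {
    let activator = CoalescingActivator::new();
    let workers: Vec<_> = (0..4)
        .map(|_| {
            let a = activator.clone();
            thread::spawn(move || {
                for _ in 0..100 {
                    a.activate();
                }
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    let after_burst = activator.delivered();
    activator.ack();
    activator.activate();
    (after_burst, activator.delivered())
}
```

Clearing the flag before draining, rather than after, means an activation that races with the drain is delivered again instead of being lost.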

Written by Cursor Bugbot for commit 4ceac22. This will update automatically on new commits.


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 2 potential issues.

Bugbot Autofix is ON, but it could not run because the branch was deleted or merged before autofix could start.

@antiguru antiguru force-pushed the deasync branch 4 times, most recently from 7d1eb5d to ecb3d35 Compare March 17, 2026 10:35
@antiguru antiguru marked this pull request as ready for review March 17, 2026 10:47
@antiguru antiguru requested a review from a team as a code owner March 17, 2026 10:47
Convert the MV sink's mint, write, and append operators from
AsyncOperatorBuilder to OperatorBuilderRc (sync) with dedicated Tokio
tasks for persist I/O.

Key changes:
* mint: Tokio task watches persist upper, operator manages capabilities
* write: Tokio task owns WriteHandle and CorrectionBuffer, operator
  sends batches via channel
* append: Tokio task performs compare_and_append, operator collects
  results
* ArcActivator coalesces redundant cross-thread activations
* CorrectionLogger tracks net batch/record/size counts and retracts
  outstanding logging state on drop
* ChannelLogging bridges Correction events from Tokio task to Timely
  thread for introspection logging

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
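
The retract-on-drop behavior described for `CorrectionLogger` can be illustrated with a small stand-in that tracks the net count it has reported and sends the inverse when dropped. This is a hedged sketch of the idea only: `NetLogger`, `RecordDelta`, and `retract_on_drop_demo` are hypothetical names, and a std `mpsc` channel plays the role of the bridge between the task and the Timely thread.

```rust
use std::sync::mpsc;

/// Hypothetical logging event: a signed change to the record count.
#[derive(Debug, PartialEq)]
struct RecordDelta(i64);

/// Tracks the net count it has reported and retracts it when dropped.
struct NetLogger {
    tx: mpsc::Sender<RecordDelta>,
    net_records: i64,
}

impl NetLogger {
    fn new(tx: mpsc::Sender<RecordDelta>) -> Self {
        Self { tx, net_records: 0 }
    }

    /// Reports a change and remembers it so it can be undone later.
    fn report(&mut self, delta: i64) {
        self.net_records += delta;
        // Ignore send errors: the receiving side may already be shut down.
        let _ = self.tx.send(RecordDelta(delta));
    }
}

impl Drop for NetLogger {
    fn drop(&mut self) {
        // Retract whatever is still outstanding so no stale rows remain.
        if self.net_records != 0 {
            let _ = self.tx.send(RecordDelta(-self.net_records));
        }
    }
}

/// Returns the sum of all events seen by the receiver after the logger
/// has been dropped, as happens when a task is aborted.
fn retract_on_drop_demo() -> i64 {
    let (tx, rx) = mpsc::channel();
    {
        let mut logger = NetLogger::new(tx);
        logger.report(5);
        logger.report(-2);
    } // logger dropped here; it sends RecordDelta(-3)
    // The sender was owned by the logger, so the iterator terminates.
    rx.iter().map(|RecordDelta(d)| d).sum()
}
```

Because the retraction lives in `Drop`, it also runs when the owning task is cancelled, which is the case the description calls out for dataflow cancellation.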
@frankmcsherry
Contributor

> Aiui, the thinking is that Timely activations are expensive enough to justify the overhead of the additional tokio tasks and channels.

I think there's an additional issue that this may address: the async operators would not only schedule themselves often, but would also yield quite happily. I believe we figured this was the most likely candidate for "why Frank's catalog server became permanently wedged". The operators do need to be designed to handle overload gracefully, and not fall permanently behind.

> It'd be great to have some numbers!

+1 on this. It may be that this is a recognized issue in a tracker somewhere, but if not definitely worth socializing what's up and why.

Add `enable_compute_sync_mv_sink` dyncfg flag (default: false) to select
between the existing async Timely operator implementation and the new
sync+Tokio implementation of the MV sink. This enables gradual rollout
and quick rollback without redeployment.

The sync implementation is extracted into `materialized_view_v2.rs`. The
async implementation is restored as the default code path, adapted to
use `ChannelLogging`/`CorrectionLogger` instead of the removed `Logging`
type. Dispatch happens at `persist_sink()` based on the flag value.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
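
The flag-based dispatch described in this commit message amounts to choosing an implementation from a configuration value at render time. A minimal sketch, assuming a plain boolean in place of the real dyncfg lookup; `SinkConfig`, `SinkImpl`, and `select_sink_impl` are hypothetical names and not the PR's API:

```rust
/// Hypothetical stand-in for the flag value read when the sink is rendered.
#[derive(Clone, Copy, Default)]
struct SinkConfig {
    /// Mirrors a flag that defaults to false, so the async path stays the default.
    enable_sync_mv_sink: bool,
}

/// The two implementations the dispatcher can choose between.
#[derive(Debug, PartialEq)]
enum SinkImpl {
    /// Existing async Timely operators.
    Async,
    /// Sync operators backed by background tasks.
    SyncWithTasks,
}

/// Picks the implementation once, at render time, based on the flag.
fn select_sink_impl(config: SinkConfig) -> SinkImpl {
    if config.enable_sync_mv_sink {
        SinkImpl::SyncWithTasks
    } else {
        SinkImpl::Async
    }
}
```

Reading the flag once per rendered dataflow would mean a flip affects only newly rendered sinks; whether the PR behaves this way is not stated here.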
Add the new dyncfg flag to FlipFlagsAction in parallel-workload and
to get_variable_system_parameters in mzcompose for CI coverage.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@antiguru antiguru requested a review from a team as a code owner March 17, 2026 13:59
antiguru and others added 2 commits March 17, 2026 17:02
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The maybe_mint_batch_description function was incorrectly rewritten
instead of being copied from the sync commit. Restore the correct
logic that uses persist_frontier as lower and desired_frontier as
upper. Also add the missing cap_set.try_downgrade in the else branch.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
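
The fix described in this commit message (lower taken from `persist_frontier`, upper from `desired_frontier`, and a downgrade in the else branch) might look roughly like the sketch below. Everything beyond those two roles is an assumption made for illustration: frontiers are modeled as single `u64` times, the minting condition `persist_frontier < desired_frontier` and the downgrade target are guesses, and `BatchDescription`, `Capability`, and `maybe_mint` are hypothetical stand-ins for the real types.

```rust
/// Hypothetical batch description over totally ordered `u64` times.
#[derive(Debug, PartialEq)]
struct BatchDescription {
    lower: u64,
    upper: u64,
}

/// Hypothetical capability that may only move forward in time.
struct Capability {
    time: u64,
}

impl Capability {
    /// Refuses to move backwards, in the spirit of a `try_downgrade`.
    fn try_downgrade(&mut self, to: u64) -> Result<(), &'static str> {
        if to < self.time {
            return Err("cannot downgrade to an earlier time");
        }
        self.time = to;
        Ok(())
    }
}

/// Mints a description when there is a non-empty interval to write
/// (assumed condition); otherwise only advances the capability.
fn maybe_mint(
    persist_frontier: u64,
    desired_frontier: u64,
    cap: &mut Capability,
) -> Option<BatchDescription> {
    if persist_frontier < desired_frontier {
        Some(BatchDescription {
            lower: persist_frontier,
            upper: desired_frontier,
        })
    } else {
        // Assumed target: nothing to mint, so follow the persist frontier.
        let _ = cap.try_downgrade(persist_frontier);
        None
    }
}
```

Skipping the downgrade in the else branch would leave the capability held at an old time even though nothing remains to be written there, which would hold back downstream progress.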
3 participants