
compute: add token-controlled fuse in feedback edges #18718

Merged
merged 8 commits into MaterializeInc:main on Apr 18, 2023

Conversation

petrosagg
Contributor

@petrosagg petrosagg commented Apr 11, 2023

This PR implements an alternative solution to recursive dataflow cancellation, in which tokened passthrough operators are rendered across all feedback edges of the dataflow.

The tokens are held by the exported sinks and indexes, in the same way source and imported index tokens are.

When the token is dropped, iteration stops immediately, akin to having just executed a `break` instruction, so even divergent dataflows shut down quickly.
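The token mechanism can be sketched outside of timely using only std's `Rc`/`Weak`. The `fused_passthrough` function below is a hypothetical stand-in for the passthrough operator: it gates a plain `Vec` instead of a stream, but the shutdown signal works the same way, via `Weak::upgrade` failing once the owner drops the token.

```rust
use std::rc::{Rc, Weak};

/// Hypothetical illustration of the fuse: forward items only while the
/// token is still alive. In the PR the same idea is an operator rendered
/// on a timely stream across each feedback edge.
fn fused_passthrough<T>(items: Vec<T>, token: Weak<()>) -> Vec<T> {
    items
        .into_iter()
        // Weak::upgrade returns None once the Rc has been dropped,
        // at which point all further data is discarded.
        .filter(|_| token.upgrade().is_some())
        .collect()
}

fn main() {
    let token = Rc::new(());
    let weak = Rc::downgrade(&token);

    // Token alive: data passes through unchanged.
    assert_eq!(fused_passthrough(vec![1, 2, 3], weak.clone()), vec![1, 2, 3]);

    // Dropping the token "blows the fuse": nothing passes anymore.
    drop(token);
    assert_eq!(fused_passthrough(vec![1, 2, 3], weak), Vec::<i32>::new());

    println!("ok");
}
```

In the real dataflow the sinks and indexes hold the `Rc` side, so dropping an export drops the token and every feedback edge stops forwarding updates on the next scheduling of its fuse operator.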

Motivation

Tips for reviewer

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • This PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way) and therefore is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • This PR includes the following user-facing behavior changes:
    • N/A

@petrosagg petrosagg requested a review from teskje April 11, 2023 17:45
@petrosagg petrosagg force-pushed the wmr-cancellation branch 3 times, most recently from 6ebd923 to e102225 on April 12, 2023 09:39
@petrosagg petrosagg marked this pull request as ready for review April 12, 2023 09:42
@petrosagg petrosagg requested a review from a team as a code owner April 12, 2023 09:42
Contributor

@teskje teskje left a comment


LGTM, thanks!

I think this deserves a test. What do you think about salvaging the one from #18442? It is meant to test active dataflow cancellation, but it really only tests cancellation of divergent dataflows.

src/timely-util/src/operator.rs (outdated review comment, resolved)
@teskje teskje requested a review from antiguru April 12, 2023 11:14
@teskje
Contributor

teskje commented Apr 12, 2023

@aalexandrov @ggevay FYI. This will fix #16800, but you might want to leave that ticket open for tracking the hard limit.

@ggevay
Contributor

ggevay commented Apr 12, 2023

Oh, this is really great, thanks @petrosagg! I'm very happy about this!

I tried it in a little bit of manual testing by hitting Ctrl+C on WMR selects and by dropping WMR materialized views, and it seems to work. But I'll leave the code review to people who know Timely better.

@ggevay ggevay mentioned this pull request Apr 12, 2023
petrosagg and others added 2 commits April 12, 2023 14:54
@petrosagg
Contributor Author

@teskje I also cherry-picked the commit adding the test from #18442

@teskje
Contributor

teskje commented Apr 12, 2023

Thanks! We should probably change the name of this file and the comments inside it, since it's not active dataflow cancellation that we are testing, but shutdown of divergent WMR dataflows. I can push another commit with that later if you don't get to it first!

Edit: I added a fixup commit that makes these changes.

Member

@antiguru antiguru left a comment


I think this approach is valuable! Thanks for the PR.

I added some comments around naming and describing concepts, which I think are important to resolve before merging. The concept as a whole seems good, though.

This PR can introduce an inconsistency when logging errors to Sentry. At the moment we only ever process whole batches of data, which might no longer be the case with this PR. For reductions, this might mean that they see negative accumulations and log them to Sentry. Ideally we prevent these logs, but I'm fine if that's not part of the same PR (please make sure it's part of the same release, though).


/// Wraps the stream with a passthrough operator that will be shut down when the provided
/// token cannot be upgraded anymore.
fn fused(&self, token: Weak<()>) -> Stream<G, D1>;
Member


I'm not in favor of the name fused. I think the function should represent a verb, not a noun, and fused mostly means "some things mixed together," which is not the meaning we want to have here. I liked the old with_token (?) better!

Member


The documentation could be more specific about what the implementation actually provides. At the moment, it gates all data once the token is dropped, but still passes through progress traffic. For some dataflows, this might be surprising if they assume that dropping the token also advances the frontier to the empty frontier.
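The data-vs-progress distinction raised here can be modeled in plain Rust (a hypothetical sketch, not the timely implementation): once the token is gone, data messages are dropped, but progress messages keep flowing, so downstream frontiers still advance normally rather than jumping to the empty frontier.

```rust
use std::rc::{Rc, Weak};

#[derive(Debug, PartialEq)]
enum Msg {
    Data(i32),
    Progress(u64),
}

/// Hypothetical model of the behavior described above: after the token is
/// dropped, Data is discarded but Progress is still forwarded.
fn gate(msgs: Vec<Msg>, token: Weak<()>) -> Vec<Msg> {
    msgs.into_iter()
        .filter(|m| matches!(m, Msg::Progress(_)) || token.upgrade().is_some())
        .collect()
}

fn main() {
    let token = Rc::new(());
    let weak = Rc::downgrade(&token);
    drop(token);

    // Data is gated, progress passes through.
    let out = gate(vec![Msg::Data(1), Msg::Progress(7), Msg::Data(2)], weak);
    assert_eq!(out, vec![Msg::Progress(7)]);
    println!("ok");
}
```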

Contributor


I like having "fuse" in the name because it evokes the notion of a thing that stops stuff from flowing through once it is triggered. (It would be nicer to have a water analogy rather than one from electricity, but I can't think of one.) "token" doesn't evoke the same associations for me, it's pretty generic. But it does have a specific meaning in the context of MZ dataflows, so I think it would be fine too.

A couple more entries to the bike shedding:

  • with_fuse
  • with_circuit_breaker

Contributor


The noun form of fuse is distinct from the verb form here, so I prefer with_fuse over fused. Circuit breakers normally can be reset, unlike fuses, so I'd prefer to avoid that here.

@teskje
Contributor

teskje commented Apr 12, 2023

> This PR can introduce an inconsistency when logging errors to Sentry. At the moment we only ever process whole batches of data, which might no longer be the case with this PR. For reductions, this might mean that they see negative accumulations and log them to Sentry. Ideally we prevent these logs, but I'm fine if that's not part of the same PR (please make sure it's part of the same release, though).

This is something I have on my plate, because we also need it for making persist_source exit early, and generally for everything we do to make dataflow shutdown faster. We can just leave this PR open until we have that? Should be ready sometime next week.

@petrosagg
Contributor Author

@antiguru @teskje I pushed a commit that changes the name to with_token, adds a bit more documentation, and also makes it so the frontier immediately advances to the empty antichain when the token is dropped.

@philip-stoev
Contributor

This is from the "feature flags" Nightly job:

thread 'timely:work-0' panicked at 'assertion failed: `(left == right)`
  left: `1`,
 right: `0`', /cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/134842a/timely/src/dataflow/operators/generic/builder_raw.rs:148:9
stack backtrace:
   0: rust_begin_unwind
             at ./rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at ./rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/panicking.rs:64:14
   2: core::panicking::assert_failed_inner
   3: core::panicking::assert_failed::<usize, usize>
             at ./rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/panicking.rs:212:5
   4: <timely::dataflow::operators::generic::builder_raw::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, mz_repr::timestamp::Timestamp>, timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>>>>::new_output_connection::<alloc::vec::Vec<(mz_repr::row::Row, timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>, i64)>>
             at ./cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/134842a/timely/src/dataflow/operators/generic/builder_raw.rs:148:9
   5: <timely::dataflow::operators::generic::builder_rc::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, mz_repr::timestamp::Timestamp>, timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>>>>::new_output_connection::<alloc::vec::Vec<(mz_repr::row::Row, timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>, i64)>>
             at ./cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/134842a/timely/src/dataflow/operators/generic/builder_rc.rs:110:29
   6: <timely::dataflow::stream::StreamCore<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, mz_repr::timestamp::Timestamp>, timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>>, alloc::vec::Vec<(mz_repr::row::Row, timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>, i64)>> as mz_timely_util::operator::StreamExt<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, mz_repr::timestamp::Timestamp>, timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>>, (mz_repr::row::Row, timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>, i64)>>::with_token
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/timely-util/src/operator.rs:409:36
   7: <differential_dataflow::collection::Collection<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, mz_repr::timestamp::Timestamp>, timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>>, mz_repr::row::Row, i64> as mz_timely_util::operator::CollectionExt<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, mz_repr::timestamp::Timestamp>, timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>>, mz_repr::row::Row, i64>>::with_token
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/timely-util/src/operator.rs:521:9
   8: <mz_compute::render::context::Context<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, mz_repr::timestamp::Timestamp>, timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>>, mz_repr::row::Row>>::render_recursive_plan
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/compute/src/render/mod.rs:658:22
   9: mz_compute::render::build_compute_dataflow::<timely_communication::allocator::generic::Generic>::{closure#3}::{closure#2}
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/compute/src/render/mod.rs:307:38
  10: <timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, mz_repr::timestamp::Timestamp> as timely::dataflow::scopes::Scope>::scoped::<timely::order::product::Product<mz_repr::timestamp::Timestamp, differential_dataflow::dynamic::pointstamp::PointStamp<usize>>, (), mz_compute::render::build_compute_dataflow<timely_communication::allocator::generic::Generic>::{closure#3}::{closure#2}>
             at ./cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/134842a/timely/src/dataflow/scopes/child.rs:129:13
  11: <timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, mz_repr::timestamp::Timestamp> as timely::dataflow::scopes::Scope>::iterative::<differential_dataflow::dynamic::pointstamp::PointStamp<usize>, (), mz_compute::render::build_compute_dataflow<timely_communication::allocator::generic::Generic>::{closure#3}::{closure#2}>
             at ./cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/134842a/timely/src/dataflow/scopes/mod.rs:126:9
  12: mz_compute::render::build_compute_dataflow::<timely_communication::allocator::generic::Generic>::{closure#3}
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/compute/src/render/mod.rs:273:13
  13: <timely::worker::Worker<timely_communication::allocator::generic::Generic>>::dataflow_core::<mz_repr::timestamp::Timestamp, (), mz_compute::render::build_compute_dataflow<timely_communication::allocator::generic::Generic>::{closure#3}, alloc::boxed::Box<()>>
             at ./cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/134842a/timely/src/worker.rs:639:13
  14: mz_compute::render::build_compute_dataflow::<timely_communication::allocator::generic::Generic>
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/compute/src/render/mod.rs:193:5
  15: <mz_compute::compute_state::ActiveComputeState<timely_communication::allocator::generic::Generic>>::handle_create_dataflows
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/compute/src/compute_state.rs:226:13
  16: <mz_compute::compute_state::ActiveComputeState<timely_communication::allocator::generic::Generic>>::handle_compute_command
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/compute/src/compute_state.rs:139:43
  17: <mz_compute::server::Worker<timely_communication::allocator::generic::Generic>>::handle_command
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/compute/src/server.rs:414:9
  18: <mz_compute::server::Worker<timely_communication::allocator::generic::Generic>>::run_client
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/compute/src/server.rs:369:17
  19: <mz_compute::server::Worker<timely_communication::allocator::generic::Generic>>::setup_channel_and_run_client
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/compute/src/server.rs:321:9
  20: <mz_compute::server::Worker<timely_communication::allocator::generic::Generic>>::run
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/compute/src/server.rs:181:21
  21: <mz_compute::server::Config as mz_cluster::types::AsRunnableWorker<mz_compute_client::protocol::command::ComputeCommand, mz_compute_client::protocol::response::ComputeResponse>>::build_and_run::<timely_communication::allocator::generic::Generic>
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/compute/src/server.rs:162:9
  22: <mz_cluster::server::ClusterClient<mz_service::client::Partitioned<mz_service::local::LocalClient<mz_compute_client::protocol::command::ComputeCommand, mz_compute_client::protocol::response::ComputeResponse, timely::scheduling::activate::SyncActivator>, mz_compute_client::protocol::command::ComputeCommand, mz_compute_client::protocol::response::ComputeResponse>, mz_compute::server::Config, mz_compute_client::protocol::command::ComputeCommand, mz_compute_client::protocol::response::ComputeResponse>>::build_timely::{closure#0}::{closure#1}
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-04708f9e0b018c0f3-1/materialize/tests/src/cluster/src/server.rs:179:13
  23: timely::execute::execute_from::<timely_communication::allocator::generic::GenericBuilder, (), <mz_cluster::server::ClusterClient<mz_service::client::Partitioned<mz_service::local::LocalClient<mz_compute_client::protocol::command::ComputeCommand, mz_compute_client::protocol::response::ComputeResponse, timely::scheduling::activate::SyncActivator>, mz_compute_client::protocol::command::ComputeCommand, mz_compute_client::protocol::response::ComputeResponse>, mz_compute::server::Config, mz_compute_client::protocol::command::ComputeCommand, mz_compute_client::protocol::response::ComputeResponse>>::build_timely::{closure#0}::{closure#1}>::{closure#0}
             at ./cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/134842a/timely/src/execute.rs:388:22
  24: timely_communication::initialize::initialize_from::<timely_communication::allocator::generic::GenericBuilder, (), timely::execute::execute_from<timely_communication::allocator::generic::GenericBuilder, (), <mz_cluster::server::ClusterClient<mz_service::client::Partitioned<mz_service::local::LocalClient<mz_compute_client::protocol::command::ComputeCommand, mz_compute_client::protocol::response::ComputeResponse, timely::scheduling::activate::SyncActivator>, mz_compute_client::protocol::command::ComputeCommand, mz_compute_client::protocol::response::ComputeResponse>, mz_compute::server::Config, mz_compute_client::protocol::command::ComputeCommand, mz_compute_client::protocol::response::ComputeResponse>>::build_timely::{closure#0}::{closure#1}>::{closure#0}>::{closure#0}
             at ./cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/134842a/communication/src/initialize.rs:316:33
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

The query to cause the crash is:

CREATE VIEW wmr_01 AS
  WITH MUTUALLY RECURSIVE
    foo (a int, b int) AS (SELECT 1, 2 UNION SELECT a, 7 FROM bar),
    bar (a int) AS (SELECT a FROM foo)
  SELECT * FROM bar;
SELECT * FROM wmr_01;

After you fix it, can you make sure this query is put in an SLT file -- this should have been detected in the normal CI rather than in a random test in Nightly.

@petrosagg
Contributor Author

I'm sorry, this was a stupid bug in my code that broke recursive rendering, so all recursive queries would panic. That's why testdrive 0 failed too. I'm pushing a fix now.

Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
Contributor

@philip-stoev philip-stoev left a comment


I pulled again and ran a stress test with 50 concurrent divergent WMR queries. All WMR dataflows, both inside cursors and inside INSERT ... SELECT, are cancelled correctly. At the end of the test, computed CPU usage drops to zero, so it is all good.

@teskje
Contributor

teskje commented Apr 18, 2023

This PR now needs to be adapted to the changes made in #18772 (specifically 2a9097f). I can take care of that if you don't mind @petrosagg.

There is some uncertainty about aggressively advancing the fuse operator's output to the empty frontier: #18760 (comment). I think if we cannot come to an agreement here quickly, we should merge this PR with the initial version of the fuse operator that doesn't have special handling of its output frontier, to unblock divergent WMR cancellation.

The previous implementation of `with_token` progress handling was flawed
and prevented downstream operators from seeing any progress. In
addition, there is some uncertainty about the safety of aggressively
advancing to the empty frontier on token drop within dataflow loops. So
this commit reverts to the previous behavior of only dropping data
updates on token drop. We can add the frontier advancement back in as a
follow-up, if we agree that it is safe.
@teskje
Contributor

teskje commented Apr 18, 2023

As the sqllogictests found, the last commit that made the with_token operator advance to the empty frontier broke WMR queries. I suspect something was wrong with the way that operator tried to advance its frontier in spite of having an empty path summary on its output. I decided to revert this change, which also sidesteps the "is this even safe" discussion and brings us to a PR we can merge now.

@teskje
Contributor

teskje commented Apr 18, 2023

I think this PR is ready for the final review. All comments have been addressed AFAICT. @vmarcos and @tokenrove can you take a look please?

Copy link
Contributor

@vmarcos vmarcos left a comment


The changes look good to me; I added a few smaller comments regarding code organization / comments, but nothing that would stand in the way of merging.

src/timely-util/src/operator.rs (outdated review comment, resolved)
src/timely-util/src/operator.rs (outdated review comment, resolved)
src/timely-util/src/operator.rs (review comment, resolved)
src/timely-util/src/operator.rs (outdated review comment, resolved)
@teskje
Contributor

teskje commented Apr 18, 2023

TFTRs! And thanks @petrosagg for the implementation!

@teskje teskje merged commit 4f73279 into MaterializeInc:main Apr 18, 2023
56 checks passed