Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compute: add token-controlled fuse in feedback edges #18718

Merged
merged 8 commits into from
Apr 18, 2023
33 changes: 21 additions & 12 deletions src/compute/src/render/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
//! if/when the errors are retracted.

use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
use std::rc::{Rc, Weak};
use std::sync::Arc;

use differential_dataflow::dynamic::pointstamp::PointStamp;
Expand All @@ -127,12 +127,13 @@ use mz_storage_client::controller::CollectionMetadata;
use mz_storage_client::source::persist_source;
use mz_storage_client::source::persist_source::FlowControl;
use mz_storage_client::types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use mz_timely_util::probe::{self, ProbeNotify};

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::logging::compute::LogImportFrontiers;
use crate::typedefs::ErrSpine;
use crate::typedefs::{ErrSpine, RowKeySpine};
pub use context::CollectionBundle;
use context::{ArrangementFlavor, Context};

Expand Down Expand Up @@ -647,22 +648,30 @@ where
let (oks, err) = bundle.collection.clone().unwrap();
self.insert_id(Id::Local(*id), bundle);
let (oks_v, err_v) = variables.remove(&Id::Local(*id)).unwrap();

// Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point.
use crate::typedefs::RowKeySpine;
oks_v.set(&oks.consolidate_named::<RowKeySpine<_, _, _>>("LetRecConsolidation"));
let mut oks = oks.consolidate_named::<RowKeySpine<_, _, _>>("LetRecConsolidation");
if let Some(token) = &self.shutdown_token {
oks = oks.with_token(Weak::clone(token));
}
oks_v.set(&oks);

// Set err variable to the distinct elements of `err`.
// Distinctness is important, as we otherwise might add the same error each iteration,
// say if the limit of `oks` has an error. This would result in non-terminating rather
// than a clean report of the error. The trade-off is that we lose information about
// multiplicities of errors, but .. this seems to be the better call.
err_v.set(
&err.arrange_named::<ErrSpine<DataflowError, _, _>>("Arrange recursive err")
.reduce_abelian::<_, ErrSpine<_, _, _>>(
"Distinct recursive err",
move |_k, _s, t| t.push(((), 1)),
)
.as_collection(|k, _| k.clone()),
);
let mut errs = err
.arrange_named::<ErrSpine<DataflowError, _, _>>("Arrange recursive err")
.reduce_abelian::<_, ErrSpine<_, _, _>>(
"Distinct recursive err",
move |_k, _s, t| t.push(((), 1)),
)
.as_collection(|k, _| k.clone());
if let Some(token) = &self.shutdown_token {
errs = errs.with_token(Weak::clone(token));
}
err_v.set(&errs);
}
// Now extract each of the bindings into the outer scope.
for id in ids.into_iter() {
Expand Down
29 changes: 29 additions & 0 deletions src/timely-util/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//! Common operator transformations on timely streams and differential collections.

use std::future::Future;
use std::rc::Weak;

use differential_dataflow::difference::{Multiply, Semigroup};
use differential_dataflow::lattice::Lattice;
Expand Down Expand Up @@ -143,6 +144,11 @@ where
/// Take a Timely stream and convert it to a Differential stream, where each diff is "1"
/// and each time is the current Timely timestamp.
fn pass_through<R: Data>(&self, name: &str, unit: R) -> Stream<G, (D1, G::Timestamp, R)>;

/// Wraps the stream with an operator that passes through all received inputs as long as the
/// provided token can be upgraded. Once the token cannot be upgraded anymore, all data flowing
/// into the operator is dropped.
fn with_token(&self, token: Weak<()>) -> Stream<G, D1>;
teskje marked this conversation as resolved.
Show resolved Hide resolved
}

/// Extension methods for differential [`Collection`]s.
Expand Down Expand Up @@ -207,6 +213,11 @@ where
E: Data,
IE: Fn(D1, R) -> (E, R) + 'static,
R: num_traits::sign::Signed;

/// Wraps the collection with an operator that passes through all received inputs as long as
/// the provided token can be upgraded. Once the token cannot be upgraded anymore, all data
/// flowing into the operator is dropped.
fn with_token(&self, token: Weak<()>) -> Collection<G, D1, R>;
}

impl<G, D1> StreamExt<G, D1> for Stream<G, D1>
Expand Down Expand Up @@ -390,6 +401,20 @@ where
}
})
}

fn with_token(&self, token: Weak<()>) -> Stream<G, D1> {
    self.unary(Pipeline, "WithToken", move |_cap, _info| {
        // Scratch buffer reused across invocations to avoid per-batch allocation.
        let mut buffer = Default::default();
        move |input, output| {
            input.for_each(|cap, data| {
                // Once the token's strong count has dropped to zero the operator
                // acts as a fuse: all incoming data is silently discarded.
                if token.upgrade().is_none() {
                    return;
                }
                data.swap(&mut buffer);
                output.session(&cap).give_container(&mut buffer);
            });
        }
    })
}
}

impl<G, D1, R> CollectionExt<G, D1, R> for Collection<G, D1, R>
Expand Down Expand Up @@ -479,6 +504,10 @@ where
});
(oks.as_collection(), errs.as_collection())
}

fn with_token(&self, token: Weak<()>) -> Collection<G, D1, R> {
    // Delegate to the stream-level operator, then re-wrap the gated
    // stream of update triples as a differential collection.
    let gated = self.inner.with_token(token);
    gated.as_collection()
}
}

/// Creates a new async data stream source for a scope.
Expand Down
125 changes: 125 additions & 0 deletions test/testdrive/divergent-dataflow-cancellation.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# This test checks whether divergent WMR dataflows are correctly dropped after
# they have been cancelled by the user.
#
# We check whether the dataflow was dropped by inspecting the introspection
# sources. This also serves to verify that logging is correctly cleaned up under
# active dataflow cancellation.

# `flip` oscillates between {1} and {} each iteration, so these WMR dataflows
# never reach a fixed point and would run forever without cancellation.
> CREATE VIEW divergent AS
WITH MUTUALLY RECURSIVE
flip(x int) AS (VALUES(1) EXCEPT ALL SELECT * FROM flip)
SELECT * FROM flip
> CREATE INDEX divergent_index ON divergent (x)

> CREATE MATERIALIZED VIEW divergent_materialized AS
WITH MUTUALLY RECURSIVE
flip(x int) AS (VALUES(1) EXCEPT ALL SELECT * FROM flip)
SELECT * FROM flip

# Ensure the dataflow was successfully installed.
> SELECT count(*)
FROM mz_internal.mz_dataflows
WHERE name LIKE '%divergent%'
2

# Drop the installed dataflows
# NOTE(review): dropping should trigger active cancellation of the still-running
# divergent dataflows — presumably via the shutdown token — TODO confirm.

> DROP INDEX divergent_index

> DROP MATERIALIZED VIEW divergent_materialized

# Force a statement timeout

> CREATE TABLE divergent_insert_select (f1 INTEGER);

> SET statement_timeout = '5s'

! INSERT INTO divergent_insert_select
WITH MUTUALLY RECURSIVE flip(x int) AS (VALUES(1) EXCEPT ALL SELECT * FROM flip)
SELECT * FROM flip;
contains: canceling statement due to statement timeout

# Force a cursor to close
> BEGIN

> DECLARE c CURSOR FOR SUBSCRIBE (
WITH MUTUALLY RECURSIVE flip(x int) AS (VALUES(1) EXCEPT ALL SELECT * FROM flip)
SELECT * FROM flip
)

# The divergent subscribe never completes, so this fetch is expected to return
# only once the 2s timeout elapses.
> FETCH ALL c WITH (timeout = '2s');

> COMMIT

# Confirm that all dataflows are now cancelled

> SELECT count(*) FROM mz_internal.mz_active_peeks_per_worker
0

> SELECT count(*) FROM mz_internal.mz_arrangement_batches_raw
0

> SELECT count(*) FROM mz_internal.mz_arrangement_records_raw
0

> SELECT count(*) FROM mz_internal.mz_arrangement_sharing_raw
0

> SELECT count(*) FROM mz_internal.mz_compute_delays_histogram_raw
0

> SELECT count(*) FROM mz_internal.mz_compute_dependencies_per_worker
0

> SELECT count(*) FROM mz_internal.mz_compute_exports_per_worker
0

# One frontier for each introspection arrangement.
> SELECT count(*)
FROM mz_internal.mz_compute_frontiers_per_worker
WHERE worker_id = 0
19

> SELECT count(*) FROM mz_internal.mz_compute_import_frontiers_per_worker
0

> SELECT count(*) FROM mz_internal.mz_compute_operator_durations_histogram_raw
0

> SELECT count(*) FROM mz_internal.mz_dataflow_addresses_per_worker
0

> SELECT count(*) FROM mz_internal.mz_dataflow_channels_per_worker
0

> SELECT count(*) FROM mz_internal.mz_dataflow_operator_reachability_raw
0

> SELECT count(*) FROM mz_internal.mz_dataflow_operators_per_worker
0

> SELECT count(*) FROM mz_internal.mz_message_counts_received_raw
0

> SELECT count(*) FROM mz_internal.mz_message_counts_sent_raw
0

# This source never sees retractions.
> SELECT count(*) > 0 FROM mz_internal.mz_peek_durations_histogram_raw
true

> SELECT count(*) FROM mz_internal.mz_scheduling_elapsed_raw
0

# This source never sees retractions.
> SELECT count(*) > 0 FROM mz_internal.mz_scheduling_parks_histogram_raw
true