Skip to content

Commit

Permalink
compute: tokenize linear join closure
Browse files Browse the repository at this point in the history
This commit adds a shutdown token check to the closure we pass to the
differential join operator. When the dataflow is shutting down, this
makes the join closure drain all input data, rather than processing it.
As a result, differential join operators shut down faster and emit less
data, which in turn speeds up shutdown of downstream operators.

Unfortunately, the new shutdown logic interferes with the fueling of the
differential join operator. Fuel is consumed based on the number of
updates emitted. When the token is dropped, the join closure stops
producing updates, which means the operator stops consuming fuel, so it
does not yield anymore until it has drained all its inputs. If there are
many inputs left, the replica may not accept commands for potentially
quite a long time.
  • Loading branch information
teskje committed Apr 24, 2023
1 parent dc950c8 commit ab3780d
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions src/compute/src/render/join/linear_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use mz_repr::{DatumVec, Diff, Row, RowArena};
use mz_storage_client::types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;

use crate::render::context::ShutdownToken;
use crate::render::context::{
Arrangement, ArrangementFlavor, ArrangementImport, CollectionBundle, Context,
};
Expand Down Expand Up @@ -123,6 +124,7 @@ where
inputs[stage_plan.lookup_relation].enter_region(inner),
stage_plan,
&mut errors,
self.shutdown_token.clone(),
);
// Update joined results and capture any errors.
joined = JoinedFlavor::Collection(stream);
Expand Down Expand Up @@ -179,6 +181,7 @@ fn differential_join<G, T>(
lookup_relation: _,
}: LinearStagePlan,
errors: &mut Vec<Collection<G, DataflowError, Diff>>,
shutdown_token: ShutdownToken,
) -> Collection<G, Row, Diff>
where
G: Scope,
Expand Down Expand Up @@ -223,27 +226,27 @@ where
}
JoinedFlavor::Local(local) => match arrangement {
ArrangementFlavor::Local(oks, errs1) => {
let (oks, errs2) = differential_join_inner(local, oks, closure);
let (oks, errs2) = differential_join_inner(local, oks, closure, shutdown_token);
errors.push(errs1.as_collection(|k, _v| k.clone()));
errors.extend(errs2);
oks
}
ArrangementFlavor::Trace(_gid, oks, errs1) => {
let (oks, errs2) = differential_join_inner(local, oks, closure);
let (oks, errs2) = differential_join_inner(local, oks, closure, shutdown_token);
errors.push(errs1.as_collection(|k, _v| k.clone()));
errors.extend(errs2);
oks
}
},
JoinedFlavor::Trace(trace) => match arrangement {
ArrangementFlavor::Local(oks, errs1) => {
let (oks, errs2) = differential_join_inner(trace, oks, closure);
let (oks, errs2) = differential_join_inner(trace, oks, closure, shutdown_token);
errors.push(errs1.as_collection(|k, _v| k.clone()));
errors.extend(errs2);
oks
}
ArrangementFlavor::Trace(_gid, oks, errs1) => {
let (oks, errs2) = differential_join_inner(trace, oks, closure);
let (oks, errs2) = differential_join_inner(trace, oks, closure, shutdown_token);
errors.push(errs1.as_collection(|k, _v| k.clone()));
errors.extend(errs2);
oks
Expand All @@ -262,6 +265,7 @@ fn differential_join_inner<G, T, J, Tr2>(
prev_keyed: J,
next_input: Arranged<G, Tr2>,
closure: JoinClosure,
shutdown_token: ShutdownToken,
) -> (
Collection<G, Row, Diff>,
Option<Collection<G, DataflowError, Diff>>,
Expand All @@ -284,6 +288,10 @@ where
if closure.could_error() {
let (oks, err) = prev_keyed
.join_core(&next_input, move |key, old, new| {
// Check the shutdown token to avoid doing unnecessary work when the dataflow is
// shutting down.
shutdown_token.probe()?;

let temp_storage = RowArena::new();
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
closure
Expand All @@ -303,6 +311,10 @@ where
(oks.as_collection(), Some(err.as_collection()))
} else {
let oks = prev_keyed.join_core(&next_input, move |key, old, new| {
// Check the shutdown token to avoid doing unnecessary work when the dataflow is
// shutting down.
shutdown_token.probe()?;

let temp_storage = RowArena::new();
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
closure
Expand Down

0 comments on commit ab3780d

Please sign in to comment.