From ab3780dc0392ae715d47be705ed2737f7473e059 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Mon, 24 Apr 2023 16:16:06 +0200 Subject: [PATCH] compute: tokenize linear join closure This commit adds a shutdown token check to the closure we pass to the differential join operator. When the dataflow is shutting down, this makes the join closure drain all input data, rather than processing it. As a result, differential join operators shut down faster and emit less data, which in turn speeds up shutdown of downstream operators. Unfortunately, the new shutdown logic interferes with the fueling of the differential join operator. Fuel is consumed based on the number of updates emitted. When the token is dropped, the join closure stops producing updates, which means the operator stops consuming fuel, so it does not yield anymore until it has drained all its inputs. If there are many inputs left, the replica may not accept commands for potentially quite a long time. --- src/compute/src/render/join/linear_join.rs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/compute/src/render/join/linear_join.rs b/src/compute/src/render/join/linear_join.rs index c0ca286df04e..d7fb77324549 100644 --- a/src/compute/src/render/join/linear_join.rs +++ b/src/compute/src/render/join/linear_join.rs @@ -26,6 +26,7 @@ use mz_repr::{DatumVec, Diff, Row, RowArena}; use mz_storage_client::types::errors::DataflowError; use mz_timely_util::operator::CollectionExt; +use crate::render::context::ShutdownToken; use crate::render::context::{ Arrangement, ArrangementFlavor, ArrangementImport, CollectionBundle, Context, }; @@ -123,6 +124,7 @@ where inputs[stage_plan.lookup_relation].enter_region(inner), stage_plan, &mut errors, + self.shutdown_token.clone(), ); // Update joined results and capture any errors. 
joined = JoinedFlavor::Collection(stream); @@ -179,6 +181,7 @@ fn differential_join( lookup_relation: _, }: LinearStagePlan, errors: &mut Vec>, + shutdown_token: ShutdownToken, ) -> Collection where G: Scope, @@ -223,13 +226,13 @@ where } JoinedFlavor::Local(local) => match arrangement { ArrangementFlavor::Local(oks, errs1) => { - let (oks, errs2) = differential_join_inner(local, oks, closure); + let (oks, errs2) = differential_join_inner(local, oks, closure, shutdown_token); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks } ArrangementFlavor::Trace(_gid, oks, errs1) => { - let (oks, errs2) = differential_join_inner(local, oks, closure); + let (oks, errs2) = differential_join_inner(local, oks, closure, shutdown_token); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks @@ -237,13 +240,13 @@ where }, JoinedFlavor::Trace(trace) => match arrangement { ArrangementFlavor::Local(oks, errs1) => { - let (oks, errs2) = differential_join_inner(trace, oks, closure); + let (oks, errs2) = differential_join_inner(trace, oks, closure, shutdown_token); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks } ArrangementFlavor::Trace(_gid, oks, errs1) => { - let (oks, errs2) = differential_join_inner(trace, oks, closure); + let (oks, errs2) = differential_join_inner(trace, oks, closure, shutdown_token); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks @@ -262,6 +265,7 @@ fn differential_join_inner( prev_keyed: J, next_input: Arranged, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -284,6 +288,10 @@ where if closure.could_error() { let (oks, err) = prev_keyed .join_core(&next_input, move |key, old, new| { + // Check the shutdown token to avoid doing unnecessary work when the dataflow is + // shutting down. 
+ shutdown_token.probe()?; + let temp_storage = RowArena::new(); let mut datums_local = datums.borrow_with_many(&[key, old, new]); closure @@ -303,6 +311,10 @@ where (oks.as_collection(), Some(err.as_collection())) } else { let oks = prev_keyed.join_core(&next_input, move |key, old, new| { + // Check the shutdown token to avoid doing unnecessary work when the dataflow is + // shutting down. + shutdown_token.probe()?; + let temp_storage = RowArena::new(); let mut datums_local = datums.borrow_with_many(&[key, old, new]); closure