diff --git a/src/compute/src/render/join/linear_join.rs b/src/compute/src/render/join/linear_join.rs index c0ca286df04e..d7fb77324549 100644 --- a/src/compute/src/render/join/linear_join.rs +++ b/src/compute/src/render/join/linear_join.rs @@ -26,6 +26,7 @@ use mz_repr::{DatumVec, Diff, Row, RowArena}; use mz_storage_client::types::errors::DataflowError; use mz_timely_util::operator::CollectionExt; +use crate::render::context::ShutdownToken; use crate::render::context::{ Arrangement, ArrangementFlavor, ArrangementImport, CollectionBundle, Context, }; @@ -123,6 +124,7 @@ where inputs[stage_plan.lookup_relation].enter_region(inner), stage_plan, &mut errors, + self.shutdown_token.clone(), ); // Update joined results and capture any errors. joined = JoinedFlavor::Collection(stream); @@ -179,6 +181,7 @@ fn differential_join( lookup_relation: _, }: LinearStagePlan, errors: &mut Vec>, + shutdown_token: ShutdownToken, ) -> Collection where G: Scope, @@ -223,13 +226,13 @@ where } JoinedFlavor::Local(local) => match arrangement { ArrangementFlavor::Local(oks, errs1) => { - let (oks, errs2) = differential_join_inner(local, oks, closure); + let (oks, errs2) = differential_join_inner(local, oks, closure, shutdown_token); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks } ArrangementFlavor::Trace(_gid, oks, errs1) => { - let (oks, errs2) = differential_join_inner(local, oks, closure); + let (oks, errs2) = differential_join_inner(local, oks, closure, shutdown_token); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks @@ -237,13 +240,13 @@ where }, JoinedFlavor::Trace(trace) => match arrangement { ArrangementFlavor::Local(oks, errs1) => { - let (oks, errs2) = differential_join_inner(trace, oks, closure); + let (oks, errs2) = differential_join_inner(trace, oks, closure, shutdown_token); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks } ArrangementFlavor::Trace(_gid, oks, errs1) => { - let (oks, errs2) = differential_join_inner(trace, oks, closure); + let (oks, errs2) = differential_join_inner(trace, oks, closure, shutdown_token); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks @@ -262,6 +265,7 @@ fn differential_join_inner( prev_keyed: J, next_input: Arranged, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -284,6 +288,10 @@ where if closure.could_error() { let (oks, err) = prev_keyed .join_core(&next_input, move |key, old, new| { + // Check the shutdown token to avoid doing unnecessary work when the dataflow is + // shutting down. + shutdown_token.probe()?; + let temp_storage = RowArena::new(); let mut datums_local = datums.borrow_with_many(&[key, old, new]); closure @@ -303,6 +311,10 @@ where (oks.as_collection(), Some(err.as_collection())) } else { let oks = prev_keyed.join_core(&next_input, move |key, old, new| { + // Check the shutdown token to avoid doing unnecessary work when the dataflow is + // shutting down. + shutdown_token.probe()?; + let temp_storage = RowArena::new(); let mut datums_local = datums.borrow_with_many(&[key, old, new]); closure