From dc950c8cbd3329a4f0c248af555a9872e192ebbb Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Mon, 24 Apr 2023 16:13:18 +0200 Subject: [PATCH] compute: tokenize delta join closure This commit adds a shutdown token check to the closure we pass to the delta join operator. When the dataflow is shutting down, this makes the join closure drain all input data, rather than processing it. As a results, delta join operators shut down faster and emit less data, which in turn speeds up shutdown of downstream operators. --- src/compute/src/render/join/delta_join.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/compute/src/render/join/delta_join.rs b/src/compute/src/render/join/delta_join.rs index 8fc13b3ad562a..59291233d4ce5 100644 --- a/src/compute/src/render/join/delta_join.rs +++ b/src/compute/src/render/join/delta_join.rs @@ -12,6 +12,7 @@ //! Consult [DeltaJoinPlan] documentation for details. #![allow(clippy::op_ref)] + use std::collections::{BTreeMap, BTreeSet}; use timely::dataflow::Scope; @@ -24,7 +25,7 @@ use mz_repr::{DatumVec, Diff, Row, RowArena}; use mz_storage_client::types::errors::DataflowError; use mz_timely_util::operator::CollectionExt; -use crate::render::context::{ArrangementFlavor, CollectionBundle, Context}; +use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken}; impl Context where @@ -198,6 +199,7 @@ where stream_thinning, |t1, t2| t1.le(t2), closure, + self.shutdown_token.clone(), ) } else { build_halfjoin( @@ -207,6 +209,7 @@ where stream_thinning, |t1, t2| t1.lt(t2), closure, + self.shutdown_token.clone(), ) } } @@ -219,6 +222,7 @@ where stream_thinning, |t1, t2| t1.le(t2), closure, + self.shutdown_token.clone(), ) } else { build_halfjoin( @@ -228,6 +232,7 @@ where stream_thinning, |t1, t2| t1.lt(t2), closure, + self.shutdown_token.clone(), ) } } @@ -314,6 +319,7 @@ fn build_halfjoin( prev_thinning: Vec, comparison: CF, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Collection, @@ -364,6 +370,10 @@ where |_timer, count| count > 1_000_000, // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use. move |key, stream_row, lookup_row, initial, time, diff1, diff2| { + // Check the shutdown token to avoid doing unnecessary work when the dataflow is + // shutting down. + shutdown_token.probe()?; + let temp_storage = RowArena::new(); let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]); let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder); @@ -396,6 +406,10 @@ where |_timer, count| count > 1_000_000, // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use. move |key, stream_row, lookup_row, initial, time, diff1, diff2| { + // Check the shutdown token to avoid doing unnecessary work when the dataflow is + // shutting down. + shutdown_token.probe()?; + let temp_storage = RowArena::new(); let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]); let row = closure