Skip to content

Commit

Permalink
compute: tokenize delta join closure
Browse files Browse the repository at this point in the history
This commit adds a shutdown token check to the closure we pass to the
delta join operator. When the dataflow is shutting down, this makes the
join closure drain all input data, rather than processing it. As a
result, delta join operators shut down faster and emit less data,
which in turn speeds up shutdown of downstream operators.
  • Loading branch information
teskje committed Apr 24, 2023
1 parent 8fbdc6e commit dc950c8
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion src/compute/src/render/join/delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! Consult [DeltaJoinPlan] documentation for details.

#![allow(clippy::op_ref)]

use std::collections::{BTreeMap, BTreeSet};

use timely::dataflow::Scope;
Expand All @@ -24,7 +25,7 @@ use mz_repr::{DatumVec, Diff, Row, RowArena};
use mz_storage_client::types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;

use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken};

impl<G> Context<G, Row>
where
Expand Down Expand Up @@ -198,6 +199,7 @@ where
stream_thinning,
|t1, t2| t1.le(t2),
closure,
self.shutdown_token.clone(),
)
} else {
build_halfjoin(
Expand All @@ -207,6 +209,7 @@ where
stream_thinning,
|t1, t2| t1.lt(t2),
closure,
self.shutdown_token.clone(),
)
}
}
Expand All @@ -219,6 +222,7 @@ where
stream_thinning,
|t1, t2| t1.le(t2),
closure,
self.shutdown_token.clone(),
)
} else {
build_halfjoin(
Expand All @@ -228,6 +232,7 @@ where
stream_thinning,
|t1, t2| t1.lt(t2),
closure,
self.shutdown_token.clone(),
)
}
}
Expand Down Expand Up @@ -314,6 +319,7 @@ fn build_halfjoin<G, Tr, CF>(
prev_thinning: Vec<usize>,
comparison: CF,
closure: JoinClosure,
shutdown_token: ShutdownToken,
) -> (
Collection<G, (Row, G::Timestamp), Diff>,
Collection<G, DataflowError, Diff>,
Expand Down Expand Up @@ -364,6 +370,10 @@ where
|_timer, count| count > 1_000_000,
// TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
// Check the shutdown token to avoid doing unnecessary work when the dataflow is
// shutting down.
shutdown_token.probe()?;

let temp_storage = RowArena::new();
let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]);
let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);
Expand Down Expand Up @@ -396,6 +406,10 @@ where
|_timer, count| count > 1_000_000,
// TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
// Check the shutdown token to avoid doing unnecessary work when the dataflow is
// shutting down.
shutdown_token.probe()?;

let temp_storage = RowArena::new();
let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]);
let row = closure
Expand Down

0 comments on commit dc950c8

Please sign in to comment.