Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compute: tokenize delta join operator #18927

Merged
merged 2 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions src/compute/src/render/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ where
pub until: Antichain<T>,
/// Bindings of identifiers to collections.
pub bindings: BTreeMap<Id, CollectionBundle<S, V, T>>,
/// An optional token that operators can probe to know whether the dataflow is shutting down.
pub shutdown_token: Option<Weak<()>>,
/// A token that operators can probe to know whether the dataflow is shutting down.
pub(super) shutdown_token: ShutdownToken,
}

impl<S: Scope, V: Data + columnation::Columnation> Context<S, V>
Expand All @@ -121,7 +121,7 @@ where
as_of_frontier,
until: dataflow.until.clone(),
bindings: BTreeMap::new(),
shutdown_token: None,
shutdown_token: Default::default(),
}
}
}
Expand Down Expand Up @@ -175,6 +175,42 @@ where
}
}

/// A handle for detecting dataflow shutdown, backed by an optional `Weak` reference.
///
/// A token built via the `Default` impl wraps nothing and therefore never reports a shutdown.
/// A token built via [`ShutdownToken::new`] answers every query by consulting the wrapped `Weak`.
#[derive(Clone, Default)]
pub(super) struct ShutdownToken(Option<Weak<()>>);

impl ShutdownToken {
    /// Wrap `token` so that all shutdown queries are answered by it.
    pub(super) fn new(token: Weak<()>) -> Self {
        Self(Some(token))
    }

    /// Check whether the dataflow is still alive.
    ///
    /// Intended to be combined with the `?` operator: the result is `None` once the dataflow is
    /// in the process of shutting down, and `Some(())` in every other case.
    pub(super) fn probe(&self) -> Option<()> {
        if let Some(weak) = &self.0 {
            // A failed upgrade means the wrapped token can no longer be reached; bail out with
            // `None`. On success the temporary strong reference is released right away.
            weak.upgrade()?;
        }
        // Either nothing is wrapped (never shuts down) or the upgrade succeeded.
        Some(())
    }

    /// Report whether the dataflow is in the process of shutting down.
    pub(super) fn in_shutdown(&self) -> bool {
        match self.probe() {
            Some(()) => false,
            None => true,
        }
    }

    /// Borrow the wrapped `Weak`, if this token wraps one.
    pub(crate) fn get_inner(&self) -> Option<&Weak<()>> {
        self.0.as_ref()
    }
}

/// Describes flavor of arrangement: local or imported trace.
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, V: Data + columnation::Columnation, T = mz_repr::Timestamp>
Expand Down
19 changes: 7 additions & 12 deletions src/compute/src/render/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
//! Helpers for handling errors encountered by operators.

use std::hash::Hash;
use std::rc::Weak;

use differential_dataflow::ExchangeData;
use mz_repr::Row;
use timely::container::columnation::Columnation;

use mz_repr::Row;

use super::context::ShutdownToken;

/// Used to make possibly-validating code generic: think of this as a kind of `MaybeResult`,
/// specialized for use in compute. Validation code will only run when the error constructor is
/// Some.
Expand Down Expand Up @@ -77,25 +79,18 @@ where
/// the process of shutting down.
#[derive(Clone)]
pub(super) struct ErrorLogger {
token: Option<Weak<()>>,
token: ShutdownToken,
dataflow_name: String,
}

impl ErrorLogger {
pub fn new(token: Option<Weak<()>>, dataflow_name: String) -> Self {
pub fn new(token: ShutdownToken, dataflow_name: String) -> Self {
Self {
token,
dataflow_name,
}
}

fn token_alive(&self) -> bool {
match &self.token {
Some(t) => t.upgrade().is_some(),
None => true,
}
}

/// Log the given error, unless the dataflow is shutting down.
///
/// The logging format is optimized for surfacing errors with Sentry:
Expand All @@ -113,7 +108,7 @@ impl ErrorLogger {
///
// TODO(#18214): Rethink or justify our error logging strategy.
pub fn log(&self, message: &'static str, details: &str) {
if self.token_alive() {
if !self.token.in_shutdown() {
self.log_always(message, details);
}
}
Expand Down
16 changes: 15 additions & 1 deletion src/compute/src/render/join/delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! Consult [DeltaJoinPlan] documentation for details.

#![allow(clippy::op_ref)]

use std::collections::{BTreeMap, BTreeSet};

use timely::dataflow::Scope;
Expand All @@ -24,7 +25,7 @@ use mz_repr::{DatumVec, Diff, Row, RowArena};
use mz_storage_client::types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;

use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken};

impl<G> Context<G, Row>
where
Expand Down Expand Up @@ -198,6 +199,7 @@ where
stream_thinning,
|t1, t2| t1.le(t2),
closure,
self.shutdown_token.clone(),
)
} else {
build_halfjoin(
Expand All @@ -207,6 +209,7 @@ where
stream_thinning,
|t1, t2| t1.lt(t2),
closure,
self.shutdown_token.clone(),
)
}
}
Expand All @@ -219,6 +222,7 @@ where
stream_thinning,
|t1, t2| t1.le(t2),
closure,
self.shutdown_token.clone(),
)
} else {
build_halfjoin(
Expand All @@ -228,6 +232,7 @@ where
stream_thinning,
|t1, t2| t1.lt(t2),
closure,
self.shutdown_token.clone(),
)
}
}
Expand Down Expand Up @@ -314,6 +319,7 @@ fn build_halfjoin<G, Tr, CF>(
prev_thinning: Vec<usize>,
comparison: CF,
closure: JoinClosure,
shutdown_token: ShutdownToken,
) -> (
Collection<G, (Row, G::Timestamp), Diff>,
Collection<G, DataflowError, Diff>,
Expand Down Expand Up @@ -364,6 +370,10 @@ where
|_timer, count| count > 1_000_000,
// TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
// Check the shutdown token to avoid doing unnecessary work when the dataflow is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any benefit to also checking the token in the yield function? Might save a bunch of sorting and buffer manipulation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, how would that allow us to reduce sorting and buffer manipulation? Do you mean we could yield less often when the token was dropped and thereby avoid duplicate work?

I have been thinking about adding a token check to the yield function, to switch to time-based yielding when the token was dropped. But Frank's comment above makes me somewhat scared of messing with the yield function at all :D #8818 provides context on that comment. The TLDR is that specifying a bad yield function introduces the risk that the half join operator gets stuck because it yields before it was able to make any durable progress.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, for some reason I was thinking about that from the point of view of if that dataflow would eventually get dropped, which will not happen, so yielding on shutdown would be bad.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, unfortunately with this approach we still need to grind through all the join matches produced by input data that has made it to the operator so far. There is really no way around this, short of modifying the implementation in DD to do a token check as well. Or drop_dataflow of course :)

// shutting down.
shutdown_token.probe()?;

let temp_storage = RowArena::new();
let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]);
let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);
Expand Down Expand Up @@ -396,6 +406,10 @@ where
|_timer, count| count > 1_000_000,
// TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
// Check the shutdown token to avoid doing unnecessary work when the dataflow is
// shutting down.
shutdown_token.probe()?;

let temp_storage = RowArena::new();
let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]);
let row = closure
Expand Down
10 changes: 6 additions & 4 deletions src/compute/src/render/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ use crate::typedefs::{ErrSpine, RowKeySpine};
pub use context::CollectionBundle;
use context::{ArrangementFlavor, Context};

use self::context::ShutdownToken;

pub mod context;
mod errors;
mod flat_map;
Expand Down Expand Up @@ -299,7 +301,7 @@ pub fn build_compute_dataflow<A: Allocate>(
// Build declared objects.
for object in dataflow.objects_to_build {
let object_token = Rc::new(());
context.shutdown_token = Some(Rc::downgrade(&object_token));
context.shutdown_token = ShutdownToken::new(Rc::downgrade(&object_token));
tokens.insert(object.id, object_token);

let bundle = context.render_recursive_plan(0, object.plan);
Expand Down Expand Up @@ -353,7 +355,7 @@ pub fn build_compute_dataflow<A: Allocate>(
// Build declared objects.
for object in dataflow.objects_to_build {
let object_token = Rc::new(());
context.shutdown_token = Some(Rc::downgrade(&object_token));
context.shutdown_token = ShutdownToken::new(Rc::downgrade(&object_token));
tokens.insert(object.id, object_token);

context.build_object(object);
Expand Down Expand Up @@ -651,7 +653,7 @@ where

// Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point.
let mut oks = oks.consolidate_named::<RowKeySpine<_, _, _>>("LetRecConsolidation");
if let Some(token) = &self.shutdown_token {
if let Some(token) = &self.shutdown_token.get_inner() {
oks = oks.with_token(Weak::clone(token));
}
oks_v.set(&oks);
Expand All @@ -668,7 +670,7 @@ where
move |_k, _s, t| t.push(((), 1)),
)
.as_collection(|k, _| k.clone());
if let Some(token) = &self.shutdown_token {
if let Some(token) = &self.shutdown_token.get_inner() {
errs = errs.with_token(Weak::clone(token));
}
err_v.set(&errs);
Expand Down