Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dataflow] Fuel the arrangement to stream operator #7171

Merged
merged 4 commits into from Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
152 changes: 118 additions & 34 deletions src/dataflow/src/render/context.rs
Expand Up @@ -278,18 +278,22 @@ where
I::Item: Data,
L: for<'a, 'b> FnMut(RefOrMut<'b, V>, &'a S::Timestamp, &'a Diff) -> I + 'static,
{
// Set a number of tuples after which the operator should yield.
// This allows us to remain responsive even when enumerating a substantial
// arrangement, as well as provides time to accumulate our produced output.
let refuel = 1000000;
// If `key_val` is set, and we have the arrangement by that key, we should
// use that arrangement.
if let Some((key, val)) = key_val {
if let Some(flavor) = self.arrangement(&key) {
match flavor {
ArrangementFlavor::Local(oks, errs) => {
let oks = Self::flat_map_core(&oks, Some(val), logic);
let oks = Self::flat_map_core(&oks, Some(val), logic, refuel);
let errs = errs.as_collection(|k, _v| k.clone());
return (oks, errs);
}
ArrangementFlavor::Trace(_, oks, errs) => {
let oks = Self::flat_map_core(&oks, Some(val), logic);
let oks = Self::flat_map_core(&oks, Some(val), logic, refuel);
let errs = errs.as_collection(|k, _v| k.clone());
return (oks, errs);
}
Expand All @@ -304,12 +308,12 @@ where
if let Some(flavor) = self.arranged.values().next() {
match flavor {
ArrangementFlavor::Local(oks, errs) => {
let oks = Self::flat_map_core(&oks, None, logic);
let oks = Self::flat_map_core(&oks, None, logic, refuel);
let errs = errs.as_collection(|k, _v| k.clone());
(oks, errs)
}
ArrangementFlavor::Trace(_, oks, errs) => {
let oks = Self::flat_map_core(&oks, None, logic);
let oks = Self::flat_map_core(&oks, None, logic, refuel);
let errs = errs.as_collection(|k, _v| k.clone());
(oks, errs)
}
Expand All @@ -333,6 +337,7 @@ where
trace: &Arranged<S, Tr>,
key: Option<V>,
mut logic: L,
refuel: usize,
) -> timely::dataflow::Stream<S, I::Item>
where
Tr: TraceReader<Key = V, Val = V, Time = S::Timestamp, R = repr::Diff> + Clone + 'static,
Expand All @@ -345,40 +350,41 @@ where
let mode = if key.is_some() { "index" } else { "scan" };
let name = format!("ArrangementFlatMap({})", mode);
use timely::dataflow::operators::Operator;
trace.stream.unary(Pipeline, &name, move |_, _| {
trace.stream.unary(Pipeline, &name, move |_, info| {
// Acquire an activator to reschedule the operator when it has unfinished work.
use timely::scheduling::Activator;
let activations = trace.stream.scope().activations();
let activator = Activator::new(&info.address[..], activations);
// Maintain a list of work to do, cursor to navigate and process.
let mut todo = std::collections::VecDeque::new();
move |input, output| {
// First, dequeue all batches.
input.for_each(|time, data| {
let mut session = output.session(&time);
for wrapper in data.iter() {
let batch = &wrapper;
let mut cursor = batch.cursor();
if let Some(key) = &key {
cursor.seek_key(batch, key);
if cursor.get_key(batch) == Some(key) {
while let Some(val) = cursor.get_val(batch) {
cursor.map_times(batch, |time, diff| {
for datum in logic(RefOrMut::Ref(val), time, diff) {
session.give(datum);
}
});
cursor.step_val(batch);
}
}
} else {
while let Some(_key) = cursor.get_key(batch) {
while let Some(val) = cursor.get_val(batch) {
cursor.map_times(batch, |time, diff| {
for datum in logic(RefOrMut::Ref(val), time, diff) {
session.give(datum);
}
});
cursor.step_val(batch);
}
cursor.step_key(batch);
}
}
let capability = time.retain();
for batch in data.iter() {
// enqueue a capability, cursor, and batch.
todo.push_back(PendingWork::new(
capability.clone(),
batch.cursor(),
batch.clone(),
antiguru marked this conversation as resolved.
Show resolved Hide resolved
));
}
});

// Second, make progress on `todo`.
let mut fuel = refuel;
while !todo.is_empty() && fuel > 0 {
todo.front_mut()
.unwrap()
.do_work(&key, &mut logic, &mut fuel, output);
if fuel > 0 {
todo.pop_front();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider asserting here that the PendingWork removed is indeed completed. It looks like it is from the do_work implementation, but it's always good to avoid surprises.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like, some sort of assert that if you try and read from or step the cursor you get nothing back?

}
}
// If we have not finished all work, re-activate the operator.
if !todo.is_empty() {
activator.activate();
}
}
})
}
Expand Down Expand Up @@ -481,3 +487,81 @@ where
}
}
}

use timely::dataflow::operators::generic::OutputHandle;
use timely::dataflow::operators::Capability;
struct PendingWork<K, V, T: Timestamp, R, C: Cursor<K, V, T, R>> {
capability: Capability<T>,
cursor: C,
batch: C::Storage,
}

impl<K: PartialEq, V, T: Timestamp, R, C: Cursor<K, V, T, R>> PendingWork<K, V, T, R, C> {
/// Create a new bundle of pending work, from the capability, cursor, and backing storage.
fn new(capability: Capability<T>, cursor: C, batch: C::Storage) -> Self {
Self {
capability,
cursor,
batch,
}
}
/// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
fn do_work<I, L>(
&mut self,
key: &Option<K>,
logic: &mut L,
fuel: &mut usize,
output: &mut OutputHandle<
'_,
T,
I::Item,
timely::dataflow::channels::pushers::Tee<T, I::Item>,
>,
) where
I: IntoIterator,
I::Item: Data,
L: for<'a, 'b> FnMut(RefOrMut<'b, V>, &'a T, &'a R) -> I + 'static,
{
// Attempt to make progress on this batch.
let mut work: usize = 0;
let mut session = output.session(&self.capability);
if let Some(key) = key {
if self.cursor.get_key(&self.batch) != Some(key) {
self.cursor.seek_key(&self.batch, key);
}
if self.cursor.get_key(&self.batch) == Some(key) {
while let Some(val) = self.cursor.get_val(&self.batch) {
self.cursor.map_times(&self.batch, |time, diff| {
for datum in logic(RefOrMut::Ref(val), time, diff) {
session.give(datum);
work += 1;
}
});
self.cursor.step_val(&self.batch);
if work >= *fuel {
*fuel = 0;
return;
}
}
}
} else {
while let Some(_key) = self.cursor.get_key(&self.batch) {
while let Some(val) = self.cursor.get_val(&self.batch) {
self.cursor.map_times(&self.batch, |time, diff| {
for datum in logic(RefOrMut::Ref(val), time, diff) {
session.give(datum);
work += 1;
}
});
self.cursor.step_val(&self.batch);
if work >= *fuel {
*fuel = 0;
return;
}
}
self.cursor.step_key(&self.batch);
}
}
*fuel -= work;
}
}