Skip to content

Commit

Permalink
[dataflow] Fuel the arrangement to stream operator (#7171)
Browse files Browse the repository at this point in the history
* fuel the arrangement to stream operator

* clippy is a waste of ones life

* defend against uncertain cursor semantics

* test case from the #7171 PR

Co-authored-by: Philip Stoev <pstoev@materialize.io>
  • Loading branch information
frankmcsherry and philip-stoev committed Aug 5, 2021
1 parent ab67ecd commit dcd206e
Show file tree
Hide file tree
Showing 2 changed files with 242 additions and 34 deletions.
152 changes: 118 additions & 34 deletions src/dataflow/src/render/context.rs
Expand Up @@ -278,18 +278,22 @@ where
I::Item: Data,
L: for<'a, 'b> FnMut(RefOrMut<'b, V>, &'a S::Timestamp, &'a Diff) -> I + 'static,
{
// Set a number of tuples after which the operator should yield.
// This allows us to remain responsive even when enumerating a substantial
// arrangement, as well as provides time to accumulate our produced output.
let refuel = 1000000;
// If `key_val` is set, and we have the arrangement by that key, we should
// use that arrangement.
if let Some((key, val)) = key_val {
if let Some(flavor) = self.arrangement(&key) {
match flavor {
ArrangementFlavor::Local(oks, errs) => {
let oks = Self::flat_map_core(&oks, Some(val), logic);
let oks = Self::flat_map_core(&oks, Some(val), logic, refuel);
let errs = errs.as_collection(|k, _v| k.clone());
return (oks, errs);
}
ArrangementFlavor::Trace(_, oks, errs) => {
let oks = Self::flat_map_core(&oks, Some(val), logic);
let oks = Self::flat_map_core(&oks, Some(val), logic, refuel);
let errs = errs.as_collection(|k, _v| k.clone());
return (oks, errs);
}
Expand All @@ -304,12 +308,12 @@ where
if let Some(flavor) = self.arranged.values().next() {
match flavor {
ArrangementFlavor::Local(oks, errs) => {
let oks = Self::flat_map_core(&oks, None, logic);
let oks = Self::flat_map_core(&oks, None, logic, refuel);
let errs = errs.as_collection(|k, _v| k.clone());
(oks, errs)
}
ArrangementFlavor::Trace(_, oks, errs) => {
let oks = Self::flat_map_core(&oks, None, logic);
let oks = Self::flat_map_core(&oks, None, logic, refuel);
let errs = errs.as_collection(|k, _v| k.clone());
(oks, errs)
}
Expand All @@ -333,6 +337,7 @@ where
trace: &Arranged<S, Tr>,
key: Option<V>,
mut logic: L,
refuel: usize,
) -> timely::dataflow::Stream<S, I::Item>
where
Tr: TraceReader<Key = V, Val = V, Time = S::Timestamp, R = repr::Diff> + Clone + 'static,
Expand All @@ -345,40 +350,41 @@ where
let mode = if key.is_some() { "index" } else { "scan" };
let name = format!("ArrangementFlatMap({})", mode);
use timely::dataflow::operators::Operator;
trace.stream.unary(Pipeline, &name, move |_, _| {
trace.stream.unary(Pipeline, &name, move |_, info| {
// Acquire an activator to reschedule the operator when it has unfinished work.
use timely::scheduling::Activator;
let activations = trace.stream.scope().activations();
let activator = Activator::new(&info.address[..], activations);
// Maintain a list of work to do, cursor to navigate and process.
let mut todo = std::collections::VecDeque::new();
move |input, output| {
// First, dequeue all batches.
input.for_each(|time, data| {
let mut session = output.session(&time);
for wrapper in data.iter() {
let batch = &wrapper;
let mut cursor = batch.cursor();
if let Some(key) = &key {
cursor.seek_key(batch, key);
if cursor.get_key(batch) == Some(key) {
while let Some(val) = cursor.get_val(batch) {
cursor.map_times(batch, |time, diff| {
for datum in logic(RefOrMut::Ref(val), time, diff) {
session.give(datum);
}
});
cursor.step_val(batch);
}
}
} else {
while let Some(_key) = cursor.get_key(batch) {
while let Some(val) = cursor.get_val(batch) {
cursor.map_times(batch, |time, diff| {
for datum in logic(RefOrMut::Ref(val), time, diff) {
session.give(datum);
}
});
cursor.step_val(batch);
}
cursor.step_key(batch);
}
}
let capability = time.retain();
for batch in data.iter() {
// enqueue a capability, cursor, and batch.
todo.push_back(PendingWork::new(
capability.clone(),
batch.cursor(),
batch.clone(),
));
}
});

// Second, make progress on `todo`.
let mut fuel = refuel;
while !todo.is_empty() && fuel > 0 {
todo.front_mut()
.unwrap()
.do_work(&key, &mut logic, &mut fuel, output);
if fuel > 0 {
todo.pop_front();
}
}
// If we have not finished all work, re-activate the operator.
if !todo.is_empty() {
activator.activate();
}
}
})
}
Expand Down Expand Up @@ -481,3 +487,81 @@ where
}
}
}

use timely::dataflow::operators::generic::OutputHandle;
use timely::dataflow::operators::Capability;
struct PendingWork<K, V, T: Timestamp, R, C: Cursor<K, V, T, R>> {
capability: Capability<T>,
cursor: C,
batch: C::Storage,
}

impl<K: PartialEq, V, T: Timestamp, R, C: Cursor<K, V, T, R>> PendingWork<K, V, T, R, C> {
/// Create a new bundle of pending work, from the capability, cursor, and backing storage.
fn new(capability: Capability<T>, cursor: C, batch: C::Storage) -> Self {
Self {
capability,
cursor,
batch,
}
}
/// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
fn do_work<I, L>(
&mut self,
key: &Option<K>,
logic: &mut L,
fuel: &mut usize,
output: &mut OutputHandle<
'_,
T,
I::Item,
timely::dataflow::channels::pushers::Tee<T, I::Item>,
>,
) where
I: IntoIterator,
I::Item: Data,
L: for<'a, 'b> FnMut(RefOrMut<'b, V>, &'a T, &'a R) -> I + 'static,
{
// Attempt to make progress on this batch.
let mut work: usize = 0;
let mut session = output.session(&self.capability);
if let Some(key) = key {
if self.cursor.get_key(&self.batch) != Some(key) {
self.cursor.seek_key(&self.batch, key);
}
if self.cursor.get_key(&self.batch) == Some(key) {
while let Some(val) = self.cursor.get_val(&self.batch) {
self.cursor.map_times(&self.batch, |time, diff| {
for datum in logic(RefOrMut::Ref(val), time, diff) {
session.give(datum);
work += 1;
}
});
self.cursor.step_val(&self.batch);
if work >= *fuel {
*fuel = 0;
return;
}
}
}
} else {
while let Some(_key) = self.cursor.get_key(&self.batch) {
while let Some(val) = self.cursor.get_val(&self.batch) {
self.cursor.map_times(&self.batch, |time, diff| {
for datum in logic(RefOrMut::Ref(val), time, diff) {
session.give(datum);
work += 1;
}
});
self.cursor.step_val(&self.batch);
if work >= *fuel {
*fuel = 0;
return;
}
}
self.cursor.step_key(&self.batch);
}
}
*fuel -= work;
}
}

0 comments on commit dcd206e

Please sign in to comment.