Skip to content
Permalink
Browse files

update example

  • Loading branch information...
frankmcsherry committed Apr 3, 2019
1 parent 74c1356 commit 5287f4cdcb73dad37f3be1b9ed63d34839baf5a2
Showing with 56 additions and 36 deletions.
  1. +56 −36 mdbook/src/chapter_2/chapter_2_5.md
@@ -204,32 +204,40 @@ As before, I'm just going to show you the new code, which now lives just after `
// allocate operator-local storage.
let mut queues = HashMap::new();
let mut counts = HashMap::new();
let mut buffer = Vec::new();
move |input, output| {
// for each input batch, stash it at `time`.
while let Some((time, data)) = input.next() {
queues.entry(time.retain())
.or_insert(Vec::new())
.push(data.replace(Vec::new()));
.extend(data.replace(Vec::new()));
}
// for each stashed time, apply if ready.
for (key, val) in queues.iter_mut() {
if !input.frontier().less_equal(key.time()) {
let mut session = output.session(key);
for mut batch in val.drain(..) {
for (word, diff) in batch.drain(..) {
let entry = counts.entry(word.clone()).or_insert(0i64);
*entry += diff;
session.give((word, *entry));
}
}
// enable each stashed time if ready.
for (time, vals) in queues.iter_mut() {
if !input.frontier().less_equal(time.time()) {
let vals = std::mem::replace(vals, Vec::new());
buffer.push((time.clone(), vals));
}
}
// drop complete time and allocations.
queues.retain(|key, val| val.len() > 0);
queues.retain(|time, vals| vals.len() > 0);
// sort ready updates by time.
buffer.sort_by(|x,y| (x.0).time().cmp(&(y.0).time()));
// retire updates in time order.
for (time, mut vals) in buffer.drain(..) {
let mut session = output.session(&time);
for (word, diff) in vals.drain(..) {
let entry = counts.entry(word.clone()).or_insert(0i64);
*entry += diff;
session.give((word, *entry));
}
}
}
})
# .inspect(|x| println!("seen: {:?}", x))
@@ -272,13 +280,14 @@ Third and finally, we specify a closure. The closure has an argument, which we i
// allocate operator-local storage.
let mut queues = HashMap::new();
let mut counts = HashMap::new();
let mut buffer = Vec::new();
move |input, output| {
// coming soon!
}
```

The closure that we end up returning is the `|input, output|` closure. It describes what the operator would do when presented with a handle to the input and a handle to the output. We've also named two hash maps we will need, and provided the `move` keyword to Rust so that it knows that the resulting closure *owns* these hash maps, rather than *borrows* them.
The closure that we end up returning is the `|input, output|` closure. It describes what the operator would do when presented with a handle to the input and a handle to the output. We've also named two hash maps and a vector we will need, and provided the `move` keyword to Rust so that it knows that the resulting closure *owns* these hash maps, rather than *borrows* them.

Inside the closure, we do two things: (i) read inputs and (ii) update counts and send outputs. Let's do the input reading first:

@@ -287,46 +296,57 @@ Inside the closure, we do two things: (i) read inputs and (ii) update counts and
while let Some((time, data)) = input.next() {
queues.entry(time.retain())
.or_insert(Vec::new())
.push(data.replace(Vec::new()));
.extend(data.replace(Vec::new()));
}
```

The `input` handle has a `next` method, and it optionally returns a pair of `time` and `data`, representing a timely dataflow timestamp and a hunk of data bearing that timestamp, respectively. Our plan is to iterate through all available input (the `next` method doesn't block, it just returns `None` when it runs out of data), accepting it from the timely dataflow system and moving it into our `queue` hash map.
The `input` handle has a `next` method, and it optionally returns a pair of `time` and `data`, representing a timely dataflow timestamp and a hunk of data bearing that timestamp, respectively. Our plan is to iterate through all available input (the `next()` method doesn't block, it just returns `None` when it runs out of data), accepting it from the timely dataflow system and moving it into our `queue` hash map.

Why do we do this? Because this is a streaming system, we could be getting data out of order. Our goal is to update the counts in time order, and to do this we'll need to enqueue what we get until we also get word that the associated `time` is complete. That happens in the next hunk of code:
Why do we do this? Because this is a streaming system, we could be getting data out of order. Our goal is to update the counts in time order, and to do this we'll need to enqueue what we get until we also get word that the associated `time` is complete. That happens in the next few hunks of code

First, we extract those times and their data that are ready to go:

```rust,ignore
// for each stashed time, apply if ready.
for (key, val) in queues.iter_mut() {
if !input.frontier().less_equal(key.time()) {
let mut session = output.session(key);
for mut batch in val.drain(..) {
for (word, diff) in batch.drain(..) {
let entry = counts.entry(word.clone()).or_insert(0i64);
*entry += diff;
session.give((word, *entry));
}
}
// enable each stashed time if ready.
for (time, vals) in queues.iter_mut() {
if !input.frontier().less_equal(time.time()) {
let vals = std::mem::replace(vals, Vec::new());
buffer.push((time.clone(), vals));
}
}
```

Here we look through each `(key, val)` pair that we've queued up, where `key` was `time` before. We then check `input.frontier`, which is what tells us whether we might expect more times or not. The `input.frontier()` describes times we may yet see on the input; if it is `less_equal` to the time, then it is possible there might be more data.
Here we look through each `(time, vals)` pair that we've queued up. We then check `input.frontier`, which is what tells us whether we might expect more times or not. The `input.frontier()` describes times we may yet see on the input; if it is `less_equal` to the time, then it is possible there might be more data.

If the time is complete, we create a new output session from `output`. We need to specify the time for the output session, and so we use `key`. More importantly, this actually needs to be the same type as `time` from before; the system is smart and knows that if you drop all references to a time you cannot create new output sessions. It's a feature, not a bug.
If the time is complete, we extract the data and get ready to act on it. We don't actually act *yet*, because many times may become available at once, and we want to process them in order too. Before we do that, some housekeeping:

We then proceed through each of the batches we enqueue, and through each of the `(word, diff)` pairs in each of the batches. I've decided that what we are going to do is update the count and announce the new count, but you could probably imagine doing lots of different things here.
```rust,ignore
// drop complete time and allocations.
queues.retain(|time, vals| vals.len() > 0);
Finally, we do a surprisingly important thing, clean out empty hash table entries for complete times.
// sort ready updates by time.
buffer.sort_by(|x,y| (x.0).time().cmp(&(y.0).time()));
```

These calls clean up the `queues` hash map removing keys we are processing, and then sort `buffer` by time to make sure we process them in order. This first step is surprisingly important: the keys of this hash map are timestamps that can be used to send data, and we need to drop them for timely dataflow to understand that we give up the ability to send data at these times.

Finally, we drain `buffer` and process updates in time order

```rust,ignore
// drop complete time and allocations.
queues.retain(|key, val| val.len() > 0);
// retire updates in time order.
for (time, mut vals) in buffer.drain(..) {
let mut session = output.session(&time);
for (word, diff) in vals.drain(..) {
let entry = counts.entry(word.clone()).or_insert(0i64);
*entry += diff;
session.give((word, *entry));
}
}
```

It would have been great to do this as part of the loop up above, but Rust gets stressed out if you try to mutate the source of an iterator while iterating (thus preventing the problem of iterator invalidation). However, we *really* need to do this.
Here we process each time in order (we sorted them!). For each time, we create a new output session from `output` using `time` More importantly, this actually needs to be the same type as `time` from before; the system is smart and knows that if you drop all references to a time you cannot create new output sessions. It's a feature, not a bug.

Remember that `time` we capture as the key, and how it acts as our ability to send data at the time? Until we actually drop it, the timely dataflow system knows that we are able to send data at that time. We have the "capability" (a technical term) to send data at that time. This `retain` method drops `(key, val)` pairs with no data to send, which alerts timely dataflow to the fact that it can now proceed. The `probe.less_than` call down below now has a chance to succeed, and our computation can run to completion.
We then proceed through each of the batches we enqueue, and through each of the `(word, diff)` pairs in each of the batches. I've decided that what we are going to do is update the count and announce the new count, but you could probably imagine doing lots of different things here.

## The finished product

0 comments on commit 5287f4c

Please sign in to comment.
You can’t perform that action at this time.