Contributors: @frankmcsherry @steveklabnik

An introduction to timely dataflow, part 2.

In an introduction to timely dataflow, part 1, we went through how to write your first, second, and third timely dataflow programs. Mostly, we saw some examples of creating streams of data, putting them together, and (scary!) nesting them in various scopes.

Today we will look at writing super-powerful operators that do whatever you want to the streams. The explanations may be a little bit dry, unless you are excited about string-counting applications, but they contain some useful mechanisms we'll want to build on in part 3: doing a breadth-first traversal of a graph in timely dataflow.

Preface: some simplifications

Writing documentation and tests has the great property that you are forced to admit what a mess you've made of things. I have been trying to write both recently, which led me to a few simpler ways to write example code. Because it really wasn't very simple before.

Let's do some stretching and warm up by walking through a really short timely dataflow program, whose shortness derives from some recent additions:

extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect};

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("hello: {:?}", x));
    });
}

Ok wow very short.

The timely::example method lets you specify an action on a Scope which timely then runs single-threaded. You don't have to worry about starting up the computation, providing configuration arguments, figuring out that you should probably call scoped, etc. You just write a small amount of code using the scope, and then it gets run.

The new ToStream trait provides a to_stream method for any iterator, and it turns the iterator into a stream in whatever scope is passed as an argument. This makes it a lot easier to put together a functional example that actually does a thing. In this case, we should print ten lines to the console, for the numbers 0 through 9.

Review: pre-packaged operators

We've previously used a few different flavors of "operator": some method that acts on a Stream and produces a new Stream. For example, just above we used inspect, which is something I've written and which you only get to use as presented.

Now, inspecting things is fun. Maybe you've already realized that you can do more than println! there, and with the exchange operator we discussed previously, you can probably hack up something that moves data around and runs whatever code you like.

Why not just make it easier for you?

Pre-packaged generic operators

There are a few neat pre-packaged operators that make programming a lot easier. I use them myself! These operators take advantage of the fact that closures in Rust are pretty easy to use, and they result in the same performance as manually inlining code, which is great.

If you were writing your own operator, you basically just need to decide what to do for each batch of records you receive, right? I guess you also need to indicate how your input records should be partitioned, in case they aren't at the right worker yet. Delightfully, the generic operator unary_stream takes these parameters and makes an operator for you.

Here is the same code as above, but with our own inspect operator written in-place:

extern crate timely;

use timely::dataflow::operators::{ToStream, Unary};
use timely::dataflow::channels::pact::Pipeline;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .unary_stream(Pipeline, "Inspect", |input, output| {
                   input.for_each(|time, data| {
                       let mut session = output.session(time);
                       for &datum in data.iter() {
                           println!("hello: {:?}", datum);
                           session.give(datum);
                       }
                   })
               });
    });
}

All that we need to tell unary_stream is

  1. How we want to partition the data (e.g. Pipeline, which does no partitioning).
  2. What we should call this operator (useful for diagnostics).
  3. What the operator should do with its input and output when it gets a chance to run.

The Pipeline instruction is admittedly a bit dull, but we'll make that more exciting in a bit. The name can be whatever you like. Finally, the instructions are just as simple as writing some code that uses an input handle and an output handle. They each have some methods, but roughly the former is kind of like an iterator over pairs (Time, Vec<Data>), and the latter accepts timestamped data to produce as output.

The result of unary_stream is a Stream whose data reflect the data we handed to output. Not so hard! :D

Writing something more interesting: adding state

We haven't really solved the problem that inspect is pretty dull. Let's do something more interesting, where we suppress duplicate data with the same timestamp.

We are going to need some state for this, and it isn't clear where that state lives. When we just have a closure, we just write code. How do we remember what we've already seen, in case we need that information?

This part is great. At least, coming from C#, this is great. Rust's closures happily capture any state you define, and can take ownership of it. If you want a HashMap to differentiate data by timestamp, you just make a HashMap and start using it. Invariably Rust will complain about lifetimes, because this is what Rust does, but you just need to add the magic move keyword which roughly translates to: "I give up; please just take it".
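
To see the capture mechanism on its own, outside of timely, here is a minimal sketch using only the standard library. The make_distinct helper is hypothetical (it is not part of timely); it returns a closure that owns its HashMap thanks to move, and reports whether a (time, datum) pair is being seen for the first time.

```rust
use std::collections::HashMap;

/// Hypothetical helper (not part of timely): builds a closure that owns
/// its own state and reports whether a (time, datum) pair is new.
fn make_distinct() -> impl FnMut(u64, u64) -> bool {
    // State is defined outside the closure...
    let mut counts_by_time: HashMap<u64, HashMap<u64, usize>> = HashMap::new();
    // ...and moved into it. Without `move`, the closure would only borrow
    // `counts_by_time`, which does not live long enough to be returned.
    move |time, datum| {
        let counts = counts_by_time.entry(time).or_insert_with(HashMap::new);
        let count = counts.entry(datum).or_insert(0);
        *count += 1;
        *count == 1
    }
}
```

Calling the returned closure twice with (0, 2) yields true and then false, while (1, 2) is still new because it carries a different time.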

Let's look at the example to print distinct integers from the input. I've changed the input to have repetitions, and there is certainly more logic in the operator now, but it is all fairly predictable logic, I think. You can play with it in examples/distinct.rs.

extern crate timely;

use std::collections::HashMap;

use timely::dataflow::operators::{ToStream, Unary, Inspect};
use timely::dataflow::channels::pact::Pipeline;

fn main() {
    timely::example(|scope| {
        let mut counts_by_time = HashMap::new();
        vec![0, 1, 2, 2, 2, 3, 3, 4]
            .into_iter()
            .to_stream(scope)
            .unary_stream(Pipeline, "Distinct", move |input, output| {
                input.for_each(|&time, data| {
                    let mut session = output.session(&time);
                    let mut counts = counts_by_time.entry(time)
                                                   .or_insert(HashMap::new());
                    for &datum in data.iter() {
                        let mut count = counts.entry(datum)
                                              .or_insert(0);
                        if *count == 0 {
                           session.give(datum);
                        }
                        *count += 1;
                    }
                })
            })
            .inspect(|x| println!("hello: {:?}", x));
    });
}

If we run this, we get output that looks like:

Echidnatron% cargo run --example distinct
   Compiling timely v0.0.10 (file:///Users/mcsherry/Projects/timely-dataflow)
hello: 0
hello: 1
hello: 2
hello: 3
hello: 4
Echidnatron%

Awesome! Distinct numbers! We are done, here. Ship it.

Actually there is a bit of a lie here. This works great on one worker, but we really need our distinct operator to shuffle the data so that like records go to the same worker. Otherwise, each worker will distinct their own input, but we may have multiple copies across the collection of workers. Fortunately, it is as easy as adding a

use timely::dataflow::channels::pact::Exchange;

and replacing the use of Pipeline with Exchange::new(|&x| x). Exchange just needs a function from a data reference to a u64, chosen so that records that should be grouped have the same result. All records then get routed to a worker determined by the output number (currently: the result mod the number of workers). By using the Exchange above, we just route records based on their value (they are each u64 values themselves).
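
The routing rule just described (the result mod the number of workers) can be sketched without timely at all. The route and partition helpers below are hypothetical names used only for illustration, and they assume at least one worker; this is a model of the rule as stated above, not timely's actual exchange code.

```rust
/// Hypothetical helper: picks the worker index for a record whose
/// exchange function produced `hashed`, assuming `workers > 0`.
fn route(hashed: u64, workers: u64) -> u64 {
    hashed % workers
}

/// Splits records into one bucket per worker, using the identity
/// function as the exchange function (as in `Exchange::new(|&x| x)`).
fn partition(records: &[u64], workers: u64) -> Vec<Vec<u64>> {
    let mut buckets = vec![Vec::new(); workers as usize];
    for &record in records {
        buckets[route(record, workers) as usize].push(record);
    }
    buckets
}
```

With two workers, all copies of 2 land in the same bucket, which is exactly what a distinct operator needs in order to see every duplicate.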

Cleaning up after ourselves: notifications

This implementation does what it is supposed to, but it isn't as tidy as it could be. In particular, it leaves all the data it has ever seen in the counts_by_time hash map. That can get really large, right? All the data we have ever seen?

Yeah, it can get really large.

But, when should we clean it up? We can't just clear it at the end of the closure, because maybe we didn't get all of our data at once (a second worker may send us some more data in just a bit). If we clear the counts, we might repeat some data next time we get invoked. But we can't just wait until the end of the computation, because we might see an unboundedly large number of rounds of data.

We need to wait until we are sure that the operator will receive no more messages with a given time. Once that happens, we can pop the data for time from the counts_by_time hash map, allowing Rust to deallocate it.

But. How can we tell if we have seen all the messages or not? This is a global property of the system, rather than a local property of the operator (or even the worker). It kind of seems like we need some help to get this information.

Now is where the timely dataflow happens.

Timely dataflow happening

Pretty much everything up until this point was just us shoveling messages around, and anyone can shovel messages around. We need to start writing better programs, though, and this calls for a system with a broader set of features.

To get access to timely dataflow magic, we are going to start using the unary_notify operator. It is a lot like unary_stream, with the additional ability to request and receive notifications from the system about "progress" in its input stream of messages. The operator can ask to be notified when there will be no more messages with some timestamp, and the system will do exactly that.

The unary_notify operator takes similar arguments as unary_stream, with some slight differences. It requires:

  1. How we want to partition the data (e.g. Pipeline, Exchange).
  2. What we should call this operator (useful for diagnostics).
  3. An initial set of notification requests (usually empty).
  4. What the operator should do with its input, output, and notificator when it gets a chance to run.

Ok, great. Pretty much the same. Got it. WAIT WHAT IS A notificator?

It's all cool. A notificator is just the thing that your operator uses to ask for notifications, and which gives your operator any notifications that are ready to fire. It may sound weird, but it is pretty natural to use. And if it turns out there are better patterns, it's all pretty easy to change, too.

Here is a version of our distinct logic that cleans up counts_by_time once the system reports that the operator will receive no more data for a given time. There are four new lines; see if you can spot them all:

.unary_notify(Pipeline, "Distinct", vec![], move |input, output, notificator| {
    input.for_each(|&time, data| {
        notificator.notify_at(time);        // <--- NEW
        let mut session = output.session(&time);
        let mut counts = counts_by_time.entry(time)
                                       .or_insert(HashMap::new());
        for &datum in data.iter() {
            let mut count = counts.entry(datum)
                                  .or_insert(0);
            if *count == 0 {
               session.give(datum);
            }
            *count += 1;
        }
    });

    notificator.for_each(|time, _num| {     // <--- NEW
        counts_by_time.remove(time);        // <--- NEW
    })                                      // <--- NEW
})

All we had to do was ask to be notified for each time we might have to clean up, and then tell notificator to do the clean-up for each time that has completed. The only important thing to know is that the notificator will only respond to requests that you make. You have to explicitly make them. Sorry.
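
The request-then-fire contract can be modeled in a few lines of plain Rust. This is a toy stand-in and not timely's implementation: ToyNotificator, its ready method, and the single integer "frontier" are all assumptions made for illustration (real timely timestamps can be more structured than one integer).

```rust
use std::collections::BTreeSet;

/// Toy stand-in for a notificator (NOT timely's implementation).
/// Assumes timestamps are plain integers with a total order.
struct ToyNotificator {
    pending: BTreeSet<u64>,
}

impl ToyNotificator {
    fn new() -> Self {
        ToyNotificator { pending: BTreeSet::new() }
    }

    /// Ask to be told when `time` is complete. In this toy, repeated
    /// requests for the same time collapse into one.
    fn notify_at(&mut self, time: u64) {
        self.pending.insert(time);
    }

    /// `frontier` is the smallest time that may still show up in the input.
    /// Every requested time strictly before it is complete and fires once.
    fn ready(&mut self, frontier: u64) -> Vec<u64> {
        // `split_off` keeps times < frontier in `self.pending` and
        // returns the times >= frontier, which must keep waiting.
        let still_pending = self.pending.split_off(&frontier);
        let fired: Vec<u64> = self.pending.iter().cloned().collect();
        self.pending = still_pending;
        fired
    }
}
```

A time that was never requested never fires in this model, which mirrors the point above: you only hear about the times you asked about.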

Behind the scenes, timely is keeping track of all of the outstanding messages (and notifications, because those can turn into messages). It is doing this even if your operator doesn't need notifications, because other operators downstream might. This also means that you get to start using notifications right away; you don't have to revisit other parts of your program (or someone else's) and figure out if they understand notifications.

Counting words

Ok, String counting. Everything we have written so far works for strings, with two changes: we need a better exchange function, because a String isn't a u64, and we need to output the counts rather than the distinct keys, which we can do in the notification.

We will have to write our own hash function, because Rust deprecated theirs (in fairness, it used SipHash which is stronger and slower than we often need).

extern crate fnv;
use std::hash::{Hash, Hasher};

fn hash<T: Hash>(item: &T) -> u64 {
    let mut h: fnv::FnvHasher = Default::default();
    item.hash(&mut h);
    h.finish()
}
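
If you want to try the idea without pulling in the fnv crate, a sketch along the same lines works with the standard library's DefaultHasher. The swap is an assumption made here purely for convenience (it is likely slower than FNV on short keys), and std_hash and worker_for are hypothetical names. The only property the exchange needs is that equal strings produce equal numbers, and so end up at the same worker.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Same shape as the `hash` function in the text, but using the standard
/// library's `DefaultHasher` in place of `fnv::FnvHasher` (an assumption
/// made here only to avoid the external crate).
fn std_hash<T: Hash>(item: &T) -> u64 {
    let mut h = DefaultHasher::new();
    item.hash(&mut h);
    h.finish()
}

/// Worker index for an item, assuming "hash mod workers" routing
/// and `workers > 0`.
fn worker_for<T: Hash>(item: &T, workers: u64) -> u64 {
    std_hash(item) % workers
}
```

Two separately allocated copies of the same String hash identically, so both are routed to the same worker.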

Putting hash into the Exchange::new(), and sending the counts in the notification, our code now looks like this:

.unary_notify(Exchange::new(|x| hash(x)), "StringCount", vec![], move |input, output, notificator| {
    input.for_each(|&time, data| {
        notificator.notify_at(time);
        let mut counts = counts_by_time.entry(time)
                                       .or_insert(HashMap::new());
        for datum in data.drain(..) {
            *counts.entry(datum).or_insert(0) += 1;
        }
    });

    notificator.for_each(|time, _num| {
        if let Some(counts) = counts_by_time.remove(time) {
            output.session(time)
                  .give_iterator(counts.into_iter());
        }
    })
})

At this point, you should be able to connect the code to your favorite source of random strings, and get the counts out for each round of input. I would show you some output, but I don't want to encourage budding data scientists to go down the dark path of word counting. We'll get to something cooler in the next post.

A word about ownership

One slight but important change: we use data.drain(..) instead of data.iter(). Rust is very, very serious about ownership, and .iter() doesn't give us ownership of the contents of data, just references to the contents. That was ok in the case of u64 data, because Rust knows it can just copy the data. Now, with String data, Rust won't do that automatically, and we don't want to clone the String anyhow because that is expensive. We do need to get an owned String somehow, because counts.entry(x) needs ownership of x, and drain(..) moves ownership of the contents to us, leaving the vector empty.
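
The difference is easy to see in isolation. The sketch below uses only the standard library; count_owned is a hypothetical helper, and a plain Vec<String> stands in for the batch of data that timely hands to the operator.

```rust
use std::collections::HashMap;

/// Counts strings by taking ownership of them with `drain(..)`.
/// Afterwards `data` is empty, but keeps its allocation for reuse.
fn count_owned(data: &mut Vec<String>, counts: &mut HashMap<String, u64>) {
    for datum in data.drain(..) {
        // `datum` is an owned `String`, so it can become a map key
        // without being cloned.
        *counts.entry(datum).or_insert(0) += 1;
    }
}
```

Replacing drain(..) with iter() here would make datum a &String, and counts.entry(datum) would no longer type-check against a HashMap<String, u64> without an explicit clone.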

As an aside: why not just use drain(..) in the first place, way back up when we were counting machine words? It turns out that if timely can see that you don't need ownership of data, it can avoid copying data for you, and in the case of String data that means avoiding allocations. We could try harder and use a &str reference to test whether the String is in the HashMap first, and only copy the string if it is needed as a key; this would probably perform better in a distributed setting (where taking ownership means deserialization), but may not in a single-machine setting (where there is an allocated owned String ready for us to take).
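
A minimal sketch of that "look up by &str first" idea, using only the standard library. The count_borrowed helper is hypothetical, and a slice of &str stands in for data that has not been turned into owned Strings yet; the return value counts how many owned keys had to be allocated.

```rust
use std::collections::HashMap;

/// Counts borrowed strings, allocating an owned `String` only the first
/// time a key is seen. Returns how many keys had to be allocated.
fn count_borrowed(data: &[&str], counts: &mut HashMap<String, u64>) -> usize {
    let mut allocations = 0;
    for &datum in data {
        match counts.get_mut(datum) {
            // Key already present: no copy of the string is needed.
            Some(count) => *count += 1,
            // First sighting: pay for one owned copy to use as the key.
            None => {
                counts.insert(datum.to_owned(), 1);
                allocations += 1;
            }
        }
    }
    allocations
}
```

For an input with many repeats, the number of allocations tracks the number of distinct strings rather than the number of records, at the cost of hashing a new key twice (once for the lookup, once for the insert).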

There will need to be a post in the future about the ownership life-cycle of data in timely dataflow. Rust gives us the opportunity to do some pretty great things here, and I would say that timely is about 90% of the way there, with a few remaining memcpys it doesn't need.

Other generic operators

There are also binary_stream and binary_notify operators. They have two inputs, and need you to specify two partitioning functions, and a closure that takes two inputs (plus an output and maybe a notificator). Otherwise, they are pretty much what you would expect, I think. Actually, I have no idea what you would expect. I'll show you binary_notify in the next post, though.

Building more generic operators requires stepping into the black pit that is actually writing raw timely dataflow code (manually connecting streams and operators). It's not that horrible, but it is a quantum leap more complicated than using the pre-fab solutions we've discussed so far.

Up next

In the next post we'll put together some of these pieces to build a small graph processor. It will only do fairly primitive things (e.g. breadth-first search), but it should give a feel for a more complete application.

We will also get a chance to try out a non-trivial program on input data of various scales, across varying numbers of workers. We'll start to be able to see what "costs" in timely, and how well we are equipped to fix those costs.
