Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
2 contributors

Users who have contributed to this file

@frankmcsherry @cmontella
439 lines (328 sloc) 22.8 KB

An introduction to timely dataflow

I realized that I haven't actually done a proper introduction to timely dataflow, as it exists in its Rust incarnation. I'm going to start to fix that today, showing most of the moving parts over a sequence of posts, with a bit of explanation where it makes sense. I should warn you that the code, documentation, tests are all alpha quality; you can (and should!) play around with them, but anything more than that and I'd be concerned.

The main goal of this project, other than to learn Rust (success!), is to put together a re-usable infrastructure for research in data-parallel computation. There are a bunch of non-goals, like to make programs as short as possible, or make a start-up worthy artifact, or to write a ton of incremental papers. But it is a good question to ask as we go: "tell me again, why are we doing all of this?"

Why we are doing this

The Naiad project argued that a large volume of work on data-parallel compute systems were basically re-doing the same things over and over, with minor variations with the newly popular flavor of the week (graphs, streams, columns, learning, temporal, various cross products, etc). The project made a bunch of headway into a re-usable infrastructure that was unfortunately a) built on C# which (unfairly) isn't super popular, and b) shot in the head about a year ago by MSR leadership (also not super popular).

Despite performing quite well, a fair bit of Naiad work did go in to fighting against C#'s performance characteristics. Rust has been a great relief since, in that I've been able to write a large volume of apparently (shush!) safe code, with the performance I expect, spending even less time debugging weird corner cases than in the higher-level languages. The amount of code (and coding) required for various projects has dropped substantially, and it is all faster.

I think Rust is a great opportunity for various big data systems to get their performance issues sorted out, without compromising on the high-level expressivity languages like Java or Scala offer. I'm hopeful that a sufficiently high-level yet performant re-usable core could make research a lot easier, in that folks won't have to continually re-implement things from scratch in mutually incompatible frameworks. That being said, many of them have strong incentives to do so, which we aren't going to solve today.

Also, I like learning more about how computers work, and building a low-latency, high-throughput, data-parallel compute engine is a pretty good way of doing that, it turns out. I'm a bunch better with computers than I was a year ago.

Hello world in timely

Let's start out with a relatively simple, fully functional hunk of code in timely dataflow. You can also check it out in examples/hello.rs in the timely repository.

extern crate timely;

use timely::dataflow::*;
use timely::dataflow::operators::*;

fn main() {
    // initializes and runs a timely dataflow computation
    timely::execute_from_args(std::env::args(), |computation| {

        // create a new input, and inspect its output
        let mut input = computation.scoped(move |scope| {
            let (input, stream) = scope.new_input();
            stream.inspect(|x| println!("hello {}", x));
            input
        });

        // introduce data and watch!
        for round in 0..10 {
            input.send(round);
            input.advance_to(round + 1);
            computation.step();
        }
    });
}

If you run this sweet program, you get deep insights like

Echidnatron% cargo run --example hello
     Running `target/debug/examples/hello`
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9
Echidnatron%

Let's talk through what is going on here.

Initialization

The first things we do in a timely dataflow program, other than import some crates and modules, is pretty simple, but super-duper important:

extern crate timely;

use timely::dataflow::*;
use timely::dataflow::operators::*;

fn main() {
    // initializes and runs a timely dataflow computation
    timely::execute_from_args(std::env::args(), |computation| {

It doesn't look so important. Just, standard initialization boiler-plate.

It is totally important, pedagogically, but explaining why is a bit of a distraction now. One thing it does do, though, is get our program a handle to this computation thing, which is essentially the root of any timely dataflow computation. We will need it to build our dataflow graph.

Building a dataflow graph

We are building a dataflow engine (for now, "just because"), and so we need to explain what dataflow graph to build. The dataflow graph should describe where we want records to flow from and to, and in our case we want an input vertex and an inspect vertex, to produce the data and observe it (in the form of a println).

// create a new input, and inspect its output
let mut input = computation.scoped(move |scope| {
    let (input, stream) = scope.new_input();
    stream.inspect(|x| println!("hello {}", x));
    input
});

This looks a bit more complicated than that. What happened? In timely, dataflow graphs are all "scoped", meaning that there is a mandatory hierarchical structure to the graphs, roughly corresponding to the nested scopes you would find in an imperative program.

In this case, we start with a blank dataflow canvas, computation, but we want to have some operators (input and inspect) that we will use multiple times (a lot like the body of a for-loop). So we need to inform timely about this structure.

Within the scope, the new_input() method gives us a pair of input handle and data stream, the former is how our program will introduce records into the dataflow, and the latter is a handle to the dataflow stream, which we can use to define further dataflow computation. Importantly, at this point we don't have any data, we are just defining what should happen with it when it shows up. We return the input handle.

It might seem that you could totally dodge this "scope" complexity for a hello-world program. For example, many streaming systems just allow each operator to receive an arbitrary sequence of input records. This ends up being quite limiting, and being more explicit about different scopes allows us to write much cooler programs in timely, but later. Not yet. By the end of the post we'll be doing something most of them can't easily do.

Executing the dataflow graph

The last thing we do in the program, having defined that cool scoped dataflow graph, is to execute the computation. Because this is a dataflow computation, all that means is supplying input (and a little bit of control information):

// introduce data and watch!
for round in 0..10 {
    input.send(round);
    input.advance_to(round + 1);
    computation.step();
}

Here we are happily sending some records (the integers 0 up through 9) to the input. They'll make their way to the logic we defined in inspect, and will be printed. There are two other things going on here, though.

We also call input.advance_to(round + 1); which informs timely that we have finished sending data for everything up until round round + 1. This allows timely to do some internal logic, differentiate the data associated with different rounds, and generally be smarter. We aren't doing anything useful with this yet.

Finally, we call computation.step();, which tells timely to go and schedule each of the dataflow operators once. This results in input data making its way to inspect logic, and being consumed there. Actually, nothing bad happens in this example if we don't call this method, because...

Shutting down the dataflow graph

This is the most succinct part of the program.

   });
}

At the end of execute_from_args, timely will close any of your input handles, and step the computation until it is finished. "Finished" here means that there are no outstanding messages to process, and no dataflow operators have any reported work to perform.

These magical steps allow you to print the numbers 0..9. You're welcome.

Data-parallelism

Timely isn't just a dataflow engine, it is a data-parallel dataflow engine, meaning it is targeted at a particular class of dataflow problems where each operator (things like input, inspect, and others) can be implemented by partitioning their input records between a number of independent workers.

Let's return to that initialization fragment:

// initializes and runs a timely dataflow computation
timely::execute_from_args(std::env::args(), |computation| {

If we just needed a handle to computation, why not just make a method timely::get_computation()?

The fragment above forces us to define our computation as a closure, a method that timely can capture and invoke multiple times. Essentially, we are required to describe the instructions to run our computation, rather than just writing the code to go and do it. We do end up with the look and feel that we just wrote the code to go and do it, which is nice.

By capturing the instructions for our computation, timely can run the instructions for multiple workers. You just need to tell it how many workers to use. Let's try four!

Echidnatron% cargo run --example hello -- -w 4
     Running `target/debug/examples/hello -w 4`
hello 0
hello 0
hello 0
hello 0
hello 1
hello 1
hello 1
hello 2
hello 1
hello 2
hello 3
hello 2
...
hello 8
hello 9
hello 9
hello 9
Echidnatron%

Ok. Hard to tell if that was success or not. We do appear to have four workers each producing the numbers 0..9, and printing a bunch of things. They don't all seem to be in the right order.

It gets better than this, right?

Data exchange

To make things a bit more interesting, we'll want to have the workers interact. In a data-parallel dataflow setting, this means having them send data to one another. In timely this is really easy: each operator gets to specify how its input records should be partitioned, by providing a function from record to u64; each record will be routed to a worker based on the result u64, meaning records with the same value will be processed by the same worker.

To get a sense for this, we can use the exchange operator, which requires its input to be partitioned, but does nothing other than forward the records once they are partitioned. Let's tweak the example to use exchange, and also to print out the name of the worker processing the result:

let mut input = computation.scoped(move |scope| {
    let index = scope.index();
    let (input, stream) = scope.new_input();
    stream.exchange(|x| *x as u64)
          .inspect(move |x| println!("worker {}:\thello {}", index, x));
    input
});

As the data we are circulating are just integers, we can just use the integer itself as a u64 for the exchange operator.

If we run with four workers, using the -w flag, our output now looks like:

Echidnatron% cargo run --example hello -- -w 4
     Running `target/debug/examples/hello -w 4`
worker 0:	hello 0
worker 0:	hello 0
worker 0:	hello 0
worker 0:	hello 0
worker 1:	hello 1
worker 1:	hello 1
worker 1:	hello 1
worker 2:	hello 2
worker 2:	hello 2
worker 1:	hello 1
worker 3:	hello 3
worker 2:	hello 2
...
worker 0:	hello 8
worker 3:	hello 7
worker 3:	hello 7
worker 3:	hello 7
worker 1:	hello 9
worker 0:	hello 8
worker 0:	hello 8
worker 0:	hello 8
worker 1:	hello 9
worker 1:	hello 9
worker 1:	hello 9
Echidnatron%

Heeeey! Each of the records went to consistent workers! This may be interesting!

Distribution

Did we have to do all of this with just one process? No!

The -n and -p flags allow us to specify the total number of process and the index of each process, respectively. We can then spin up multiple processes with multiple workers (though the number of workers has to be the same across the process for now):

Echidnatron%  cargo run --example hello -- -n 2 -p 0 -w 2 &
              cargo run --example hello -- -n 2 -p 1 -w 2
[1] 18456
     Running `target/debug/examples/hello -n 2 -p 1 -w 2`
     Running `target/debug/examples/hello -n 2 -p 0 -w 2`
worker 2:	hello 2
worker 2:	hello 2
worker 3:	hello 3
worker 3:	hello 3
worker 2:	hello 6
worker 3:	hello 7
worker 2:	hello 6
worker 3:	hello 7
worker 0:	hello 0
...
worker 1:	hello 9
worker 3:	hello 3
worker 3:	hello 7
worker 3:	hello 3
worker 3:	hello 7
worker 2:	hello 2
worker 2:	hello 6
worker 2:	hello 2
worker 2:	hello 6
[1]  + done       cargo run --example hello -- -n 2 -p 0 -w 2
Echidnatron%

Behind the scenes, timely sets up network connections between your processes, handles all the data serialization, coordination, and shutting things down once the dataflow is complete. Sweet!

If you want to use multiple computers, or specify ports manually, you'll also need to use the -h option which allows you to specify a hostfile, containing lines like:

host0:port0
host1:port1
host2:port2
...
hostn:portn

for the various machines and ports you'd like to use. You'll also need to start up each process i on hosti for this to work out.

Coordination

The above example looks like a bit of a mess, because with multiple processes records just get received in weird orders and basically it is all chaos. Let's add a bit more structure in, by asking each of the workers to wait until each round is complete before advancing to the next.

The first change we'll need is to add a new dataflow operator probe which pays attention to the possible availability of messages at a point in the dataflow graph:

let (mut input, probe) = computation.scoped(move |scope| {
    let index = scope.index();
    let (input, stream) = scope.new_input();
    let probe = stream.exchange(|x| *x as u64)
                      .inspect(move |x| println!("worker {}:\thello {}", index, x))
                      .probe().0;
    (input, probe)
});

The probe method returns a pair (ProbeHelper, Stream), where the stream is just the data it takes as input, and the ProbeHelper is a handy device we can use to ask about progress in the stream. We just want the first of these two, which is why we have a .0 after the call to probe().

// introduce data and watch!
for round in 0..10 {
    input.send(round);
    input.advance_to(round + 1);
    while probe.le(&RootTimestamp::new(round)) {
        computation.step();
    }
}

Rather than step computation just once, we repeatedly step the computation until probe reports that it is no longer possible to see a message containing round (and with some weird RootTimestamp stuff; sorry).

Now our output looks like

Echidnatron%  cargo run --example hello -- -n 2 -p 0 -w 2 &  
              cargo run --example hello -- -n 2 -p 1 -w 2
[3] 20998
     Running `target/debug/examples/hello -n 2 -p 1 -w 2`
     Running `target/debug/examples/hello -n 2 -p 0 -w 2`
worker 0:	hello 0
worker 0:	hello 0
worker 0:	hello 0
worker 0:	hello 0
worker 1:	hello 1
worker 1:	hello 1
worker 1:	hello 1
worker 1:	hello 1
worker 2:	hello 2
worker 2:	hello 2
worker 2:	hello 2
worker 2:	hello 2
...
worker 0:	hello 8
worker 0:	hello 8
worker 0:	hello 8
worker 0:	hello 8
worker 1:	hello 9
worker 1:	hello 9
worker 1:	hello 9
worker 1:	hello 9
[2]  - done       cargo run --example hello -- -n 2 -p 0 -w 2
Echidnatron%

Ahhhhh, all nice and tidy.

Note: you don't have to use round in the probe.le call; you could use round - 5 to allow five outstanding epochs in case you can tolerate some slack. This can often be important for getting nice throughput out of a distributed implementation.

Something more interesting

Exchanging records and printing them out is a great way to spend time with friends, but we would probably like to do something more interesting. Let's flesh out our example with something cooler: Loops!

We are going to use mostly the same program as above, but with different core logic. Rather than inspect each record, we are going to increment it and push it around the loop. Since the value is now different, exchange should route it to a different worker. In this example it will go round and round for iterations many times, at which point it gets dropped.

// create a new input, and inspect its output
let mut input = computation.scoped(move |scope| {
    let (input, stream) = scope.new_input();
    let (helper, cycle) = scope.loop_variable(iterations, 1);
    stream.concat(&cycle)
          .exchange(|&x| x)
          .map(|x| x + 1)
          .connect_loop(helper);
    input
});

The first new thing is this example is the call to loop_variable. This tells timely we want to build a loop, and we need some help. We need help because we can only define operators on existing data streams, which would result in acyclic graphs. But, we want cycles.

The call to loop_variable tells timely that we need to build a loop and it had best give us back a data stream, and a way to set the input of that stream, which we will do later. Timely wants to know a few things about the loop: how many times do we plan to go around the loop (iterations) and by how much should the loop counter be incremented each time around (1).

We combine the loop variable with the input using concat, a simple operator which forwards records from either input, followed by the familiar exchange, the new map operator which in this case increments the record, and finally the connect_loop method, which attaches the output stream to the input of the loop variable. Not so terrible!

Running it!

We now have a new example program, examples/pingpong.rs, to try out! I'm going to run it with a single worker, multiple workers in the same process, and multiple workers across processes, to see how long it takes to do 1_000_000 iterations.

Echidnatron% time cargo run --release --example pingpong -- 1000000
     Running `target/release/examples/pingpong 1000000`
cargo run --release --example pingpong -- 1000000  1.43s user 0.16s system 100% cpu 1.589 total
Echidnatron%

So, about 1.6s to do one million iterations, or about 1.6 microseconds per iteration. Nothing much should be happening other than moving a buffer onto and off of a some shared queues, which suggests that there is probably some room for optimization. I'm a bit sloppy about allocations at the moment, which would tighten things a bit, I believe.

Let's look at multiple workers within the same process:

Echidnatron% time cargo run --release --example pingpong -- 1000000 -w 2
     Running `target/release/examples/pingpong 1000000 -w 2`
cargo run --release --example pingpong -- 1000000 -w 2  4.78s user 0.16s system 186% cpu 2.642 total
Echidnatron%

Not too bad. The time goes up, which is to be expected because data has to move between the worker threads.

How about multiple processes?

Echidnatron% time cargo run --release --example pingpong -- 1000000 -n 2 -p 0 -w 1 &
                  cargo run --release --example pingpong -- 1000000 -n 2 -p 1 -w 1
[1] 29444
     Running `target/release/examples/pingpong 1000000 -n 2 -p 0 -w 1`
     Running `target/release/examples/pingpong 1000000 -n 2 -p 1 -w 1`
cargo run --release --example pingpong -- 1000000 -n 2 -p 0 -w 1  48.94s user 10.11s system 119% cpu 49.285 total
Echidnatron%

This is a bit longer. About 49 seconds in total, or 49 microseconds per iteration. This is a fair bit more expensive despite being two local processes, because there are now quite a few kernel transitions involved in moving the data around using TCP, and six different threads involved (four new send and receive network threads).

Putting these together

Our two examples, hello and pingpong, have both been using one scope. That means that they each get an integer timestamp for their messages, which hello uses for the epoch of input, and pingpong uses to represent an iteration. What if we want to do both? Like, write something that takes multiple epochs of input integers and circulates each for some number of iterations?

// create a new input, and inspect its output
let (mut input, probe) = computation.scoped(move |outer| {
    let (input, stream) = outer.new_input();
    let result = outer.scoped(move |inner| {

        // construct the same loop as before, but bind the result
        // so we can both connect the loop, and return its output.
        let (helper, cycle) = inner.loop_variable(iterations, 1);
        let result = inner.enter(&stream)
                          .concat(&cycle)
                          .exchange(|&x| x)
                          .map(|x| x + 1);

        result.connect_loop(helper);
        result.leave()
    })

    (input, result.probe().0)
});

Hrm. Well now, this is more complicated.

The main thing to notice is that we are now using two calls to scoped. These two calls set up two scopes, which means that records now have both an "epoch" and an "iteration", independent of one another.

These two nested scopes also lead to the use of enter and leave. To use a stream within a scope it must be explicitly brought in, and enter does this. To take a stream from within a scope and make it available outside the scope, we use leave. These two operators do nothing to the data, but rather add and remove, respectively, iteration information from each record. We need to extract the result of the iterative subcomputation so that we can attach a probe to it, allowing us to see when a computation has completed.

Up next

We've now seen a bit about building up progressively weirder and weirder timely dataflow computations. However, we haven't gotten to one of the most important parts of timely dataflow, which is that the operators themselves can respond to the sort of progress information probe returns. This is a critical component of implementing "map-reduce" like computations, where some operators want to wait until they have received all of the data they are going to receive.

In the next post we will put together a streaming, incremental, word-count application to get a feel for how you can write your own operators. It will not be hard! (lots easier than Naiad, for sure). We will also take the opportunity to look at the resulting performance characteristics; will there be weird hiccups due to garbage collection? (spoiler alert: nope).

Until then, feel free to fire off any questions, complaints, things of that sort.

You can’t perform that action at this time.