
An introduction to differential dataflow, part 1.

We are going to take a bit of a break from writing raw timely dataflow in Rust to look at one of the layers that lives on top of timely dataflow: differential dataflow.


I should start by saying that differential dataflow isn't brand new. We did this a few years back, and it already exists on top of Naiad. But, it turns out it could be done a lot better in Rust, and since it seems like everyone is still working with lower-tech stuff, it seemed worth revisiting and improving.


An Intro

Differential dataflow looks a lot like your favorite "high level analytics language" with things like map, filter, join, and group. It also has a slightly exotic iterate method that does a subcomputation multiple times, which lets you write things like iterative graph analyses, and we will talk about that.

However, what is really exciting about differential dataflow is that all of your computations are automatically updated when your inputs change. As in, immediately. Like, often in milliseconds. And not just counts, statistics, and histograms, but your weird iterative graph analysis too.

The example we used in the Naiad paper, which still holds today, was (not actually a quote):

Let's imagine you are Twitter and have a stream of social signals (@mentions, #hashtags) arriving in real-time. You want to know the most popular #hashtags aggregated not just by static demographic, but by emergent structure of the @mention graph, e.g. perhaps by [strongly] connected component, in real-time.

We will do that. I mean, technically we already did that, but we will do it again with more explanation. Except the part where we use Twitter's data, because apparently that part was really quite expensive.

They could do it, though; that might be cool, right?


To get started, we are going to look at our favorite problem of breadth-first search, write it out in differential dataflow, and then see how quickly it responds to incremental changes to the input graph. It is going to respond alarmingly quickly, but we are going to have to wait for a future post to explain why it goes so fast.

In this post I mostly want to get you thinking that re-computing your big data analytics from scratch each time you get a little bit more data is a bad use of everyone's time.

If someone shows you something that isn't updating in real-time, you should ask "why not?".

Programming model

Differential dataflow provides a functional, collection-oriented programming model. This means that the variables you work with are collections of data, which you transform into new collections using differential dataflow operators.

The "functional" and "collection-oriented" parts are important, not just fun buzz-words. They are what allow us to automatically update computations. More on this in a bit.

Many differential dataflow operators are simply borrowed from timely dataflow:

  • map: applies a function to each element of the collection resulting in a new output collection.
  • filter: restricts a collection to those elements satisfying the predicate.
  • concat: merges two collections by adding the counts for each record (allowing multiplicities).

There are some new operators as well:

  • join takes two collections, of (K,V1) and (K,V2) data, and produces a (K,V1,V2) record for each pair of input records with matching key.
  • group takes a collection of (K,V) data and a function from a K and a list [V] of its values to some output list [R], which it applies to each key group, producing a collection of (K,R) data.

You can make more if you want, but you have to be smart-ish. Let's just use these for now.
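
To fix ideas, here is a quick sketch of how a few of these operators chain together, written in the same slightly simplified syntax used throughout this post; the collections names and ages are made up for illustration, not taken from any example in the repository.

// a sketch, not literal API: `names` holds (id, name) pairs and `ages`
// holds (id, age) pairs, keyed by the same id.
let adults = names.join(&ages)                        // (id, name, age)
                  .filter(|&(_, _, age)| age >= 18)   // keep only adults
                  .map(|(id, name, _age)| (id, name)); // drop the age again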

Finally, there is the awesome and powerful iterate operator, which can be applied to any collection, and which takes as an argument a function from that collection type to that same collection type, and which you should think of as applying that function a large number of times. Basically, until it stops changing.

Differential dataflow is one of the few frameworks where writing a loop as a fixed-point iteration ends up being a performance optimization. Language nerds, rejoice! Now stop rejoicing and get back to work on non-lexical scoping in Rust.

This isn't a super complicated language; it may not look particularly different from other languages you like, which is a good thing. You could probably imagine coding up an execution layer in some imperative language pretty easily, using arrays and stuff. You might even imagine coding it up in timely dataflow, because most of the operators are data-parallel!

Or you might code it up in something like Spark. Keep that thought in mind as we get to the eval section. Treasure your innocence.

An example! Breadth-first search

Let's walk through a short example to see that we understand what programs look like. There are going to be a few syntactic lies told, in the interest of not distracting you; the honest version is that there are still some leaky abstractions and ergonomic questions, all of which can and should be fixed. The differential dataflow bfs code is available for reference, but let's talk through things first.

You may recall breadth-first search from the previous post. We get as input a collection of edges, and we want to determine the shortest distances from each of the nodes in the graph to some specified root.

The algorithm we considered (and what we are going to use here, adapted) is to repeatedly activate nodes once they are reachable from the root, and then activate their neighbors. This algorithm is fine, but it isn't very "collection oriented" yet; it has a bunch of secret state baked into the operators, and we just wrote some imperative code to make sure we sent the right messages around. We don't really have a good basis to update a computation if the inputs change.

Collection orientation

Let's reframe our computation as determining the collection of (node, dist) pairs, where each pair indicates that node can be reached in dist steps from the root. If a node isn't reachable, we just don't have a pair with its name.

We'd really like to end up with the final distances for each reachable node, but we are going to have to start simpler, by defining an initial collection of distances, and explaining how to repeatedly update a collection of distances.

We know how to create an initial version of this collection, just by starting with the single pair (root, 0). This means that root is at distance zero from something (itself), and all other nodes are not yet reachable.

We will need to use iterate to repeatedly expand and improve this collection. So, we need to think out how to take a collection of (node, dist) pairs and turn it into a new collection that reflects some sort of progress. Whatever we do should have the property that if we do it to the collection enough times, we end up at the right answer.

How about we take the (node, dist) pairs, join them with the set of (node, next) edges, and propose (next, dist + 1) as a distance that might be interesting for next?

let nexts = dists.join(&edges)
                 .map(|(node, dist, next)| (next, dist + 1));

If you've used differential dataflow (no you haven't), you know that I have lied just a little bit with this fragment. The map operator is a touch more verbose, because it is borrowed from timely dataflow and currently leaks some of the differential dataflow abstraction (map actually operates on pairs (data, diff), but we don't touch the diff part because map is linear; happier now? :)).
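
For the curious, here is a sketch of what the less-lied-about fragment might look like: since map really sees (data, diff) pairs, we just carry the diff along untouched. The exact tuple shapes here are my guess at the leak, not code copied from the repository.

// a sketch: the timely map operator sees ((node, dist, next), diff) pairs,
// and because map is linear we pass the diff through unchanged.
let nexts = dists.join(&edges)
                 .map(|((node, dist, next), diff)| ((next, dist + 1), diff));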

We should merge the proposals nexts with the current distances dists, and let each of the nodes pick out their favorite (smallest) distance.

nexts.concat(&dists)
     .group(|node, ds| ds.min().map(|(d,_)| (d,1)))

Whoa there, that concat looks ok, but what is all this madness in the group operator?

The group operator is applied to a collection of pairs (K,V), which here are (node, dist), and asks you for a function from a K and a MyIterator<V> (which is just some specific type of iterator over values) to an I: Iterator<Item=R> (which gets to be your specific type of iterator over output values). It produces the (K,R) pairs corresponding to the values that you produced.

So, in that fragment up above, we are just taking the smallest distance, and then doing something with a map. Here we have again the leaky abstraction: differential dataflow likes telling you how many times a record appears, and wants you to do the same for it. We are just saying "however many times this minimum occurs, we produce exactly one instance as output".
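
As one more sketch of the same convention, with a made-up collection: counting how many distinct values each key has would look roughly like the following, again producing the answer "exactly once".

// a sketch: for each key, emit the number of distinct values it has,
// with multiplicity one (hence the trailing 1).
let counts = collection.group(|_key, vals| Some((vals.count(), 1)));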

These four steps, join, map, concat, and group, take an input collection dists and update it to a "more complete" output collection. Just "more complete" though, not "totally complete".

Iterating

Can we wrap our update rule in some sort of iterative logic to get to the right answer? Of course!

let distances = roots.iterate(|dists|
    dists.join(&edges)
         .map(|(node, dist, next)| (next, dist + 1))
         .concat(&dists)
         .group(|node, ds| ds.min().map(|(d,_)| (d,1)))
);

There is still a bit of syntactic mis-representation here, but the actual code isn't all that much more complicated. Sorting out the ergonomics is tricky.

Trying it out

If you know anything about high-level declarative languages, you probably know that they can be hilariously slow. Especially when compared with code that just directly manipulates arrays and bits and systems stuff.

This probably isn't the fastest way to write breadth-first search (it really isn't), but just how slow is it? We have a bunch of join and group operators, and we just ask the system to iterate some number of times. We didn't even say how many times, did we? This could be bad.

The secret of differential dataflow is in the name. Its operators are all rigged to only communicate changes to underlying collections, so even if we iterate infinity-many times (read: u64::max_value()), at some point things stop changing and so we just stop doing work. Even while we are iterating, each node's distance only "changes" once, from not-reachable to reachable-at-dist; it doesn't improve in later rounds because proposed distances only increase.

Let's try it out with a random graph with 10M nodes and 100M edges. This is something that our previous "raw timely dataflow" version of breadth-first search takes about 12.7 seconds to process, including data generation and sorting the edges.

If we try the same number of nodes and edges here, we see

Echidnatron% cargo run --release --example bfs -- 10000000 100000000 1
     Running `target/release/examples/bfs 10000000 100000000 1`
performing BFS on 10000000 nodes, 100000000 edges:
loaded; elapsed: 4.801791439997032s
stable; elapsed: 63.82679642300354s

Oof. That is a bunch slower. But, given that we could have been iterating that loop infinity-many times, this isn't horrible. That version you hypothetically wrote with arrays and stuff is probably not very hypothetically good either.

Actually, it turns out that the increased time is largely due to Rust's HashMap implementation. If we don't know anything about the key type K, we need to use a HashMap, and it is just kinda slow. If we want to do what we did in the previous program, exploiting the fact that the node identifiers are small and dense integers so that we can use an array rather than a HashMap, we can just tweak the program by adding the _u suffix to the join and group operators (as sketched below); this tells differential dataflow to use a different index structure: a Vec<Option<_>>.
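
Concretely, the tweak is roughly the sketch below; everything else stays as it was, only the operator names change.

// a sketch: the same loop body as before, using the unsigned-integer-keyed
// variants that index with a Vec<Option<_>> rather than a HashMap.
dists.join_u(&edges)
     .map(|(node, dist, next)| (next, dist + 1))
     .concat(&dists)
     .group_u(|node, ds| ds.min().map(|(d,_)| (d,1)))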

Echidnatron% cargo run --release --example bfs -- 10000000 100000000 1
     Running `target/release/examples/bfs 10000000 100000000 1`
performing BFS on 10000000 nodes, 100000000 edges:
loaded; elapsed: 4.937238768994575s
stable; elapsed: 28.822608040005434s

Ok, this is better. The times still aren't as great as timely dataflow, but that sort of makes sense because it turns out we are doing a bit more than the timely dataflow code.

Apropos that, you might be thinking "stable" is a weird way to describe "being done", but that is because we aren't actually done. We are just getting started.

Incremental updates

The third argument being passed to the example (go look!) is the number of input edges to randomly change at a time. Wait, wut?

The next part of the experiment is to run a sequence of "waves", where in each wave we add and remove that many (here 1) edges to and from the input edge collection. We record the time from introducing the changed edges until timely dataflow confirms that we have received all output changes, divided by the number of changes (here 1), to learn about the per-update latency and throughput.
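
One wave looks roughly like the sketch below. The handle names (input, probe, worker) and the stepping calls are my assumptions about the harness rather than the example's literal code; the essential point is that an added edge is a record with weight +1 and a removed edge is the same kind of record with weight -1.

// hypothetical sketch of one wave of size one.
input.send(((new_src, new_dst),  1));   // introduce one random edge ...
input.send(((old_src, old_dst), -1));   // ... and retract another.
input.advance_to(round + 1);            // seal this logical time.
while probe.less_than(input.time()) {   // keep stepping the worker until all
    worker.step();                      // output changes for this round are out.
}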

wave 0: avg 0.000047909998102113605
wave 1: avg 0.000024145003408193588
wave 2: avg 0.00004954898031428456
wave 3: avg 0.00010580199887044728
wave 4: avg 0.00005632700049318373
wave 5: avg 0.00006223400123417377
wave 6: avg 0.00005333099397830665
wave 7: avg 0.000053544004913419485
wave 8: avg 0.00006352699710987508
wave 9: avg 0.000060170015785843134

Those measurements are in seconds, and a bit hard to read. They range from 24.1 to 105.8 microseconds, which works out to roughly 10,000 to 40,000 atomic edge updates per second, pushed through a computation. Not just into a table, not just updating an aggregate, but through several operators around a loop multiple times and back out again, explaining the changes to previously reported outputs.

Here is an example of the output data produced (I tried to find an interesting wave; most have fewer updates):

wave 129:   [((3269936, 7), -1),
             ((3269936, 8), 1),
             ((3529000, 7), -1),
             ((3529000, 8), 1),
             ((4500387, 7), -1),
             ((4500387, 8), 1),
             ((8804446, 7), -1),
             ((8804446, 8), 1),
             ((9406134, 6), -1),
             ((9406134, 8), 1)]

What this is telling us is that five nodes had their distances changed, mostly from seven to eight it seems, but also one from six to eight. Probably that six-to-eight vertex had edges to the others and was providing them with distance-seven paths, but now they have to fall back on other neighbors.

What the output isn't telling us is anything about the ten million-ish other distances that didn't change. That probably saved a lot of time.

Thanks differential dataflow!

At this point, this is supposed to sound magical and wonderful. The details are indeed cool and math-y and we are going to get to them, but for now I'm just trying to get you emotionally invested in knowing how this works.

Throughput

If we want to increase the throughput, we can increase the batch size (here from 1 to 1000). We are reporting times divided by the batch size, so the printed numbers are smaller despite each wave itself taking more time, just to be clear.

Echidnatron% cargo run --release --example bfs -- 10000000 100000000 1000
     Running `target/release/examples/bfs 10000000 100000000 1000`
performing BFS on 10000000 nodes, 100000000 edges:
loaded; elapsed: 4.485242634982569s
stable; elapsed: 25.588021253002807s
wave 0: avg 0.000009328773012384773
wave 1: avg 0.000010204062011325732
wave 2: avg 0.000014952600002288818
wave 3: avg 0.000012570315011544153
wave 4: avg 0.000015275064011802896
wave 5: avg 0.000016834694019053132
wave 6: avg 0.000015861994004808367
wave 7: avg 0.000013577864010585471
wave 8: avg 0.000016054217994678765
wave 9: avg 0.00001773550600046292

These now range from 9.32 to 17.7 microseconds, which is smaller than above. They also represent more work in total, so it isn't exactly correct to just compare the numbers.

If we want to increase the throughput even more, we can bring in a second thread.

Echidnatron% cargo run --release --example bfs -- 10000000 100000000 1000 -w 2
     Running `target/release/examples/bfs 10000000 100000000 1000 -w 2`
performing BFS on 10000000 nodes, 100000000 edges:
loaded; elapsed: 4.747215053997934s
stable; elapsed: 16.79982848902s
wave 0: avg 0.000004792938998434693
wave 1: avg 0.000005150135984877125
wave 2: avg 0.000008665696979733184
wave 3: avg 0.000005509358015842736
wave 4: avg 0.000008675559016410262
wave 5: avg 0.000011210227006813511
wave 6: avg 0.00001276182199944742
wave 7: avg 0.000009505762020125985
wave 8: avg 0.000009433202998479828
wave 9: avg 0.000008076456986600534

We are now at between 5 and 12 microseconds per edge update, on average, meaning 80,000-200,000 updates per second. The batches are 1000 elements, so we aren't seeing the isolated effects of single edge updates, but the improved throughput may be worth it depending on what you actually need (high resolution vs high throughput).

If we want to start bringing in more processors to increase the throughput, we'll need to take the batch size up a bunch. Timely's network stack isn't in the small-microseconds roundtrip category yet, but it could be with some love (and user-space networking).

Caveats

These numbers probably sound pretty good. They are meant to get you excited about the idea of a framework that lets you automatically update your computation. This sort of thing is very possible, and can be very efficient.

To be clear and honest, there are a few things to know:

  1. These measurements are with a version of differential dataflow that doesn't think too hard about garbage collection. Each wave of 1000 updates seems to add about 300KB to the memory footprint. With time, the system starts to slow (both because of the memory spent, and because there are more updates to crawl through). There are nice ways to deal with this that smooth out the footprint and execution time, but they aren't live yet.

  2. The average update time includes updates that produce no changes in the output. These updates still require some work, pushing around new distance options, but not every update results in a major change (or any change) to the output distances. This is especially true for breadth-first search in a random graph.

  3. There are some bigger, low-probability spikes that you don't see above because I'm just printing a small set of numbers. Differential dataflow will correctly update your computation, and if something bad happens like we add or remove an edge near the root (bad luck!), we will need to do a fair bit of work. No free lunch here.

  4. Breadth-first search is just a nice problem. It has a fairly simple description in differential dataflow, and it fits nicely with differential dataflow's incremental execution strategy. Many other algorithms have similarly nice properties, but there are plenty of others that don't really benefit from incremental re-evaluation. Generally, combinatorial algorithms fit better than things like stochastic gradient maybe-this-will-tell-cats-from-dogs-idk descent.

You can see a few of the points above in this cdf of the latencies for the first, second, and third million single-edge updates: they get a bit slower, and there is a pronounced low-probability tail, but it isn't all that bad (99.9% of the updates are visible; the max was 6s, which happened once).

Latency cdfs

Neat, millions of waves of updates. If you tried to grab this data using a system where each wave takes a second or so, you'd need about 11 days. I got the measurements above in just a few minutes.

Next steps

There are other computations beyond breadth-first search for which differential dataflow works well. Examples include our motivating example of aggregations by connected component, but also things like stable marriage, network flow, graph coloring and clustering.

Having gotten you all excited about the possibility of writing data analytics that automatically update in near real-time, we are going to flesh out what the moving parts actually look like a bit more in the next post. Then, if we are especially brave, we'll look into the mathematics and data structures underlying differential dataflow.

As always, questions, requests, and sources of delightful data are welcome.