Permalink
Find file
ef143c0 Jan 4, 2017
@frankmcsherry @klucar
621 lines (397 sloc) 45.8 KB

Dataflow as Database

This post starts from a recent citation Naiad got, from the Weaver paper in VLDB2016. We are going to start with the traditional whinging about science and related work sections and such, but then build up to answer some questions about whether dataflow of functional operators is a suitable way to implement certain database fundamentals. At least, we will try and put the questions in some context, and leave the door open for them to be answered.

Then we will write and measure some differential dataflow code for a transactional analytic graph store. Kinda.

As a warning, I get that lot of you work on databases and know lots about databases and I may be grossly oversimplifying many important things in this post. The goal of the post really isn't (as much as it might sound like it) to trivialize work in this space, so much as to think out loud about some simpler ways we could approach problems, cleaning up a bit of mess for several use cases. The ideas here are probably very bad for e.g. (i) geo-distributed databases, (ii) exotic fault-tolerance, (iii) actual databases people need to use; you are totally welcome to point this out, but no prizes will be awarded.

Background and motivation.

The Weaver paper is from the normally responsible HyperDex folks at Cornell, lead by the irrepressible @el33th4xor Emin Gün Sirer, who can be relied on to say very amusing things about MongoDB. More recently, that the DAO isn't what he would have done with 150 million dollars (my guess: protein powder).

Their abstract indicates that the graph processing space is bereft of usable solutions:

Graph databases have become a common infrastructure component. Yet existing systems either operate on offline snapshots, provide weak consistency guarantees, or use expensive concurrency control techniques that limit performance.

Further along, we have the following lament:

Lightweight techniques for modifying and querying a distributed graph with strong consistency guarantees have proved elusive thus far.

I know of at least one system that allows you to modify and query a distributed graph with strong consistency guarantees: Naiad. They know about it too, as it appears in their related work as citation [44] in this well-though out and surely very accurate list:

Offline Graph Processing Systems: Google’s Pregel [41] computation paradigm has spurred a spate of recent systems [3, 53, 44, 26, 35, 52, 49, 64, 43, 14, 27, 68, 63, 12, 13, 4, 66, 65, 67, 60, 29, 32, 16] designed for offline processing of large graphs. Such systems do not support the rich property graph abstraction, transactional updates, and lookup operations of a typical graph database.

If you are an even remotely skeptical person (hello, VLDB reviewers?), you might wonder why the abstract of the SOSP paper says it does

... such diverse tasks as streaming data analysis, iterative machine learning, and interactive graph mining.

or why the headline workload in the Naiad paper is interactive access to continually updated consistent views of derived data from a stream of Twitter mentions. Mysterious!

So Weaver's description is a bit crap. I mentioned this to Gün last year when Weaver landed on the Arxiv, and sent them some stats on how timely dataflow performed on their workloads (better, iirc? gmail deleted the email for me). Gün totally said not to worry; he'd fix it, from which we can probably conclude that Gün uses MongoDB for his todo list.

Graph Databases

All snark aside, it is totally fair to ask whether Naiad or other things like it are "a graph database".

No. Neither Naiad nor timely dataflow are a graph database.

But, they could be suitable foundations for building a graph database (or analytic database generally), which is the point I'll try and make in this post. More generally, this post is about re-thinking how databases fit into our compute environments: do we really need to reduce state management down to a monolithic key-value store with some weird-ass consistency properties, or should we try to capture higher level information about the computations that define our data?

This post is going to try out the idea that we might design a database as a functional data-parallel computation mapped over an input sequence of transactions. Which probably has lots of issues, but thinking them out is going to be good for your brain.

Let's start with some concrete questions, before building up to something more interesting: Does Naiad support (i) the rich property graph abstraction, (ii) transactional updates, and (iii) lookup operations? What do these even mean?

(i) Property graphs

This is the least interesting question, in terms of "what makes a database", but we should talk it through anyhow.

The property graph abstraction is just a way of binding attributes and values to things in your dataset, graph edges in this case, and is pretty easy to do in any relational setting. In addition to defining edges (src,dst) you can have some other relations containing tuples like

((src, dst), color, "red")
((src, dst), style, "dashed")

which add properties to the edges they name. Depending on how strongly you care, these tuples can either be strongly typed and in their respective relations, e.g.

let color: Collection<(Edge, ColorEnum)> = ...;
let style: Collection<(Edge, StyleEnum)> = ...;

or you can put them all in a big pile using strings to identify both the property name and its value, e.g.

let props: Collection<(Edge, String, String)> = ...;

When you write your computations, you can use these properties. For example, if you want the out-degrees of your graph restricted to red edges, you might write:

color.filter(|(e,c)| c == ColorEnum::Red)
     .map(|(s,d)| s)
     .count();

Maybe you would join color with edges to make sure the edge exists, or with some other properties to get more advanced information (red and dashed edges). You can even join the colors with each other and connect endpoints of edges with the same color. Why wouldn't you want to do that?

It's all just data, and you can use whatever computation you would like with it.

(ii) Transactional updates

Naiad, differential dataflow specifically, takes as input a sequence of updates to apply to its input collections. These updates are applied in this sequence, and any data derived from these inputs are also updated, to correspond exactly to the updated inputs. The system produces as outputs what those of us not in the database community might call "the correct answer".

For various reasons, the database community doesn't subscribe to the "correct answer" school of thought, because correctness is just like your point of view, man. Instead, the community has some 31 flavors of "consistency", which are each a distinct rationalization for the incorrect answer you might have just gotten back, motivated mostly by potential performance issues. This approach has been wildly successful, and the distributed database landscape is exactly the paragon of clarity, reliability, and robustness you might imagine it would be.

Instead of "correct", the database community speaks of ACID transactions, for atomicity, consistency, isolation, and durability. The first three are roughly "correctness", in that violating any of them corresponds to not computing the correct answer, or screwing things up generally. Durability is more interesting, and is about ensuring that before you provide an output, you must be sure that subsequent outputs will reflect it even if someone pulls the rug out from under you or your computer. This is often where things go wrong, because computer failures are often rather surprising.

Durability seems much more manageable for functional computations, in which the output is fully defined by computation and input. In such a setting, we can start to report outputs as soon as the corresponding inputs are made durable. You might get a bit stressed out that this implies a recovery strategy of replaying all of history, but this isn't actually the case, and we will say a bit about this later on. All that I mean to say here is that durability is a lot less scary when your output only reflects the input, and not weird non-deterministic choices your system made along the way.

(iii) Look-ups

But databases aren't about computation, they are about storing data and getting it back later, right?

This is also just computation: a sequence of read and write requests has (or should have) a well-defined answer for what you get back for each read. You could implement this computation using a database, but there are other ways. You could almost certainly write the sequential code to process such a sequence of reads and writes, probably using something like a HashMap to store written values.

Here is a way to implement reads and writes to a common store, using differential dataflow:

// input handles for reads and writes.
let (rhandle, reads) = scope.input();
let (whandle, writes) = scope.input();

// report writes with a corresponding read
writes.semijoin(&reads)
      .inspect(|x| println!("read: {:?}", x));

The semijoin operator pulls out (key, val) entries from writes whose key appears in reads.

As you add entries to reads the correct answer changes to include the corresponding entries in writes, just like you want to look up values in a database. Moreover, as you add entries to writes the correct answer changes to include those entries who have existing keys in read, so this is actually more like a primitive pub-sub system. You can turn it in to a look-up by unsubscribing once you've gotten what you need (sending a negative weighted key into reads).

An example

Let's work through a simple example, where we build a graph database that can respond to transactional updates to its inputs, allows you to query and monitor graph neighborhoods, and also for kicks allows you to perform reachability queries from nodes and monitor their results.

The code we will discuss is about 40 lines (plus several driver lines), and you can get it in the differential dataflow repository at examples/weaver.rs.

Transactional updates

The first thing we'll do is define an input for our transactional updates.

// transactional updates to edge collection.
let (trans_input, trans) = scope.new_input();
let trans = trans.as_collection();

Ok, super easy and just as uninformative.

I've decided that our transactional updates are going to be represented by a bunch of tuples (u64, bool, Edge) whose fields indicate, respectively

  1. a transaction identifier,
  2. whether this is a read (true) or a write (false), and
  3. if it is a read what value we require, and if a write what value to write.

We will keep all writes with a given transaction id exactly when all of the reads with that same transaction id are still valid. In our case we are using multisets, so a read being valid means that the record read still exists; we will abort transactions with any reads of records that are no longer present.

We could redo most of what follows with key-value pairs if we wanted to. In some sense, despite using multisets differential dataflow is key-value based, just the values are the integer counts for each domain element: sometimes positive, often zero.


So, imagining we get a bunch of reads and writes and based on what we see we need to commit some of the writes, where do we commit them to?

We need to define a loop variable, a stream of data that can be accessed immediately, but whose definition we can defer until later. This is how timely dataflow lets you create cycles in your dataflow, allowing us to use a collection of data in its own definition. Before we use it, we have to define it:

// graph contains committed edges, will persist across epochs.
let (handle, graph) = scope.loop_variable(u64::max_value(), 1);
let graph = graph.as_collection();

All that we have said here is that we want graph to be a collection that we can use now and define later (said in Hans and Franz voice for extra awesome).

We have also indicated a few things about how the dataset interacts with times: the data circulates for u64::max_value() iterations, each time incrementing its index by 1. This is timely dataflow's way of making sure that when we use the data, we only get to see the past. This ensures that there are no cyclic definitions in our computation; we may use the same name, graph, but we are referring to a previous instance of that data.


The next thing to do is determine, for some existing graph, how would we like to define the next version of graph. We will want to determine which transactions we should commit and which we should abort. Let's determine which to abort because that is a bit easier.

The aborts collection will contain transaction ids that we should abort, because they have a read that is no longer valid.

// transactions with reads not found in the edge store
let aborts = edges.filter(|x| x.1)
                  .map(|x| (x.2, x.0))
                  .antijoin(&graph)
                  .map(|x| x.0)
                  .distinct();

The computation isn't all that weird: we filter down to just reads, and transform the records to be pairs (edge, t_id). We then antijoin this with the graph itself; an antijoin is like the opposite of a semijoin: it passes records whose key is not present in the second argument. In this case, we will pass read requests that do not appear in the graph dataset because those are the ones that should cause an abort. We pull out the transaction id and call distinct because of how we will use aborts.

// restrict writes to those that with no bad reads.
let commits = edges.filter(|x| !x.1)
                   .map(|x| (x.0, x.2))
                   .antijoin(&aborts)
                   .map(|(_,edge)| edge);

The aborts collection is just used to filter the set of writes. Again we use antijoin, to pick out those writes whose transaction id is not present in aborts; those writes can be committed.


Finally, we want to commit these writes. This is just a matter of merging the committed writes with the existing graph data, and binding that to be the definition of the next instance of graph:

// set `graph` to be committed writes.
commits.concat(&graph).inner.connect_loop(handle);

That's it! A transactionally updated graph database.*

*: Well, transactional if there is just one transaction per batch, and you ignore durability for now. We'll get to doing multiple transactions per batch in a bit, and durability is a great concept to think about, philosophically!

Look-ups

What if we want to look at things in our graph database? There are a few choices we have, and the simplest is just to use differential dataflow operators:

// read queries against edge collection
let (query_input, query) = scope.new_input();
let query = query.as_collection();

// query the graph, return a probe.
let query_probe = graph.semijoin(&query)
                       .inspect(|x| println!("query result: {:?}", x))
                       .probe().0;

This fragment puts together a query input which we will feed with node identifiers. The node identifiers get semijoined with the graph dataset to return only those edges with a source in the query collection. We might print out the result because we don't have anything better to do with it, and put a probe on the end.

The probe provides us with information about the completeness of data at this point in the dataflow graph. It is what allows us to know that we have seen all of the results for a specific timestamp. If we submit a batch of queries, this will tell us when we have seen all of the results for these queries. The probe only operates at the timestamp granularity, not at the transaction id granularity. If we really want the latter it might be important not to batch transactions within a timestamp (or to think harder).

As all queries are standing queries, the probe also gives us valuable information even when we don't issue new queries: when the graph data change, it is what lets us know that we have seen all the corresponding changes in query results, even if there are none.

If you would rather have a more direct and invasive view of the data, the arrange operator, which join and semijoin and antijoin all use internally, provides a shared read-only handle to a multiversioned index representing the data. You could totally write a timely dataflow operator that looks at that and gives answers back quickly, as appropriate. You would still need to do this on the timely worker threads though, because Rust's Rc shared references cannot be shared between threads, to avoid data races. You could rewrite some things and make the sharing through threadsafe reference counting, but this starts to be something you get to do, rather than me.

Computation

Let's do something like graph reachability too. It happens to be what they did in the Weaver paper (graph traversals). I happen to have a reachability method written already, so all we need is a source of roots for these reachability computations, and away we go!

// issue reachability queries from various roots
let (roots_input, roots) = scope.new_input();
let roots = roots.as_collection();

// do bfs on the graph from the roots, report counts at each distance.
let result = reach(&graph, &roots);
let roots_probe = result.inspect(|x| println!("reachable: {:?}", x))
                        .probe().0;

All we do here is create a new collection which will host root identifiers, then call into the reach routine which reports those nodes reachable from each root.

We get a new probe here, because we like the idea that we can independently report results from this computation and from the queries up above. If the queries come back quickly we can see that through query_probe, even if roots_probe isn't quite ready to confirm that we have seen all of its results yet. This keeps the bfs computation from blocking our ability to look up data, in principle (in practice, the CPU is always doing something, and if it is doing BFS then it isn't probably isn't doing query look-ups).

That's the whole differential dataflow computation I've written. Let's run it!

Some examples

First let's walk through some simple examples to see what it does, without handing it millions of edges just yet.

Putting some edges in

Let's start by putting in some edges using transactions with no read components, so they should always pass. Incidentally, this is why it is easier to look for aborts by read failures rather than read successes: if there are no successes because there were no reads in the first place, it is hard to tell from something with no successes because of lots of failures.

// add some edges with no read dependencies.
edges.send(((0, false, (0, 1)), 1));
edges.send(((0, false, (1, 2)), 1));
edges.send(((1, false, (1, 3)), 1));
edges.send(((2, false, (2, 4)), 1));

let next = edges.epoch() + 1;
edges.advance_to(next);
query.advance_to(next);
roots.advance_to(next);

// do computation for a bit, until we see outputs from each.
while query_probe.lt(&edges.time()) || roots_probe.lt(&edges.time()) {
    computation.step();
}

So, three things go on here. We add the edges { (0,1), (1,2), (1,3), (2,4) } making a happy little tree. We indicate to each of our inputs, edges, query, and roots, that we are done with this batch of inputs, and we can move on to the next. This allows timely dataflow to start reasoning that we might soon be done with this work, and the probes might report that the timestamps at which data might still arrive have advanced as well. As long as either probe indicates we might receive something strictly less than the new time, we keep running steps of the computation.

I actually put an inspect on our graph collection, so we can see what happens:

(Root, 0): edges: [((0, 1), 1), ((2, 4), 1), ((1, 3), 1), ((1, 2), 1)]

This is the first timestamp, (Root, 0), telling us that we have several changes to our graph, as intended.

Issuing some queries

Let's issue some queries, both using query and using roots:

// issue some queries
query.send((2,1));
roots.send((2,1));

let next = edges.epoch() + 1;
edges.advance_to(next);
query.advance_to(next);
roots.advance_to(next);

// do computation for a bit, until we see outputs from each.
while query_probe.lt(&edges.time()) || roots_probe.lt(&edges.time()) {
    computation.step();
}

This is pretty similar, except rather than send any edge data we have issued a query for the edges from node 2, and we ran a reachability query starting from node 2.

query result: ((2, 4), 1)
reachable: ((2, 2), 1)
reachable: ((2, 4), 1)

There is only one edge out of 2, to node 4, which we can see. Also, our reachability query indicates that we can reach both of these nodes from 2, which non-trivially tells us that we can't reach any other nodes, yet.

Importantly, we could have submitted something to query and advanced it (and edges) to the next round, without advancing roots. Timely dataflow can see that the only inputs that lead to query_probe are edges and query, and as long as those two have been advanced and the intervening data processed, it can report completion. Both query and roots can proceed independently, and will produce outputs as long as edges keeps up with them.

Our first transaction

Now let's do a successful transaction to add some edges to the graph. This will be non-trivial, in that we'll put two conditions on the transaction:

// attempt a successful transaction
edges.send(((3, true, (0, 1)), 1));
edges.send(((3, true, (1, 2)), 1));
edges.send(((3, false, (2, 5)), 1));

let next = edges.epoch() + 1;
edges.advance_to(next);
query.advance_to(next);
roots.advance_to(next);

// do computation for a bit, until we see outputs from each.
while query_probe.lt(&edges.time()) || roots_probe.lt(&edges.time()) {
    computation.step();
}

This transaction says: "as long as (0,1) and (1,2) are still in the graph, I'd like to add (2,5)". It succeeds!

(Root, 2): edges: [((2, 5), 1)]
query result: ((2, 5), 1)
reachable: ((2, 5), 1)

Whoa, we see the new edge which is great, but it looks like the query has updated with a new output, and the distances have also updated. Recall that we only see changes in differential dataflow, so now we see a new record that should be output as part of our look-up query for node 2. We also see that there is one more node that can be reached, that being the new node 5.

A failing transaction

How about a transaction that fails? We just make at least one of the read conditions reference absent data.

// attempt a failing transaction
edges.send(((4, true, (0, 1)), 1));
edges.send(((4, true, (0, 2)), 1));
edges.send(((4, false, (5, 0)), 1));

let next = edges.epoch() + 1;
edges.advance_to(next);
query.advance_to(next);
roots.advance_to(next);

// do computation for a bit, until we see outputs from each.
while query_probe.lt(&edges.time()) || roots_probe.lt(&edges.time()) {
    computation.step();
}

This transaction wants to add (5,0), which would be awesome, but it needs both (0,1) and (0,2) to be present in the graph. The first edge is, but the second edge is not, so aborts gets a new transaction id, 4, and the write is not committed. I secretly put an inspect as part of aborts so that we could see:

tid: 4  read failure: (0, 2)

Amazing.

An interesting successful transaction

Let's make the transaction work out, and see some totally great incremental updating:

// attempt a passing transaction with implications
edges.send(((5, true, (0, 1)), 1));
edges.send(((5, true, (1, 2)), 1));
edges.send(((5, false, (4, 0)), 1));

let next = edges.epoch() + 1;
edges.advance_to(next);
query.advance_to(next);
roots.advance_to(next);

// do computation for a bit, until we see outputs from each.
while query_probe.lt(&edges.time()) || roots_probe.lt(&edges.time()) {
    computation.step();
}

This time we actually manage to add an edge, (0,4) rather than (0,5) I guess, and some neat things happen to our results:

(Root, 4): edges: [((4, 0), 1)]
reachable ((2, 0), 1)
reachable ((2, 1), 1)
reachable ((2, 3), 1)

We see the new edge, and we see a lot of new distances from 2. The loop back means that we can now reach all of the nodes in the graph (the tree is now very happy, if a bit misshapen). We also know that there are no new neighbors of 2, because query_probe tells us we have seen everything there is to see.

In fact, it could have told us this much earlier if we wrote the code slightly differently. Consider

// do computation for a bit, until we see outputs from each.
while query_probe.lt(&edges.time()) || roots_probe.lt(&edges.time()) {
    computation.step();
    let query_done = !query_probe.lt(&edges.time());
    let roots_done = !roots_probe.lt(&edges.time());
    println!("status: {:?}", (query_done, roots_done));
}

which outputs something like (lines deduplicated for clarity)

status: (false, false)
(Root, 4): edges: [((4, 0), 1)]
status: (false, false)
status: (true, false)
distance: (2, 1)
distance: (3, 1)
distance: (4, 1)
status: (true, true)

As soon as we see that first transition in the first field we can announce that the query results are correct as stated.

Comments

Not too much to say. This example is meant to be a pretty simple description of the logic to apply transactional updates by first looking at the existing state. The other parts, lookups and computations, are pretty vanilla differential dataflow. The example transactions we saw were simple, but seemed to work! (I fixed all the bugs I found in them).

Conflicting transactions

One thing we haven't explored is: "what happens if you submit conflicting transactions as part of a batch?" Like, maybe the first transaction requires some edge and deletes another, and the second transaction requires the other edge and deletes the first. They will run exactly as specified: they each consult the current graph instance and make their decisions to commit or not independent of the other. You might say "that isn't transactional!" which is totally correct.

Weaver has a new way of doing timestamping to order possibly conflicting transactions and not worry about non-conflicting transactions. It's interesting and complicated, because you have several machines independently receiving these transactions who have to decide how to consistently order them. They take a conservative approach of saying if two transactions touch the same shard of data (based on how it is partitioned) they might conflict, and shouldn't be processed concurrently. This balances accuracy (don't conflict too often!) against performance (don't think too hard).

However, we know what we want to do---apply transactions in order of transaction id---so let's just do that. We are going to compute the set of transactions that would abort if executed sequentially, in parallel, using differential dataflow.

Starting from an empty aborts collection, we will iteratively update the collection by using it, and the supplied transactions, to determine which values each read will observe (non-aborted writes with lower transaction id combined with the initial state of memory), and consequently which transactions must abort. Filtering the writes by the aborts collection is just an antijoin, and we can get all the remaining writes to corresponding reads by joining on the location (the data item). Then we repeat.

This is just a nested iterative computation; it's totally doable in differential dataflow.

aborts.iterate(|abort| {

    // bring in transactions and initial state.
    let trans = trans.enter(&abort.scope());
    let state = state.enter(&abort.scope())
                     .map(|x| (x, 0));

    // reads -> (location, trans_id)
    let reads = trans.filter(|x| x.1 == true)
                     .map(|x| (x.2, x.0));

    // writes -> (location, trans_id), filtered by aborts
    let writes = trans.filter(|x| x.1 == false)
                      .map(|x| (x.0, x.2))
                      .antijoin(&abort)
                      .map(|x| (x.1, x.0));

    // look up the value for each read using only writes with lesser tid.
    let lookup = writes.concat(&state)
                       .join_map(&reads, |key, &tid1, &tid2| (key.clone(), tid1, tid2))
                       .filter(|&(_, tid1, tid2)| tid1 < tid2)
                       .map(|(k, _, tid2)| (k, tid2))
                       .distinct();

    // abort transactions with bad reads
    reads.map(|x| (x,()))
         .antijoin(&lookup)
         .map(|((_,t),())| t)
         .distinct()
})

Ok, what just happened? We computed, from a collection of transactions and initial memory, the ids of those transactions that should abort in a sequential execution by transaction id. Except we did it using only data-parallel operators. Sequential computation, done in parallel. How delightful.

Of course, in the worst case, we may need to perform a number of iterations equal to the number of transactions, but in the more optimistic case where not all so much conflicts, we just need to identify and propagate aborts (from the optimistic "no aborts" starting state).

It is sane to be skeptical, but

  1. Looking a bit closer most of these crazy operators are thing like map and filter and concat; there are two antijoins, two distincts, and one join_map. Only one of these, the join_map, tracks the full volume of data; the other operators just work with the current transactions.

  2. We are totally going to evaluate this.

Evaluating this

So let's see what we can do here, performance-wise. I don't want to get anyone actually excited about performance, because we are skipping lots of important things like durability, or cleaning up after ourselves. On the other hand, we aren't using a cluster, we are just using my laptop. I'm not sure who that favors.

The first non-bitcoin experiment in the Weaver paper is with the livejournal graph, which is really small (256MB). I don't really understand the workload from the paper, but it is a mix of reads and writes, I can't tell if any transactions actually read the data and then write back, or if they are all blind writes. The reads don't do any writing, though.

We aren't going to do a bake-off, so let's just try some similar sorts of queries.

Queries

There are two query workloads the Weaver paper looks at here, one uses 99.8% reads (per the TAO benchmark, apparently) and another that is just 75% reads.

To warm us up, let's look at doing 1,000 batches of 500 reads, batches of 4 reads, and batches of one read. Each read is a random vertex identifier and the output as above is "all of its edges". If we load up the livejournal graph and give it a spin, the numbers are:

batch size batches time rate
500 1,000 0.296096601s 1,688,638.09 reads/sec
4 1,000 0.037974209s 105,334.65 reads/sec
1 1,000 0.036228031s 27,602.94 reads/sec

So, batching is pretty clearly nice. This is just one core, too. The small batches are pretty clearly gated on timely dataflow infrastructure (this can be improved, feature code-name "fine-grained timestamps"), and the large batches get a million-plus random reads per second, which is pretty great I guess. This really is just taking some integers and for each hitting an array to find an offset, and cloning out the edge destinations at that offset.

With two workers, the numbers change:

batch size batches time rate
500 1,000 0.222924780s 2,242,909.02 reads/sec
4 1,000 0.056459825s 70,846.84 reads/sec
1 1,000 0.067131147s 14,896.22 reads/sec

Multiple workers do roughly what you would expect here: help out a bit when there is a fair amount of work, and utterly get in the way when there isn't much work.

It is plausible that if you get to batch 500, 4, or just 1 read together because you have writes coming in between the batches, you might see performance like above.

Transactions

Let's do some actual transactions now. I have no clue what a good workload is here, and I'm a bit lazy. So I just made something up. We are going to send in 1,000 batches of batch transactions, each of which pulls a read requirement and a write location at random from a fixed set of size batch. This should make a non-trivial constant fraction of the transactions risk conflict, and force our craaaaazy iterative transaction resolver to do some work (maybe).

batch size batches time rate
1,000 1,000 6.981093165s 143,244.04 txs/sec
100 1,000 1.768807578s 56,535.26 txs/sec
10 1,000 0.862126996s 11,599.22 txs/sec
1 1,000 0.402891740s 2,482.06 txs/sec

So what do we see here? If we have large-ish batches of transactions to resolve at once, we can get pretty high throughput, even though they are weird and messed up and might all conflict with each other. However, the act of determining which transactions will cause others to abort is just a data-parallel computation, working on relatively modest data (the transactions themselves and any read locations).

Weaver's concern, that explicitly ordering too many transactions will involve too much coordination, doesn't seem to apply here because a data-parallel system like timely doesn't have to coordinate on a per-record basis. I guess we also are just using one computer, but there totally wouldn't be much coordination with more computers either. There would be data exchange in volume, as part of the abort computation, but not tight coordination.

The times above are also quite decent from a latency point of view. Even with a batch size of 1,000 all of the transactions are resolved in an average of under seven milliseconds. The only reason not to batch transactions, given that we will properly order them for you, would be concern about latency, so this is re-assuring.

These numbers will obviously change as the workload changes, and the may change substantially. You can see that the rates are an order of magnitude less than the conflict-free read workload, and the rates could drop orders of magnitudes more if the workload focuses conflicts in an even smaller set.

I'm going to keep poking on evaluating this, as I suspect it will surface some more bugs / issues. To be clear, this approach isn't supposed to be "the right answer", it is just supposed to be interesting.

Memory use

I'm not actually going to evaluate this, because ActivityMonitor.app is not exactly a scientific instrument, but some words are in order. The crazy dataflow we put together uses operators that store traces of the data they've seen. As a computation proceeds, these collections will grow. In the present Rust implementation, they will grow without bound, which is another reason I didn't measure things.

Another concern is that the number of operators that store state is not "one". The collection of data, the livejournal graph, is stored in multiple places in the computation. This is normally a horrible idea, because databases often try to work off of the principle that there is one source of truth somewhere (usually some snapshots plus a log), so that when things go wrong you have a leg to stand on. We are doing something different, letting the input be the one source of truth, and our computation derive from that.

There is redundancy in what we are storing in memory, it could (and should) be reduced, which (i) will improve the memory utilization, but (ii) will not make the design any more satisfying to responsible database practitioners.

Graph traversals

If you thought that the livejournal graph was too small, Weaver's next experiment is on a Twitter graph with 1.76 million edges. You know a graph is small when just writing its number of edges (1,768,149) is the shortest way to summarize its magnitude.

I don't have such a graph; the small Twitter graph I have has 1.34 million edges (1,342,283) and is about 5MB on disk. If you go to the place they got their data they report the graph as having 1.76 million edges, but if you download twitter_combined.txt.gz you get 2.42 million edges. I have no idea what is going on, so I'm just going to use my 1.34 million edge graph, and pretend that it looks like the 1.76 million edge graph. It probably really doesn't so please do not try and make comparative evaluations here (you can check out Figure 11 in the Weaver paper if you want to ignore my plea).

So, one thousand batches of single reachability queries. There is enough work that batching doesn't seem to help, and even slightly harms (due to more memory pressure, probably).

batch size batches workers time rate
1 1,000 1 45.415582655s 22.02 txs/sec
1 1,000 2 28.939512466s 34.55 txs/sec

These are in the same ballpark as the Weaver numbers, though the data are not the same. I also did a cumulative density plot for you, to get a sense for the distribution of response times.

Reachability cdf

There are just a few (two and three of out 500) measurements above 100ms, and adding that extra worker helps to bring the tail in.

These numbers could be as small as they are in part because the vertex identifiers are small and dense, so the reachability join operator can use an array rather than a HashMap. I turned off that optimization, and got the following numbers, worse but not too different:

batch size batches workers time rate
1 1,000 1 50.378856919s 19.85 txs/sec
1 1,000 2 31.574732737s 31.67 txs/sec

The small impact may be because the computation already has a distinct in it which cannot use integer keys (it tracks the distinct set of reachable pairs (node, root)). Improving index performance would have a decent impact on differential dataflow, and there is an order-preserving hash table waiting in the wings for when I get around to it.

Scaling

The Weaver paper does some scaling experiments! I'm not going to because they are (at least in part) on the larger Twitter graph which I can't fit two copies of into memory on me laptoppe. I could do the look-ups in principle, but would have to turn off the transactional subgraph (because it keeps a copy of the data indexed too, but differently).

You damned dirty liar

Let's say just a bit about some things I didn't cover properly. There are probably things I don't know about too, and you can go all Rumsfeld on me about those.

Durability

Durability is a serious question in these settings. It's great to run laptop measurements on data you have on SSD and just load up each time you run, but what if you are getting this data from the web, or clients, or something like that? It needs to go somewhere persistent, because if "your laptop" crashes or gets taken away with 24 hours notice or some crap like that, we don't really have the data anymore do we?

Nope.

So what is the story here? The "story", which is a great description of it, is that with a functionally defined computation, the input itself is what gives us durability, rather than clever implementation of index internals. If something explodes and we need to start over, we can: from the input!

At this point you are just about to fire me, but: each of the operators in a differential dataflow computation work by processing piles of differences for each logical time. They index the differences for the times, and then logically commit them. At this point we know that they are correct: any re-execution of the same computation on the same input will derive these same differences at each of these times. So, we can write them down somewhere, and if we ever need to recover just load them up, no questions asked (other than: any corruption?).

Even better, in responsible implementations of differential dataflow (not yet!) these piles of differences are merged as time proceeds, keeping the number of active piles manageably small. As they are merged, the piles start to correspond to the accumulated differences over a range of times. The exactly defined accumulated differences over a specific set of times.

The functional nature of the computation makes recovery much simpler in many ways. The state at each worker can simply be re-loaded if available, and re-computed if it is not. The re-computation needn't be very complicated, and only needs to happen for missing data. The piles of index data are laid out in memory in a binary-searchable format, and just by mapping them into memory they can be used immediately while we rebuild indices.

This all seems like it is wild speculation and should probably be detailed out, but ideally we agree (maybe?) that this is now a discussion about performance, rather than about correctness.

Cleaning up

Differential dataflow works off of streams of data, but it still (I know!) doesn't do anything like garbage collection. It really should and I've mostly just been too lazy to write and commit benchmarks that show performance sucking without it. An a computation proceeds, the memory footprint should somehow stay bounded, even as the total number of transactions processed increases. Logically there is nothing that prevents this (Naiad did such a thing), but the current code doesn't, and it probably only makes things slower.

Cleaning up is also the same thing that would give us the small number of committable piles of differences mentioned up above. I should really just do it.

Usability

One of the most appealing aspects of databases is that you can use them however you want. A differential dataflow computation needs to be re-compiled if you want to change it, and that is really, really annoying. You can build something general, like we did above with transactions and look-ups, but if you aren't planning on pushing your computation into dataflow, this may be a less appealing solution.

On the other hand, I think there are use cases where it makes sense to push business logic into dataflow from whatever horrible script it currently lives in. It is work to do, and may not always be worth doing, but generally asking people to describe their computation more brings new things to light. In this case, it brings some performance, incremental maintenance, and distribution to light.

Geo-distribution

The most compelling, I think, reason for the 31 flavors of consistency are situations where your database is not on one rack in your machine room, but is geographically distributed, and you need to maintain the appearance of one database while fielding transactions from different continents. You could keep all the data strongly in sync, but there are simply costs to doing this that may suck. If each read out of a database needs to hit another continent, you may take a long time to build up the web page for your fancy social badger sharing site, and people may go elsewhere.

At the same time, some (small?) fraction of the reason we need small latency is that we are planning on doing lots of database queries. Your badger website fetches a list of all your badger-bros who are online, then needs to grab all of the active badgers photos, timeline contributions, crap like that. You could imagine a world where you issue just one request, which goes into the badger-page assembly dataflow, and comes out the other end with all the data you need.

Can you tell that this is a relatively restricted use case?

A proposal for database research

Let me sum up what was initially going to be a fairly flip blog post with some thoughts about one direction database research could take. It isn't the only, or even obviously the best, line of research. But, it would be interesting and different research, which is my favorite kind.

What are the fundamental barriers to making analytic databases closer to functional computations mapped across sequences of input transactions? If we transform these databases to functional data-parallel computations, do we massively increase the throughput, without introducing the complexity of bespoke communication and coordination infrastructure? Why are analytic databases not the poster child for declarative data-parallel implementation?

I get the impression that some amount of database research is moving in this direction, away from row-level locking, towards per-thread ownership of data, with simpler coordination mechanisms. If this question has already been answered, even partly, that would be great to know! Especially if there are negative results, because those are always a fun challenge. :D

A note on Weaver

You should all read the paper, even if I complain about its related work section. HyperDex and related projects are cool, and aimed at sorting out issues in real systems by doing research. At one point I was thinking about using it to provide differential dataflow's fault tolerance layer, using it as the key-value store backing operators.

Also, Gün isn't the primary author on the Weaver paper, Ayush Dubey is, but I don't know Ayush well enough so I made fun of Gün instead. If you think Weaver is cool, props go to Ayush (and Greg and Robert).