Toward a Big Data Stream Manager

Michael Carey gave a talk at MSR SV many years ago about Hyracks and the stack they built on top of it, and invoked (for the first time that I had seen) the idea of a "big data management system" (BDMS), in contrast with a "database management system" (DBMS).

It wasn't much of a step to think that what we needed next was a "big data stream manager", if only because it would get lots of unintended hits based on the abbreviation alone.

Anyhow, I'm going to talk through what I've been doing of late that looks a bit like a BDSM, and you all are going to keep your very clever double-entendres to your collective selves.

Manager vs Processor

We have "stream processors", things like Flink and Kafka and if it's 2am maybe Spark Streaming. Why do we need a "stream manager"?

As it turns out, there are lots of features you would find in a database that stream processors just don't have. There are so many of them, I'm just going to tell you about the ones important to me. Mostly, it is going to be about performance, and ways that the stream processors fall down where databases would not.

Indexing

Databases build and maintain indices of data. This is super helpful if you need random access to the data, perhaps because you want to look up some things, or perhaps because you are doing joins and it would be a pain to rebuild the index each time you want to do a join.

Stream processors also build and maintain indices of data, but they tend to do it independently each time you perform a join. Let's consider an example.

Imagine you have a collection of graph nodes and graph edges. You would like to compute (and perhaps maintain as changes stream along) those nodes that are "friends of friends": they result from joining nodes with edges, picking out reached nodes, and then joining with edges again:

nodes.join(edges)
     .map(|(src, dst)| dst)
     .join(edges)
     .map(|(src, dst)| dst)

Here we've intentionally used join twice.

Correct me if I am wrong, but as far as I know essentially every existing stream processor would form and maintain two independent indices of edges here. The only stream processor I know of that can do this with just one index is differential dataflow, which we will dig into in more detail later.

The problem, such as it is, for existing stream processors is that there is little enough management of the streams to even know that the two edges streams are the same, much less that they would be indexed by the same key. Stream processors generally work with the belief that each operator gets nothing from the outside world other than a stream of tuples, and needs to be totally self-sufficient.

This can be great in some regards: the two join implementations can be distributed on different machines, manage their resources independently, and even be backed by different implementations, if you like. At the same time, it can also mean double the computation, communication, and memory requirements. Here "double" should be generalized to however many times the data are used.

Sharing

Databases manage to share their data between any number of users. If you want access to the data to do some exciting data analysis you don't need to first clone the database, you just start issuing queries and the database shares its indexed state (and compute resources, etc) among all of the users.

Let's imagine you want to maintain a "friends of friends" query, like above, and I want to maintain a reachability query, which might look like

nodes.iterate(|inner| {
    inner.join(edges)
         .map(|(src,dst)| dst)
         .concat(nodes)
         .distinct()
})

This query doesn't have much in common with the friends-of-friends query above, except that it also does a join with edges. It would be a total shame if you and I each had to maintain our own independent indices of edges, because this could burn a substantial amount of resources (again, computation, communication, and memory).

Again, as best as I know, stream processors don't do this. Largely because most stream processors don't understand "multiple users" as a thing; they just define and run streaming computations. Even differential dataflow doesn't know how to share indices because it doesn't know about multiple users. However, it does have some functionality that will let us start down this path.

Isolation may be great for stream processors that worry about user code crashing. You wouldn't want some defective query to take down the whole system, and keeping each query independent is a great way to do that. This is part of the reason why SQL is such a restricted language: they want to minimize your ability to issue queries that cause the DB to melt down.

A stream manager has the opportunity to impose a common logical structure on the data, so that multiple users can rely on and reuse the same underlying representation, which only needs to be formed and maintained once.

Consistency

If you are analyzing a stream of data and I am analyzing a stream of data, we might like to be able to integrate our results and produce an output (or sequence of outputs) that correctly corresponds to some logic applied to our common inputs.

For example, you and I might like to mash up our results so that people can determine the relation between their number of friends-of-friends and their number of totally reachable people: what fraction of the people you can transitively reach in the graph can be reached in just two hops? Science!

What you need is a notion of "logical time", also called "event time" in similar contexts. If your logical time is only implicit, perhaps in where you draw your batch boundaries, there is no reason that the boundaries from independent computations will line up. Many stream processors handle logical time well. Not all of them do, though.

A stream manager has the opportunity to impose a common logical time on the streams it manages, and if all users are so constrained, their results become much more easily integrated.

Differential dataflow

You could build lots of big data stream managers, but I'm going to talk about how you might build one on differential dataflow.

Differential dataflow is a computational model in which the base data types are evolving collections. Conceptually, each collection is indexed by some "logical time" (think "integer" for now), and is allowed to vary its contents as the index varies.

For example, we might have a collection edges that evolves thusly:

edges[0]      = { (a,b), (b,c) }
edges[1]      = { (a,b), (b,c), (c,a) }
edges[2..5]   = { (b,c), (c,a) }
edges[6..100] = { (p,q) }
edges[101..]  = { (q,p) }

The computational model is that you can invoke various operators on these collections, and the result is a collection whose value also changes as times advance, and which is at all times equal to your computation applied to the corresponding versions of the input collections.

Let's take friends-of-friends as an example. Imagine we also have a static collection

nodes[0..]    = { a, b }

If we build the query from before,

nodes.join(edges)
     .map(|(src, dst)| dst)
     .join(edges)
     .map(|(src, dst)| dst)

we would get an output collection that varies like so:

output[0]     = { c }
output[1]     = { a, c }
output[2..5]  = { a }
output[6..]   = { }

This example could be more interesting if nodes changed a bit, and is most interesting in practice where the input collections are not fully specified ahead of time, but we get to interact with and drive them around.

But, the intended behavior is as if we literally re-executed the query at every logical time. Of course, we don't do that.

Implementation

Under the covers, differential dataflow represents a changing collection as a stream of updates to that collection. All updates have the form

(data, time, diff)

where data is some payload data (like (b, c) above), time is the logical time at which the change happens (like the integers above), and diff is the change we want to incorporate, which is often something like plus or minus one (indicating addition and subtraction, respectively).

The updates are designed so that for each time, we can determine the presence or absence (or multiplicity, generally) of a datum by accumulating the diffs at times less-or-equal to time:

collection[time][data] = sum { diff | (data, time', diff) && time' <= time }

Sequences of collections and sequences of updates are equivalent, under a few benign mathematical assumptions about time. We will choose to work with updates. All differential dataflow operators take streams of updates as inputs, and produce corresponding streams of updates as outputs.
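
For example, the evolving edges collection from above corresponds to the following stream of updates, written here with one line per logical time at which something changes:

((a,b),   0, +1)    ((b,c),   0, +1)
((c,a),   1, +1)
((a,b),   2, -1)
((b,c),   6, -1)    ((c,a),   6, -1)    ((p,q),   6, +1)
((p,q), 101, -1)    ((q,p), 101, +1)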

Operators

When we look at what differential dataflow operators do with their streams of updates, one common pattern emerges: they like to re-index the initially time-ordered stream of updates by data, giving random access to the history of each datum.

When we implement the join operator, we have two input streams whose data types have the form (key, val1) and (key, val2). We need to respond to the arrival of new ((key, val1), time1, diff1) tuples on the first input (and, mutatis mutandis, on the other input).

The correct thing to produce as output is, for all pairs ((key, val2), time2, diff2) in the other input stream, the tuples

((key, val1, val2), max(time1, time2), diff1 * diff2)

You don't need to understand why this is true right now (it follows from bilinearity); we just need to understand what the operator would need at hand to be able to produce these outputs efficiently.

As it turns out, what the operator needs is an indexed form of the history of each stream of updates, where the index is on key. If you call join on a stream of updates, it will build this index for you. But, you can also build the index separately, and then re-use it.
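
To make the rule above concrete, here is a minimal sketch (not differential dataflow's actual implementation): given one new update on one input, and the already-indexed history of updates with the same key on the other input, produce the corresponding output updates.

use std::cmp::max;

// Sketch only: respond to a new ((key, val1), time1, diff1) update, using the
// indexed history of (val2, time2, diff2) updates for the same key on the
// other input.
fn respond_to_update<K: Clone, V1: Clone, V2: Clone>(
    key: K,
    val1: V1,
    time1: u64,
    diff1: isize,
    other_history: &[(V2, u64, isize)],
) -> Vec<((K, V1, V2), u64, isize)> {
    other_history
        .iter()
        .map(|&(ref val2, time2, diff2)| {
            ((key.clone(), val1.clone(), val2.clone()), max(time1, time2), diff1 * diff2)
        })
        .collect()
}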

Indexing

Differential dataflow has an indexing operator, called arrange rather than index for reasons that currently escape me. It produces an Arrangement, which is roughly two things:

  1. A stream of shared immutable "batches" of updates. Each batch is a collection of updates that were all determined to be "finalized" at the same physical (not logical) time, and so get indexed as a group; forming a batch is roughly equivalent to sorting its updates by data then time. The batches are "shared" in the sense that there is only one allocation, and what we actually hand out are reference-counting smart pointers to these allocations.

  2. A shared reference to the append-only list of all batches emitted so far, as far as you know. The "as far as you know" gives us permission to compact the batches as time advances, if we are sure that you wouldn't be able to tell the difference. This ends up being very similar to a Log Structured Merge Tree, with a bit of specialization baked in.

The stream of shared batches allows the system to explicitly notify operators about what has changed, which fits better with the timely dataflow model than having operators continually rescan for changes and spontaneously decide to emit things. The shared reference to the list of batches provides interactive access as if the operator had a private local copy of the index, plus some amount of overhead due to the sharing.

The arrange operator produces and maintains the stream and the list, and all other operators just peek at the immutable batches, sharing the effort put into producing and maintaining them.
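
As a sketch of how this looks in code, the friends-of-friends query from earlier could arrange edges once and use the resulting arrangement in both joins. This uses the arrange_by_key_u and join_core operators that appear in the code later in this post; the exact incantation here is illustrative rather than definitive.

// Sketch: index `edges` once and share that index across both joins.
let edges = edges.arrange_by_key_u();

let friends =
    nodes.map(|node| (node, ()))
         .arrange_by_key_u()
         .join_core(&edges, |_src, &(), &dst| Some((dst, ())));

let friends_of_friends =
    friends.arrange_by_key_u()
           .join_core(&edges, |_mid, &(), &dst| Some(dst));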

Sharding

For all of this to work, differential dataflow needs to adopt a fairly different execution model from most stream processors. For two join operators to be able to re-use the same in-memory state, they need to be sharded equivalently.

That is, if I have distributed edges across ten workers, then the join operators need to be distributed across those same ten workers, using the same partitioning function that I used for arrange. This means that each worker is responsible for shards of multiple independent operators, which has always been the case in timely dataflow and has worked out pretty well so far. It is relatively uncommon for other stream processors, but it is how sharded databases work (you do not clone and ship indices around, you move the computation to the index).
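
Concretely, "the same partitioning function" just means a deterministic function from key to worker, shared by the arrangement and by anything that wants to re-use it. A schematic sketch (timely's actual exchange logic differs in its details):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Schematic: every operator that shares the index of `edges`, keyed by `src`,
// must route records with a given key to the same worker.
fn worker_for<K: Hash>(key: &K, workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % workers
}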

The down-side to this distribution of work is that we substantially complicate isolation. If two joins want to use the same in-memory state, they probably want to be on the same machine. If they are on the same machine and one of them detonates and consumes all available memory, well we have a problem for both joins. One could engineer around this for sure, but for the moment we do nothing of the sort. Differential dataflow is (currently) meant to be more like a programming language than a database platform, and if half of your program explodes then your whole program is at fault.

A Big Data Stream Manager

So with this background in hand, let's talk about an example BDSM.

This code is all available in the differential dataflow repository, but it is a work in progress. In particular, I'm making up a lot of stuff as I go and you may be greatly depressed to learn how ad-hoc it is.

A Server

I've written a little timely dataflow program in bin/server.rs. This program's role in life is roughly to take pairs

(library_path, symbol_name) : (String, String)

and load the library and look up the symbol, .. and then call it!

In a bit more detail, there is a type

pub type Environment<'a, 'b> = (
    &'a mut Child<'b, Root<Allocator>, usize>,
    &'a mut TraceHandler,
    &'a mut ProbeHandle<RootTime>,
    &'a [String]
);

This type contains a bunch of helpful information, and the method the symbol resolves to gets called with an instance of it. I suspect few of these fields make sense yet, but they are:

  1. A handle to a newly minted dataflow context. The callee can immediately use this to start creating their own new timely (and differential) dataflow computations.

  2. A handle to what is essentially a HashMap<String, Box<Any>>, containing common resources whose types need to be tested at runtime. This is where we are going to store handles to our arranged data.

  3. A handle to a probe, which allows the callee to tie part of its dataflow into the progress tracking that the managing worker will use. This allows constructed dataflow to "hold up" progress in the larger execution, if appropriate.

  4. Any arguments we want to pass to the method.

These methods have great latitude in terms of what they do. They could just screw around with the hashmap of resources, or they could build dataflow graphs that compute cool things and stick handles to the results back in the hashmap.
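
As a hypothetical sketch of the loading step itself (the real bin/server.rs may differ; this assumes the libloading crate and the Environment type from above):

extern crate libloading;
extern crate dd_server;

// Hypothetical: load a shared library, resolve `symbol`, and call it with the
// worker's Environment. The returned Library must be kept alive for as long as
// any dataflow the callee built is still running.
fn load_and_build<'a, 'b>(
    path: &str,
    symbol: &str,
    env: dd_server::Environment<'a, 'b>,
) -> Result<libloading::Library, String> {
    unsafe {
        let lib = libloading::Library::new(path).map_err(|e| format!("{:?}", e))?;
        {
            let build: libloading::Symbol<fn(dd_server::Environment) -> Result<(), String>> =
                lib.get(symbol.as_bytes()).map_err(|e| format!("{:?}", e))?;
            build(env)?;
        }
        Ok(lib)
    }
}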

Let's check out a few examples!

A Shared Library: Degrees

All of our computations from this point forward are written as Rust shared libraries. They can be loaded up by any number of workers, any number of times, but can be compiled independently without restarting the server.

We are going to start with a degree distribution computation, where we want to count the number of nodes that have each out-degree. The output should be a collection whose data have the form (degree, count). This example does pre-suppose we have an indexed graph lying around, and we will get to how to do that, but this is a less intimidating amount of code to start with.

The degree distribution computation is just a shared library. In fact it is its own crate.

It has a fairly simple Cargo.toml, describing the resources it needs:

[package]
name = "degr_dist"
version = "0.1.0"

[dependencies]
differential-dataflow = { path = "../../../" }
dd_server = { path = "../../" }

[lib]
crate-type = ["dylib"]

And then it has a lib.rs presenting a method that can be called from the outside world (note #[no_mangle] to prevent name mangling).

extern crate differential_dataflow;
extern crate dd_server;

use differential_dataflow::operators::CountTotal;
use dd_server::{Environment, TraceHandle};

#[no_mangle]
pub fn build((dataflow, handles, probe, args): Environment) -> Result<(), String> {

    if args.len() != 1 { return Err(format!("expected one argument, instead: {:?}", args)); }

    handles
        .get_mut::<TraceHandle>(&args[0])?
        .import(dataflow)
        .as_collection(|k,v| (k.item.clone(), v.clone()))
        .map(|(src, _dst)| src as usize)
        .count_total_u()
        .map(|(_src, cnt)| cnt as usize)
        .count_total_u()
        .inspect(|x| println!("count: {:?}", x))
        .probe_with(probe);

    Ok(())
}

There isn't a lot here. We bring in the CountTotal operator from differential dataflow, and the Environment type from up above, and whatever TraceHandle is (it is the type we need to name to get arranged graph data).

We ask for the trace handle named by args[0], import it, and start working with it as differential dataflow. We attach probe to the end of our dataflow to make sure we aren't allowed to fall behind, but otherwise just spam the console with lots of "count: " lines.

It's a pretty simple project, and we can just build it with cargo build --release; a short 58.19 seconds later, and we are ready to go!

That's a lot longer than I would want to wait, too. Let's see how Rust compile times develop before uninstalling python.
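
For reference, once the server is running (examples below), loading this library presumably looks something like the following command; the path is my guess based on the layout of the other dataflow crates:

load ./dataflows/degr_dist/target/release/libdegr_dist.dylib build <graph_name>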

A Shared Library: Random Graphs

The example up above assumed that we had already installed a graph (with name args[0]). Let's see how we can do that now.

There is example code in dataflows/random_graph, which clocks in at a terrifying 146 lines of code. Most of this is related to graph generation rather than arrangement, so I'm going to start with a simplified version that assumes you've got a handle to a source of edges. For example, maybe you are trying out the new Kafka adapter, or anything else implementing timely's EventIterator trait.

The Cargo.toml file is pretty similar to that for degree distribution up above, with some additional dependencies for access to raw timely methods.

The lib.rs is a bit more complicated, to the point that I've snipped out the code that actually determines which edge changes to produce (you can check out the repo for the full details). Instead, let's just look at the skeleton of the method:

extern crate timely;
extern crate differential_dataflow;
extern crate dd_server;

use std::rc::Rc;
use std::cell::RefCell;
use timely::dataflow::operators::Probe;
use timely::dataflow::operators::generic::operator::source;
use differential_dataflow::AsCollection;
use differential_dataflow::operators::arrange::ArrangeByKey;
use dd_server::Environment;

#[no_mangle]
pub fn build((dataflow, handles, probe, args): Environment) -> Result<(), String> {

    if args.len() != 4 { return Err(format!("expected four arguments, instead: {:?}", args)); }

    let name = &args[0];
    let nodes: usize = args[1].parse().map_err(|_| format!("parse error, nodes: {:?}", args[1]))?;
    let edges: usize = args[2].parse().map_err(|_| format!("parse error, edges: {:?}", args[2]))?;
    let rate: usize = args[3].parse().map_err(|_| format!("parse error, rate: {:?}", args[3]))?;

    let requests_per_sec = rate;
    let ns_per_request = 1000000000 / requests_per_sec;

    // shared capability keeps graph generation going.
    let capability = Rc::new(RefCell::new(None));

    // create a trace from a source of random graph edges.
    let trace =
        source(dataflow, "RandomGraph", |cap| {

            // produce stream of edge changes here

        })
        .probe_with(probe)
        .as_collection()
        .arrange_by_key_u()
        .trace;

    handles.set(name.to_owned(), trace);
    handles.set(format!("{}-capability", name), capability);

    Ok(())
}

Other than the elided body of source, the method doesn't do too much of anything outrageous. We extract a number of nodes and edges from the arguments, as well as an intended rate of edge changes (in changes per second).

We start up a source of data, though this could just as easily be replay_into using any of timely's replay mechanisms (e.g. from Kafka, from a file, from a TCP socket). We probe the result, convert it to a differential dataflow collection (essentially asserting that the record types are correct), and then arrange it.

The resulting trace is stashed in handles using the supplied name. We also stash something at the name plus "-capability", which is essentially a cancellation token; if we drop it, the random graph stops producing changes and allows all downstream computation to complete.

A Shared Library: Friends of Friends

I actually did friends of friends of friends, because I was feeling ambitious (and copy/paste is pretty easy):

extern crate differential_dataflow;
extern crate dd_server;

use differential_dataflow::input::Input;
use differential_dataflow::operators::JoinCore;
use differential_dataflow::operators::Consolidate;
use differential_dataflow::operators::arrange::ArrangeByKey;

use dd_server::{Environment, TraceHandle};

#[no_mangle]
pub fn build((dataflow, handles, probe, args): Environment) -> Result<(), String> {

    if args.len() != 2 { return Err(format!("expected two arguments; instead: {:?}", args)); }

    let edges = handles.get_mut::<TraceHandle>(&args[0])?.import(dataflow);

    let source = args[1].parse::<usize>().map_err(|_| format!("parse error, source: {:?}", args[1]))?;
    let (_input, query) = dataflow.new_collection_from(Some(source));

    let timer = ::std::time::Instant::now();

    query
        .map(|x| (x, x))
        .arrange_by_key_u().join_core(&edges, |_n, &q, &d| Some((d, q)))
        .arrange_by_key_u().join_core(&edges, |_n, &q, &d| Some((d, q)))
        .arrange_by_key_u().join_core(&edges, |_n, &q, &d| Some((d, q)))
        .map(|x| x.1)
        .consolidate()
        .inspect(|x| println!("{:?}:\t{:?}", timer.elapsed(), x))
        .probe_with(probe);

    Ok(())
}

This builds a new dataflow using edges extracted from args[0], but also takes a second argument args[1] that it uses as the source node of the friends-of-friends query. It reports the number of length-three paths from args[1] to each other node (not reporting nodes that cannot be reached).

While we interpreted args[1] as the name of a node in the graph, we could just as easily have interpreted it as the name of a collection to find in handles. This would allow us to install one friends of friends of friends computation, and just tweak the contents of that collection rather than re-loading and re-building dataflows for each query.
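
A hypothetical sketch of that variant, assuming an input handle can be stashed in handles just as the trace and capability were:

// Hypothetical: keep the query input around so that later commands can change
// the source node(s) without building a new dataflow.
let (input, query) = dataflow.new_collection::<usize, isize>();
handles.set(format!("{}-query", args[0]), input);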

In the interest of science, I've also put a timer in there so that we can see how long it takes to come back with an answer!

A Shared Library: Reachability

This is pretty similar to the above, but with slightly different code:

extern crate differential_dataflow;
extern crate dd_server;

use differential_dataflow::input::Input;
use differential_dataflow::operators::{Iterate, JoinCore, Distinct};
use differential_dataflow::operators::arrange::ArrangeBySelf;

use dd_server::{Environment, TraceHandle};

#[no_mangle]
pub fn build((dataflow, handles, probe, args): Environment) -> Result<(), String> {

    if args.len() != 2 { return Err(format!("expected two arguments; instead: {:?}", args)); }

    let edges = handles.get_mut::<TraceHandle>(&args[0])?.import(dataflow);

    let source = args[1].parse::<usize>().map_err(|_| format!("parse error, source: {:?}", args[1]))?;
    let (_input, roots) = dataflow.new_collection_from(Some(source));

    // repeatedly expand the set of nodes reachable from the roots
    roots.iterate(|dists| {
        let edges = edges.enter(&dists.scope());
        let roots = roots.enter(&dists.scope());
        dists.arrange_by_self_u()
             .join_core(&edges, |_src, _, &dst| Some(dst))
             .concat(&roots)
             .distinct_u()
    })
    .probe_with(probe);

    Ok(())
}

Lots of Shared Libraries

You probably get the point by now. You can write all sorts of analyses without too much boilerplate. Having written all of those analyses, or whatever you wrote, you can then fire up the server and run them. Let's watch what happens!

Example executions

Let's try a few simple computations, like starting up some random graph generation, and then attaching a friends of friends of friends computation.

The first thing we do is start up the server.

Echidnatron% cargo run --release --bin server
    Finished release [optimized] target(s) in 0.0 secs
     Running `target/release/server`

This just sits there and does nothing (actually, it spins really fast doing lots and lots of nothing).

The next thing to do (in another shell) is to head into each of the dataflows/ directories and do cargo build --release. This builds the libraries in release mode and leaves the artifacts where we can find them. Although we could have done this first, it is worth making the point that you don't have to have these built or even written before the server starts, and it won't need to be restarted just to load a new dataflow.

Let's head back to that shell where we started up the server. Our next step is to load up the random graph building library we wrote (and saw a fragment of up above):

load ./dataflows/random_graph/target/release/librandom_graph.dylib build <graph_name> 1000 2000 1000

This doesn't seem to do a lot other than print out the diagnostic

worker 0: received command: ["load", "./dataflows/random_graph/target/release/librandom_graph.dylib", "build", "<graph_name>", "1000", "2000", "1000"]

When we type in the console we aren't actually interacting with the workers; we are interacting with the main thread, which ensures our commands get broadcast to all workers and that a common total order on them is established.

In this case, we've asked the workers to load up the library in question, which happens to be what we built with our random_graph shared library crate, and gives it some arguments:

"<graph_name>", "1000", "2000", "1000"

This asks for a random graph on 1,000 nodes, with 2,000 edges, and 1,000 edge changes per second. Each millisecond the operator will produce a pair of updates, one plus and one minus, changing an existing edge to a random new edge. The resulting arrangement is stashed under the name <graph_name>, and a cancellation token is stashed under the name <graph_name>-capability.
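
Schematically, each such change is a pair of updates in the (data, time, diff) form from earlier, at a logical time t measured in nanoseconds:

((old_src, old_dst), t, -1)    // retract an existing edge
((new_src, new_dst), t, +1)    // insert a freshly sampled edge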

Nothing seems to happen yet, but behind the scenes we are producing some small volume of edge changes, one per millisecond. So we type in our next command:

load ./dataflows/neighborhood/target/release/libneighborhood.dylib build <graph_name> 0

This asks us to load up our friends of friends of friends computation and fire it off with source node 0.

This happens, and we quickly get a spray of output:

worker 0: received command: ["load", "./dataflows/neighborhood/target/release/libneighborhood.dylib", "build", "<graph_name>", "0"]
Duration { secs: 0, nanos: 22666681 }:  (0, (Root, 130000000), 7)
Duration { secs: 0, nanos: 22719408 }:  (0, (Root, 529000000), 1)
Duration { secs: 0, nanos: 22723223 }:  (0, (Root, 538000000), 1)
Duration { secs: 0, nanos: 22725800 }:  (0, (Root, 598000000), 2)
Duration { secs: 0, nanos: 22728269 }:  (0, (Root, 755000000), 1)
Duration { secs: 0, nanos: 22730922 }:  (0, (Root, 858000000), 2)
Duration { secs: 0, nanos: 22733362 }:  (0, (Root, 931000000), 2)
Duration { secs: 0, nanos: 22735807 }:  (0, (Root, 1059000000), 1)
Duration { secs: 0, nanos: 22738682 }:  (0, (Root, 1077000000), 9)
Duration { secs: 0, nanos: 22741788 }:  (0, (Root, 1269000000), 2)
Duration { secs: 0, nanos: 22744502 }:  (0, (Root, 1282000000), 2)
Duration { secs: 0, nanos: 55988144 }:  (0, (Root, 1319000000), 14)
Duration { secs: 0, nanos: 129050359 }: (0, (Root, 1393000000), 2)
Duration { secs: 0, nanos: 185061427 }: (0, (Root, 1449000000), 2)
Duration { secs: 0, nanos: 242048820 }: (0, (Root, 1506000000), 2)

What we are seeing here, after the acknowledgement of the command, is the neighborhood computation catching up with the pre-history of the random graph.

When we see a record like

(0, (Root, 130000000), 7)

what we are seeing is (data, time, diff) describing a change to the number of length three paths from source node 0. In this case, the change is 7 and happens at 130000000.

It seems like the first change happened at logical time 130000000, which is 130 milliseconds from the start of the random graph generation. If I've done the implementations correctly, this means that 0 had no length three paths from itself initially, but the 130th change managed to pick up seven of them (for example, it could have added the first out-edge from node 0).

Watching the stream above, we see several other changes happen almost concurrently, all at around 22 milliseconds from the start of the neighborhood call, essentially catching up with the history of the graph. Although there are 1,000 changes a second, not all of them are going to intersect the three-hop neighborhood of node 0.

Once past the 22 millisecond mark, the changes are now being tracked in real-time. The difference between logical time as reported in the stream, and physical time as reported by the timer, stays relatively fixed at around 1.2 seconds. This is the amount of time it took me to copy / paste the command to start up the neighborhood computation, after entering the random graph generation command.

We will have to wait for a better evaluation to say more about the actual latency.

The computation continues, until there is a slight disruption as I copy / paste something into the console

Duration { secs: 4, nanos: 564239485 }: (0, (Root, 5827000000), 4)
drop <graph_name>-capabilityDuration { secs: 4, nanos: 570063963 }: (0, (Root, 5834000000), 4)
Duration { secs: 4, nanos: 643910427 }: (0, (Root, 5885000000), 210)
Duration { secs: 4, nanos: 644019361 }: (0, (Root, 5890000000), 4)

which once I manage to hit return turns into

Duration { secs: 5, nanos: 61055551 }:  (0, (Root, 6325000000), 4)
Duration { secs: 5, nanos: 65042302 }:  (0, (Root, 6329000000), 2)
Duration { secs: 5, nanos: 77042945 }:  (0, (Root, 6341000000), 2)

worker 0: received command: ["drop", "<graph_name>-capability"]

This shuts down the random graph generator, which in turn stops feeding the neighborhood computation which should also shut down. Of course, the server doesn't shut down because we might want to load up some more libraries and play with it.

^C
Echidnatron%

It seems we didn't.


Let's do another example with a slightly beefier graph. This time, 10 million nodes and 100 million edges, with a relatively small rate of churn (1,000 changes per second).

Echidnatron% cargo run --release --bin server
    Finished release [optimized] target(s) in 0.0 secs
     Running `target/release/server`
load ./dataflows/random_graph/target/release/librandom_graph.dylib build <graph_name> 10000000 100000000 1000
worker 0: received command: ["load", "./dataflows/random_graph/target/release/librandom_graph.dylib", "build", "<graph_name>", "10000000", "100000000", "1000"]
load ./dataflows/neighborhood/target/release/libneighborhood.dylib build <graph_name> 0
worker 0: received command: ["load", "./dataflows/neighborhood/target/release/libneighborhood.dylib", "build", "<graph_name>", "0"]
Duration { secs:  0, nanos: 16731517 }:     (0, (Root, 0), 930)
Duration { secs:  0, nanos: 16784736 }:     (0, (Root, 4776000000), 1)
Duration { secs:  0, nanos: 16788738 }:     (0, (Root, 33324000000), 1)
Duration { secs:  0, nanos: 16791910 }:     (0, (Root, 37600000000), 1)
Duration { secs:  2, nanos: 940322290 }:    (0, (Root, 44587000000), 1)
Duration { secs:  5, nanos: 826324593 }:    (0, (Root, 47473000000), 1)
Duration { secs: 30, nanos: 881317035 }:    (0, (Root, 72528000000), 1)
drop <graph_name>-capability
worker 0: received command: ["drop", "<graph_name>-capability"]
^C
Echidnatron%

We see as before that it is pretty fast to get out the pre-history of the neighborhood computation, because all of that data is immediately available in indexed form. It takes about 16 milliseconds from dataflow definition to seeing logically timestamped results start to flow out.

Things slow down then, because there just isn't a lot of change. In a random graph with expected degree ten (what we have), the three-hop neighborhood involves something on the order of 1,000 edges (10^3). If each second we only change 1,000 out of 100 million, we just don't often intersect the three hop neighborhood. So the count doesn't change that often.

OK, what is less obvious here is that a graph of this size, 100 million edges, takes a few gigabytes to represent. It looks like a stable 3.25GB before starting the neighborhood computation. It climbs up to about 3.39GB once I start the computation, despite using edges three times.

Conclusions

There is obviously a lot to do, including explaining things better, measuring things better, and very likely implementing things better to account for what I learn from measuring the things. Don't take this as "concluding comments" so much as "what to do next".

Functional Computation

Differential dataflow is essentially functional reactive programming, and the "functional" part of that makes it somewhat confusing to know how you stop doing computations. The best I can figure, when you import a trace you should probably be able to pinch the stream of updates and act like they've stopped; your functional computation should then wind down. I have no idea if this is the right way to do things.

Console Output

I have a fairly fancy CS education, but I'm not exactly sure how to let someone type input and see output without having the two crap all over each other. Perhaps one should not println! in these programs, and instead persist output streams (e.g. timely's capture operators) and do analysis post-hoc. Perhaps this is where we ship data to Kafka and someone's dashboard app starts lighting up?

Kafka

Kafka seems really slow to me. I'm sure I'm driving it badly, but I'm reading data out in 8KB chunks at about 6MB/s. It probably gets better, perhaps once I write a check to Confluent.

Performance

There are a bunch of fun performance gotchas hidden in here. The random graph producer is open-loop, and just produces lots and lots of data, even if you fall behind. The batch merging that differential does as part of its internal LSM isn't amortized, so you can just get stuck behind a 6s merge operation (this can and will be fixed, now that it is so easy to see).