2017-05-06.md


Specializing differential dataflow

Differential dataflow can do lots of weird and wonderful things, and the code that makes this happen can be at times quite complex. In fact, it can be much more complex than is required for simpler operations in relatively common cases. Today we'll look at specializing the implementation of the count operator to collections that change in a fairly common (and simple) way: totally ordered time.

Counting things

The count operator is the one in differential dataflow that takes an input collection of keys and produces the output collection of (key, sum) pairs, representing the number of times key appears in the input. As the input collection changes, we want the collection of (key, sum) pairs to change too: as counts go up and down the sum should change, and if a key vanishes entirely then its summary should also vanish (just like all the other absent keys we don't summarize, because we've never seen them).

The count operator is a bit more general than this: the things it sums up can be more general than integers, they can be any Abelian group. Mostly, this is tuples of things you want to add up, so that when someone asks you to do

select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= date '1998-12-01' - interval ':1' day (3)
group by
    l_returnflag,
    l_linestatus

you can get the answer with a count, rather than running multiple counts and then joining the results together.
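To make the "tuples of things you want to add up" idea concrete, here is a minimal sketch in plain Rust, with no differential dataflow involved. The `Sums` type, its fields, and the `accumulate` function are hypothetical names chosen for illustration; the real operator works with whatever type plays the role of `R` in the code further down.

```rust
use std::collections::BTreeMap;
use std::ops::Add;

/// Hypothetical accumulator: several sums carried together as one group element.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
struct Sums {
    quantity: i64,
    price: i64,
    rows: i64,
}

impl Add for Sums {
    type Output = Sums;
    // Componentwise addition; negating every field gives the inverse element.
    fn add(self, other: Sums) -> Sums {
        Sums {
            quantity: self.quantity + other.quantity,
            price: self.price + other.price,
            rows: self.rows + other.rows,
        }
    }
}

/// Adds up all updates per key and drops keys whose total is zero.
fn accumulate<K: Ord>(updates: Vec<(K, Sums)>) -> BTreeMap<K, Sums> {
    let mut totals = BTreeMap::new();
    for (key, diff) in updates {
        let entry = totals.entry(key).or_insert_with(Sums::default);
        *entry = *entry + diff;
    }
    totals.retain(|_, sums| *sums != Sums::default());
    totals
}
```

An average such as `avg(l_quantity)` is not itself a group element, but it can be recovered afterwards as `quantity / rows` from the accumulated tuple.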

Another way in which count is general, one you might not have expected, is that it works correctly even when the underlying collections change through partially ordered versions. We are used to thinking of things changing sequentially, like going from version one, to version two, to version three and beyond. Like all differential dataflow operators, count works correctly even when the underlying version history (the timestamps) is partially ordered, and there may not be a single, well-defined "previous" collection to change from.
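If partially ordered versions sound abstract, here is a small sketch of one such order: the product order on pairs, where one time precedes another only if it does so in both coordinates. The `Pair` type is a hypothetical stand-in written for this example; it is not the timestamp type differential dataflow uses.

```rust
use std::cmp::Ordering;

/// Hypothetical two-coordinate timestamp, e.g. (input version, loop iteration).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Pair(u64, u64);

impl PartialOrd for Pair {
    // Product order: less-or-equal only if both coordinates are less-or-equal.
    fn partial_cmp(&self, other: &Pair) -> Option<Ordering> {
        match (self.0.cmp(&other.0), self.1.cmp(&other.1)) {
            // One coordinate ties, so the other coordinate decides.
            (Ordering::Equal, ord) | (ord, Ordering::Equal) => Some(ord),
            // Both coordinates agree on the direction.
            (a, b) if a == b => Some(a),
            // One coordinate is ahead and the other behind: incomparable.
            _ => None,
        }
    }
}
```

With this order, `Pair(0, 1)` and `Pair(1, 0)` are incomparable: neither is the "previous" version of the other, which is exactly the situation a totally ordered specialization gets to ignore.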

All of this makes count a bit more complicated than you would expect.

However, a great many workloads, and especially the sorts of workloads people use to evaluate things, do not have weird and complicated partial orders; they just change some inputs and want to see the change in counts. The query up above, for example, is the first of the TPC-H queries, and it wants you to add things up without a nested iterative context or version-controlled base relation.

So let's look at how simple count can be if we don't have to do the hard work. Let's specialize the implementation to totally ordered times.

Specialization

The code for count_total, my name for this operator, is actually pretty simple. It does build on some existing differential dataflow parts, like the arrangements we saw recently that maintain and compact indexed state for you, but I think that if you accept that functionality, the remaining implementation isn't so complicated.

Let's look at the user-facing trait first:

/// Extension trait for the `count` differential dataflow method.
pub trait CountTotal<G: Scope, K: Data, R: Diff> where G::Timestamp: Lattice+Ord {
    /// Counts the number of occurrences of each element.
    fn count_total(&self) -> Collection<G, (K, R), isize>;
    /// Counts the number of occurrences of each element.
    /// 
    /// This method is a specialization for when the key is an unsigned integer fit for distributing the data.
    fn count_total_u(&self) -> Collection<G, (K, R), isize> where K: Unsigned+Copy;
}

This is the same signature and documentation from the operators::group::Count trait. What about the implementation?

impl<G: Scope, K: Data+Default+Hashable, R: Diff> CountTotal<G, K, R> for Collection<G, K, R>
where G::Timestamp: Lattice+Ord+::std::fmt::Debug {
    fn count_total(&self) -> Collection<G, (K, R), isize> {
        self.arrange_by_self()
            .count_total_core()
            .map(|(k,c)| (k.item, c))
    }
    fn count_total_u(&self) -> Collection<G, (K, R), isize> where K: Unsigned+Copy {
        self.map(|k| (UnsignedWrapper::from(k), ()))
            .arrange(DefaultKeyTrace::new())
            .count_total_core()
            .map(|(k,c)| (k.item, c))
    }
}

I see. We are using a count_total_core method to provide both implementations. The only thing they differ on is how they arrange the data. Careful readers may notice a difference from the implementations of operators::group::Count here, in that the output of count_total_core seems to have a different type than its more general counterpart, which is true and leads us to its definition:

/// Extension trait for the `count_total_core` differential dataflow method.
pub trait CountTotalCore<G: Scope, K: Data, R: Diff> where G::Timestamp: Lattice+Ord {
    /// Counts the occurrence of each key.
    fn count_total_core(&self) -> Collection<G, (K, R), isize>;
}

The count_total_core method returns a Collection of (K, R) pairs, where K is the type of the key and R is the type of the accumulable quantity (isize, most often). This is different from what its Group analogue produces, but if you don't know about that then no worries; we'll cover it later.

How about the implementation of this trait? I'm going to fire the whole thing at you; it is 53 lines. Then we will break it down.

impl<G: Scope, K: Data, T1, R: Diff> CountTotalCore<G, K, R> for Arranged<G, K, (), R, T1>
where 
    G::Timestamp: Lattice+Ord,
    T1: TraceReader<K, (), G::Timestamp, R>+Clone+'static,
    T1::Batch: BatchReader<K, (), G::Timestamp, R> {

    fn count_total_core(&self) -> Collection<G, (K, R), isize> {

        let mut trace = self.trace.clone();

        self.stream.unary_stream(Pipeline, "CountTotal", move |input, output| {

            input.for_each(|capability, batches| {

                let mut session = output.session(&capability);
                for batch in batches.drain(..).map(|x| x.item) {

                    let mut batch_cursor = batch.cursor();
                    let mut trace_cursor = trace.cursor_through(batch.lower()).unwrap();

                    while batch_cursor.key_valid() {

                        let key: K = batch_cursor.key().clone();
                        let mut count = R::zero();

                        trace_cursor.seek_key(batch_cursor.key());
                        if trace_cursor.key_valid() && trace_cursor.key() == batch_cursor.key() {
                            trace_cursor.map_times(|_, diff| count = count + diff);
                        }

                        batch_cursor.map_times(|time, diff| {

                            if !count.is_zero() {
                                session.give(((key.clone(), count), time.clone(), -1));
                            }
                            count = count + diff;
                            if !count.is_zero() {
                                session.give(((key.clone(), count), time.clone(), 1));
                            }

                        });

                        batch_cursor.step_key();
                    }

                    // tidy up the shared input trace.
                    trace.advance_by(batch.upper());
                    trace.distinguish_since(batch.upper());
                }
            });
        })
        .as_collection()
    }
}

Now let's explain each of the things that are going on.

The declarations

impl<G: Scope, K: Data, T1, R: Diff> CountTotalCore<G, K, R> for Arranged<G, K, (), R, T1>
where 
    G::Timestamp: Lattice+Ord,
    T1: TraceReader<K, (), G::Timestamp, R>+Clone+'static,
    T1::Batch: BatchReader<K, (), G::Timestamp, R> {

    fn count_total_core(&self) -> Collection<G, (K, R), isize> {

        // method definition ... [up next]

    }
}

All we are doing here is explaining for which types we plan on implementing the trait. In this case, we are implementing it for Arranged types of a certain form: mainly, no values, just keys. We need the Arranged's trace type to implement the TraceReader trait, which is what lets us read out of it using cursors. This is all pretty standard, if non-obvious, meaning that you would probably copy/paste it from somewhere else. I sure did.
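If the pattern itself is unfamiliar, here is a stripped-down sketch of an extension trait implemented only for a restricted form of a generic type. `ArrangedSketch` and `CountCoreSketch` are hypothetical stand-ins invented for this example, and none of the timely or differential machinery is involved.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for an arrangement: a list of (key, value, diff) updates.
struct ArrangedSketch<K, V> {
    updates: Vec<(K, V, isize)>,
}

/// Extension trait: adds one method to a type without modifying the type itself.
trait CountCoreSketch<K> {
    fn count_core_sketch(&self) -> BTreeMap<K, isize>;
}

// Implemented only when the value type is `()`: keys, no values.
impl<K: Ord + Clone> CountCoreSketch<K> for ArrangedSketch<K, ()> {
    fn count_core_sketch(&self) -> BTreeMap<K, isize> {
        let mut counts = BTreeMap::new();
        for (key, _, diff) in &self.updates {
            *counts.entry(key.clone()).or_insert(0) += *diff;
        }
        counts
    }
}
```

Calling `count_core_sketch` on an `ArrangedSketch<K, String>` would fail to compile, because no implementation exists for that value type. The real declaration restricts `Arranged` in the same spirit, with more bounds.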

Introducing an operator

The next thing we need to specify is what actually happens when you call this method. We are starting from an arrangement and want to produce a collection, so the main thing we are going to do is introduce a new timely dataflow operator. Before we do, we grab a copy of the shared trace from the arrangement (here self, because we are implementing the trait for the arrangement), so that we can use it in the new operator.

    let mut trace = self.trace.clone();

    self.stream.unary_stream(Pipeline, "CountTotal", move |input, output| {

        input.for_each(|capability, batches| {

            let mut session = output.session(&capability);
            for batch in batches.drain(..).map(|x| x.item) {

                // iterate over keys in batch ... [up next]

            }
        });
    })
    .as_collection()

The structure is apparently pretty simple. Our operator is going to take a stream of batches of updates, and do something for each of the batches. We could have had lots of other things to worry about, but it seems like in this case it will be pretty simple. Loop through each of the batches, do some sort of work, probably send some stuff, and then call it a day.

Iterating over the keys

Our next piece of work is to swing over the keys that changed, which are presented by the batch we've pulled off of the input. We also grab a cursor to the trace: the new changes are relative to whatever came before them, so we want to be able to see what things were like before. When we grab the cursor, we indicate that we only want to see changes up to but not including those changes in batch, indicated by using the batch's lower frontier.

        let mut batch_cursor = batch.cursor();
        let mut trace_cursor = trace.cursor_through(batch.lower()).unwrap();

        while batch_cursor.key_valid() {

            // process each key ... [up next]

            batch_cursor.step_key();
        }

        // tidy up the shared input trace.
        trace.advance_by(batch.upper());
        trace.distinguish_since(batch.upper());

At the end of this block, we also do some interesting things with trace. Or, mysterious things, at least. What we are doing is telling trace that it is now ok (from our point of view) to move things up to the upper frontier of the batch. We won't need to distinguish between times earlier than those in the batch any more, and so if the trace can compact itself, that is great with us!
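To give a feel for what that permission allows, here is a rough sketch of compaction for totally ordered `u64` times. This is not the trace's actual implementation (which works with frontiers and decides for itself when to do any work); the `compact` function is a hypothetical illustration of the idea that times we no longer distinguish can be merged.

```rust
/// Advances every time earlier than `frontier` up to `frontier`, then merges
/// updates that have become indistinguishable and drops those that cancel.
fn compact<K: Ord>(mut updates: Vec<(K, u64, isize)>, frontier: u64) -> Vec<(K, u64, isize)> {
    for update in updates.iter_mut() {
        update.1 = update.1.max(frontier);
    }
    // Sort by (key, time) so that mergeable updates become adjacent.
    updates.sort_by(|a, b| (&a.0, a.1).cmp(&(&b.0, b.1)));
    let mut result: Vec<(K, u64, isize)> = Vec::new();
    for (key, time, diff) in updates {
        if let Some(last) = result.last_mut() {
            if last.0 == key && last.1 == time {
                last.2 += diff;
                continue;
            }
        }
        result.push((key, time, diff));
    }
    result.retain(|update| update.2 != 0);
    result
}
```

Any reader who only asks about times at or beyond the frontier sees the same accumulated counts before and after, which is why it is safe for us to grant the permission once we are done with a batch.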

Updating each key

Here is the heart of the logic, the part that makes count different from filter or distinct.

For each key, we want to get the historical count out, and then for each new change in the batch we want to report the output changes: removing the old value and introducing the new value, ignoring zeros.

            let key: K = batch_cursor.key().clone();
            let mut count = R::zero();

            trace_cursor.seek_key(batch_cursor.key());
            if trace_cursor.key_valid() && trace_cursor.key() == batch_cursor.key() {
                trace_cursor.map_times(|_, diff| count = count + diff);
            }

            batch_cursor.map_times(|time, diff| {

                if !count.is_zero() {
                    session.give(((key.clone(), count), time.clone(), -1));
                }
                count = count + diff;
                if !count.is_zero() {
                    session.give(((key.clone(), count), time.clone(), 1));
                }

            });

Hey we're done.
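Stripped of cursors and sessions, the per-key logic is small enough to write as a plain function. This is a sketch that assumes `isize` counts and `u64` times, with changes supplied in time order; `key_updates` is a hypothetical name, not something in the library.

```rust
/// Given the count accumulated from history and this key's new changes in
/// time order, returns the output updates as ((key, count), time, diff).
fn key_updates<K: Clone>(
    key: K,
    history: isize,
    changes: &[(u64, isize)],
) -> Vec<((K, isize), u64, isize)> {
    let mut count = history;
    let mut output = Vec::new();
    for &(time, diff) in changes {
        if count != 0 {
            // Retract the old (key, count) pair.
            output.push(((key.clone(), count), time, -1));
        }
        count += diff;
        if count != 0 {
            // Assert the new (key, count) pair.
            output.push(((key.clone(), count), time, 1));
        }
    }
    output
}
```

For example, a key with historical count 2 that sees `+1` at time 5 produces a retraction of `(key, 2)` and an assertion of `(key, 3)`, both at time 5. If a later change brings the count to zero, only the retraction is emitted and the key vanishes from the output.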

Evaluating count_total

The count_total method is a lot simpler than its counterpart. Lollllll.

It should run faster. Let's see.

Degree counting

The benchmark I use for count and count_u is examples/degrees.rs, which takes a randomly changing random graph as input, first counts the number of edges from each node, giving their out-degrees, and then counts the number of times each out-degree occurs, giving the out-degree distribution. It looks like this:

edges.map(|(src, _dst)| src)
     .count_u()
     .map(|(_src, cnt)| cnt)
     .count_u()
     .probe()

If I run this version of the code with 10,000 nodes and 50,000 edges, doing batches of 100,000 updates, we see numbers like:

     Running `target/release/examples/degrees 10000 50000 100000`
Loading finished after Duration { secs: 0, nanos: 12911929 }
worker 0, round 100000 finished after Duration { secs: 0, nanos: 306719501 }
worker 0, round 200000 finished after Duration { secs: 0, nanos: 308062370 }
worker 0, round 300000 finished after Duration { secs: 0, nanos: 310309059 }
worker 0, round 400000 finished after Duration { secs: 0, nanos: 323381687 }
worker 0, round 500000 finished after Duration { secs: 0, nanos: 320012727 }

This averages out to around three microseconds per update, which isn't horrible but .. isn't amazing or anything. If we crank the graph up by three orders of magnitude, the times increase a bit:

     Running `target/release/examples/degrees 10000000 50000000 100000`
Loading finished after Duration { secs: 14, nanos: 112288417 }
worker 0, round 100000 finished after Duration { secs: 0, nanos: 449198774 }
worker 0, round 200000 finished after Duration { secs: 0, nanos: 466185732 }
worker 0, round 300000 finished after Duration { secs: 0, nanos: 422684933 }
worker 0, round 400000 finished after Duration { secs: 0, nanos: 569630725 }
worker 0, round 500000 finished after Duration { secs: 0, nanos: 434054831 }

So now let's change this to use count_total_u, which we do by changing count_u to count_total_u. The numbers for the smaller graph configuration change to:

     Running `target/release/examples/degrees 10000 50000 100000`
Loading finished after Duration { secs: 0, nanos: 8223517 }
worker 0, round 100000 finished after Duration { secs: 0, nanos: 121006437 }
worker 0, round 200000 finished after Duration { secs: 0, nanos: 116247450 }
worker 0, round 300000 finished after Duration { secs: 0, nanos: 112117586 }
worker 0, round 400000 finished after Duration { secs: 0, nanos: 110726945 }
worker 0, round 500000 finished after Duration { secs: 0, nanos: 113185037 }

There is a reasonable improvement in start-up time (grinding through the first 50,000 edges), but we see a more pronounced improvement once we get running. We finish off batches much faster now. The same trend holds for the larger graph:

     Running `target/release/examples/degrees 10000000 50000000 100000`
Loading finished after Duration { secs: 10, nanos: 522300086 }
worker 0, round 100000 finished after Duration { secs: 0, nanos: 148743418 }
worker 0, round 200000 finished after Duration { secs: 0, nanos: 159994362 }
worker 0, round 300000 finished after Duration { secs: 0, nanos: 155572030 }
worker 0, round 400000 finished after Duration { secs: 0, nanos: 200299938 }
worker 0, round 500000 finished after Duration { secs: 0, nanos: 146119459 }

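Here again we see a nearly 3x improvement once running (round times drop from roughly 470ms to roughly 160ms on average), and a roughly 1.3x reduction in start-up time (14.1s down to 10.5s).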
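The per-update and speedup figures can be recomputed from the logged round times. Here is a small sketch that does the arithmetic on the nanosecond values printed above; the function names are hypothetical helpers, and seconds are ignored because every round here takes less than one.

```rust
/// Mean of a non-empty slice of round times, in nanoseconds.
fn mean_nanos(rounds: &[u64]) -> f64 {
    rounds.iter().map(|&ns| ns as f64).sum::<f64>() / rounds.len() as f64
}

/// Average cost of one update in microseconds, for rounds of `batch` updates.
fn micros_per_update(rounds: &[u64], batch: u64) -> f64 {
    mean_nanos(rounds) / batch as f64 / 1_000.0
}

/// Ratio of mean round times: how many times faster `after` is than `before`.
fn speedup(before: &[u64], after: &[u64]) -> f64 {
    mean_nanos(before) / mean_nanos(after)
}
```

Feeding in the five `count_u` rounds for the small graph with a batch of 100,000 gives a bit over three microseconds per update, and comparing the two sets of rounds for the large graph gives a ratio a little under 3x.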

In fact, if we crack open the profiler at this point, almost all of the remaining time is spent arranging the inputs: we choose to order input updates before we process them (so that we scan through keys once, rather than performing random accesses). We do have two count operators we order the input for, and the 100,000 element batches turn into about 520,000 updates throughout the computation: each input "change" is two updates, and each of these updates leads to ~1.6 updates (sometimes the edges change but the degrees don't).

That's basically all we are doing now. Maintaining indexed representations that absorb about half a million updates each sixth of a second, as part of 100,000 sequential updates. This leaves us in a great position to evaluate whether this is a good rate, or what we are leaving on the table. Three million updates per second is not brilliant compared with single-threaded hash maps, but when you have to start worrying about out-of-order updates things become a bit less clear.
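The reason for ordering input updates first, so that keys are scanned once rather than looked up at random, can be illustrated with a small sketch. It assumes both the accumulated state and the batch are sorted by key; `scan_counts` and its slice arguments are hypothetical stand-ins for the trace and batch cursors.

```rust
/// Looks up each batch key in key-sorted `trace` using one forward-only pass.
/// `batch` must also be sorted by key.
fn scan_counts<K: Ord + Clone>(trace: &[(K, isize)], batch: &[K]) -> Vec<(K, isize)> {
    let mut position = 0;
    let mut found = Vec::new();
    for key in batch {
        // Seek forward only; the position never moves backwards.
        while position < trace.len() && trace[position].0 < *key {
            position += 1;
        }
        // A key absent from the trace has an accumulated count of zero.
        let count = match trace.get(position) {
            Some((k, c)) if k == key => *c,
            _ => 0,
        };
        found.push((key.clone(), count));
    }
    found
}
```

Because both sides are sorted, the whole batch costs one pass over the trace, at the price of sorting the batch up front. That sorting is the "arranging" cost that now dominates the profile.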

TPC-H

There are at least two interesting queries from TPC-H that stress count_u: query 1 and query 13.

Query 1 is that block of SQL we saw up above. We do some light filtering, but then we just want to add up a lot of fields in the input records. We can run this with count_u and with count_total_u, and we get the following elapsed times on a scale-factor 1 input with batch sizes varied from 1,000 up to 100,000:

Q01             1,000   10,000  100,000
                -----   ------  -------
count_u         4.04s    4.24s    5.10s
count_total_u   1.48s    1.42s    1.61s

The throughput triples, which is great news. This query really is just adding things up, so it makes sense that we get whatever performance benefits we brought into counting.

Query 13 is roughly like the degree-counting example above: it asks for the histogram of the number of customer comments from each customer (non-complaint comments, at least). It claims it has a left-join, but it is really just counting.

Q13             1,000   10,000  100,000
                -----   ------  -------
count_u         6.18s    4.47s    3.83s
count_total_u   2.24s    1.66s    1.52s

Also a pretty sweet improvement.

Conclusions

I'm not sure what your take is, but I think the count_total_core implementation up above is totally approachable, if not "trivial". Counting is probably one of the simplest things I could have written, but it is also pretty pervasive. Differential dataflow has an open issue to specialize operators to totally ordered times, and I could imagine other operators receiving similar improvements.

This is all a bit less glamorous, because totally ordered times have nothing to do with what is "new" about differential dataflow (partially ordered times), but it does seem important to do if we want to keep up in the benchmarks that don't exercise the things that differential excels at (e.g. updating iterative computations).

I'm off to write some tests and then ship this. Cheers!

Postscript

The main observable difference between count and count_total, other than the performance, is that the innards of count produce an arrangement of output data, rather than an unarranged stream. We needed that arrangement in the general case, because we had to consult what counts we had previously produced, and this is weird with partially ordered times. In the restricted setting, we don't need to remember what we previously produced because it is always just whatever count is there now (that this isn't true for partially ordered times should worry you).

So count_total doesn't produce arranged data, because if the consumer doesn't need it (as in the degrees.rs example) it is just wasted effort. On the other hand, if a consumer could use the output it would be easier for count_total to do the work than the consumer, because it produces the output grouped by key, in order of time. I'm not sure how to pick between the two; perhaps it is something a higher-level language could think about and decide.
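As for why "whatever count is there now" stops being true with partially ordered times, here is a small sketch using the product order on pairs. The functions are hypothetical and written only for this illustration: the count at a time accumulates just those updates at times less-or-equal to it.

```rust
/// Product order on hypothetical two-coordinate times.
fn less_equal(a: (u64, u64), b: (u64, u64)) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// The count at `time` accumulates only updates at times less-or-equal to it.
fn count_at(updates: &[((u64, u64), isize)], time: (u64, u64)) -> isize {
    updates
        .iter()
        .filter(|(t, _)| less_equal(*t, time))
        .map(|(_, diff)| *diff)
        .sum()
}
```

With one `+1` at `(0, 1)` and another at `(1, 0)`, the count is 1 at each of those times and 2 only at `(1, 1)`. A running sum in arrival order would report 2 at whichever update it happened to see second, which is wrong, and that is why the general operator has to do more bookkeeping.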