Trusted connection paths through forums created in a given timeframe

That wordy title thar is the name for the Linked Data Benchmark Council's Business Intelligence Query 25. This query is going to be the subject of a few posts, as we attempt to implement what seems to be a pretty sophisticated query.

Rather than writing a massive missive about how you write such a query in one of several frameworks, I'm going to spill things out slowly. This is mainly to give people time to ponder the problem, and to think about which parts are challenging and why. It is also to give me time to finish my implementation, which as it turns out doesn't just write itself.

Breaking down the problem (2019-06-13)

To start with, let's talk through the problem.

If you plan to follow along, I strongly recommend clicking that link up there and reading a bit. It is a one pager which describes the query both in English prose and using a graphical query language. Although we'll introduce the language and components of the problem, I'll be writing as if you have access to the link above and can use it to resolve ambiguities.

Go read it!

The problem, briefly

Our task is to take as input pairs of "people", each represented by integer identifiers, and then find and score the shortest paths between them in a directed graph. The score for a path is determined by scoring each link based on the number and type of interactions between its endpoints, and then accumulating the scores of each link in the path. There is the additional complication that we should only accumulate interactions whose root event (these interactions are forum posts) has a date that lies in a specified interval.

The problem calls out a few interesting challenges, at least to my mind:

  1. Finding shortest paths in graphs is surprisingly harder than computing the distance between two nodes, which is surprisingly harder than computing the distance from one node to all other nodes in the graph, which is the only thing scalable graph processors bother to do with paths in graphs.

    We will need to be clever and careful here, to efficiently report the shortest paths in graphs between pairs of query nodes. For example, presumably you've noticed that "paths" is plural and not singular.

  2. Link scoring is potentially expensive, and not something we want to do for all interactions, but rather just those interactions that pique our interest.

  3. Link filtering is even more expensive, as it involves chasing down the root of a chain of interactions. There will be iteration.

  4. Path scoring at the very least involves data types that are vectors. Ideally we could have left the shortest path data as a DAG, but the query asks us to expand it out into the potentially (it won't be) exponentially many shortest paths and their scores.

And obviously because this is the blog you think it is, all of this should be scalable, interactive, and incrementally maintained as the data change.

It will be. Don't worry. ;)

The data and query inputs, and the output.

The query inputs are perhaps the simplest, so let's start there. Each instance of the query has four parameters (all in the linked pdf; go read it again!):

person1Id:  usize,
person2Id:  usize,
startDate:  Date,
endDate:    Date,

The data inputs come in three flavors (in my mind).

knows:      (usize, usize),         // e.g. (person1, person2)
posts:      (usize, usize, usize),  // e.g. (id, author, forum)
comments:   (usize, usize, usize),  // e.g. (id, author, parent)
forum:      (usize, Date),          // e.g. (id, creation_date)

This isn't exactly how LDBC represents things, but we'll deal with that when we get there.

The required output is, for each query quadruple, a list of tuples with fields

path:       Vec<usize>,
score:      f64,

We are going to change the score to be a usize because we like doing math with correct answers, which in this case is all fine (we can double the requested scores and get integral non-negative scores).
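To keep the arithmetic honest, here is that doubled scoring written out (a sketch; the underlying 1.0 and 0.5 weights come from the specification, where a reply to a post counts twice a reply to a comment):

// Doubled interaction weights, so that scores stay integral.
const REPLY_TO_POST: usize = 2;    // the specification's 1.0, doubled.
const REPLY_TO_COMMENT: usize = 1; // the specification's 0.5, doubled.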

Breaking apart the parts of the query.

I've written a fair bit of the query so far, and it comes in parts. They roughly track the challenges above, but I thought I'd start by sketching the structure without yet developing the details.

First, for reasons of sanity we are going to want to pretend to impose some type structure on all of these identifiers.

type Node = usize;  // identifies a person.
type Text = usize;  // identifies a post or comment.
type Forum = usize; // identifies a forum.
type Edge = (Node, Node);

These are just type aliases, and won't actually keep us from comparing person identifiers and forum identifiers, but they will at least let us be more clear about our intent. We could certainly create actual new types wrapping these identifiers, at the cost of a fair bit of boilerplate. I'm up for that, but only once I'm sure we have the right structure in place.
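For reference, the newtype version might look like the following sketch (differential dataflow would also want its serialization traits derived, which is part of the boilerplate being dodged):

// Distinct wrapper types: the compiler now rejects comparing a Node to a Forum.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
struct Node(usize);

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
struct Forum(usize);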

The skeleton I'm currently working with starts like so:

fn main() {

        // define a new computational scope, in which to run BI Q25
    timely::execute_from_args(std::env::args(), move |worker| {

        // Create a dataflow for BI Q25, returning handles to inputs.
        let (mut query, mut knows, mut posts, mut comms, mut forum) =
        worker.dataflow(|scope| {

            // Create various input handles and collections.
            let (query_input, query) = scope.new_collection();
            let (knows_input, knows) = scope.new_collection();
            let (posts_input, posts) = scope.new_collection();
            let (comms_input, comms) = scope.new_collection();
            let (forum_input, forum) = scope.new_collection();

            // 1. Determine edges in shortest paths, for each query.
            let goals = query.map(|(src,dst,_,_)| (src,dst));
            let mut shortest_edges: Collection<_,((Node, Node), Edge)>
                = shortest_paths(&knows, &goals);

            // 2. Score each edge, broken down by the root post.
            let mut edge_scores: Collection<_, (Edge, Text)>
                = score_edges(&shortest_edges, &posts, &comms);

            // 3. Merge queries and scores, filter by start and end dates.
            let filtered_edges: Collection<_, ((Node, Node), (Edge, usize))>
                = unimplemented!();

            // 4. Reconstruct paths and scores.
            let scored_paths: Collection<_, ((Node, Node), (Vec<Node>, usize))>
                = unimplemented!();

            // 5. Announce massive success!
            scored_paths.inspect(|x| println!("WOW:\t{:?}", x));

As you can see, we are doing little more than creating a new dataflow graph, announcing some inputs we plan to use (one for each stated input, as well as an input for the queries themselves), and pretending to do the computation through some methods that may or may not yet exist (certainly not yet discussed).

I've clearly not written everything yet. I haven't even gotten around to figuring out the signatures for the third and fourth steps! I kinda nailed step five though, I hope we agree.

Next steps

We have a few steps laid out ahead of us.

  1. We need to write the shortest_paths method. This method will need to take a collection of goals (source, target) and produce ((source, target), edge) for each edge on a shortest path from source to target. At least, that is the intent.

    This method is going to be a bit of a beast. It is about 100 lines long, which is enormous for a differential dataflow method. I think it mostly makes sense though, at least taken in small parts at a time.

  2. We need to write the score_edges method. This method will need to check out various posts and comments and look for links among them that correspond to pairs of authors we saw in edges just above.

    This isn't so bad. It's much shorter at the moment, but it doesn't compile yet so .. right.

  3. We need to do the other steps too. Three and four. I think these are easier, but I'm writing this well before I've gotten to even thinking about that. Really, writing this is the first time I've thought about it, so there might be some fairly public failure here. That should be fun!

Finding shortest paths in graphs (2019-06-16)

Today we'll be writing a method for finding the edges in a directed graph that contribute to the shortest paths between query pairs of nodes. Its signature looks like this:

fn shortest_paths<G>(
    edges: &Collection<G, Edge>,
    goals: &Collection<G, (Node, Node)>,
) -> Collection<G, ((Node, Node), Edge)>
where
    G: Scope,
    G::Timestamp: Lattice + Ord,
{
    unimplemented!()
}

There are two differential dataflow collections as inputs: edges which contains naturally enough a bunch of directed edges, and goals which contains pairs of source and target nodes for which we should find some shortest paths (and produce output). The output collection should contain pairs of goals and edges, each of which indicates that that edge participates in the shortest paths for that goal. Ideally we produce all of the edges involved in shortest paths, and none that are not.
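For intuition, consider a hypothetical toy instance. With edges

    (1, 2), (1, 3), (2, 4), (3, 4), (2, 5)

and the goal (1, 4), both 1 -> 2 -> 4 and 1 -> 3 -> 4 have length two and are shortest, so the output should be

    ((1, 4), (1, 2)), ((1, 4), (1, 3)), ((1, 4), (2, 4)), ((1, 4), (3, 4))

and nothing for the edge (2, 5), which lies on no shortest path from 1 to 4.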

So...

Now we actually have to think about doing this. How do we compute the set of shortest paths between a source and target node?

Amazingly, despite the volume of graph systems and research done, almost none of the papers report on anything remotely like this. The closest problem that systems attack is the "Single Source Shortest Path" problem, in which we have a source but no specific target, and the goal is to assess the distance from this node to each other node in the graph.

Note: distance not path, and to all nodes rather than one target node.

You probably agree that paths are more than distances, so we'll need to fix that. But couldn't we just do paths to all nodes and call it a day? In principle we could, but the performance difference is so massive that we should probably try to do this well. Fortunately, we've already done it well once before, and we'll just need to tidy up that whole paths-versus-distances thing.

Single-Source Shortest Paths

Let's start with a refresher on differential dataflow by writing out the logic for a classical single-source shortest paths computation, where we have a collection of sources, and we want to report the distances from each source to each other node. This ends up being a fairly vanilla iterative computation:

// iteratively develop `(dst, (src, len))` statements.
roots.map(|x| (x, (x, 0)))
     .iterate(|dists|
        // join edges with current distances.
        edges.enter(&dists.scope())
             .join(&dists)
             .map(|(_mid, (dst, (src, len)))| (dst, (src, len+1)))
             .concat(&dists)
             .map(|(dst, (src, len))| ((dst, src), len))
             .reduce(|_dst_src, input, output|
                 // output minimum distance
                 output.push((*input[0].0, 1))
             )
             // restore the `(dst, (src, len))` shape for the next round.
             .map(|((dst, src), len)| (dst, (src, len)))
     )

This is an unoptimized implementation that repeatedly develops statements of the form "src can reach dst along len edges". It does this by joining such statements with the collection edges of directed edges, then reducing candidate distances by (dst, src) key and retaining only the shortest distance.

It's totally fine for what it is, but it does a fairly massive amount of work for each entry in roots and it determines only the shortest distance, not the paths one should follow to achieve that distance.

Reconstructing shortest paths

To foreshadow how we might reconstruct the paths, notice that _mid coordinate that gets discarded just after the join. This is the graph node by which we made our way to dst. If we write that down with len, then in our reduce when we determine the minimum length, we can produce the _mid with that length. There might be multiple _mid with the same length, and we could either produce them all (to get all shortest paths) or pick one (to get a single shortest path).
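Sketched against the code above (schematically; the record shapes are the ones we just used), the tail of the loop might become:

    // after writing down `((dst, src), (len, mid))` rather than discarding `mid`,
    // the reduce emits every `mid` achieving the minimum length:
    .reduce(|_dst_src, input, output| {
        let min_len = (input[0].0).0;
        for &(&(len, mid), _wgt) in input.iter() {
            if len == min_len {
                output.push(((len, mid), 1));
            }
        }
    })

Keeping only the first entry instead of the loop would recover a single shortest path.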

Bi-directional Dijkstra

To efficiently search for shortest paths between specific goal pairs of source and target, we'll need a new algorithm. Fortunately, such an algorithm already exists, and is even already implemented in differential dataflow's library of graph algorithms: bijkstra.rs.

The gist of the algorithm is that we can iteratively develop paths from our source outward at the same time we develop paths from our target backwards. As we want to minimize the number of edges, we can stop the computation as soon as the outward and backward explorations intersect.

It may not be immediately obvious why this is such a clever idea. It sounds pretty similar to the SSSP algorithm up above, only we've made it more complicated.

To explain the virtue of the algorithm, we'll want a physical analogy: Imagine as we expand outward from our source we are inflating a balloon. The volume of that balloon contains all of the nodes we've visited, and with each round of iteration it grows. If we just head out from our source, and if we are talking about 3D real-world space, then by the time we reach a target at distance len away our balloon will contain an amount of air cubic in len.

On the other hand, if we start inflating balloons at both the source and the target, then they will intersect when each has radius half of len. Halving the radius cuts each balloon's volume by a factor of eight, so even with two balloons we use only a quarter of the air.

Now, in many real-world graphs the volume (number of nodes) does not grow cubically with the length, but rather exponentially: each additional hop increases the number of nodes by a multiplicative factor. In this case, the reduction of bi-directional Dijkstra is even more amazing: it can be more like a square root than a factor of four.
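Concretely, if each hop multiplies the frontier by some branching factor b, then heading out from the source alone touches on the order of b^len nodes, while two explorations that meet in the middle touch about 2 * b^(len/2): roughly the square root of the one-sided cost.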

A brief demonstration

Because maybe you don't believe me yet, let's try out both traditional single-source shortest paths and bi-directional Dijkstra on large random graphs. We have some real-ish LDBC data, but for the moment I'm just going to use G(n,m) random graphs because they are easier to play around with.

I just spun up both algorithms on a random directed graph with 1 million nodes and 10 million edges, where in each round we add a new source node, and for bi-directional Dijkstra an associated goal node. We are changing the graph a little as we do this, but mostly I want to show you the query times.

First, single-source shortest paths.

Echidnatron% cargo run --release --bin sssp -- 1000000 10000000 1 100 inspect
    Finished release [optimized] target(s) in 0.10s
     Running `target/release/sssp 1000000 10000000 1 100 inspect`
performing BFS on 1000000 nodes, 10000000 edges:
202.141463ms    loaded
2.295315765s    stable
    (0, 1, 1)
    (1, 1, 10)
    (2, 1, 89)
    (3, 1, 892)
    (4, 1, 8889)
    (5, 1, 84214)
    (6, 1, 515544)
    (7, 1, 388146)
    (8, 1, 2169)
    (9, 1, 1)
6.320190308s    0:  4024846716
    (1, 2, -1)
    (2, 2, -14)
    (3, 2, -156)
    (4, 2, -1631)
    (5, 2, -14898)
    (6, 2, -53846)
    (7, 2, 68138)
    (8, 2, 2406)
    (9, 2, 2)
12.201397992s   1:  5881199856
    (1, 3, -1)
    (2, 3, 5)
    (3, 3, 28)
    (4, 3, 472)
    (5, 3, 4851)
    (6, 3, 18685)
    (7, 3, -23054)
    (8, 3, -984)
    (9, 3, -2)
18.465209361s   2:  6263803408
^C
Echidnatron%

We are printing out the number of nodes at each distance as a summary of the output (we don't actually want to print all distances to all nodes). But the most important thing is that it takes about six seconds to probe a new source (and to retract the previous source, which we are also doing).

Compare that to bi-directional Dijkstra,

Echidnatron% cargo run --release --bin bijkstra -- 1000000 10000000 1 100 inspect
    Finished release [optimized] target(s) in 0.11s
     Running `target/release/bijkstra 1000000 10000000 1 100 inspect`
performing BFS on 1000000 nodes, 10000000 edges:
210.466961ms    loaded
4.604447139s    stable
    (((0, 1), 6), 1, 1)
4.611941914s    0:  7463578
    (((0, 1), 6), 2, -1)
    (((1, 2), 7), 2, 1)
4.672459858s    1:  60508281
    (((1, 2), 7), 3, -1)
    (((2, 3), 6), 3, 1)
4.740858752s    2:  68388696
    (((2, 3), 6), 4, -1)
    (((3, 4), 6), 4, 1)
4.756965996s    3:  16094611
    (((3, 4), 6), 5, -1)
    (((4, 5), 7), 5, 1)
4.811579075s    4:  54603873
    (((4, 5), 7), 6, -1)
    (((5, 6), 7), 6, 1)
4.909786273s    5:  98198597
    (((5, 6), 7), 7, -1)
    (((6, 7), 7), 7, 1)
4.994313138s    6:  84516686
    (((6, 7), 7), 8, -1)
    (((7, 8), 6), 8, 1)
5.036940575s    7:  42617149
    (((7, 8), 6), 9, -1)
    (((8, 9), 6), 9, 1)
^C
Echidnatron%

Once we get up and running, which admittedly takes longer than for SSSP because we must index the graph in both forward and reverse directions, each query comes back in fractions of a second rather than sixes of a second.

Developing bi-directional Dijkstra

Let's talk through the code we will use for determining the shortest paths between goal pairs of nodes. This code will come right where the unimplemented!() was when we described the method signature.

The algorithm is necessarily iterative, and the first thing we'll do is establish a new iterative context, in which collections can evolve over multiple rounds, and bring in the edges and goals collections:

    // Iteratively develop reachability information.
    edges.scope().iterative::<u64, _, _>(|inner| {

        let goals = goals.enter(inner);
        let edges = edges.enter(inner);

        // to be continued ...

Many of the vignette examples we've seen before use the iterate method on collections, which starts from the collection and repeatedly updates it. Our computation will need to use mutual recursion, in which two collections develop concurrently. They will interact (by ceasing once they intersect) and so we cannot just develop them independently.

To create such iterative variables, we dive into differential dataflow's iterate module, which has a Variable type suited just for this. It is essentially a collection that can be used before its contents are defined; once they are defined, it creates an iterative computation.
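In miniature, the lifecycle is: create the variable, use it as an ordinary collection, then close the loop by defining it (the middle line is stand-in logic, just for illustration):

    // create, with a timestamp that steps by one each iteration.
    let var = Variable::new(inner, Product::new(Default::default(), 1));
    // use it like any other collection.
    let next = var.map(|x| x).distinct();
    // define it, which closes the recursion.
    var.set(&next);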

We are going to create two variables, one for reachability in the forward direction and one for reachability in the reverse direction:

        use differential_dataflow::operators::iterate::Variable;

        // forward: ((mid1,mid2), (src, len)) can be read as
        //      "src -len-> mid1 -> mid2 is a shortest path from src to mid2."
        let forward = Variable::new(inner, Product::new(Default::default(), 1));
        // reverse: ((mid1,mid2), (dst, len)) can be read as
        //      "mid1 -> mid2 -len-> dst is a shortest path from mid1 to dst."
        let reverse = Variable::new(inner, Product::new(Default::default(), 1));

These collections will be defined towards the end of our iterative scope. Right now we need to build up the associated collections and logic that allow us to define them.

A first thing we'll want to do is see for which goals the forward and reverse reachable sets intersect. This can be done by joining the forward and reverse collections on their edges. We'll write down the sum of their lengths, and the edge itself, so that we know where to go once we find the minimizing lengths.

        // reached((src, dst), (mid1, mid2)) can be read as
        //      src -*-> mid1 -> mid2 -*-> dst is a shortest path.
        let reached: Collection<_, ((Node, Node), Edge)> =
        forward
            .join_map(&reverse, |&(m1,m2), &(src,len1), &(dst,len2)|
                ((src, dst), (len1 + len2, (m1,m2)))
            )
            .semijoin(&goals)
            .reduce(|&_src_dst, source, target| {
                let min_len = (source[0].0).0;
                for &(&(len,edge),_wgt) in source.iter() {
                    if len == min_len {
                        target.push((edge, 1));
                    }
                }
            });

This reached set will play an important role in producing the output from the method, but for now it also lets us identify "active" goals: those that have not yet been satisfied. We'll assume that goals contains distinct records (this is probably wrong) and simply subtract out distinct goals in reached. Note that above we did a semijoin with goals so the only things we should be subtracting are actual goals.

        // Subtract from goals any goal pairs that can reach each other.
        let active =
        reached
            .map(|((src,dst),_mid)| (src,dst))
            .distinct()
            .negate()
            .concat(&goals)
            .consolidate();

The active goals allow us to drive reachability forward (and reverse) for only the subset of sources (and targets) that still need to be developed. This will be what allows us to terminate the iteration as reachable sets intersect.

Now we just need to define the rule for forward reachability: how we should set forward for the next iteration, based on what we have so far. Informally, we 1. restrict our attention to active sources, 2. join with edges in the forward direction, and 3. minimize for each pair of source and destination.

        // Let's expand out forward queries that are active.
        let forward_active = active.map(|(x, _y)| x).distinct();
        let forward_next =
        forward
            .map(|((_mid0,mid1), (src, len))| (src, (mid1, len)))
            .semijoin(&forward_active)
            .map(|(src, (mid1, len))| (mid1, (src, len)))
            .join_map(&edges, |&mid1, &(src, len), &mid2| {
                ((mid1,mid2), (src, len + 1))
            })
            .concat(&*forward)
            .map(|((mid1,mid2),(src,len))| ((mid2,src),(len,mid1)))
            .reduce(|_key, s, t| {
                let min_len = (s[0].0).0;
                for (&(len,mid1), _weight) in s.iter() {
                    if len == min_len {
                        t.push(((len, mid1), 1));
                    }
                }
            })
            .map(|((mid2, src), (len, mid1))| ((mid1,mid2),(src,len)));

It's a lot of characters. I'm sorry. We are trying to fix this.

We can also go in the reverse direction:

        // Let's expand out reverse queries that are active.
        let reverse_active = active.map(|(_x, y)| y).distinct();
        let reverse_next =
        reverse
            .map(|((mid1,_mid2), (rev, len))| (rev, (mid1, len)))
            .semijoin(&reverse_active)
            .map(|(rev, (mid1, len))| (mid1, (rev, len)))
            .join_map(&edges.map(|(x, y)| (y, x)), |&mid1, &(rev, len), &mid0| {
                ((mid0,mid1), (rev, len + 1))
            })
            .concat(&reverse)
            .map(|((mid0,mid1),(rev,len))| ((mid0,rev),(len,mid1)))
            .reduce(|_key, s, t| {
                let min_len = (s[0].0).0;
                for (&(len,mid1), _weight) in s.iter() {
                    if len == min_len {
                        t.push(((len, mid1), 1));
                    }
                }
            })
            .map(|((mid0, rev), (len, mid1))| ((mid0,mid1), (rev, len)));

These collections, forward_next and reverse_next are what we will want to assign as the definitions of forward and reverse, but we don't want to do it just yet.

We actually have one more variable we want to introduce, which is the collection of edges on shortest paths. This is something that we will build up by walking backwards from those edges in reached, which was the collection of witnesses to shortest path reachability. All we really do here is use the output of reached to crawl back up forward and reverse, retracing our steps back to the source and target, respectively.

Here is how I wrote it:

        let shortest = Variable::new(inner, Product::new(Default::default(), 1));

        let forward_dag = forward.map(|((mid1,mid2),(src,_len))| ((src,mid2),mid1));
        let reverse_dag = reverse.map(|((mid1,mid2),(dst,_len))| ((dst,mid1),mid2));

        let short_forward =
        shortest
            .map(|((src,dst),(mid1,_mid2))| ((src,mid1),dst))
            .join_map(&forward_dag, |&(src,mid1),&dst,&mid0| ((src,dst),(mid0,mid1)));

        let short_reverse =
        shortest
            .map(|((src,dst),(_mid0,mid1))| ((dst,mid1),src))
            .join_map(&reverse_dag, |&(dst,mid1),&src,&mid2| ((src,dst),(mid1,mid2)));

        let short =
        short_forward
            .concat(&short_reverse)
            .concat(&reached)
            .distinct();

        shortest.set(&short);

At this point, we've used forward and reverse for the last time, and can bind them off too, and return the output of the computation: short just above.

        forward.set(&forward_next.concat(&goals.map(|(x, _)| ((x,x),(x,0)))));
        reverse.set(&reverse_next.concat(&goals.map(|(_, y)| ((y,y),(y,0)))));

        short.leave()
    })

Trying out the LDBC synthetic data

I've got my hands on the LDBC synthetic data generator. Apparently it requires a Hadoop cluster to run, because big data. I'm going to try it out with scale-factor 1, which produces 1GB of data. Most of the data is not the social graph, however, which has only a few hundred thousand edges.

Because I don't know any better, I'm just going to feed in the graph and start querying it with random goals, chosen from the set of nodes with non-zero out degree as sources and non-zero in degree as targets.

Echidnatron% cargo run --release --bin ldbc_bi_25 -- ~/Projects/ldbc/ldbc_snb_datagen/social_network/ true
    Finished release [optimized] target(s) in 0.10s
     Running `target/release/ldbc_bi_25 /Users/mcsherry/Projects/ldbc/ldbc_snb_datagen/social_network/ true`
read 180623 lines
performing Bi-directional Dijkstra on (8250,8466) nodes, 180623 edges:
61.834295ms loaded
137.455696ms    stable
137.473761ms    query (15393162789932 -> 21990232563951)
229.845344ms    round 0
229.864122ms    query (21990232565205 -> 26388279066830)
458.713557ms    round 1
458.732648ms    query (2199023264077 -> 15393162797342)
675.426968ms    round 2
675.445694ms    query (17592186045005 -> 6766)
787.770641ms    round 3
787.788999ms    query (30786325579455 -> 30786325580460)
901.046842ms    round 4
901.065839ms    query (8796093029585 -> 24189255813581)
980.537131ms    SHORTEST    (((8796093029585, 24189255813581), (21990232558990, 21990232563274)), 6, 1)
980.555949ms    SHORTEST    (((8796093029585, 24189255813581), (21990232558990, 21990232565255)), 6, 1)
980.558546ms    SHORTEST    (((8796093029585, 24189255813581), (21990232563274, 21990232566092)), 6, 1)
980.560684ms    SHORTEST    (((8796093029585, 24189255813581), (21990232565255, 21990232566092)), 6, 1)
1.081254950s    SHORTEST    (((8796093029585, 24189255813581), (8796093029585, 21990232558990)), 6, 1)
1.081273169s    SHORTEST    (((8796093029585, 24189255813581), (21990232566092, 24189255813581)), 6, 1)
1.085112894s    SHORTEST    (((8796093029585, 24189255813581), (8796093029585, 8796093029585)), 6, 1)
1.085130588s    SHORTEST    (((8796093029585, 24189255813581), (24189255813581, 24189255813581)), 6, 1)
1.088113365s    round 5

You may notice that the first few rounds go by with no output. That is what things look like when there is no path from the source to the target. I hope that is actually the case here!

Eventually we get to a query with actual paths, and we get to see the full majesty of LDBC node identifiers. Note: I'm totally in favor of gross and nasty identifiers, as they trip up all of the silly "I'll just index into an array" implementations out there. Ha!

What we are actually seeing above, though, is a small directed acyclic fragment of the input graph that connects source node 8796093029585 and target node 24189255813581. It is a bit wordy, so let's isolate the actual edges:

    (8796093029585, 8796093029585)
    (8796093029585, 21990232558990)
    (21990232558990, 21990232563274)
    (21990232558990, 21990232565255)
    (21990232563274, 21990232566092)
    (21990232565255, 21990232566092)
    (21990232566092, 24189255813581)
    (24189255813581, 24189255813581)

There is still a lot of noise in here, including two self-loops for the source and target, and the large but similar identifiers. Let me simplify it as

    (8796093029585, A)
    (A, B)
    (A, C)
    (B, D)
    (C, D)
    (D, 24189255813581)

which as you can see describes two paths of equal length. Probably they are the shortest paths, or we got our algorithm very wrong! Both could be true!

We can let this continue, and we reach round 100 in about 11 seconds.

...
11.206315945s   query (17592186047529 -> 24189255813921)
11.21979989s    SHORTEST    (((4398046516223, 30786325578914), (6597069767851, 24189255817517)), 101, -1)
11.219819956s   SHORTEST    (((4398046516223, 30786325578914), (13194139537399, 24189255821300)), 101, -1)
11.257095587s   SHORTEST    (((4398046516223, 30786325578914), (4398046516223, 6597069767851)), 101, -1)
11.257114196s   SHORTEST    (((4398046516223, 30786325578914), (4398046516223, 13194139537399)), 101, -1)
11.257119141s   SHORTEST    (((4398046516223, 30786325578914), (24189255817517, 30786325578914)), 101, -1)
11.257123084s   SHORTEST    (((4398046516223, 30786325578914), (24189255821300, 30786325578914)), 101, -1)
11.264713013s   SHORTEST    (((4398046516223, 30786325578914), (4398046516223, 4398046516223)), 101, -1)
11.264730266s   SHORTEST    (((4398046516223, 30786325578914), (30786325578914, 30786325578914)), 101, -1)
11.268324036s   round 100
...

The output here is a bunch of subtractions, because apparently the new query doesn't connect to its target, and we are retiring the previous query that did connect. It turns out that the more expensive operations are when no connection occurs, because then you have to explore the entire graph. Finding connections early makes things faster, which is I guess what you would expect in normal computer science (not always so, in big data science).

Scoring interactions of people (2019-06-19)

Our next step is to score the interactions of people in our shortest paths.

Specifically, we have a collection of edges: pairs of people who are linked by the knows relationship. For each of these pairs, we want to look up their interactions, where one replied to a post or comment of the other. All of these data live in the posts and comms relations, containing posts and comments respectively. We have a few more rules; in particular, for each interaction we want to return the forum in which the root post occurred, so that we can filter the interactions by forum.

The specific method signature we'll use is

fn score_edges<G>(
    edges: &Collection<G, Edge>,                    // (source, target)
    posts: &Collection<G, (Text, (Node, Forum))>,   // (id, author, in_forum)
    comms: &Collection<G, (Text, (Node, Text))>,    // (id, author, reply_to)
) -> Collection<G, (Edge, Forum)>
where
    G: Scope,
    G::Timestamp: Lattice + Ord,
{
    unimplemented!()
}

We get edges, posts, and comms as input collections, and we want to produce a collection that enumerates for each edge the associated forums in which interactions occurred. Very likely that result collection will contain multiplicities: each forum might occur multiple times, reflecting multiple interactions.

This method is much less complicated than shortest path finding, which is a bit of a relief. There are a few ways we could approach things, but we'll follow a pretty simple route.

First, we'll want to point each comment at its parent, which could be either a post or a comment itself. These two cases are different, and we want to treat them differently. In both cases, though, we'll want to extract the pair of authors that interact. Specifically, we will output the min and max of the two authors, to put them in a canonical form, because the directionality of the interaction doesn't matter for our logic. We'll also want to restrict things down to just the edges we are interested in.

In the first case we'll have a pair of authors (comment and post) and a forum identifier, which is great!

    // Perhaps a comment links to a post ...
    let comm_post: Collection<_,(Edge,Forum)> =
    comms.map(|(id, (auth, link))| (link, (id, auth)))
         .join_map(&posts, |_post, &(_id,auth_c), &(auth_p, forum)| {
             let min = std::cmp::min(auth_c, auth_p);
             let max = std::cmp::max(auth_c, auth_p);
             ((min, max), forum)
         })
         .semijoin(&edges);

In the second case we'll have a pair of authors and .. some comment identifier, not the forum identifier, which is less great! We will capture the identifier of the parent, and resolve it in just a moment.

    // Perhaps a comment links to a comment ...
    let comm_comm: Collection<_,(Edge,Text)> =
    comms.map(|(id, (auth, link))| (link, (id, auth)))
         .join_map(&comms, |_comm, &(_id,auth_c), &(auth_p,link)| {
             let min = std::cmp::min(auth_c, auth_p);
             let max = std::cmp::max(auth_c, auth_p);
             ((min, max), link)
         })
         .semijoin(&edges);

To resolve the forum identity for a comment, we'll want access to all the comments, and the links they form from child to parent. These links form a directed forest (a bunch of trees), with the posts at the root. We'll also throw in self-links from each post to itself, to make our life easier in just a moment.

    // All comment -> parent links.
    let links =
    comms.map(|(id, (_, link))| (id, link))
         .concat(&posts.map(|(id, _)| (id, id)));

And with this in hand, we can iteratively climb along these links until we reach fixed point, because posts point at themselves. This will eventually get us to an identifier for a post, which we can use to look up the forum. The iterative climb we do with an iterate command, and the post lookup with a join.

    comm_comm
        .map(|(edge, link)| (link, edge))
        .iterate(|scores|
            // climb upward along `links`.
            links.enter(&scores.scope())
                 .join_map(&scores, |_src, &dst, &edge| (dst, edge))
        )
        .join_map(&posts, |_post,&edge,&(_, forum)| (edge, forum))
        .concat(&comm_post)
        .concat(&comm_post)
        .consolidate()
}

That curly brace means we are done. We just iteratively climb up along the links and then join with posts. We add in comm_post for good measure (twice, because the rules say these interactions count twice as much). And we're done!

Trying it out

I changed my test harness a bit to produce deterministic queries (enumerating a Rust hash map: not deterministic), and to start from nodes with low-ish but not very low degree (I chose 10).
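(If you are wondering about the determinism fix, one hypothetical option is to swap the HashMap for a BTreeMap, which iterates in key order:)

use std::collections::BTreeMap;

// A BTreeMap iterates in key order, so enumerating candidate sources
// yields the same sequence on every run, unlike a HashMap.
let mut by_node: BTreeMap<usize, usize> = BTreeMap::new(); // node -> degree
by_node.insert(42, 10);
by_node.insert(7, 11);
for (node, degree) in &by_node {
    println!("node {} has degree {}", node, degree);
}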

Echidnatron% cargo run --release --bin ldbc_bi_25 -- ~/Projects/ldbc/ldbc_snb_datagen/social_network/ 10 false
    Finished release [optimized] target(s) in 0.14s
     Running `target/release/ldbc_bi_25 /Users/mcsherry/Projects/ldbc/ldbc_snb_datagen/social_network/ 10 false`
comms: read 2052169 lines
posts: read 1003605 lines
knows: read 180623 lines
performing Bi-directional Dijkstra on (8250,8466) nodes, 180623 edges:
2.347249502s	loaded
5.334716881s	stable

The first thing to note is that comms and posts are a fair bit bigger than knows. We aren't doing quite the same exotic computation with them, though, so it's not horribly painful. We haven't even thought about how we might maintain posts in an arrangement and re-use it in two of our joins (go check!).
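For the record, the re-use that parenthetical hints at might look like the following sketch, using differential dataflow's arrange_by_key and join_core to build the index over posts once and share it:

    use differential_dataflow::operators::arrange::ArrangeByKey;
    use differential_dataflow::operators::JoinCore;

    // Build the index over `posts` once ...
    let posts_arranged = posts.arrange_by_key();
    // ... then each join arranges only its other input and re-uses the index.
    let comm_post = comms
        .map(|(id, (auth, link))| (link, (id, auth)))
        .arrange_by_key()
        .join_core(&posts_arranged, |_post, &(_id, auth_c), &(auth_p, forum)| {
            let min = std::cmp::min(auth_c, auth_p);
            let max = std::cmp::max(auth_c, auth_p);
            Some(((min, max), forum))
        });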

Once we get going, we start to see some output:

5.429971809s	round 0 (query: 1829 -> 5274)
5.71453694s	round 1 (query: 2269 -> 7567)
SHORTEDGE: (((2843, 10169), (4555, 5274)), 3, 1)
SHORTEDGE: (((2843, 10169), (5274, 7216)), 3, 1)
SHORTEDGE: (((2843, 10169), (2843, 4555)), 3, 1)
SHORTEDGE: (((2843, 10169), (7216, 10169)), 3, 1)
EDGESCORE: (((2843, 4555), 99268), 3, 24)
EDGESCORE: (((2843, 4555), 1055757), 3, 38)
EDGESCORE: (((4555, 5274), 36261), 3, 1)
EDGESCORE: (((4555, 5274), 66684), 3, 70)
EDGESCORE: (((4555, 5274), 99268), 3, 16)
EDGESCORE: (((4555, 5274), 1051665), 3, 2)
EDGESCORE: (((4555, 5274), 137438982755), 3, 2)
EDGESCORE: (((4555, 5274), 137439020231), 3, 1)
EDGESCORE: (((4555, 5274), 274878002164), 3, 1)
EDGESCORE: (((4555, 5274), 274878007528), 3, 4)
EDGESCORE: (((4555, 5274), 412317917630), 3, 1)
EDGESCORE: (((5274, 7216), 3536), 3, 50)
EDGESCORE: (((5274, 7216), 22114), 3, 1)
EDGESCORE: (((5274, 7216), 55135), 3, 3)
EDGESCORE: (((5274, 7216), 66684), 3, 94)
EDGESCORE: (((5274, 7216), 1786706465559), 3, 1)
EDGESCORE: (((7216, 10169), 3536), 3, 58)
EDGESCORE: (((7216, 10169), 88394), 3, 12)
EDGESCORE: (((7216, 10169), 1511828505852), 3, 2)
EDGESCORE: (((7216, 10169), 1511828505853), 3, 1)
5.990712664s	round 2 (query: 2843 -> 10169)

Here the SHORTEDGE lines are edges observed on shortest paths for the indicated queries. At the moment, the source is 2843 and the destination is 10169. Apparently we can get from one to the other on a short path through nodes with short names, which is a relief.

The EDGESCORE outputs are pairs of edge and forum identifier, where each edge should ideally be one of those in the SHORTEDGE output. I've just looked and verified that this is indeed the case, so good work us. The forum identifiers can be all over the place, and you might also notice that the counts, the last coordinate, are not always 1. Some forums, like 66684, are pretty popular.

This continues for a bit, and generally seems to be pretty brisk (as before, the slow rounds are the ones with no paths, which provide no work for the edge scoring dataflow to do).

Things are looking pretty good, I figure. Of course, we aren't actually testing this for correctness yet. I'm not entirely sure when we are going to get around to that (or how, to be honest), but for the moment let's just imagine that we are quite clever and fast.

Filtering by dates (2019-06-23)

We are getting pretty close to the end here. I think.

We have so far produced two collections, shortest_edges and edge_scores, containing respectively pairs of goals and the edges of their shortest paths, and pairs of relevant edges and forums in which their endpoints have interacted (with multiplicities reflecting the numbers of interactions).

Our next step is to put these together with our query inputs, pairs of goals and lower and upper time bounds, and restrict the scores for a query down to forums with a creation date within the time bounds. There will be some joins.

I did this in-line, rather than creating a method for it.

    // 3. Merge queries and scores, filter by start and end dates.
    let _scored_edges =
    query
        .join_map(&shortest_edges, |&goal, &bounds, &edge| (edge, (goal, bounds)))
        .join_map(&edge_scores, |&edge, &(goal, bounds), &forum| (forum, (goal, edge, bounds)))
        .join_map(&forum, |_forum, &(goal, edge, bounds), &time| 
            (time >= bounds.0 && time <= bounds.1, goal, edge)
        )
        .filter(|x| x.0)
        .map(|(_, goal, edge)| (goal, edge))
        .concat(&shortest_edges)
        .count()
        .map(|(x,c)| (x,c-1))
        .inspect(|x| println!("SCORED: {:?}", x))
        .probe_with(&mut probe);

The gist is what you might expect: each join fetches some data that we've already computed: first the shortest path edges for a query, then the scores for each of those edges, then the creation date for referenced forums. We filter down by forums within the bounds, and then finally, just before we accumulate up the scores, we toss in a copy of shortest_edges for good measure.

This last step is there to deal with the case that there may be no score for an edge; we don't want to lose track of the edge in that case, because we still have to re-assemble the shortest paths. What we are doing is akin to a "LEFT JOIN", where keys on the left side of the join that do not match are nonetheless surfaced with a NULL value indication. We can achieve the same thing by stirring in all keys, and then subtracting one from the count that results.
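In miniature, with hypothetical collections scores and keys (the latter holding each key exactly once):

    // After the concat, a key with n score records accumulates to n + 1,
    // so subtracting one recovers n, including n = 0 for unmatched keys.
    let padded = scores.concat(&keys);
    let counts = padded.count().map(|(key, count)| (key, count - 1));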

Most of the computation above should be relatively cheap, because we have few queries (100) and the shortest_edges and edge_scores collections are both restricted by the query as well.

Trying it out

I've changed my evaluation framework again, sorry, this time to try to get at incremental update strategies. Also, I found the actual queries that LDBC uses for BI query 25, and so I'll just use those.

We have five inputs, one for the queries and four for data. We'll load up all 100 queries at the beginning of time, so that we can monitor them as the computation proceeds. Perhaps we should add and remove them to get query latency measurements, but we'll do that in another experiment. For now, all queries are loaded initially, and then all other data are loaded in order of their creationDate attribute.
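As a sketch of that schedule, using the input handles from the skeleton (insert and advance_to are the InputSession calls; the loop itself is schematic):

    // All queries go in at the beginning of time ...
    for q in queries { query.insert(q); }
    // ... then events are fed in creationDate order, advancing the clock as we go.
    for (date, event) in events_by_date {
        posts.advance_to(date); // likewise comms, knows, and forum.
        posts.insert(event);
    }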

There are some three million plus events we are looking at, which means our output describes 300 million BI25 query results: each of the 100 queries is refreshed after each of the event additions.

The performance depends surprisingly strongly on the physical batching in the input. If the data go in all in one batch (we still see all 300M refreshes) it takes 38 seconds (so "~10M refreshes / second" which is total BS but something I'm skilling up on). As the batch size decreases the time increases, thusly:

batch size    elapsed time    refresh rate
1,000         229.490s        14.37Hz
10,000        109.740s        3.01Hz
100,000       55.024s         0.60Hz
1,000,000     42.780s         0.07Hz
10,000,000    38.375s         0.03Hz

If we put a second worker into the mix, things mostly speed up, except for the small batch size.

batch size    elapsed time    refresh rate
1,000         !#@$ing forever
10,000        79.430s         4.15Hz
100,000       36.675s         0.90Hz
1,000,000     26.160s         0.12Hz
10,000,000    24.138s         0.04Hz

No clue what is going on with those smaller batches, except yet another bottleneck to be discovered. The work of the scientist is never done!

Reconstructing paths

A declarative implementation