broadcast join using interval tree #1224

Closed
jpdna opened this Issue Oct 24, 2016 · 11 comments

Comments

Projects
4 participants
@jpdna
Member

jpdna commented Oct 24, 2016

Broadcast interval join currently is being done in a way that does a Cartesian product and then filter:

* Finally, within each separate partition, we essentially perform a cartesian-product-and-filter

that feel inefficient.

Most genomics tools for finding interval overlap use some kind of interval or R-tree as discussed:
https://www.biostars.org/p/2244/

Proposed implementation, when one side of the join is small enough ( < 100 MB) to be broadcast.

  1. read the "smaller" side into an intervaltree data structure, either the one for Mango or, https://samtools.github.io/htsjdk/javadoc/htsjdk/htsjdk/samtools/util/IntervalTree.html
    Possible as a map of intervaltrees, one for each chromosome/contig

  2. Broadcast that map of interval trees

  3. do a map or mapPartitions, which loops through each member of the RDD, and does the "map side join" using overlap operation of the interval tree.

At the least I think we should implement this approach as a comparison with other join modes we develop.

Thoughts? This doesn't already exist in ADAM code does it?

@fnothaft

This comment has been minimized.

Show comment
Hide comment
@fnothaft

fnothaft Oct 24, 2016

Member

I concur! (see #1110, which I've closed in favor of this issue). I spoke with @tdanford a while back and there were historical reasons for why the BroadcastJoin was called BroadcastJoin. The documents in the BroadcastJoin code are actually kinda wrong as well; we don't actually do a cartesian and filter, we actually do a hash join and filter. I would +1 your proposed implementation.

Member

fnothaft commented Oct 24, 2016

I concur! (see #1110, which I've closed in favor of this issue). I spoke with @tdanford a while back and there were historical reasons for why the BroadcastJoin was called BroadcastJoin. The documents in the BroadcastJoin code are actually kinda wrong as well; we don't actually do a cartesian and filter, we actually do a hash join and filter. I would +1 your proposed implementation.

@fnothaft

This comment has been minimized.

Show comment
Hide comment
@fnothaft

fnothaft Oct 24, 2016

Member

Also see https://bmcbioinformatics.biomedcentral.com/articles/10.1186/s12859-016-0904-1.

Also, @akmorrow13's interval tree code in https://github.com/bigdatagenomics/utils/tree/master/utils-intervalrdd/src/main/scala/org/bdgenomics/utils/intervaltree should allow us to do an efficient broadcastJoinAndGroupByRight implementation, akin to our shuffleJoinAndGroupByLeft code.

Member

fnothaft commented Oct 24, 2016

Also see https://bmcbioinformatics.biomedcentral.com/articles/10.1186/s12859-016-0904-1.

Also, @akmorrow13's interval tree code in https://github.com/bigdatagenomics/utils/tree/master/utils-intervalrdd/src/main/scala/org/bdgenomics/utils/intervaltree should allow us to do an efficient broadcastJoinAndGroupByRight implementation, akin to our shuffleJoinAndGroupByLeft code.

@jpdna

This comment has been minimized.

Show comment
Hide comment
@jpdna

jpdna Oct 28, 2016

Member

@fnothaft you mentioned on call today some other code relevant to this that I should look at. When you get the chance can you paste it on this thread?

Member

jpdna commented Oct 28, 2016

@fnothaft you mentioned on call today some other code relevant to this that I should look at. When you get the chance can you paste it on this thread?

@fnothaft

This comment has been minimized.

Show comment
Hide comment
@fnothaft

fnothaft Oct 28, 2016

Member

Yes! You'll want to take a look at the following commits:

The list is in chronological order. This does a true shuffle-free broadcast join. I'm using it right now to join the GRCh37 alignment of a PCR-free ~60x WGS of NA12878 against a 3GB set of variants, and it works well. Alas, I had to roll my own replacement for the interval tree. @akmorrow13 I'm not sure what was happening, but the interval tree was blowing up when I was trying to build a tree per contig for my variants. Adding the first partition took 10s, second partition took 30s, third partition took 8min, etc. Here, I've just collected a sorted array, and then I'm using binary search to get O(log n) access. This might actually be faster than an interval tree anyways; I don't know why I didn't think of that in the first place... In any case, this still needs a bit more work. As you may note, I uh, "did it live" this afternoon with this code, and it conspicuously lacks tests. I'll write those up tomorrow, do some more TLC, and PR this back against ADAM as time allows (prolly Wednesday of next week).

Member

fnothaft commented Oct 28, 2016

Yes! You'll want to take a look at the following commits:

The list is in chronological order. This does a true shuffle-free broadcast join. I'm using it right now to join the GRCh37 alignment of a PCR-free ~60x WGS of NA12878 against a 3GB set of variants, and it works well. Alas, I had to roll my own replacement for the interval tree. @akmorrow13 I'm not sure what was happening, but the interval tree was blowing up when I was trying to build a tree per contig for my variants. Adding the first partition took 10s, second partition took 30s, third partition took 8min, etc. Here, I've just collected a sorted array, and then I'm using binary search to get O(log n) access. This might actually be faster than an interval tree anyways; I don't know why I didn't think of that in the first place... In any case, this still needs a bit more work. As you may note, I uh, "did it live" this afternoon with this code, and it conspicuously lacks tests. I'll write those up tomorrow, do some more TLC, and PR this back against ADAM as time allows (prolly Wednesday of next week).

@fnothaft

This comment has been minimized.

Show comment
Hide comment
@fnothaft

fnothaft Oct 28, 2016

Member

If you want to take a look at the perf of the broadcast join, it was used in application_1477597669149_0029 on the cluster.

Member

fnothaft commented Oct 28, 2016

If you want to take a look at the perf of the broadcast join, it was used in application_1477597669149_0029 on the cluster.

@fnothaft

This comment has been minimized.

Show comment
Hide comment
@fnothaft

fnothaft Oct 28, 2016

Member

Of specific interest, here's the join timing snipped out:

+---------------------------------------------------------------+--------------------------+-----------------+----------------+-------------+-----------------+-----------------+-----------------+
|                            Metric                             |       Worker Total       |  Driver Total   |  Driver Only   |    Count    |      Mean       |       Min       |       Max       |
+---------------------------------------------------------------+--------------------------+-----------------+----------------+-------------+-----------------+-----------------+-----------------+
|     ├─ Joining reads against variants                         |                        - |  4 mins 27 secs | 2 mins 51 secs |           1 |  4 mins 27 secs |  4 mins 27 secs |  4 mins 27 secs |
|     │   ├─ Running broadcast join with interval tree          |                        - |  4 mins 27 secs | 2 mins 51 secs |           1 |  4 mins 27 secs |  4 mins 27 secs |  4 mins 27 secs |
|     │   │   ├─ Building interval tree                         |                        - |   3 mins 2 secs |  1 min 27 secs |           1 |   3 mins 2 secs |   3 mins 2 secs |   3 mins 2 secs |
|     │   │   │   └─ Sorting right side of join                 |                        - |   3 mins 2 secs |  1 min 27 secs |           1 |   3 mins 2 secs |   3 mins 2 secs |   3 mins 2 secs |
|     │   │   │       ├─ sortByKey at TreeRegionJoin.scala:37   |                        - |         8.13 ms |              - |           1 |         8.13 ms |         8.13 ms |         8.13 ms |
|     │   │   │       │   └─ function call                      |                3.16 secs |               - |              - |    24506717 |          129 ns |           65 ns |         3.27 ms |
|     │   │   │       ├─ sortByKey at TreeRegionJoin.scala:37   |                        - |        14.18 ms |              - |           1 |        14.18 ms |        14.18 ms |        14.18 ms |
|     │   │   │       │   └─ function call                      |               28.18 secs |               - |              - |        1361 |         20.7 ms |        10.93 ms |       132.72 ms |
|     │   │   │       └─ sortByKey at TreeRegionJoin.scala:37   |                        - |   1 min 35 secs |              - |           1 |   1 min 35 secs |   1 min 35 secs |   1 min 35 secs |
|     │   │   └─ Running map-side join                          |                        - |   1 min 24 secs |  1 min 24 secs |           1 |   1 min 24 secs |   1 min 24 secs |   1 min 24 secs |
|     │   │       └─ flatMap at TreeRegionJoin.scala:153        |                        - |        11.91 ms |              - |           1 |        11.91 ms |        11.91 ms |        11.91 ms |
|     │   │           └─ function call                          |  13 hours 55 mins 4 secs |               - |              - |   706991574 |        70.87 µs |          202 ns |   1 min 23 secs |
Member

fnothaft commented Oct 28, 2016

Of specific interest, here's the join timing snipped out:

+---------------------------------------------------------------+--------------------------+-----------------+----------------+-------------+-----------------+-----------------+-----------------+
|                            Metric                             |       Worker Total       |  Driver Total   |  Driver Only   |    Count    |      Mean       |       Min       |       Max       |
+---------------------------------------------------------------+--------------------------+-----------------+----------------+-------------+-----------------+-----------------+-----------------+
|     ├─ Joining reads against variants                         |                        - |  4 mins 27 secs | 2 mins 51 secs |           1 |  4 mins 27 secs |  4 mins 27 secs |  4 mins 27 secs |
|     │   ├─ Running broadcast join with interval tree          |                        - |  4 mins 27 secs | 2 mins 51 secs |           1 |  4 mins 27 secs |  4 mins 27 secs |  4 mins 27 secs |
|     │   │   ├─ Building interval tree                         |                        - |   3 mins 2 secs |  1 min 27 secs |           1 |   3 mins 2 secs |   3 mins 2 secs |   3 mins 2 secs |
|     │   │   │   └─ Sorting right side of join                 |                        - |   3 mins 2 secs |  1 min 27 secs |           1 |   3 mins 2 secs |   3 mins 2 secs |   3 mins 2 secs |
|     │   │   │       ├─ sortByKey at TreeRegionJoin.scala:37   |                        - |         8.13 ms |              - |           1 |         8.13 ms |         8.13 ms |         8.13 ms |
|     │   │   │       │   └─ function call                      |                3.16 secs |               - |              - |    24506717 |          129 ns |           65 ns |         3.27 ms |
|     │   │   │       ├─ sortByKey at TreeRegionJoin.scala:37   |                        - |        14.18 ms |              - |           1 |        14.18 ms |        14.18 ms |        14.18 ms |
|     │   │   │       │   └─ function call                      |               28.18 secs |               - |              - |        1361 |         20.7 ms |        10.93 ms |       132.72 ms |
|     │   │   │       └─ sortByKey at TreeRegionJoin.scala:37   |                        - |   1 min 35 secs |              - |           1 |   1 min 35 secs |   1 min 35 secs |   1 min 35 secs |
|     │   │   └─ Running map-side join                          |                        - |   1 min 24 secs |  1 min 24 secs |           1 |   1 min 24 secs |   1 min 24 secs |   1 min 24 secs |
|     │   │       └─ flatMap at TreeRegionJoin.scala:153        |                        - |        11.91 ms |              - |           1 |        11.91 ms |        11.91 ms |        11.91 ms |
|     │   │           └─ function call                          |  13 hours 55 mins 4 secs |               - |              - |   706991574 |        70.87 µs |          202 ns |   1 min 23 secs |
@heuermh

This comment has been minimized.

Show comment
Hide comment
@heuermh

heuermh Oct 28, 2016

Member

@akmorrow13 @fnothaft is the interval tree rebalancing as it is being populated? I found that to be a major problem with the text book implementation (and some implementations in other libraries) and as such wrote an immutable version that performs better at construction time.

Member

heuermh commented Oct 28, 2016

@akmorrow13 @fnothaft is the interval tree rebalancing as it is being populated? I found that to be a major problem with the text book implementation (and some implementations in other libraries) and as such wrote an immutable version that performs better at construction time.

@fnothaft

This comment has been minimized.

Show comment
Hide comment
@fnothaft

fnothaft Oct 28, 2016

Member

I believe that it is, but I'm not 100% sure. In any case, if you have a sorted array, binary search gives you the same access time as an interval tree, no? At the least, it works well enough for now. It'd be good to do more of a deep dive later.

Member

fnothaft commented Oct 28, 2016

I believe that it is, but I'm not 100% sure. In any case, if you have a sorted array, binary search gives you the same access time as an interval tree, no? At the least, it works well enough for now. It'd be good to do more of a deep dive later.

@akmorrow13

This comment has been minimized.

Show comment
Hide comment
@akmorrow13

akmorrow13 Oct 28, 2016

Contributor

'@akmorrow13 @fnothaft is the interval tree rebalancing as it is being populated? ' It depends what function you are calling. If you are inserting using insert(kvs: Iterator[(K, T)]) it waits until all elements are inserted. If you are inserting one at a time it will repopulate between inserts.

@fnothaft I am not sure off the top of my head why it is blowing up. It could be that we are arbitrarily setting the threshold for rebalancing to be too small so it is constantly rebalancing. That doesn't explain why the trees are taking longer to create. What dataset are you using?

Contributor

akmorrow13 commented Oct 28, 2016

'@akmorrow13 @fnothaft is the interval tree rebalancing as it is being populated? ' It depends what function you are calling. If you are inserting using insert(kvs: Iterator[(K, T)]) it waits until all elements are inserted. If you are inserting one at a time it will repopulate between inserts.

@fnothaft I am not sure off the top of my head why it is blowing up. It could be that we are arbitrarily setting the threshold for rebalancing to be too small so it is constantly rebalancing. That doesn't explain why the trees are taking longer to create. What dataset are you using?

@fnothaft

This comment has been minimized.

Show comment
Hide comment
@fnothaft

fnothaft Oct 28, 2016

Member

'@akmorrow13 @fnothaft is the interval tree rebalancing as it is being populated? ' It depends what function you are calling. If you are inserting using insert(kvs: Iterator[(K, T)]) it waits until all elements are inserted. If you are inserting one at a time it will repopulate between inserts.

Ah, I was running the one at a time insert, which could be the culprit here.

That doesn't explain why the trees are taking longer to create. What dataset are you using?

The dataset is a 3GB set of variants. Perhaps we can sit down and look at this more next week? The array/binary search code works well enough for now, and I'm on a hard deadline for Tuesday.

Member

fnothaft commented Oct 28, 2016

'@akmorrow13 @fnothaft is the interval tree rebalancing as it is being populated? ' It depends what function you are calling. If you are inserting using insert(kvs: Iterator[(K, T)]) it waits until all elements are inserted. If you are inserting one at a time it will repopulate between inserts.

Ah, I was running the one at a time insert, which could be the culprit here.

That doesn't explain why the trees are taking longer to create. What dataset are you using?

The dataset is a 3GB set of variants. Perhaps we can sit down and look at this more next week? The array/binary search code works well enough for now, and I'm on a hard deadline for Tuesday.

@akmorrow13

This comment has been minimized.

Show comment
Hide comment
@akmorrow13

akmorrow13 Oct 29, 2016

Contributor

Great let's talk Wednesday.

Contributor

akmorrow13 commented Oct 29, 2016

Great let's talk Wednesday.

fnothaft added a commit to fnothaft/adam that referenced this issue Nov 1, 2016

@fnothaft fnothaft added this to the 0.21.0 milestone Nov 8, 2016

fnothaft added a commit to fnothaft/adam that referenced this issue Nov 26, 2016

fnothaft added a commit to fnothaft/adam that referenced this issue Nov 29, 2016

fnothaft added a commit to fnothaft/adam that referenced this issue Dec 1, 2016

[ADAM-1224] Replace BroadcastRegionJoin with tree based algo.
Resolves #1224. Adds and fixes docs. Updating for utils/#94 and bumps to
utils 0.2.10 release.

fnothaft added a commit to fnothaft/adam that referenced this issue Dec 7, 2016

[ADAM-1224] Replace BroadcastRegionJoin with tree based algo.
Resolves #1224. Adds and fixes docs. Updating for utils/#94 and bumps to
utils 0.2.10 release.

@heuermh heuermh closed this in d7eabbe Dec 7, 2016

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment