Skip to content

Commit

Permalink
[ADAM-1224] Replace BroadcastRegionJoin with tree based algo.
Browse files Browse the repository at this point in the history
Resolves #1224. Adds and fixes docs. Updates for utils/#94 and bumps to the
utils 0.2.10 release.
  • Loading branch information
fnothaft authored and heuermh committed Dec 7, 2016
1 parent eb17701 commit d7eabbe
Show file tree
Hide file tree
Showing 12 changed files with 431 additions and 222 deletions.
Expand Up @@ -80,4 +80,11 @@ object Timers extends Metrics {
val WriteBAMRecord = timer("Write BAM Record")
val WriteSAMRecord = timer("Write SAM Record")
val WriteCRAMRecord = timer("Write CRAM Record")

// org.bdgenomics.adam.rdd.TreeRegionJoin
val TreeJoin = timer("Running broadcast join with interval tree")
val BuildingTrees = timer("Building interval tree")
val SortingRightSide = timer("Sorting right side of join")
val GrowingTrees = timer("Growing forest of trees")
val RunningMapSideJoin = timer("Running map-side join")
}
Expand Up @@ -20,7 +20,7 @@ package org.bdgenomics.adam.models
import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import org.bdgenomics.formats.avro._
import org.bdgenomics.utils.intervaltree.Interval
import org.bdgenomics.utils.intervalarray.Interval
import scala.math.{ max, min }

trait ReferenceOrdering[T <: ReferenceRegion] extends Ordering[T] {
Expand Down Expand Up @@ -256,7 +256,7 @@ case class ReferenceRegion(
end: Long,
orientation: Strand = Strand.INDEPENDENT)
extends Comparable[ReferenceRegion]
with Interval {
with Interval[ReferenceRegion] {

assert(start >= 0 && end >= start,
"Failed when trying to create region %s %d %d on %s strand.".format(
Expand Down Expand Up @@ -401,6 +401,18 @@ case class ReferenceRegion(
start <= other.start && end >= other.end
}

/**
* Checks if our region overlaps (wholly or partially) another region,
* independent of strand.
*
* @param other The region to compare against.
* @return True if any section of the two regions overlap.
*/
def covers(other: ReferenceRegion): Boolean = {
  // Strand is not part of this check: only the reference name and the
  // coordinates of the two regions are compared.
  val onSameReference = referenceName == other.referenceName
  onSameReference && other.start < end && other.end > start
}

/**
* Checks if our region overlaps (wholly or partially) another region.
*
Expand Down

This file was deleted.

76 changes: 68 additions & 8 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
Expand Up @@ -401,7 +401,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = {

// key the RDDs and join
GenericGenomicRDD[(T, X)](InnerBroadcastRegionJoin[T, X]().partitionAndJoin(flattenRddByRegions(),
GenericGenomicRDD[(T, X)](InnerTreeRegionJoin[T, X]().partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
sequences ++ genomicRdd.sequences,
kv => { getReferenceRegions(kv._1) ++ genomicRdd.getReferenceRegions(kv._2) })
Expand Down Expand Up @@ -429,7 +429,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = {

// key the RDDs and join
GenericGenomicRDD[(Option[T], X)](RightOuterBroadcastRegionJoin[T, X]().partitionAndJoin(flattenRddByRegions(),
GenericGenomicRDD[(Option[T], X)](RightOuterTreeRegionJoin[T, X]().partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
sequences ++ genomicRdd.sequences,
kv => {
Expand Down Expand Up @@ -475,6 +475,60 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
finalSequences)
}

/**
* Performs a broadcast inner join between this RDD and another RDD, and
* groups the joined values by the value from the right RDD.
*
* In a broadcast join, the left RDD (this RDD) is collected to the driver,
* and broadcast to all the nodes in the cluster. The key equality function
* used for this join is the reference region overlap function. Since this
* is an inner join, all values that do not overlap a value from the other
* RDD are dropped.
*
* @param genomicRdd The right RDD in the join.
* @return Returns a new genomic RDD where each value from the right RDD is
*   paired with all of the values from the left RDD (this RDD) that it
*   overlapped in the genomic coordinate space.
*/
def broadcastRegionJoinAndGroupByRight[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Iterable[T], X), Z]](genomicRdd: GenomicRDD[X, Y])(
  implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = {

  // key both RDDs by reference region and run the grouped tree join
  val joinedRdd = InnerTreeRegionJoinAndGroupByRight[T, X]()
    .partitionAndJoin(flattenRddByRegions(), genomicRdd.flattenRddByRegions())

  // the joined RDD carries the sequences of both inputs
  val joinedSequences = sequences ++ genomicRdd.sequences

  GenericGenomicRDD[(Iterable[T], X)](
    joinedRdd,
    joinedSequences,
    kv => {
      // regions of every grouped left value, followed by the right value's regions
      val (leftValues, rightValue) = kv
      val leftRegions = leftValues.flatMap(getReferenceRegions)
      (leftRegions ++ genomicRdd.getReferenceRegions(rightValue)).toSeq
    })
    .asInstanceOf[GenomicRDD[(Iterable[T], X), Z]]
}

/**
* Performs a broadcast right outer join between this RDD and another RDD,
* and groups the joined values by the value from the right RDD.
*
* In a broadcast join, the left RDD (this RDD) is collected to the driver,
* and broadcast to all the nodes in the cluster. The key equality function
* used for this join is the reference region overlap function. Since this
* is a right outer join, all values in the left RDD that do not overlap a
* value from the right RDD are dropped. If a value from the right RDD does
* not overlap any values in the left RDD, it will be paired with an empty
* `Iterable` in the product of the join.
*
* @param genomicRdd The right RDD in the join.
* @return Returns a new genomic RDD where each value from the right RDD is
*   paired with all of the values from the left RDD (this RDD) that it
*   overlapped in the genomic coordinate space, including values from the
*   right RDD that did not overlap any value in the left RDD.
*/
def rightOuterBroadcastRegionJoinAndGroupByRight[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Iterable[T], X), Z]](genomicRdd: GenomicRDD[X, Y])(
  implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = {

  // key the RDDs and join
  GenericGenomicRDD[(Iterable[T], X)](RightOuterTreeRegionJoinAndGroupByRight[T, X]().partitionAndJoin(flattenRddByRegions(),
    genomicRdd.flattenRddByRegions()),
    sequences ++ genomicRdd.sequences,
    kv => {
      // kv._1 is an Iterable of left values (not an Option), so a single
      // flatMap gathers the regions of every grouped left value; the regions
      // of the right value are appended after them.
      (kv._1.flatMap(getReferenceRegions) ++ genomicRdd.getReferenceRegions(kv._2)).toSeq
    })
    .asInstanceOf[GenomicRDD[(Iterable[T], X), Z]]
}

/**
* Performs a sort-merge inner join between this RDD and another RDD.
*
Expand All @@ -493,7 +547,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(T, X)](
Expand Down Expand Up @@ -527,7 +582,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(Option[T], X)](
Expand Down Expand Up @@ -564,7 +620,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Option[X]), Z] = {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(T, Option[X])](
Expand Down Expand Up @@ -600,7 +657,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Option[X]), Z] = {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(Option[T], Option[X])](
Expand Down Expand Up @@ -637,7 +695,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Iterable[X]), Z] = {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(T, Iterable[X])](
Expand Down Expand Up @@ -676,7 +735,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Iterable[X]), Z] = {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(Option[T], Iterable[X])](
Expand Down
Expand Up @@ -32,7 +32,7 @@ import scala.reflect.ClassTag
* @tparam RU The type of data yielded by the right RDD at the output of the
* join.
*/
trait RegionJoin[T, U, RT, RU] {
trait RegionJoin[T, U, RT, RU] extends Serializable {

/**
* Performs a region join between two RDDs.
Expand Down

0 comments on commit d7eabbe

Please sign in to comment.