Commit
Another major refactor of structure, added closest functionality
devin-petersohn committed Apr 17, 2017
1 parent 33b9b35 commit 99f1de2
Showing 11 changed files with 753 additions and 95 deletions.
38 changes: 38 additions & 0 deletions lime-cli/src/main/scala/org/bdgenomics/lime/cli/Coverage.scala
@@ -0,0 +1,38 @@
package org.bdgenomics.lime.cli

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.Argument

object Coverage extends BDGCommandCompanion {
val commandName = "coverage"
val commandDescription = "Compute the coverage over defined intervals"

def apply(cmdLine: Array[String]) = {
new Coverage(Args4j[CoverageArgs](cmdLine))
}

class CoverageArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
@Argument(required = true,
metaVar = "INPUT",
usage = "The file to compute coverage over",
index = 0)
var input: String = null

override var sortFastqOutput: Boolean = false
override var asSingleFile: Boolean = false
override var deferMerging: Boolean = false
override var outputPath: String = ""
}

class Coverage(protected val args: CoverageArgs) extends BDGSparkCommand[CoverageArgs] {
val companion = Coverage

def run(sc: SparkContext) {
val leftGenomicRDD = sc.loadBed(args.input).toCoverage
leftGenomicRDD.rdd.collect.foreach(println)
}
}
}
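
For orientation, a minimal sketch of driving the new coverage command programmatically; the local SparkContext setup and the BED path are placeholders, and the lime CLI would normally build the command from the parsed argument array itself:

import org.apache.spark.{ SparkConf, SparkContext }
import org.bdgenomics.lime.cli.Coverage

object CoverageExample {
  def main(args: Array[String]): Unit = {
    // Local Spark context purely for illustration.
    val sc = new SparkContext(
      new SparkConf().setAppName("lime-coverage-example").setMaster("local[*]"))

    // "features.bed" is a placeholder path; Coverage.apply parses the
    // argument array through Args4j just as the CLI entry point would.
    val cmd = Coverage(Array("features.bed"))
    cmd.run(sc) // loads the BED file, converts it to coverage, prints each record

    sc.stop()
  }
}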
lime-cli/src/main/scala/org/bdgenomics/lime/cli/Intersection.scala
@@ -37,9 +37,13 @@ object Intersection extends BDGCommandCompanion {
val companion = Intersection

def run(sc: SparkContext) {
- val leftGenomicRDD = sc.loadBed(args.leftInput)
- val rightGenomicRDD = sc.loadBed(args.rightInput)
+ val leftGenomicRDD = sc.loadBed(args.leftInput).repartitionAndSort()
+ val rightGenomicRDD = sc.loadBed(args.rightInput).copartitionByReferenceRegion(leftGenomicRDD)

+ DistributedIntersection(leftGenomicRDD.flattenRddByRegions, rightGenomicRDD.flattenRddByRegions, leftGenomicRDD.partitionMap.get)
+   .compute()
+   .collect()
+   .foreach(println)
}
}
}
2 changes: 1 addition & 1 deletion lime-cli/src/main/scala/org/bdgenomics/lime/cli/Sort.scala
@@ -33,7 +33,7 @@ object Sort extends BDGCommandCompanion {
def run(sc: SparkContext) {
val genomicRdd = sc.loadBed(args.input)

- genomicRdd.sort.rdd.collect.foreach(println)
+ genomicRdd.repartitionAndSort().rdd.collect.foreach(println)
}
}
}
287 changes: 287 additions & 0 deletions lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Closest.scala
@@ -0,0 +1,287 @@
package org.bdgenomics.lime.set_theory

import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.lime.util.Partitioners.ReferenceRegionRangePartitioner
import org.bdgenomics.utils.interval.array.IntervalArray
import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag

sealed abstract class Closest[T: ClassTag, U: ClassTag] extends SetTheoryBetweenCollections[T, U, T, U] {
}

class SingleClosest[T: ClassTag, U: ClassTag](protected val leftRdd: RDD[(ReferenceRegion, T)],
protected val rightRdd: RDD[(ReferenceRegion, U)],
protected val partitionMap: Array[Option[(ReferenceRegion, ReferenceRegion)]],
protected val threshold: Long = 0L) extends Closest[T, U] {

var currentClosest: ReferenceRegion = ReferenceRegion("", 0, 0)

/**
* Prepares the two RDDs for the closest operation. Copartitions the right
* RDD according to the left. When no right-side data lands on a partition,
* a second pass copies the data from both flanking partitions into the
* empty partition so that every partition can still find a closest region.
*
* @return A tuple containing:
* The left RDD, unchanged.
* The right RDD, copartitioned with the left.
* The original partition map.
*/
override protected def prepare(): (RDD[(ReferenceRegion, T)], RDD[(ReferenceRegion, U)], Array[Option[(ReferenceRegion, ReferenceRegion)]]) = {

val adjustedPartitionMapWithIndex = partitionMap
// the zipWithIndex gives us the destination partition ID
.zipWithIndex
.filter(_._1.nonEmpty)
.map(f => (f._1.get, f._2))
.map(g => {
// in the case where we span multiple referenceNames
if (g._1._1.referenceName != g._1._2.referenceName) {
// fall back to a ReferenceRegion that covers only the first bound of the partition
(ReferenceRegion(
g._1._1.referenceName,
g._1._1.start,
g._1._1.end),
g._2)
} else {
// otherwise we just have the ReferenceRegion span from partition
// start to end
(ReferenceRegion(
g._1._1.referenceName,
g._1._1.start,
g._1._2.end),
g._2)
}
})

val partitionMapIntervals = IntervalArray(
adjustedPartitionMapWithIndex,
adjustedPartitionMapWithIndex.maxBy(_._1.width)._1.width,
sorted = true)
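// Each right-keyed region is then looked up in this interval array to find
// the left partition index (or indices) whose genomic range it maps to, so
// the record can be shipped to the matching partition(s).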

val assignedRightRdd = {
val firstPass = rightRdd.mapPartitions(iter => {
iter.flatMap(f => {
val rangeOfHits = partitionMapIntervals.get(f._1, requireOverlap = false)
rangeOfHits.map(g => ((f._1, g._2), f._2))
})
}, preservesPartitioning = true)

val partitionsWithoutData =
partitionMap.indices.filterNot(firstPass.map(_._1._2).distinct().collect.contains)

val partitionsToSend = partitionsWithoutData.foldLeft(List.empty[List[Int]])((b, a) => {
if (b.isEmpty) {
List(List(a))
} else if (a == b.last.last + 1) {
b.dropRight(1).:+(b.last.:+(a))
} else {
b.:+(List(a))
}
}).flatMap(f => List((f.head - 1, f.length), (f.last + 1, -1 * f.length)))
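// Worked example (indices are hypothetical, for illustration only): if the
// right RDD left partitions 3, 4 and 7 empty, the fold above groups the
// consecutive indices into List(List(3, 4), List(7)), and the flatMap turns
// each run into send instructions keyed by the flanking partitions:
//   List((2, 2), (5, -2), (6, 1), (8, -1))
// i.e. partition 2 forwards copies up to 2 partitions ahead, partition 5
// sends copies 2 partitions back, partition 6 forwards 1 ahead, and
// partition 8 sends 1 back, so the empty partitions receive data from both
// flanks in the flatMap below.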

firstPass.flatMap(f => {
val index = partitionsToSend.indexWhere(_._1 == f._1._2)
if (index < 0) {
List(f)
} else {
if (partitionsToSend(index)._2 < 0) {
(partitionsToSend(index)._2 to 0)
.map(g => ((f._1._1, f._1._2 + g), f._2))
} else {
(0 to partitionsToSend(index)._2)
.map(g => ((f._1._1, f._1._2 + g), f._2)) ++ {
if (index == partitionsToSend.lastIndexWhere(_._1 == f._1._2)) {
List()
} else {
val endIndex = partitionsToSend.lastIndexWhere(_._1 == f._1._2)
(partitionsToSend(endIndex)._2 to -1)
.map(g => ((f._1._1, f._1._2 + g), f._2))
}
}
}
}
})
}
val preparedRightRdd =
assignedRightRdd
.repartitionAndSortWithinPartitions(
new ReferenceRegionRangePartitioner(partitionMap.length))
// return to an RDD[(ReferenceRegion, T)], removing the partition ID
.map(f => (f._1._1, f._2))

(leftRdd, preparedRightRdd, partitionMap)
}

/**
* The primitive computed for closest simply returns the first region
* unchanged; the pairing of values is handled in processHits.
*
* @param firstRegion The first region to compute.
* @param secondRegion The second region to compute.
* @param threshold The distance requirement for closest.
* @return The first region.
*/
override protected def primitive(firstRegion: ReferenceRegion,
secondRegion: ReferenceRegion,
threshold: Long = 0L): ReferenceRegion = {
firstRegion
}

/**
* The condition here requires that the second region be exactly as close to
* the first region as the current closest, so all ties for the minimum
* distance are returned as hits.
*
* @param firstRegion The region to test against.
* @param secondRegion The region to test.
* @param threshold The distance requirement for closest.
* @return True if the distance to the second region equals the distance to
* the current closest.
* False otherwise.
*/
override protected def condition(firstRegion: ReferenceRegion,
secondRegion: ReferenceRegion,
threshold: Long = 0L): Boolean = {

firstRegion.unstrandedDistance(secondRegion).get ==
firstRegion.unstrandedDistance(currentClosest).get
}

/**
* Prunes the cache of all regions that are no longer candidates for the
* closest region.
*
* @param cachedRegion The current region in the cache.
* @param to The region that is compared against.
* @return True for regions that should be removed.
* False for all regions that should remain in the cache.
*/
override protected def pruneCacheCondition(cachedRegion: ReferenceRegion,
to: ReferenceRegion): Boolean = {
if (cachedRegion.referenceName != to.referenceName) {
true
} else {
to.unstrandedDistance(cachedRegion).get >
to.unstrandedDistance(currentClosest).getOrElse(0L)
}

}

/**
* Decides whether a candidate region should be added to the cache, updating
* the running closest region whenever the candidate is at least as close
* as the current closest.
*
* @param candidateRegion The current candidate region.
* @param until The region to compare against.
* @return True for regions that should be added to the cache.
* False for regions that should not be added to the cache.
*/
override protected def advanceCacheCondition(candidateRegion: ReferenceRegion,
until: ReferenceRegion): Boolean = {
if (candidateRegion.referenceName != until.referenceName) {
false
} else if (until.referenceName != currentClosest.referenceName ||
until.unstrandedDistance(candidateRegion).get <=
until.unstrandedDistance(currentClosest).getOrElse(Long.MaxValue)) {

currentClosest = candidateRegion
true
} else {
false
}
}

/**
* Processes the hits and pairs the current left region with the closest
* region on the right.
*
* @param current The current left row, keyed by the ReferenceRegion.
* @param cache The cache of potential hits.
* @return An iterator containing the current left with the closest region.
*/
override protected def processHits(current: (ReferenceRegion, T),
cache: ListBuffer[(ReferenceRegion, U)]): Iterator[(ReferenceRegion, (T, U))] = {

val (currentRegion, currentValue) = current
cache.filter(f => {
val (rightRegion, _) = f
condition(currentRegion, rightRegion)
}).map(f => {
val (rightRegion, rightValue) = f
(primitive(currentRegion, rightRegion), (currentValue, rightValue))
}).toIterator
}
}

case class SingleClosestSingleOverlap[T: ClassTag, U: ClassTag](override val leftRdd: RDD[(ReferenceRegion, T)],
override val rightRdd: RDD[(ReferenceRegion, U)],
override val partitionMap: Array[Option[(ReferenceRegion, ReferenceRegion)]],
override val threshold: Long = 0L) extends SingleClosest[T, U](leftRdd, rightRdd, partitionMap, threshold) {
/**
* The condition here requires that the second region either be exactly as
* close to the first region as the current closest (at a nonzero distance),
* or overlap the first region by exactly as much as the current closest.
*
* @param firstRegion The region to test against.
* @param secondRegion The region to test.
* @param threshold The distance requirement for closest.
* @return True if the second region ties the current closest on distance or
* on overlap.
* False otherwise.
*/
override protected def condition(firstRegion: ReferenceRegion,
secondRegion: ReferenceRegion,
threshold: Long = 0L): Boolean = {

(firstRegion.unstrandedDistance(secondRegion).get ==
firstRegion.unstrandedDistance(currentClosest).get &&
firstRegion.unstrandedDistance(secondRegion).get != 0) ||
(firstRegion.covers(secondRegion) &&
firstRegion.coversBy(secondRegion).get == firstRegion.coversBy(currentClosest).get)
}

/**
* Prunes the cache of all regions that are no longer candidates for the
* closest region.
*
* @param cachedRegion The current region in the cache.
* @param to The region that is compared against.
* @return True for regions that should be removed.
* False for all regions that should remain in the cache.
*/
override protected def pruneCacheCondition(cachedRegion: ReferenceRegion,
to: ReferenceRegion): Boolean = {
if (cachedRegion.referenceName != to.referenceName) {
true
} else {
(to.covers(cachedRegion) &&
to.coversBy(cachedRegion).get < to.coversBy(currentClosest).getOrElse(Long.MaxValue)) ||
(to.unstrandedDistance(cachedRegion).get >
to.unstrandedDistance(currentClosest).getOrElse(0L))
}

}

/**
* Decides whether a candidate region should be added to the cache, updating
* the running closest region whenever the candidate overlaps at least as
* much as, or is at least as close as, the current closest.
*
* @param candidateRegion The current candidate region.
* @param until The region to compare against.
* @return True for regions that should be added to the cache.
* False for regions that should not be added to the cache.
*/
override protected def advanceCacheCondition(candidateRegion: ReferenceRegion,
until: ReferenceRegion): Boolean = {
if (candidateRegion.referenceName != until.referenceName) {
false
} else if (until.referenceName != currentClosest.referenceName ||
(until.covers(candidateRegion) &&
until.coversBy(candidateRegion).get >= until.coversBy(currentClosest).getOrElse(0L)) ||
(!until.covers(candidateRegion) &&
until.unstrandedDistance(candidateRegion).get <=
until.unstrandedDistance(currentClosest).getOrElse(Long.MaxValue))) {

currentClosest = candidateRegion
true
} else {
false
}
}
}
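
For orientation, a minimal sketch of how the new SingleClosest primitive might be invoked from a command, borrowing the invocation pattern used for DistributedIntersection in the Intersection command above; the BED paths, the left-side repartitionAndSort() call, and the compute() entry point are assumptions rather than code shown in this commit:

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.lime.set_theory.SingleClosest

object ClosestExample {
  def run(sc: SparkContext, leftPath: String, rightPath: String): Unit = {
    // Sort and partition the left side; the right side is copartitioned
    // inside SingleClosest.prepare().
    val leftGenomicRDD = sc.loadBed(leftPath).repartitionAndSort()
    val rightGenomicRDD = sc.loadBed(rightPath)

    new SingleClosest(
      leftGenomicRDD.flattenRddByRegions,
      rightGenomicRDD.flattenRddByRegions,
      leftGenomicRDD.partitionMap.get)
      .compute()   // assumed entry point, mirroring DistributedIntersection
      .collect()
      .foreach(println)
  }
}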