Merge a156b20 into 6a44ea0

ryan-williams committed Oct 7, 2016
2 parents 6a44ea0 + a156b20 commit c60c569
Showing 8 changed files with 96 additions and 33 deletions.
@@ -29,6 +29,17 @@ trait CappedRegionsPartitionerArgs
"locus within each partition"
)
var explodeCoverage: Boolean = false

@Args4JOption(
name = "--trim-ranges",
usage =
"When present, store which loci in each partition are actually covered, as opposed to one blanket range (per " +
"partition per contig) spanning from the smallest to largest loci assigned to each partition. This can " +
"result in more informative statistics being collected/printed about the computed loci-partitioning, but " +
"it can also result in a significant performance hit if many discontiguous ranges or coverage must be " +
"stored / collected to the driver and processed."
)
var trimRanges: Boolean = false
}

/**
@@ -47,7 +58,8 @@ class CappedRegionsPartitioner[R <: ReferenceRegion: ClassTag](regions: RDD[R],
halfWindowSize: Int,
maxRegionsPerPartition: Int,
printPartitioningStats: Boolean,
explodeCoverage: Boolean)
explodeCoverage: Boolean,
trimRanges: Boolean)
extends LociPartitioner {

def partition(loci: LociSet): LociPartitioning = {
@@ -125,7 +137,8 @@ class CappedRegionsPartitioner[R <: ReferenceRegion: ClassTag](regions: RDD[R],
halfWindowSize,
loci,
maxRegionsPerPartition,
explodeCoverage
explodeCoverage,
trimRanges
)
.collect()

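For intuition, here is a minimal, self-contained sketch (not part of this diff, and not project API) of what --trim-ranges controls on a single contig: without trimming, a partition records one blanket range from its smallest to its largest covered locus; with trimming, it records one range per contiguous run of covered loci. The locus values mirror the "chr3" case in the test at the bottom of this diff.

// Illustrative only: covered loci on one contig (hypothetical values).
val coveredLoci = Seq(7L, 9L)

// Without --trim-ranges: one blanket range, end-exclusive.
val blanket = Seq(coveredLoci.min -> (coveredLoci.max + 1))  // Seq((7,10)), i.e. "chr3:7-10"

// With --trim-ranges: one range per contiguous run of covered loci.
val trimmed =
  coveredLoci.sorted
    .foldLeft(List.empty[(Long, Long)]) {
      case ((start, end) :: rest, locus) if locus == end => (start, locus + 1) :: rest  // extend current run
      case (acc, locus)                                  => (locus, locus + 1) :: acc   // start a new run
    }
    .reverse  // List((7,8), (9,10)), i.e. "chr3:7-8,chr3:9-10"

The trade-off noted in the flag's help text follows directly: the trimmed form can hold many more ranges, which is what enables the extra statistics but also what can make collecting them to the driver expensive.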
@@ -49,8 +49,9 @@ trait LociPartitionerArgs
regions,
halfWindowSize,
maxReadsPerPartition,
printPartitioningStats,
explodeCoverage
printPartitioningStats = printPartitioningStats,
explodeCoverage = explodeCoverage,
trimRanges = trimRanges
)

case "micro-regions" =>
@@ -31,6 +31,24 @@ case class LociPartitioning(map: LociMap[PartitionIndex])

@transient lazy val partitionContigStats = Stats(partitionContigsMap.values)

@transient lazy val partitionSets = partitionsMap.values

@transient lazy val rangeSizeStats =
Stats(
partitionSets
.flatMap(_.contigs)
.flatMap(_.ranges)
.map(_.length)
)

@transient lazy val partitionRangesStats =
Stats(
for {
partitionLoci <- partitionSets
} yield
partitionLoci.contigs.map(_.ranges.length).sum
)

@transient private lazy val partitionsMap: Map[PartitionIndex, LociSet] = map.inverse

@transient private lazy val partitionSizesMap: Map[PartitionIndex, NumLoci] =
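To make the two new statistics concrete, a hypothetical example (mirroring the loci used in the test at the bottom of this diff; the surrounding Stats machinery is omitted):

// Two partitions' loci as (start, end) ranges: "chr1:10-12,chr3:7-8,chr3:9-10" and "chr4:20-21".
val partitions = Seq(
  Seq(10L -> 12L, 7L -> 8L, 9L -> 10L),
  Seq(20L -> 21L)
)

// rangeSizeStats summarizes every range's length across all partitions.
val rangeSizes = partitions.flatten.map { case (start, end) => end - start }  // Seq(2, 1, 1, 1)

// partitionRangesStats summarizes how many ranges each partition holds.
val rangesPerPartition = partitions.map(_.size)                               // Seq(3, 1)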
3 changes: 1 addition & 2 deletions src/main/scala/org/hammerlab/guacamole/loci/set/Contig.scala
@@ -99,11 +99,10 @@ case class Contig(var name: ContigName, private var rangeSet: RangeSet[JLong]) e
* Iterator over string representations of each range in the map, used to assemble (possibly truncated) .toString()
* output.
*/
def stringPieces = {
def stringPieces =
ranges.iterator.map(pair =>
"%s:%d-%d".format(name, pair.start, pair.end)
)
}
}

private[loci] object Contig {
@@ -16,9 +16,14 @@ import scala.collection.mutable.ArrayBuffer
*
* @param it Iterator of [[(Position, Coverage)]] tuples, e.g. a [[ContigCoverageIterator]].
* @param maxRegionsPerPartition Maximum regions allowed to overlap each [[LociSet]].
* @param trimRanges if true: build [[Contig]]s from ranges with uncovered loci dropped, resulting in more ranges being
* stored; otherwise, each [[Contig]] includes just one range, spanning from the lowest to the highest
* locus covered by its parent [[LociSet]] on that contig. The latter is more performant, but the former
* can yield more interesting statistics about the distribution of reads and loci across partitions.
*/
class TakeLociIterator(it: BufferedIterator[(Position, Coverage)],
maxRegionsPerPartition: Int)
maxRegionsPerPartition: Int,
trimRanges: Boolean = false)
extends SimpleBufferedIterator[LociSet] {

// Segment the input into per-contig Iterators.
@@ -96,9 +101,9 @@ class TakeLociIterator(it: BufferedIterator[(Position, Coverage)],
* @param dropAbove Loci with depth greater than this are skipped over.
* @return The number of covering regions, and a [[Contig]] with the taken loci ranges.
*/
def takeLoci(it: ContigIterator[(HasLocus, Coverage)],
numRegions: Int,
dropAbove: Int): Option[(Int, Contig)] = {
private def takeLoci(it: ContigIterator[(HasLocus, Coverage)],
numRegions: Int,
dropAbove: Int): Option[(Int, Contig)] = {

// Accumulate intervals on this contig here.
val intervals = ArrayBuffer[Interval]()
@@ -112,16 +117,16 @@ class TakeLociIterator(it: BufferedIterator[(Position, Coverage)],
// If any Intervals have been found, build them into a [[Contig]] and return it, as well as the number of covering
// regions.
def result: Option[(Int, Contig)] =
if (intervals.isEmpty)
None
else
Some(
curNumRegions ->
Contig(
it.contigName,
intervals
)
)
if (intervals.isEmpty)
None
else
Some(
curNumRegions ->
Contig(
it.contigName,
intervals
)
)

// Flush the in-progress Interval `curIntervalOpt` to the `intervals` buffer, if one exists.
def maybeAddInterval(): Unit = {
@@ -141,13 +146,14 @@ class TakeLociIterator(it: BufferedIterator[(Position, Coverage)],
// case iff `curNumRegions == 0`), then this locus contributes `depth` regions to the total (because regions that
// start before and overlap `locus` must be counted); otherwise, this locus only brings in `starts` new regions.
val newRegions =
if(curNumRegions == 0)
depth
else
starts
if(curNumRegions == 0)
depth
else
starts

if (curNumRegions + newRegions > numRegions) {
// If the current locus pushed us over the maximum allowed `numRegions`, add the current in-progress Interval.
// If the current locus pushed us over the maximum allowed `numRegions`, add the current in-progress Interval
// (which ends just shy of this overflow).
maybeAddInterval()

if (newRegions > dropAbove)
@@ -169,14 +175,15 @@ class TakeLociIterator(it: BufferedIterator[(Position, Coverage)],
curIntervalOpt =
curIntervalOpt match {
// Extend the current interval, if possible.
case Some((start, end)) if locus == end =>
case Some((start, end)) if !trimRanges || locus == end =>
Some(start -> (locus + 1))

// Otherwise, commit the current interval if it exists, and start a new one.
case _ =>
maybeAddInterval()
Some(locus -> (locus + 1))
}

it.next()
}
}
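The behavioral change above hinges on one guard: when trimRanges is off, the in-progress interval is always extended through the new locus (bridging any uncovered gap), and only when trimming is on does a gap force a new interval. A rough standalone distillation of that decision, under hypothetical names (this is not the class itself):

// Returns (intervalToFlush, newCurrentInterval); intervals are (start, end), end-exclusive.
def extendOrStart(cur: Option[(Long, Long)],
                  locus: Long,
                  trimRanges: Boolean): (Option[(Long, Long)], Option[(Long, Long)]) =
  cur match {
    // Extend: always when not trimming, or when the locus is contiguous with the current interval.
    case Some((start, end)) if !trimRanges || locus == end =>
      (None, Some(start -> (locus + 1)))
    // Otherwise flush the current interval (if any) and start a new one at this locus.
    case _ =>
      (cur, Some(locus -> (locus + 1)))
  }

extendOrStart(Some(7L -> 8L), 9L, trimRanges = false)  // (None, Some((7,10))): gap bridged
extendOrStart(Some(7L -> 8L), 9L, trimRanges = true)   // (Some((7,8)), Some((9,10))): new range started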
@@ -58,15 +58,16 @@ class CoverageRDD[R <: ReferenceRegion: ClassTag](rdd: RDD[R])
def makeCappedLociSets(halfWindowSize: Int,
loci: LociSet,
maxRegionsPerPartition: Int,
explode: Boolean): RDD[LociSet] =
explode: Boolean,
trimRanges: Boolean): RDD[LociSet] =
coverage(
halfWindowSize,
sc.broadcast(loci),
explode
)
.mapPartitionsWithIndex(
(idx, it) =>
new TakeLociIterator(it.buffered, maxRegionsPerPartition)
new TakeLociIterator(it.buffered, maxRegionsPerPartition, trimRanges)
)

/**
@@ -28,8 +28,8 @@ import scala.reflect.ClassTag
* Note: the containing [[PartitionedRegions]] gets picked up by the closure-cleaner and serialized when
* [[mapPartitions]] is called.
*/
class PartitionedRegions[R <: ReferenceRegion: ClassTag](regions: RDD[R],
partitioning: LociPartitioning)
class PartitionedRegions[R <: ReferenceRegion: ClassTag] private(regions: RDD[R],
partitioning: LociPartitioning)
extends Serializable {

assert(
@@ -136,7 +136,13 @@ object PartitionedRegions {
lociPartitioning.partitionSizeStats.toString(),
"",
"Contigs-spanned-per-partition stats:",
lociPartitioning.partitionContigStats.toString()
lociPartitioning.partitionContigStats.toString(),
"",
"Range-size stats:",
lociPartitioning.rangeSizeStats.toString(),
"",
"Ranges-per-partition stats:",
lociPartitioning.partitionRangesStats.toString()
)

apply(
@@ -6,7 +6,10 @@ import org.scalatest.{FunSuite, Matchers}

class TakeLociIteratorSuite extends FunSuite with Matchers {

def check(input: ((String, Int), (Int, Int))*)(expectedStrs: String*) = {
def check(input: ((String, Int), (Int, Int))*)(expectedStrs: String*): Unit =
check(trimRanges = false, input: _*)(expectedStrs: _*)

def check(trimRanges: Boolean, input: ((String, Int), (Int, Int))*)(expectedStrs: String*): Unit = {
val depths =
for {
((contig, locus), (depth, starts)) <- input
@@ -15,7 +18,7 @@ class TakeLociIteratorSuite extends FunSuite with Matchers {

val expected = expectedStrs.map(LociSet.apply)

new TakeLociIterator(depths.iterator.buffered, 15).toList should be(expected)
new TakeLociIterator(depths.iterator.buffered, 15, trimRanges).toList should be(expected)
}

test("simple") {
@@ -75,6 +78,21 @@ class TakeLociIteratorSuite extends FunSuite with Matchers {
("chr3", 7) -> ( 3, 3),
("chr3", 9) -> ( 4, 3),
("chr4", 20) -> ( 2, 2)
)(
"chr1:10-12,chr2:1-2,chr3:7-10",
"chr4:20-21"
)
}

test("partition spanning several contigs, trim ranges") {
check(
trimRanges = true,
("chr1", 10) -> ( 1, 1),
("chr1", 11) -> ( 6, 5),
("chr2", 1) -> ( 2, 2),
("chr3", 7) -> ( 3, 3),
("chr3", 9) -> ( 4, 3),
("chr4", 20) -> ( 2, 2)
)(
"chr1:10-12,chr2:1-2,chr3:7-8,chr3:9-10",
"chr4:20-21"
