Skip to content

Commit

Permalink
[ADAM-1483] Remove collapse parameter from AlignmentRecordRDD.toCoverage
Browse files Browse the repository at this point in the history
Resolves bigdatagenomics#1483. Instead of providing a parameter in the `toCoverage` method to
allow users to collapse the coverage, makes the `toCoverage` method stupid. This
makes the `toCoverage` method simpler and the behavior easier to reason about.
  • Loading branch information
fnothaft committed Apr 17, 2017
1 parent 04444aa commit 82c982e
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 21 deletions.
Expand Up @@ -56,12 +56,18 @@ class Reads2CoverageArgs extends Args4jBase with ParquetArgs {
var onlyPositiveStrands: Boolean = false
@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false
@Args4jOption(required = false, name = "-sort_lexicographically", usage = "Sort the reads lexicographically by contig name, instead of by index.")
var sortLexicographically: Boolean = false
}

class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCommand[Reads2CoverageArgs] {
val companion: BDGCommandCompanion = Reads2Coverage

def run(sc: SparkContext): Unit = {
if (args.sortLexicographically) {
require(args.collapse,
"-sort_lexicographically can only be provided when collapsing (-collapse).")
}

val proj = Projection(readMapped, contigName, start, end, cigar)

Expand All @@ -80,7 +86,20 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCom
readsRdd
}

finalReads.toCoverage(args.collapse)
.save(args.outputPath, asSingleFile = args.asSingleFile)
val coverage = finalReads.toCoverage()

val maybeCollapsedCoverage = if (args.collapse) {
val sortedCoverage = if (args.sortLexicographically) {
coverage.sortLexicographically()
} else {
coverage.sort()
}
sortedCoverage.collapse()
} else {
coverage
}

maybeCollapsedCoverage.save(args.outputPath,
asSingleFile = args.asSingleFile)
}
}
Expand Up @@ -99,9 +99,11 @@ case class CoverageRDD(rdd: RDD[Coverage],
* For example, adjacent records Coverage("chr1", 1, 10, 3.0) and Coverage("chr1", 10, 20, 3.0)
* would be merged into one record Coverage("chr1", 1, 20, 3.0).
*
* @note Data must be sorted before collapse is called.
*
* @return merged tuples of adjacent ReferenceRegions and coverage.
*/
def collapse: CoverageRDD = {
def collapse(): CoverageRDD = {
val newRDD: RDD[Coverage] = rdd
.mapPartitions(iter => {
if (iter.hasNext) {
Expand Down
Expand Up @@ -162,10 +162,9 @@ case class AlignmentRecordRDD(
/**
* Converts this set of reads into a corresponding CoverageRDD.
*
* @param collapse Determines whether to merge adjacent coverage elements with the same score a single coverage.
* @return CoverageRDD containing mapped RDD of Coverage.
*/
def toCoverage(collapse: Boolean = true): CoverageRDD = {
def toCoverage(): CoverageRDD = {
val covCounts =
rdd.rdd
.filter(r => {
Expand All @@ -182,20 +181,9 @@ case class AlignmentRecordRDD(
val t: List[Long] = List.range(r.getStart, r.getEnd)
t.map(n => (ReferenceRegion(r.getContigName, n, n + 1), 1))
}).reduceByKey(_ + _)
.cache()
.map(r => Coverage(r._1, r._2.toDouble))

val coverage = (
if (collapse) covCounts.sortByKey()
else covCounts
).map(r => Coverage(r._1, r._2.toDouble))

val coverageRdd =
if (collapse) CoverageRDD(coverage, sequences)
.collapse
else CoverageRDD(coverage, sequences)

covCounts.unpersist()
coverageRdd
CoverageRDD(covCounts, sequences)
}

/**
Expand Down
Expand Up @@ -103,7 +103,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {

// get pileup at position 30
val pointCoverage = reads.filterByOverlappingRegion(ReferenceRegion("artificial", 30, 31)).rdd.count
val coverage: CoverageRDD = reads.toCoverage(false)
val coverage: CoverageRDD = reads.toCoverage()
assert(coverage.rdd.filter(r => r.start == 30).first.count == pointCoverage)
}

Expand All @@ -113,7 +113,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {

// get pileup at position 30
val pointCoverage = reads.filterByOverlappingRegions(Array(ReferenceRegion("artificial", 30, 31)).toList).rdd.count
val coverage: CoverageRDD = reads.toCoverage(false)
val coverage: CoverageRDD = reads.toCoverage()
assert(coverage.rdd.filter(r => r.start == 30).first.count == pointCoverage)
}

Expand All @@ -122,7 +122,10 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath)

// repartition reads to 1 partition to acheive maximal merging of coverage
val coverage: CoverageRDD = reads.transform(_.repartition(1)).toCoverage(true)
val coverage: CoverageRDD = reads.transform(_.repartition(1))
.toCoverage()
.sort()
.collapse()

assert(coverage.rdd.count == 18)
assert(coverage.flatten.rdd.count == 170)
Expand Down
2 changes: 2 additions & 0 deletions docs/source/50_cli.md
Expand Up @@ -269,6 +269,8 @@ following options:
negative strand. Conflicts with `-only_positive_strands`.
* `-only_positive_strands`: Only computes coverage for reads aligned on the
positive strand. Conflicts with `-only_negative_strands`.
* `-sort_lexicographically`: Sorts coverage by position. Contigs are ordered
lexicographically. Only applies if running with `-collapse`.

## Conversion tools {#conversions}

Expand Down

0 comments on commit 82c982e

Please sign in to comment.