diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala
index 2f30bf2f6b..23c145c60e 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala
@@ -56,12 +56,18 @@ class Reads2CoverageArgs extends Args4jBase with ParquetArgs {
   var onlyPositiveStrands: Boolean = false
   @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
   var asSingleFile: Boolean = false
+  @Args4jOption(required = false, name = "-sort_lexicographically", usage = "Sort the reads lexicographically by contig name, instead of by index.")
+  var sortLexicographically: Boolean = false
 }
 
 class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCommand[Reads2CoverageArgs] {
   val companion: BDGCommandCompanion = Reads2Coverage
 
   def run(sc: SparkContext): Unit = {
+    if (args.sortLexicographically) {
+      require(args.collapse,
+        "-sort_lexicographically can only be provided when collapsing (-collapse).")
+    }
 
     val proj = Projection(readMapped, contigName, start, end, cigar)
 
@@ -80,7 +86,20 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCom
       readsRdd
     }
 
-    finalReads.toCoverage(args.collapse)
-      .save(args.outputPath, asSingleFile = args.asSingleFile)
+    val coverage = finalReads.toCoverage()
+
+    val maybeCollapsedCoverage = if (args.collapse) {
+      val sortedCoverage = if (args.sortLexicographically) {
+        coverage.sortLexicographically()
+      } else {
+        coverage.sort()
+      }
+      sortedCoverage.collapse()
+    } else {
+      coverage
+    }
+
+    maybeCollapsedCoverage.save(args.outputPath,
+      asSingleFile = args.asSingleFile)
   }
 }
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala
index 5aeb4cea2b..b32c2c6601 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala
@@ -99,9 +99,11 @@ case class CoverageRDD(rdd: RDD[Coverage],
    * For example, adjacent records Coverage("chr1", 1, 10, 3.0) and Coverage("chr1", 10, 20, 3.0)
    * would be merged into one record Coverage("chr1", 1, 20, 3.0).
    *
+   * @note Data must be sorted before collapse is called.
+   *
    * @return merged tuples of adjacent ReferenceRegions and coverage.
    */
-  def collapse: CoverageRDD = {
+  def collapse(): CoverageRDD = {
     val newRDD: RDD[Coverage] = rdd
       .mapPartitions(iter => {
         if (iter.hasNext) {
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
index 979448d773..e3e6129cc8 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
@@ -162,10 +162,9 @@ case class AlignmentRecordRDD(
 
   /**
    * Converts this set of reads into a corresponding CoverageRDD.
    *
-   * @param collapse Determines whether to merge adjacent coverage elements with the same score a single coverage.
    * @return CoverageRDD containing mapped RDD of Coverage.
    */
-  def toCoverage(collapse: Boolean = true): CoverageRDD = {
+  def toCoverage(): CoverageRDD = {
     val covCounts = rdd.rdd
       .filter(r => {
@@ -182,20 +181,9 @@ case class AlignmentRecordRDD(
         val t: List[Long] = List.range(r.getStart, r.getEnd)
         t.map(n => (ReferenceRegion(r.getContigName, n, n + 1), 1))
       }).reduceByKey(_ + _)
-      .cache()
+      .map(r => Coverage(r._1, r._2.toDouble))
 
-    val coverage = (
-      if (collapse) covCounts.sortByKey()
-      else covCounts
-    ).map(r => Coverage(r._1, r._2.toDouble))
-
-    val coverageRdd =
-      if (collapse) CoverageRDD(coverage, sequences)
-        .collapse
-      else CoverageRDD(coverage, sequences)
-
-    covCounts.unpersist()
-    coverageRdd
+    CoverageRDD(covCounts, sequences)
   }
 
   /**
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala
index 95ebfad0e1..9f7027f7c4 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala
@@ -103,7 +103,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
 
     // get pileup at position 30
     val pointCoverage = reads.filterByOverlappingRegion(ReferenceRegion("artificial", 30, 31)).rdd.count
-    val coverage: CoverageRDD = reads.toCoverage(false)
+    val coverage: CoverageRDD = reads.toCoverage()
     assert(coverage.rdd.filter(r => r.start == 30).first.count == pointCoverage)
   }
 
@@ -113,7 +113,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
 
     // get pileup at position 30
     val pointCoverage = reads.filterByOverlappingRegions(Array(ReferenceRegion("artificial", 30, 31)).toList).rdd.count
-    val coverage: CoverageRDD = reads.toCoverage(false)
+    val coverage: CoverageRDD = reads.toCoverage()
     assert(coverage.rdd.filter(r => r.start == 30).first.count == pointCoverage)
   }
 
@@ -122,7 +122,10 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
     val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath)
 
     // repartition reads to 1 partition to acheive maximal merging of coverage
-    val coverage: CoverageRDD = reads.transform(_.repartition(1)).toCoverage(true)
+    val coverage: CoverageRDD = reads.transform(_.repartition(1))
+      .toCoverage()
+      .sort()
+      .collapse()
 
     assert(coverage.rdd.count == 18)
     assert(coverage.flatten.rdd.count == 170)
diff --git a/docs/source/50_cli.md b/docs/source/50_cli.md
index 87dad38cc3..54fbf54670 100644
--- a/docs/source/50_cli.md
+++ b/docs/source/50_cli.md
@@ -269,6 +269,8 @@ following options:
   negative strand. Conflicts with `-only_positive_strands`.
 * `-only_positive_strands`: Only computes coverage for reads aligned on the
   positive strand. Conflicts with `-only_negative_strands`.
+* `-sort_lexicographically`: Sorts coverage by position. Contigs are ordered
+  lexicographically. Only applies if running with `-collapse`.
 
 ## Conversion tools {#conversions}
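
For reference, a minimal sketch of how downstream code might drive the reworked API after this change, following the same sort-then-collapse flow the CLI now uses. The `ADAMContext` import, the `coverageExample` helper, and the path parameters are illustrative assumptions, not part of the patch.

```scala
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._

def coverageExample(sc: SparkContext, inputPath: String, outputPath: String): Unit = {
  // Load reads and convert to coverage; toCoverage() no longer sorts,
  // caches, or collapses internally.
  val coverage = sc.loadAlignments(inputPath).toCoverage()

  // Collapsing adjacent, equal-valued intervals now requires sorted input;
  // swap sort() for sortLexicographically() to order contigs by name
  // instead of by sequence dictionary index.
  val collapsed = coverage.sort().collapse()

  collapsed.save(outputPath, asSingleFile = true)
}
```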