From 71806a31d2f3589967d1c64c96c14afcfb29e930 Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Sun, 16 Sep 2018 12:40:00 -0700 Subject: [PATCH 1/3] finished multi-sample coverage --- .../org/bdgenomics/adam/models/Coverage.scala | 22 ++++++++++------ .../adam/rdd/feature/CoverageRDD.scala | 11 ++++---- .../adam/rdd/feature/FeatureRDD.scala | 2 +- .../adam/rdd/read/AlignmentRecordRDD.scala | 13 +++++----- .../adam/models/CoverageSuite.scala | 21 ++++++++++++++++ .../adam/rdd/feature/CoverageRDDSuite.scala | 25 +++++++++++++++++++ 6 files changed, 74 insertions(+), 20 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/Coverage.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/Coverage.scala index a4df8bcdf7..73663a8714 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/models/Coverage.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/Coverage.scala @@ -33,8 +33,8 @@ private[adam] object Coverage { * @param count Coverage count for each base pair in region * @return Coverage spanning the specified ReferenceRegion */ - def apply(region: ReferenceRegion, count: Double): Coverage = { - Coverage(region.referenceName, region.start, region.end, count) + def apply(region: ReferenceRegion, count: Double, name: Option[String]): Coverage = { + Coverage(region.referenceName, region.start, region.end, count, name) } /** @@ -54,7 +54,8 @@ private[adam] object Coverage { Coverage(feature.getContigName, feature.getStart, feature.getEnd, - feature.getScore) + feature.getScore, + Option(feature.getName)) } /** @@ -81,7 +82,7 @@ private[adam] object Coverage { * observed. * @param count The average coverage across this region. */ -case class Coverage(contigName: String, start: Long, end: Long, count: Double) { +case class Coverage(contigName: String, start: Long, end: Long, count: Double, name: Option[String] = None) { /** * Converts Coverage to Feature, setting Coverage count in the score attribute. @@ -89,20 +90,25 @@ case class Coverage(contigName: String, start: Long, end: Long, count: Double) { * @return Feature built from Coverage */ def toFeature: Feature = { - Feature.newBuilder() + val featureBuilder = Feature.newBuilder() .setContigName(contigName) .setStart(start) .setEnd(end) .setScore(count) - .build() - } + // set name, if applicable + if (name.isDefined) { + featureBuilder.setName(name.get) + } + + featureBuilder.build() + } /** * Converts Coverage to a Feature case class, for use with Spark SQL. */ def toSqlFeature: FeatureProduct = { new FeatureProduct(featureId = None, - name = None, + name = name, source = None, featureType = None, contigName = Some(contigName), diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala index 48785a7d80..e9fb924807 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala @@ -124,7 +124,7 @@ case class ParquetUnboundCoverageRDD private[rdd] ( val sqlContext = SQLContext.getOrCreate(sc) import sqlContext.implicits._ sqlContext.read.parquet(parquetFilename) - .select("contigName", "start", "end", "score") + .select("contigName", "start", "end", "score", "name") .withColumnRenamed("score", "count") .as[Coverage] } @@ -324,7 +324,7 @@ abstract class CoverageRDD extends GenomicDataset[Coverage, Coverage, CoverageRD val lastRegion = ReferenceRegion(lastCoverage) val (nextCoverage, nextCondensed) = if (rr.isAdjacent(lastRegion) && lastCoverage.count == cov.count) { - (Coverage(rr.merge(lastRegion), lastCoverage.count), condensed) + (Coverage(rr.merge(lastRegion), lastCoverage.count, lastCoverage.name), condensed) } else { (cov, lastCoverage :: condensed) } @@ -417,12 +417,13 @@ abstract class CoverageRDD extends GenomicDataset[Coverage, Coverage, CoverageRD // key coverage by binning start site mod bpPerbin // subtract region.start to shift mod to start of ReferenceRegion val start = r.start - (r.start % bpPerBin) - ReferenceRegion(r.contigName, start, start + bpPerBin) + (r.name, ReferenceRegion(r.contigName, start, start + bpPerBin)) }).mapValues(r => (r.count, 1)) .reduceByKey(reduceFn) .map(r => { + // r is a key of (id: string, region: ReferenceRegion), and value of (count: double, int) // compute average coverage in bin - Coverage(r._1.referenceName, r._1.start, r._1.end, r._2._1 / r._2._2) + Coverage(r._1._2.referenceName, r._1._2.start, r._1._2.end, r._2._1 / r._2._2, r._1._1) }) flattened.transform(rdd => newRDD) } @@ -467,7 +468,7 @@ abstract class CoverageRDD extends GenomicDataset[Coverage, Coverage, CoverageRD private def flatMapCoverage(rdd: RDD[Coverage]): RDD[Coverage] = { rdd.flatMap(r => { val positions = r.start until r.end - positions.map(n => Coverage(r.contigName, n, n + 1, r.count)) + positions.map(n => Coverage(r.contigName, n, n + 1, r.count, r.name)) }) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala index 606ce556e7..4d47444b5b 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala @@ -311,7 +311,7 @@ case class DatasetBoundFeatureRDD private[rdd] ( def toCoverage(): CoverageRDD = { import dataset.sqlContext.implicits._ DatasetBoundCoverageRDD(dataset.toDF - .select("contigName", "start", "end", "score") + .select("contigName", "start", "end", "score", "name") .withColumnRenamed("score", "count") .as[Coverage], sequences) } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala index 86e6d0a2e1..73c7de5355 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala @@ -348,9 +348,9 @@ case class RDDBoundAlignmentRecordRDD private[rdd] ( readMapped }).flatMap(r => { val positions: List[Long] = List.range(r.getStart, r.getEnd) - positions.map(n => (ReferencePosition(r.getContigName, n), 1)) + positions.map(n => ((r.getRecordGroupSample, ReferencePosition(r.getContigName, n)), 1)) }).reduceByKey(_ + _) - .map(r => Coverage(r._1, r._2.toDouble)) + .map(r => Coverage(r._1._2, r._2.toDouble, Option(r._1._1))) RDDBoundCoverageRDD(covCounts, sequences, None) } @@ -371,7 +371,7 @@ case class RDDBoundAlignmentRecordRDD private[rdd] ( } } -private case class AlignmentWindow(contigName: String, start: Long, end: Long) { +private case class AlignmentWindow(contigName: String, start: Long, end: Long, name: String) { } sealed abstract class AlignmentRecordRDD extends AvroRecordGroupGenomicDataset[AlignmentRecord, AlignmentRecordProduct, AlignmentRecordRDD] { @@ -483,7 +483,8 @@ sealed abstract class AlignmentRecordRDD extends AvroRecordGroupGenomicDataset[A import dataset.sqlContext.implicits._ val covCounts = dataset.toDF .where($"readMapped") - .select($"contigName", $"start", $"end") + .select($"contigName", $"start", $"end", $"recordGroupSample") + .withColumnRenamed("recordGroupSample", "name") .as[AlignmentWindow] .flatMap(w => { val width = (w.end - w.start).toInt @@ -493,12 +494,12 @@ sealed abstract class AlignmentRecordRDD extends AvroRecordGroupGenomicDataset[A while (idx < width) { val lastPos = pos pos += 1L - buffer(idx) = Coverage(w.contigName, lastPos, pos, 1.0) + buffer(idx) = Coverage(w.contigName, lastPos, pos, 1.0, Option(w.name)) idx += 1 } buffer }).toDF - .groupBy("contigName", "start", "end") + .groupBy("contigName", "start", "end", "name") .sum("count") .withColumnRenamed("sum(count)", "count") .as[Coverage] diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/models/CoverageSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/models/CoverageSuite.scala index 17b4aebb94..173df053d5 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/models/CoverageSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/models/CoverageSuite.scala @@ -40,6 +40,27 @@ class CoverageSuite extends ADAMFunSuite { assert(coverageAfterConversion.count == featureToConvert.score) } + sparkTest("Convert to coverage from valid Feature with name") { + // create a valid feature + val featureToConvert = + Feature.newBuilder() + .setContigName("chr1") + .setStart(1) + .setEnd(2) + .setScore(100) + .setName("coverageName") + .build() + + val coverageAfterConversion = Coverage(featureToConvert) + + // all fields should match between the two + assert(coverageAfterConversion.start == featureToConvert.start) + assert(coverageAfterConversion.end == featureToConvert.end) + assert(coverageAfterConversion.contigName == featureToConvert.contigName) + assert(coverageAfterConversion.count == featureToConvert.score) + assert(coverageAfterConversion.name == Some(featureToConvert.name)) + } + sparkTest("Convert to coverage from Feature with null/empty contigName fails with correct error") { // feature with empty contigname is not valid when converting to Coverage val featureWithEmptyContigName = diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/CoverageRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/CoverageRDDSuite.scala index 0cb5fb1fb1..2f008f7cda 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/CoverageRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/CoverageRDDSuite.scala @@ -135,6 +135,8 @@ class CoverageRDDSuite extends ADAMFunSuite { val coverageRDD: CoverageRDD = featureRDD.toCoverage testMetadata(coverageRDD) + print(coverageRDD.rdd.first) + val outputFile = tmpLocation(".bed") coverageRDD.save(outputFile, false, false) @@ -181,6 +183,7 @@ class CoverageRDDSuite extends ADAMFunSuite { val selfUnion = coverage.union(coverage) assert(selfUnion.rdd.count === 6) val coverageDs = coverage.transformDataset(ds => ds) // no-op, forces to dataset + print(coverageDs.rdd.first()) val selfUnionDs = coverageDs.union(coverageDs) assert(selfUnionDs.rdd.count === 6) } @@ -202,6 +205,28 @@ class CoverageRDDSuite extends ADAMFunSuite { assert(coverage.rdd.count == 1) } + sparkTest("can read a bed file with multiple samples to coverage") { + + val f1 = Feature.newBuilder().setContigName("chr1").setStart(1).setEnd(10).setScore(3.0).setName("S1").build() + val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).setName("S1").build() + val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).setName("S1").build() + + val f4 = Feature.newBuilder().setContigName("chr1").setStart(1).setEnd(10).setScore(2.0).setName("S2").build() + val f5 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).setName("S2").build() + + val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3, f4, f5))) + val coverageRDD: CoverageRDD = featureRDD.toCoverage + + val outputFile = tmpLocation(".adam") + coverageRDD.save(outputFile, false, false) + + val region = ReferenceRegion("chr1", 1, 9) + val predicate = region.toPredicate + val coverage = sc.loadParquetCoverage(outputFile, Some(predicate)) + print(coverage.rdd.collect) + assert(coverage.rdd.count == 2) + } + sparkTest("correctly flatmaps coverage without aggregated bins") { val f1 = Feature.newBuilder().setContigName("chr1").setStart(1).setEnd(5).setScore(1.0).build() val f2 = Feature.newBuilder().setContigName("chr1").setStart(5).setEnd(7).setScore(3.0).build() From 482baaf46abb2bad32259120c3de5ea544fd4cc6 Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Sun, 16 Sep 2018 18:23:12 -0700 Subject: [PATCH 2/3] fix tests --- .../adam/rdd/feature/CoverageRDD.scala | 13 +++++++++---- .../adam/rdd/read/AlignmentRecordRDDSuite.scala | 17 +++++++++++++++++ .../adam/test/alignmentRecordRdd_test.py | 11 ++++++++--- 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala index e9fb924807..3516023698 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala @@ -289,10 +289,15 @@ abstract class CoverageRDD extends GenomicDataset[Coverage, Coverage, CoverageRD def collapse(): CoverageRDD = { val newRDD: RDD[Coverage] = rdd .mapPartitions(iter => { - if (iter.hasNext) { - val first = iter.next - collapse(iter, first, List.empty) - } else iter + // must sort values to iteratively collapse coverage + val sortedIter = iter.toList + .sortBy(r => (r.contigName, r.start)) + .toIterator + if (sortedIter.hasNext) { + val first = sortedIter.next + println(first) + collapse(sortedIter, first, List.empty) + } else sortedIter }) transform(rdd => newRDD) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala index 65504d96db..089f7561c8 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala @@ -228,6 +228,23 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { testCoverage(coverageDs) } + sparkTest("computes coverage with multiple samples") { + // glob in 2 files with different samples + // testFile cannot read complex globs, so we have to parse the parent path + val relativePath = new File(testFile("NA12878.1_854950_855150.sam")).getParentFile.getPath + val inputPath = relativePath + "/{NA12878.1_854950_855150,bqsr1}.sam" + + val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath) + + def countByName(coverage: CoverageRDD, name: String): Long = { + coverage.rdd.filter(r => r.name == Some(name)).count + } + + val coverageRdd = reads.toCoverage() + assert(countByName(coverageRdd, "NA12878") == 677) // number of coverage when just NA12878 is computed + assert(countByName(coverageRdd, "HG00096") == 39000) // number of coverage when just HG00096 is computed + } + sparkTest("merges adjacent records with equal coverage values") { val inputPath = testFile("artificial.sam") val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath) diff --git a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py index 023ad34bfd..f1d0631344 100644 --- a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py +++ b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py @@ -135,7 +135,8 @@ def test_transmute_to_coverage(self): readsAsCoverage = reads.transmute(lambda x: x.select(x.contigName, x.start, x.end, - x.mapq.cast(DoubleType()).alias("count")), + x.mapq.cast(DoubleType()).alias("count"), + x.recordGroupSample.alias("name")), CoverageRDD) assert(isinstance(readsAsCoverage, CoverageRDD)) @@ -150,9 +151,13 @@ def test_to_coverage(self): reads = ac.loadAlignments(readsPath) coverage = reads.toCoverage() - self.assertEquals(coverage.toDF().count(), 42) + # no reads are overlapping, so collapsed count should just be the number of reads + # collapsed + self.assertEquals(coverage.toDF().count(), 5) - coverage = reads.toCoverage(collapse = False) + # 5 reads: contig 3 has 8 bp, chr2 (2 strands) has 20bp, contig 4 has 8bp, contig 1 has 10 bp + # 8 + 20 + 8 + 10 = 46 + coverage = reads.re.toCoverage(collapse = False) self.assertEquals(coverage.toDF().count(), 46) From b5e12b471870d98b142741713e157e4b7357749c Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Mon, 17 Sep 2018 10:12:23 -0700 Subject: [PATCH 3/3] syntax err --- adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py index f1d0631344..2d6b371e3c 100644 --- a/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py +++ b/adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py @@ -157,7 +157,7 @@ def test_to_coverage(self): # 5 reads: contig 3 has 8 bp, chr2 (2 strands) has 20bp, contig 4 has 8bp, contig 1 has 10 bp # 8 + 20 + 8 + 10 = 46 - coverage = reads.re.toCoverage(collapse = False) + coverage = reads.toCoverage(collapse = False) self.assertEquals(coverage.toDF().count(), 46)