Multi-sample coverage #2050

Closed

22 changes: 14 additions & 8 deletions adam-core/src/main/scala/org/bdgenomics/adam/models/Coverage.scala
@@ -33,8 +33,8 @@ private[adam] object Coverage {
* @param count Coverage count for each base pair in region
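+ * @param name Optional sample name attached to this coverage record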
* @return Coverage spanning the specified ReferenceRegion
*/
- def apply(region: ReferenceRegion, count: Double): Coverage = {
- Coverage(region.referenceName, region.start, region.end, count)
+ def apply(region: ReferenceRegion, count: Double, name: Option[String]): Coverage = {
Contributor Author commented: should we have this be source instead of name?
+ Coverage(region.referenceName, region.start, region.end, count, name)
}

/**
@@ -54,7 +54,8 @@
Coverage(feature.getContigName,
feature.getStart,
feature.getEnd,
- feature.getScore)
+ feature.getScore,
+ Option(feature.getName))
}

/**
@@ -81,28 +82,33 @@
* observed.
* @param count The average coverage across this region.
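+ * @param name The sample name for this coverage record, if one was provided.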
*/
- case class Coverage(contigName: String, start: Long, end: Long, count: Double) {
+ case class Coverage(contigName: String, start: Long, end: Long, count: Double, name: Option[String] = None) {

/**
* Converts Coverage to Feature, setting Coverage count in the score attribute.
*
* @return Feature built from Coverage
*/
def toFeature: Feature = {
- Feature.newBuilder()
+ val featureBuilder = Feature.newBuilder()
.setContigName(contigName)
.setStart(start)
.setEnd(end)
.setScore(count)
- .build()
- }
+
+ // set name, if applicable
+ if (name.isDefined) {
+ featureBuilder.setName(name.get)
+ }
+
+ featureBuilder.build()
+ }
/**
* Converts Coverage to a Feature case class, for use with Spark SQL.
*/
def toSqlFeature: FeatureProduct = {
new FeatureProduct(featureId = None,
- name = None,
+ name = name,
source = None,
featureType = None,
contigName = Some(contigName),
@@ -124,7 +124,7 @@ case class ParquetUnboundCoverageRDD private[rdd] (
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
sqlContext.read.parquet(parquetFilename)
.select("contigName", "start", "end", "score")
.select("contigName", "start", "end", "score", "name")
.withColumnRenamed("score", "count")
.as[Coverage]
}
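Taken together, the model changes above give Coverage an optional sample name that survives conversion to and from Feature. A minimal sketch of that round trip (the sample name here is hypothetical):

```scala
import org.bdgenomics.adam.models.{ Coverage, ReferenceRegion }

// build a coverage record for a hypothetical sample "NA12878"
val cov = Coverage(ReferenceRegion("chr1", 0L, 10L), 30.0, Some("NA12878"))

// the name is stored in Feature.name on conversion ...
val feature = cov.toFeature
assert(feature.getName == "NA12878")

// ... and recovered when converting back
assert(Coverage(feature).name == Some("NA12878"))
```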
@@ -289,10 +289,15 @@ abstract class CoverageRDD extends GenomicDataset[Coverage, Coverage, CoverageRDD]
def collapse(): CoverageRDD = {
val newRDD: RDD[Coverage] = rdd
.mapPartitions(iter => {
- if (iter.hasNext) {
- val first = iter.next
- collapse(iter, first, List.empty)
- } else iter
+ // must sort values to iteratively collapse coverage
+ val sortedIter = iter.toList
+ .sortBy(r => (r.contigName, r.start))
+ .toIterator
+ if (sortedIter.hasNext) {
+ val first = sortedIter.next
+ collapse(sortedIter, first, List.empty)
+ } else sortedIter
})

transform(rdd => newRDD)
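Note that collapse merges adjacent regions whenever their counts match, so with multi-sample data it may be worth sorting by sample as well, keeping records from different samples from ever becoming merge candidates. A minimal sketch of that variant, under the assumption that merging across samples is undesired:

```scala
import org.bdgenomics.adam.models.Coverage

// hypothetical name-aware sort: make each sample's records contiguous
// within the partition before the iterative collapse runs; the merge
// test would then also compare cov.name against lastCoverage.name
def sortForCollapse(iter: Iterator[Coverage]): Iterator[Coverage] = {
  iter.toList
    .sortBy(r => (r.name.getOrElse(""), r.contigName, r.start))
    .toIterator
}
```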
@@ -324,7 +329,7 @@ abstract class CoverageRDD extends GenomicDataset[Coverage, Coverage, CoverageRDD]
val lastRegion = ReferenceRegion(lastCoverage)
val (nextCoverage, nextCondensed) =
if (rr.isAdjacent(lastRegion) && lastCoverage.count == cov.count) {
- (Coverage(rr.merge(lastRegion), lastCoverage.count), condensed)
+ (Coverage(rr.merge(lastRegion), lastCoverage.count, lastCoverage.name), condensed)
} else {
(cov, lastCoverage :: condensed)
}
@@ -417,12 +422,13 @@ abstract class CoverageRDD extends GenomicDataset[Coverage, Coverage, CoverageRDD]
// key coverage by binning start site mod bpPerBin
// subtract region.start to shift mod to start of ReferenceRegion
val start = r.start - (r.start % bpPerBin)
- ReferenceRegion(r.contigName, start, start + bpPerBin)
+ (r.name, ReferenceRegion(r.contigName, start, start + bpPerBin))
}).mapValues(r => (r.count, 1))
.reduceByKey(reduceFn)
.map(r => {
+ // keyed by (name: Option[String], region: ReferenceRegion), with value (count sum: Double, record count: Int)
// compute average coverage in bin
- Coverage(r._1.referenceName, r._1.start, r._1.end, r._2._1 / r._2._2)
+ Coverage(r._1._2.referenceName, r._1._2.start, r._1._2.end, r._2._1 / r._2._2, r._1._1)
})
flattened.transform(rdd => newRDD)
}
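To make the new keying concrete, here is a small sketch of the binning used above, assuming bpPerBin = 10 (the sample name and counts are illustrative):

```scala
import org.bdgenomics.adam.models.{ Coverage, ReferenceRegion }

// two per-base records from sample "S1", chr1:[12,13) at 4.0 and
// chr1:[17,18) at 6.0, both key to (Some("S1"), chr1:[10,20));
// reduceByKey sums them to (10.0, 2), which averages to 5.0
def binKey(cov: Coverage, bpPerBin: Int): (Option[String], ReferenceRegion) = {
  val start = cov.start - (cov.start % bpPerBin)
  (cov.name, ReferenceRegion(cov.contigName, start, start + bpPerBin))
}
```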
@@ -467,7 +473,7 @@ abstract class CoverageRDD extends GenomicDataset[Coverage, Coverage, CoverageRDD]
private def flatMapCoverage(rdd: RDD[Coverage]): RDD[Coverage] = {
rdd.flatMap(r => {
val positions = r.start until r.end
- positions.map(n => Coverage(r.contigName, n, n + 1, r.count))
+ positions.map(n => Coverage(r.contigName, n, n + 1, r.count, r.name))
})
}
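flatMapCoverage now carries the name through the per-base expansion; a self-contained view of that behavior (values illustrative):

```scala
import org.bdgenomics.adam.models.Coverage

// Coverage("chr1", 3, 6, 2.0, Some("S1")) expands to three single-base
// records, each keeping the count and the sample name:
//   Coverage("chr1", 3, 4, 2.0, Some("S1"))
//   Coverage("chr1", 4, 5, 2.0, Some("S1"))
//   Coverage("chr1", 5, 6, 2.0, Some("S1"))
def perBase(r: Coverage): Seq[Coverage] = {
  (r.start until r.end).map(n => Coverage(r.contigName, n, n + 1, r.count, r.name))
}
```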
}
@@ -311,7 +311,7 @@ case class DatasetBoundFeatureRDD private[rdd] (
def toCoverage(): CoverageRDD = {
import dataset.sqlContext.implicits._
DatasetBoundCoverageRDD(dataset.toDF
.select("contigName", "start", "end", "score")
.select("contigName", "start", "end", "score", "name")
.withColumnRenamed("score", "count")
.as[Coverage], sequences)
}
@@ -348,9 +348,9 @@ case class RDDBoundAlignmentRecordRDD private[rdd] (
readMapped
}).flatMap(r => {
val positions: List[Long] = List.range(r.getStart, r.getEnd)
- positions.map(n => (ReferencePosition(r.getContigName, n), 1))
+ positions.map(n => ((r.getRecordGroupSample, ReferencePosition(r.getContigName, n)), 1))
}).reduceByKey(_ + _)
- .map(r => Coverage(r._1, r._2.toDouble))
+ .map(r => Coverage(r._1._2, r._2.toDouble, Option(r._1._1)))

RDDBoundCoverageRDD(covCounts, sequences, None)
}
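The RDD path now keys per-base counts by (sample, position) instead of position alone, so overlapping reads from different samples produce separate coverage records. A condensed sketch of that pattern with hypothetical inputs (ReferencePosition extends ReferenceRegion, so Coverage.apply accepts it directly):

```scala
import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.{ Coverage, ReferencePosition }

// e.g. two hits from "NA12878" and one from "HG00096" at chr1:5 yield
// Coverage(chr1, 5, 6, 2.0, Some("NA12878")) and
// Coverage(chr1, 5, 6, 1.0, Some("HG00096")), not a merged 3.0
def depthBySample(sc: SparkContext,
                  hits: Seq[(String, ReferencePosition)]) = {
  sc.parallelize(hits.map(h => (h, 1)))
    .reduceByKey(_ + _)
    .map(r => Coverage(r._1._2, r._2.toDouble, Option(r._1._1)))
}
```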
@@ -371,7 +371,7 @@
}
}

- private case class AlignmentWindow(contigName: String, start: Long, end: Long) {
+ private case class AlignmentWindow(contigName: String, start: Long, end: Long, name: String) {
}

sealed abstract class AlignmentRecordRDD extends AvroRecordGroupGenomicDataset[AlignmentRecord, AlignmentRecordProduct, AlignmentRecordRDD] {
@@ -483,7 +483,8 @@ sealed abstract class AlignmentRecordRDD extends AvroRecordGroupGenomicDataset[AlignmentRecord, AlignmentRecordProduct, AlignmentRecordRDD]
import dataset.sqlContext.implicits._
val covCounts = dataset.toDF
.where($"readMapped")
.select($"contigName", $"start", $"end")
.select($"contigName", $"start", $"end", $"recordGroupSample")
.withColumnRenamed("recordGroupSample", "name")
.as[AlignmentWindow]
.flatMap(w => {
val width = (w.end - w.start).toInt
@@ -493,12 +494,12 @@ sealed abstract class AlignmentRecordRDD extends AvroRecordGroupGenomicDataset[AlignmentRecord, AlignmentRecordProduct, AlignmentRecordRDD]
while (idx < width) {
val lastPos = pos
pos += 1L
- buffer(idx) = Coverage(w.contigName, lastPos, pos, 1.0)
+ buffer(idx) = Coverage(w.contigName, lastPos, pos, 1.0, Option(w.name))
idx += 1
}
buffer
}).toDF
.groupBy("contigName", "start", "end")
.groupBy("contigName", "start", "end", "name")
.sum("count")
.withColumnRenamed("sum(count)", "count")
.as[Coverage]
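In the Dataset path, adding name to the groupBy is what keeps depth separate per sample. A self-contained Spark SQL illustration of that aggregation shape (session setup and values are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[1]").getOrCreate()
import spark.implicits._

// per-base rows for two samples overlapping at chr1:[5,6)
val perBase = Seq(
  ("chr1", 5L, 6L, "NA12878", 1.0),
  ("chr1", 5L, 6L, "NA12878", 1.0),
  ("chr1", 5L, 6L, "HG00096", 1.0)
).toDF("contigName", "start", "end", "name", "count")

// grouping by name keeps NA12878 (2.0) apart from HG00096 (1.0)
val depth = perBase.groupBy("contigName", "start", "end", "name")
  .sum("count")
  .withColumnRenamed("sum(count)", "count")
```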
@@ -40,6 +40,27 @@ class CoverageSuite extends ADAMFunSuite {
assert(coverageAfterConversion.count == featureToConvert.score)
}

sparkTest("Convert to coverage from valid Feature with name") {
// create a valid feature
val featureToConvert =
Feature.newBuilder()
.setContigName("chr1")
.setStart(1)
.setEnd(2)
.setScore(100)
.setName("coverageName")
.build()

val coverageAfterConversion = Coverage(featureToConvert)

// all fields should match between the two
assert(coverageAfterConversion.start == featureToConvert.start)
assert(coverageAfterConversion.end == featureToConvert.end)
assert(coverageAfterConversion.contigName == featureToConvert.contigName)
assert(coverageAfterConversion.count == featureToConvert.score)
assert(coverageAfterConversion.name == Some(featureToConvert.name))
}

sparkTest("Convert to coverage from Feature with null/empty contigName fails with correct error") {
// feature with empty contigname is not valid when converting to Coverage
val featureWithEmptyContigName =
@@ -135,6 +135,8 @@ class CoverageRDDSuite extends ADAMFunSuite {
val coverageRDD: CoverageRDD = featureRDD.toCoverage
testMetadata(coverageRDD)

val outputFile = tmpLocation(".bed")
coverageRDD.save(outputFile, false, false)

@@ -181,6 +183,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val selfUnion = coverage.union(coverage)
assert(selfUnion.rdd.count === 6)
val coverageDs = coverage.transformDataset(ds => ds) // no-op, forces to dataset
val selfUnionDs = coverageDs.union(coverageDs)
assert(selfUnionDs.rdd.count === 6)
}
@@ -202,6 +205,28 @@
assert(coverage.rdd.count == 1)
}

sparkTest("can read a bed file with multiple samples to coverage") {

val f1 = Feature.newBuilder().setContigName("chr1").setStart(1).setEnd(10).setScore(3.0).setName("S1").build()
val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).setName("S1").build()
val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).setName("S1").build()

val f4 = Feature.newBuilder().setContigName("chr1").setStart(1).setEnd(10).setScore(2.0).setName("S2").build()
val f5 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).setName("S2").build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3, f4, f5)))
val coverageRDD: CoverageRDD = featureRDD.toCoverage

val outputFile = tmpLocation(".adam")
coverageRDD.save(outputFile, false, false)

val region = ReferenceRegion("chr1", 1, 9)
val predicate = region.toPredicate
val coverage = sc.loadParquetCoverage(outputFile, Some(predicate))
assert(coverage.rdd.count == 2)
}

sparkTest("correctly flatmaps coverage without aggregated bins") {
val f1 = Feature.newBuilder().setContigName("chr1").setStart(1).setEnd(5).setScore(1.0).build()
val f2 = Feature.newBuilder().setContigName("chr1").setStart(5).setEnd(7).setScore(3.0).build()
@@ -228,6 +228,23 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
testCoverage(coverageDs)
}

sparkTest("computes coverage with multiple samples") {
// glob in 2 files with different samples
// testFile cannot read complex globs, so we build the glob from the parent path
val relativePath = new File(testFile("NA12878.1_854950_855150.sam")).getParentFile.getPath
val inputPath = relativePath + "/{NA12878.1_854950_855150,bqsr1}.sam"

val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath)

def countByName(coverage: CoverageRDD, name: String): Long = {
coverage.rdd.filter(r => r.name == Some(name)).count
}

val coverageRdd = reads.toCoverage()
assert(countByName(coverageRdd, "NA12878") == 677) // number of coverage when just NA12878 is computed
assert(countByName(coverageRdd, "HG00096") == 39000) // number of coverage when just HG00096 is computed
}

sparkTest("merges adjacent records with equal coverage values") {
val inputPath = testFile("artificial.sam")
val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath)
9 changes: 7 additions & 2 deletions adam-python/bdgenomics/adam/test/alignmentRecordRdd_test.py
@@ -135,7 +135,8 @@ def test_transmute_to_coverage(self):
readsAsCoverage = reads.transmute(lambda x: x.select(x.contigName,
x.start,
x.end,
- x.mapq.cast(DoubleType()).alias("count")),
+ x.mapq.cast(DoubleType()).alias("count"),
+ x.recordGroupSample.alias("name")),
CoverageRDD)

assert(isinstance(readsAsCoverage, CoverageRDD))
@@ -150,8 +151,12 @@ def test_to_coverage(self):
reads = ac.loadAlignments(readsPath)

coverage = reads.toCoverage()
- self.assertEquals(coverage.toDF().count(), 42)
+ # no reads overlap, so the collapsed count should equal the number of reads
+ self.assertEquals(coverage.toDF().count(), 5)

+ # 5 reads: contig 3 has 8 bp, chr2 (2 strands) has 20 bp, contig 4 has 8 bp, contig 1 has 10 bp
+ # 8 + 20 + 8 + 10 = 46
coverage = reads.toCoverage(collapse = False)
self.assertEquals(coverage.toDF().count(), 46)
