[QUININE-14] Compute coverage statistics.
Adds modules for computing coverage counts, as well as aggregate statistics. On
top of this, we provide a CLI for computing these stats. Resolves bigdatagenomics#14.
fnothaft committed Jul 7, 2016
1 parent 9359ece commit 820e1f5
Showing 7 changed files with 527 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pom.xml
@@ -399,6 +399,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
4 changes: 4 additions & 0 deletions quinine-cli/pom.xml
@@ -145,6 +145,10 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
</dependency>
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
135 changes: 135 additions & 0 deletions quinine-cli/src/main/scala/org/bdgenomics/quinine/cli/Coverage.scala
@@ -0,0 +1,135 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.quinine.cli

import java.io.OutputStream
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.quinine.metrics.coverage.{ CoverageStats, ReadCoverage }
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

object Coverage extends BDGCommandCompanion {

val commandName = "coverage"
val commandDescription = "Compute the read coverage across our panel."

def apply(cmdLine: Array[String]): BDGCommand = {
new Coverage(Args4j[CoverageArgs](cmdLine))
}
}

class CoverageArgs extends Args4jBase {
@Argument(required = true,
metaVar = "READS",
usage = "The sequenced dataset.",
index = 0)
val readPath: String = null

@Args4jOption(required = false,
name = "-statPath",
usage = "File to write stats to. If omitted, writes to standard out.")
val statPath: String = null

@Args4jOption(required = false,
name = "-rawCoveragePath",
usage = "Optional path to save the raw coverage counts.")
val coveragePath: String = null

@Args4jOption(required = false,
name = "-sortCoverage",
usage = "If saving coverage, sort by position. Default is false.")
val sortCoverage: Boolean = false

@Args4jOption(required = false,
name = "-saveRawCoverageAsText",
usage = "If saving raw coverage, save as text. Default is false.")
val saveAsText: Boolean = false
}

class Coverage(protected val args: CoverageArgs) extends BDGSparkCommand[CoverageArgs] {

val companion = Coverage

def run(sc: SparkContext) {

// load in the reads
val reads = sc.loadAlignments(args.readPath)

// compute the intermediate coverage results
val coverageCounts = ReadCoverage(reads)

// do we need to save the coverage output?
if (args.coveragePath != null) {

// cache the coverage counts
coverageCounts.cache()

// map the keys into reference regions: ReferencePosition is not a case
// class, but ReferenceRegion is, which createDataFrame needs for schema inference
val coverageRdd = coverageCounts.map(kv => {
(ReferenceRegion(kv._1), kv._2)
})

// convert to a dataframe
val sqlContext = new SQLContext(sc)
val coverageDf = sqlContext.createDataFrame(coverageRdd)

// does this need to be sorted?
val dfToSave = if (args.sortCoverage) {
coverageDf.orderBy(coverageDf("_1.referenceName"),
coverageDf("_1.start"))
} else {
coverageDf
}

// save the raw coverage as JSON text, or as Parquet?
if (args.saveAsText) {
dfToSave.write.json(args.coveragePath)
} else {
dfToSave.write.parquet(args.coveragePath)
}
}

// compute the final stats
val coverageStats = CoverageStats(coverageCounts)

// if desired, write the coverage stats to a file
if (args.statPath != null) {

// get the underlying file system
val fs = FileSystem.get(sc.hadoopConfiguration)

// get a stream to write to a file
val os = fs.create(new Path(args.statPath))
.asInstanceOf[OutputStream]

// write the stats
os.write(coverageStats.toString.getBytes)

// close the output stream
os.flush()
os.close()
} else {
println(coverageStats)
}
}
}
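For reference, a usage sketch of the new command. The quinine-submit launcher
name and all file paths are assumptions; the flags mirror the CoverageArgs
definitions above.

quinine-submit coverage reads.bam \
  -statPath stats.txt \
  -rawCoveragePath coverage.parquet \
  -sortCoverage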
@@ -0,0 +1,148 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.quinine.metrics.coverage

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.ReferenceRegion
import scala.annotation.tailrec

/**
* Simple statistics about coverage.
*
* @param median The median coverage of a set of sites.
* @param mean The (arithmetic) mean coverage of a set of sites.
* @param distribution A map between coverage and the number of sites covered
* at that coverage.
*/
case class CoverageStats(median: Double,
mean: Double,
distribution: Map[Int, Int])
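// The Coverage command writes this out via the default case-class toString;
// an illustrative (not actual) example of the output written to -statPath:
//   CoverageStats(30.0,29.875,Map(28 -> 10, 30 -> 25, 31 -> 7))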

/**
* Companion object for building coverage stats.
*/
object CoverageStats extends Serializable {

/**
* Takes the arithmetic mean of a Seq that contains (value, weight) pairs.
*
* @param distribution Seq of (value, weight) pairs.
* @return The mean of this distribution.
*/
private[coverage] def takeMean(distribution: Seq[(Int, Int)]): Double = {
distribution.map(p => p._1 * p._2)
.sum
.toDouble / distribution.map(p => p._2)
.sum
.toDouble
}

/**
* Takes the median of a Seq that contains (value, weight) pairs.
*
* @param distribution Seq of (value, weight) pairs.
* @return The median of this distribution.
*
* @note Assumes that the input is sorted by key.
*/
private[coverage] def takeMedian(distribution: Seq[(Int, Int)]): Double = {

// sum the weights to get the total number of occurrences
// divide this in half, and you have the median point
val medianCount = (distribution.map(_._2).sum + 1).toDouble / 2.0

@tailrec def recurse(iter: Iterator[(Int, Int)],
observed: Int,
lastIdx: Int,
optLower: Option[Int]): Double = {

// if this triggers, something has gone terribly wrong
// we should stop midway through the iterator
// so, assume it passes, and pop the head
assert(iter.hasNext)
val (idx, count) = iter.next

// we require that the list is sorted,
// and thus that indices increase monotonically
assert(idx > lastIdx)

// where are we?
if ((observed + 1).toDouble > medianCount) {

// are we moving beyond the median?
// if so, we should've seen that this was about to happen in the last step
assert(optLower.isDefined)

// if this happens, take the arithmetic mean of this and the last index
(optLower.get + idx).toDouble / 2.0
} else if (observed.toDouble <= medianCount && (observed + count).toDouble >= medianCount) {

// are we centered around the median?
// if so, return our current index
idx.toDouble
} else {

// haven't hit the median yet! but,
// is the median between us and the next step?
val nextOptLower = if ((observed + count).toDouble < medianCount &&
(observed + count + 1).toDouble > medianCount) {
Some(idx)
} else {
None
}

// recurse!
recurse(iter, observed + count, idx, nextOptLower)
}
}

// compute the median
recurse(distribution.toIterator, 0, -1, None)
}

/**
* Computes coverage stats from an RDD containing per-site coverage counts.
*
* @param rdd RDD containing tuples that describe the number of reads covering
* a given genomic locus/region.
* @return Returns an instance describing the distribution of coverage over
* those loci.
*
* @tparam T A class that describes a genomic interval.
*/
def apply[T <: ReferenceRegion](rdd: RDD[(T, Int)]): CoverageStats = {

// compute coverage distribution, collect to driver, and sort
val coverageDistribution = rdd.map(p => (p._2, 1))
.reduceByKey(_ + _)
.collect
.toSeq
.sortBy(_._1)

// compute arithmetic mean of coverage
val mean = takeMean(coverageDistribution)

// compute median of coverage
val median = takeMedian(coverageDistribution)

// build and return
CoverageStats(median, mean, coverageDistribution.toMap)
}
}
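As a quick sanity check of the two helpers above (illustrative values; both
methods are package-private, so this would run from a test in the
org.bdgenomics.quinine.metrics.coverage package):

// Seq((1, 3), (2, 1), (5, 2)) encodes the multiset { 1, 1, 1, 2, 5, 5 }
CoverageStats.takeMean(Seq((1, 3), (2, 1), (5, 2)))
// = (1*3 + 2*1 + 5*2) / 6 = 2.5
CoverageStats.takeMedian(Seq((1, 3), (2, 1), (5, 2)))
// = (1 + 2) / 2 = 1.5, as the median falls between the 1s and the 2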
@@ -0,0 +1,62 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.quinine.metrics.coverage

import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.ReferencePosition
import org.bdgenomics.formats.avro.AlignmentRecord

/**
* Computes coverage at all sites covered in a sequencing dataset.
*/
object ReadCoverage extends Serializable {

/**
* Flatten a read into individual coverage observations.
*
* If a read is mapped, generates a Seq containing a tuple per position
* covered by the read, where the tuple contains a ReferencePosition and the
* integer 1. If the read is not mapped, returns an empty Seq.
*
* @param read The read to turn into coverage observations.
* @return Returns (site, coverage) pairs if the read is mapped, or an empty
* Seq if the read is not mapped.
*/
private[coverage] def flattenRead(read: AlignmentRecord): Seq[(ReferencePosition, Int)] = {
if (read.getReadMapped) {
// use Long positions to avoid truncating coordinates
(read.getStart.toLong until read.getEnd.toLong).map(idx => {
(ReferencePosition(read.getContig.getContigName, idx), 1)
})
} else {
Seq.empty
}
}
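// For intuition (illustrative values): a read mapped to chr1 at [10, 13)
// flattens into three single-base observations:
//   Seq((ReferencePosition("chr1", 10), 1),
//       (ReferencePosition("chr1", 11), 1),
//       (ReferencePosition("chr1", 12), 1))
// reduceByKey in apply() below then sums these ones across all reads to
// give the depth at each site.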

/**
* Given an RDD of reads, computes the coverage of all sites where at least
* one read is aligned.
*
* @param reads An RDD of aligned reads.
* @return Returns an RDD containing tuples of (genomic position, number of
* reads covering that position).
*/
def apply(reads: RDD[AlignmentRecord]): RDD[(ReferencePosition, Int)] = {
reads.flatMap(flattenRead)
.reduceByKey(_ + _)
}
}
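Putting the two modules together, a minimal sketch of what the Coverage
command does under the hood (the BAM path is a placeholder, and the
SparkContext is assumed to already exist):

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.quinine.metrics.coverage.{ CoverageStats, ReadCoverage }

def computeStats(sc: SparkContext): CoverageStats = {
  // load aligned reads using ADAM
  val reads = sc.loadAlignments("reads.bam")
  // per-site depth: RDD[(ReferencePosition, Int)]
  val counts = ReadCoverage(reads)
  // collapse the counts into mean, median, and a depth histogram
  CoverageStats(counts)
}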
