Skip to content

Commit

Permalink
Add comments and style fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jey committed Mar 11, 2014
1 parent fdf53ed commit 3b68675
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,11 @@ extends Serializable with Logging {

// First phase
val observed: ObservationTable = reads.
filter(shouldIncludeRead).map(observe).
aggregate(ObservationAccumulator(covariates))(_ ++= _, _ += _).result
filter(shouldIncludeRead).flatMap(observe).
aggregate(ObservationAccumulator(covariates))(_ += _, _ ++= _).result

// Log the ObservationTable
// TODO: delete once unneeded; temporarily added for creating plots
println(observed.toCSV)
// Log the ObservationTable (for debugging and generating plots)
//println(observed.toCSV)

// Second phase
val recalibrator = Recalibrator(observed, minAcceptableQuality)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,26 @@ import edu.berkeley.cs.amplab.adam.rich.DecadentRead._
import edu.berkeley.cs.amplab.adam.util.QualityScore
import edu.berkeley.cs.amplab.adam.util.Util

/**
* A Covariate represents a predictor, also known as a "feature" or
* "independent variable".
*
* @note Concrete implementations of Covariate should inherit from
* AbstractCovariate, not Covariate.
*/
trait Covariate {
type Value

/**
* Given a read, computes the value of this covariate for each residue in the
* read.
*
* The returned values must be in the same order as the residues. A value
* of None means this covariate does not apply to the corresponding residue.
*
* Example: The DinucCovariate returns a pair of bases for each residue,
* except for bases at the start of a read, for which it returns None.
*/
def compute(read: DecadentRead): Seq[Option[Value]]

def apply(read: DecadentRead): Seq[Option[Value]] = compute(read)
Expand All @@ -42,6 +59,12 @@ abstract class AbstractCovariate[ValueT] extends Covariate with Serializable {
override type Value = ValueT
}

/**
* Represents a tuple containing a value for each covariate.
*
* The values for mandatory covariates are stored in member fields and optional
* covariate values are in `extras`.
*/
class CovariateKey(
val readGroup: String,
val quality: QualityScore,
Expand All @@ -63,6 +86,10 @@ class CovariateKey(
override def hashCode = Util.hashCombine(0xD20D1E51, parts.hashCode)
}

/**
* Represents the abstract space of all possible CovariateKeys for the given set
* of Covariates.
*/
class CovariateSpace(val extras: IndexedSeq[Covariate]) extends Serializable {
// Computes the covariate values for all residues in this read
def apply(read: DecadentRead): Seq[CovariateKey] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import edu.berkeley.cs.amplab.adam.rich.DecadentRead._

// TODO: should inherit from something like AbstractCovariate[(DNABase, DNABase)]
class DinucCovariate extends AbstractCovariate[(Char, Char)] {
// TODO: does this covariate even make sense? why not (isNegative: Boolean, (Char, Char))
// instead of reversing the sense of the strand? or is the machine's chemistry such that
// this is what makes the most sense?

def compute(read: DecadentRead): Seq[Option[(Char, Char)]] = {
val sequence = read.sequence.map(_.base)
if(read.isNegativeRead) {
/* Use the reverse-complement of the sequence to get back the original
* sequence as it was read by the sequencing machine. The sequencer
* always reads from the 5' to the 3' end of each strand, but the output
* from the aligner is always in the same sense as the reference, so we
* use the reverse-complement if this read was originally from the
* complementary strand.
*/
dinucs(complement(sequence.reverse)).reverse
} else {
dinucs(sequence)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import edu.berkeley.cs.amplab.adam.util.QualityScore
import edu.berkeley.cs.amplab.adam.util.Util
import scala.collection.mutable

/**
* An empirical frequency count of mismatches from the reference.
*
* This is used in ObservationTable, which maps from CovariateKey to Observation.
*/
class Observation(val total: Long, val mismatches: Long) extends Serializable {
require(mismatches >= 0 && mismatches <= total)

Expand Down Expand Up @@ -115,11 +120,22 @@ object Aggregate {
new Aggregate(value.total, value.mismatches, key.quality.errorProbability * value.mismatches)
}

/**
* Table containing the empirical frequency of mismatches for each set of covariate values.
*/
class ObservationTable(
val space: CovariateSpace,
val entries: Map[CovariateKey, Observation]
) extends Serializable {

// `func' computes the aggregation key
def aggregate[K](func: (CovariateKey, Observation) => K): Map[K, Aggregate] = {
val grouped = entries.groupBy { case (key, value) => func(key, value) }
val newEntries = grouped.mapValues(bucket =>
bucket.map { case (oldKey, obs) => Aggregate(oldKey, obs) }.fold(Aggregate.empty)(_ + _))
newEntries.toMap
}

override def toString = entries.map { case (k, v) => "%s\t%s".format(k, v) }.mkString("\n")

// Format as CSV compatible with GATK's output
Expand All @@ -131,25 +147,15 @@ class ObservationTable(
}

def csvHeader: Seq[String] = space.csvHeader ++ Seq("TotalCount", "MismatchCount", "EmpiricalQ", "IsSkipped")

// `func' computes the aggregation key
def aggregate[K](func: (CovariateKey, Observation) => K): Map[K, Aggregate] = {
val grouped = entries.groupBy { case (key, value) => func(key, value) }
val newEntries = grouped.mapValues(bucket =>
bucket.map { case (oldKey, obs) => Aggregate(oldKey, obs) }.fold(Aggregate.empty)(_ + _))
newEntries.toMap
}
}

class ObservationAccumulator(val space: CovariateSpace) extends Serializable {
private val entries = mutable.HashMap[CovariateKey, Observation]()

def ++= (that: Seq[(CovariateKey, Observation)]): ObservationAccumulator = {
that.foreach { case (k, v) => accum(k, v) }
this
}
def += (that: (CovariateKey, Observation)): ObservationAccumulator =
accum(that._1, that._2)

def += (that: ObservationAccumulator): ObservationAccumulator = {
def ++= (that: ObservationAccumulator): ObservationAccumulator = {
if(this.space != that.space)
throw new IllegalArgumentException("Can only combine observations with matching CovariateSpaces")
that.entries.foreach { case (k, v) => accum(k, v) }
Expand Down

0 comments on commit 3b68675

Please sign in to comment.