Finishing up the cleanup on org.bdgenomics.adam.rdd.
* Made `MDTagging` private to `read`. Exposed it through `AlignmentRecordRDD`.
  Cleaned up where it was used in `Transform`.
* Made `FlagStat` and most related models private to `read`.
* Made all `read.recalibration` and `read.realignment` classes private to
  package. Depending on the class, this varied from:
  * `adam` for classes that need to be registered with `kryo`
  * `read` for the core `RealignIndels` and `BaseQualityRecalibrator` engines
  * Their subpackage where possible.
* Made class values for genomic partitioners as private as possible.
* Added documentation to all `InFormatter`s, `InFormatterCompanion`s, and
  `OutFormatter`s. Made the `InFormatter`s package private with private
  constructors.
* Added method/class level scaladoc where missing.
* Moved `org.bdgenomics.adam.cli.FlagStatSuite` to `org.bdgenomics.adam.read`,
  along with the `NA12878.sam` test resource, which was otherwise unused in the
  `adam-cli` submodule.
fnothaft committed Nov 15, 2016
1 parent c21401f commit 4063b29
Showing 38 changed files with 463 additions and 126 deletions.
20 changes: 9 additions & 11 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala
@@ -316,19 +316,17 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
* these tags are recomputed or not. If MD tagging isn't requested, we
* return the input RDD.
*/
def maybeMdTag(rdd: AlignmentRecordRDD,
def maybeMdTag(sc: SparkContext,
rdd: AlignmentRecordRDD,
stringencyOpt: Option[ValidationStringency]): AlignmentRecordRDD = {
if (args.mdTagsReferenceFile != null) {
log.info(s"Adding MDTags to reads based on reference file ${args.mdTagsReferenceFile}")
rdd.transform(r => {
MDTagging(
r,
args.mdTagsReferenceFile,
fragmentLength = args.mdTagsFragmentSize,
overwriteExistingTags = args.mdTagsOverwrite,
validationStringency = stringencyOpt.getOrElse(ValidationStringency.STRICT)
)
})
val referenceFile = sc.loadReferenceFile(args.mdTagsReferenceFile,
fragmentLength = args.mdTagsFragmentSize)
rdd.computeMismatchingPositions(
referenceFile,
overwriteExistingTags = args.mdTagsOverwrite,
validationStringency = stringencyOpt.getOrElse(ValidationStringency.STRICT))
} else {
rdd
}
@@ -360,7 +358,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
val maybeSortedRdd = maybeSort(finalPreprocessedRdd, sl)

// recompute md tags, if requested, and return
maybeMdTag(maybeSortedRdd, stringencyOpt)
maybeMdTag(sc, maybeSortedRdd, stringencyOpt)
}

def forceNonParquet(): Boolean = {
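
For context on the Transform change above: MD tag computation now goes through the AlignmentRecordRDD API rather than instantiating MDTagging directly. A minimal sketch of the new call path, assuming only the loadReferenceFile and computeMismatchingPositions signatures shown in the diff; loadAlignments, the file paths, and the fragment size are illustrative assumptions:

import htsjdk.samtools.ValidationStringency
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._

// Hypothetical helper: recompute MD tags for a BAM against a reference.
def recomputeMdTags(sc: SparkContext) = {
  val reads = sc.loadAlignments("/data/sample.bam")
  val reference = sc.loadReferenceFile("/data/reference.fa", fragmentLength = 10000L)
  reads.computeMismatchingPositions(
    reference,
    overwriteExistingTags = false,
    validationStringency = ValidationStringency.LENIENT)
}
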
@@ -87,6 +87,10 @@ private case class LocatableReferenceRegion(rr: ReferenceRegion) extends Locatab
def getContig(): String = rr.referenceName
}

/**
* This singleton provides an implicit conversion from a SparkContext to the
* ADAMContext, as well as implicit functions for the Pipe API.
*/
object ADAMContext {

// conversion functions for pipes
@@ -141,6 +145,11 @@ private class FileFilter(private val name: String) extends PathFilter {

import org.bdgenomics.adam.rdd.ADAMContext._

/**
* The ADAMContext provides functions on top of a SparkContext for loading genomic data.
*
* @param sc The SparkContext to wrap.
*/
class ADAMContext private (@transient val sc: SparkContext) extends Serializable with Logging {

/**
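
Typical use of the implicit conversion documented above, as a minimal sketch; loadAlignments and the path are assumptions rather than part of this diff:

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._

// Importing ADAMContext._ brings the SparkContext -> ADAMContext implicit
// into scope, so ADAM loader methods can be called directly on a SparkContext.
def loadReads(sc: SparkContext) = sc.loadAlignments("/data/sample.bam")
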
@@ -18,7 +18,6 @@
package org.bdgenomics.adam.rdd

import java.util.logging.Level

import java.io.OutputStream
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
@@ -41,9 +40,32 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.util.ContextUtil
import scala.reflect.ClassTag

/**
* Argument configuration for saving any output format.
*/
trait ADAMSaveAnyArgs extends SaveArgs {

/**
* If true and saving as FASTQ, we will sort by read name.
*/
var sortFastqOutput: Boolean

/**
* If true and saving as a legacy format, we will write shards so that they
* can be merged into a single file.
*
* @see deferMerging
*/
var asSingleFile: Boolean

/**
* If true and asSingleFile is true, we will not merge the shards once we
* write them, and will leave them for the user to merge later. If false and
* asSingleFile is true, then we will merge the shards on write. If
* asSingleFile is false, this is ignored.
*
* @see asSingleFile
*/
var deferMerging: Boolean
}
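
The interplay of asSingleFile and deferMerging documented above reduces to one predicate; a hypothetical sketch, not the actual ADAM write path:

// Merge shards on write only when a single file was requested and merging
// was not explicitly deferred to the user.
def shouldMergeShards(args: ADAMSaveAnyArgs): Boolean =
  args.asSingleFile && !args.deferMerging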

@@ -104,6 +126,17 @@ private[rdd] abstract class ADAMRDDFunctions[T <% IndexedRecord: Manifest] exten
)
}

/**
* Saves an RDD of Avro data to Parquet.
*
* @param filePath The path to save the file to.
* @param blockSize The size in bytes of blocks to write.
* @param pageSize The size in bytes of pages to write.
* @param compressCodec The compression codec to apply to pages.
* @param disableDictionaryEncoding If false, dictionary encoding is used. If
* true, delta encoding is used.
* @param schema The schema to set.
*/
protected def saveRddAsParquet(
filePath: String,
blockSize: Int = 128 * 1024 * 1024,
@@ -138,10 +171,26 @@ private[rdd] abstract class ADAMRDDFunctions[T <% IndexedRecord: Manifest] exten
since = "0.20.0")
private[rdd] class ConcreteADAMRDDFunctions[T <% IndexedRecord: Manifest](val rdd: RDD[T]) extends ADAMRDDFunctions[T] {

/**
* Saves an RDD of Avro data to Parquet.
*
* @param args The output format configuration to use when saving the data.
*/
def saveAsParquet(args: SaveArgs): Unit = {
saveRddAsParquet(args)
}

/**
* Saves an RDD of Avro data to Parquet.
*
* @param filePath The path to save the file to.
* @param blockSize The size in bytes of blocks to write.
* @param pageSize The size in bytes of pages to write.
* @param compressCodec The compression codec to apply to pages.
* @param disableDictionaryEncoding If false, dictionary encoding is used. If
* true, delta encoding is used.
* @param schema The schema to set.
*/
def saveAsParquet(
filePath: String,
blockSize: Int = 128 * 1024 * 1024,
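
A usage sketch for the saveAsParquet overload documented above, assuming the parameter names from the scaladoc carry over; the RDD (`reads`), the path, and the parameter values are illustrative:

import org.apache.parquet.hadoop.metadata.CompressionCodecName

// `reads` is assumed to be an AvroGenomicRDD (e.g. an AlignmentRecordRDD),
// which inherits these save methods from ADAMRDDFunctions.
reads.saveAsParquet(
  "/data/sample.reads.adam",
  blockSize = 128 * 1024 * 1024,
  pageSize = 1 * 1024 * 1024,
  compressCodec = CompressionCodecName.GZIP,
  disableDictionaryEncoding = false)
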
@@ -41,27 +41,44 @@ case class GenomicPositionPartitioner(numParts: Int, seqLengths: Map[String, Lon
log.info("Have genomic position partitioner with " + numParts + " partitions, and sequences:")
seqLengths.foreach(kv => log.info("Contig " + kv._1 + " with length " + kv._2))

val names: Seq[String] = seqLengths.keys.toSeq.sortWith(_ < _)
val lengths: Seq[Long] = names.map(seqLengths(_))
private val names: Seq[String] = seqLengths.keys.toSeq.sortWith(_ < _)
private val lengths: Seq[Long] = names.map(seqLengths(_))
private val cumuls: Seq[Long] = lengths.scan(0L)(_ + _)

// total # of bases in the sequence dictionary
val totalLength: Long = lengths.sum
private val totalLength: Long = lengths.sum

// referenceName -> cumulative length before this sequence (using seqDict.records as the implicit ordering)
val cumulativeLengths: Map[String, Long] = Map(
private[rdd] val cumulativeLengths: Map[String, Long] = Map(
names.zip(cumuls): _*
)

/**
* 'parts' is the total number of partitions for non-UNMAPPED ReferencePositions --
* the total number of partitions (see numPartitions, below) is parts+1, with the
* extra partition being included for handling ReferencePosition.UNMAPPED
*
* @see numPartitions
*/
private val parts = min(numParts, totalLength).toInt

/**
* This is the total number of partitions for both mapped and unmapped
* positions. All unmapped positions go into the last partition.
*
* @see parts
*/
override def numPartitions: Int = parts + 1

/**
* Computes the partition for a key.
*
* @param key A key to compute the partition for.
* @return The partition that this key belongs to.
*
* @throws IllegalArgumentException if the key is not a ReferencePosition or a
* (ReferencePosition, _) tuple.
*/
override def getPartition(key: Any): Int = {

// This allows partitions that cross chromosome boundaries.
@@ -101,9 +118,11 @@ case class GenomicPositionPartitioner(numParts: Int, seqLengths: Map[String, Lon
override def toString(): String = {
return "%d parts, %d partitions, %s" format (parts, numPartitions, cumulativeLengths.toString)
}

}
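
A worked example of the values documented above, using hypothetical contig lengths:

// Two contigs: chr1 with 1000 bases, chr2 with 500 bases, numParts = 3.
val partitioner = GenomicPositionPartitioner(3, Map("chr1" -> 1000L, "chr2" -> 500L))
// names             = Seq("chr1", "chr2")
// lengths           = Seq(1000L, 500L)
// totalLength       = 1500L
// cumulativeLengths = Map("chr1" -> 0L, "chr2" -> 1000L)
// parts             = min(3, 1500) = 3   (partitions for mapped positions)
// numPartitions     = parts + 1 = 4      (the extra partition holds ReferencePosition.UNMAPPED)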

/**
* Helper for creating genomic position partitioners.
*/
object GenomicPositionPartitioner {

/**
@@ -120,7 +139,17 @@ object GenomicPositionPartitioner {
seqDict.records.toSeq.map(rec => (rec.name, rec.length)).toMap
}

case class GenomicRegionPartitioner(partitionSize: Long, seqLengths: Map[String, Long], start: Boolean = true) extends Partitioner with Logging {
/**
* A partitioner for ReferenceRegion-keyed data.
*
* @param partitionSize The number of bases per partition.
* @param seqLengths A map between contig names and contig lengths.
* @param start If true, use the start position (instead of the end position) to
* decide which partition a key belongs to.
*/
case class GenomicRegionPartitioner(partitionSize: Long,
seqLengths: Map[String, Long],
start: Boolean = true) extends Partitioner with Logging {
private val names: Seq[String] = seqLengths.keys.toSeq.sortWith(_ < _)
private val lengths: Seq[Long] = names.map(seqLengths(_))
private val parts: Seq[Int] = lengths.map(v => round(ceil(v.toDouble / partitionSize)).toInt)
@@ -138,8 +167,19 @@ case class GenomicRegionPartitioner(partitionSize: Long, seqLengths: Map[String,
(cumulParts(refReg.referenceName) + pos / partitionSize).toInt
}

/**
* @return The number of partitions described by this partitioner. Roughly the
* size of the genome divided by the partition length.
*/
override def numPartitions: Int = parts.sum

/**
* @param key The key to get the partition index for.
* @return The partition that a key should map to.
*
* @throws IllegalArgumentException If the key is not a ReferenceRegion or a
* tuple of (ReferenceRegion, _).
*/
override def getPartition(key: Any): Int = {
key match {
case region: ReferenceRegion => {
@@ -153,6 +193,9 @@ case class GenomicRegionPartitioner(partitionSize: Long, seqLengths: Map[String,
}
}
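
A similar worked example for GenomicRegionPartitioner, again with hypothetical lengths:

// Two contigs: chr1 with 1000 bases, chr2 with 500 bases, 400 bases per partition.
val partitioner = GenomicRegionPartitioner(400L, Map("chr1" -> 1000L, "chr2" -> 500L))
// parts         = Seq(ceil(1000 / 400), ceil(500 / 400)) = Seq(3, 2)
// numPartitions = 3 + 2 = 5
// With start = true, ReferenceRegion("chr1", 850L, 900L) is assigned by its
// start coordinate: cumulParts("chr1") + 850 / 400 = 0 + 2 = partition 2.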

/**
* Helper object for creating GenomicRegionPartitioners.
*/
object GenomicRegionPartitioner {

/**
58 changes: 58 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -81,16 +81,39 @@ private[rdd] object GenomicRDD {
}
}

/**
* A trait that wraps an RDD of genomic data with helpful metadata.
*
* @tparam T The type of the data in the wrapped RDD.
* @tparam U The type of this GenomicRDD.
*/
trait GenomicRDD[T, U <: GenomicRDD[T, U]] {

/**
* The RDD of genomic data that we are wrapping.
*/
val rdd: RDD[T]

/**
* The sequence dictionary describing the reference assembly this data is
* aligned to.
*/
val sequences: SequenceDictionary

/**
* The underlying RDD of genomic data, as a JavaRDD.
*/
lazy val jrdd: JavaRDD[T] = {
rdd.toJavaRDD()
}

/**
* Applies a function that transforms the underlying RDD into a new RDD.
*
* @param tFn A function that transforms the underlying RDD.
* @return A new RDD where the RDD of genomic data has been replaced, but the
* metadata (sequence dictionary, etc.) is copied without modification.
*/
def transform(tFn: RDD[T] => RDD[T]): U = {
replaceRdd(tFn(rdd))
}
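
A usage sketch for transform; `reads` is an assumed AlignmentRecordRDD, not something defined in this diff:

// Swap in a cached RDD; the sequence dictionary and other metadata are
// carried over unchanged by replaceRdd.
val cachedReads = reads.transform(rdd => rdd.cache())
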
@@ -321,6 +344,14 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
})
}

/**
* Runs a filter that selects data in the underlying RDD that overlaps a
* single genomic region.
*
* @param query The region to query for.
* @return Returns a new GenomicRDD containing only data that overlaps the
* query region.
*/
def filterByOverlappingRegion(query: ReferenceRegion): U = {
replaceRdd(rdd.filter(elem => {
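
// Usage sketch (hypothetical coordinates): keep only data overlapping the
// first megabase of chr20. `reads` is an assumed AlignmentRecordRDD.
//
//   val firstMb = ReferenceRegion("chr20", 0L, 1000000L)
//   val slice = reads.filterByOverlappingRegion(firstMb)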

@@ -647,13 +678,28 @@ private case class GenericGenomicRDD[T](rdd: RDD[T],
}
}

/**
* A trait describing a GenomicRDD with data from multiple samples.
*/
trait MultisampleGenomicRDD[T, U <: MultisampleGenomicRDD[T, U]] extends GenomicRDD[T, U] {

/**
* The samples who have data contained in this GenomicRDD.
*/
val samples: Seq[Sample]
}

/**
* An abstract class describing a GenomicRDD where:
*
* * The data are Avro IndexedRecords.
* * The data are associated with read groups (i.e., they are reads or fragments).
*/
abstract class AvroReadGroupGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroReadGroupGenomicRDD[T, U]] extends AvroGenomicRDD[T, U] {

/**
* A dictionary describing the read groups attached to this GenomicRDD.
*/
val recordGroups: RecordGroupDictionary

override protected def saveMetadata(filePath: String) {
@@ -675,6 +721,10 @@ abstract class AvroReadGroupGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroRe
}
}

/**
* An abstract class that extends the MultisampleGenomicRDD trait, where the data
* are Avro IndexedRecords.
*/
abstract class MultisampleAvroGenomicRDD[T <% IndexedRecord: Manifest, U <: MultisampleAvroGenomicRDD[T, U]] extends AvroGenomicRDD[T, U]
with MultisampleGenomicRDD[T, U] {

@@ -695,6 +745,11 @@ abstract class MultisampleAvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Mult
}
}

/**
* An abstract class that extends GenomicRDD and where the underlying data is
* Avro IndexedRecords. This abstract class provides methods for saving to
* Parquet, and provides hooks for writing the metadata.
*/
abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroGenomicRDD[T, U]] extends ADAMRDDFunctions[T]
with GenomicRDD[T, U] {

@@ -793,6 +848,9 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroGenomicRDD[
}
}

/**
* A trait for genomic data that is not aligned to a reference (e.g., raw reads).
*/
trait Unaligned {

val sequences = SequenceDictionary.empty
17 changes: 17 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/InFormatter.scala
@@ -30,8 +30,25 @@ private[rdd] class InFormatterRunner[T, U <: GenomicRDD[T, U], V <: InFormatter[
}
}

/**
* A trait for singleton objects that build an InFormatter from a GenomicRDD.
*
* Often, when creating an output stream, we need to add metadata to the output
* that is not attached to individual records. An example of this is writing a
* header with contig/read group/format info, as is done with SAM/BAM/VCF.
*
* @tparam T The type of the records this InFormatter writes out.
* @tparam U The type of the GenomicRDD this companion object understands.
* @tparam V The type of InFormatter this companion object creates.
*/
trait InFormatterCompanion[T, U <: GenomicRDD[T, U], V <: InFormatter[T, U, V]] {

/**
* Creates an InFormatter from a GenomicRDD.
*
* @param gRdd The GenomicRDD to get metadata from.
* @return Returns an InFormatter with attached metadata.
*/
def apply(gRdd: U): V
}
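
A small sketch of how this contract composes, assuming code that lives inside the org.bdgenomics.adam.rdd package (the formatters themselves are package private); the helper below is illustrative and not part of ADAM:

// Given a GenomicRDD, its companion builds an InFormatter that has already
// captured the header-level metadata (contigs, read groups, etc.).
def formatterFor[T, U <: GenomicRDD[T, U], V <: InFormatter[T, U, V]](
  companion: InFormatterCompanion[T, U, V],
  gRdd: U): V = {
  companion.apply(gRdd)
}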

