[ADAM-1112] Add interface for piping commands.
Resolves bigdatagenomics#1112. Adds interfaces for piping SAM/BAM and VCF to subprocesses that
are run in parallel under Apache Spark.
fnothaft committed Sep 15, 2016
1 parent 5e2853e commit 293df36
Showing 16 changed files with 694 additions and 9 deletions.
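As a usage sketch (not verbatim from this commit): assuming SAMInFormatter and AnySAMOutFormatter are among the concrete formatter implementations added in these 16 files, and with /bin/cat standing in for a real SAM-in/SAM-out tool, the new API could be driven like so:

import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD

// sc is an existing SparkContext
val reads = sc.loadAlignments("sample.sam")

// pick the formatters for data entering and leaving the subprocess
implicit val tFormatter = SAMInFormatter
implicit val xFormatter = new AnySAMOutFormatter

// records are streamed to the command as SAM; the command's SAM output
// is parsed back into a new AlignmentRecordRDD
val piped: AlignmentRecordRDD = reads.pipe("/bin/cat")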
@@ -267,6 +267,29 @@ case class ReferenceRegion(
else
None

/**
* @param by The number of bases to extend the region by, at both the start
* and the end.
* @return Returns a new reference region where the start has been moved back
* and the end has been moved forward by the given number of bases.
*/
def pad(by: Long): ReferenceRegion = {
pad(by, by)
}

/**
* @param byStart The number of bases to move the start position back by.
* @param byEnd The number of bases to move the end position forward by.
* @return Returns a new reference region where the start and/or end have been
* moved.
*/
def pad(byStart: Long, byEnd: Long): ReferenceRegion = {
new ReferenceRegion(referenceName,
start - byStart,
end + byEnd,
orientation)
}
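// For example (illustrative coordinates; default orientation assumed):
//   ReferenceRegion("chr1", 100L, 200L).pad(10L)
//     == ReferenceRegion("chr1", 90L, 210L)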

def contains(other: ReferencePosition): Boolean = {
orientation == other.orientation &&
referenceName == other.referenceName &&
@@ -20,13 +20,13 @@ package org.bdgenomics.adam.models
import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord }
import scala.collection.JavaConversions._

- object SAMFileHeaderWritable {
+ private[adam] object SAMFileHeaderWritable {
def apply(header: SAMFileHeader): SAMFileHeaderWritable = {
new SAMFileHeaderWritable(header)
}
}

- class SAMFileHeaderWritable(@transient hdr: SAMFileHeader) extends Serializable {
+ private[adam] class SAMFileHeaderWritable(@transient hdr: SAMFileHeader) extends Serializable {
// extract fields that are needed in order to recreate the SAMFileHeader
protected val text = {
val txt: String = hdr.getTextHeader
@@ -89,6 +89,14 @@ private case class LocatableReferenceRegion(rr: ReferenceRegion) extends Locatab
}

object ADAMContext {

// conversion functions for pipes
implicit def sameTypeConversionFn[T, U <: GenomicRDD[T, U]](gRdd: U,
rdd: RDD[T]): U = {
// hijack the transform function to discard the old RDD
gRdd.transform(oldRdd => rdd)
}

// Add ADAM Spark context methods
implicit def sparkContextToADAMContext(sc: SparkContext): ADAMContext = new ADAMContext(sc)

93 changes: 93 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -17,6 +17,7 @@
*/
package org.bdgenomics.adam.rdd

import java.util.concurrent.Executors
import org.apache.avro.generic.IndexedRecord
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.api.java.JavaRDD
@@ -28,6 +29,7 @@ import org.bdgenomics.adam.models.{
}
import org.bdgenomics.formats.avro.{ Contig, RecordGroupMetadata, Sample }
import org.bdgenomics.utils.cli.SaveArgs
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

private[rdd] class JavaSaveArgs(var outputPath: String,
@@ -53,6 +55,97 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
replaceRdd(tFn(rdd))
}

/**
* Pipes genomic data to a subprocess that runs in parallel using Spark.
*
* Files are substituted into the command with a $x syntax. E.g., to invoke
* a command that uses the first file from the files Seq, use $0.
*
* @param cmd Command to run.
* @param files Files to make locally available to the commands being run.
* @param flankSize Number of bases to flank each command invocation by.
* @return Returns a new GenomicRDD of type Y.
*
* @tparam X The type of the record created by the piped command.
* @tparam Y A GenomicRDD containing X's.
* @tparam V The InFormatter used to format the records being piped into the
* command.
*/
def pipe[X, Y <: GenomicRDD[X, Y], V <: InFormatter[T, U, V]](cmd: String,
files: Seq[String] = Seq.empty,
flankSize: Int = 0)(implicit tFormatterCompanion: InFormatterCompanion[T, U, V],
xFormatter: OutFormatter[X],
convFn: (U, RDD[X]) => Y,
tManifest: ClassTag[T],
xManifest: ClassTag[X]): Y = {

// TODO: support broadcasting files
assert(files.isEmpty)

// make formatter
val tFormatter: V = tFormatterCompanion.apply(this.asInstanceOf[U])

// make bins
val seqLengths = sequences.records.toSeq.map(rec => (rec.name, rec.length)).toMap
val totalLength = seqLengths.values.sum
val bins = GenomeBins(totalLength / rdd.partitions.size, seqLengths)

// get region covered, expand region by flank size, and tag with bins
val binKeyedRdd = rdd.flatMap(r => {

// get regions and expand
val regions = getReferenceRegions(r).map(_.pad(flankSize))

// get all the bins this record falls into
val recordBins = regions.flatMap(rr => {
(bins.getStartBin(rr) to bins.getEndBin(rr))
}).distinct

// key the record by those bins and return
// TODO: this should key with the reference region corresponding to a bin
recordBins.map(b => (b, r))
})

// repartition our data
// TODO: this should repartition and sort within the partition
val partitionedRdd = binKeyedRdd.partitionBy(ManualRegionPartitioner(bins.numBins))

// call map partitions and pipe
val pipedRdd = partitionedRdd.values
.mapPartitions(iter => {

// split command and run
// TODO: substitute in strings
val pb = new ProcessBuilder(cmd.split(" ").toList)
pb.redirectError(ProcessBuilder.Redirect.INHERIT)
val process = pb.start()
val os = process.getOutputStream()
val is = process.getInputStream()

// wrap in and out formatters
val ifr = new InFormatterRunner[T, U, V](iter, tFormatter, os)
val ofr = new OutFormatterRunner[X, OutFormatter[X]](xFormatter, is)

// launch thread pool and submit formatters
val pool = Executors.newFixedThreadPool(2)
pool.submit(ifr)
val futureIter = pool.submit(ofr)

// wait for process to finish
val exitCode = process.waitFor()
if (exitCode != 0) {
throw new RuntimeException("Piped command %s exited with error code %d.".format(
cmd, exitCode))
}

// shut down the thread pool
pool.shutdown()

futureIter.get
})

// build the new GenomicRDD and return
convFn(this.asInstanceOf[U], pipedRdd)
}
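The process wiring above follows a standard two-thread producer/consumer pattern. A self-contained sketch of just that pattern (assuming only that /bin/cat exists) may make it clearer: one pool thread plays the InFormatterRunner role and feeds the child's stdin, a second plays the OutFormatterRunner role and drains its stdout, and the calling thread blocks on the exit status:

import java.util.concurrent.{ Callable, Executors }
import scala.collection.JavaConversions._
import scala.io.Source

// launch the child process, inheriting stderr as pipe() does above
val process = {
  val pb = new ProcessBuilder(List("/bin/cat"))
  pb.redirectError(ProcessBuilder.Redirect.INHERIT)
  pb.start()
}
val pool = Executors.newFixedThreadPool(2)

// writer thread: the InFormatterRunner role
pool.submit(new Runnable {
  def run() {
    val os = process.getOutputStream()
    os.write("line1\nline2\n".getBytes)
    os.flush()
    os.close()
  }
})

// reader thread: the OutFormatterRunner role
val futureLines = pool.submit(new Callable[List[String]] {
  def call(): List[String] = {
    Source.fromInputStream(process.getInputStream()).getLines().toList
  }
})

// block on the exit status, then collect the piped output
assert(process.waitFor() == 0)
println(futureLines.get) // List(line1, line2)
pool.shutdown()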

protected def replaceRdd(newRdd: RDD[T]): U

protected def getReferenceRegions(elem: T): Seq[ReferenceRegion]
55 changes: 55 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/InFormatter.scala
@@ -0,0 +1,55 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd

import java.io.OutputStream

private[rdd] class InFormatterRunner[T, U <: GenomicRDD[T, U], V <: InFormatter[T, U, V]](iter: Iterator[T],
formatter: V,
os: OutputStream) extends Runnable {

def run() {
formatter.write(os, iter)
os.flush()
os.close()
}
}

trait InFormatterCompanion[T, U <: GenomicRDD[T, U], V <: InFormatter[T, U, V]] {

def apply(gRdd: U): V
}

/**
* Formats data going into a pipe to an invoked process.
*
* @tparam T The type of records being formatted.
*/
trait InFormatter[T, U <: GenomicRDD[T, U], V <: InFormatter[T, U, V]] extends Serializable {

protected val companion: InFormatterCompanion[T, U, V]

/**
* Writes records from an iterator into an output stream.
*
* @param os An OutputStream connected to a process we are piping to.
* @param iter An iterator of records to write.
*/
def write(os: OutputStream, iter: Iterator[T])
}
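
To illustrate the contract, a hypothetical minimal InFormatter (not part of this patch) that writes one read name per line, e.g. for piping through line-oriented tools such as sort:

import java.io.OutputStream
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import org.bdgenomics.formats.avro.AlignmentRecord

object ReadNameInFormatter extends InFormatterCompanion[AlignmentRecord, AlignmentRecordRDD, ReadNameInFormatter] {
  def apply(gRdd: AlignmentRecordRDD): ReadNameInFormatter = new ReadNameInFormatter
}

class ReadNameInFormatter extends InFormatter[AlignmentRecord, AlignmentRecordRDD, ReadNameInFormatter] {

  protected val companion = ReadNameInFormatter

  // write one read name per line into the subprocess's stdin
  def write(os: OutputStream, iter: Iterator[AlignmentRecord]) {
    iter.foreach(r => os.write((r.getReadName + "\n").getBytes))
  }
}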

@@ -0,0 +1,47 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd

import java.io.InputStream
import java.util.concurrent.Callable

private[rdd] class OutFormatterRunner[T, U <: OutFormatter[T]](formatter: U,
is: InputStream) extends Callable[Iterator[T]] {

def call(): Iterator[T] = {
formatter.read(is)
}
}

/**
* Deserializes data coming out of a pipe from an invoked process.
*
* @tparam T The type of records being formatted.
*/
trait OutFormatter[T] extends Serializable {

/**
* Reads an iterator of records from an input stream.
*
* @param is The input stream coming from a process to read records from.
* @return Returns an iterator of records that have been read from this
* stream.
*/
def read(is: InputStream): Iterator[T]
}
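
Dually, a hypothetical minimal OutFormatter (again, not part of this patch) that turns each line of the subprocess's stdout into a String record:

import java.io.InputStream
import scala.io.Source

class LineOutFormatter extends OutFormatter[String] {

  // materialize eagerly so the stream can be closed once read returns
  def read(is: InputStream): Iterator[String] = {
    Source.fromInputStream(is).getLines().toList.iterator
  }
}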

@@ -114,12 +114,13 @@ private[rdd] case class ShuffleRegionJoin(sd: SequenceDictionary, partitionSize:
* @param binSize The size of each bin in nucleotides
* @param seqLengths A map containing the length of each contig
*/
- case class GenomeBins(binSize: Long, seqLengths: Map[String, Long]) extends Serializable {
- @transient private val names: Seq[String] = seqLengths.keys.toSeq.sortWith(_ < _)
- @transient private val lengths: Seq[Long] = names.map(seqLengths(_))
- @transient private val parts: Seq[Int] = lengths.map(v => round(ceil(v.toDouble / binSize)).toInt)
- @transient private val cumulParts: Seq[Int] = parts.scan(0)(_ + _)
- @transient private val contig2cumulParts: Map[String, Int] = Map(names.zip(cumulParts): _*)
+ private[adam] case class GenomeBins(binSize: Long,
+ seqLengths: Map[String, Long]) extends Serializable {
+ private val names: Seq[String] = seqLengths.keys.toSeq.sortWith(_ < _)
+ private val lengths: Seq[Long] = names.map(seqLengths(_))
+ private val parts: Seq[Int] = lengths.map(v => round(ceil(v.toDouble / binSize)).toInt)
+ private val cumulParts: Seq[Int] = parts.scan(0)(_ + _)
+ private val contig2cumulParts: Map[String, Int] = Map(names.zip(cumulParts): _*)

/**
* The total number of bins induced by this partition.
@@ -170,10 +171,11 @@ case class GenomeBins(binSize: Long, seqLengths: Map[String, Long]) extends Seri
*
* @param partitions should correspond to the number of bins in the corresponding GenomeBins
*/
- private case class ManualRegionPartitioner(partitions: Int) extends Partitioner {
+ private[rdd] case class ManualRegionPartitioner(partitions: Int) extends Partitioner {
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = key match {
case (r: ReferenceRegion, p: Int) => p
case p: Int => p
case _ => throw new AssertionError("Unexpected key in ManualRegionPartitioner")
}
}
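
To make the binning arithmetic concrete, a small worked example (contig names and lengths invented) of how GenomicRDD.pipe above derives bins from a flanked region:

val seqLengths = Map("chr1" -> 1000000L, "chr2" -> 500000L)

// 15 bins of 100 kbp: ceil(1000000 / 100000) + ceil(500000 / 100000)
val bins = GenomeBins(100000L, seqLengths)

// a region padded by a 10 bp flank, as pipe does with flankSize;
// the default orientation is assumed here
val region = ReferenceRegion("chr1", 150000L, 250000L).pad(10L)

// the bins this region touches: bins 1 and 2 of chr1
val touched = bins.getStartBin(region) to bins.getEndBin(region)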
@@ -0,0 +1,77 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.read

import htsjdk.samtools.{ SAMFileHeader, SAMFileWriter }
import java.io.OutputStream
import org.bdgenomics.adam.converters.AlignmentRecordConverter
import org.bdgenomics.adam.models.{
RecordGroupDictionary,
SAMFileHeaderWritable
}
import org.bdgenomics.adam.rdd.{ InFormatter, InFormatterCompanion }
import org.bdgenomics.formats.avro.AlignmentRecord

trait AnySAMInFormatterCompanion[T <: AnySAMInFormatter[T]] extends InFormatterCompanion[AlignmentRecord, AlignmentRecordRDD, T] {
protected def makeFormatter(header: SAMFileHeaderWritable,
recordGroups: RecordGroupDictionary,
converter: AlignmentRecordConverter): T

def apply(gRdd: AlignmentRecordRDD): T = {

// make a converter
val arc = new AlignmentRecordConverter

// build a header and set the sort order
val header = arc.createSAMHeader(gRdd.sequences, gRdd.recordGroups)
header.setSortOrder(SAMFileHeader.SortOrder.coordinate)

// construct the in formatter
makeFormatter(new SAMFileHeaderWritable(header), gRdd.recordGroups, arc)
}
}

private[read] trait AnySAMInFormatter[T <: AnySAMInFormatter[T]] extends InFormatter[AlignmentRecord, AlignmentRecordRDD, T] {

val header: SAMFileHeaderWritable
val recordGroups: RecordGroupDictionary
val converter: AlignmentRecordConverter

protected def makeWriter(os: OutputStream): SAMFileWriter

/**
* Writes alignment records to an output stream in SAM format.
*
* @param os An OutputStream connected to a process we are piping to.
* @param iter An iterator of records to write.
*/
def write(os: OutputStream, iter: Iterator[AlignmentRecord]) {

// create a sam file writer connected to the output stream
val writer = makeWriter(os)

// write the records
iter.foreach(r => {
val samRecord = converter.convert(r, header, recordGroups)
writer.addAlignment(samRecord)
})

// close the writer, else the stream may be left incomplete
writer.close()
}
}
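
For reference, a plausible concrete formatter on top of this trait, sketched under assumptions: the commit's actual SAMInFormatter is not shown in this view, and SAMFileHeaderWritable is assumed to expose the reconstructed htsjdk header as a header field.

import htsjdk.samtools.SAMFileWriterFactory

private[read] case class SAMInFormatter(header: SAMFileHeaderWritable,
                                        recordGroups: RecordGroupDictionary,
                                        converter: AlignmentRecordConverter) extends AnySAMInFormatter[SAMInFormatter] {

  protected val companion = SAMInFormatter

  // emit SAM text to the subprocess; header.header is assumed to rebuild
  // the htsjdk SAMFileHeader from the serialized fields
  protected def makeWriter(os: OutputStream): SAMFileWriter = {
    new SAMFileWriterFactory().makeSAMWriter(header.header, false, os)
  }
}

private[read] object SAMInFormatter extends AnySAMInFormatterCompanion[SAMInFormatter] {
  protected def makeFormatter(header: SAMFileHeaderWritable,
                              recordGroups: RecordGroupDictionary,
                              converter: AlignmentRecordConverter): SAMInFormatter = {
    SAMInFormatter(header, recordGroups, converter)
  }
}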
