Skip to content

Commit

Permalink
[ADAM-1141] Add support for saving/loading AlignmentRecords to/from C…
Browse files Browse the repository at this point in the history
…RAM.

Resolves #1141. Changes the signature of `AlignmentRecordRDD.saveAsSAM` to take
an `Option[SAMFormat]` parameter, since `asSam` is now no longer a binary
choice.
  • Loading branch information
fnothaft committed Sep 1, 2016
1 parent 2158d4b commit e18db5a
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 56 deletions.
Expand Up @@ -79,5 +79,5 @@ object Timers extends Metrics {
val WriteADAMRecord = timer("Write ADAM Record")
val WriteBAMRecord = timer("Write BAM Record")
val WriteSAMRecord = timer("Write SAM Record")

val WriteCRAMRecord = timer("Write CRAM Record")
}
Expand Up @@ -407,7 +407,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
val bamFiles = getFsAndFiles(path)
val filteredFiles = bamFiles.filter(p => {
val pPath = p.getName()
pPath.endsWith(".bam") || pPath.endsWith(".sam") || pPath.startsWith("part-")
pPath.endsWith(".bam") || pPath.endsWith(".cram") ||
pPath.endsWith(".sam") || pPath.startsWith("part-")
})

require(filteredFiles.nonEmpty,
Expand Down
@@ -0,0 +1,83 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.read

import htsjdk.samtools.SAMFileHeader
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
import org.apache.spark.rdd.InstrumentedOutputFormat
import org.bdgenomics.adam.instrumentation.Timers
import org.seqdoop.hadoop_bam.{
KeyIgnoringCRAMOutputFormat,
KeyIgnoringCRAMRecordWriter,
SAMRecordWritable
}

class ADAMCRAMOutputFormat[K]
extends KeyIgnoringCRAMOutputFormat[K] with Serializable {

setWriteHeader(true)

override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
val conf = context.getConfiguration()

// where is our header file?
val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))

// read the header file
readSAMHeaderFrom(path, conf)

// now that we have the header set, we need to make a record reader
return new KeyIgnoringCRAMRecordWriter[K](getDefaultWorkFile(context, ""),
header,
true,
context)
}
}

class InstrumentedADAMCRAMOutputFormat[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
override def timerName(): String = Timers.WriteCRAMRecord.timerName
override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMCRAMOutputFormat[K]]
}

class ADAMCRAMOutputFormatHeaderLess[K]
extends KeyIgnoringCRAMOutputFormat[K] with Serializable {

setWriteHeader(false)

override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
val conf = context.getConfiguration()

// where is our header file?
val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))

// read the header file
readSAMHeaderFrom(path, conf)

// now that we have the header set, we need to make a record reader
return new KeyIgnoringCRAMRecordWriter[K](getDefaultWorkFile(context, ""),
header,
false,
context)
}
}

class InstrumentedADAMCRAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
override def timerName(): String = Timers.WriteCRAMRecord.timerName
override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMCRAMOutputFormatHeaderLess[K]]
}
Expand Up @@ -23,9 +23,12 @@ import htsjdk.samtools.util.{
BlockCompressedOutputStream,
BlockCompressedStreamConstants
}
import java.io.{ InputStream, OutputStream, StringWriter, Writer }
import htsjdk.samtools._
import htsjdk.samtools.cram.ref.ReferenceSource
import htsjdk.samtools.util.{ BinaryCodec, BlockCompressedOutputStream }
import java.io.{ InputStream, OutputStream, StringWriter, Writer }
import java.net.URI
import java.nio.file.Paths
import org.apache.avro.Schema
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.hadoop.io.LongWritable
Expand Down Expand Up @@ -57,7 +60,7 @@ import org.bdgenomics.adam.util.MapTools
import org.bdgenomics.adam.rdd.fragment.FragmentRDD
import org.bdgenomics.formats.avro._
import org.bdgenomics.utils.misc.Logging
import org.seqdoop.hadoop_bam.{ SAMFormat, SAMRecordWritable }
import org.seqdoop.hadoop_bam._
import scala.language.implicitConversions
import scala.math.{ abs, min }

Expand Down Expand Up @@ -130,7 +133,8 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
}

/**
* Saves this RDD as BAM or SAM if the extension provided is .sam or .bam.
* Saves this RDD as BAM, CRAM, or SAM if the extension provided is .sam, .cram,
* or .bam.
*
* @param args Arguments defining where to save the file.
* @param isSorted True if input data is sorted. Sets the ordering in the SAM
Expand All @@ -141,20 +145,12 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
private[rdd] def maybeSaveBam(args: ADAMSaveAnyArgs,
isSorted: Boolean = false): Boolean = {

if (args.outputPath.endsWith(".sam")) {
log.info("Saving data in SAM format")
saveAsSam(
args.outputPath,
asSingleFile = args.asSingleFile,
isSorted = isSorted
)
true
} else if (args.outputPath.endsWith(".bam")) {
log.info("Saving data in BAM format")
if (args.outputPath.endsWith(".sam") ||
args.outputPath.endsWith(".bam") ||
args.outputPath.endsWith(".cram")) {
log.info("Saving data in SAM/BAM/CRAM format")
saveAsSam(
args.outputPath,
asSam = false,
asSingleFile = args.asSingleFile,
isSorted = isSorted
)
true
Expand Down Expand Up @@ -295,16 +291,19 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
* Saves an RDD of ADAM read data into the SAM/BAM format.
*
* @param filePath Path to save files to.
* @param asSam Selects whether to save as SAM or BAM. The default value is true (save in SAM format).
* @param asType Selects whether to save as SAM, BAM, or CRAM. The default
* value is None, which means the file type is inferred from the extension.
* @param asSingleFile If true, saves output as a single file.
* @param isSorted If the output is sorted, this will modify the header.
*/
def saveAsSam(
filePath: String,
asSam: Boolean = true,
asType: Option[SAMFormat] = None,
asSingleFile: Boolean = false,
isSorted: Boolean = false): Unit = SAMSave.time {

val fileType = asType.getOrElse(SAMFormat.inferFromFilePath(filePath))

// convert the records
val (convertRecords: RDD[SAMRecordWritable], header: SAMFileHeader) =
convertToSam(isSorted)
Expand All @@ -322,9 +321,9 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
val fs = headPath.getFileSystem(rdd.context.hadoopConfiguration)

// TIL: sam and bam are written in completely different ways!
if (asSam) {
if (fileType == SAMFormat.SAM) {
SAMHeaderWriter.writeHeader(fs, headPath, header)
} else {
} else if (fileType == SAMFormat.BAM) {

// get an output stream
val os = fs.create(headPath)
Expand Down Expand Up @@ -354,43 +353,78 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
compressedOut.flush()
os.flush()
os.close()
} else {
// from https://samtools.github.io/hts-specs/CRAMv3.pdf
// cram has a variety of addtional constraints:
//
// * file definition has 20 byte identifier field
// * header must have SO:pos
// * sequence records must have attached MD5s (we don't support
// embedding reference sequences)
//
// we'll defer the writing to the cram container stream writer, and will
// do validation here

require(isSorted, "To save as CRAM, input must be sorted.")
require(sequences.records.forall(_.md5.isDefined),
"To save as CRAM, all sequences must have an attached MD5. See %s".format(
sequences))
val refSource = conf.get(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY)
require(refSource != null,
"To save as CRAM, the reference source must be set in your config as %s.".format(
CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY))

// get an output stream
val os = fs.create(headPath)
.asInstanceOf[OutputStream]

// create a cram container writer
val csw = new CRAMContainerStreamWriter(os, null, // null -> do not write index
new ReferenceSource(Paths.get(URI.create(refSource))),
header,
filePath) // write filepath as id

// write the header
csw.writeHeader(header)

// finish the cram container, but don't write EOF
csw.finish(false)

// flush and close the output stream
os.flush()
os.close()
}

// set path to header file
conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath.toString)

if (!asSingleFile) {
if (asSam) {
withKey.saveAsNewAPIHadoopFile(
filePath,
classOf[LongWritable],
classOf[SAMRecordWritable],
classOf[InstrumentedADAMSAMOutputFormat[LongWritable]],
conf
)
} else {
withKey.saveAsNewAPIHadoopFile(
filePath,
classOf[LongWritable],
classOf[SAMRecordWritable],
classOf[InstrumentedADAMBAMOutputFormat[LongWritable]],
conf
)
val headeredOutputFormat = fileType match {
case SAMFormat.SAM => classOf[InstrumentedADAMSAMOutputFormat[LongWritable]]
case SAMFormat.BAM => classOf[InstrumentedADAMBAMOutputFormat[LongWritable]]
case SAMFormat.CRAM => classOf[InstrumentedADAMCRAMOutputFormat[LongWritable]]
}
withKey.saveAsNewAPIHadoopFile(
filePath,
classOf[LongWritable],
classOf[SAMRecordWritable],
headeredOutputFormat,
conf
)

// clean up the header after writing
fs.delete(headPath, true)
} else {
log.info(s"Writing single ${if (asSam) "SAM" else "BAM"} file (not Hadoop-style directory)")
log.info(s"Writing single ${fileType} file (not Hadoop-style directory)")

val tailPath = new Path(filePath + "_tail")
val outputPath = new Path(filePath)

// set up output format
val headerLessOutputFormat = if (asSam) {
classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]]
} else {
classOf[InstrumentedADAMBAMOutputFormatHeaderLess[LongWritable]]
val headerLessOutputFormat = fileType match {
case SAMFormat.SAM => classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]]
case SAMFormat.BAM => classOf[InstrumentedADAMBAMOutputFormatHeaderLess[LongWritable]]
case SAMFormat.CRAM => classOf[InstrumentedADAMCRAMOutputFormatHeaderLess[LongWritable]]
}

// save rdd
Expand All @@ -406,25 +440,26 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
outputPath,
tailPath,
optHeaderPath = Some(headPath),
writeEmptyGzipBlock = !asSam)
writeEmptyGzipBlock = (fileType != SAMFormat.SAM))
}
}

/**
* Saves this RDD to disk as a SAM/BAM file.
* Saves this RDD to disk as a SAM/BAM/CRAM file.
*
* @param filePath Path to save the file at.
* @param asSam If true, saves as SAM. If false, saves as BAM.
* @param asType The SAMFormat to save as. If left null, we will infer the
* format from the file extension.
* @param asSingleFile If true, saves output as a single file.
* @param isSorted If the output is sorted, this will modify the header.
*/
def saveAsSam(
filePath: java.lang.String,
asSam: java.lang.Boolean,
asType: SAMFormat,
asSingleFile: java.lang.Boolean,
isSorted: java.lang.Boolean) {
saveAsSam(filePath,
asSam = asSam,
asType = Option(asType),
asSingleFile = asSingleFile,
isSorted = isSorted)
}
Expand Down

0 comments on commit e18db5a

Please sign in to comment.