Commit
[ADAM-1141] Add support for saving/loading AlignmentRecords to/from CRAM.

Resolves bigdatagenomics#1141. Changes the signature of `AlignmentRecordRDD.saveAsSAM` to take
an `Option[SAMFormat]` parameter, since `asSam` is now no longer a binary
choice.
fnothaft committed Sep 13, 2016
1 parent fd2c27b commit 0b7e03e
Showing 11 changed files with 326 additions and 66 deletions.
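For orientation, a hedged sketch of the new call shape described in the commit message (not part of this diff): it assumes an AlignmentRecordRDD named alignments, SAMFormat from org.seqdoop.hadoop_bam, and a parameter name of asType, which is an assumption for illustration.

import org.seqdoop.hadoop_bam.SAMFormat

// Sketch only: let the on-disk format be inferred from the file extension...
alignments.saveAsSAM("reads.cram", asType = None)

// ...or request CRAM explicitly.
alignments.saveAsSAM("reads.cram", asType = Some(SAMFormat.CRAM))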
@@ -79,5 +79,5 @@ object Timers extends Metrics {
val WriteADAMRecord = timer("Write ADAM Record")
val WriteBAMRecord = timer("Write BAM Record")
val WriteSAMRecord = timer("Write SAM Record")

val WriteCRAMRecord = timer("Write CRAM Record")
}
@@ -395,7 +395,8 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
val bamFiles = getFsAndFiles(path)
val filteredFiles = bamFiles.filter(p => {
val pPath = p.getName()
pPath.endsWith(".bam") || pPath.endsWith(".sam") || pPath.startsWith("part-")
pPath.endsWith(".bam") || pPath.endsWith(".cram") ||
pPath.endsWith(".sam") || pPath.startsWith("part-")
})

require(filteredFiles.nonEmpty,
@@ -1337,7 +1338,7 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
* This method can load:
*
* * AlignmentRecords via Parquet (default)
* * SAM/BAM (.sam, .bam)
* * SAM/BAM/CRAM (.sam, .bam, .cram)
* * FASTQ (interleaved, single end, paired end) (.ifq, .fq/.fastq)
* * FASTA (.fa, .fasta)
* * NucleotideContigFragments via Parquet (.contig.adam)
@@ -1368,8 +1369,9 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = LoadAlignmentRecords.time {

if (filePath.endsWith(".sam") ||
filePath.endsWith(".bam")) {
log.info(s"Loading $filePath as SAM/BAM and converting to AlignmentRecords. Projection is ignored.")
filePath.endsWith(".bam") ||
filePath.endsWith(".cram")) {
log.info(s"Loading $filePath as SAM/BAM/CRAM and converting to AlignmentRecords. Projection is ignored.")
loadBam(filePath, stringency)
} else if (filePath.endsWith(".ifq")) {
log.info(s"Loading $filePath as interleaved FASTQ and converting to AlignmentRecords. Projection is ignored.")
@@ -1398,7 +1400,7 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
* This method can load:
*
* * Fragments via Parquet (default)
* * SAM/BAM (.sam, .bam)
* * SAM/BAM/CRAM (.sam, .bam, .cram)
* * FASTQ (interleaved only --> .ifq)
* * Autodetects AlignmentRecord as Parquet with .reads.adam extension.
*
@@ -1407,7 +1409,8 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
*/
def loadFragments(filePath: String): FragmentRDD = LoadFragments.time {
if (filePath.endsWith(".sam") ||
filePath.endsWith(".bam")) {
filePath.endsWith(".bam") ||
filePath.endsWith(".cram")) {
log.info(s"Loading $filePath as SAM/BAM and converting to Fragments.")
loadBam(filePath).toFragments
} else if (filePath.endsWith(".reads.adam")) {
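A minimal usage sketch for the extended loaders (it assumes the usual org.bdgenomics.adam.rdd.ADAMContext._ implicit conversions are in scope and that sc is a SparkContext; the paths are illustrative):

import org.bdgenomics.adam.rdd.ADAMContext._

// .cram paths are now auto-detected and routed through loadBam.
val reads = sc.loadAlignments("hdfs:///data/sample.cram")
val fragments = sc.loadFragments("hdfs:///data/sample.cram")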
11 changes: 11 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/FileMerger.scala
@@ -18,6 +18,8 @@
package org.bdgenomics.adam.rdd

import htsjdk.samtools.util.BlockCompressedStreamConstants
import htsjdk.samtools.cram.build.CramIO
import htsjdk.samtools.cram.common.CramVersions
import java.io.{ InputStream, OutputStream }
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.bdgenomics.utils.misc.Logging
@@ -38,15 +40,22 @@ private[rdd] object FileMerger extends Logging {
* been written.
* @param writeEmptyGzipBlock If true, we write an empty GZIP block at the
* end of the merged file.
* @param writeCramEOF If true, we write CRAM's EOF signifier.
* @param bufferSize The size in bytes of the buffer used for copying.
*/
def mergeFiles(fs: FileSystem,
outputPath: Path,
tailPath: Path,
optHeaderPath: Option[Path] = None,
writeEmptyGzipBlock: Boolean = false,
writeCramEOF: Boolean = false,
bufferSize: Int = 1024) {

require(bufferSize > 0,
"Cannot have buffer size < 1. %d was provided.".format(bufferSize))
require(!(writeEmptyGzipBlock && writeCramEOF),
"writeEmptyGzipBlock and writeCramEOF are mutually exclusive.")

// get a list of all of the files in the tail file
val tailFiles = fs.globStatus(new Path("%s/part-*".format(tailPath)))
.toSeq
@@ -128,6 +137,8 @@ private[rdd] object FileMerger extends Logging {
// finish the file off
if (writeEmptyGzipBlock) {
os.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK);
} else if (writeCramEOF) {
CramIO.issueEOF(CramVersions.DEFAULT_CRAM_VERSION, os)
}

// flush and close the output stream
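A hedged sketch of how a caller inside the rdd package might drive this merger for a single-file CRAM write (FileMerger is private[rdd]; the paths are illustrative):

// Sketch only: concatenate a pre-written CRAM header with the sharded body,
// then terminate the merged file with the CRAM EOF container rather than an
// empty BGZF block.
FileMerger.mergeFiles(fs,
  new Path("sample.cram"),
  new Path("sample.cram_tail"),
  optHeaderPath = Some(new Path("sample.cram_head")),
  writeCramEOF = true)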
@@ -0,0 +1,83 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.read

import htsjdk.samtools.SAMFileHeader
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
import org.apache.spark.rdd.InstrumentedOutputFormat
import org.bdgenomics.adam.instrumentation.Timers
import org.seqdoop.hadoop_bam.{
KeyIgnoringCRAMOutputFormat,
KeyIgnoringCRAMRecordWriter,
SAMRecordWritable
}

class ADAMCRAMOutputFormat[K]
extends KeyIgnoringCRAMOutputFormat[K] with Serializable {

setWriteHeader(true)

override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
val conf = context.getConfiguration()

// where is our header file?
val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))

// read the header file
readSAMHeaderFrom(path, conf)

// now that we have the header set, we need to make a record writer
return new KeyIgnoringCRAMRecordWriter[K](getDefaultWorkFile(context, ""),
header,
true,
context)
}
}

class InstrumentedADAMCRAMOutputFormat[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
override def timerName(): String = Timers.WriteCRAMRecord.timerName
override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMCRAMOutputFormat[K]]
}

class ADAMCRAMOutputFormatHeaderLess[K]
extends KeyIgnoringCRAMOutputFormat[K] with Serializable {

setWriteHeader(false)

override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
val conf = context.getConfiguration()

// where is our header file?
val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))

// read the header file
readSAMHeaderFrom(path, conf)

// now that we have the header set, we need to make a record writer
return new KeyIgnoringCRAMRecordWriter[K](getDefaultWorkFile(context, ""),
header,
false,
context)
}
}

class InstrumentedADAMCRAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
override def timerName(): String = Timers.WriteCRAMRecord.timerName
override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMCRAMOutputFormatHeaderLess[K]]
}
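To illustrate how these formats plug into Spark's Hadoop output path, a hedged sketch (the LongWritable key type and the saveAsNewAPIHadoopFile call are assumptions for illustration; only the header-path configuration key appears in this file):

import org.apache.hadoop.io.LongWritable

// Sketch only: point the output format at a pre-written SAM header, then
// save an RDD of (LongWritable, SAMRecordWritable) pairs as CRAM shards.
conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headerPath.toString)
rdd.saveAsNewAPIHadoopFile(outputPath,
  classOf[LongWritable],
  classOf[SAMRecordWritable],
  classOf[InstrumentedADAMCRAMOutputFormat[LongWritable]],
  conf)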
