From e18db5ac5ea73ecb96a9c774f17cd89e4af43ce3 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Thu, 1 Sep 2016 14:33:57 -0700 Subject: [PATCH] [ADAM-1141] Add support for saving/loading AlignmentRecords to/from CRAM. Resolves #1141. Changes the signature of `AlignmentRecordRDD.saveAsSAM` to take an `Option[SAMFormat]` parameter, since `asSam` is now no longer a binary choice. --- .../adam/instrumentation/Timers.scala | 2 +- .../org/bdgenomics/adam/rdd/ADAMContext.scala | 3 +- .../adam/rdd/read/ADAMCRAMOutputFormat.scala | 83 ++++++++++++ .../adam/rdd/read/AlignmentRecordRDD.scala | 125 +++++++++++------- .../rdd/read/AlignmentRecordRDDSuite.scala | 29 ++-- 5 files changed, 186 insertions(+), 56 deletions(-) create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMCRAMOutputFormat.scala diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala index 91874c6f67..4b34b5c977 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala @@ -79,5 +79,5 @@ object Timers extends Metrics { val WriteADAMRecord = timer("Write ADAM Record") val WriteBAMRecord = timer("Write BAM Record") val WriteSAMRecord = timer("Write SAM Record") - + val WriteCRAMRecord = timer("Write CRAM Record") } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index f8d28fc813..632f454623 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -407,7 +407,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log val bamFiles = getFsAndFiles(path) val filteredFiles = bamFiles.filter(p => { val pPath = p.getName() - pPath.endsWith(".bam") || pPath.endsWith(".sam") || pPath.startsWith("part-") + pPath.endsWith(".bam") || pPath.endsWith(".cram") || + pPath.endsWith(".sam") || pPath.startsWith("part-") }) require(filteredFiles.nonEmpty, diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMCRAMOutputFormat.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMCRAMOutputFormat.scala new file mode 100644 index 0000000000..6cffed1ba0 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMCRAMOutputFormat.scala @@ -0,0 +1,83 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.read + +import htsjdk.samtools.SAMFileHeader +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext } +import org.apache.spark.rdd.InstrumentedOutputFormat +import org.bdgenomics.adam.instrumentation.Timers +import org.seqdoop.hadoop_bam.{ + KeyIgnoringCRAMOutputFormat, + KeyIgnoringCRAMRecordWriter, + SAMRecordWritable +} + +class ADAMCRAMOutputFormat[K] + extends KeyIgnoringCRAMOutputFormat[K] with Serializable { + + setWriteHeader(true) + + override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = { + val conf = context.getConfiguration() + + // where is our header file? + val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path")) + + // read the header file + readSAMHeaderFrom(path, conf) + + // now that we have the header set, we need to make a record reader + return new KeyIgnoringCRAMRecordWriter[K](getDefaultWorkFile(context, ""), + header, + true, + context) + } +} + +class InstrumentedADAMCRAMOutputFormat[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] { + override def timerName(): String = Timers.WriteCRAMRecord.timerName + override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMCRAMOutputFormat[K]] +} + +class ADAMCRAMOutputFormatHeaderLess[K] + extends KeyIgnoringCRAMOutputFormat[K] with Serializable { + + setWriteHeader(false) + + override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = { + val conf = context.getConfiguration() + + // where is our header file? + val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path")) + + // read the header file + readSAMHeaderFrom(path, conf) + + // now that we have the header set, we need to make a record reader + return new KeyIgnoringCRAMRecordWriter[K](getDefaultWorkFile(context, ""), + header, + false, + context) + } +} + +class InstrumentedADAMCRAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] { + override def timerName(): String = Timers.WriteCRAMRecord.timerName + override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMCRAMOutputFormatHeaderLess[K]] +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala index bb3a2c209e..d1c86a2665 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala @@ -23,9 +23,12 @@ import htsjdk.samtools.util.{ BlockCompressedOutputStream, BlockCompressedStreamConstants } -import java.io.{ InputStream, OutputStream, StringWriter, Writer } import htsjdk.samtools._ +import htsjdk.samtools.cram.ref.ReferenceSource import htsjdk.samtools.util.{ BinaryCodec, BlockCompressedOutputStream } +import java.io.{ InputStream, OutputStream, StringWriter, Writer } +import java.net.URI +import java.nio.file.Paths import org.apache.avro.Schema import org.apache.hadoop.fs.{ FileSystem, Path } import org.apache.hadoop.io.LongWritable @@ -57,7 +60,7 @@ import org.bdgenomics.adam.util.MapTools import org.bdgenomics.adam.rdd.fragment.FragmentRDD import org.bdgenomics.formats.avro._ import org.bdgenomics.utils.misc.Logging -import org.seqdoop.hadoop_bam.{ SAMFormat, SAMRecordWritable } +import org.seqdoop.hadoop_bam._ import scala.language.implicitConversions import scala.math.{ abs, min } @@ -130,7 +133,8 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, } /** - * Saves this RDD as BAM or SAM if the extension provided is .sam or .bam. + * Saves this RDD as BAM, CRAM, or SAM if the extension provided is .sam, .cram, + * or .bam. * * @param args Arguments defining where to save the file. * @param isSorted True if input data is sorted. Sets the ordering in the SAM @@ -141,20 +145,12 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, private[rdd] def maybeSaveBam(args: ADAMSaveAnyArgs, isSorted: Boolean = false): Boolean = { - if (args.outputPath.endsWith(".sam")) { - log.info("Saving data in SAM format") - saveAsSam( - args.outputPath, - asSingleFile = args.asSingleFile, - isSorted = isSorted - ) - true - } else if (args.outputPath.endsWith(".bam")) { - log.info("Saving data in BAM format") + if (args.outputPath.endsWith(".sam") || + args.outputPath.endsWith(".bam") || + args.outputPath.endsWith(".cram")) { + log.info("Saving data in SAM/BAM/CRAM format") saveAsSam( args.outputPath, - asSam = false, - asSingleFile = args.asSingleFile, isSorted = isSorted ) true @@ -295,16 +291,19 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, * Saves an RDD of ADAM read data into the SAM/BAM format. * * @param filePath Path to save files to. - * @param asSam Selects whether to save as SAM or BAM. The default value is true (save in SAM format). + * @param asType Selects whether to save as SAM, BAM, or CRAM. The default + * value is None, which means the file type is inferred from the extension. * @param asSingleFile If true, saves output as a single file. * @param isSorted If the output is sorted, this will modify the header. */ def saveAsSam( filePath: String, - asSam: Boolean = true, + asType: Option[SAMFormat] = None, asSingleFile: Boolean = false, isSorted: Boolean = false): Unit = SAMSave.time { + val fileType = asType.getOrElse(SAMFormat.inferFromFilePath(filePath)) + // convert the records val (convertRecords: RDD[SAMRecordWritable], header: SAMFileHeader) = convertToSam(isSorted) @@ -322,9 +321,9 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, val fs = headPath.getFileSystem(rdd.context.hadoopConfiguration) // TIL: sam and bam are written in completely different ways! - if (asSam) { + if (fileType == SAMFormat.SAM) { SAMHeaderWriter.writeHeader(fs, headPath, header) - } else { + } else if (fileType == SAMFormat.BAM) { // get an output stream val os = fs.create(headPath) @@ -354,43 +353,78 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, compressedOut.flush() os.flush() os.close() + } else { + // from https://samtools.github.io/hts-specs/CRAMv3.pdf + // cram has a variety of addtional constraints: + // + // * file definition has 20 byte identifier field + // * header must have SO:pos + // * sequence records must have attached MD5s (we don't support + // embedding reference sequences) + // + // we'll defer the writing to the cram container stream writer, and will + // do validation here + + require(isSorted, "To save as CRAM, input must be sorted.") + require(sequences.records.forall(_.md5.isDefined), + "To save as CRAM, all sequences must have an attached MD5. See %s".format( + sequences)) + val refSource = conf.get(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY) + require(refSource != null, + "To save as CRAM, the reference source must be set in your config as %s.".format( + CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY)) + + // get an output stream + val os = fs.create(headPath) + .asInstanceOf[OutputStream] + + // create a cram container writer + val csw = new CRAMContainerStreamWriter(os, null, // null -> do not write index + new ReferenceSource(Paths.get(URI.create(refSource))), + header, + filePath) // write filepath as id + + // write the header + csw.writeHeader(header) + + // finish the cram container, but don't write EOF + csw.finish(false) + + // flush and close the output stream + os.flush() + os.close() } // set path to header file conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath.toString) if (!asSingleFile) { - if (asSam) { - withKey.saveAsNewAPIHadoopFile( - filePath, - classOf[LongWritable], - classOf[SAMRecordWritable], - classOf[InstrumentedADAMSAMOutputFormat[LongWritable]], - conf - ) - } else { - withKey.saveAsNewAPIHadoopFile( - filePath, - classOf[LongWritable], - classOf[SAMRecordWritable], - classOf[InstrumentedADAMBAMOutputFormat[LongWritable]], - conf - ) + val headeredOutputFormat = fileType match { + case SAMFormat.SAM => classOf[InstrumentedADAMSAMOutputFormat[LongWritable]] + case SAMFormat.BAM => classOf[InstrumentedADAMBAMOutputFormat[LongWritable]] + case SAMFormat.CRAM => classOf[InstrumentedADAMCRAMOutputFormat[LongWritable]] } + withKey.saveAsNewAPIHadoopFile( + filePath, + classOf[LongWritable], + classOf[SAMRecordWritable], + headeredOutputFormat, + conf + ) // clean up the header after writing fs.delete(headPath, true) } else { - log.info(s"Writing single ${if (asSam) "SAM" else "BAM"} file (not Hadoop-style directory)") + log.info(s"Writing single ${fileType} file (not Hadoop-style directory)") val tailPath = new Path(filePath + "_tail") val outputPath = new Path(filePath) // set up output format - val headerLessOutputFormat = if (asSam) { - classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]] - } else { - classOf[InstrumentedADAMBAMOutputFormatHeaderLess[LongWritable]] + val headerLessOutputFormat = fileType match { + case SAMFormat.SAM => classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]] + case SAMFormat.BAM => classOf[InstrumentedADAMBAMOutputFormatHeaderLess[LongWritable]] + case SAMFormat.CRAM => classOf[InstrumentedADAMCRAMOutputFormatHeaderLess[LongWritable]] } // save rdd @@ -406,25 +440,26 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, outputPath, tailPath, optHeaderPath = Some(headPath), - writeEmptyGzipBlock = !asSam) + writeEmptyGzipBlock = (fileType != SAMFormat.SAM)) } } /** - * Saves this RDD to disk as a SAM/BAM file. + * Saves this RDD to disk as a SAM/BAM/CRAM file. * * @param filePath Path to save the file at. - * @param asSam If true, saves as SAM. If false, saves as BAM. + * @param asType The SAMFormat to save as. If left null, we will infer the + * format from the file extension. * @param asSingleFile If true, saves output as a single file. * @param isSorted If the output is sorted, this will modify the header. */ def saveAsSam( filePath: java.lang.String, - asSam: java.lang.Boolean, + asType: SAMFormat, asSingleFile: java.lang.Boolean, isSorted: java.lang.Boolean) { saveAsSam(filePath, - asSam = asSam, + asType = Option(asType), asSingleFile = asSingleFile, isSorted = isSorted) } diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala index 38d1e3dccc..48ee948e9a 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala @@ -32,7 +32,7 @@ import org.bdgenomics.adam.rdd.TestSaveArgs import org.bdgenomics.adam.rdd.features.CoverageRDD import org.bdgenomics.adam.util.ADAMFunSuite import org.bdgenomics.formats.avro._ - +import org.seqdoop.hadoop_bam.SAMFormat import scala.util.Random private object SequenceIndexWithReadOrdering extends Ordering[((Int, Long), (AlignmentRecord, Int))] { @@ -158,7 +158,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { val tempFile = Files.createTempDirectory("reads12") ardd.saveAsSam(tempFile.toAbsolutePath.toString + "/reads12.sam", - asSam = true) + asType = Some(SAMFormat.SAM)) val rdd12B = sc.loadBam(tempFile.toAbsolutePath.toString + "/reads12.sam/part-r-00000") @@ -315,8 +315,9 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { val rRdd = sc.loadAlignments(inputPath) rRdd.rdd.cache() rRdd.saveAsSam("%s/%s".format(tempFile.toAbsolutePath.toString, filename), - asSam = true, + asType = if (asSam) Some(SAMFormat.SAM) else Some(SAMFormat.BAM), asSingleFile = true) + println("Saved file to %s/%s".format(tempFile.toAbsolutePath.toString, filename)) val rdd2 = sc.loadAlignments("%s/%s".format(tempFile.toAbsolutePath.toString, filename)) rdd2.rdd.cache() @@ -453,7 +454,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { val inputPath = resourcePath("small.sam") val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath) val outputPath = tmpLocation(".sam") - reads.saveAsSam(outputPath, true) + reads.saveAsSam(outputPath, asType = Some(SAMFormat.SAM)) assert(new File(outputPath).exists()) } @@ -461,7 +462,9 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { val inputPath = resourcePath("small.sam") val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath) val outputPath = tmpLocation(".sam") - reads.saveAsSam(outputPath, true, true) + reads.saveAsSam(outputPath, + asType = Some(SAMFormat.SAM), + asSingleFile = true) assert(new File(outputPath).exists()) } @@ -469,7 +472,10 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { val inputPath = resourcePath("small.sam") val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath) val outputPath = tmpLocation(".sam") - reads.saveAsSam(outputPath, true, true, true) + reads.saveAsSam(outputPath, + asType = Some(SAMFormat.SAM), + asSingleFile = true, + isSorted = true) assert(new File(outputPath).exists()) } @@ -477,7 +483,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { val inputPath = resourcePath("small.sam") val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath) val outputPath = tmpLocation(".bam") - reads.saveAsSam(outputPath, false) + reads.saveAsSam(outputPath, asType = Some(SAMFormat.BAM)) assert(new File(outputPath).exists()) } @@ -485,7 +491,9 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { val inputPath = resourcePath("small.sam") val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath) val outputPath = tmpLocation(".bam") - reads.saveAsSam(outputPath, false, true) + reads.saveAsSam(outputPath, + asType = Some(SAMFormat.BAM), + asSingleFile = true) assert(new File(outputPath).exists()) } @@ -493,7 +501,10 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { val inputPath = resourcePath("small.sam") val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath) val outputPath = tmpLocation(".bam") - reads.saveAsSam(outputPath, false, true, true) + reads.saveAsSam(outputPath, + asType = Some(SAMFormat.BAM), + asSingleFile = true, + isSorted = true) assert(new File(outputPath).exists()) }