[ADAM-916] New strategy for writing header.
Resolves #916. Makes several modifications that should eliminate the
header-attachment issue when writing back to SAM/BAM:

* Writes the SAM/BAM header as a single file.
* Instead of trying to attach the SAM/BAM header to the output format via a
  singleton object, we pass the path to the SAM/BAM header file via the Hadoop
  configuration.
* The output format reads the header from HDFS when creating the record writer.
* At the end, once we've written the full RDD and the header file, we merge
  everything via the Hadoop FileSystem/FileUtil APIs (see the sketch below).
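
The hand-off described above boils down to three steps: the driver writes the header to its own file and advertises its location through the Hadoop configuration, each record writer reads the header back from that path, and the driver finally stitches the header and part files together. The following is a minimal, self-contained sketch of that flow rather than the actual ADAM code; the object and method names (HeaderHandoffSketch, publishHeader, headerPath, mergeParts) are illustrative and the error handling is simplified, while the configuration key and the FileSystem/FileUtil calls mirror the ones used in the diff.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, FileUtil, Path }

object HeaderHandoffSketch {

  // driver side: write the serialized header to its own file and record its
  // location in the job configuration so every task can find it
  def publishHeader(conf: Configuration, headerBytes: Array[Byte], headPath: Path): Unit = {
    val fs = FileSystem.get(conf)
    val os = fs.create(headPath)
    os.write(headerBytes)
    os.close()
    conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath.toString)
  }

  // executor side (inside getRecordWriter): recover the header location from
  // the configuration instead of relying on a singleton object
  def headerPath(conf: Configuration): Path =
    new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))

  // driver side, after the header-less RDD has been written to tailPath:
  // stitch the header file and the part files into a single output file
  def mergeParts(conf: Configuration, headPath: Path, tailPath: Path, outPath: Path): Unit = {
    val fs = FileSystem.get(conf)
    try {
      // concat appends the parts onto the header file without copying bytes,
      // but is not implemented by every FileSystem
      val parts = fs.globStatus(new Path(tailPath, "part-*"))
        .map(_.getPath)
        .sortBy(_.getName)
      fs.concat(headPath, parts)
      fs.rename(headPath, outPath)
    } catch {
      case _: Throwable =>
        // fall back to copyMerge, which rewrites the data but works everywhere;
        // "head" sorts before "part-*", so the header stays in front
        FileUtil.copy(fs, headPath, fs, new Path(tailPath, "head"), false, false, conf)
        FileUtil.copyMerge(fs, tailPath, fs, outPath, true, conf, null)
        fs.delete(headPath, true)
    }
    fs.delete(tailPath, true)
  }
}

In the real change the header bytes differ between SAM (plain text emitted by SAMTextWriter) and BAM (a BGZF block holding the "BAM\001" magic, the header text, and the sequence dictionary), as the AlignmentRecordRDDFunctions changes below show.
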
fnothaft committed Jan 12, 2016
1 parent 4415b04 commit b8ec48e
Showing 4 changed files with 190 additions and 84 deletions.
@@ -17,11 +17,17 @@
*/
package org.bdgenomics.adam.rdd.read

import org.seqdoop.hadoop_bam.{ SAMRecordWritable, KeyIgnoringBAMOutputFormat }
import htsjdk.samtools.SAMFileHeader
import hbparquet.hadoop.util.ContextUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
import org.apache.spark.rdd.InstrumentedOutputFormat
import org.apache.hadoop.mapreduce.OutputFormat
import org.bdgenomics.adam.instrumentation.Timers
import org.seqdoop.hadoop_bam.{
KeyIgnoringBAMOutputFormat,
KeyIgnoringBAMRecordWriter,
SAMRecordWritable
}

object ADAMBAMOutputFormat extends Serializable {

@@ -76,11 +82,26 @@ class InstrumentedADAMBAMOutputFormat[K] extends InstrumentedOutputFormat[K, org
class ADAMBAMOutputFormatHeaderLess[K]
extends KeyIgnoringBAMOutputFormat[K] with Serializable {

setSAMHeader(ADAMBAMOutputFormat.getHeader)
setWriteHeader(false)

override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
val conf = ContextUtil.getConfiguration(context)

// where is our header file?
val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))

// read the header file
readSAMHeaderFrom(path, conf)

// now that we have the header set, we need to make a record writer
return new KeyIgnoringBAMRecordWriter[K](getDefaultWorkFile(context, ""),
header,
false,
context)
}
}

class InstrumentedADAMBAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
override def timerName(): String = Timers.WriteBAMRecord.timerName
override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMBAMOutputFormatHeaderLess[K]]
}
}
@@ -17,11 +17,18 @@
*/
package org.bdgenomics.adam.rdd.read

import org.seqdoop.hadoop_bam.{ SAMRecordWritable, KeyIgnoringAnySAMOutputFormat, SAMFormat }
import htsjdk.samtools.SAMFileHeader
import hbparquet.hadoop.util.ContextUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
import org.apache.spark.rdd.InstrumentedOutputFormat
import org.bdgenomics.adam.instrumentation.Timers
import org.apache.hadoop.mapreduce.OutputFormat
import org.seqdoop.hadoop_bam.{
KeyIgnoringAnySAMOutputFormat,
KeyIgnoringSAMRecordWriter,
SAMFormat,
SAMRecordWritable
}

object ADAMSAMOutputFormat extends Serializable {

@@ -76,9 +83,23 @@ class InstrumentedADAMSAMOutputFormat[K] extends InstrumentedOutputFormat[K, org
class ADAMSAMOutputFormatHeaderLess[K]
extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {

setSAMHeader(ADAMSAMOutputFormat.getHeader)
setWriteHeader(false)

override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
val conf = ContextUtil.getConfiguration(context)

// where is our header file?
val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))

// read the header file
readSAMHeaderFrom(path, conf)

// now that we have the header set, we need to make a record writer
return new KeyIgnoringSAMRecordWriter(getDefaultWorkFile(context, ""),
header,
false,
context)
}
}

class InstrumentedADAMSAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
@@ -17,8 +17,9 @@
*/
package org.bdgenomics.adam.rdd.read

import java.io.{ OutputStream, StringWriter }
import htsjdk.samtools.{ SAMFileHeader, SAMTextHeaderCodec, SAMTextWriter, ValidationStringency }
import java.io.{ OutputStream, StringWriter, Writer }
import htsjdk.samtools._
import htsjdk.samtools.util.{ BinaryCodec, BlockCompressedOutputStream }
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.specific.{ SpecificDatumWriter, SpecificRecordBase }
@@ -28,7 +29,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.{ PartitionPruningRDD, RDD }
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.algorithms.consensus.{ ConsensusGenerator, ConsensusGeneratorFromReads }
import org.bdgenomics.adam.converters.AlignmentRecordConverter
@@ -42,9 +43,8 @@ import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.adam.util.MapTools
import org.bdgenomics.formats.avro._
import org.seqdoop.hadoop_bam.SAMRecordWritable
import scala.reflect.ClassTag

import scala.language.implicitConversions
import scala.reflect.ClassTag

class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
extends ADAMSequenceDictionaryRDDAggregator[AlignmentRecord](rdd) {
@@ -297,7 +297,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
rgd: RecordGroupDictionary,
asSam: Boolean = true,
asSingleFile: Boolean = false,
isSorted: Boolean = false) = SAMSave.time {
isSorted: Boolean = false): Unit = SAMSave.time {

// if the file is sorted, make sure the sequence dictionary is sorted
val sdFinal = if (isSorted) {
@@ -314,51 +314,51 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
// add keys to our records
val withKey = convertRecords.keyBy(v => new LongWritable(v.get.getAlignmentStart))

val bcastHeader = rdd.context.broadcast(header)
val mp = rdd.mapPartitionsWithIndex((idx, iter) => {
log.info(s"Setting ${if (asSam) "SAM" else "BAM"} header for partition $idx")
val header = bcastHeader.value
synchronized {
// perform map partition call to ensure that the SAM/BAM header is set on all
// nodes in the cluster; see:
// https://github.com/bigdatagenomics/adam/issues/353,
// https://github.com/bigdatagenomics/adam/issues/676

asSam match {
case true =>
ADAMSAMOutputFormat.clearHeader()
ADAMSAMOutputFormat.addHeader(header)
log.info(s"Set SAM header for partition $idx")
case false =>
ADAMBAMOutputFormat.clearHeader()
ADAMBAMOutputFormat.addHeader(header)
log.info(s"Set BAM header for partition $idx")
}
}
Iterator[Int]()
}).count()

// force value check, ensure that computation happens
if (mp != 0) {
log.error("Had more than 0 elements after map partitions call to set VCF header across cluster.")
}

// attach header to output format
asSam match {
case true =>
ADAMSAMOutputFormat.clearHeader()
ADAMSAMOutputFormat.addHeader(header)
log.info("Set SAM header on driver")
case false =>
ADAMBAMOutputFormat.clearHeader()
ADAMBAMOutputFormat.addHeader(header)
log.info("Set BAM header on driver")
}

// write file to disk
val conf = rdd.context.hadoopConfiguration

if (!asSingleFile) {
val bcastHeader = rdd.context.broadcast(header)
val mp = rdd.mapPartitionsWithIndex((idx, iter) => {
log.info(s"Setting ${if (asSam) "SAM" else "BAM"} header for partition $idx")
val header = bcastHeader.value
synchronized {
// perform map partition call to ensure that the SAM/BAM header is set on all
// nodes in the cluster; see:
// https://github.com/bigdatagenomics/adam/issues/353,
// https://github.com/bigdatagenomics/adam/issues/676

asSam match {
case true =>
ADAMSAMOutputFormat.clearHeader()
ADAMSAMOutputFormat.addHeader(header)
log.info(s"Set SAM header for partition $idx")
case false =>
ADAMBAMOutputFormat.clearHeader()
ADAMBAMOutputFormat.addHeader(header)
log.info(s"Set BAM header for partition $idx")
}
}
Iterator[Int]()
}).count()

// force value check, ensure that computation happens
if (mp != 0) {
log.error("Had more than 0 elements after map partitions call to set VCF header across cluster.")
}

// attach header to output format
asSam match {
case true =>
ADAMSAMOutputFormat.clearHeader()
ADAMSAMOutputFormat.addHeader(header)
log.info("Set SAM header on driver")
case false =>
ADAMBAMOutputFormat.clearHeader()
ADAMBAMOutputFormat.addHeader(header)
log.info("Set BAM header on driver")
}

asSam match {
case true =>
withKey.saveAsNewAPIHadoopFile(
@@ -380,50 +380,106 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
}
} else {
log.info(s"Writing single ${if (asSam) "SAM" else "BAM"} file (not Hadoop-style directory)")
val (outputFormat, headerLessOutputFormat) = asSam match {
case true =>
(
classOf[InstrumentedADAMSAMOutputFormat[LongWritable]],
classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]]
)

case false =>
(
classOf[InstrumentedADAMBAMOutputFormat[LongWritable]],
classOf[InstrumentedADAMBAMOutputFormatHeaderLess[LongWritable]]
)
}
val fs = FileSystem.get(conf)

val headPath = filePath + "_head"
val tailPath = filePath + "_tail"

PartitionPruningRDD.create(withKey, i => { i == 0 }).saveAsNewAPIHadoopFile(
headPath,
classOf[LongWritable],
classOf[SAMRecordWritable],
outputFormat,
conf
)
PartitionPruningRDD.create(withKey, i => { i != 0 }).saveAsNewAPIHadoopFile(
// get an output stream
val os = fs.create(new Path(headPath))
.asInstanceOf[OutputStream]

// TIL: sam and bam are written in completely different ways!
if (asSam) {
val sw: Writer = new StringWriter()
val stw = new SAMTextWriter(os)
stw.setHeader(header)
stw.close()
} else {
// create htsjdk specific streams for writing the bam header
val compressedOut: OutputStream = new BlockCompressedOutputStream(os, null)
val binaryCodec = new BinaryCodec(compressedOut);

// write a bam header - cribbed from Hadoop-BAM
binaryCodec.writeBytes("BAM\001".getBytes())
val sw: Writer = new StringWriter()
new SAMTextHeaderCodec().encode(sw, header)
binaryCodec.writeString(sw.toString, true, false)

// write sequence dictionary
val ssd = header.getSequenceDictionary()
binaryCodec.writeInt(ssd.size())
ssd.getSequences
.toList
.foreach(r => {
binaryCodec.writeString(r.getSequenceName(), true, true)
binaryCodec.writeInt(r.getSequenceLength())
})

// flush and close all the streams
compressedOut.flush()
compressedOut.close()
}

// more flushing and closing
os.flush()
os.close()

// set path to header file
conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath.toString)

// set up output format
val headerLessOutputFormat = if (asSam) {
classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]]
} else {
classOf[InstrumentedADAMBAMOutputFormatHeaderLess[LongWritable]]
}

// save rdd
withKey.saveAsNewAPIHadoopFile(
tailPath,
classOf[LongWritable],
classOf[SAMRecordWritable],
headerLessOutputFormat,
conf
)

val fs = FileSystem.get(conf)
// get a list of all of the files in the tail file
val tailFiles = fs.globStatus("%s/part-*".format(tailPath))
.toSeq
.map(_.getPath)
.sortBy(_.getName)
.toArray

fs.listStatus(headPath)
.find(_.getPath.getName.startsWith("part-r"))
.map(fileStatus => FileUtil.copy(fs, fileStatus.getPath, fs, tailPath + "/head", false, false, conf))
fs.delete(headPath, true)
// try to merge this via the fs api, which should guarantee ordering...?
// however! this function is not implemented on all platforms, hence the try.
try {

FileUtil.copyMerge(fs, tailPath, fs, filePath, true, conf, null)
fs.delete(tailPath, true)
// concat files together
fs.concat(headPath, tailFiles)

}
// move concatted file
fs.rename(headPath, new Path(filePath))

// delete tail files
fs.delete(tailPath, true)
} catch {
case e: Throwable => {

log.warn("Caught exception when merging via Hadoop FileSystem API:\n%s".format(e))
log.warn("Retrying via Hadoop FileUtil, which will degrade performance.")

// copy and merge files
FileUtil.copy(fs, headPath, fs, tailPath + "/head", false, false, conf)
FileUtil.copyMerge(fs, tailPath, fs, filePath, true, conf, null)

// delete temp files
fs.delete(headPath, true)
fs.delete(tailPath, true)
}
}
}
}

implicit def str2Path(str: String): Path = new Path(str)
@@ -298,7 +298,7 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite {
checkFiles(resourcePath("ordered.sam"), actualSortedPath)
}

sparkTest("write single sam file back") {
def testBQSR(asSam: Boolean, filename: String) {
val inputPath = resourcePath("bqsr1.sam")
val tempFile = Files.createTempDirectory("bqsr1")
val rRdd = sc.loadAlignments(inputPath)
@@ -311,7 +311,7 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite {
rgd,
asSam = true,
asSingleFile = true)
val rdd2 = sc.loadAlignments(tempFile.toAbsolutePath.toString + "/bqsr1.sam").rdd
val rdd2 = sc.loadAlignments("%s/%s".format(tempFile.toAbsolutePath.toString, filename)).rdd
rdd2.cache()

val (fsp1, fsf1) = rdd.adamFlagStat()
@@ -378,4 +378,12 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite {
}
})
}

sparkTest("write single sam file back") {
testBQSR(true, "bqsr1.sam")
}

sparkTest("write single bam file back") {
testBQSR(true, "bqsr1.bam")
}
}
