Single file save from #733, rebased #901

Merged 2 commits on Dec 29, 2015
@@ -73,3 +73,14 @@ class InstrumentedADAMBAMOutputFormat[K] extends InstrumentedOutputFormat[K, org
   override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMBAMOutputFormat[K]]
 }
+
+class ADAMBAMOutputFormatHeaderLess[K]
+    extends KeyIgnoringBAMOutputFormat[K] with Serializable {
+
+  setSAMHeader(ADAMBAMOutputFormat.getHeader)
+  setWriteHeader(false)
+}
+
+class InstrumentedADAMBAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
+  override def timerName(): String = Timers.WriteBAMRecord.timerName
+  override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMBAMOutputFormatHeaderLess[K]]
+}
@@ -72,3 +72,16 @@ class InstrumentedADAMSAMOutputFormat[K] extends InstrumentedOutputFormat[K, org
   override def timerName(): String = Timers.WriteSAMRecord.timerName
   override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMSAMOutputFormat[K]]
 }
+
+class ADAMSAMOutputFormatHeaderLess[K]
+    extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {
+
+  setSAMHeader(ADAMSAMOutputFormat.getHeader)
+  setWriteHeader(false)
+
+}
+
+class InstrumentedADAMSAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
+  override def timerName(): String = Timers.WriteSAMRecord.timerName
+  override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMSAMOutputFormatHeaderLess[K]]
+}
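
(Not part of the diff.) The HeaderLess variants exist for the single-file save path in AlignmentRecordRDDFunctions below: partition 0 of the keyed RDD is written with the ordinary header-bearing format, and every other partition with the header-less one, so that concatenating the resulting part files yields exactly one SAM/BAM header. A hedged sketch of the pairing, where records, headPath, tailPath, and conf are hypothetical stand-ins; the format classes are the ones defined above:

    // first partition: header plus records
    PartitionPruningRDD.create(records, i => i == 0).saveAsNewAPIHadoopFile(
      headPath, classOf[LongWritable], classOf[SAMRecordWritable],
      classOf[InstrumentedADAMSAMOutputFormat[LongWritable]], conf)
    // remaining partitions: records only, header suppressed via setWriteHeader(false)
    PartitionPruningRDD.create(records, i => i != 0).saveAsNewAPIHadoopFile(
      tailPath, classOf[LongWritable], classOf[SAMRecordWritable],
      classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]], conf)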
@@ -19,12 +19,11 @@ package org.bdgenomics.adam.rdd.read
 
 import java.io.StringWriter
 import htsjdk.samtools.{ SAMFileHeader, SAMTextHeaderCodec, SAMTextWriter, ValidationStringency }
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{ FileSystem, Path }
+import org.apache.hadoop.fs.{ Path, FileUtil, FileSystem }
 import org.apache.hadoop.io.LongWritable
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.MetricsContext._
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{ PartitionPruningRDD, RDD }
 import org.apache.spark.storage.StorageLevel
 import org.bdgenomics.adam.algorithms.consensus.{ ConsensusGenerator, ConsensusGeneratorFromReads }
 import org.bdgenomics.adam.converters.AlignmentRecordConverter
@@ -39,6 +38,8 @@ import org.bdgenomics.adam.util.MapTools
 import org.bdgenomics.formats.avro._
 import org.seqdoop.hadoop_bam.SAMRecordWritable
 
+import scala.language.implicitConversions
+
 class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
   extends ADAMSequenceDictionaryRDDAggregator[AlignmentRecord](rdd) {
 
@@ -132,9 +133,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
     val (convertRecords: RDD[SAMRecordWritable], header: SAMFileHeader) = rdd.adamConvertToSAM(isSorted)
 
     // add keys to our records
-    val withKey =
-      if (asSingleFile) convertRecords.keyBy(v => new LongWritable(v.get.getAlignmentStart)).coalesce(1)
-      else convertRecords.keyBy(v => new LongWritable(v.get.getAlignmentStart))
+    val withKey = convertRecords.keyBy(v => new LongWritable(v.get.getAlignmentStart))
 
     val bcastHeader = rdd.context.broadcast(header)
     val mp = rdd.mapPartitionsWithIndex((idx, iter) => {
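
(Not part of the diff.) The removed coalesce(1) above is the heart of this change: it previously funneled every record through a single writer task whenever asSingleFile was set. The rewrite keeps the RDD's full partitioning and instead splits the save into two parallel jobs via PartitionPruningRDD, as the next hunk shows. A minimal sketch, assuming withKey as defined above:

    // partition 0 will carry the header; all other partitions are written header-less
    val head = PartitionPruningRDD.create(withKey, i => i == 0)
    val tail = PartitionPruningRDD.create(withKey, i => i != 0)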
@@ -179,36 +178,73 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
 
     // write file to disk
     val conf = rdd.context.hadoopConfiguration
-    asSam match {
-      case true =>
-        withKey.saveAsNewAPIHadoopFile(
-          filePath,
-          classOf[LongWritable],
-          classOf[SAMRecordWritable],
-          classOf[InstrumentedADAMSAMOutputFormat[LongWritable]],
-          conf
-        )
-      case false =>
-        withKey.saveAsNewAPIHadoopFile(
-          filePath,
-          classOf[LongWritable],
-          classOf[SAMRecordWritable],
-          classOf[InstrumentedADAMBAMOutputFormat[LongWritable]],
-          conf
-        )
-    }
-    if (asSingleFile) {
+
+    if (!asSingleFile) {
+      asSam match {
+        case true =>
+          withKey.saveAsNewAPIHadoopFile(
+            filePath,
+            classOf[LongWritable],
+            classOf[SAMRecordWritable],
+            classOf[InstrumentedADAMSAMOutputFormat[LongWritable]],
+            conf
+          )
+        case false =>
+          withKey.saveAsNewAPIHadoopFile(
+            filePath,
+            classOf[LongWritable],
+            classOf[SAMRecordWritable],
+            classOf[InstrumentedADAMBAMOutputFormat[LongWritable]],
+            conf
+          )
+
+      }
+    } else {
+      log.info(s"Writing single ${if (asSam) "SAM" else "BAM"} file (not Hadoop-style directory)")
+      val conf = new Configuration()
+      val (outputFormat, headerLessOutputFormat) = asSam match {
+        case true =>
+          (classOf[InstrumentedADAMSAMOutputFormat[LongWritable]],
+            classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]])
+
+        case false =>
+          (classOf[InstrumentedADAMBAMOutputFormat[LongWritable]],
+            classOf[InstrumentedADAMBAMOutputFormatHeaderLess[LongWritable]])
+      }
+
+      val headPath = filePath + "_head"
+      val tailPath = filePath + "_tail"
+
+      PartitionPruningRDD.create(withKey, i => { i == 0 }).saveAsNewAPIHadoopFile(
+        headPath,
+        classOf[LongWritable],
+        classOf[SAMRecordWritable],
+        outputFormat,
+        conf
+      )
+      PartitionPruningRDD.create(withKey, i => { i != 0 }).saveAsNewAPIHadoopFile(
+        tailPath,
+        classOf[LongWritable],
+        classOf[SAMRecordWritable],
+        headerLessOutputFormat,
+        conf
+      )
 
       val fs = FileSystem.get(conf)
-      val ouputParentDir = filePath.substring(0, filePath.lastIndexOf("/") + 1)
-      val tmpPath = ouputParentDir + "tmp" + System.currentTimeMillis().toString
-      fs.rename(new Path(filePath + "/part-r-00000"), new Path(tmpPath))
-      fs.delete(new Path(filePath), true)
-      fs.rename(new Path(tmpPath), new Path(filePath))
+
+      fs.listStatus(headPath)
+        .find(_.getPath.getName.startsWith("part-r"))
+        .map(fileStatus => FileUtil.copy(fs, fileStatus.getPath, fs, tailPath + "/head", false, false, conf))
+      fs.delete(headPath, true)
+
+      FileUtil.copyMerge(fs, tailPath, fs, filePath, true, conf, null)
+      fs.delete(tailPath, true)
+
     }
 
   }
 
+  implicit def str2Path(str: String): Path = new Path(str)
+
   def getSequenceRecordsFromElement(elem: AlignmentRecord): Set[SequenceRecord] = {
     SequenceRecord.fromADAMRecord(elem).toSet
   }
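
(Not part of the diff.) Two FileUtil calls do the final assembly above: FileUtil.copy drops the lone header-bearing part file into the tail directory as "head", and FileUtil.copyMerge then concatenates everything under that directory into the destination file; on Hadoop 2.x, copyMerge sorts the directory listing by path, so "head" lands before the "part-r-*" files. The implicit str2Path conversion added at the bottom of the hunk is what lets the new code pass plain strings like headPath where a Path is expected. A self-contained sketch of the merge step, with all paths hypothetical:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{ FileSystem, FileUtil, Path }

    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    // concatenate out.sam_tail/head, out.sam_tail/part-r-00000, ... into out.sam;
    // `true` deletes the source directory, `null` appends no separator between files
    FileUtil.copyMerge(fs, new Path("out.sam_tail"), fs, new Path("out.sam"), true, conf, null)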
@@ -19,6 +19,7 @@ package org.bdgenomics.adam.rdd.read
 
 import java.nio.file.Files
 import htsjdk.samtools.ValidationStringency
+import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.util.ADAMFunSuite
@@ -264,4 +265,79 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite {
 
     checkFiles(resourcePath("ordered.sam"), actualSortedPath)
   }
+
+  sparkTest("write single sam file back") {
+    val inputPath = resourcePath("bqsr1.sam")
+    val tempFile = Files.createTempDirectory("bqsr1")
+    val rdd = sc.loadAlignments(inputPath).cache()
+    rdd.adamSAMSave(tempFile.toAbsolutePath.toString + "/bqsr1.sam",
+      asSam = true,
+      asSingleFile = true)
+    val rdd2 = sc.loadAlignments(tempFile.toAbsolutePath.toString + "/bqsr1.sam")
+      .cache()
+
+    val (fsp1, fsf1) = rdd.adamFlagStat()
+    val (fsp2, fsf2) = rdd2.adamFlagStat()
+
+    assert(rdd.count === rdd2.count)
+    assert(fsp1 === fsp2)
+    assert(fsf1 === fsf2)
+
+    val jrdd = rdd.map(r => ((r.getReadName, r.getReadNum, r.getReadMapped), r))
+      .join(rdd2.map(r => ((r.getReadName, r.getReadNum, r.getReadMapped), r)))
+      .cache()
+
+    assert(rdd.count === jrdd.count)
+
+    jrdd.map(kv => kv._2)
+      .collect
+      .foreach(p => {
+        val (p1, p2) = p
+
+        assert(p1.getReadNum === p2.getReadNum)
+        assert(p1.getReadName === p2.getReadName)
+        assert(p1.getSequence === p2.getSequence)
+        assert(p1.getQual === p2.getQual)
+        assert(p1.getOrigQual === p2.getOrigQual)
+        assert(p1.getRecordGroupSample === p2.getRecordGroupSample)
+        assert(p1.getRecordGroupName === p2.getRecordGroupName)
+        assert(p1.getFailedVendorQualityChecks === p2.getFailedVendorQualityChecks)
+        assert(p1.getBasesTrimmedFromStart === p2.getBasesTrimmedFromStart)
+        assert(p1.getBasesTrimmedFromEnd === p2.getBasesTrimmedFromEnd)
+
+        assert(p1.getReadMapped === p2.getReadMapped)
+        // note: BQSR1.sam has reads that are unmapped, but where the mapping flags are set
+        // that is why we split this check out
+        // the SAM spec doesn't say anything particularly meaningful about this, other than
+        // that some fields should be disregarded if the read is not mapped
+        if (p1.getReadMapped && p2.getReadMapped) {
+          assert(p1.getDuplicateRead === p2.getDuplicateRead)
+          assert(p1.getContig.getContigName === p2.getContig.getContigName)
+          assert(p1.getStart === p2.getStart)
+          assert(p1.getEnd === p2.getEnd)
+          assert(p1.getCigar === p2.getCigar)
+          assert(p1.getOldCigar === p2.getOldCigar)
+          assert(p1.getPrimaryAlignment === p2.getPrimaryAlignment)
+          assert(p1.getSecondaryAlignment === p2.getSecondaryAlignment)
+          assert(p1.getSupplementaryAlignment === p2.getSupplementaryAlignment)
+          assert(p1.getReadNegativeStrand === p2.getReadNegativeStrand)
+        }
+
+        assert(p1.getReadPaired === p2.getReadPaired)
+        // a variety of fields are undefined if the reads are not paired
+        if (p1.getReadPaired && p2.getReadPaired) {
+          assert(p1.getInferredInsertSize === p2.getInferredInsertSize)
+          assert(p1.getProperPair === p2.getProperPair)
+
+          // same caveat about read alignment applies to mates
+          assert(p1.getMateMapped === p2.getMateMapped)
+          if (p1.getMateMapped && p2.getMateMapped) {
+            assert(p1.getMateNegativeStrand === p2.getMateNegativeStrand)
+            assert(p1.getMateContig.getContigName === p2.getMateContig.getContigName)
+            assert(p1.getMateAlignmentStart === p2.getMateAlignmentStart)
+            assert(p1.getMateAlignmentEnd === p2.getMateAlignmentEnd)
+          }
+        }
+      })
+  }
 }
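
(Not part of the diff.) For reference, the behavior the new test pins down, as a usage sketch with hypothetical paths: with asSingleFile = true, adamSAMSave writes one SAM file at the target path instead of a Hadoop-style directory of part files, and the result can be loaded back with sc.loadAlignments as usual.

    val reads = sc.loadAlignments("/data/sample.sam")
    reads.adamSAMSave("/data/out.sam", asSam = true, asSingleFile = true) // one file, not a directory
    val reloaded = sc.loadAlignments("/data/out.sam")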