Navigation Menu

Skip to content

Commit

Permalink
[ADAM-1257] Add program record support for alignment/fragment files.
Browse files Browse the repository at this point in the history
  • Loading branch information
fnothaft committed Jul 17, 2017
1 parent 406d1e3 commit 257695d
Show file tree
Hide file tree
Showing 17 changed files with 357 additions and 189 deletions.
Expand Up @@ -18,6 +18,7 @@
package org.bdgenomics.adam.cli

import htsjdk.samtools.ValidationStringency
import java.time.Instant
import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
Expand All @@ -29,6 +30,7 @@ import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, QualityScoreBin }
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.formats.avro.ProcessingStep
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
Expand All @@ -38,7 +40,9 @@ object TransformAlignments extends BDGCommandCompanion {
val commandDescription = "Convert SAM/BAM to ADAM format and optionally perform read pre-processing transformations"

/**
 * Builds the TransformAlignments command from command line arguments.
 *
 * Records the full command line on the args object so it can later be
 * written into the ProcessingStep (@PG line) of the output header.
 *
 * @param cmdLine The command line arguments, as passed to main.
 * @return A configured TransformAlignments command.
 */
def apply(cmdLine: Array[String]) = {
  val args = Args4j[TransformAlignmentsArgs](cmdLine)
  // capture the exact invocation for the @PG CL (command line) field
  args.command = cmdLine.mkString(" ")
  new TransformAlignments(args)
}
}

Expand Down Expand Up @@ -125,6 +129,10 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
// whether to cache the RDD between transformation stages
var cache: Boolean = false
@Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching.")
var storageLevel: String = "MEMORY_ONLY"
@Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
// when true, no ProcessingStep/@PG record is appended to the output header
var disableProcessingStep = false

// the full command line used to invoke this tool; populated by the
// companion object's apply and recorded in the generated ProcessingStep
var command: String = null
}

class TransformAlignments(protected val args: TransformAlignmentsArgs) extends BDGSparkCommand[TransformAlignmentsArgs] with Logging {
Expand Down Expand Up @@ -434,7 +442,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
)
}

// reads as loaded from the input path, before any processing step is attached
val loadedRdd: AlignmentRecordRDD =
if (args.forceLoadBam) {
if (args.regionPredicate != null) {
val loci = ReferenceRegion.fromString(args.regionPredicate)
Expand Down Expand Up @@ -484,9 +492,31 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
loadedReads
}
}

// optionally attach a ProcessingStep (@PG line) describing this invocation
val aRdd: AlignmentRecordRDD = if (args.disableProcessingStep) {
loadedRdd
} else {
// add program info
val about = new About()
// snapshot builds append the commit hash so the exact build is traceable
val version = if (about.isSnapshot()) {
"%s--%s".format(about.version(), about.commit())
} else {
about.version()
}
// seconds since epoch gives the step a sortable, quasi-unique ID
val epoch = Instant.now.getEpochSecond
val processingStep = ProcessingStep.newBuilder
.setId("ADAM_%d".format(epoch))
.setProgramName("org.bdgenomics.adam.cli.TransformAlignments")
.setVersion(version)
.setCommandLine(args.command)
.build
loadedRdd.addProcessingStep(processingStep)
}

// unpack the RDD components so they can be merged and rebuilt below
val rdd = aRdd.rdd
val sd = aRdd.sequences
val rgd = aRdd.recordGroups
val pgs = aRdd.processingSteps

// Optionally load a second RDD and concatenate it with the first.
// Paired-FASTQ loading is avoided here because that wouldn't make sense
Expand All @@ -507,12 +537,12 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
})

// if we have a second rdd that we are merging in, process the merger here;
// processing steps are concatenated alongside reads, sequences, and record
// groups so that both inputs' @PG lines are preserved
val (mergedRdd, mergedSd, mergedRgd, mergedPgs) = concatOpt.fold((rdd, sd, rgd, pgs))(t => {
  (rdd ++ t.rdd, sd ++ t.sequences, rgd ++ t.recordGroups, pgs ++ t.processingSteps)
})

// make a new aligned read rdd, that merges the two RDDs together
val newRdd = AlignmentRecordRDD(mergedRdd, mergedSd, mergedRgd, mergedPgs)

// run our transformation
val outputRdd = this.apply(newRdd)
Expand Down
Expand Up @@ -26,7 +26,8 @@ class MergeShardsSuite extends ADAMFunSuite {
val inputPath = copyResource("unordered.sam")
val actualPath = tmpFile("unordered.sam")
val expectedPath = inputPath
// -disable_pg keeps the output header byte-identical to the input
TransformAlignments(Array("-single", "-defer_merging", "-disable_pg",
  inputPath, actualPath)).run(sc)
MergeShards(Array(actualPath + "_tail", actualPath,
"-header_path", actualPath + "_head")).run(sc)
checkFiles(expectedPath, actualPath)
Expand All @@ -41,6 +42,7 @@ class MergeShardsSuite extends ADAMFunSuite {
"-sort_lexicographically",
"-defer_merging",
inputPath, actualPath)).run(sc)
println(actualPath)
MergeShards(Array(actualPath + "_tail", actualPath,
"-header_path", actualPath + "_head")).run(sc)
checkFiles(expectedPath, actualPath)
Expand Down
Expand Up @@ -25,15 +25,15 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
val inputPath = copyResource("unordered.sam")
val actualPath = tmpFile("unordered.sam")
val expectedPath = inputPath
// -disable_pg: no @PG line is added, so the output matches the input exactly
TransformAlignments(Array("-single", "-disable_pg", inputPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

sparkTest("unordered sam to ordered sam") {
  val inputPath = copyResource("unordered.sam")
  val actualPath = tmpFile("ordered.sam")
  val expectedPath = copyResource("ordered.sam")
  // -disable_pg keeps the header free of a new @PG line so the sorted
  // output can be compared byte-for-byte against the expected file
  TransformAlignments(Array("-single", "-disable_pg", "-sort_reads", "-sort_lexicographically", inputPath, actualPath)).run(sc)
  checkFiles(expectedPath, actualPath)
}

Expand All @@ -42,8 +42,8 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
val intermediateAdamPath = tmpFile("unordered.adam")
val actualPath = tmpFile("unordered.sam")
val expectedPath = inputPath
// round trip SAM -> ADAM -> SAM with -disable_pg on both legs so no @PG
// line is introduced and the final SAM matches the original input
TransformAlignments(Array("-disable_pg", inputPath, intermediateAdamPath)).run(sc)
TransformAlignments(Array("-single", "-disable_pg", intermediateAdamPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

Expand All @@ -53,7 +53,7 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
val actualPath = tmpFile("ordered.sam")
val expectedPath = copyResource("ordered.sam")
TransformAlignments(Array(inputPath, intermediateAdamPath)).run(sc)
// -disable_pg so the sorted output header matches the expected file exactly
TransformAlignments(Array("-single", "-disable_pg", "-sort_reads", "-sort_lexicographically", intermediateAdamPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

Expand Down

This file was deleted.

Expand Up @@ -18,6 +18,8 @@
package org.bdgenomics.adam.models

import htsjdk.samtools.SAMFileHeader
import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import scala.collection.JavaConversions._

private[adam] object SAMFileHeaderWritable {
Expand Down Expand Up @@ -51,7 +53,7 @@ private[adam] class SAMFileHeaderWritable private (hdr: SAMFileHeader) extends S
private val sd = SequenceDictionary(hdr.getSequenceDictionary)
// convert the htsjdk program records from the header into ADAM processing steps
private val pgl = {
  val pgs = hdr.getProgramRecords
  pgs.map(ADAMContext.convertSAMProgramRecord)
}
private val comments = {
val cmts = hdr.getComments
Expand All @@ -68,7 +70,7 @@ private[adam] class SAMFileHeaderWritable private (hdr: SAMFileHeader) extends S
// add back optional fields
text.foreach(h.setTextHeader)
h.setSequenceDictionary(sd.toSAMSequenceDictionary)
// convert each processing step back into an htsjdk program record;
// adding it exactly once avoids duplicate @PG lines in the rebuilt header
pgl.foreach(p => h.addProgramRecord(AlignmentRecordRDD.processingStepToSam(p)))
comments.foreach(h.addComment)
rgs.recordGroups.foreach(rg => h.addReadGroup(rg.toSAMReadGroupRecord))

Expand Down

0 comments on commit 257695d

Please sign in to comment.