diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala index cc98f58124..38da920769 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala @@ -18,6 +18,7 @@ package org.bdgenomics.adam.cli import htsjdk.samtools.ValidationStringency +import java.time.Instant import org.apache.parquet.filter2.dsl.Dsl._ import org.apache.spark.SparkContext import org.apache.spark.storage.StorageLevel @@ -29,6 +30,7 @@ import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, QualityScoreBin } import org.bdgenomics.adam.rich.RichVariant +import org.bdgenomics.formats.avro.ProcessingStep import org.bdgenomics.utils.cli._ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } @@ -38,7 +40,9 @@ object TransformAlignments extends BDGCommandCompanion { val commandDescription = "Convert SAM/BAM to ADAM format and optionally perform read pre-processing transformations" def apply(cmdLine: Array[String]) = { - new TransformAlignments(Args4j[TransformAlignmentsArgs](cmdLine)) + val args = Args4j[TransformAlignmentsArgs](cmdLine) + args.command = cmdLine.mkString(" ") + new TransformAlignments(args) } } @@ -125,6 +129,10 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu var cache: Boolean = false @Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching.") var storageLevel: String = "MEMORY_ONLY" + @Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.") + var disableProcessingStep = false + + var command: String = null } class TransformAlignments(protected val args: TransformAlignmentsArgs) extends BDGSparkCommand[TransformAlignmentsArgs] with Logging { @@ -434,7 +442,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B ) } - val aRdd: AlignmentRecordRDD = + val loadedRdd: AlignmentRecordRDD = if (args.forceLoadBam) { if (args.regionPredicate != null) { val loci = ReferenceRegion.fromString(args.regionPredicate) @@ -484,9 +492,33 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B loadedReads } } + + val aRdd: AlignmentRecordRDD = if (args.disableProcessingStep) { + loadedRdd + } else { + // add program info + val about = new About() + val version = if (about.isSnapshot()) { + "%s--%s".format(about.version(), about.commit()) + } else { + about.version() + } + val epoch = Instant.now.getEpochSecond + val processingStep = ProcessingStep.newBuilder + .setId("ADAM_%d".format(epoch)) + .setProgramName("org.bdgenomics.adam.cli.TransformAlignments") + .setVersion(version) + .setCommandLine(args.command) + .build + val newSteps: Seq[ProcessingStep] = (loadedRdd.processingSteps ++ Seq(processingStep)) + val rddWithProvenance: AlignmentRecordRDD = loadedRdd.copy(processingSteps = newSteps) + rddWithProvenance + } + val rdd = aRdd.rdd val sd = aRdd.sequences val rgd = aRdd.recordGroups + val pgs = aRdd.processingSteps // Optionally load a second RDD and concatenate it with the first. 
// Paired-FASTQ loading is avoided here because that wouldn't make sense @@ -507,12 +539,12 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B }) // if we have a second rdd that we are merging in, process the merger here - val (mergedRdd, mergedSd, mergedRgd) = concatOpt.fold((rdd, sd, rgd))(t => { - (rdd ++ t.rdd, sd ++ t.sequences, rgd ++ t.recordGroups) + val (mergedRdd, mergedSd, mergedRgd, mergedPgs) = concatOpt.fold((rdd, sd, rgd, pgs))(t => { + (rdd ++ t.rdd, sd ++ t.sequences, rgd ++ t.recordGroups, pgs ++ t.processingSteps) }) // make a new aligned read rdd, that merges the two RDDs together - val newRdd = AlignmentRecordRDD(mergedRdd, mergedSd, mergedRgd) + val newRdd = AlignmentRecordRDD(mergedRdd, mergedSd, mergedRgd, mergedPgs) // run our transformation val outputRdd = this.apply(newRdd) diff --git a/adam-cli/src/test/scala/org/bdgenomics/adam/cli/MergeShardsSuite.scala b/adam-cli/src/test/scala/org/bdgenomics/adam/cli/MergeShardsSuite.scala index 36a45eb44d..2f4d2b817a 100644 --- a/adam-cli/src/test/scala/org/bdgenomics/adam/cli/MergeShardsSuite.scala +++ b/adam-cli/src/test/scala/org/bdgenomics/adam/cli/MergeShardsSuite.scala @@ -26,7 +26,8 @@ class MergeShardsSuite extends ADAMFunSuite { val inputPath = copyResource("unordered.sam") val actualPath = tmpFile("unordered.sam") val expectedPath = inputPath - TransformAlignments(Array("-single", "-defer_merging", inputPath, actualPath)).run(sc) + TransformAlignments(Array("-single", "-defer_merging", "-disable_pg", inputPath, actualPath)).run(sc) MergeShards(Array(actualPath + "_tail", actualPath, "-header_path", actualPath + "_head")).run(sc) checkFiles(expectedPath, actualPath) @@ -41,6 +42,7 @@ "-sort_lexicographically", "-defer_merging", + "-disable_pg", inputPath, actualPath)).run(sc) MergeShards(Array(actualPath + "_tail", actualPath, "-header_path", actualPath + "_head")).run(sc) checkFiles(expectedPath, actualPath) diff --git a/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformAlignmentsSuite.scala b/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformAlignmentsSuite.scala index ebb75054a7..60ead5d13a 100644 --- a/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformAlignmentsSuite.scala +++ b/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformAlignmentsSuite.scala @@ -25,7 +25,7 @@ class TransformAlignmentsSuite extends ADAMFunSuite { val inputPath = copyResource("unordered.sam") val actualPath = tmpFile("unordered.sam") val expectedPath = inputPath - TransformAlignments(Array("-single", inputPath, actualPath)).run(sc) + TransformAlignments(Array("-single", "-disable_pg", inputPath, actualPath)).run(sc) checkFiles(expectedPath, actualPath) } @@ -33,7 +33,7 @@ val inputPath = copyResource("unordered.sam") val actualPath = tmpFile("ordered.sam") val expectedPath = copyResource("ordered.sam") - TransformAlignments(Array("-single", "-sort_reads", "-sort_lexicographically", inputPath, actualPath)).run(sc) + TransformAlignments(Array("-single", "-disable_pg", "-sort_reads", "-sort_lexicographically", inputPath, actualPath)).run(sc) checkFiles(expectedPath, actualPath) } @@ -42,8 +42,8 @@ val inputPath = copyResource("unordered.sam") val intermediateAdamPath = tmpFile("unordered.adam") val actualPath = tmpFile("unordered.sam") val expectedPath = inputPath - TransformAlignments(Array(inputPath, intermediateAdamPath)).run(sc) - 
TransformAlignments(Array("-single", intermediateAdamPath, actualPath)).run(sc) + TransformAlignments(Array("-disable_pg", inputPath, intermediateAdamPath)).run(sc) + TransformAlignments(Array("-single", "-disable_pg", intermediateAdamPath, actualPath)).run(sc) checkFiles(expectedPath, actualPath) } @@ -53,7 +53,7 @@ val inputPath = copyResource("unordered.sam") val intermediateAdamPath = tmpFile("unordered.adam") val actualPath = tmpFile("ordered.sam") val expectedPath = copyResource("ordered.sam") - TransformAlignments(Array(inputPath, intermediateAdamPath)).run(sc) - TransformAlignments(Array("-single", "-sort_reads", "-sort_lexicographically", intermediateAdamPath, actualPath)).run(sc) + TransformAlignments(Array("-disable_pg", inputPath, intermediateAdamPath)).run(sc) + TransformAlignments(Array("-single", "-disable_pg", "-sort_reads", "-sort_lexicographically", intermediateAdamPath, actualPath)).run(sc) checkFiles(expectedPath, actualPath) } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/ProgramRecord.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/ProgramRecord.scala deleted file mode 100644 index 15d1bc750b..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/models/ProgramRecord.scala +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.models - -import htsjdk.samtools.SAMProgramRecord - -private[models] object ProgramRecord { - - /** - * Builds a program record model from a SAM program record. - * - * @param pr The SAM program record to build from. - * @return Returns a serializable program record model. - */ - def apply(pr: SAMProgramRecord): ProgramRecord = { - // ID is a required field - val id: String = pr.getId - - // these fields are optional and can be left null, so must check for null... - val commandLine: Option[String] = Option(pr.getCommandLine) - val name: Option[String] = Option(pr.getProgramName) - val version: Option[String] = Option(pr.getProgramVersion) - val previousID: Option[String] = Option(pr.getPreviousProgramGroupId) - - new ProgramRecord(id, commandLine, name, version, previousID) - } -} - -/** - * A serializable equivalent to the htsjdk SAMProgramRecord. - * - * @param id The ID of the program record line. - * @param commandLine An optional command line that was run. - * @param name An optional name for the command/tool that was run. - * @param version An optional version for the command/tool that was run. - * @param previousID An optional ID for the ID of the previous stage that was - * run. - */ -private[models] case class ProgramRecord( - id: String, - commandLine: Option[String], - name: Option[String], - version: Option[String], - previousID: Option[String]) { - - /** - * @return Exports back to the htsjdk SAMProgramRecord. 
- */ - def toSAMProgramRecord(): SAMProgramRecord = { - val pr = new SAMProgramRecord(id) - - // set optional fields - commandLine.foreach(cl => pr.setCommandLine(cl)) - name.foreach(n => pr.setProgramName(n)) - version.foreach(v => pr.setProgramVersion(v)) - previousID.foreach(id => pr.setPreviousProgramGroupId(id)) - - pr - } -} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/SAMFileHeaderWritable.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/SAMFileHeaderWritable.scala index 5aea05443b..a677791ef4 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/models/SAMFileHeaderWritable.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/SAMFileHeaderWritable.scala @@ -18,6 +18,8 @@ package org.bdgenomics.adam.models import htsjdk.samtools.SAMFileHeader +import org.bdgenomics.adam.rdd.ADAMContext +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD import scala.collection.JavaConversions._ private[adam] object SAMFileHeaderWritable { @@ -51,7 +53,7 @@ private[adam] class SAMFileHeaderWritable private (hdr: SAMFileHeader) extends S private val sd = SequenceDictionary(hdr.getSequenceDictionary) private val pgl = { val pgs = hdr.getProgramRecords - pgs.map(ProgramRecord(_)) + pgs.map(ADAMContext.convertSAMProgramRecord) } private val comments = { val cmts = hdr.getComments @@ -68,7 +70,7 @@ private[adam] class SAMFileHeaderWritable private (hdr: SAMFileHeader) extends S // add back optional fields text.foreach(h.setTextHeader) h.setSequenceDictionary(sd.toSAMSequenceDictionary) - pgl.foreach(p => h.addProgramRecord(p.toSAMProgramRecord)) + pgl.foreach(p => h.addProgramRecord(AlignmentRecordRDD.processingStepToSam(p))) comments.foreach(h.addComment) rgs.recordGroups.foreach(rg => h.addReadGroup(rg.toSAMReadGroupRecord)) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index be9663dfe7..ab82867992 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -18,7 +18,7 @@ package org.bdgenomics.adam.rdd import java.io.{ File, FileNotFoundException, InputStream } -import htsjdk.samtools.{ SAMFileHeader, ValidationStringency } +import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency } import htsjdk.samtools.util.Locatable import htsjdk.variant.vcf.{ VCFHeader, @@ -84,6 +84,7 @@ import org.bdgenomics.formats.avro.{ Fragment, Genotype, NucleotideContigFragment, + ProcessingStep, RecordGroup => RecordGroupMetadata, Sample, Variant @@ -146,7 +147,7 @@ object ADAMContext { implicit def fragmentsToReadsConversionFn(fRdd: FragmentRDD, rdd: RDD[AlignmentRecord]): AlignmentRecordRDD = { - AlignmentRecordRDD(rdd, fRdd.sequences, fRdd.recordGroups) + AlignmentRecordRDD(rdd, fRdd.sequences, fRdd.recordGroups, fRdd.processingSteps) } // Add ADAM Spark context methods @@ -157,6 +158,23 @@ object ADAMContext { // Add implicits for the rich adam objects implicit def recordToRichRecord(record: AlignmentRecord): RichAlignmentRecord = new RichAlignmentRecord(record) + + /** + * Builds a program description from a htsjdk program record. + * + * @param record Program record to convert. + * @return Returns an Avro formatted program record. 
+ */ + private[adam] def convertSAMProgramRecord( + record: SAMProgramRecord): ProcessingStep = { + val builder = ProcessingStep.newBuilder + .setId(record.getId) + Option(record.getPreviousProgramGroupId).foreach(builder.setPreviousId(_)) + Option(record.getProgramVersion).foreach(builder.setVersion(_)) + Option(record.getProgramName).foreach(builder.setProgramName(_)) + Option(record.getCommandLine).foreach(builder.setCommandLine(_)) + builder.build + } } /** @@ -199,6 +217,16 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log RecordGroupDictionary.fromSAMHeader(samHeader) } + /** + * @param samHeader The header to extract processing lineage from. + * @return Returns the dictionary converted to an Avro model. + */ + private[rdd] def loadBamPrograms( + samHeader: SAMFileHeader): Seq[ProcessingStep] = { + val pgs = samHeader.getProgramRecords().toSeq + pgs.map(ADAMContext.convertSAMProgramRecord) + } + /** * @param pathName The path name to load VCF format metadata from. * Globs/directories are supported. @@ -250,6 +278,18 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log .distinct } + /** + * @param pathName The path name to load Avro processing steps from. + * Globs/directories are supported. + * @return Returns a seq of processing steps. + */ + private[rdd] def loadAvroPrograms(pathName: String): Seq[ProcessingStep] = { + getFsAndFilesWithFilter(pathName, new FileFilter("_processing.avro")) + .map(p => { + loadAvro[ProcessingStep](p.toString, ProcessingStep.SCHEMA$) + }).reduce(_ ++ _) + } + /** * @param pathName The path name to load Avro sequence dictionaries from. * Globs/directories are supported. @@ -550,7 +590,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log require(filteredFiles.nonEmpty, "Did not find any BAM files at %s.".format(path)) - val (seqDict, readGroups) = + val (seqDict, readGroups, programs) = filteredFiles .flatMap(fp => { try { @@ -562,7 +602,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log log.info("Loaded header from " + fp) val sd = loadBamDictionary(samHeader) val rg = loadBamReadGroups(samHeader) - Some((sd, rg)) + val pgs = loadBamPrograms(samHeader) + Some((sd, rg, pgs)) } catch { case e: Throwable => { if (validationStringency == ValidationStringency.STRICT) { @@ -576,7 +617,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } }).reduce((kv1, kv2) => { - (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2) + (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2, kv1._3 ++ kv2._3) }) val job = HadoopUtil.newJob(sc) @@ -602,7 +643,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p._2.get)), seqDict, - readGroups) + readGroups, + programs) } /** @@ -643,7 +685,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log require(bamFiles.nonEmpty, "Did not find any BAM files at %s.".format(path)) - val (seqDict, readGroups) = bamFiles + val (seqDict, readGroups, programs) = bamFiles .map(fp => { // We need to separately read the header, so that we can inject the sequence dictionary // data into each individual Read (see the argument to samRecordConverter.convert, @@ -653,10 +695,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log log.info("Loaded header from " + fp) val sd = loadBamDictionary(samHeader) val rg = loadBamReadGroups(samHeader) + val pgs = 
loadBamPrograms(samHeader) - (sd, rg) + (sd, rg, pgs) }).reduce((kv1, kv2) => { - (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2) + (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2, kv1._3 ++ kv2._3) }) val job = HadoopUtil.newJob(sc) @@ -671,7 +714,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log val samRecordConverter = new SAMRecordConverter AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p._2.get)), seqDict, - readGroups) + readGroups, + programs) } /** @@ -863,15 +907,18 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log // convert avro to sequence dictionary val rgd = loadAvroRecordGroupDictionary(pathName) + // load processing step descriptions + val pgs = loadAvroPrograms(pathName) + (optPredicate, optProjection) match { case (None, None) => { - ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd) + ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs) } case (_, _) => { // load from disk val rdd = loadParquet[AlignmentRecord](pathName, optPredicate, optProjection) - RDDBoundAlignmentRecordRDD(rdd, sd, rgd, + RDDBoundAlignmentRecordRDD(rdd, sd, rgd, pgs, optPartitionMap = extractPartitionMap(pathName)) } } @@ -1583,9 +1630,12 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log // convert avro to sequence dictionary val rgd = loadAvroRecordGroupDictionary(pathName) + // load processing step descriptions + val pgs = loadAvroPrograms(pathName) + (optPredicate, optProjection) match { case (None, None) => { - ParquetUnboundFragmentRDD(sc, pathName, sd, rgd) + ParquetUnboundFragmentRDD(sc, pathName, sd, rgd, pgs) } case (_, _) => { // load from disk @@ -1594,6 +1644,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log new RDDBoundFragmentRDD(rdd, sd, rgd, + pgs, optPartitionMap = extractPartitionMap(pathName)) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala index 81ed431b8f..a628e71c51 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala @@ -38,6 +38,7 @@ import org.bdgenomics.adam.models.{ } import org.bdgenomics.formats.avro.{ Contig, + ProcessingStep, RecordGroup => RecordGroupMetadata, Sample } @@ -1330,6 +1331,11 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Geno */ abstract class AvroReadGroupGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: AvroReadGroupGenomicRDD[T, U, V]] extends AvroGenomicRDD[T, U, V] { + /** + * The processing steps that have been applied to this GenomicRDD. + */ + val processingSteps: Seq[ProcessingStep] + /** * A dictionary describing the read groups attached to this GenomicRDD. 
*/ @@ -1346,6 +1352,12 @@ abstract class AvroReadGroupGenomicRDD[T <% IndexedRecord: Manifest, U <: Produc Contig.SCHEMA$, contigs) + // save processing metadata + saveAvro("%s/_processing.avro".format(filePath), + rdd.context, + ProcessingStep.SCHEMA$, + processingSteps) + // convert record group to avro and save val rgMetadata = recordGroups.recordGroups .map(_.toMetadata) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala index 9e60301432..a80c11e5e2 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala @@ -99,7 +99,10 @@ object FragmentRDD { * @return Returns a FragmentRDD with an empty record group dictionary and sequence dictionary. */ private[rdd] def fromRdd(rdd: RDD[Fragment]): FragmentRDD = { - FragmentRDD(rdd, SequenceDictionary.empty, RecordGroupDictionary.empty) + FragmentRDD(rdd, + SequenceDictionary.empty, + RecordGroupDictionary.empty, + Seq.empty) } /** @@ -108,13 +111,19 @@ object FragmentRDD { * @param rdd The underlying Franment RDD. * @param sequences The sequence dictionary for the RDD. * @param recordGroupDictionary The record group dictionary for the RDD. + * @param processingSteps The processing steps that have been applied to this data. * @return A new FragmentRDD. */ def apply(rdd: RDD[Fragment], sequences: SequenceDictionary, - recordGroupDictionary: RecordGroupDictionary): FragmentRDD = { - - new RDDBoundFragmentRDD(rdd, sequences, recordGroupDictionary, None) + recordGroupDictionary: RecordGroupDictionary, + processingSteps: Seq[ProcessingStep]): FragmentRDD = { + + new RDDBoundFragmentRDD(rdd, + sequences, + recordGroupDictionary, + processingSteps, + None) } /** @@ -123,11 +132,14 @@ object FragmentRDD { * @param ds The underlying Dataset of Fragment data. * @param sequences The genomic sequences this data was aligned to, if any. * @param recordGroups The record groups these Fragments came from. + * @param processingSteps The processing steps that have been applied to this data. + * @return A new FragmentRDD. 
*/ def apply(ds: Dataset[FragmentProduct], sequences: SequenceDictionary, - recordGroups: RecordGroupDictionary): FragmentRDD = { - DatasetBoundFragmentRDD(ds, sequences, recordGroups) + recordGroups: RecordGroupDictionary, + processingSteps: Seq[ProcessingStep]): FragmentRDD = { + DatasetBoundFragmentRDD(ds, sequences, recordGroups, processingSteps) } } @@ -135,7 +147,8 @@ case class ParquetUnboundFragmentRDD private[rdd] ( @transient private val sc: SparkContext, private val parquetFilename: String, sequences: SequenceDictionary, - recordGroups: RecordGroupDictionary) extends FragmentRDD { + recordGroups: RecordGroupDictionary, + processingSteps: Seq[ProcessingStep]) extends FragmentRDD { lazy val rdd: RDD[Fragment] = { sc.loadParquet(parquetFilename) @@ -153,7 +166,8 @@ case class ParquetUnboundFragmentRDD private[rdd] ( case class DatasetBoundFragmentRDD private[rdd] ( dataset: Dataset[FragmentProduct], sequences: SequenceDictionary, - recordGroups: RecordGroupDictionary) extends FragmentRDD { + recordGroups: RecordGroupDictionary, + processingSteps: Seq[ProcessingStep]) extends FragmentRDD { lazy val rdd = dataset.rdd.map(_.toAvro) @@ -183,6 +197,7 @@ case class RDDBoundFragmentRDD private[rdd] ( rdd: RDD[Fragment], sequences: SequenceDictionary, recordGroups: RecordGroupDictionary, + processingSteps: Seq[ProcessingStep], optPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]]) extends FragmentRDD { /** @@ -211,14 +226,19 @@ sealed abstract class FragmentRDD extends AvroReadGroupGenomicRDD[Fragment, Frag */ protected def replaceRdd(newRdd: RDD[Fragment], newPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]] = None): FragmentRDD = { - RDDBoundFragmentRDD(newRdd, sequences, recordGroups, newPartitionMap) + RDDBoundFragmentRDD(newRdd, + sequences, + recordGroups, + processingSteps, + newPartitionMap) } def union(rdds: FragmentRDD*): FragmentRDD = { val iterableRdds = rdds.toSeq FragmentRDD(rdd.context.union(rdd, iterableRdds.map(_.rdd): _*), iterableRdds.map(_.sequences).fold(sequences)(_ ++ _), - iterableRdds.map(_.recordGroups).fold(recordGroups)(_ ++ _)) + iterableRdds.map(_.recordGroups).fold(recordGroups)(_ ++ _), + iterableRdds.map(_.processingSteps).fold(processingSteps)(_ ++ _)) } /** @@ -231,7 +251,10 @@ sealed abstract class FragmentRDD extends AvroReadGroupGenomicRDD[Fragment, Frag */ def transformDataset( tFn: Dataset[FragmentProduct] => Dataset[FragmentProduct]): FragmentRDD = { - DatasetBoundFragmentRDD(tFn(dataset), sequences, recordGroups) + DatasetBoundFragmentRDD(tFn(dataset), + sequences, + recordGroups, + processingSteps) } /** @@ -246,7 +269,10 @@ sealed abstract class FragmentRDD extends AvroReadGroupGenomicRDD[Fragment, Frag val newRdd = rdd.flatMap(converter.convertFragment) // are we aligned? - AlignmentRecordRDD(newRdd, sequences, recordGroups) + AlignmentRecordRDD(newRdd, + sequences, + recordGroups, + processingSteps) } /** diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala index 2d0c911c58..d58dda3c3e 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala @@ -98,6 +98,24 @@ private[adam] class AlignmentRecordArraySerializer extends IntervalArraySerializ object AlignmentRecordRDD extends Serializable { + /** + * Converts a processing step back to the SAM representation. 
+ * + * @param ps The processing step to convert. + * @return Returns an HTSJDK program group. + */ + private[adam] def processingStepToSam( + ps: ProcessingStep): SAMProgramRecord = { + require(ps.getId != null, + "Processing stage ID cannot be null (%s).".format(ps)) + val pg = new SAMProgramRecord(ps.getId) + Option(ps.getPreviousId).foreach(pg.setPreviousProgramGroupId(_)) + Option(ps.getProgramName).foreach(pg.setProgramName) + Option(ps.getVersion).foreach(pg.setProgramVersion) + Option(ps.getCommandLine).foreach(pg.setCommandLine) + pg + } + /** * Builds an AlignmentRecordRDD for unaligned reads. * @@ -108,6 +126,7 @@ object AlignmentRecordRDD extends Serializable { RDDBoundAlignmentRecordRDD(rdd, SequenceDictionary.empty, RecordGroupDictionary.empty, + Seq.empty, None) } @@ -149,14 +168,23 @@ object AlignmentRecordRDD extends Serializable { */ def apply(rdd: RDD[AlignmentRecord], sequences: SequenceDictionary, - recordGroups: RecordGroupDictionary): AlignmentRecordRDD = { - RDDBoundAlignmentRecordRDD(rdd, sequences, recordGroups, None) + recordGroups: RecordGroupDictionary, + processingSteps: Seq[ProcessingStep]): AlignmentRecordRDD = { + RDDBoundAlignmentRecordRDD(rdd, + sequences, + recordGroups, + processingSteps, + None) } def apply(ds: Dataset[AlignmentRecordProduct], sequences: SequenceDictionary, - recordGroups: RecordGroupDictionary): AlignmentRecordRDD = { - DatasetBoundAlignmentRecordRDD(ds, sequences, recordGroups) + recordGroups: RecordGroupDictionary, + processingSteps: Seq[ProcessingStep]): AlignmentRecordRDD = { + DatasetBoundAlignmentRecordRDD(ds, + sequences, + recordGroups, + processingSteps) } } @@ -164,7 +192,8 @@ case class ParquetUnboundAlignmentRecordRDD private[rdd] ( @transient private val sc: SparkContext, private val parquetFilename: String, sequences: SequenceDictionary, - recordGroups: RecordGroupDictionary) extends AlignmentRecordRDD { + recordGroups: RecordGroupDictionary, + processingSteps: Seq[ProcessingStep]) extends AlignmentRecordRDD { lazy val optPartitionMap = sc.extractPartitionMap(parquetFilename) @@ -182,7 +211,8 @@ case class ParquetUnboundAlignmentRecordRDD private[rdd] ( case class DatasetBoundAlignmentRecordRDD private[rdd] ( dataset: Dataset[AlignmentRecordProduct], sequences: SequenceDictionary, - recordGroups: RecordGroupDictionary) extends AlignmentRecordRDD { + recordGroups: RecordGroupDictionary, + processingSteps: Seq[ProcessingStep]) extends AlignmentRecordRDD { lazy val rdd = dataset.rdd.map(_.toAvro) @@ -212,6 +242,7 @@ case class RDDBoundAlignmentRecordRDD private[rdd] ( rdd: RDD[AlignmentRecord], sequences: SequenceDictionary, recordGroups: RecordGroupDictionary, + processingSteps: Seq[ProcessingStep], optPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]]) extends AlignmentRecordRDD { /** @@ -261,7 +292,10 @@ sealed abstract class AlignmentRecordRDD extends AvroReadGroupGenomicRDD[Alignme */ def transformDataset( tFn: Dataset[AlignmentRecordProduct] => Dataset[AlignmentRecordProduct]): AlignmentRecordRDD = { - DatasetBoundAlignmentRecordRDD(dataset, sequences, recordGroups) + DatasetBoundAlignmentRecordRDD(dataset, + sequences, + recordGroups, + processingSteps) .transformDataset(tFn) } @@ -278,12 +312,17 @@ sealed abstract class AlignmentRecordRDD extends AvroReadGroupGenomicRDD[Alignme RDDBoundAlignmentRecordRDD(newRdd, newSequences, recordGroups, + processingSteps, partitionMap) } protected def replaceRdd(newRdd: RDD[AlignmentRecord], newPartitionMap: Option[Array[Option[(ReferenceRegion, 
ReferenceRegion)]]] = None): AlignmentRecordRDD = { - RDDBoundAlignmentRecordRDD(newRdd, sequences, recordGroups, newPartitionMap) + RDDBoundAlignmentRecordRDD(newRdd, + sequences, + recordGroups, + processingSteps, + newPartitionMap) } protected def buildTree(rdd: RDD[(ReferenceRegion, AlignmentRecord)])( @@ -295,7 +334,8 @@ sealed abstract class AlignmentRecordRDD extends AvroReadGroupGenomicRDD[Alignme val iterableRdds = rdds.toSeq AlignmentRecordRDD(rdd.context.union(rdd, iterableRdds.map(_.rdd): _*), iterableRdds.map(_.sequences).fold(sequences)(_ ++ _), - iterableRdds.map(_.recordGroups).fold(recordGroups)(_ ++ _)) + iterableRdds.map(_.recordGroups).fold(recordGroups)(_ ++ _), + iterableRdds.map(_.processingSteps).fold(processingSteps)(_ ++ _)) } /** @@ -307,7 +347,8 @@ sealed abstract class AlignmentRecordRDD extends AvroReadGroupGenomicRDD[Alignme def toFragments(): FragmentRDD = { FragmentRDD(groupReadsByFragment().map(_.toFragment), sequences, - recordGroups) + recordGroups, + processingSteps) } /** @@ -331,7 +372,8 @@ sealed abstract class AlignmentRecordRDD extends AvroReadGroupGenomicRDD[Alignme private[rdd] def querynameSortedToFragments: FragmentRDD = { FragmentRDD(locallyGroupReadsByFragment().map(_.toFragment), sequences, - recordGroups) + recordGroups, + processingSteps) } /** @@ -512,6 +554,12 @@ sealed abstract class AlignmentRecordRDD extends AvroReadGroupGenomicRDD[Alignme header.setSortOrder(SAMFileHeader.SortOrder.unsorted) } + // get program records and attach to header + val pgRecords = processingSteps.map(r => { + AlignmentRecordRDD.processingStepToSam(r) + }) + header.setProgramRecords(pgRecords) + // broadcast for efficiency val hdrBcast = rdd.context.broadcast(SAMFileHeaderWritable(header)) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala b/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala index dc87780c3f..5e7fa556ef 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala @@ -251,6 +251,8 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging { new AvroSerializer[org.bdgenomics.formats.avro.NucleotideContigFragment]) kryo.register(classOf[org.bdgenomics.formats.avro.OntologyTerm], new AvroSerializer[org.bdgenomics.formats.avro.OntologyTerm]) + kryo.register(classOf[org.bdgenomics.formats.avro.ProcessingStep], + new AvroSerializer[org.bdgenomics.formats.avro.ProcessingStep]) kryo.register(classOf[org.bdgenomics.formats.avro.Read], new AvroSerializer[org.bdgenomics.formats.avro.Read]) kryo.register(classOf[org.bdgenomics.formats.avro.RecordGroup], diff --git a/adam-core/src/test/resources/small.sam b/adam-core/src/test/resources/small.sam index 7ed3d8de21..d29f8454a4 100644 --- a/adam-core/src/test/resources/small.sam +++ b/adam-core/src/test/resources/small.sam @@ -1,5 +1,7 @@ @SQ SN:1 LN:249250621 @SQ SN:2 LN:243199373 +@PG ID:p1 PN:myProg CL:"myProg 123" VN:1.0.0 +@PG ID:p2 PN:myProg CL:"myProg 456" VN:1.0.0 PP:p1 simread:1:26472783:false 16 1 26472784 60 75M * 0 0 GTATAAGAGCAGCCTTATTCCTATTTATAATCAGGGTGAAACACCTGTGCCAATGCCAAGACAGGGGTGCCAAGA * NM:i:0 AS:i:75 XS:i:0 simread:1:240997787:true 0 1 240997788 60 75M * 0 0 CTTTATTTTTATTTTTAAGGTTTTTTTTGTTTGTTTGTTTTGAGATGGAGTCTCGCTCCACCGCCCAGACTGGAG * NM:i:0 AS:i:75 XS:i:39 simread:1:189606653:true 0 1 189606654 60 75M * 0 0 
TGTATCTTCCTCCCCTGCTGTATGTTTCCTGCCCTCAAACATCACACTCCACGTTCTTCAGCTTTAGGACTTGGA * NM:i:0 AS:i:75 XS:i:0 diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/models/ProgramRecordSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/models/ProgramRecordSuite.scala deleted file mode 100644 index 6b2fd7e5ae..0000000000 --- a/adam-core/src/test/scala/org/bdgenomics/adam/models/ProgramRecordSuite.scala +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.models - -import htsjdk.samtools.SAMProgramRecord -import org.scalatest.FunSuite - -class ProgramRecordSuite extends FunSuite { - - test("convert a sam program record with no optional fields into a program record and v/v") { - val spr = new SAMProgramRecord("myProgramRecord") - val pr = ProgramRecord(spr) - - assert(pr.id === "myProgramRecord") - assert(pr.commandLine.isEmpty) - assert(pr.name.isEmpty) - assert(pr.version.isEmpty) - assert(pr.previousID.isEmpty) - - val convert = pr.toSAMProgramRecord() - assert(spr.equals(convert)) - } - - test("convert a sam program record into a program record and v/v") { - val spr = new SAMProgramRecord("myProgramRecord") - spr.setCommandLine("command") - spr.setProgramName("myCommand") - spr.setProgramVersion("0.0.0") - spr.setPreviousProgramGroupId("myPreviousProgramRecord") - - val pr = ProgramRecord(spr) - - assert(pr.id === "myProgramRecord") - assert(pr.commandLine.get === "command") - assert(pr.name.get === "myCommand") - assert(pr.version.get === "0.0.0") - assert(pr.previousID.get === "myPreviousProgramRecord") - - val convert = pr.toSAMProgramRecord() - assert(spr.equals(convert)) - } -} diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala index e9e9578e2f..ba09e7bed8 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala @@ -17,7 +17,11 @@ */ package org.bdgenomics.adam.rdd -import htsjdk.samtools.{ SAMFormatException, ValidationStringency } +import htsjdk.samtools.{ + SAMFormatException, + SAMProgramRecord, + ValidationStringency +} import java.io.{ File, FileNotFoundException } import com.google.common.io.Files import org.apache.hadoop.fs.Path @@ -31,6 +35,7 @@ import org.bdgenomics.adam.util.PhredUtils._ import org.bdgenomics.adam.util.ADAMFunSuite import org.bdgenomics.formats.avro._ import org.seqdoop.hadoop_bam.CRAMInputFormat +import org.seqdoop.hadoop_bam.util.SAMHeaderReader case class TestSaveArgs(var outputPath: String) extends ADAMSaveAnyArgs { var sortFastqOutput = false @@ -631,4 +636,39 @@ class ADAMContextSuite extends ADAMFunSuite { 
assert(sequences("chr17_gl000206_random").isDefined) assert(sequences("chr17_gl000206_random").get.length === 41001L) } + + sparkTest("convert program record") { + val pr = new SAMProgramRecord("pgId") + pr.setPreviousProgramGroupId("ppgId") + pr.setProgramName("myProg") + pr.setProgramVersion("1.2.3") + pr.setCommandLine("myProg aCommand") + val ps = ADAMContext.convertSAMProgramRecord(pr) + assert(ps.getId === "pgId") + assert(ps.getProgramName === "myProg") + assert(ps.getVersion === "1.2.3") + assert(ps.getCommandLine === "myProg aCommand") + assert(ps.getPreviousId === "ppgId") + } + + sparkTest("load program record from sam file") { + val input = testFile("small.sam") + val samHeader = SAMHeaderReader.readSAMHeaderFrom(new Path(input), + sc.hadoopConfiguration) + val programs = sc.loadBamPrograms(samHeader) + assert(programs.size === 2) + val firstPg = programs.filter(_.getPreviousId == null) + assert(firstPg.size === 1) + assert(firstPg.head.getId === "p1") + assert(firstPg.head.getProgramName === "myProg") + assert(firstPg.head.getCommandLine === "\"myProg 123\"") + assert(firstPg.head.getVersion === "1.0.0") + val secondPg = programs.filter(_.getPreviousId != null) + assert(secondPg.size === 1) + assert(secondPg.head.getId === "p2") + assert(secondPg.head.getPreviousId === "p1") + assert(secondPg.head.getProgramName === "myProg") + assert(secondPg.head.getCommandLine === "\"myProg 456\"") + assert(secondPg.head.getVersion === "1.0.0") + } } diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala index bf95adf921..80099bc6e6 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala @@ -49,7 +49,7 @@ class ParallelFileMergerSuite extends ADAMFunSuite { val files = Seq(testFile("unmapped.sam"), testFile("small.sam")) .map(new Path(_)) - val fileSizes = Seq(29408, 3093) + val fileSizes = Seq(29408, 3189) val filesWithSizes = files.zip(fileSizes) val fs = FileSystem.get(sc.hadoopConfiguration) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala index 28cec251f5..b8aedb21af 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala @@ -74,7 +74,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { val contigNames = rdd.flatMap(r => Option(r.getContigName)).distinct.collect val sd = new SequenceDictionary(contigNames.map(v => SequenceRecord(v, 1000000L)).toVector) - val sortedReads = AlignmentRecordRDD(rdd, sd, RecordGroupDictionary.empty) + val sortedReads = AlignmentRecordRDD(rdd, sd, RecordGroupDictionary.empty, Seq.empty) .sortReadsByReferencePosition() .rdd .collect() @@ -155,7 +155,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { }).toVector) val rdd = sc.parallelize(reads) - val sortedReads = AlignmentRecordRDD(rdd, sd, RecordGroupDictionary.empty) + val sortedReads = AlignmentRecordRDD(rdd, sd, RecordGroupDictionary.empty, Seq.empty) .sortReadsByReferencePositionAndIndex() .rdd .collect() @@ -1026,4 +1026,26 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { import sqlContext.implicits._ assert(kmerCounts.toDF().where($"kmer" === "CCAAGA" && $"count" === 3).count === 1) } + + 
test("cannot have a null processing step ID") { + intercept[IllegalArgumentException] { + AlignmentRecordRDD.processingStepToSam(ProcessingStep.newBuilder.build) + } + } + + test("convert a processing description to htsjdk") { + val htsjdkPg = AlignmentRecordRDD.processingStepToSam( + ProcessingStep.newBuilder() + .setId("pg") + .setProgramName("myProgram") + .setVersion("1") + .setPreviousId("ppg") + .setCommandLine("myProgram run") + .build) + assert(htsjdkPg.getId === "pg") + assert(htsjdkPg.getCommandLine === "myProgram run") + assert(htsjdkPg.getProgramName === "myProgram") + assert(htsjdkPg.getProgramVersion === "1") + assert(htsjdkPg.getPreviousProgramGroupId === "ppg") + } } diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/MarkDuplicatesSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/MarkDuplicatesSuite.scala index 8d69ea0763..bd6c19b411 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/MarkDuplicatesSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/MarkDuplicatesSuite.scala @@ -98,7 +98,7 @@ class MarkDuplicatesSuite extends ADAMFunSuite { } private def markDuplicates(reads: AlignmentRecord*): Array[AlignmentRecord] = { - AlignmentRecordRDD(sc.parallelize(reads), SequenceDictionary.empty, rgd) + AlignmentRecordRDD(sc.parallelize(reads), SequenceDictionary.empty, rgd, Seq.empty) .markDuplicates() .rdd .collect() @@ -207,7 +207,7 @@ class MarkDuplicatesSuite extends ADAMFunSuite { } private def markDuplicateFragments(reads: AlignmentRecord*): Array[AlignmentRecord] = { - AlignmentRecordRDD(sc.parallelize(reads), SequenceDictionary.empty, rgd) + AlignmentRecordRDD(sc.parallelize(reads), SequenceDictionary.empty, rgd, Seq.empty) .toFragments .markDuplicates() .toReads diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndelsSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndelsSuite.scala index 5e982874a9..5d8d7138d3 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndelsSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/realignment/RealignIndelsSuite.scala @@ -549,7 +549,8 @@ class RealignIndelsSuite extends ADAMFunSuite { scRead, ecRead)), new SequenceDictionary(Vector(SequenceRecord("1", 20L))), - RecordGroupDictionary.empty) + RecordGroupDictionary.empty, + Seq.empty) val realignedReads = rdd.realignIndels(lodThreshold = 0.0) .rdd .collect