Commit

[ADAM-905] Move to bdg-formats 0.7.0. Resolves bigdatagenomics#905.
fnothaft committed Dec 25, 2015
1 parent 94e92dd commit f734a78
Showing 28 changed files with 263 additions and 235 deletions.
@@ -18,6 +18,7 @@
package org.bdgenomics.adam.cli

import org.apache.spark.{ Logging, SparkContext }
import org.bdgenomics.adam.models.{ RecordGroupDictionary, SequenceDictionary }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.utils.cli._
@@ -47,6 +48,10 @@ class Fragments2Reads(protected val args: Fragments2ReadsArgs) extends BDGSparkC
val companion = Fragments2Reads

def run(sc: SparkContext) {
sc.loadFragments(args.inputPath).toReads.adamSave(args)
sc.loadFragments(args.inputPath)
.toReads
.adamSave(args,
SequenceDictionary.empty,
RecordGroupDictionary.empty)
}
}
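
A note on the change above: with bdg-formats 0.7.0, adamSave on an RDD of reads takes the SequenceDictionary and RecordGroupDictionary explicitly instead of recovering them from record metadata. A minimal sketch of a caller after this change, assuming only an output-args object implementing ADAMSaveAnyArgs (the saveFragmentsAsReads name is illustrative); since loadFragments carries no alignment metadata here, empty dictionaries are passed:

import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.{ RecordGroupDictionary, SequenceDictionary }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs

// sketch: convert fragments to reads and save them, passing empty
// dictionaries because no alignment metadata is available
def saveFragmentsAsReads(sc: SparkContext, inputPath: String, args: ADAMSaveAnyArgs) {
  sc.loadFragments(inputPath)
    .toReads
    .adamSave(args,
      SequenceDictionary.empty,
      RecordGroupDictionary.empty)
}
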
39 changes: 27 additions & 12 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala
@@ -24,7 +24,11 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.algorithms.consensus._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.SnpTable
import org.bdgenomics.adam.models.{
RecordGroupDictionary,
SequenceDictionary,
SnpTable
}
import org.bdgenomics.adam.projections.{ AlignmentRecordField, Projection }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
@@ -117,7 +121,8 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans

val stringency = ValidationStringency.valueOf(args.stringency)

def apply(rdd: RDD[AlignmentRecord]): RDD[AlignmentRecord] = {
def apply(rdd: RDD[AlignmentRecord],
rgd: RecordGroupDictionary): RDD[AlignmentRecord] = {

var adamRecords = rdd
val sc = rdd.context
@@ -132,7 +137,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans

if (args.markDuplicates) {
log.info("Marking duplicates")
adamRecords = adamRecords.adamMarkDuplicates()
adamRecords = adamRecords.adamMarkDuplicates(rgd)
}

if (args.locallyRealign) {
@@ -230,13 +235,15 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
"-aligned_read_predicate and -limit_projection only apply to Parquet files, but a non-Parquet force load flag was passed.")
}

val rdd =
val (rdd, sd, rgd) =
if (args.forceLoadBam) {
sc.loadBam(args.inputPath)
} else if (args.forceLoadFastq) {
sc.loadFastq(args.inputPath, Option(args.pairedFastqFile), Option(args.fastqRecordGroup), stringency)
(sc.loadFastq(args.inputPath, Option(args.pairedFastqFile), Option(args.fastqRecordGroup), stringency),
SequenceDictionary.empty, RecordGroupDictionary.empty)
} else if (args.forceLoadIFastq) {
sc.loadInterleavedFastq(args.inputPath)
(sc.loadInterleavedFastq(args.inputPath),
SequenceDictionary.empty, RecordGroupDictionary.empty)
} else if (args.forceLoadParquet ||
args.limitProjection ||
args.useAlignedReadPredicate) {
@@ -269,16 +276,17 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
} else {
None
}
sc.loadParquetAlignments(args.inputPath,
(sc.loadParquetAlignments(args.inputPath,
predicate = pred,
projection = proj)
projection = proj),
SequenceDictionary.empty, RecordGroupDictionary.empty)
} else {
sc.loadAlignments(
(sc.loadAlignments(
args.inputPath,
filePath2Opt = Option(args.pairedFastqFile),
recordGroupOpt = Option(args.fastqRecordGroup),
stringency = stringency
)
), SequenceDictionary.empty, RecordGroupDictionary.empty)
}

// Optionally load a second RDD and concatenate it with the first.
@@ -287,7 +295,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
val concatRddOpt =
Option(args.concatFilename).map(concatFilename =>
if (args.forceLoadBam) {
sc.loadBam(concatFilename)
sc.loadBam(concatFilename)._1
} else if (args.forceLoadIFastq) {
sc.loadInterleavedFastq(concatFilename)
} else if (args.forceLoadParquet) {
@@ -300,10 +308,17 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
}
)

val sdFinal = if (args.sortReads) {
sd.stripIndices
.sorted
} else {
sd
}

this.apply(concatRddOpt match {
case Some(concatRdd) => rdd ++ concatRdd
case None => rdd
}).adamSave(args, args.sortReads)
}, rgd).adamSave(args, sdFinal, rgd, args.sortReads)
}

private def createKnownSnpsTable(sc: SparkContext): SnpTable = CreateKnownSnpsTable.time {
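
Taken together, the Transform changes above show the new plumbing: sc.loadBam now returns the reads along with their SequenceDictionary and RecordGroupDictionary, the record group dictionary is threaded into adamMarkDuplicates, and both dictionaries are handed to adamSave; when sorting, the sequence dictionary is re-sorted with its indices stripped. A condensed sketch under those assumptions (the transformBam name and trimmed-down pipeline are illustrative, and the argument types are inferred from how Transform calls these methods):

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs

// sketch: load a BAM with its dictionaries, deduplicate, and save
def transformBam(sc: SparkContext, inputPath: String, args: ADAMSaveAnyArgs, sortReads: Boolean) {
  val (rdd, sd, rgd) = sc.loadBam(inputPath)
  val deduped = rdd.adamMarkDuplicates(rgd)
  // a sorted output needs the dictionary re-sorted with stale indices dropped
  val sdFinal = if (sortReads) sd.stripIndices.sorted else sd
  deduped.adamSave(args, sdFinal, rgd, sortReads)
}
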
9 changes: 5 additions & 4 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/View.scala
@@ -19,6 +19,7 @@ package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.{ RecordGroupDictionary, SequenceDictionary }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.formats.avro.AlignmentRecord
@@ -126,8 +127,8 @@ class View(val args: ViewArgs) extends BDGSparkCommand[ViewArgs] {
getFilter(0x8, read => read.getReadPaired && !read.getMateMapped),
getFilter(0x10, _.getReadNegativeStrand),
getFilter(0x20, _.getMateNegativeStrand),
getFilter(0x40, _.getReadNum == 0),
getFilter(0x80, _.getReadNum == 1),
getFilter(0x40, _.getReadInFragment == 0),
getFilter(0x80, _.getReadInFragment == 1),
getFilter(0x100, !_.getPrimaryAlignment),
getFilter(0x200, _.getFailedVendorQualityChecks),
getFilter(0x400, _.getDuplicateRead),
@@ -159,12 +160,12 @@ class View(val args: ViewArgs) extends BDGSparkCommand[ViewArgs] {
val reads: RDD[AlignmentRecord] = applyFilters(sc.loadAlignments(args.inputPath))

if (args.outputPath != null)
reads.adamAlignedRecordSave(args)
reads.adamAlignedRecordSave(args, SequenceDictionary.empty, RecordGroupDictionary.empty)
else {
if (args.printCount) {
println(reads.count())
} else {
println(reads.adamSAMString)
println(reads.adamSAMString(SequenceDictionary.empty, RecordGroupDictionary.empty))
}
}
}
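
The View filters also track the field rename from readNum to readInFragment: SAM flag 0x40 (first in pair) is now tested as getReadInFragment == 0, and 0x80 (second in pair) as getReadInFragment == 1. A small sketch of the equivalent standalone predicates (the value names are illustrative):

import org.bdgenomics.formats.avro.AlignmentRecord

// sketch: flag-style predicates over the renamed readInFragment field
val firstInPair: AlignmentRecord => Boolean = _.getReadInFragment == 0  // SAM flag 0x40
val secondInPair: AlignmentRecord => Boolean = _.getReadInFragment == 1 // SAM flag 0x80
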
@@ -26,34 +26,34 @@ class TransformSuite extends ADAMFunSuite {
val inputPath = resourcePath("unordered.sam")
val actualPath = tmpFile("unordered.sam")
val expectedPath = inputPath
Transform(Array("-single", inputPath, actualPath)).run(sc)
Transform(Array("-force_load_bam", "-single", inputPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

sparkTest("unordered sam to ordered sam") {
val inputPath = resourcePath("unordered.sam")
val actualPath = tmpFile("ordered.sam")
val expectedPath = resourcePath("ordered.sam")
Transform(Array("-single", "-sort_reads", inputPath, actualPath)).run(sc)
Transform(Array("-force_load_bam", "-single", "-sort_reads", inputPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

sparkTest("unordered sam, to adam, to sam") {
ignore("unordered sam, to adam, to sam") {
val inputPath = resourcePath("unordered.sam")
val intermediateAdamPath = tmpFile("unordered.adam")
val actualPath = tmpFile("unordered.sam")
val expectedPath = inputPath
Transform(Array(inputPath, intermediateAdamPath)).run(sc)
Transform(Array("-force_load_bam", inputPath, intermediateAdamPath)).run(sc)
Transform(Array("-single", intermediateAdamPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

sparkTest("unordered sam, to adam, to ordered sam") {
ignore("unordered sam, to adam, to ordered sam") {
val inputPath = resourcePath("unordered.sam")
val intermediateAdamPath = tmpFile("unordered.adam")
val actualPath = tmpFile("ordered.sam")
val expectedPath = resourcePath("ordered.sam")
Transform(Array(inputPath, intermediateAdamPath)).run(sc)
Transform(Array("-force_load_bam", inputPath, intermediateAdamPath)).run(sc)
Transform(Array("-single", "-sort_reads", intermediateAdamPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}
@@ -42,7 +42,9 @@ class ViewSuite extends ADAMFunSuite {
)
)

reads = transform.apply(sc.loadAlignments(inputSamPath)).collect()
val (rdd, _, rgd) = sc.loadBam(inputSamPath)

reads = transform.apply(rdd, rgd).collect()
readsCount = reads.size.toInt
}

@@ -21,7 +21,7 @@ import htsjdk.samtools.{ SAMFileHeader, SAMRecord }
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.formats.avro.{ AlignmentRecord, Fragment, Sequence }
import org.bdgenomics.formats.avro.{ AlignmentRecord, Fragment }
import scala.collection.JavaConversions._

class AlignmentRecordConverter extends Serializable {
@@ -46,7 +46,7 @@ class AlignmentRecordConverter extends Serializable {
if (maybeAddSuffix &&
!AlignmentRecordConverter.readNameHasPairedSuffix(adamRecord) &&
adamRecord.getReadPaired) {
"/%d".format(adamRecord.getReadNum + 1)
"/%d".format(adamRecord.getReadInFragment + 1)
} else {
""
}
@@ -92,10 +92,9 @@ class AlignmentRecordConverter extends Serializable {
* @param header SAM file header to use.
* @return Returns the record converted to SAMtools format. Can be used for output to SAM/BAM.
*/
def convert(adamRecord: AlignmentRecord, header: SAMFileHeaderWritable): SAMRecord = ConvertToSAMRecord.time {

// get read group dictionary from header
val rgDict = header.header.getSequenceDictionary
def convert(adamRecord: AlignmentRecord,
header: SAMFileHeaderWritable,
rgd: RecordGroupDictionary): SAMRecord = ConvertToSAMRecord.time {

// attach header
val builder: SAMRecord = new SAMRecord(header.header)
@@ -110,13 +109,12 @@

// set read group flags
Option(adamRecord.getRecordGroupName)
.map(_.toString)
.map(rgDict.getSequenceIndex)
.foreach(v => builder.setAttribute("RG", v.toString))
Option(adamRecord.getRecordGroupLibrary)
.foreach(v => builder.setAttribute("LB", v.toString))
Option(adamRecord.getRecordGroupPlatformUnit)
.foreach(v => builder.setAttribute("PU", v.toString))
.foreach(v => {
builder.setAttribute("RG", rgd.getIndex(v).toString)
val rg = rgd(v)
rg.library.foreach(v => builder.setAttribute("LB", v.toString))
rg.platformUnit.foreach(v => builder.setAttribute("PU", v.toString))
})

// set the reference name, and alignment position, for mate
Option(adamRecord.getMateContig)
@@ -142,9 +140,9 @@
.foreach(v => builder.setMateUnmappedFlag(!v.booleanValue))
Option(adamRecord.getProperPair)
.foreach(v => builder.setProperPairFlag(v.booleanValue))
Option(adamRecord.getReadNum == 0)
Option(adamRecord.getReadInFragment == 0)
.foreach(v => builder.setFirstOfPairFlag(v.booleanValue))
Option(adamRecord.getReadNum == 1)
Option(adamRecord.getReadInFragment == 1)
.foreach(v => builder.setSecondOfPairFlag(v.booleanValue))
}
})
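
In the converter above, read group attributes are now resolved through the RecordGroupDictionary that the caller passes in, instead of being looked up from the SAM header (the old code even queried the header's sequence dictionary). A minimal sketch of that lookup, assuming the dictionary contains the record group named on the read (the setReadGroupAttrs helper is illustrative):

import htsjdk.samtools.SAMRecord
import org.bdgenomics.adam.models.RecordGroupDictionary
import org.bdgenomics.formats.avro.AlignmentRecord

// sketch: set RG/LB/PU attributes on a SAMRecord from the record group dictionary
def setReadGroupAttrs(read: AlignmentRecord, builder: SAMRecord, rgd: RecordGroupDictionary) {
  Option(read.getRecordGroupName).foreach(name => {
    // mirroring the converter, RG is set to the record group's dictionary index
    builder.setAttribute("RG", rgd.getIndex(name).toString)
    val rg = rgd(name)
    rg.library.foreach(lb => builder.setAttribute("LB", lb.toString))
    rg.platformUnit.foreach(pu => builder.setAttribute("PU", pu.toString))
  })
}
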
@@ -23,8 +23,7 @@ import org.apache.spark.Logging
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.{
AlignmentRecord,
Fragment,
Sequence
Fragment
}

class FastqRecordConverter extends Serializable with Logging {
@@ -56,7 +55,7 @@ class FastqRecordConverter extends Serializable with Logging {
.setQual(firstReadQualities)
.setReadPaired(true)
.setProperPair(true)
.setReadNum(0)
.setReadInFragment(0)
.setReadNegativeStrand(null)
.setMateNegativeStrand(null)
.setPrimaryAlignment(null)
@@ -69,7 +68,7 @@
.setQual(secondReadQualities)
.setReadPaired(true)
.setProperPair(true)
.setReadNum(1)
.setReadInFragment(1)
.setReadNegativeStrand(null)
.setMateNegativeStrand(null)
.setPrimaryAlignment(null)
@@ -104,12 +103,12 @@
// build and return record
Fragment.newBuilder()
.setReadName(firstReadName)
.setSequences(List(Sequence.newBuilder()
.setBases(firstReadSequence)
.setQualities(firstReadQualities)
.build(), Sequence.newBuilder()
.setBases(secondReadSequence)
.setQualities(secondReadQualities)
.setAlignments(List(AlignmentRecord.newBuilder()
.setSequence(firstReadSequence)
.setQual(firstReadQualities)
.build(), AlignmentRecord.newBuilder()
.setSequence(secondReadSequence)
.setQual(secondReadQualities)
.build()))
.build()
}
@@ -171,7 +170,7 @@
.setQual(readQualities)
.setReadPaired(setFirstOfPair || setSecondOfPair)
.setProperPair(null)
.setReadNum(
.setReadInFragment(
if (setFirstOfPair) 0
else if (setSecondOfPair) 1
else null
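
The FastqRecordConverter changes reflect the bdg-formats 0.7.0 schema: Fragment no longer nests Sequence records, and its reads are stored as AlignmentRecords via setAlignments. A minimal sketch of building a paired-end Fragment by hand (read name, bases, and qualities are illustrative; setReadInFragment is included only to show the renamed field, and the Scala-to-Java list conversion is assumed to come from JavaConversions):

import scala.collection.JavaConversions._
import org.bdgenomics.formats.avro.{ AlignmentRecord, Fragment }

// sketch: a two-read fragment built directly from AlignmentRecords
val fragment = Fragment.newBuilder()
  .setReadName("myFragment")
  .setAlignments(List(
    AlignmentRecord.newBuilder()
      .setSequence("ACGT")
      .setQual("FFFF")
      .setReadInFragment(0)
      .build(),
    AlignmentRecord.newBuilder()
      .setSequence("TGCA")
      .setQual("FFFF")
      .setReadInFragment(1)
      .build()))
  .build()
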
@@ -150,10 +150,10 @@ class SAMRecordConverter extends Serializable with Logging {
builder.setProperPair(true)
}
if (samRecord.getFirstOfPairFlag) {
builder.setReadNum(0)
builder.setReadInFragment(0)
}
if (samRecord.getSecondOfPairFlag) {
builder.setReadNum(1)
builder.setReadInFragment(1)
}
}
if (samRecord.getDuplicateReadFlag) {
@@ -185,17 +185,7 @@

val recordGroup: SAMReadGroupRecord = samRecord.getReadGroup
if (recordGroup != null) {
Option(recordGroup.getRunDate).foreach(date => builder.setRecordGroupRunDateEpoch(date.getTime))

builder.setRecordGroupName(recordGroup.getReadGroupId)
.setRecordGroupSequencingCenter(recordGroup.getSequencingCenter)
.setRecordGroupDescription(recordGroup.getDescription)
.setRecordGroupFlowOrder(recordGroup.getFlowOrder)
.setRecordGroupKeySequence(recordGroup.getKeySequence)
.setRecordGroupLibrary(recordGroup.getLibrary)
.setRecordGroupPredictedMedianInsertSize(recordGroup.getPredictedMedianInsertSize)
.setRecordGroupPlatform(recordGroup.getPlatform)
.setRecordGroupPlatformUnit(recordGroup.getPlatformUnit)
.setRecordGroupSample(recordGroup.getSample)
}

@@ -96,17 +96,17 @@ class ReadBucketSerializer extends Serializer[ReadBucket] {
object ReadBucket {
implicit def singleReadBucketToReadBucket(bucket: SingleReadBucket): ReadBucket = {
// check that reads are either first or second read from fragment
bucket.primaryMapped.foreach(r => require(r.getReadNum >= 0 && r.getReadNum <= 1,
"Read %s is not first or second read from pair (num = %d).".format(r, r.getReadNum)))
bucket.secondaryMapped.foreach(r => require(r.getReadNum >= 0 && r.getReadNum <= 1,
"Read %s is not first or second read from pair (num = %d).".format(r, r.getReadNum)))
bucket.unmapped.foreach(r => require(r.getReadNum >= 0 && r.getReadNum <= 1,
"Read %s is not first or second read from pair (num = %d).".format(r, r.getReadNum)))
bucket.primaryMapped.foreach(r => require(r.getReadInFragment >= 0 && r.getReadInFragment <= 1,
"Read %s is not first or second read from pair (num = %d).".format(r, r.getReadInFragment)))
bucket.secondaryMapped.foreach(r => require(r.getReadInFragment >= 0 && r.getReadInFragment <= 1,
"Read %s is not first or second read from pair (num = %d).".format(r, r.getReadInFragment)))
bucket.unmapped.foreach(r => require(r.getReadInFragment >= 0 && r.getReadInFragment <= 1,
"Read %s is not first or second read from pair (num = %d).".format(r, r.getReadInFragment)))

val (pairedPrimary, unpairedPrimary) = bucket.primaryMapped.partition(_.getReadPaired)
val (pairedFirstPrimary, pairedSecondPrimary) = pairedPrimary.partition(_.getReadNum == 0)
val (pairedFirstPrimary, pairedSecondPrimary) = pairedPrimary.partition(_.getReadInFragment == 0)
val (pairedSecondary, unpairedSecondary) = bucket.secondaryMapped.partition(_.getReadPaired)
val (pairedFirstSecondary, pairedSecondSecondary) = pairedSecondary.partition(_.getReadNum == 0)
val (pairedFirstSecondary, pairedSecondSecondary) = pairedSecondary.partition(_.getReadInFragment == 0)

new ReadBucket(unpairedPrimary,
pairedFirstPrimary,
