[ADAM-1302] Support duplicate marking on Fragments.
fnothaft committed Dec 10, 2016
1 parent c8a3e43 commit 53eb0d2
Showing 6 changed files with 157 additions and 10 deletions.
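
At a high level, the commit adds a markDuplicates() method to FragmentRDD and exposes it in the Fragments2Reads command behind a new -mark_duplicate_reads flag. A minimal sketch of the resulting programmatic flow, assuming only the API shown in the diff below (the SparkContext setup and the input path are illustrative, not part of the commit):

// Sketch only: loadFragments, markDuplicates(), and toReads appear in the diff below;
// the SparkContext construction and the input path are assumptions.
import org.apache.spark.{ SparkConf, SparkContext }
import org.bdgenomics.adam.rdd.ADAMContext._

val sc = new SparkContext(new SparkConf().setAppName("mark-fragment-duplicates"))
val fragments = sc.loadFragments("sample.fragments.adam") // hypothetical input path
val marked = fragments.markDuplicates()                   // sets the duplicate flag; no reads are dropped
val reads = marked.toReads                                 // convert back to aligned reads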
@@ -20,6 +20,7 @@ package org.bdgenomics.adam.cli
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.fragment.FragmentRDD
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
@@ -46,6 +47,8 @@ class Fragments2ReadsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetAr
var deferMerging: Boolean = false
@Args4jOption(required = false, name = "-sort_lexicographically", usage = "Sort the reads lexicographically by contig name, instead of by index.")
var sortLexicographically: Boolean = false
@Args4jOption(required = false, name = "-mark_duplicate_reads", usage = "Mark duplicate reads")
var markDuplicates: Boolean = false

// this is required because of the ADAMSaveAnyArgs trait... fix this trait???
var sortFastqOutput = false
@@ -54,11 +57,22 @@ class Fragments2ReadsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetAr
class Fragments2Reads(protected val args: Fragments2ReadsArgs) extends BDGSparkCommand[Fragments2ReadsArgs] with Logging {
val companion = Fragments2Reads

def maybeDedupe(reads: FragmentRDD): FragmentRDD = {
if (args.markDuplicates) {
reads.markDuplicates()
} else {
reads
}
}

def run(sc: SparkContext) {
val rdd = sc.loadFragments(args.inputPath)

// should we dedupe the reads?
val maybeDedupedReads = maybeDedupe(rdd)

// save rdd as reads
val readRdd = rdd.toReads
val readRdd = maybeDedupedReads.toReads

// prep to save
val finalRdd = if (args.sortReads) {
@@ -1614,7 +1614,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (filesAreQuerynameSorted(filePath)) {
log.info(s"Loading $filePath as queryname sorted SAM/BAM and converting to Fragments.")
sc.hadoopConfiguration.setBoolean(BAMInputFormat.KEEP_PAIRED_READS_TOGETHER_PROPERTY,
true)
true)
loadBam(filePath).querynameSortedToFragments
} else {
log.info(s"Loading $filePath as SAM/BAM and converting to Fragments.")
@@ -19,6 +19,7 @@ package org.bdgenomics.adam.rdd.fragment

import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.AlignmentRecordConverter
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.{
RecordGroupDictionary,
ReferenceRegion,
@@ -28,6 +29,7 @@ import org.bdgenomics.adam.rdd.{ AvroReadGroupGenomicRDD, JavaSaveArgs }
import org.bdgenomics.adam.rdd.read.{
AlignedReadRDD,
AlignmentRecordRDD,
MarkDuplicates,
UnalignedReadRDD
}
import org.bdgenomics.formats.avro._
@@ -91,6 +93,16 @@ case class FragmentRDD(rdd: RDD[Fragment],
}
}

/**
* Marks reads as possible fragment duplicates.
*
* @return A new RDD where reads have the duplicate read flag set. Duplicate
* reads are NOT filtered out.
*/
def markDuplicates(): FragmentRDD = MarkDuplicatesInDriver.time {
replaceRdd(MarkDuplicates(this))
}

/**
* Saves Fragments to Parquet.
*
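As the scaladoc above notes, markDuplicates() only flags duplicates; nothing is filtered out. A small sketch of dropping flagged reads afterwards, assuming fragments is a FragmentRDD loaded as in the sketch near the top (getDuplicateRead is the existing Avro accessor also used in the tests further down):

// Sketch: duplicates are flagged, not removed, so filter downstream if needed.
val nonDuplicateReads = fragments.markDuplicates()
  .toReads                          // AlignmentRecordRDD
  .rdd                              // underlying RDD[AlignmentRecord]
  .filter(r => !r.getDuplicateRead) // keep only reads not flagged as duplicates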
@@ -25,7 +25,8 @@ import org.bdgenomics.adam.models.{
ReferencePosition
}
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.AlignmentRecord
import org.bdgenomics.adam.rdd.fragment.FragmentRDD
import org.bdgenomics.formats.avro.{ AlignmentRecord, Fragment }

private[rdd] object MarkDuplicates extends Serializable with Logging {

@@ -64,10 +65,21 @@ private[rdd] object MarkDuplicates extends Serializable with Logging {

def apply(rdd: AlignmentRecordRDD): RDD[AlignmentRecord] = {

markBuckets(rdd.groupReadsByFragment(), rdd.recordGroups)
.flatMap(_.allReads)
}

def apply(rdd: FragmentRDD): RDD[Fragment] = {

markBuckets(rdd.rdd.map(f => SingleReadBucket(f)), rdd.recordGroups)
.map(_.toFragment)
}

private def checkRecordGroups(recordGroups: RecordGroupDictionary) {
// do we have record groups where the library name is not set? if so, print a warning message
// to the user, as all record groups without a library name will be treated as coming from
// a single library
val emptyRgs = rdd.recordGroups.recordGroups
val emptyRgs = recordGroups.recordGroups
.filter(_.library.isEmpty)

emptyRgs.foreach(rg => {
@@ -78,6 +90,11 @@ if (emptyRgs.nonEmpty) {
if (emptyRgs.nonEmpty) {
log.warn("For duplicate marking, all reads whose library is unknown will be treated as coming from the same library.")
}
}

private def markBuckets(rdd: RDD[SingleReadBucket],
recordGroups: RecordGroupDictionary): RDD[SingleReadBucket] = {
checkRecordGroups(recordGroups)

// Group by library and left position
def leftPositionAndLibrary(p: (ReferencePositionPair, SingleReadBucket),
@@ -94,9 +111,8 @@ private[rdd] object MarkDuplicates extends Serializable with Logging {
p._1.read2refPos
}

rdd.groupReadsByFragment()
.keyBy(ReferencePositionPair(_))
.groupBy(leftPositionAndLibrary(_, rdd.recordGroups))
rdd.keyBy(ReferencePositionPair(_))
.groupBy(leftPositionAndLibrary(_, recordGroups))
.flatMap(kv => PerformDuplicateMarking.time {

val leftPos: Option[ReferencePosition] = kv._1._1
@@ -140,10 +156,8 @@ private[rdd] object MarkDuplicates extends Serializable with Logging {
})
}

readsAtLeftPos.flatMap(read => { read._2.allReads })

readsAtLeftPos.map(_._2)
})

}

private object ScoreOrdering extends Ordering[(ReferencePositionPair, SingleReadBucket)] {
@@ -90,6 +90,10 @@ private[read] object SingleReadBucket extends Logging {
fromGroupedReads(reads)
})
}

def apply(fragment: Fragment): SingleReadBucket = {
fromGroupedReads(fragment.getAlignments.toIterable)
}
}

/**
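The new SingleReadBucket.apply(fragment) overload is what lets MarkDuplicates reuse its bucket-based marking for Fragments: each Fragment's alignments are grouped into a bucket and, once marked, converted back. A rough sketch of that round trip, mirroring MarkDuplicates.apply(rdd: FragmentRDD) earlier in this diff (the Spark wiring here is illustrative, and the marking step itself is omitted):

// Sketch of the conversion round trip only; in MarkDuplicates, markBuckets sets
// the duplicate flags between these two steps.
import org.apache.spark.rdd.RDD
import org.bdgenomics.formats.avro.Fragment

def fragmentRoundTrip(fragments: RDD[Fragment]): RDD[Fragment] = {
  fragments
    .map(f => SingleReadBucket(f)) // bucket each fragment's alignments together
    .map(_.toFragment)             // convert the bucket back into a Fragment
}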
@@ -204,4 +204,107 @@ class MarkDuplicatesSuite extends ADAMFunSuite {
assert(dups.forall(p => p.getReadName.startsWith("poor")))
}

private def markDuplicateFragments(reads: AlignmentRecord*): Array[AlignmentRecord] = {
AlignedReadRDD(sc.parallelize(reads), SequenceDictionary.empty, rgd)
.toFragments
.markDuplicates()
.toReads
.rdd
.collect()
}

sparkTest("single fragment") {
val read = createMappedRead("0", 100, 200)
val marked = markDuplicateFragments(read)
// Can't have duplicates with a single read, should return the read unchanged.
assert(marked(0) == read)
}

sparkTest("fragments at different positions") {
val read1 = createMappedRead("0", 42, 142)
val read2 = createMappedRead("0", 43, 143)
val marked = markDuplicateFragments(read1, read2)
// Reads shouldn't be modified
assert(marked.contains(read1) && marked.contains(read2))
}

sparkTest("fragments at the same position") {
val poorReads = for (i <- 0 until 10) yield {
createMappedRead("1", 42, 142, avgPhredScore = 20, readName = "poor%d".format(i))
}
val bestRead = createMappedRead("1", 42, 142, avgPhredScore = 30, readName = "best")
val marked = markDuplicateFragments(List(bestRead) ++ poorReads: _*)
val (dups, nonDup) = marked.partition(p => p.getDuplicateRead)
assert(nonDup.size == 1 && nonDup(0) == bestRead)
assert(dups.forall(p => p.getReadName.startsWith("poor")))
}

sparkTest("fragments at the same position with clipping") {
val poorClippedReads = for (i <- 0 until 5) yield {
createMappedRead("1", 44, 142, numClippedBases = 2, avgPhredScore = 20, readName = "poorClipped%d".format(i))
}
val poorUnclippedReads = for (i <- 0 until 5) yield {
createMappedRead("1", 42, 142, avgPhredScore = 20, readName = "poorUnclipped%d".format(i))
}
val bestRead = createMappedRead("1", 42, 142, avgPhredScore = 30, readName = "best")
val marked = markDuplicateFragments(List(bestRead) ++ poorClippedReads ++ poorUnclippedReads: _*)
val (dups, nonDup) = marked.partition(p => p.getDuplicateRead)
assert(nonDup.size == 1 && nonDup(0) == bestRead)
assert(dups.forall(p => p.getReadName.startsWith("poor")))
}

sparkTest("fragments on reverse strand") {
val poorReads = for (i <- 0 until 7) yield {
createMappedRead("10", 42, 142, isNegativeStrand = true, avgPhredScore = 20, readName = "poor%d".format(i))
}
val bestRead = createMappedRead("10", 42, 142, isNegativeStrand = true, avgPhredScore = 30, readName = "best")
val marked = markDuplicateFragments(List(bestRead) ++ poorReads: _*)
val (dups, nonDup) = marked.partition(p => p.getDuplicateRead)
assert(nonDup.size == 1 && nonDup(0) == bestRead)
assert(dups.forall(p => p.getReadName.startsWith("poor")))
}

sparkTest("unmapped fragments") {
val unmappedReads = for (i <- 0 until 10) yield createUnmappedRead()
val marked = markDuplicateFragments(unmappedReads: _*)
assert(marked.size == unmappedReads.size)
// Unmapped reads should never be marked duplicates
assert(marked.forall(p => !p.getDuplicateRead))
}

sparkTest("read pairs as fragments") {
val poorPairs = for (
i <- 0 until 10;
read <- createPair("0", 10, 110, "0", 110, 210, avgPhredScore = 20, readName = "poor%d".format(i))
) yield read
val bestPair = createPair("0", 10, 110, "0", 110, 210, avgPhredScore = 30, readName = "best")
val marked = markDuplicateFragments(bestPair ++ poorPairs: _*)
val (dups, nonDups) = marked.partition(_.getDuplicateRead)
assert(nonDups.size == 2 && nonDups.forall(p => p.getReadName.toString == "best"))
assert(dups.forall(p => p.getReadName.startsWith("poor")))
}

sparkTest("read pairs with fragments as fragments") {
val fragments = for (i <- 0 until 10) yield {
createMappedRead("2", 33, 133, avgPhredScore = 40, readName = "fragment%d".format(i))
}
// Even though the phred score is lower, pairs always score higher than fragments
val pairs = createPair("2", 33, 133, "2", 100, 200, avgPhredScore = 20, readName = "pair")
val marked = markDuplicateFragments(fragments ++ pairs: _*)
val (dups, nonDups) = marked.partition(_.getDuplicateRead)
assert(nonDups.size == 2 && nonDups.forall(p => p.getReadName.toString == "pair"))
assert(dups.size == 10 && dups.forall(p => p.getReadName.startsWith("fragment")))
}

sparkTest("chimeric fragments") {
val poorPairs = for (
i <- 0 until 10;
read <- createPair("ref0", 10, 110, "ref1", 110, 210, avgPhredScore = 20, readName = "poor%d".format(i))
) yield read
val bestPair = createPair("ref0", 10, 110, "ref1", 110, 210, avgPhredScore = 30, readName = "best")
val marked = markDuplicateFragments(bestPair ++ poorPairs: _*)
val (dups, nonDups) = marked.partition(_.getDuplicateRead)
assert(nonDups.size == 2 && nonDups.forall(p => p.getReadName.toString == "best"))
assert(dups.forall(p => p.getReadName.startsWith("poor")))
}
}
