Skip to content

Commit

Permalink
[ADAM-1539] Support region predicate in TransformAlignments.
Browse files Browse the repository at this point in the history
Resolves #1539.
  • Loading branch information
fnothaft committed Jul 7, 2017
1 parent bd6df07 commit 9f339d7
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.algorithms.consensus._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.SnpTable
import org.bdgenomics.adam.models.{ ReferenceRegion, SnpTable }
import org.bdgenomics.adam.projections.{ AlignmentRecordField, Filter }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
Expand All @@ -49,8 +49,10 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
var outputPath: String = null
@Args4jOption(required = false, name = "-limit_projection", usage = "Only project necessary fields. Only works for Parquet files.")
var limitProjection: Boolean = false
@Args4jOption(required = false, name = "-aligned_read_predicate", usage = "Only load aligned reads. Only works for Parquet files.")
@Args4jOption(required = false, name = "-aligned_read_predicate", usage = "Only load aligned reads. Only works for Parquet files. Exclusive of region predicate.")
var useAlignedReadPredicate: Boolean = false
@Args4jOption(required = false, name = "-region_predicate", usage = "Only load a specific range of regions. Mutually exclusive with aligned read predicate.")
var regionPredicate: String = null
@Args4jOption(required = false, name = "-sort_reads", usage = "Sort the reads by referenceId and read position")
var sortReads: Boolean = false
@Args4jOption(required = false, name = "-sort_lexicographically", usage = "Sort the reads lexicographically by contig name, instead of by index.")
Expand Down Expand Up @@ -426,10 +428,20 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
"-limit_projection only applies to Parquet files, but a non-Parquet input path was specified."
)
}
if (args.useAlignedReadPredicate && args.regionPredicate != null) {
throw new IllegalArgumentException(
"-aligned_read_predicate and -region_predicate are mutually exclusive"
)
}

val aRdd: AlignmentRecordRDD =
if (args.forceLoadBam) {
sc.loadBam(args.inputPath)
if (args.regionPredicate != null) {
val loci = ReferenceRegion.fromString(args.regionPredicate)
sc.loadIndexedBam(args.inputPath, loci)
} else {
sc.loadBam(args.inputPath)
}
} else if (args.forceLoadFastq) {
sc.loadFastq(args.inputPath, Option(args.pairedFastqFile), Option(args.fastqRecordGroup), stringency)
} else if (args.forceLoadIFastq) {
Expand All @@ -439,6 +451,10 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
args.limitProjection) {
val pred = if (args.useAlignedReadPredicate) {
Some(BooleanColumn("readMapped") === true)
} else if (args.regionPredicate != null) {
Some(ReferenceRegion.createPredicate(
ReferenceRegion.fromString(args.regionPredicate).toSeq: _*
))
} else {
None
}
Expand All @@ -454,12 +470,19 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
optPredicate = pred,
optProjection = proj)
} else {
sc.loadAlignments(
val loadedReads = sc.loadAlignments(
args.inputPath,
optPathName2 = Option(args.pairedFastqFile),
optRecordGroup = Option(args.fastqRecordGroup),
stringency = stringency
)

if (args.regionPredicate != null) {
val loci = ReferenceRegion.fromString(args.regionPredicate)
loadedReads.filterByOverlappingRegions(loci)
} else {
loadedReads
}
}
val rdd = aRdd.rdd
val sd = aRdd.sequences
Expand Down
Binary file added adam-cli/src/test/resources/sorted.bam
Binary file not shown.
Binary file added adam-cli/src/test/resources/sorted.bam.bai
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,24 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
assert(qualityScoreCounts(30) === 92899)
assert(qualityScoreCounts(10) === 7101)
}

sparkTest("run locus predicate") {
  // alas, copy resource does not work here...
  val inputPath = testFile("sorted.bam")
  val outputPath1 = tmpFile("predicate.1.adam")
  val outputPath2 = tmpFile("predicate.2.adam")
  val outputPath3 = tmpFile("predicate.3.adam")
  // NOTE(review): the option declared on TransformAlignmentsArgs is
  // -region_predicate; the previous -locus_predicate flag does not exist and
  // would make args4j reject the command line.
  // indexed BAM path: loads via loadIndexedBam
  TransformAlignments(Array(inputPath, outputPath1,
    "-region_predicate", "1:0-200,chr2:0-1000",
    "-force_load_bam")).run(sc)
  // Parquet path: region predicate is pushed down as a Parquet predicate
  TransformAlignments(Array(outputPath1, outputPath2,
    "-region_predicate", "chr2:0-1000",
    "-force_load_parquet")).run(sc)
  // auto-detected path: loads all reads, then filters by overlapping regions
  TransformAlignments(Array(outputPath2, outputPath3,
    "-region_predicate", "chr2:0-400")).run(sc)

  assert(sc.loadAlignments(outputPath1).rdd.count === 3)
  assert(sc.loadAlignments(outputPath2).rdd.count === 2)
  assert(sc.loadAlignments(outputPath3).rdd.count === 1)
}
}
5 changes: 5 additions & 0 deletions adam-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@
<artifactId>utils-intervalrdd_${scala.version.prefix}</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.hammerlab</groupId>
<artifactId>genomic-loci_${scala.version.prefix}</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.bdgenomics.formats.avro._
import org.bdgenomics.utils.interval.array.Interval
import org.hammerlab.genomics.loci.parsing.{
All,
LociRange,
LociRanges,
ParsedLoci
}
import scala.math.{ max, min }

trait ReferenceOrdering[T <: ReferenceRegion] extends Ordering[T] {
Expand Down Expand Up @@ -86,6 +92,35 @@ object ReferenceRegion {
implicit def orderingForPositions = RegionOrdering
implicit def orderingForOptionalPositions = OptionalRegionOrdering

/**
 * Parses a set of comma delimited loci from a string.
 *
 * Acceptable strings include:
 * - ctg:start-end
 * - ctg:pos
 *
 * @param loci The string describing the loci to create reference regions for.
 * @return Returns an iterable collection of reference regions.
 */
def fromString(loci: String): Iterable[ReferenceRegion] = {
  ParsedLoci(loci) match {
    case All =>
      // the "all" token cannot be expressed as a finite set of regions
      throw new IllegalArgumentException("Unsupported value 'all'")
    case LociRanges(ranges) =>
      ranges.map {
        case LociRange(contigName, start, Some(end)) =>
          ReferenceRegion(contigName, start, end)
        case LociRange(contigName, start, None) =>
          // a bare position parses to a width-1 region
          ReferencePosition(contigName, start)
      }
  }
}

/**
* Creates a reference region that starts at the beginning of a contig.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,9 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {

val regions = getReferenceRegions(elem)

querys.map(query => {
querys.exists(query => {
regions.exists(_.overlaps(query))
}).fold(false)((a, b) => a || b)
})
}), optPartitionMap)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,28 @@ class ReferenceRegionSuite extends FunSuite {
}
}

test("can't parse all locus") {
  // the special "all" token is rejected, whether it appears alone or
  // anywhere in a comma-delimited list
  Seq("all", "1:100-200,all").foreach(badLoci => {
    intercept[IllegalArgumentException] {
      ReferenceRegion.fromString(badLoci)
    }
  })
}

test("parse string into reference regions") {
  val regions = ReferenceRegion.fromString("1:100,2:1000-2000")
  assert(regions.size === 2)
  val (onCtg1, onCtg2) = regions.partition(_.referenceName == "1")
  // a bare position ("1:100") parses as a width-1 region
  assert(onCtg1.size === 1)
  assert(onCtg1.head.start === 100L)
  assert(onCtg1.head.end === 101L)
  // an explicit range ("2:1000-2000") keeps its start and end
  assert(onCtg2.size === 1)
  assert(onCtg2.head.start === 1000L)
  assert(onCtg2.head.end === 2000L)
}

test("contains(: ReferenceRegion)") {
assert(region("chr0", 10, 100).contains(region("chr0", 50, 70)))
assert(region("chr0", 10, 100).contains(region("chr0", 10, 100)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,4 +495,16 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite {
assert(collect(0).getSequence() === "ACTGTAC")
assert(collect(1).getSequence() === "GTACTCTCATG")
}

sparkTest("save as parquet and apply predicate pushdown") {
  // split the FASTA into 1kbp fragments and save as Parquet
  val allFragments = sc.loadFasta(testFile("HLA_DQB1_05_01_01_02.fa"), 1000L)
  assert(allFragments.rdd.count === 8)
  val outputPath = tmpFile("contigs.adam")
  allFragments.saveAsParquet(outputPath)

  // reloading without a predicate returns every fragment
  assert(sc.loadContigFragments(outputPath).rdd.count === 8)

  // pushing a region predicate down to Parquet keeps only the fragments
  // overlapping [500, 1500) on the contig
  val region = ReferenceRegion("HLA-DQB1*05:01:01:02", 500L, 1500L)
  val filtered = sc.loadContigFragments(outputPath,
    optPredicate = Some(region.toPredicate))
  assert(filtered.rdd.count === 2)
}
}
8 changes: 8 additions & 0 deletions docs/source/50_cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ These options fall into several general categories:
not load the `attributes` or `origQual` fields of the `AlignmentRecord`.
* `-aligned_read_predicate`: If loading as Parquet, only loads aligned
reads.
* `-region_predicate`: A string indicating that reads should be filtered on
overlapping a genomic position or range. This argument takes a
comma-separated list, where each element in the list takes the form:
* `contig:pos` for a single genomic position, or
* `contig:start-end` for a genomic range with a closed start and open end.
E.g., `-region_predicate 1:100,2:1000-2000` would keep only those reads
that overlap either position 100 on `1` or the range from 1,000 to 2,000
on `2`.
* `-concat`: Provides a path to an optional second file to load, which is
then concatenated to the file given as the `INPUT` path.
* Duplicate marking options: Duplicate marking is run with the
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hammerlab</groupId>
<artifactId>genomic-loci_${scala.version.prefix}</artifactId>
<version>1.4.4</version>
</dependency>
<dependency>
<groupId>org.bdgenomics.utils</groupId>
<artifactId>utils-cli_${scala.version.prefix}</artifactId>
Expand Down

0 comments on commit 9f339d7

Please sign in to comment.