Commit

Merge pull request #9 from ryan-williams/spark-bam
upgrade deps, use spark-bam release
ryan-williams committed Nov 27, 2017
2 parents 961b7b7 + ddb69cf commit e2d1940
Showing 24 changed files with 233 additions and 219 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -4,9 +4,9 @@ jdk:
- oraclejdk8

scala:
- 2.11.8
- 2.11.11

script: sbt ++$TRAVIS_SCALA_VERSION clean test
script: sbt ++$TRAVIS_SCALA_VERSION clean coverageTest

cache:
directories:
34 changes: 17 additions & 17 deletions build.sbt
@@ -1,33 +1,33 @@
organization := "org.hammerlab.genomics"
name := "readsets"
version := "1.0.5"
version := "1.1.0-SNAPSHOT"

addSparkDeps

deps ++= Seq(
libs.value('adam_core),
libs.value('args4j),
libs.value('args4s),
libs.value('hadoop_bam),
libs.value('iterators),
libs.value('htsjdk),
libs.value('loci),
libs.value('magic_rdds),
libs.value('paths),
libs.value('slf4j),
libs.value('spark_util)
adam % "0.23.2",
args4j,
args4s % "1.3.0",
bytes % "1.1.0",
iterators % "2.0.0",
htsjdk,
loci % "2.0.1",
paths % "1.4.0",
slf4j,
spark_bam % "1.0.0",
spark_util % "2.0.1"
)

compileAndTestDeps ++= Seq(
libs.value('reads),
libs.value('reference)
reads % "1.0.6",
reference % "1.4.0"
)

testDeps += genomic_utils % "1.3.1"

// org.hammerlab.genomics:reads::tests uses org.hammerlab.genomics:utils::{compile,test}, but test-JAR deps don't
// propagate trans-deps like non-classified ones.

testDeps += libs.value('genomic_utils)
testJarTestDeps += libs.value('genomic_utils)
testTestDeps += genomic_utils % "1.3.1"

publishTestJar
takeFirstLog4JProperties
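
A rough plain-sbt illustration of the comment above about test-JARs: depending on another module's test-JAR (via a "tests" classifier) does not pull in that JAR's transitive dependencies, so they have to be declared by hand. This sketch uses stock sbt syntax rather than the sbt-parent DSL used in this build, and the coordinates are only indicative:

    // Hypothetical build.sbt fragment (not from this repo).
    // The test-JAR of `reads` is classified, so its own test-scope
    // dependency on genomics `utils` does not come along for free
    // and must be listed explicitly.
    libraryDependencies ++= Seq(
      "org.hammerlab.genomics" %% "reads" % "1.0.6" % Test classifier "tests",
      "org.hammerlab.genomics" %% "utils" % "1.3.1" % Test
    )
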
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -1 +1 @@
addSbtPlugin("org.hammerlab" % "sbt-parent" % "1.7.6")
addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.4.3")
115 changes: 48 additions & 67 deletions src/main/scala/org/hammerlab/genomics/readsets/ReadSets.scala
@@ -1,25 +1,22 @@
package org.hammerlab.genomics.readsets

import org.apache.hadoop.fs.{ Path ⇒ HPath }
import grizzled.slf4j.Logging
import hammerlab.path._
import htsjdk.samtools.ValidationStringency
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.SequenceDictionary
import org.bdgenomics.adam.rdd.ADAMContext
import org.hammerlab.bam
import org.hammerlab.genomics.loci.parsing.All
import org.hammerlab.genomics.loci.set.LociSet
import org.hammerlab.genomics.reads.Read
import org.hammerlab.genomics.readsets.args.base.Base
import org.hammerlab.genomics.readsets.io.{ Input, InputConfig, Sample }
import org.hammerlab.genomics.readsets.io.{ Config, Input, Sample }
import org.hammerlab.genomics.readsets.rdd.ReadsRDD
import org.hammerlab.genomics.reference.{ ContigLengths, ContigName, Locus }
import org.hammerlab.paths.Path
import org.seqdoop.hadoop_bam.util.SAMHeaderReader.readSAMHeaderFrom
import org.seqdoop.hadoop_bam.{ AnySAMInputFormat, BAMInputFormat, SAMRecordWritable }

import scala.collection.JavaConversions.seqAsJavaList
import org.hammerlab.hadoop.Configuration
import spark_bam._


/**
@@ -28,17 +25,13 @@ import scala.collection.JavaConversions.seqAsJavaList
*/
case class ReadSets(readsRDDs: PerSample[ReadsRDD],
sequenceDictionary: SequenceDictionary,
contigLengths: ContigLengths)
extends PerSample[ReadsRDD] {
contigLengths: ContigLengths) {

def samples: PerSample[Sample] = readsRDDs.map(_.sample)

def numSamples: NumSamples = readsRDDs.length
def sampleNames: PerSample[String] = samples.map(_.name)

override def length: NumSamples = readsRDDs.length
override def apply(sampleId: SampleId): ReadsRDD = readsRDDs(sampleId)

def sc = readsRDDs.head.reads.sparkContext

lazy val mappedReadsRDDs = readsRDDs.map(_.mappedReads)
@@ -56,6 +49,8 @@ case class ReadSets(readsRDDs: PerSample[ReadsRDD],

object ReadSets extends Logging {

implicit def toRDDs(readsets: ReadSets): PerSample[ReadsRDD] = readsets.readsRDDs

def apply(sc: SparkContext, args: Base)(implicit cf: ContigName.Factory): (ReadSets, LociSet) = {
val config = args.parseConfig(sc.hadoopConfiguration)
val readsets = apply(sc, args.inputs, config, !args.noSequenceDictionary)
@@ -67,25 +62,26 @@ object ReadSets extends Logging {
*/
def apply(sc: SparkContext,
inputs: Inputs,
config: InputConfig,
config: Config,
contigLengthsFromDictionary: Boolean = true)(implicit cf: ContigName.Factory): ReadSets =
apply(sc, inputs.map((_, config)), contigLengthsFromDictionary)

/**
* Load reads from multiple files, allowing different filters to be applied to each file.
*/
def apply(sc: SparkContext,
inputsAndFilters: PerSample[(Input, InputConfig)],
inputsAndFilters: PerSample[(Input, Config)],
contigLengthsFromDictionary: Boolean)(implicit cf: ContigName.Factory): ReadSets = {

val (inputs, _) = inputsAndFilters.unzip

val (readsRDDs, sequenceDictionaries) =
(for {
(Input(id, _, path), config) <- inputsAndFilters
(Input(id, _, path), config) ← inputsAndFilters
} yield
load(path, sc, id, config)
).unzip
)
.unzip

val sequenceDictionary = mergeSequenceDictionaries(inputs, sequenceDictionaries)

@@ -95,14 +91,14 @@ object ReadSets extends Logging {
else
sc.union(readsRDDs)
.flatMap(_.asMappedRead)
.map(read => read.contigName -> read.end)
.map(read ⇒ read.contigName → read.end)
.reduceByKey(_ max _)
.collectAsMap()
.toMap

ReadSets(
(for {
(reads, input) <- readsRDDs.zip(inputs)
(reads, input) ← readsRDDs.zip(inputs)
} yield
ReadsRDD(reads, input)
)
@@ -131,7 +127,7 @@ object ReadSets extends Logging {
private[readsets] def load(path: Path,
sc: SparkContext,
sampleId: Int,
config: InputConfig)(implicit cf: ContigName.Factory): (RDD[Read], SequenceDictionary) = {
config: Config)(implicit cf: ContigName.Factory): (RDD[Read], SequenceDictionary) = {

val (allReads, sequenceDictionary) =
if (path.toString.endsWith(".bam") || path.toString.endsWith(".sam"))
@@ -148,53 +144,38 @@ private def loadFromBAM(path: Path,
private def loadFromBAM(path: Path,
sc: SparkContext,
sampleId: Int,
config: InputConfig)(implicit cf: ContigName.Factory): (RDD[Read], SequenceDictionary) = {
config: Config)(implicit cf: ContigName.Factory): (RDD[Read], SequenceDictionary) = {

val basename = path.basename
val shortName = basename.substring(0, math.min(basename.length, 100))
implicit val conf: Configuration = sc.hadoopConfiguration

val conf = sc.hadoopConfiguration
val samHeader = readSAMHeaderFrom(new HPath(path.toUri), conf)
val sequenceDictionary = SequenceDictionary(samHeader)
val contigLengths = bam.header.ContigLengths(path)

config
.maxSplitSizeOpt
.foreach(
maxSplitSize =>
conf.set(FileInputFormat.SPLIT_MAXSIZE, maxSplitSize.toString)
)
val sequenceDictionary = SequenceDictionary(contigLengths)

config
.overlapsLociOpt
.fold(conf.unset(BAMInputFormat.INTERVALS_PROPERTY))(
overlapsLoci ⇒
path.extension match {
case "bam"
val contigLengths = getContigLengths(sequenceDictionary)

val bamIndexIntervals =
val reads =
config
.overlapsLoci
.filterNot(_ == All)
.map(
loci ⇒
sc
.loadBamIntervals(
path,
LociSet(
overlapsLoci,
contigLengths
)
.toHtsJDKIntervals

BAMInputFormat.setIntervals(conf, bamIndexIntervals)
case "sam"
warn(s"Loading SAM file: $path with intervals specified. This requires parsing the entire file.")
case _ ⇒
throw new IllegalArgumentException(s"File $path is not a BAM or SAM file")
}
)

val reads: RDD[Read] =
sc
.newAPIHadoopFile[LongWritable, SAMRecordWritable, AnySAMInputFormat](path.toString)
.setName(s"Hadoop file: $shortName")
.values
.setName(s"Hadoop reads: $shortName")
.map(r => Read(r.get))
.setName(s"Guac reads: $shortName")
loci,
contigLengths.values.toMap
),
splitSize = config.maxSplitSize
)
)
.getOrElse(
sc
.loadReads(
path,
splitSize = config.maxSplitSize
)
)
.map(Read(_))

(reads, sequenceDictionary)
}
@@ -203,7 +184,7 @@ object ReadSets extends Logging {
private def loadFromADAM(path: Path,
sc: SparkContext,
sampleId: Int,
config: InputConfig)(implicit cf: ContigName.Factory): (RDD[Read], SequenceDictionary) = {
config: Config)(implicit cf: ContigName.Factory): (RDD[Read], SequenceDictionary) = {

logger.info(s"Using ADAM to read: $path")

@@ -276,7 +257,7 @@ object ReadSets extends Logging {
/**
* Apply filters to an RDD of reads.
*/
private def filterRDD(reads: RDD[Read], config: InputConfig, sequenceDictionary: SequenceDictionary): RDD[Read] = {
private def filterRDD(reads: RDD[Read], config: Config, sequenceDictionary: SequenceDictionary): RDD[Read] = {
/* Note that the InputFilter properties are public, and some loaders directly apply
* the filters as the reads are loaded, instead of filtering an existing RDD as we do here. If more filters
* are added, be sure to update those implementations.
@@ -287,7 +268,7 @@ object ReadSets extends Logging {
var result = reads

config
.overlapsLociOpt
.overlapsLoci
.foreach { overlapsLoci ⇒
val contigLengths = getContigLengths(sequenceDictionary)
val loci = LociSet(overlapsLoci, contigLengths)
@@ -299,7 +280,7 @@ object ReadSets extends Logging {
if (config.passedVendorQualityChecks) result = result.filter(!_.failedVendorQualityChecks)
if (config.isPaired) result = result.filter(_.isPaired)

config.minAlignmentQualityOpt.foreach(
config.minAlignmentQuality.foreach(
minAlignmentQuality ⇒
result =
result.filter(
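
Because the inline rendering above interleaves removed and added lines, here is a rough reconstruction of how the new loadFromBAM reads after this commit, pieced together from the added lines in the hunks above (imports abridged to what this snippet needs; treat it as approximate rather than a verbatim copy of the file):

    import hammerlab.path._
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.bdgenomics.adam.models.SequenceDictionary
    import org.hammerlab.bam
    import org.hammerlab.genomics.loci.parsing.All
    import org.hammerlab.genomics.loci.set.LociSet
    import org.hammerlab.genomics.reads.Read
    import org.hammerlab.genomics.readsets.io.Config
    import org.hammerlab.genomics.reference.ContigName
    import org.hammerlab.hadoop.Configuration
    import spark_bam._

    private def loadFromBAM(path: Path,
                            sc: SparkContext,
                            sampleId: Int,
                            config: Config)(implicit cf: ContigName.Factory): (RDD[Read], SequenceDictionary) = {

      // spark-bam expects a hammerlab Configuration in implicit scope.
      implicit val conf: Configuration = sc.hadoopConfiguration

      // Contig lengths now come from the BAM header via spark-bam,
      // replacing hadoop-bam's readSAMHeaderFrom.
      val contigLengths = bam.header.ContigLengths(path)
      val sequenceDictionary = SequenceDictionary(contigLengths)

      val reads =
        config
          .overlapsLoci                      // was overlapsLociOpt
          .filterNot(_ == All)
          .map(
            loci ⇒
              // Interval-restricted load via spark-bam.
              sc.loadBamIntervals(
                path,
                LociSet(loci, contigLengths.values.toMap),
                splitSize = config.maxSplitSize
              )
          )
          .getOrElse(
            // Whole-file load, replacing newAPIHadoopFile + AnySAMInputFormat.
            sc.loadReads(path, splitSize = config.maxSplitSize)
          )
          .map(Read(_))

      (reads, sequenceDictionary)
    }
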
19 changes: 0 additions & 19 deletions src/main/scala/org/hammerlab/genomics/readsets/Registrar.scala

This file was deleted.

@@ -1,8 +1,8 @@
package org.hammerlab.genomics.readsets.args.base

import hammerlab.path._
import org.hammerlab.genomics.readsets.Inputs
import org.hammerlab.genomics.readsets.io.{ Input, ReadFilters, Sample }
import org.hammerlab.paths.Path

trait InputArgs {
def paths: Array[Path]
@@ -1,6 +1,6 @@
package org.hammerlab.genomics.readsets.args.base

import org.hammerlab.paths.Path
import hammerlab.path._

trait HasReference {
def referencePath: Path
@@ -1,7 +1,7 @@
package org.hammerlab.genomics.readsets.args.base

import org.hammerlab.genomics.readsets.args.path.{ PathPrefix, UnprefixedPath }
import org.hammerlab.paths.Path
import hammerlab.path._

trait PrefixedPathsBase
extends Base {
@@ -1,8 +1,8 @@
package org.hammerlab.genomics.readsets.args.impl

import hammerlab.path._
import org.hammerlab.genomics.readsets.args.base.{ HasReference, PrefixedPathsBase }
import org.hammerlab.genomics.readsets.args.path.{ UnprefixedPath, UnprefixedPathHandler }
import org.hammerlab.paths.Path
import org.kohsuke.args4j.{ Option ⇒ Args4jOption }

trait ReferenceArgs
@@ -1,9 +1,9 @@
package org.hammerlab.genomics.readsets.args.impl

import hammerlab.path._
import org.hammerlab.genomics.readsets.args.base.{ Base, PrefixedPathsBase }
import org.hammerlab.genomics.readsets.args.path.{ UnprefixedPath, UnprefixedPathHandler }
import org.hammerlab.genomics.readsets.io.ReadFilterArgs
import org.hammerlab.paths.Path
import org.kohsuke.args4j.{ Option ⇒ Args4jOption }

/** Argument for accepting a single set of reads (for e.g. germline variant calling). */
@@ -6,7 +6,7 @@ import org.kohsuke.args4j.{ CmdLineParser, OptionDef }

/**
* Type-class for a path-prefix that can be added by default to [[UnprefixedPath]]s to generate
* [[org.hammerlab.paths.Path]]s.
* [[hammerlab.path.Path]]s.
*/
case class PathPrefix(value: String)

@@ -1,13 +1,13 @@
package org.hammerlab.genomics.readsets.args.path

import hammerlab.path._
import org.hammerlab.args4s.{ Handler, OptionHandler }
import org.hammerlab.paths.Path
import org.kohsuke.args4j.spi.Setter
import org.kohsuke.args4j.{ CmdLineParser, OptionDef }

/**
* Type-class for a path-string that must be joined against an optional [[PathPrefix]] to generate a
* [[org.hammerlab.paths.Path]].
* [[hammerlab.path.Path]].
*/
case class UnprefixedPath(value: String) {
def buildPath(implicit prefixOpt: Option[PathPrefix]): Path = {
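
Taken together, PathPrefix and UnprefixedPath let command-line path arguments be given relative to an optional, globally configured prefix. A hypothetical usage sketch (the prefix value and file name are invented, and the exact join logic of buildPath is elided in this diff, so the resolved result in the comment is an assumption):

    import hammerlab.path._
    import org.hammerlab.genomics.readsets.args.path.{ PathPrefix, UnprefixedPath }

    // With a prefix in implicit scope, buildPath joins the relative
    // path-string against it (assumed behavior; see the scaladoc above).
    implicit val prefix: Option[PathPrefix] = Some(PathPrefix("hdfs:///cohort1"))

    val bam: Path = UnprefixedPath("sample1.bam").buildPath  // ≈ hdfs:///cohort1/sample1.bam
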
