port args to caseapp
ryan-williams committed Mar 20, 2018
1 parent 210ba06 commit 66daaec
Showing 33 changed files with 322 additions and 430 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -4,7 +4,7 @@ jdk:
- oraclejdk8

scala:
-- 2.11.11
+- 2.11.12

script: sbt ++$TRAVIS_SCALA_VERSION clean coverageTest

@@ -14,7 +14,7 @@ cache:
- $HOME/.sbt/boot/
- $HOME/.zinc

-after_success: sbt ++$TRAVIS_SCALA_VERSION travis-report
+after_success: bash <(curl -s https://codecov.io/bash)

before_cache:
# Tricks to avoid unnecessary cache updates
38 changes: 16 additions & 22 deletions build.sbt
@@ -1,31 +1,25 @@
-organization := "org.hammerlab.genomics"
-name := "readsets"
-r"1.2.0"
+subgroup("genomics", "readsets")
+v"1.2.1"
+github.repo("genomic-readsets")

addSparkDeps

dep(
adam % "0.23.2" ,
args4j ,
args4s % "1.3.0" ,
bytes % "1.1.0" ,
iterators % "2.0.0" ,
genomic_utils % "1.3.1" % tests ,
htsjdk ,
loci % "2.0.1" ,
paths % "1.4.0" ,
reads % "1.0.6" + testtest ,
reference % "1.4.0" + testtest ,
slf4j ,
spark_bam % "1.1.0" ,
spark_util % "2.0.1"
)
adam % "0.23.2" ,
bytes % "1.2.0" ,
iterators % "2.1.0" ,
genomics. loci % "2.1.0" ,
genomics. reads % "1.0.7" + testtest ,
genomics.reference % "1.4.3" + testtest ,

-dep(
-  // org.hammerlab.genomics:reads::tests uses org.hammerlab.genomics:utils::{compile,test}, but test-JAR deps don't
-  // propagate trans-deps like non-classified ones.
-  genomic_utils % "1.3.1" + testtest
+  // org.hammerlab.genomics:reads::tests uses org.hammerlab.genomics:utils::{compile,test}, but test-scoped deps don't
+  // transit like compile-scoped ones / like you'd expect them to.
+  genomics.utils % "1.3.1" % tests + testtest ,
+  htsjdk ,
+  paths % "1.5.0" ,
+  slf4j ,
+  spark_bam % "1.2.0-M1".snapshot ,
+  spark_util % "2.0.4"
)

publishTestJar
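The dep-block comment above describes a real sbt/Ivy quirk: a "tests"-classified JAR does not pull in its transitive dependencies the way an unclassified artifact does, so the utils test dependency must be re-declared explicitly. Without the hammerlab-sbt DSL, a tests-classified dependency is declared roughly like this (coordinates assumed for illustration; the DSL's "+ testtest" operator is presumed to expand to something similar):

    // plain-sbt sketch of a tests-classified, test-scoped dependency
    libraryDependencies += "org.hammerlab.genomics" %% "utils" % "1.3.1" % "test" classifier "tests"
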
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=1.0.4
+sbt.version=1.1.1
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -1 +1 @@
-addSbtPlugin("org.hammerlab.sbt" % "base" % "4.0.1")
+addSbtPlugin("org.hammerlab.sbt" % "base" % "4.4.3")
107 changes: 69 additions & 38 deletions src/main/scala/org/hammerlab/genomics/readsets/ReadSets.scala
@@ -11,14 +11,13 @@ import org.hammerlab.bam
import org.hammerlab.genomics.loci.parsing.All
import org.hammerlab.genomics.loci.set.LociSet
import org.hammerlab.genomics.reads.Read
-import org.hammerlab.genomics.readsets.args.base.Base
+import org.hammerlab.genomics.readsets.args.Base
import org.hammerlab.genomics.readsets.io.{ Config, Input, Sample }
import org.hammerlab.genomics.readsets.rdd.ReadsRDD
import org.hammerlab.genomics.reference.{ ContigLengths, ContigName, Locus }
import org.hammerlab.hadoop.Configuration
import org.hammerlab.spark.Context
import spark_bam._


/**
* A [[ReadSets]] contains reads from multiple inputs as well as [[SequenceDictionary]] / contig-length information
* merged from them.
@@ -51,35 +50,60 @@ object ReadSets extends Logging {

implicit def toRDDs(readsets: ReadSets): PerSample[ReadsRDD] = readsets.readsRDDs

-  def apply(sc: SparkContext, args: Base)(implicit cf: ContigName.Factory): (ReadSets, LociSet) = {
-    val config = args.parseConfig(sc.hadoopConfiguration)
-    val readsets = apply(sc, args.inputs, config, !args.noSequenceDictionary)
-    (readsets, LociSet(config.loci, readsets.contigLengths))
+  import hammerlab.shapeless._
+  def apply(args: Base)(
+    implicit
+    sc: SparkContext,
+    cf: ContigName.Factory
+  ): (ReadSets, LociSet) = {
+    val config = args.readFilterArgs.parseConfig(sc.hadoopConfiguration)
+    val readsets =
+      apply(
+        args.inputs,
+        config,
+        !args.noSequenceDictionaryArgs.noSequenceDictionary
+      )
+    (
+      readsets,
+      LociSet(
+        config.loci,
+        readsets.contigLengths
+      )
+    )
   }

/**
* Load reads from multiple files, merging their sequence dictionaries and verifying that they are consistent.
*/
-  def apply(sc: SparkContext,
-            inputs: Inputs,
+  def apply(inputs: Inputs,
             config: Config,
-            contigLengthsFromDictionary: Boolean = true)(implicit cf: ContigName.Factory): ReadSets =
-    apply(sc, inputs.map((_, config)), contigLengthsFromDictionary)
+            contigLengthsFromDictionary: Boolean = true)(
+    implicit
+    sc: SparkContext,
+    cf: ContigName.Factory
+  ): ReadSets =
+    apply(
+      inputs.map((_, config)),
+      contigLengthsFromDictionary
+    )

/**
* Load reads from multiple files, allowing different filters to be applied to each file.
*/
-  def apply(sc: SparkContext,
-            inputsAndFilters: PerSample[(Input, Config)],
-            contigLengthsFromDictionary: Boolean)(implicit cf: ContigName.Factory): ReadSets = {
+  def apply(inputsAndFilters: PerSample[(Input, Config)],
+            contigLengthsFromDictionary: Boolean)(
+    implicit
+    sc: SparkContext,
+    cf: ContigName.Factory
+  ): ReadSets = {

val (inputs, _) = inputsAndFilters.unzip

val (readsRDDs, sequenceDictionaries) =
(for {
(Input(id, _, path), config) ← inputsAndFilters
} yield
-          load(path, sc, id, config)
+          load(path, id, config)
)
.unzip

@@ -125,15 +149,18 @@ object ReadSets extends Logging {
* @return
*/
private[readsets] def load(path: Path,
-                             sc: SparkContext,
                              sampleId: Int,
-                             config: Config)(implicit cf: ContigName.Factory): (RDD[Read], SequenceDictionary) = {
+                             config: Config)(
+    implicit
+    sc: SparkContext,
+    cf: ContigName.Factory
+  ): (RDD[Read], SequenceDictionary) = {

val (allReads, sequenceDictionary) =
if (path.toString.endsWith(".bam") || path.toString.endsWith(".sam"))
-      loadFromBAM(path, sc, sampleId, config)
+      loadFromBAM(path, sampleId, config)
else
-      loadFromADAM(path, sc, sampleId, config)
+      loadFromADAM(path, sampleId, config)

val reads = filterRDD(allReads, config, sequenceDictionary)

@@ -142,55 +169,59 @@ object ReadSets extends Logging {

/** Returns an RDD of Reads and SequenceDictionary from reads in BAM format **/
private def loadFromBAM(path: Path,
-                          sc: SparkContext,
                           sampleId: Int,
-                          config: Config)(implicit cf: ContigName.Factory): (RDD[Read], SequenceDictionary) = {
-
-    implicit val conf: Configuration = sc.hadoopConfiguration
+                          config: Config)(
+    implicit
+    sc: Context,
+    cf: ContigName.Factory
+  ): (RDD[Read], SequenceDictionary) = {

val contigLengths = bam.header.ContigLengths(path)

val sequenceDictionary = SequenceDictionary(contigLengths)

+    implicit val splitSize = config.maxSplitSize

val reads =
config
.overlapsLoci
.filterNot(_ == All)
-        .map(
+        .fold {
+          sc
+            .loadReads(
+              path,
+              splitSize = config.maxSplitSize
+            )
+        } {
           loci ⇒
             sc
               .loadBamIntervals(
                 path,
                 LociSet(
                   loci,
                   contigLengths.values.toMap
                 ),
                 splitSize = config.maxSplitSize
               )
-          )
-        )
-        .getOrElse(
-          sc
-            .loadReads(
-              path,
-              splitSize = config.maxSplitSize
-            )
-        )
+        }
         .map(Read(_))

(reads, sequenceDictionary)
}

/** Returns an RDD of Reads and SequenceDictionary from reads in ADAM format **/
private def loadFromADAM(path: Path,
-                         sc: SparkContext,
                          sampleId: Int,
-                         config: Config)(implicit cf: ContigName.Factory): (RDD[Read], SequenceDictionary) = {
+                         config: Config)(
+    implicit
+    sc: SparkContext,
+    cf: ContigName.Factory
+  ): (RDD[Read], SequenceDictionary) = {

logger.info(s"Using ADAM to read: $path")

-    val adamContext: ADAMContext = sc
+    import ADAMContext._

-    val alignmentRDD = adamContext.loadAlignments(path, stringency = ValidationStringency.LENIENT)
+    val alignmentRDD = sc.loadAlignments(path, stringency = ValidationStringency.LENIENT)

val sequenceDictionary = alignmentRDD.sequences

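Two threads run through the ReadSets.scala changes above: SparkContext moves from an explicit leading parameter into each method's implicit parameter list, and loadFromBAM's .map(...).getOrElse(...) over the optional loci becomes Option.fold, whose first argument handles the None case and whose second handles Some. A standalone sketch of the fold equivalence (illustrative only, not project code):

    // fold subsumes map + getOrElse: empty case first, non-empty case second
    val loci: Option[String] = Some("chr1:100-200")

    val viaMap  = loci.map(l ⇒ s"intervals: $l").getOrElse("all reads")
    val viaFold = loci.fold("all reads")(l ⇒ s"intervals: $l")

    assert(viaMap == viaFold)  // both evaluate to "intervals: chr1:100-200"
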
36 changes: 36 additions & 0 deletions src/main/scala/org/hammerlab/genomics/readsets/args/Base.scala
@@ -0,0 +1,36 @@
package org.hammerlab.genomics.readsets.args

import hammerlab.path._
import org.hammerlab.genomics.readsets.Inputs
import org.hammerlab.genomics.readsets.args.NoSequenceDictionaryArgs
import org.hammerlab.genomics.readsets.io.{ Input, ReadFilterArgs, Sample }

trait InputArgs {
self: Base ⇒

def paths: Array[Path]

def sampleNames: Array[Sample.Name]

lazy val inputs: Inputs =
paths
.indices
.map {
i ⇒
Input(
i,
if (i < sampleNames.length)
sampleNames(i)
else
paths(i).toString,
paths(i)
)
}
}

trait Base
extends InputArgs {
def readFilterArgs: ReadFilterArgs
def noSequenceDictionaryArgs: NoSequenceDictionaryArgs
def inputs: Inputs
}
@@ -0,0 +1,10 @@
package org.hammerlab.genomics.readsets.args

import caseapp.{ HelpMessage ⇒ M }

/** Argument for using / not using sequence dictionaries to get contigs and lengths. */
case class NoSequenceDictionaryArgs(
@M("If set, get contigs and lengths directly from reads instead of from sequence dictionary.")
noSequenceDictionary: Boolean = false
)
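
caseapp, which this commit ports the argument handling to, derives a command-line parser from case classes like the one above; by its naming convention the noSequenceDictionary field surfaces as a --no-sequence-dictionary flag, with @HelpMessage (renamed to M on import) supplying the usage text. A sketch of standalone parsing, assuming caseapp's automatic derivation is in scope:

    import caseapp.CaseApp

    // Right((parsedArgs, remainingArgs)) on success, Left(error) otherwise
    CaseApp.parse[NoSequenceDictionaryArgs](Seq("--no-sequence-dictionary")) match {
      case Right((args, _)) ⇒ println(args.noSequenceDictionary)  // true
      case Left(err)        ⇒ println(err)
    }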

@@ -0,0 +1,9 @@
package org.hammerlab.genomics.readsets.args

import caseapp.{ HelpMessage ⇒ M }
import org.hammerlab.genomics.readsets.args.path.Prefix

case class PathPrefixArg(
@M("When set, relative paths will be prefixed with this path")
dir: Option[Prefix] = None
)
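
Prefix and UnprefixedPath live in the args.path package, which this diff does not show; the intent is that each input path is resolved against the optional --dir prefix when the real Path is built. A purely hypothetical sketch of that resolution, using java.nio stand-ins rather than the real types:

    import java.nio.file.{ Path, Paths }

    // hypothetical stand-ins for readsets.args.path.{ Prefix, UnprefixedPath }
    case class Prefix(dir: String)
    case class UnprefixedPath(value: String) {
      def buildPath(prefix: Option[Prefix]): Path =
        prefix.fold(Paths.get(value))(p ⇒ Paths.get(p.dir).resolve(value))
    }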
@@ -0,0 +1,11 @@
package org.hammerlab.genomics.readsets.args

import caseapp.{ HelpMessage ⇒ M, Recurse ⇒ R }
import org.hammerlab.genomics.readsets.args.path.UnprefixedPath

case class PathsArgs(
@R prefix: PathPrefixArg,

@M("Paths to sets of reads: FILE1 FILE2 FILE3")
unprefixedPaths: Array[UnprefixedPath] = Array()
)
@@ -0,0 +1,12 @@
package org.hammerlab.genomics.readsets.args

import caseapp.{ HelpMessage ⇒ M }
import org.hammerlab.genomics.readsets.args.path.UnprefixedPath

case class ReferenceArgs(
@M("Path to a reference FASTA file")
referencePath: UnprefixedPath,

@M("Treat the reference fasta as a \"partial FASTA\", comprised of segments (possibly in the interior) of contigs.")
partialReference: Boolean = false
)
@@ -0,0 +1,31 @@
package org.hammerlab.genomics.readsets.args

import caseapp.{ HelpMessage ⇒ M, Recurse ⇒ R }
import hammerlab.path._
import org.hammerlab.genomics.readsets.args.Base
import org.hammerlab.genomics.readsets.args.path.UnprefixedPath
import org.hammerlab.genomics.readsets.io.ReadFilterArgs
import org.hammerlab.genomics.readsets.io.Sample.Name

/** Argument for accepting a single set of reads (for e.g. germline variant calling). */
case class SingleSampleArgs(
@R readFilterArgs: ReadFilterArgs = ReadFilterArgs(),
@R noSequenceDictionaryArgs: NoSequenceDictionaryArgs = NoSequenceDictionaryArgs(),
@M("Path to aligned reads")
reads: Path
) extends Base {
override def paths: Array[Path] = Array( reads )
override def sampleNames: Array[Name] = Array("reads")
}

/** [[SingleSampleArgs]] implementation that supports path-prefixing. */
case class PrefixedSingleSampleArgs(
@R readFilterArgs: ReadFilterArgs,
@R noSequenceDictionaryArgs: NoSequenceDictionaryArgs,
@R prefix: PathPrefixArg,
@M("Path to aligned reads")
reads: UnprefixedPath
) extends Base {
override def paths: Array[Path] = Array( reads.buildPath(prefix.dir) )
override def sampleNames: Array[Name] = Array("reads")
}
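
@Recurse (renamed to R on import) splices a nested case class's options into the enclosing parser, so SingleSampleArgs accepts its own --reads flag alongside the recursed read-filter and no-sequence-dictionary flags, and ReadSets.apply can consume any Base implementation uniformly. A hedged end-to-end sketch, assuming an ArgParser for the Path-typed field is in implicit scope (e.g. via the hammerlab path library):

    import caseapp.CaseApp

    // flags from all @Recurse'd groups land in one flat option list
    CaseApp.parse[SingleSampleArgs](Seq("--reads", "in.bam", "--no-sequence-dictionary")) match {
      case Right((args, _)) ⇒ println(args.paths.toList)
      case Left(err)        ⇒ println(err)
    }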
