ReadSets-args refactoring
Share sample-loading, filename/sample infra across all commands
ryan-williams committed Aug 8, 2016
1 parent d160455 commit dfd797b
Showing 22 changed files with 272 additions and 198 deletions.
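
The three files shown below all move in the same direction: commands stop declaring their own BAM/sample options and loaders, and instead share one args layer and one ReadSets entry point. As a rough, self-contained model of that shape (Input's (sampleName, path) pairing is visible in the VAFHistogram diff below; the trait name and everything else here are assumptions, not code from this commit):

// Sketch only: models the shared sample/filename infrastructure.
object SharedArgsSketch {
  type SampleName = String

  // One (sample name, path) record per input BAM.
  case class Input(sampleName: SampleName, path: String)

  // Commands mix in a common args trait instead of hand-rolling `bams`
  // arguments and per-command loaders.
  trait ReadSetsArgs {
    def inputs: Seq[Input]
  }

  def main(argv: Array[String]): Unit = {
    val args = new ReadSetsArgs {
      def inputs = Seq(Input("normal", "normal.bam"), Input("tumor", "tumor.bam"))
    }
    // Every command can iterate samples and filenames the same way:
    for (Input(sample, path) <- args.inputs)
      println(s"$sample -> $path")
  }
}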
src/main/scala/org/hammerlab/guacamole/commands/GermlineAssemblyCaller.scala
@@ -13,7 +13,7 @@ import org.hammerlab.guacamole.logging.DelayedMessages
 import org.hammerlab.guacamole.logging.LoggingUtils.progress
 import org.hammerlab.guacamole.pileup.Pileup
 import org.hammerlab.guacamole.reads.MappedRead
-import org.hammerlab.guacamole.readsets.ReadSets
+import org.hammerlab.guacamole.readsets.{ReadSets, SampleName}
 import org.hammerlab.guacamole.readsets.args.GermlineCallerArgs
 import org.hammerlab.guacamole.readsets.io.InputFilters
 import org.hammerlab.guacamole.reference.ReferenceBroadcast
@@ -74,17 +74,20 @@ object GermlineAssemblyCaller {
         .getPartitioner(mappedReads, args.assemblyWindowRange)
         .partition(loci.result(contigLengths))
 
-      val genotypes: RDD[CalledAllele] = discoverGermlineVariants(
-        qualityReads,
-        kmerSize = args.kmerSize,
-        assemblyWindowRange = args.assemblyWindowRange,
-        minOccurrence = args.minOccurrence,
-        minAreaVaf = args.minAreaVaf / 100.0f,
-        reference = reference,
-        lociPartitions = lociPartitions,
-        minMeanKmerQuality = args.minMeanKmerQuality,
-        minPhredScaledLikelihood = args.minLikelihood,
-        shortcutAssembly = args.shortcutAssembly)
+      val genotypes: RDD[CalledAllele] =
+        discoverGermlineVariants(
+          qualityReads,
+          args.sampleName,
+          kmerSize = args.kmerSize,
+          assemblyWindowRange = args.assemblyWindowRange,
+          minOccurrence = args.minOccurrence,
+          minAreaVaf = args.minAreaVaf / 100.0f,
+          reference = reference,
+          lociPartitions = lociPartitions,
+          minMeanKmerQuality = args.minMeanKmerQuality,
+          minPhredScaledLikelihood = args.minLikelihood,
+          shortcutAssembly = args.shortcutAssembly
+        )
 
       genotypes.persist()
 
@@ -98,6 +101,7 @@ }
     }
 
   def discoverGermlineVariants(reads: RDD[MappedRead],
+                               sampleName: SampleName,
                                kmerSize: Int,
                                assemblyWindowRange: Int,
                                minOccurrence: Int,
@@ -164,8 +168,6 @@
         )
 
         if (paths.nonEmpty) {
-          val sampleName = currentLocusReads.head.sampleName
-
           def buildVariant(variantLocus: Int,
                            referenceBases: Array[Byte],
                            alternateBases: Array[Byte]) = {
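
Net change in GermlineAssemblyCaller: the sample name now arrives as a parameter (ultimately args.sampleName) instead of being recovered from the first read of each window via currentLocusReads.head.sampleName, which silently assumed a non-empty window. A minimal sketch of the two shapes, with a stand-in Read type (not the project's MappedRead):

// Stand-in types; illustrates passing the sample name in rather than
// sniffing it from the data.
object SampleNameThreadingSketch {
  type SampleName = String
  case class Read(sampleName: SampleName, start: Long)

  // Before: derive the name from the first read; throws on empty input.
  def sampleNameFromReads(reads: Seq[Read]): SampleName =
    reads.head.sampleName

  // After: the caller supplies it once, alongside the reads.
  def callVariants(reads: Seq[Read], sampleName: SampleName): String =
    s"$sampleName: ${reads.size} read(s)"

  def main(argv: Array[String]): Unit = {
    val reads = Seq(Read("sampleA", 100L))
    println(sampleNameFromReads(reads))     // sampleA
    println(callVariants(reads, "sampleA")) // sampleA: 1 read(s)
  }
}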
src/main/scala/org/hammerlab/guacamole/commands/SomaticJoint.scala
@@ -3,16 +3,13 @@ package org.hammerlab.guacamole.commands
 import htsjdk.samtools.SAMSequenceDictionary
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.hammerlab.guacamole.distributed.PileupFlatMapUtils.pileupFlatMapMultipleRDDs
 import org.hammerlab.guacamole.jointcaller.evidence.{MultiSampleMultiAlleleEvidence, MultiSampleSingleAlleleEvidence}
 import org.hammerlab.guacamole.jointcaller.{Input, InputCollection, Parameters, VCFOutput}
-import org.hammerlab.guacamole.distributed.PileupFlatMapUtils.pileupFlatMapMultipleRDDs
 import org.hammerlab.guacamole.loci.LociArgs
-import org.hammerlab.guacamole.loci.partitioning.LociPartitionerArgs
-import org.hammerlab.guacamole.loci.set.{LociParser, LociSet}
+import org.hammerlab.guacamole.loci.set.LociSet
 import org.hammerlab.guacamole.logging.LoggingUtils.progress
 import org.hammerlab.guacamole.pileup.Pileup
-import org.hammerlab.guacamole.readsets.args.NoSequenceDictionaryArgs
-import org.hammerlab.guacamole.readsets.io.InputFilters
 import org.hammerlab.guacamole.readsets.{PerSample, ReadSets}
 import org.hammerlab.guacamole.reference.ReferenceBroadcast
 import org.kohsuke.args4j.spi.StringArrayOptionHandler
@@ -21,8 +18,6 @@ import org.kohsuke.args4j.{Option => Args4jOption}
 object SomaticJoint {
   class Arguments
     extends Parameters.CommandlineArguments
-      with LociPartitionerArgs
-      with NoSequenceDictionaryArgs
       with InputCollection.Arguments {
 
     @Args4jOption(name = "--out", usage = "Output path for all variants in VCF. Default: no output")
@@ -58,48 +53,27 @@ object SomaticJoint {
     var headerMetadata: Array[String] = Array.empty
   }
 
-  /**
-   * Load ReadSet instances from user-specified BAMs (specified as an InputCollection).
-   */
-  def inputsToReadSets(sc: SparkContext,
-                       inputs: InputCollection,
-                       loci: LociParser,
-                       contigLengthsFromDictionary: Boolean = true): ReadSets = {
-    ReadSets(
-      sc,
-      inputs.items.map(_.path),
-      InputFilters(overlapsLoci = loci),
-      contigLengthsFromDictionary = contigLengthsFromDictionary
-    )
-  }
-
   object Caller extends SparkCommand[Arguments] {
     override val name = "somatic-joint"
     override val description = "call germline and somatic variants based on any number of samples from the same patient"
 
     override def run(args: Arguments, sc: SparkContext): Unit = {
       val inputs = InputCollection(args)
 
+      val (readsets, loci) = ReadSets(sc, args)
+
       if (!args.quiet) {
-        println("Running on %d inputs:".format(inputs.items.length))
-        inputs.items.foreach(input => println(input))
+        println(s"Running on ${inputs.items.length} inputs:")
+        inputs.items.foreach(println)
       }
 
-      val reference = ReferenceBroadcast(args.referenceFastaPath, sc, partialFasta = args.referenceFastaIsPartial)
-
-      val loci = args.parseLoci(sc.hadoopConfiguration)
-
-      val readsets = inputsToReadSets(sc, inputs, loci, !args.noSequenceDictionary)
-
-      val contigLengths = readsets.contigLengths
-
       val forceCallLoci =
         LociArgs.parseLoci(
           args.forceCallLoci,
           args.forceCallLociFile,
           sc.hadoopConfiguration,
           fallback = ""
-        ).result(contigLengths)
+        ).result(readsets.contigLengths)
 
       if (forceCallLoci.nonEmpty) {
         progress(
@@ -113,13 +87,15 @@
 
       val parameters = Parameters(args)
 
+      val reference = ReferenceBroadcast(args.referenceFastaPath, sc, partialFasta = args.referenceFastaIsPartial)
+
       val calls = makeCalls(
         sc,
         inputs,
         readsets,
         parameters,
         reference,
-        loci.result(contigLengths),
+        loci,
         forceCallLoci,
         args.onlySomatic,
         args.includeFiltered,
@@ -182,7 +158,7 @@
                 forceCallLoci: LociSet = LociSet(),
                 onlySomatic: Boolean = false,
                 includeFiltered: Boolean = false,
-                args: LociPartitionerArgs = new LociPartitionerArgs {}): RDD[MultiSampleMultiAlleleEvidence] = {
+                args: Arguments = new Arguments {}): RDD[MultiSampleMultiAlleleEvidence] = {
 
     assume(loci.nonEmpty)
 
@@ -214,7 +190,7 @@
     }
 
     pileupFlatMapMultipleRDDs(
-      readsets.mappedReads,
+      readsets.mappedReadsRDDs,
      lociPartitions,
      skipEmpty = true, // TODO: shouldn't skip empty positions if we might force call them. Need an efficient way to handle this.
      callPileups,
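
Net change in SomaticJoint: the private inputsToReadSets helper and the separate parseLoci / ReferenceBroadcast / readsets sequencing collapse into one shared call, val (readsets, loci) = ReadSets(sc, args). A stub-level sketch of that consolidation (all types and bodies here are placeholders; the real overload also applies input filters and the sequence-dictionary flag):

// Sketch of a single "args -> (read sets, parsed loci)" entry point.
object ConsolidatedLoadingSketch {
  case class LociSet(contigs: Set[String])
  case class ReadSets(contigLengths: Map[String, Long])

  trait Args {
    def lociSpec: String
    def bams: Seq[String]
  }

  def readSetsFromArgs(args: Args): (ReadSets, LociSet) = {
    val loci = LociSet(args.lociSpec.split(',').toSet) // placeholder parse
    val readsets = ReadSets(Map("chr1" -> 248956422L)) // placeholder load
    (readsets, loci)
  }

  def main(argv: Array[String]): Unit = {
    val args = new Args {
      def lociSpec = "chr1"
      def bams = Seq("tumor.bam")
    }
    val (readsets, loci) = readSetsFromArgs(args)
    // Callers read contig lengths straight off the ReadSets, as in the
    // `.result(readsets.contigLengths)` change above.
    println((loci, readsets.contigLengths))
  }
}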
82 changes: 43 additions & 39 deletions src/main/scala/org/hammerlab/guacamole/commands/VAFHistogram.scala
@@ -9,14 +9,15 @@ import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.hammerlab.guacamole.distributed.PileupFlatMapUtils.pileupFlatMap
-import org.hammerlab.guacamole.loci.partitioning.{LociPartitionerArgs, LociPartitioning}
+import org.hammerlab.guacamole.loci.partitioning.LociPartitioning
 import org.hammerlab.guacamole.logging.LoggingUtils.progress
 import org.hammerlab.guacamole.pileup.Pileup
 import org.hammerlab.guacamole.reads.MappedRead
-import org.hammerlab.guacamole.readsets.io.{InputFilters, ReadLoadingConfig, ReadLoadingConfigArgs}
-import org.hammerlab.guacamole.readsets.ReadSets
-import org.hammerlab.guacamole.reference.{ContigName, Locus, ReferenceBroadcast, ReferenceGenome}
-import org.kohsuke.args4j.{Argument, Option => Args4jOption}
+import org.hammerlab.guacamole.readsets.args.{Arguments => ReadSetsArguments}
+import org.hammerlab.guacamole.readsets.io.{Input, InputFilters, ReadLoadingConfig}
+import org.hammerlab.guacamole.readsets.{ReadSets, SampleName}
+import org.hammerlab.guacamole.reference.{ContigName, Locus, NumLoci, ReferenceBroadcast, ReferenceGenome}
+import org.kohsuke.args4j.{Option => Args4jOption}
 
 /**
  * VariantLocus is a locus and the variant allele frequency at that locus
@@ -53,7 +54,7 @@ object VariantLocus {
 
 object VAFHistogram {
 
-  protected class Arguments extends LociPartitionerArgs with ReadLoadingConfigArgs {
+  protected class Arguments extends ReadSetsArguments {
 
     @Args4jOption(name = "--out", required = false, forbids = Array("--local-out"),
       usage = "HDFS file path to save the variant allele frequency histogram")
@@ -90,11 +91,6 @@
 
     @Args4jOption(name = "--reference-fasta", required = true, usage = "Local path to a reference FASTA file")
     var referenceFastaPath: String = ""
-
-    @Argument(required = true, multiValued = true,
-      usage = "BAMs")
-    var bams: Array[String] = Array.empty
-
   }
 
   object Caller extends SparkCommand[Arguments] {
@@ -115,13 +111,15 @@
       val readsets =
         ReadSets(
           sc,
-          args.bams,
+          args.inputs,
           filters,
           contigLengthsFromDictionary = true,
           config = ReadLoadingConfig(args)
         )
 
-      val ReadSets(readsRDDs, _, contigLengths) = readsets
+      val ReadSets(_, _, contigLengths) = readsets
+
+      val mappedReadsRDDs = readsets.mappedReadsRDDs
 
       val lociPartitions =
         args
@@ -134,39 +132,36 @@
       val minVariantAlleleFrequency = args.minVAF
 
       val variantLoci =
-        readsRDDs.map(reads =>
+        for {
+          (mappedReadsRDD, Input(sampleName, _)) <- mappedReadsRDDs.zip(args.inputs)
+        } yield
           variantLociFromReads(
-            reads.mappedReads,
+            mappedReadsRDD,
+            sampleName,
             reference,
             lociPartitions,
             samplePercent,
             minReadDepth,
             minVariantAlleleFrequency,
             printStats = args.printStats
           )
-        )
 
       val bins = args.bins
       val variantAlleleHistograms =
         variantLoci.map(variantLoci => generateVAFHistogram(variantLoci, bins))
 
-      val sampleAndFileNames = args.bams.zip(readsRDDs.map(_.mappedReads.take(1)(0).sampleName))
       val binSize = 100 / bins
 
-      def histogramToString(kv: (Int, Long)): String = {
-        s"${kv._1}, ${math.min(kv._1 + binSize, 100)}, ${kv._2}"
-      }
+      def histogramToString(bin: Int, numLoci: NumLoci): String =
+        s"$bin, ${math.min(bin + binSize, 100)}, $numLoci"
 
       val histogramOutput =
-        sampleAndFileNames
-          .zip(variantAlleleHistograms)
-          .flatMap {
-            case ((filename, sampleName), histogram) =>
-              histogram
-                .toSeq
-                .sortBy(_._1)
-                .map(kv => s"$filename, $sampleName, ${histogramToString(kv)}")
-          }
+        for {
+          (Input(sampleName, filename), histogram) <- args.inputs.zip(variantAlleleHistograms)
+          sortedHist = histogram.toSeq.sortBy(_._1)
+          (bin, numLoci) <- sortedHist
+        } yield
+          s"$filename, $sampleName, ${histogramToString(bin, numLoci)}"
 
       if (args.localOutputPath != "") {
         val writer = new BufferedWriter(new FileWriter(args.localOutputPath))
@@ -183,9 +178,13 @@
           sc.parallelize(histogramOutput).saveAsTextFile(args.output)
         } else {
           // Print histograms to standard out
-          variantAlleleHistograms.foreach(histogram =>
-            histogram.toSeq.sortBy(_._1).foreach(kv => println(histogramToString(kv)))
-          )
+          for {
+            histogram <- variantAlleleHistograms
+            sortedHist = histogram.toSeq.sortBy(_._1)
+            (bin, numLoci) <- sortedHist
+          } {
+            println(histogramToString(bin, numLoci))
+          }
         }
 
         if (args.cluster) {
@@ -203,13 +202,14 @@
   * @param bins Number of bins to group the VAFs into
   * @return Map of rounded variant allele frequency to number of loci with that value
   */
-  def generateVAFHistogram(variantAlleleFrequencies: RDD[VariantLocus], bins: Int): Map[Int, Long] = {
+  def generateVAFHistogram(variantAlleleFrequencies: RDD[VariantLocus], bins: Int): Map[Int, NumLoci] = {
     assume(bins <= 100 && bins >= 1, "Bins should be between 1 and 100")
 
     def roundToBin(variantAlleleFrequency: Float) = {
       val variantPercent = (variantAlleleFrequency * 100).toInt
      variantPercent - (variantPercent % (100 / bins))
     }
+
     variantAlleleFrequencies
       .map(vaf => roundToBin(vaf.variantAlleleFrequency) -> 1L)
       .reduceByKey(_ + _)
@@ -230,13 +230,14 @@
   * @return RDD of VariantLocus, which contain the locus and non-zero variant allele frequency
   */
  def variantLociFromReads(reads: RDD[MappedRead],
+                          sampleName: SampleName,
                           reference: ReferenceGenome,
                           lociPartitions: LociPartitioning,
                           samplePercent: Int = 100,
                           minReadDepth: Int = 0,
                           minVariantAlleleFrequency: Int = 0,
                           printStats: Boolean = false): RDD[VariantLocus] = {
-    val sampleName = reads.take(1)(0).sampleName
+
     val variantLoci =
       pileupFlatMap[VariantLocus](
         reads,
@@ -297,12 +298,15 @@
                numClusters: Int,
                maxIterations: Int = 50,
                convergenceTol: Double = 1e-2): GaussianMixtureModel = {
+
     val vafVectors = variantAlleleFrequencies.map(vaf => Vectors.dense(vaf.variantAlleleFrequency))
-    val model = new GaussianMixture()
-      .setK(numClusters)
-      .setConvergenceTol(convergenceTol)
-      .setMaxIterations(maxIterations)
-      .run(vafVectors)
+
+    val model =
+      new GaussianMixture()
+        .setK(numClusters)
+        .setConvergenceTol(convergenceTol)
+        .setMaxIterations(maxIterations)
+        .run(vafVectors)
 
     for (i <- 0 until model.k) {
       println(s"Cluster $i: mean=${model.gaussians(i).mu(0)}, std. deviation=${model.gaussians(i).sigma}, weight=${model.weights(i)}")
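
The binning arithmetic in generateVAFHistogram above can be checked by hand: with bins = 20, binSize = 100 / 20 = 5, so a VAF of 0.23 becomes integer percent 23 and lands in bin 23 - (23 % 5) = 20. A Spark-free restatement of the same logic on plain collections (the RDD version performs the equivalent map + reduceByKey):

// Same binning as generateVAFHistogram, minus the RDD plumbing.
object VAFBinningSketch {
  def generateVAFHistogram(vafs: Seq[Float], bins: Int): Map[Int, Long] = {
    require(bins >= 1 && bins <= 100, "Bins should be between 1 and 100")

    // Floor each VAF, as an integer percent, to the start of its bin.
    def roundToBin(vaf: Float): Int = {
      val variantPercent = (vaf * 100).toInt
      variantPercent - (variantPercent % (100 / bins))
    }

    vafs.groupBy(roundToBin).map { case (bin, group) => bin -> group.size.toLong }
  }

  def main(argv: Array[String]): Unit = {
    val hist = generateVAFHistogram(Seq(0.23f, 0.24f, 0.51f), bins = 20)
    println(hist.toSeq.sortBy(_._1)) // e.g. List((20,2), (50,1))
  }
}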