Commit

Merge b8c3c0b into 0ab8543
heuermh committed Mar 21, 2018
2 parents 0ab8543 + b8c3c0b commit 2a6eac1
Showing 23 changed files with 2,026 additions and 449 deletions.
88 changes: 44 additions & 44 deletions cli/src/main/scala/org/bdgenomics/cannoli/cli/Bcftools.scala
@@ -18,90 +18,90 @@
package org.bdgenomics.cannoli.cli

import htsjdk.samtools.ValidationStringency
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.VariantContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.variant.{
VariantContextRDD,
VCFInFormatter,
VCFOutFormatter
}
import org.bdgenomics.adam.sql.{ VariantContext => VariantContextProduct }
import org.bdgenomics.cannoli.builder.CommandBuilders
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
import scala.collection.JavaConversions._

/**
* Bcftools function arguments.
*/
class BcftoolsFnArgs extends Args4jBase {
@Args4jOption(required = false, name = "-bcftools_path", usage = "Path to the BCFtools executable. Defaults to bcftools.")
var bcftoolsPath: String = "bcftools"
@Args4jOption(required = false, name = "-executable", usage = "Path to the BCFtools executable. Defaults to bcftools.")
var executable: String = "bcftools"

@Args4jOption(required = true, name = "-bcftools_reference", usage = "Reference sequence for analysis. An index file (.fai) will be created if none exists.")
var referencePath: String = null
@Args4jOption(required = false, name = "-image", usage = "Container image to use. Defaults to quay.io/biocontainers/bcftools:1.6--0.")
var image: String = "quay.io/biocontainers/bcftools:1.6--0"

@Args4jOption(required = false, name = "-sudo", usage = "Run via sudo.")
var sudo: Boolean = false

@Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/bcftools:1.6--0.")
var dockerImage: String = "quay.io/biocontainers/bcftools:1.6--0"
@Args4jOption(required = false, name = "-add_files", usage = "If true, use the SparkFiles mechanism to distribute files to executors.")
var addFiles: Boolean = false

@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch BCFtools. If false, uses the BCFtools executable path.")
@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch BCFtools.")
var useDocker: Boolean = false

@Args4jOption(required = false, name = "-use_singularity", usage = "If true, uses Singularity to launch BCFtools.")
var useSingularity: Boolean = false

@Args4jOption(required = true, name = "-reference", usage = "Reference sequence for analysis. An index file (.fai) will be created if none exists.")
var referencePath: String = null
}
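
To illustrate the renamed options, a minimal sketch of configuring the new arguments programmatically; the values are illustrative, and only fields declared in the class above are set:

val args = new BcftoolsFnArgs()
args.executable = "/usr/local/bin/bcftools" // formerly -bcftools_path
args.image = "quay.io/biocontainers/bcftools:1.6--0" // formerly -docker_image
args.addFiles = true // distribute the reference (and its .fai) to executors via SparkFiles
args.useDocker = true
args.referencePath = "sample.fa" // formerly -bcftools_reference, now -reference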

/**
* Bcftools wrapper as a function VariantContextRDD → VariantContextRDD,
* for use in cannoli-shell or notebooks.
*
* @param args Bcftools function arguments.
* @param files Files to make locally available to the commands being run.
* @param environment A map containing environment variable/value pairs to set
* in the environment for the newly created process.
* @param sc Spark context.
*/
class BcftoolsFn(
val args: BcftoolsFnArgs,
val files: Seq[String],
val environment: Map[String, String],
val sc: SparkContext) extends Function1[VariantContextRDD, VariantContextRDD] with Logging {

/**
* @param args Bcftools function arguments.
* @param sc Spark context.
*/
def this(args: BcftoolsFnArgs, sc: SparkContext) = this(args, Seq.empty, Map.empty, sc)

/**
* @param args Bcftools function arguments.
* @param files Files to make locally available to the commands being run.
* @param sc Spark context.
*/
def this(args: BcftoolsFnArgs, files: Seq[String], sc: SparkContext) = this(args, files, Map.empty, sc)
sc: SparkContext) extends CannoliFn[VariantContextRDD, VariantContextRDD](sc) with Logging {

override def apply(variantContexts: VariantContextRDD): VariantContextRDD = {

val bcftoolsCommand = if (args.useDocker) {
Seq("docker",
"run",
"--interactive",
"--rm",
args.dockerImage,
"bcftools",
"norm",
"--fasta-ref",
args.referencePath)
} else {
Seq(args.bcftoolsPath,
"norm",
"--fasta-ref",
args.referencePath)
val builder = CommandBuilders.create(args.useDocker, args.useSingularity)
.setExecutable(args.executable)
.add("norm")
.add("--fasta-ref")
.add(if (args.addFiles) "$0" else absolute(args.referencePath))

if (args.addFiles) {
builder.addFile(args.referencePath)
builder.addFile(args.referencePath + ".fai")
}

if (args.useDocker || args.useSingularity) {
builder
.setImage(args.image)
.setSudo(args.sudo)
.addMount(if (args.addFiles) "$root" else root(args.referencePath))
}

log.info("Piping {} to bcftools with command: {} files: {} environment: {}",
Array(variantContexts, bcftoolsCommand, files, environment))
log.info("Piping {} to bcftools with command: {} files: {}",
variantContexts, builder.build(), builder.getFiles())

implicit val tFormatter = VCFInFormatter
implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration)

variantContexts.pipe(bcftoolsCommand, files, environment)
variantContexts.pipe[VariantContext, VariantContextProduct, VariantContextRDD, VCFInFormatter](
cmd = builder.build(),
files = builder.getFiles()
)
}
}
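
The hand-assembled Seq of Docker arguments is replaced by the fluent CommandBuilders API. Below is a minimal sketch of the same bcftools norm invocation, using only builder calls that appear in this diff; the HDFS paths are illustrative, and the $0 and $root placeholders are assumed to resolve to executor-local SparkFiles paths when the command is piped:

import org.bdgenomics.cannoli.builder.CommandBuilders

val builder = CommandBuilders.create(/* useDocker */ true, /* useSingularity */ false)
  .setExecutable("bcftools")
  .add("norm")
  .add("--fasta-ref")
  .add("$0") // placeholder for the first file added below
builder.addFile("hdfs:///data/sample.fa")
builder.addFile("hdfs:///data/sample.fa.fai")
builder
  .setImage("quay.io/biocontainers/bcftools:1.6--0")
  .setSudo(false)
  .addMount("$root") // container mount point for the SparkFiles directory
val cmd = builder.build() // the command handed to variantContexts.pipe(...)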

122 changes: 71 additions & 51 deletions cli/src/main/scala/org/bdgenomics/cannoli/cli/Bedtools.scala
@@ -17,17 +17,23 @@
*/
package org.bdgenomics.cannoli.cli

import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark.SparkContext
import org.bdgenomics.adam.projections.{ FeatureField, Projection }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.feature.{
FeatureRDD,
BEDInFormatter,
BEDOutFormatter
}
import org.bdgenomics.adam.sql.{ Feature => FeatureProduct }
import org.bdgenomics.cannoli.builder.CommandBuilders
import org.bdgenomics.formats.avro.Feature
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
import scala.collection.JavaConversions._

/**
* Bedtools function arguments.
@@ -42,14 +48,23 @@ class BedtoolsFnArgs extends Args4jBase {
@Args4jOption(required = false, name = "-sorted", usage = "Bedtools intersect -sorted option. Inputs must be sorted by chromosome and then by start position.")
var sorted: Boolean = false

@Args4jOption(required = false, name = "-bedtools_path", usage = "Path to the Bedtools executable. Defaults to bedtools.")
var bedtoolsPath: String = "bedtools"
@Args4jOption(required = false, name = "-executable", usage = "Path to the Bedtools executable. Defaults to bedtools.")
var executable: String = "bedtools"

@Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/bedtools:2.27.1--0.")
var dockerImage: String = "quay.io/biocontainers/bedtools:2.27.1--0"
@Args4jOption(required = false, name = "-image", usage = "Container image to use. Defaults to quay.io/biocontainers/bedtools:2.27.1--0.")
var image: String = "quay.io/biocontainers/bedtools:2.27.1--0"

@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch Bedtools. If false, uses the Bedtools executable path.")
@Args4jOption(required = false, name = "-sudo", usage = "Run via sudo.")
var sudo: Boolean = false

@Args4jOption(required = false, name = "-add_files", usage = "If true, use the SparkFiles mechanism to distribute files to executors.")
var addFiles: Boolean = false

@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch Bedtools.")
var useDocker: Boolean = false

@Args4jOption(required = false, name = "-use_singularity", usage = "If true, uses Singularity to launch Bedtools.")
var useSingularity: Boolean = false
}

/**
@@ -61,66 +76,52 @@ class BedtoolsFnArgs extends Args4jBase {
* args.b = "foo.bed"
* args.useDocker = true
* val features = ...
* val pipedFeatures = new BedtoolsFn(args).apply(features)
* val pipedFeatures = new BedtoolsFn(args, sc).apply(features)
* </code>
*
* @param args Bedtools function arguments.
* @param files Files to make locally available to the commands being run.
* @param environment A map containing environment variable/value pairs to set
* in the environment for the newly created process.
* @param sc Spark context.
*/
class BedtoolsFn(
val args: BedtoolsFnArgs,
val files: Seq[String],
val environment: Map[String, String]) extends Function1[FeatureRDD, FeatureRDD] with Logging {

/**
* @param args Bedtools function arguments.
*/
def this(args: BedtoolsFnArgs) = this(args, Seq.empty, Map.empty)

/**
* @param args Bedtools function arguments.
* @param files Files to make locally available to the commands being run.
*/
def this(args: BedtoolsFnArgs, files: Seq[String]) = this(args, files, Map.empty)
sc: SparkContext) extends CannoliFn[FeatureRDD, FeatureRDD](sc) with Logging {

override def apply(features: FeatureRDD): FeatureRDD = {
val optA = Option(args.a)
val optB = Option(args.b)
require(optA.size + optB.size == 1,
"Strictly one of {-a,-b} should be left unspecified to accept piped input.")

val bedtoolsCommand = if (args.useDocker) {
Seq("docker",
"run",
"--rm",
args.dockerImage,
"bedtools",
"intersect",
"-a",
optA.getOrElse("stdin"),
"-b",
optB.getOrElse("stdin"),
if (args.sorted) "-sorted" else ""
)
} else {
Seq(args.bedtoolsPath,
"intersect",
"-a",
optA.getOrElse("stdin"),
"-b",
optB.getOrElse("stdin"),
if (args.sorted) "-sorted" else ""
)
val file = List(optA, optB).flatten.head

var builder = CommandBuilders.create(args.useDocker, args.useSingularity)
.setExecutable(args.executable)
.add("intersect")
.add("-a")
.add(optA.fold("stdin")(f => if (args.addFiles) "$0" else absolute(f)))
.add("-b")
.add(optB.fold("stdin")(f => if (args.addFiles) "$0" else absolute(f)))

if (args.sorted) builder.add("-sorted")
if (args.addFiles) builder.addFile(file)

if (args.useDocker || args.useSingularity) {
builder
.setImage(args.image)
.setSudo(args.sudo)
.addMount(if (args.addFiles) "$root" else root(file))
}

log.info("Piping {} to bedtools with command: {} files: {} environment: {}",
Array(features, bedtoolsCommand, files, environment))
log.info("Piping {} to bedtools with command: {} files: {}",
features, builder.build(), builder.getFiles())

implicit val tFormatter = BEDInFormatter
implicit val uFormatter = new BEDOutFormatter
features.pipe(bedtoolsCommand, files, environment)

features.pipe[Feature, FeatureProduct, FeatureRDD, BEDInFormatter](
cmd = builder.build(),
files = builder.getFiles()
)
}
}
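
Expanding on the scaladoc example above, a minimal cannoli-shell sketch against the new (args, sc) signature; the file names are illustrative, sc is assumed to be the shell's SparkContext, and loadFeatures and save are the same ADAM calls used by the Bedtools command below:

import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.cannoli.cli.{ BedtoolsFn, BedtoolsFnArgs }

val args = new BedtoolsFnArgs()
args.b = "targets.bed" // leave -a unset; piped features feed bedtools intersect -a stdin
args.useDocker = true

val features = sc.loadFeatures("input.gff3")
val pipedFeatures = new BedtoolsFn(args, sc).apply(features)
pipedFeatures.save("intersected.bed", asSingleFile = true, disableFastConcat = false)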

@@ -137,12 +138,18 @@ object Bedtools extends BDGCommandCompanion {
* Bedtools command line arguments.
*/
class BedtoolsArgs extends BedtoolsFnArgs with ADAMSaveAnyArgs with ParquetArgs {
@Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from.", index = 0)
@Argument(required = true, metaVar = "INPUT", usage = "Location to pipe features from (e.g., .bed, .gff/.gtf, .gff3, .interval_list, .narrowPeak). If extension is not detected, Parquet is assumed.", index = 0)
var inputPath: String = null

@Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1)
@Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe features to. If extension is not detected, Parquet is assumed.", index = 1)
var outputPath: String = null

@Args4jOption(required = false, name = "-limit_projection", usage = "If input is Parquet, limit to BED format-only fields by projection.")
var limitProjection: Boolean = false

@Args4jOption(required = false, name = "-partitions", usage = "Number of partitions to use when loading a text file.")
var partitions: Int = _

@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.")
var asSingleFile: Boolean = false

@@ -163,8 +170,21 @@ class Bedtools(protected val args: BedtoolsArgs) extends BDGSparkCommand[Bedtool
val companion = Bedtools

override def run(sc: SparkContext) {
val features = sc.loadFeatures(args.inputPath)
val pipedFeatures = new BedtoolsFn(args).apply(features)
val projection = Projection(
FeatureField.contigName,
FeatureField.start,
FeatureField.end,
FeatureField.name,
FeatureField.score,
FeatureField.strand
)

val features = sc.loadFeatures(
args.inputPath,
optMinPartitions = Option(args.partitions),
optProjection = if (args.limitProjection) Some(projection) else None
)
val pipedFeatures = new BedtoolsFn(args, sc).apply(features)

pipedFeatures.save(args.outputPath,
asSingleFile = args.asSingleFile,
