[CANNOLI-98] Adding container builder. #107

Merged: 2 commits, Mar 22, 2018
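This PR replaces the hand-rolled `Seq("docker", "run", ...)` command construction with the new fluent `CommandBuilders` API. As a rough sketch of the resulting call shape — the method names are the ones used in the diff below, while the reference path and mount point are placeholder values:

```scala
import org.bdgenomics.cannoli.builder.CommandBuilders

// Build a containerized bcftools invocation; mirrors the calls in Bcftools.scala below.
// The reference path and mount point are hypothetical.
val builder = CommandBuilders.create(/* useDocker = */ true, /* useSingularity = */ false)
  .setExecutable("bcftools")
  .add("norm")
  .add("--fasta-ref")
  .add("/data/ref.fa")
  .setImage("quay.io/biocontainers/bcftools:1.6--0")
  .setSudo(false)
  .addMount("/data")

// Roughly: docker run --rm -v /data:/data quay.io/... bcftools norm --fasta-ref /data/ref.fa
val cmd = builder.build()
```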
88 changes: 44 additions & 44 deletions cli/src/main/scala/org/bdgenomics/cannoli/cli/Bcftools.scala
@@ -18,90 +18,90 @@
package org.bdgenomics.cannoli.cli

import htsjdk.samtools.ValidationStringency
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.VariantContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.variant.{
VariantContextRDD,
VCFInFormatter,
VCFOutFormatter
}
import org.bdgenomics.adam.sql.{ VariantContext => VariantContextProduct }
import org.bdgenomics.cannoli.builder.CommandBuilders
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
import scala.collection.JavaConversions._

/**
* Bcftools function arguments.
*/
class BcftoolsFnArgs extends Args4jBase {
@Args4jOption(required = false, name = "-bcftools_path", usage = "Path to the BCFtools executable. Defaults to bcftools.")
var bcftoolsPath: String = "bcftools"
@Args4jOption(required = false, name = "-executable", usage = "Path to the BCFtools executable. Defaults to bcftools.")
var executable: String = "bcftools"

@Args4jOption(required = true, name = "-bcftools_reference", usage = "Reference sequence for analysis. An index file (.fai) will be created if none exists.")
var referencePath: String = null
@Args4jOption(required = false, name = "-image", usage = "Container image to use. Defaults to quay.io/biocontainers/bcftools:1.6--0.")
var image: String = "quay.io/biocontainers/bcftools:1.6--0"

@Args4jOption(required = false, name = "-sudo", usage = "Run via sudo.")
var sudo: Boolean = false

@Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/bcftools:1.6--0.")
var dockerImage: String = "quay.io/biocontainers/bcftools:1.6--0"
@Args4jOption(required = false, name = "-add_files", usage = "If true, use the SparkFiles mechanism to distribute files to executors.")
var addFiles: Boolean = false

@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch BCFtools. If false, uses the BCFtools executable path.")
@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch BCFtools.")
var useDocker: Boolean = false

@Args4jOption(required = false, name = "-use_singularity", usage = "If true, uses Singularity to launch BCFtools.")
var useSingularity: Boolean = false

@Args4jOption(required = true, name = "-reference", usage = "Reference sequence for analysis. An index file (.fai) will be created if none exists.")
var referencePath: String = null
}

/**
* Bcftools wrapper as a function VariantContextRDD → VariantContextRDD,
* for use in cannoli-shell or notebooks.
*
* @param args Bcftools function arguments.
* @param files Files to make locally available to the commands being run.
* @param environment A map containing environment variable/value pairs to set
* in the environment for the newly created process.
* @param sc Spark context.
*/
class BcftoolsFn(
val args: BcftoolsFnArgs,
val files: Seq[String],
val environment: Map[String, String],
val sc: SparkContext) extends Function1[VariantContextRDD, VariantContextRDD] with Logging {

/**
* @param args Bcftools function arguments.
* @param sc Spark context.
*/
def this(args: BcftoolsFnArgs, sc: SparkContext) = this(args, Seq.empty, Map.empty, sc)

/**
* @param args Bcftools function arguments.
* @param files Files to make locally available to the commands being run.
* @param sc Spark context.
*/
def this(args: BcftoolsFnArgs, files: Seq[String], sc: SparkContext) = this(args, files, Map.empty, sc)
sc: SparkContext) extends CannoliFn[VariantContextRDD, VariantContextRDD](sc) with Logging {

override def apply(variantContexts: VariantContextRDD): VariantContextRDD = {

val bcftoolsCommand = if (args.useDocker) {
Seq("docker",
"run",
"--interactive",
"--rm",
args.dockerImage,
"bcftools",
"norm",
"--fasta-ref",
args.referencePath)
} else {
Seq(args.bcftoolsPath,
"norm",
"--fasta-ref",
args.referencePath)
val builder = CommandBuilders.create(args.useDocker, args.useSingularity)
.setExecutable(args.executable)
.add("norm")
.add("--fasta-ref")
.add(if (args.addFiles) "$0" else absolute(args.referencePath))

if (args.addFiles) {
builder.addFile(args.referencePath)
builder.addFile(args.referencePath + ".fai")
}

if (args.useDocker || args.useSingularity) {
builder
.setImage(args.image)
.setSudo(args.sudo)
.addMount(if (args.addFiles) "$root" else root(args.referencePath))
}

log.info("Piping {} to bcftools with command: {} files: {} environment: {}",
Array(variantContexts, bcftoolsCommand, files, environment))
log.info("Piping {} to bcftools with command: {} files: {}",
variantContexts, builder.build(), builder.getFiles())

implicit val tFormatter = VCFInFormatter
implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration)

variantContexts.pipe(bcftoolsCommand, files, environment)
variantContexts.pipe[VariantContext, VariantContextProduct, VariantContextRDD, VCFInFormatter](
cmd = builder.build(),
files = builder.getFiles()
)
}
}
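For interactive use, the remaining constructor takes just the function arguments and the Spark context. A minimal sketch, assuming a cannoli-shell session where `sc` is in scope; the VCF and reference paths are hypothetical:

```scala
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.cannoli.cli.{ BcftoolsFn, BcftoolsFnArgs }

val args = new BcftoolsFnArgs()
args.referencePath = "/data/ref.fa" // hypothetical path; a .fai index is expected alongside
args.useDocker = true               // run bcftools from the default biocontainers image

// Pipe variant contexts through `bcftools norm` and get a normalized RDD back.
val variantContexts = sc.loadVcf("/data/sample.vcf")
val normalized = new BcftoolsFn(args, sc).apply(variantContexts)
```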

122 changes: 71 additions & 51 deletions cli/src/main/scala/org/bdgenomics/cannoli/cli/Bedtools.scala
@@ -17,17 +17,23 @@
*/
package org.bdgenomics.cannoli.cli

import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark.SparkContext
import org.bdgenomics.adam.projections.{ FeatureField, Projection }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.feature.{
FeatureRDD,
BEDInFormatter,
BEDOutFormatter
}
import org.bdgenomics.adam.sql.{ Feature => FeatureProduct }
import org.bdgenomics.cannoli.builder.CommandBuilders
import org.bdgenomics.formats.avro.Feature
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
import scala.collection.JavaConversions._

/**
* Bedtools function arguments.
@@ -42,14 +48,23 @@ class BedtoolsFnArgs extends Args4jBase {
@Args4jOption(required = false, name = "-sorted", usage = "Bedtools intersect -sorted option. Inputs must be sorted by chromosome and then by start position.")
var sorted: Boolean = false

@Args4jOption(required = false, name = "-bedtools_path", usage = "Path to the Bedtools executable. Defaults to bedtools.")
var bedtoolsPath: String = "bedtools"
@Args4jOption(required = false, name = "-executable", usage = "Path to the Bedtools executable. Defaults to bedtools.")
var executable: String = "bedtools"

@Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/bedtools:2.27.1--0.")
var dockerImage: String = "quay.io/biocontainers/bedtools:2.27.1--0"
@Args4jOption(required = false, name = "-image", usage = "Container image to use. Defaults to quay.io/biocontainers/bedtools:2.27.1--0.")
var image: String = "quay.io/biocontainers/bedtools:2.27.1--0"

@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch Bedtools. If false, uses the Bedtools executable path.")
@Args4jOption(required = false, name = "-sudo", usage = "Run via sudo.")
var sudo: Boolean = false

@Args4jOption(required = false, name = "-add_files", usage = "If true, use the SparkFiles mechanism to distribute files to executors.")
var addFiles: Boolean = false

@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch Bedtools.")
var useDocker: Boolean = false

@Args4jOption(required = false, name = "-use_singularity", usage = "If true, uses Singularity to launch Bedtools.")
var useSingularity: Boolean = false
}

/**
@@ -61,66 +76,52 @@ class BedtoolsFnArgs extends Args4jBase {
* args.b = "foo.bed"
* args.useDocker = true
* val features = ...
* val pipedFeatures = new BedtoolsFn(args).apply(features)
* val pipedFeatures = new BedtoolsFn(args, sc).apply(features)
* </code>
*
* @param args Bedtools function arguments.
* @param files Files to make locally available to the commands being run.
* @param environment A map containing environment variable/value pairs to set
* in the environment for the newly created process.
* @param sc Spark context.
*/
class BedtoolsFn(
val args: BedtoolsFnArgs,
val files: Seq[String],
val environment: Map[String, String]) extends Function1[FeatureRDD, FeatureRDD] with Logging {

/**
* @param args Bedtools function arguments.
*/
def this(args: BedtoolsFnArgs) = this(args, Seq.empty, Map.empty)

/**
* @param args Bedtools function arguments.
* @param files Files to make locally available to the commands being run.
*/
def this(args: BedtoolsFnArgs, files: Seq[String]) = this(args, files, Map.empty)
sc: SparkContext) extends CannoliFn[FeatureRDD, FeatureRDD](sc) with Logging {

override def apply(features: FeatureRDD): FeatureRDD = {
val optA = Option(args.a)
val optB = Option(args.b)
require(optA.size + optB.size == 1,
"Exactly one of {-a,-b} should be left unspecified to accept piped input.")

val bedtoolsCommand = if (args.useDocker) {
Seq("docker",
"run",
"--rm",
args.dockerImage,
"bedtools",
"intersect",
"-a",
optA.getOrElse("stdin"),
"-b",
optB.getOrElse("stdin"),
if (args.sorted) "-sorted" else ""
)
} else {
Seq(args.bedtoolsPath,
"intersect",
"-a",
optA.getOrElse("stdin"),
"-b",
optB.getOrElse("stdin"),
if (args.sorted) "-sorted" else ""
)
val file = List(optA, optB).flatten.get(0)

var builder = CommandBuilders.create(args.useDocker, args.useSingularity)
.setExecutable(args.executable)
.add("intersect")
.add("-a")
.add(optA.fold("stdin")(f => if (args.addFiles) "$0" else absolute(f)))
.add("-b")
.add(optB.fold("stdin")(f => if (args.addFiles) "$0" else absolute(f)))

if (args.sorted) builder.add("-sorted")
if (args.addFiles) builder.addFile(file)

if (args.useDocker || args.useSingularity) {
builder
.setImage(args.image)
.setSudo(args.sudo)
.addMount(if (args.addFiles) "$root" else root(file))
}

log.info("Piping {} to bedtools with command: {} files: {} environment: {}",
Array(features, bedtoolsCommand, files, environment))
log.info("Piping {} to bedtools with command: {} files: {}",
features, builder.build(), builder.getFiles())

implicit val tFormatter = BEDInFormatter
implicit val uFormatter = new BEDOutFormatter
features.pipe(bedtoolsCommand, files, environment)

features.pipe[Feature, FeatureProduct, FeatureRDD, BEDInFormatter](
cmd = builder.build(),
files = builder.getFiles()
)
}
}
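With `-add_files`, the builder emits the `$0` and `$root` placeholders instead of absolute paths; these are presumably substituted with the SparkFiles-local copy of the file and its parent directory at execution time, so the file need not be visible on every executor. A sketch under those assumptions, with hypothetical file names:

```scala
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.cannoli.cli.{ BedtoolsFn, BedtoolsFnArgs }

val args = new BedtoolsFnArgs()
args.b = "targets.bed" // -a is left unspecified, so the piped features feed bedtools via stdin
args.addFiles = true   // distribute targets.bed to executors via SparkFiles
args.useDocker = true

val features = sc.loadFeatures("features.gff3")
val intersected = new BedtoolsFn(args, sc).apply(features)
```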

@@ -137,12 +138,18 @@ object Bedtools extends BDGCommandCompanion {
* Bedtools command line arguments.
*/
class BedtoolsArgs extends BedtoolsFnArgs with ADAMSaveAnyArgs with ParquetArgs {
@Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from.", index = 0)
@Argument(required = true, metaVar = "INPUT", usage = "Location to pipe features from (e.g., .bed, .gff/.gtf, .gff3, .interval_list, .narrowPeak). If extension is not detected, Parquet is assumed.", index = 0)
var inputPath: String = null

@Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1)
@Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe features to. If extension is not detected, Parquet is assumed.", index = 1)
var outputPath: String = null

@Args4jOption(required = false, name = "-limit_projection", usage = "If input is Parquet, limit to BED format-only fields by projection.")
var limitProjection: Boolean = false

@Args4jOption(required = false, name = "-partitions", usage = "Number of partitions to use when loading a text file.")
var partitions: Int = _

@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.")
var asSingleFile: Boolean = false

@@ -163,8 +170,21 @@ class Bedtools(protected val args: BedtoolsArgs) extends BDGSparkCommand[Bedtool
val companion = Bedtools

override def run(sc: SparkContext) {
val features = sc.loadFeatures(args.inputPath)
val pipedFeatures = new BedtoolsFn(args).apply(features)
val projection = Projection(
FeatureField.contigName,
FeatureField.start,
FeatureField.end,
FeatureField.name,
FeatureField.score,
FeatureField.strand
)

val features = sc.loadFeatures(
args.inputPath,
optMinPartitions = Option(args.partitions),
optProjection = if (args.limitProjection) Some(projection) else None
)
val pipedFeatures = new BedtoolsFn(args, sc).apply(features)

pipedFeatures.save(args.outputPath,
asSingleFile = args.asSingleFile,