Skip to content

Commit

Permalink
Merge ec4806f into b84dd11
Browse files Browse the repository at this point in the history
  • Loading branch information
heuermh committed Nov 7, 2017
2 parents b84dd11 + ec4806f commit a27e59b
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 1 deletion.
Expand Up @@ -48,7 +48,9 @@ object ADAMMain {
Fasta2ADAM,
ADAM2Fasta,
ADAM2Fastq,
TransformFragments
Adam2Vcf,
TransformFragments,
Vcf2Adam
)
),
CommandGroup(
Expand Down
125 changes: 125 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Adam2Vcf.scala
@@ -0,0 +1,125 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.cli

import htsjdk.samtools.ValidationStringency
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicRDD }
import org.bdgenomics.adam.rdd.variant.GenotypeRDD
import org.bdgenomics.formats.avro.{ Genotype, Variant }
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

/**
 * Companion for the adam2vcf command: exposes the command name and
 * description and builds command instances from raw command line arguments.
 */
object Adam2Vcf extends BDGCommandCompanion {
  val commandName: String = "adam2vcf"
  val commandDescription: String = "Convert variants and genotypes in ADAM format to VCF"

  /**
   * @param cmdLine Raw command line arguments.
   * @return An Adam2Vcf command wrapping the parsed arguments.
   */
  def apply(cmdLine: Array[String]): Adam2Vcf = {
    val parsedArgs = Args4j[Adam2VcfArgs](cmdLine)
    new Adam2Vcf(parsedArgs)
  }
}

/**
* Command line arguments for the adam2vcf command.
*
* Three positional arguments are required, in this order: the variants input
* path, the genotypes input path, and the VCF output path. Every option is
* optional and defaults to the value assigned below.
*/
class Adam2VcfArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
// positional argument 0: variants input
@Argument(required = true, metaVar = "VARIANTS", usage = "The variants file to convert (e.g., .vcf, .vcf.gz, .vcf.bgzf, .vcf.bgz). If extension is not detected, Parquet is assumed.", index = 0)
var variantsInputPath: String = null

// positional argument 1: genotypes input
@Argument(required = true, metaVar = "GENOTYPES", usage = "The genotypes file to convert (e.g., .vcf, .vcf.gz, .vcf.bgzf, .vcf.bgz). If extension is not detected, Parquet is assumed.", index = 1)
var genotypesInputPath: String = null

// positional argument 2: VCF output
@Argument(required = true, metaVar = "OUTPUT", usage = "Location to write VCF.", index = 2)
var outputPath: String = null

// -sort_on_save and -sort_lexicographically_on_save are mutually exclusive;
// Adam2Vcf.run rejects the combination with a require
@Args4jOption(required = false, name = "-sort_on_save", usage = "Sort VCF output by contig index.")
var sort: Boolean = false

@Args4jOption(required = false, name = "-sort_lexicographically_on_save", usage = "Sort VCF output by lexicographic order. Conflicts with -sort_on_save.")
var sortLexicographically: Boolean = false

@Args4jOption(required = false, name = "-single", usage = "Save as a single VCF file.")
var asSingleFile: Boolean = false

@Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.")
var deferMerging: Boolean = false

@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

// kept as a String here; Adam2Vcf converts it with ValidationStringency.valueOf
@Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.")
var stringency: String = "STRICT"

// must be defined due to ADAMSaveAnyArgs, but unused here
var sortFastqOutput: Boolean = false
}

/**
 * Convert variants and genotypes in ADAM format to VCF.
 *
 * Variants and genotypes are loaded from separate inputs. The annotation of
 * each variant is copied onto the variant nested inside its genotypes, and the
 * updated genotypes are converted to variant contexts and saved as VCF.
 *
 * @param args Parsed command line arguments.
 */
class Adam2Vcf(val args: Adam2VcfArgs)
    extends BDGSparkCommand[Adam2VcfArgs] {
  val companion = Adam2Vcf
  val stringency = ValidationStringency.valueOf(args.stringency)

  /**
   * Sort the specified GenomicRDD if requested.
   *
   * @param rdd GenomicRDD to sort.
   * @return The specified GenomicRDD sorted if requested.
   */
  private def maybeSort[U <: GenomicRDD[_, U]](rdd: U): U = {
    if (args.sort) {
      log.info("Sorting before saving")
      rdd.sort()
    } else if (args.sortLexicographically) {
      log.info("Sorting lexicographically before saving")
      rdd.sortLexicographically()
    } else {
      rdd
    }
  }

  /**
   * Load variants and genotypes, merge variant annotations into the
   * genotypes, and write the result as VCF.
   *
   * @param sc Spark context used to load the inputs.
   * @throws IllegalArgumentException if both -sort_on_save and
   *   -sort_lexicographically_on_save are set.
   */
  def run(sc: SparkContext): Unit = {
    require(!(args.sort && args.sortLexicographically),
      "Cannot set both -sort_on_save and -sort_lexicographically_on_save.")

    val variants = sc.loadVariants(
      args.variantsInputPath,
      optPredicate = None,
      optProjection = None,
      stringency = stringency)

    val genotypes = sc.loadGenotypes(
      args.genotypesInputPath,
      optPredicate = None,
      optProjection = None,
      stringency = stringency)

    val join = variants.shuffleRegionJoin(genotypes)

    // The region join pairs a variant with every genotype whose region
    // overlaps it, which is broader than "the same variant": at multi-allelic
    // or overlapping sites a genotype would be emitted once per overlapping
    // variant and carry the wrong annotation. Keep only the pairs where the
    // site-level variant and the variant nested in the genotype agree on
    // position and alleles.
    //
    // The predicate is written inline (rather than as a method on this class)
    // so the closure does not capture the enclosing command object.
    val updatedGenotypes = join.rdd
      .filter {
        case (variant, genotype) =>
          val nested = genotype.getVariant
          variant.getStart == nested.getStart &&
            variant.getEnd == nested.getEnd &&
            variant.getReferenceAllele == nested.getReferenceAllele &&
            variant.getAlternateAllele == nested.getAlternateAllele
      }
      .map {
        case (variant, genotype) =>
          // copy the genotype's nested variant, replacing only its annotation
          val annotated = Variant.newBuilder(genotype.getVariant)
            .setAnnotation(variant.getAnnotation)
            .build()

          Genotype.newBuilder(genotype)
            .setVariant(annotated)
            .build()
      }

    // sequence dictionary, samples and header lines are taken from the genotypes input
    val updatedGenotypeRdd = GenotypeRDD.apply(updatedGenotypes, genotypes.sequences, genotypes.samples, genotypes.headerLines)
    maybeSort(updatedGenotypeRdd.toVariantContexts).saveAsVcf(args)
  }
}
126 changes: 126 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Vcf2Adam.scala
@@ -0,0 +1,126 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.cli

import htsjdk.samtools.ValidationStringency
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicRDD }
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

/**
 * Companion for the vcf2adam command: exposes the command name and
 * description and builds command instances from raw command line arguments.
 */
object Vcf2Adam extends BDGCommandCompanion {
  val commandName: String = "vcf2adam"
  val commandDescription: String = "Transform VCF into genotypes and variants in ADAM format"

  /**
   * @param cmdLine Raw command line arguments.
   * @return A Vcf2Adam command wrapping the parsed arguments.
   */
  def apply(cmdLine: Array[String]): Vcf2Adam = {
    val parsedArgs = Args4j[Vcf2AdamArgs](cmdLine)
    new Vcf2Adam(parsedArgs)
  }
}

/**
* Command line arguments for the vcf2adam command.
*
* Three positional arguments are required, in this order: the VCF input path,
* the variants output path, and the genotypes output path. Every option is
* optional and defaults to the value assigned below.
*/
class Vcf2AdamArgs extends Args4jBase with ParquetSaveArgs {
// positional argument 0: VCF input
@Argument(required = true, metaVar = "INPUT", usage = "The VCF file to transform (e.g., .vcf, .vcf.gz, .vcf.bgzf, .vcf.bgz).", index = 0)
var inputPath: String = null

// set to either variantsOutputPath or genotypesOutputPath as necessary
// (not a command line argument: Vcf2Adam.run assigns it before each save)
var outputPath: String = null

// positional argument 1: variants output
@Argument(required = true, metaVar = "VARIANTS", usage = "Location to write ADAM variants data in Parquet format.", index = 1)
var variantsOutputPath: String = null

// positional argument 2: genotypes output
@Argument(required = true, metaVar = "GENOTYPES", usage = "Location to write ADAM genotypes data in Parquet format.", index = 2)
var genotypesOutputPath: String = null

// -1 means "do not coalesce"; Vcf2Adam.maybeCoalesce checks for exactly this value
@Args4jOption(required = false, name = "-coalesce", usage = "Number of partitions written to the ADAM output directories.")
var coalesce: Int = -1

@Args4jOption(required = false, name = "-force_shuffle_coalesce", usage = "Even if the repartitioned RDDs have fewer partitions, force a shuffle.")
var forceShuffle: Boolean = false

// -sort_on_save and -sort_lexicographically_on_save are mutually exclusive;
// Vcf2Adam.run rejects the combination with a require
@Args4jOption(required = false, name = "-sort_on_save", usage = "Sort by contig index.")
var sort: Boolean = false

@Args4jOption(required = false, name = "-sort_lexicographically_on_save", usage = "Sort by lexicographic order. Conflicts with -sort_on_save.")
var sortLexicographically: Boolean = false

// kept as a String here; Vcf2Adam converts it with ValidationStringency.valueOf
@Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.")
var stringency: String = "STRICT"
}

/**
 * Transform VCF into genotypes and variants in ADAM format.
 *
 * The VCF is loaded once as variant contexts, which are then written out
 * twice: first as variants, then as genotypes.
 *
 * @param args Parsed command line arguments.
 */
class Vcf2Adam(val args: Vcf2AdamArgs)
    extends BDGSparkCommand[Vcf2AdamArgs] {
  val companion = Vcf2Adam
  val stringency = ValidationStringency.valueOf(args.stringency)

  /**
   * Coalesce the specified GenomicRDD if requested.
   *
   * A shuffle is used when the requested partition count exceeds the current
   * one, or when -force_shuffle_coalesce is set.
   *
   * @param rdd GenomicRDD to coalesce.
   * @return The specified GenomicRDD coalesced if requested.
   */
  private def maybeCoalesce[U <: GenomicRDD[_, U]](rdd: U): U = {
    if (args.coalesce == -1) {
      // -1 is the "not requested" default
      rdd
    } else {
      log.info("Coalescing the number of partitions to '%d'".format(args.coalesce))
      val targetPartitions = args.coalesce
      val useShuffle = targetPartitions > rdd.rdd.partitions.length || args.forceShuffle
      rdd.transform(_.coalesce(targetPartitions, shuffle = useShuffle))
    }
  }

  /**
   * Sort the specified GenomicRDD if requested.
   *
   * -sort_on_save takes precedence over -sort_lexicographically_on_save.
   *
   * @param rdd GenomicRDD to sort.
   * @return The specified GenomicRDD sorted if requested.
   */
  private def maybeSort[U <: GenomicRDD[_, U]](rdd: U): U = {
    (args.sort, args.sortLexicographically) match {
      case (true, _) =>
        log.info("Sorting before saving")
        rdd.sort()
      case (false, true) =>
        log.info("Sorting lexicographically before saving")
        rdd.sortLexicographically()
      case _ =>
        rdd
    }
  }

  /**
   * Load the VCF and save its variants and genotypes to their output paths.
   *
   * @param sc Spark context used to load the input.
   */
  def run(sc: SparkContext): Unit = {
    require(!args.sort || !args.sortLexicographically,
      "Cannot set both -sort_on_save and -sort_lexicographically_on_save.")

    val variantContexts = sc.loadVcf(args.inputPath, stringency = stringency)

    // todo: cache variantContexts? sort variantContexts first?

    // args.outputPath is repointed before each save; presumably
    // saveAsParquet(args) reads its destination from it, so the order of
    // these statements matters
    args.outputPath = args.variantsOutputPath
    val variants = maybeSort(maybeCoalesce(variantContexts.toVariants()))
    variants.saveAsParquet(args)

    args.outputPath = args.genotypesOutputPath
    val genotypes = maybeSort(maybeCoalesce(variantContexts.toGenotypes()))
    genotypes.saveAsParquet(args)
  }
}
@@ -0,0 +1,36 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.cli

import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite

/**
 * Round-trip test for the adam2vcf command.
 */
class Adam2VcfSuite extends ADAMFunSuite {
  sparkTest("convert variants and genotypes in ADAM format to VCF") {
    val vcfInput = copyResource("small.vcf")
    val variantsAdam = tmpFile("variants.adam")
    val genotypesAdam = tmpFile("genotypes.adam")
    val vcfOutput = tmpFile("out.vcf")

    // produce the two ADAM inputs from the same VCF fixture
    TransformVariants(Array(vcfInput, variantsAdam)).run(sc)
    TransformGenotypes(Array(vcfInput, genotypesAdam)).run(sc)

    // command under test
    val commandLine = Array("-single", variantsAdam, genotypesAdam, vcfOutput)
    Adam2Vcf(commandLine).run(sc)

    val recordCount = sc.loadVcf(vcfOutput).rdd.count()
    assert(5 === recordCount)
  }
}
@@ -0,0 +1,36 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.cli

import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite

/**
 * Test for the vcf2adam command.
 */
class Vcf2AdamSuite extends ADAMFunSuite {
  sparkTest("convert VCF to variants and genotypes in ADAM format") {
    val vcfInput = copyResource("small.vcf")
    val variantsAdam = tmpFile("variants.adam")
    val genotypesAdam = tmpFile("genotypes.adam")

    // command under test
    val commandLine = Array(vcfInput, variantsAdam, genotypesAdam)
    Vcf2Adam(commandLine).run(sc)

    // both outputs are read back and counted
    val variantCount = sc.loadVariants(variantsAdam).rdd.count()
    assert(5 === variantCount)

    val genotypeCount = sc.loadGenotypes(genotypesAdam).rdd.count()
    assert(15 === genotypeCount)
  }
}

0 comments on commit a27e59b

Please sign in to comment.