Skip to content

Commit

Permalink
Add partition cli args to TransformVariants,Features.
Browse files Browse the repository at this point in the history
  • Loading branch information
heuermh committed Jul 27, 2019
1 parent 4df2320 commit e4efc04
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 7 deletions.
Expand Up @@ -20,6 +20,7 @@ package org.bdgenomics.adam.cli
import org.apache.spark.SparkContext
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.FileExtensions._
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option Args4jOption }

Expand Down Expand Up @@ -56,22 +57,45 @@ class TransformFeaturesArgs extends Args4jBase with ParquetSaveArgs {
@Args4jOption(required = false, name = "-disable_fast_concat",
usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
var partitionByStartPos: Boolean = false

@Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000) base pairs).")
var partitionedBinSize = 1000000
}

class TransformFeatures(val args: TransformFeaturesArgs)
extends BDGSparkCommand[TransformFeaturesArgs] {

val companion = TransformFeatures

def isFeatureExt(pathName: String): Boolean = {
isBedExt(pathName) ||
isGff3Ext(pathName) ||
isGtfExt(pathName) ||
isIntervalListExt(pathName) ||
isNarrowPeakExt(pathName)
}

def run(sc: SparkContext) {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

val optSequenceDictionary = Option(args.referencePath).map(sc.loadSequenceDictionary(_))

sc.loadFeatures(
val features = sc.loadFeatures(
args.featuresPath,
optSequenceDictionary = optSequenceDictionary,
optMinPartitions = Option(args.numPartitions)
).save(args.outputPath, args.single, args.disableFastConcat)
optMinPartitions = Option(args.numPartitions))

if (isFeatureExt(args.outputPath)) {
features.save(args.outputPath, args.single, args.disableFastConcat)
} else {
if (args.partitionByStartPos) {
features.saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
} else {
features.saveAsParquet(args.outputPath)
}
}
}
}
Expand Up @@ -25,6 +25,7 @@ import org.bdgenomics.adam.converters.VariantContextConverter
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
import org.bdgenomics.adam.rdd.variant.GenotypeDataset
import org.bdgenomics.adam.util.FileExtensions._
import org.bdgenomics.formats.avro.Genotype
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option Args4jOption }
Expand Down Expand Up @@ -144,15 +145,14 @@ class TransformGenotypes(val args: TransformGenotypesArgs)
optProjection = None,
stringency = stringency)

if (args.outputPath.endsWith(".vcf")) {
if (isVcfExt(args.outputPath)) {
maybeSort(maybeCoalesce(genotypes).toVariantContexts).saveAsVcf(args)
} else {
if (args.partitionByStartPos) {
maybeSort(maybeCoalesce(genotypes)).saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
} else {
maybeSort(maybeCoalesce(genotypes)).saveAsParquet(args)
}

}
}
}
Expand Up @@ -24,6 +24,7 @@ import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
import org.bdgenomics.adam.rdd.variant.VariantDataset
import org.bdgenomics.adam.util.FileExtensions._
import org.bdgenomics.formats.avro.Variant
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option Args4jOption }
Expand Down Expand Up @@ -65,6 +66,12 @@ class TransformVariantsArgs extends Args4jBase with ADAMSaveAnyArgs with Parquet
@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
var partitionByStartPos: Boolean = false

@Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000) base pairs).")
var partitionedBinSize = 1000000

@Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.")
var stringency: String = "STRICT"

Expand Down Expand Up @@ -129,10 +136,14 @@ class TransformVariants(val args: TransformVariantsArgs)
optProjection = None,
stringency = stringency)

if (args.outputPath.endsWith(".vcf")) {
if (isVcfExt(args.outputPath)) {
maybeSort(maybeCoalesce(variants).toVariantContexts).saveAsVcf(args, stringency)
} else {
maybeSort(maybeCoalesce(variants)).saveAsParquet(args)
if (args.partitionByStartPos) {
maybeSort(maybeCoalesce(variants)).saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
} else {
maybeSort(maybeCoalesce(variants)).saveAsParquet(args)
}
}
}
}

0 comments on commit e4efc04

Please sign in to comment.