Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADAM-2188] Add partition cli args to TransformVariants,Features. #2192

Merged
merged 1 commit on Jul 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.bdgenomics.adam.cli
import org.apache.spark.SparkContext
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.FileExtensions._
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option ⇒ Args4jOption }

Expand Down Expand Up @@ -56,22 +57,45 @@ class TransformFeaturesArgs extends Args4jBase with ParquetSaveArgs {
// Disable the parallel concatenation used when -single merges shard files.
@Args4jOption(required = false, name = "-disable_fast_concat",
  usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

// When saving to Parquet, bucket records into genomic-range directories
// (Hive-style partitioning) keyed on start position.
@Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
var partitionByStartPos: Boolean = false

// Width of each genomic-range bin, in base pairs; only meaningful together
// with -partition_by_start_pos. (Fixed unbalanced parenthesis in usage text.)
@Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000 base pairs).")
var partitionedBinSize: Int = 1000000
}

/**
 * Loads features from one format and saves them in another, optionally
 * repartitioning, per the supplied command line arguments.
 */
class TransformFeatures(val args: TransformFeaturesArgs)
    extends BDGSparkCommand[TransformFeaturesArgs] {

  val companion = TransformFeatures

  /**
   * Return true if the path name ends in a known textual feature format
   * extension (BED, GFF3, GTF, IntervalList, or NarrowPeak).
   *
   * @param pathName Path name to check.
   * @return True if the path name has a textual feature format extension.
   */
  def isFeatureExt(pathName: String): Boolean = {
    isBedExt(pathName) ||
      isGff3Ext(pathName) ||
      isGtfExt(pathName) ||
      isIntervalListExt(pathName) ||
      isNarrowPeakExt(pathName)
  }

  def run(sc: SparkContext) {
    // Fail fast if the output location is not writeable.
    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

    // An optional reference path supplies the sequence dictionary.
    val optSequenceDictionary = Option(args.referencePath).map(sc.loadSequenceDictionary(_))

    val features = sc.loadFeatures(
      args.featuresPath,
      optSequenceDictionary = optSequenceDictionary,
      optMinPartitions = Option(args.numPartitions))

    if (isFeatureExt(args.outputPath)) {
      // Textual output: honor the single-file merge and fast-concat flags.
      features.save(args.outputPath, args.single, args.disableFastConcat)
    } else if (args.partitionByStartPos) {
      // Hive-style partitioned Parquet, binned by genomic start position.
      features.saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
    } else {
      // Pass the full args (not just the path) so user-specified Parquet
      // options (block size, page size, compression codec) are honored,
      // consistent with TransformVariants and TransformGenotypes.
      features.saveAsParquet(args)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.bdgenomics.adam.converters.VariantContextConverter
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
import org.bdgenomics.adam.rdd.variant.GenotypeDataset
import org.bdgenomics.adam.util.FileExtensions._
import org.bdgenomics.formats.avro.Genotype
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option ⇒ Args4jOption }
Expand Down Expand Up @@ -144,15 +145,14 @@ class TransformGenotypes(val args: TransformGenotypesArgs)
optProjection = None,
stringency = stringency)

if (args.outputPath.endsWith(".vcf")) {
if (isVcfExt(args.outputPath)) {
maybeSort(maybeCoalesce(genotypes).toVariantContexts).saveAsVcf(args)
} else {
if (args.partitionByStartPos) {
maybeSort(maybeCoalesce(genotypes)).saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
} else {
maybeSort(maybeCoalesce(genotypes)).saveAsParquet(args)
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
import org.bdgenomics.adam.rdd.variant.VariantDataset
import org.bdgenomics.adam.util.FileExtensions._
import org.bdgenomics.formats.avro.Variant
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option ⇒ Args4jOption }
Expand Down Expand Up @@ -65,6 +66,12 @@ class TransformVariantsArgs extends Args4jBase with ADAMSaveAnyArgs with Parquet
// Disable the parallel concatenation used when -single merges shard files.
@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

// When saving to Parquet, bucket records into genomic-range directories
// (Hive-style partitioning) keyed on start position.
@Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
var partitionByStartPos: Boolean = false

// Width of each genomic-range bin, in base pairs; only meaningful together
// with -partition_by_start_pos. (Fixed unbalanced parenthesis in usage text.)
@Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000 base pairs).")
var partitionedBinSize: Int = 1000000

// Validation stringency applied while loading/converting variants.
@Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.")
var stringency: String = "STRICT"

Expand Down Expand Up @@ -129,10 +136,14 @@ class TransformVariants(val args: TransformVariantsArgs)
optProjection = None,
stringency = stringency)

if (args.outputPath.endsWith(".vcf")) {
if (isVcfExt(args.outputPath)) {
maybeSort(maybeCoalesce(variants).toVariantContexts).saveAsVcf(args, stringency)
} else {
maybeSort(maybeCoalesce(variants)).saveAsParquet(args)
if (args.partitionByStartPos) {
maybeSort(maybeCoalesce(variants)).saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
} else {
maybeSort(maybeCoalesce(variants)).saveAsParquet(args)
}
}
}
}