From e4efc04de62eed3b4b48cafb1357680322742683 Mon Sep 17 00:00:00 2001
From: Michael L Heuer
Date: Sat, 27 Jul 2019 06:04:21 -0500
Subject: [PATCH] Add partition cli args to TransformVariants,Features.

---
 .../adam/cli/TransformFeatures.scala  | 30 +++++++++++++++++--
 .../adam/cli/TransformGenotypes.scala |  4 +--
 .../adam/cli/TransformVariants.scala  | 15 ++++++++--
 3 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala
index b9c5946703..dc3baabe32 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala
@@ -20,6 +20,7 @@ package org.bdgenomics.adam.cli
 import org.apache.spark.SparkContext
 import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.rdd.ADAMContext._
+import org.bdgenomics.adam.util.FileExtensions._
 import org.bdgenomics.utils.cli._
 import org.kohsuke.args4j.{ Argument, Option ⇒ Args4jOption }
 
@@ -56,6 +57,12 @@ class TransformFeaturesArgs extends Args4jBase with ParquetSaveArgs {
 
   @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
   var disableFastConcat: Boolean = false
+
+  @Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
+  var partitionByStartPos: Boolean = false
+
+  @Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000) base pairs).")
+  var partitionedBinSize = 1000000
 }
 
 class TransformFeatures(val args: TransformFeaturesArgs)
@@ -63,15 +70,32 @@ class TransformFeatures(val args: TransformFeaturesArgs)
 
   val companion = TransformFeatures
 
+  def isFeatureExt(pathName: String): Boolean = {
+    isBedExt(pathName) ||
+      isGff3Ext(pathName) ||
+      isGtfExt(pathName) ||
+      isIntervalListExt(pathName) ||
+      isNarrowPeakExt(pathName)
+  }
+
   def run(sc: SparkContext) {
     checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
 
     val optSequenceDictionary = Option(args.referencePath).map(sc.loadSequenceDictionary(_))
 
-    sc.loadFeatures(
+    val features = sc.loadFeatures(
       args.featuresPath,
       optSequenceDictionary = optSequenceDictionary,
-      optMinPartitions = Option(args.numPartitions)
-    ).save(args.outputPath, args.single, args.disableFastConcat)
+      optMinPartitions = Option(args.numPartitions))
+
+    if (isFeatureExt(args.outputPath)) {
+      features.save(args.outputPath, args.single, args.disableFastConcat)
+    } else {
+      if (args.partitionByStartPos) {
+        features.saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
+      } else {
+        features.saveAsParquet(args.outputPath)
+      }
+    }
   }
 }
diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformGenotypes.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformGenotypes.scala
index dbd4e7646a..0f302eb9b7 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformGenotypes.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformGenotypes.scala
@@ -25,6 +25,7 @@ import org.bdgenomics.adam.converters.VariantContextConverter
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
 import org.bdgenomics.adam.rdd.variant.GenotypeDataset
+import org.bdgenomics.adam.util.FileExtensions._
 import org.bdgenomics.formats.avro.Genotype
 import org.bdgenomics.utils.cli._
 import org.kohsuke.args4j.{ Argument, Option ⇒ Args4jOption }
@@ -144,7 +145,7 @@ class TransformGenotypes(val args: TransformGenotypesArgs)
       optProjection = None,
       stringency = stringency)
 
-    if (args.outputPath.endsWith(".vcf")) {
+    if (isVcfExt(args.outputPath)) {
       maybeSort(maybeCoalesce(genotypes).toVariantContexts).saveAsVcf(args)
     } else {
       if (args.partitionByStartPos) {
@@ -152,7 +153,6 @@ class TransformGenotypes(val args: TransformGenotypesArgs)
       } else {
         maybeSort(maybeCoalesce(genotypes)).saveAsParquet(args)
       }
-
     }
   }
 }
diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformVariants.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformVariants.scala
index fae12a89b9..eee2624307 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformVariants.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformVariants.scala
@@ -24,6 +24,7 @@ import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
 import org.bdgenomics.adam.rdd.variant.VariantDataset
+import org.bdgenomics.adam.util.FileExtensions._
 import org.bdgenomics.formats.avro.Variant
 import org.bdgenomics.utils.cli._
 import org.kohsuke.args4j.{ Argument, Option ⇒ Args4jOption }
@@ -65,6 +66,12 @@ class TransformVariantsArgs extends Args4jBase with ADAMSaveAnyArgs with Parquet
   @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
   var disableFastConcat: Boolean = false
 
+  @Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
+  var partitionByStartPos: Boolean = false
+
+  @Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000) base pairs).")
+  var partitionedBinSize = 1000000
+
   @Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.")
   var stringency: String = "STRICT"
 
@@ -129,10 +136,14 @@ class TransformVariants(val args: TransformVariantsArgs)
       optProjection = None,
       stringency = stringency)
 
-    if (args.outputPath.endsWith(".vcf")) {
+    if (isVcfExt(args.outputPath)) {
       maybeSort(maybeCoalesce(variants).toVariantContexts).saveAsVcf(args, stringency)
     } else {
-      maybeSort(maybeCoalesce(variants)).saveAsParquet(args)
+      if (args.partitionByStartPos) {
+        maybeSort(maybeCoalesce(variants)).saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
+      } else {
+        maybeSort(maybeCoalesce(variants)).saveAsParquet(args)
+      }
     }
   }
 }
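
For reference, the new -partition_by_start_pos and -partition_bin_size options map onto the saveAsPartitionedParquet call used in the diff above. The sketch below is illustrative only, assuming the ADAMContext API shown in the patch; the wrapper method name and the input/output paths are made up:

    // Load features and write Hive-style partitioned Parquet, binning records
    // by genomic start position (the patch's default bin size is 1,000,000 bp).
    import org.apache.spark.SparkContext
    import org.bdgenomics.adam.rdd.ADAMContext._

    def saveFeaturesPartitioned(sc: SparkContext): Unit = {
      val features = sc.loadFeatures("in.gff3") // hypothetical input path
      features.saveAsPartitionedParquet("out.adam", partitionSize = 1000000) // hypothetical output path
    }

On the command line this presumably corresponds to passing -partition_by_start_pos (and optionally -partition_bin_size) to the transformFeatures or transformVariants command with a Parquet output path, since the partitioned save is only reached when the output path does not carry a native feature or VCF extension.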