Commit
Check output path is writeable before running transformations.
heuermh committed Mar 14, 2019
1 parent 4d8dcfc commit 877232f
Showing 14 changed files with 74 additions and 0 deletions.
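
Every command changed below applies the same guard: run now calls the new FileSystemUtils.checkWriteablePath helper on its output path before any Spark work begins, so a job whose output already exists fails immediately rather than after a long computation. As a minimal sketch of the shape shared by the diffs that follow (argument names taken from the commands below):

override def run(sc: SparkContext): Unit = {
  // Fail fast: throws FileAlreadyExistsException if args.outputPath already exists.
  checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

  // ... the command's existing load/transform/save logic runs only if the path is free ...
}
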
2 changes: 2 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fasta.scala

@@ -18,6 +18,7 @@
package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
@@ -52,6 +53,7 @@ class ADAM2Fasta(val args: ADAM2FastaArgs) extends BDGSparkCommand[ADAM2FastaArg
override val companion = ADAM2Fasta

override def run(sc: SparkContext): Unit = {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

log.info("Loading ADAM nucleotide contig fragments from disk.")
val contigFragments = sc.loadContigFragments(args.inputPath)
2 changes: 2 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fastq.scala

@@ -20,6 +20,7 @@ package org.bdgenomics.adam.cli
import htsjdk.samtools.ValidationStringency
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.projections.{ AlignmentRecordField, Projection }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.utils.cli._
@@ -60,6 +61,7 @@ class ADAM2Fastq(val args: ADAM2FastqArgs) extends BDGSparkCommand[ADAM2FastqArg
override val companion = ADAM2Fastq

override def run(sc: SparkContext): Unit = {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

val projectionOpt =
if (!args.disableProjection)
2 changes: 2 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/CountContigKmers.scala

@@ -19,6 +19,7 @@ package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
@@ -47,6 +48,7 @@ class CountContigKmers(protected val args: CountContigKmersArgs) extends BDGSpar
val companion = CountContigKmers

def run(sc: SparkContext) {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

// read from disk
val fragments = sc.loadContigFragments(args.inputPath)
2 changes: 2 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/CountReadKmers.scala

@@ -18,6 +18,7 @@
package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.projections.{ AlignmentRecordField, Projection }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.utils.cli._
@@ -50,6 +51,7 @@ class CountReadKmers(protected val args: CountReadKmersArgs) extends BDGSparkCom
val companion = CountReadKmers

def run(sc: SparkContext) {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

// read from disk
var adamRecords = sc.loadAlignments(
3 changes: 3 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Fasta2ADAM.scala

@@ -18,6 +18,7 @@
package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
@@ -51,6 +52,8 @@ class Fasta2ADAM(protected val args: Fasta2ADAMArgs) extends BDGSparkCommand[Fas
val companion = Fasta2ADAM

def run(sc: SparkContext) {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

log.info("Loading FASTA data from disk.")
val adamFasta = sc.loadFasta(args.fastaFile, maximumLength = args.maximumLength)

41 changes: 41 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/FileSystemUtils.scala

@@ -0,0 +1,41 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.cli

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileAlreadyExistsException
import org.bdgenomics.utils.misc.Logging

/**
* Utility methods for file systems.
*/
private[cli] object FileSystemUtils extends Logging {
private def exists(pathName: String, conf: Configuration): Boolean = {
val p = new Path(pathName)
val fs = p.getFileSystem(conf)
fs.exists(p)
}

// move to BDGSparkCommand in bdg-utils?
def checkWriteablePath(pathName: String, conf: Configuration): Unit = {
if (exists(pathName, conf)) {
throw new FileAlreadyExistsException("Cannot write to path name, %s already exists".format(pathName))
}
}
}
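
For illustration only (not part of this commit), a minimal sketch of how the new guard behaves against a local file system. A default Hadoop Configuration resolves plain paths to the local file system; the demo must live in the org.bdgenomics.adam.cli package because FileSystemUtils is private[cli], and the object name CheckWriteablePathDemo is hypothetical:

package org.bdgenomics.adam.cli

import java.nio.file.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.FileAlreadyExistsException
import org.bdgenomics.adam.cli.FileSystemUtils._

// Hypothetical demo object, not part of the commit above.
object CheckWriteablePathDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration() // defaults to the local file system

    // A path that does not exist yet passes the check silently.
    checkWriteablePath("/tmp/does-not-exist-yet.adam", conf)

    // An existing path fails fast, before any Spark work would start.
    val existing = Files.createTempFile("adam-demo", ".adam")
    try {
      checkWriteablePath(existing.toString, conf)
    } catch {
      case e: FileAlreadyExistsException => println(e.getMessage)
    }
  }
}
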
2 changes: 2 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/PrintADAM.scala

@@ -21,6 +21,7 @@ import java.util
import org.apache.avro.generic.{ GenericDatumWriter, IndexedRecord }
import org.apache.avro.io.EncoderFactory
import org.apache.spark.SparkContext
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.util.ParquetFileTraversable
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
@@ -99,6 +100,7 @@ class PrintADAM(protected val args: PrintADAMArgs) extends BDGSparkCommand[Print

def run(sc: SparkContext) {
val output = Option(args.outputFile)
output.foreach(checkWriteablePath(_, sc.hadoopConfiguration))
args.filesToPrint.foreach(file => {
displayRaw(sc, file, pretty = args.prettyRaw, output = output)
})
3 changes: 3 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala

@@ -18,6 +18,7 @@
package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.projections.AlignmentRecordField._
import org.bdgenomics.adam.projections.Projection
import org.bdgenomics.adam.rdd.ADAMContext._
@@ -66,6 +67,8 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCom
val companion: BDGCommandCompanion = Reads2Coverage

def run(sc: SparkContext): Unit = {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

if (args.sortLexicographically) {
require(args.collapse,
"-sort_lexicographically can only be provided when collapsing (-collapse).")
3 changes: 3 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala

@@ -23,6 +23,7 @@ import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.algorithms.consensus._
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io.FastqRecordReader
import org.bdgenomics.adam.models.{ ReferenceRegion, SnpTable }
@@ -432,6 +433,8 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
}

def run(sc: SparkContext) {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

// throw exception if aligned read predicate or limit projection flags are used improperly
if (args.useAlignedReadPredicate && forceNonParquet()) {
throw new IllegalArgumentException(
3 changes: 3 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala

@@ -18,6 +18,7 @@
package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
@@ -59,6 +60,8 @@ class TransformFeatures(val args: TransformFeaturesArgs)
val companion = TransformFeatures

def run(sc: SparkContext) {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

sc.loadFeatures(
args.featuresFile,
optMinPartitions = Option(args.numPartitions),
3 changes: 3 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFragments.scala

@@ -18,6 +18,7 @@
package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.io.FastqRecordReader
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
@@ -95,6 +96,8 @@ class TransformFragments(protected val args: TransformFragmentsArgs) extends BDG
}

def run(sc: SparkContext) {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

if (args.loadAsReads && args.saveAsReads) {
log.warn("If loading and saving as reads, consider using TransformAlignments instead.")
}
3 changes: 3 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformGenotypes.scala

@@ -19,6 +19,7 @@ package org.bdgenomics.adam.cli

import htsjdk.samtools.ValidationStringency
import org.apache.spark.SparkContext
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.converters.VariantContextConverter
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
@@ -124,6 +125,8 @@ class TransformGenotypes(val args: TransformGenotypesArgs)
}

def run(sc: SparkContext) {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

require(!(args.sort && args.sortLexicographically),
"Cannot set both -sort_on_save and -sort_lexicographically_on_save.")

3 changes: 3 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformVariants.scala

@@ -19,6 +19,7 @@ package org.bdgenomics.adam.cli

import htsjdk.samtools.ValidationStringency
import org.apache.spark.SparkContext
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
import org.bdgenomics.utils.cli._
@@ -114,6 +115,8 @@ class TransformVariants(val args: TransformVariantsArgs)
}

def run(sc: SparkContext) {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

require(!(args.sort && args.sortLexicographically),
"Cannot set both -sort_on_save and -sort_lexicographically_on_save.")

2 changes: 2 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/View.scala
@@ -19,6 +19,7 @@ package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.cli.FileSystemUtils._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.formats.avro.AlignmentRecord
@@ -167,6 +168,7 @@ class View(val args: ViewArgs) extends BDGSparkCommand[ViewArgs] {
}

def run(sc: SparkContext) = {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

val reads = sc.loadAlignments(args.inputPath)
.transform(applyFilters(_))