Skip to content

Commit

Permalink
[ADAM-1509] Adding root dir path to pipe API.
Browse files Browse the repository at this point in the history
Resolves #1509. This allows the directory where the SparkFiles are copied
to be accessed and included in a piped command.
  • Loading branch information
fnothaft committed Apr 30, 2017
1 parent 81edd53 commit 2ab0657
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
Expand Up @@ -19,6 +19,7 @@ package org.bdgenomics.adam.rdd

import htsjdk.variant.vcf.{ VCFHeader, VCFHeaderLine }
import java.lang.Thread
import java.nio.file.Paths
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.metadata.CompressionCodecName
Expand Down Expand Up @@ -62,11 +63,15 @@ private[rdd] object GenomicRDD {
*/
def processCommand(cmd: String,
files: Seq[String]): List[String] = {
val filesWithIndex = files.zipWithIndex
val filesWithIndex: Seq[(String, String)] = files.zipWithIndex
.map(p => {
val (file, index) = p
("$%d".format(index), file)
}).reverse
val rootPath: (String, String) = ("$root",
Paths.get(SparkFiles.getRootDirectory())
.toAbsolutePath.toString)
val filesAndPath: Seq[(String, String)] = filesWithIndex ++ Seq(rootPath)

@tailrec def replaceEscapes(cmd: String,
iter: Iterator[(String, String)]): String = {
Expand All @@ -81,7 +86,7 @@ private[rdd] object GenomicRDD {

cmd.split(" ")
.map(s => {
replaceEscapes(s, filesWithIndex.toIterator)
replaceEscapes(s, filesAndPath.toIterator)
}).toList
}
}
Expand Down Expand Up @@ -182,7 +187,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
* Pipes genomic data to a subprocess that runs in parallel using Spark.
*
* Files are substituted into the command with a $x syntax. E.g., to invoke
* a command that uses the first file from the files Seq, use $0.
* a command that uses the first file from the files Seq, use $0. To access
* the path to the directory where the files are copied, use $root.
*
* Pipes require the presence of an InFormatterCompanion and an OutFormatter
* as implicit values. The InFormatterCompanion should be a singleton whose
Expand Down
Expand Up @@ -17,41 +17,41 @@
*/
package org.bdgenomics.adam.rdd

import org.scalatest.FunSuite
import org.bdgenomics.adam.util.ADAMFunSuite

class GenomicRDDSuite extends FunSuite {
class GenomicRDDSuite extends ADAMFunSuite {

test("processing a command that is just a single word should do nothing") {
sparkTest("processing a command that is just a single word should do nothing") {
val cmd = GenomicRDD.processCommand("ls", Seq.empty)

assert(cmd.size === 1)
assert(cmd.head === "ls")
}

test("processing a command that is a single substitution should succeed") {
sparkTest("processing a command that is a single substitution should succeed") {
val cmd = GenomicRDD.processCommand("$0", Seq("/bin/bash"))

assert(cmd.size === 1)
assert(cmd.head === "/bin/bash")
}

test("processing a command that is multiple words should split the string") {
sparkTest("processing a command that is multiple words should split the string") {
val cmd = GenomicRDD.processCommand("tee /dev/null", Seq.empty)

assert(cmd.size === 2)
assert(cmd(0) === "tee")
assert(cmd(1) === "/dev/null")
}

test("process a command that is multiple words with a replacement") {
sparkTest("process a command that is multiple words with a replacement") {
val cmd = GenomicRDD.processCommand("echo $0", Seq("/path/to/my/file"))

assert(cmd.size === 2)
assert(cmd(0) === "echo")
assert(cmd(1) === "/path/to/my/file")
}

test("process a command that is multiple words with multiple replacements") {
sparkTest("process a command that is multiple words with multiple replacements") {
val cmd = GenomicRDD.processCommand("aCommand $0 hello $1", Seq("/path/to/my/file",
"/path/to/another/file"))

Expand Down

0 comments on commit 2ab0657

Please sign in to comment.