Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADAM-1875] Add ability to timeout a piped command. #1881

Merged
merged 1 commit into from Jan 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 16 additions & 9 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
Expand Up @@ -454,21 +454,27 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
* @param environment A map containing environment variable/value pairs to set
* in the environment for the newly created process. Default is empty.
* @param flankSize Number of bases to flank each command invocation by.
* @param optTimeout An optional parameter specifying how long to let a single
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add doc that this is in seconds, or alternatively allow the caller to specify the TimeUnit

* partition run for, in seconds. If the partition times out, the partial
* results will be returned, and no exception will be logged. The partition
* will log that the command timed out.
* @return Returns a new GenomicRDD of type Y.
*
* @tparam X The type of the record created by the piped command.
* @tparam Y A GenomicRDD containing X's.
* @tparam V The InFormatter to use for formatting the data being piped to the
* command.
*/
def pipe[X, Y <: GenomicRDD[X, Y], V <: InFormatter[T, U, V]](cmd: String,
files: Seq[String] = Seq.empty,
environment: Map[String, String] = Map.empty,
flankSize: Int = 0)(implicit tFormatterCompanion: InFormatterCompanion[T, U, V],
xFormatter: OutFormatter[X],
convFn: (U, RDD[X]) => Y,
tManifest: ClassTag[T],
xManifest: ClassTag[X]): Y = {
def pipe[X, Y <: GenomicRDD[X, Y], V <: InFormatter[T, U, V]](
cmd: String,
files: Seq[String] = Seq.empty,
environment: Map[String, String] = Map.empty,
flankSize: Int = 0,
optTimeout: Option[Int] = None)(implicit tFormatterCompanion: InFormatterCompanion[T, U, V],
xFormatter: OutFormatter[X],
convFn: (U, RDD[X]) => Y,
tManifest: ClassTag[T],
xManifest: ClassTag[X]): Y = {

// TODO: support broadcasting files
files.foreach(f => {
Expand Down Expand Up @@ -554,7 +560,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
new OutFormatterRunner[X, OutFormatter[X]](xFormatter,
is,
process,
finalCmd)
finalCmd,
optTimeout)
} else {
Iterator[X]()
}
Expand Down
Expand Up @@ -19,20 +19,50 @@ package org.bdgenomics.adam.rdd

import java.io.InputStream
import java.lang.Process
import java.util.concurrent.Callable
import java.util.concurrent.{ Callable, TimeUnit }
import org.bdgenomics.utils.misc.Logging

private[rdd] class OutFormatterRunner[T, U <: OutFormatter[T]](formatter: U,
is: InputStream,
process: Process,
finalCmd: List[String]) extends Iterator[T] {
finalCmd: List[String],
optTimeout: Option[Int]) extends Iterator[T] with Logging {

private val startTime = System.currentTimeMillis()
private val iter = formatter.read(is)

private def hasTimedOut(): Boolean = {
optTimeout.map(timeoutSec => {
val currTime = System.currentTimeMillis()
(currTime - startTime) >= (timeoutSec * 1000L)
}).getOrElse(false)
}

private def timeLeft(timeout: Int): Long = {
val currTime = System.currentTimeMillis()
(timeout * 1000L) - currTime
}

def hasNext: Boolean = {
if (iter.hasNext) {
if (hasTimedOut()) {
log.warn("Piped command %s timed out after %d seconds.".format(
finalCmd, optTimeout.get))
process.destroy()
false
} else if (iter.hasNext) {
true
} else {
val exitCode = process.waitFor()
val exitCode = optTimeout.fold(process.waitFor())(timeout => {
val exited = process.waitFor(timeLeft(timeout), TimeUnit.MILLISECONDS)
if (exited) {
process.exitValue()
} else {
log.warn("Piped command %s timed out after %d seconds.".format(
finalCmd, timeout))
process.destroy()
0
}
})
if (exitCode != 0) {
throw new RuntimeException("Piped command %s exited with error code %d.".format(
finalCmd, exitCode))
Expand Down
21 changes: 21 additions & 0 deletions adam-core/src/test/resources/timeout.py
@@ -0,0 +1,21 @@
#!/usr/bin/env python

from __future__ import print_function
import sys
import time

# read lines from stdin
lines = sys.stdin.readlines()

def print_lines(skip_header=False):
for line in lines:
if not (skip_header and line.startswith('@')):
print(line.strip().rstrip())

print_lines()
sys.stdout.flush()

time.sleep(10)

print_lines(skip_header=True)
sys.stdout.flush()
Expand Up @@ -830,6 +830,51 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
assert(records === newRecords)
}

sparkTest("lose all records when a command times out") {
val reads12Path = testFile("reads12.sam")
val ardd = sc.loadBam(reads12Path)

implicit val tFormatter = SAMInFormatter
implicit val uFormatter = new AnySAMOutFormatter

val pipedRdd: AlignmentRecordRDD = ardd.pipe("sleep 10", optTimeout = Some(5))
val newRecords = pipedRdd.rdd.count
assert(newRecords === 0)
}

sparkTest("lose no records without a timeout") {
val reads12Path = testFile("reads12.sam")
val ardd = sc.loadBam(reads12Path)

implicit val tFormatter = SAMInFormatter
implicit val uFormatter = new AnySAMOutFormatter

// this script reads the reads into a temp file, which is then read to
// stdout, then we sleep for 10 sec, then we read to stdout again
val scriptPath = testFile("timeout.py")
val pipedRdd: AlignmentRecordRDD = ardd.pipe("python $0",
files = Seq(scriptPath))
val newRecords = pipedRdd.rdd.count
assert(newRecords === (2 * ardd.rdd.count))
}

sparkTest("lose some records when a command times out") {
val reads12Path = testFile("reads12.sam")
val ardd = sc.loadBam(reads12Path)

implicit val tFormatter = SAMInFormatter
implicit val uFormatter = new AnySAMOutFormatter

// this script reads the reads into a temp file, which is then read to
// stdout, then we sleep for 10 sec, then we read to stdout again
val scriptPath = testFile("timeout.py")
val pipedRdd: AlignmentRecordRDD = ardd.pipe("python $0",
optTimeout = Some(5),
files = Seq(scriptPath))
val newRecords = pipedRdd.rdd.count
assert(newRecords === ardd.rdd.count)
}

sparkTest("don't lose any reads when piping as SAM using java pipe") {
val reads12Path = testFile("reads12.sam")
val ardd = sc.loadBam(reads12Path)
Expand Down