Skip to content

Commit

Permalink
Merge pull request #23 from broadinstitute/mlc_stdout_stderr_symbols
Browse files Browse the repository at this point in the history
stdout and stderr as symbols. Closes broadinstitute/cromwell#430
  • Loading branch information
mcovarr committed Feb 24, 2016
2 parents 593840e + 2296bd5 commit 02bb433
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 82 deletions.
3 changes: 2 additions & 1 deletion src/main/scala/cromwell/backend/Backend.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cromwell.backend

import cromwell.backend.model.{ExecutionHash, Subscription}
import cromwell.backend.model.Subscription
import cromwell.caching.ExecutionHash

import scala.concurrent.Future

Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/cromwell/backend/BackendActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package cromwell.backend
import akka.actor.{Actor, ActorLogging}
import akka.event.LoggingReceive
import cromwell.backend.BackendActor._
import cromwell.backend.model.{ExecutionHash, Subscription}
import cromwell.backend.model.Subscription
import cromwell.caching.ExecutionHash

import scala.util._

Expand Down
35 changes: 0 additions & 35 deletions src/main/scala/cromwell/backend/model/ExecutionResult.scala

This file was deleted.

40 changes: 23 additions & 17 deletions src/main/scala/cromwell/backend/model/TaskStatus.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package cromwell.backend.model

import cromwell.backend.model.Status.Status

/**
* Represents status of an execution.
*/
object Status extends Enumeration {
type Status = Value
val Created, Running, Succeeded, Failed, Canceled = Value
}
import cromwell.caching.ExecutionHash
import wdl4s.values.WdlValue

import scala.concurrent.Future


sealed trait TaskStatus extends ExecutionEvent

/**
* Defines a task intermediate status.
*/
case class TaskStatus(status: Status) extends ExecutionEvent
sealed trait NonTerminalTaskStatus extends TaskStatus
final case class CreatedTaskStatus(stdout: String, stderr: String) extends NonTerminalTaskStatus
case object RunningTaskStatus extends NonTerminalTaskStatus

/**
* Defines a task final status with the resulting data.
*/
case class TaskFinalStatus(status: Status, result: ExecutionResult) extends ExecutionEvent
sealed trait TerminalTaskStatus extends TaskStatus
final case class SucceededTaskStatus(outputs: Map[String, WdlValue], returnCode: Int, hash: ExecutionHash) extends TerminalTaskStatus
case object CanceledTaskStatus extends TerminalTaskStatus

sealed trait FailedTaskStatus extends TerminalTaskStatus
final case class FailedWithoutReturnCodeTaskStatus(error: Throwable) extends FailedTaskStatus
final case class FailedWithReturnCodeTaskStatus(error: Throwable, returnCode: Int) extends FailedTaskStatus

object Implicits {
implicit class EnhancedTerminalTaskStatus(val status: TerminalTaskStatus) extends AnyVal {
def future: Future[TerminalTaskStatus] = Future.successful(status)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.typesafe.scalalogging.StrictLogging
import cromwell.backend.BackendActor
import cromwell.backend.model._
import cromwell.backend.provider.local.FileExtensions._
import cromwell.caching.computeWdlValueHash
import cromwell.caching.{ExecutionHash, computeWdlValueHash}
import org.apache.commons.codec.digest.DigestUtils
import wdl4s.WdlExpression
import wdl4s.types.{WdlArrayType, WdlFileType, WdlType}
Expand All @@ -19,11 +19,11 @@ import wdl4s.values.{WdlArray, WdlSingleFile, WdlValue}
import scala.annotation.tailrec
import scala.collection.immutable.ListMap
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try
import scala.util.{Failure, Success, Try}

object LocalBackendActor {
// Folders
Expand Down Expand Up @@ -76,6 +76,7 @@ class LocalBackendActor(task: TaskDescriptor) extends BackendActor with StrictLo
val continueOnRc = getRuntimeAttribute(task.runtimeAttributes, ContinueOnRcFlag)
val failOnStderr = getRuntimeAttribute(task.runtimeAttributes, FailOnStderrFlag)
val expressionEval = new WorkflowEngineFunctions(executionDir)
implicit val ec: ExecutionContext = context.system.dispatcher

/**
* Prepare the task and context for execution.
Expand All @@ -88,9 +89,9 @@ class LocalBackendActor(task: TaskDescriptor) extends BackendActor with StrictLo
val command = initiateCommand()
logger.debug(s"Creating bash script for executing command: $command.")
writeBashScript(command, executionDir)
notifyToSubscribers(new TaskStatus(Status.Created))
notifySubscribers(CreatedTaskStatus(stdout.toString, stderr.toString))
} catch {
case ex: Exception => notifyToSubscribers(new TaskFinalStatus(Status.Failed, FailureResult(ex)))
case ex: Exception => notifySubscribers(FailedWithoutReturnCodeTaskStatus(ex))
}
}

Expand All @@ -99,14 +100,17 @@ class LocalBackendActor(task: TaskDescriptor) extends BackendActor with StrictLo
*/
override def stop(): Unit = {
processAbortFunc.get.apply()
notifyToSubscribers(new TaskStatus(Status.Canceled))
notifySubscribers(CanceledTaskStatus)
}

/**
* Executes task in given context.
*/
override def execute(): Unit = {
notifyToSubscribers(executeTask())
executeTask() onComplete {
case Success(s) => notifySubscribers(s)
case Failure(e) => notifySubscribers(FailedWithoutReturnCodeTaskStatus(e))
}
}

/**
Expand Down Expand Up @@ -157,7 +161,7 @@ class LocalBackendActor(task: TaskDescriptor) extends BackendActor with StrictLo
*
* @param message A task status event.
*/
private def notifyToSubscribers(message: ExecutionEvent): Unit = {
private def notifySubscribers(message: ExecutionEvent): Unit = {
subscriptions.filter(subs => subs.eventType.isInstanceOf[ExecutionEvent]).foreach(
subs => subs.subscriber ! message)
}
Expand Down Expand Up @@ -313,15 +317,16 @@ class LocalBackendActor(task: TaskDescriptor) extends BackendActor with StrictLo
*
* @return A TaskStatus with the final status of the task.
*/
private def executeTask(): TaskFinalStatus = {
val commandToExecute: Seq[String] = dockerImage match {
private def executeTask(): Future[TerminalTaskStatus] = {
import Implicits._
def commandToExecute: Seq[String] = dockerImage match {
case Some(image) => buildDockerRunCommand(image).split(" ").toSeq
case None => argv
}

val process = commandToExecute.run(ProcessLogger(stdoutWriter writeWithNewline, stderrTailed writeWithNewline))
processAbortFunc = Option(() => process.destroy())
notifyToSubscribers(new TaskStatus(Status.Running))
notifySubscribers(RunningTaskStatus)
val backendCommandString = argv.map(s => "\"" + s + "\"").mkString(" ")
logger.debug(s"command: $backendCommandString")
val processReturnCode = process.exitValue() // blocks until process finishes
Expand All @@ -333,18 +338,15 @@ class LocalBackendActor(task: TaskDescriptor) extends BackendActor with StrictLo
lazy val rc = Try(returnCode.contentAsString.stripLineEnd.toInt)

if (processReturnCode != 0) {
TaskFinalStatus(Status.Failed, FailureTaskResult(
new IllegalStateException("Execution process failed."), processReturnCode, stderr.toString))
FailedWithReturnCodeTaskStatus(new IllegalStateException("Execution process failed."), processReturnCode).future
} else if (rc.isFailure) {
// case where docker fails.
TaskFinalStatus(Status.Failed, FailureTaskResult(rc.failed.get, processReturnCode, stderr.toString))
FailedWithReturnCodeTaskStatus(rc.failed.get, processReturnCode).future
} else if (failOnErrFlag && stderrFileLength > 0) {
// rc status is validated in previous step so it is safe to use .get
TaskFinalStatus(Status.Failed, FailureTaskResult(
new IllegalStateException("StdErr file is not empty."), rc.get, stderr.toString))
FailedWithReturnCodeTaskStatus(new IllegalStateException("StdErr file is not empty."), rc.get).future
} else if (rc.get != 0 && !isInContinueOnReturnCode(rc.get)) {
TaskFinalStatus(Status.Failed, FailureTaskResult(
new IllegalStateException(s"Return code is nonzero. Value: ${rc.get}"), rc.get, stderr.toString))
FailedWithReturnCodeTaskStatus(new IllegalStateException(s"Return code is nonzero. Value: ${rc.get}"), rc.get).future
} else {
def lookupFunction: String => WdlValue = WdlExpression.standardLookupFunction(task.inputs, task.declarations, expressionEval)
val outputExpressions = task.outputs.map(
Expand All @@ -360,19 +362,21 @@ class LocalBackendActor(task: TaskDescriptor) extends BackendActor with StrictLo
* @param outputExpressions Outputs.
* @return TaskStatus with final status of the task.
*/
private def processOutputResult(processReturnCode: Int, outputExpressions: Map[String, OutputStmtEval]): TaskFinalStatus = {
if (outputExpressions.values.exists(_.rhs.isFailure)) {
TaskFinalStatus(Status.Failed, FailureTaskResult(new IllegalStateException("Failed to evaluate output expressions.",
outputExpressions.values.collectFirst { case v if v.rhs.isFailure => v.rhs.failed.get } get), processReturnCode, stderr.toString))
private def processOutputResult(processReturnCode: Int, outputExpressions: Map[String, OutputStmtEval]): Future[TerminalTaskStatus] = {
import Implicits._
val expressions = outputExpressions.values

if (expressions.exists(_.rhs.isFailure)) {
// FIXME this is only getting the first failure
FailedWithReturnCodeTaskStatus(new IllegalStateException("Failed to evaluate output expressions.",
expressions.find(_.rhs.isFailure).get.rhs.failed.get), processReturnCode).future
} else {
try {
// FIXME this code is not properly structured to deal with asynchronous hash computation. The Await.result is
// gross but safe since the code isn't actually pulling down Docker image hashes, so the `Future` we're getting
// back is currently just a `Future.successful`.
TaskFinalStatus(
Status.Succeeded, SuccessfulTaskResult(outputExpressions.mapValues(resolveOutputValue), Await.result(computeHash, Duration.Inf)))
computeHash map { hash =>
SucceededTaskStatus(outputExpressions mapValues { resolveOutputValue }, processReturnCode, hash)
}
} catch {
case ex: Exception => TaskFinalStatus(Status.Failed, FailureTaskResult(ex, processReturnCode, stderr.toString))
case ex: Exception => FailedWithReturnCodeTaskStatus(ex, processReturnCode).future
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/scala/cromwell/caching/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import scala.collection.immutable.ListMap

package object caching {

/**
* Hash of task execution.
* @param overallHash Contains a hash from TaskDescriptor.
* @param dockerHash Contains docker image hash.
*/
case class ExecutionHash(overallHash: String, dockerHash: Option[String] = None)

def computeWdlValueHash(wdlValue: WdlValue): String = {
wdlValue match {
case w: WdlObject => w.value mapValues computeWdlValueHash mkString ""
Expand Down

0 comments on commit 02bb433

Please sign in to comment.