Skip to content

Commit

Permalink
Add support for stdout/stderr gathering
Browse files Browse the repository at this point in the history
  • Loading branch information
scottfrazer committed Aug 5, 2015
1 parent 11c0d49 commit 69f80f5
Show file tree
Hide file tree
Showing 14 changed files with 306 additions and 45 deletions.
44 changes: 39 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ Workflow engine using [WDL](https://github.com/broadinstitute/wdl/blob/wdl2/SPEC
* [POST /workflows/:version](#post-workflowsversion)
* [GET /workflows/:version/:id/status](#get-workflowsversionidstatus)
* [GET /workflows/:version/:id/outputs](#get-workflowsversionidoutputs)
* [POST /workflows/:version/:id/abort](#post-workflowsversionidabort)
* [GET /workflows/:version/:id/outputs/:call](#get-workflowsversionidoutputscall)
* [GET /workflows/:version/:id/logs/:call](#get-workflowsversionidlogscall)
* [POST /workflows/:version/:id/abort](#post-workflowsversionidabort)
* [Developer](#developer)
* [Generate WDL Parser](#generate-wdl-parser)
* [Generating and Hosting ScalaDoc](#generating-and-hosting-scaladoc)
Expand Down Expand Up @@ -411,7 +412,7 @@ $ java -jar target/scala-2.11/cromwell-0.7.jar run hello.wdl hello.json
... truncated ...
{
"test.hello.response": "Hello world!",
"test.hello.response": "Hello boston!"
"test.hello2.response": "Hello boston!"
}
```

Expand Down Expand Up @@ -745,8 +746,43 @@ Server: spray-can/1.3.3
}
}
```
## GET /workflows/:version/:id/logs/:call

This will return paths to the standard out and standard error files that were generated during the execution
of a particular fully-qualified name for a call.

cURL:

```
$ curl http://localhost:8000/workflows/v1/b3e45584-9450-4e73-9523-fc3ccf749848/logs/three_step.wc
```

HTTPie:

```
$ http http://localhost:8000/workflows/v1/b3e45584-9450-4e73-9523-fc3ccf749848/logs/three_step.wc
```

Response:
```
HTTP/1.1 200 OK
Content-Length: 379
Content-Type: application/json; charset=UTF-8
Date: Mon, 03 Aug 2015 17:11:28 GMT
Server: spray-can/1.3.3
{
"id": "b3e45584-9450-4e73-9523-fc3ccf749848",
"logs": {
"three_step.wc": {
"stderr": "/Users/sfrazer/projects/cromwell/cromwell-executions/test/b3e45584-9450-4e73-9523-fc3ccf749848/call-hello/stderr6126967977036995110.tmp",
"stdout": "/Users/sfrazer/projects/cromwell/cromwell-executions/test/b3e45584-9450-4e73-9523-fc3ccf749848/call-hello/stdout6128485235785447571.tmp"
}
}
}
```

### POST /workflows/:version/:id/abort
## POST /workflows/:version/:id/abort

cURL:

Expand Down Expand Up @@ -774,8 +810,6 @@ Server: spray-can/1.3.3
}
```



# Developer

## Generate WDL Parser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,10 @@ object WdlValueJsonFormatter extends DefaultJsonProtocol {
def read(value: JsValue) = ???
}
}

object WdlFileJsonFormatter extends DefaultJsonProtocol {
implicit object WdlFileJsonFormat extends RootJsonFormat[WdlFile] {
def write(value: WdlFile) = JsString(value.value)
def read(value: JsValue) = ???
}
}
7 changes: 7 additions & 0 deletions src/main/scala/cromwell/engine/backend/Backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object Backend {
class StdoutStderrException(message: String) extends RuntimeException(message)
def from(backendConf: Config): Backend = {
backendConf.getString("backend").toLowerCase match {
case "local" => new LocalBackend
Expand Down Expand Up @@ -63,6 +64,12 @@ trait Backend {
*/
def handleCallRestarts(restartableWorkflows: Seq[RestartableWorkflow], dataAccess: DataAccess)(implicit ec: ExecutionContext): Future[Any]

/**
* Return CallStandardOutput which contains the stdout/stderr of the particular call
*/

def stdoutStderr(workflowId: WorkflowId, workflowName: String, callName: String): StdoutStderr

/**
* Presuming successful completion of the specified call, evaluate its outputs.
*/
Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/cromwell/engine/backend/StdoutStderr.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package cromwell.engine.backend

import cromwell.binding.values.WdlFile

case class StdoutStderr(stdout: WdlFile, stderr: WdlFile)
20 changes: 15 additions & 5 deletions src/main/scala/cromwell/engine/backend/jes/JesBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ import cromwell.binding.WdlExpression._
import cromwell.binding._
import cromwell.binding.types.WdlFileType
import cromwell.binding.values._
import cromwell.engine.backend.Backend
import cromwell.engine.WorkflowId
import cromwell.engine.backend.{StdoutStderr, Backend}
import cromwell.engine.backend.Backend.RestartableWorkflow
import cromwell.engine.backend.jes.JesBackend._
import cromwell.engine.db.DataAccess
import cromwell.parser.BackendType
import cromwell.util.TryUtil
import cromwell.util.google.GoogleCloudStoragePath
import scala.concurrent.{Future, ExecutionContext}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import JesBackend._

object JesBackend {
private lazy val JesConf = ConfigFactory.load.getConfig("backend").getConfig("jes")
Expand All @@ -39,10 +41,12 @@ object JesBackend {
val LocalStdoutValue = "job.stdout.txt"
val LocalStderrValue = "job.stderr.txt"

private def callGcsPath(workflowId: String, workflowName: String, callName: String): String =
s"$CromwellExecutionBucket/$workflowName/$workflowId/call-$callName"

// Decoration around WorkflowDescriptor to generate bucket names and the like
implicit class JesWorkflowDescriptor(val descriptor: WorkflowDescriptor) extends AnyVal {
def bucket = s"$CromwellExecutionBucket/${descriptor.name}/${descriptor.id.toString}"
def callDir(call: Call) = s"$bucket/call-${call.name}"
def callDir(call: Call) = callGcsPath(descriptor.id.toString, descriptor.name, call.name)
}

def stderrJesOutput(callGcsPath: String): JesOutput = JesOutput(LocalStderrParamName, s"$callGcsPath/$LocalStderrValue", Paths.get(LocalStderrValue))
Expand Down Expand Up @@ -131,6 +135,12 @@ class JesBackend extends Backend with LazyLogging {
JesOutput(value, engineFunctions.gcsPathFromAnyString(value).toString, Paths.get(value))
}

override def stdoutStderr(workflowId: WorkflowId, workflowName: String, callName: String): StdoutStderr =
StdoutStderr(
stdout = WdlFile(callGcsPath(workflowId.toString, workflowName, callName) + s"/$LocalStdoutValue"),
stderr = WdlFile(callGcsPath(workflowId.toString, workflowName, callName) + s"/$LocalStderrValue")
)

override def executeCommand(instantiatedCommandLine: String,
workflowDescriptor: WorkflowDescriptor,
call: Call,
Expand Down
30 changes: 27 additions & 3 deletions src/main/scala/cromwell/engine/backend/local/LocalBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ import com.typesafe.scalalogging.LazyLogging
import cromwell.binding.WdlExpression.ScopedLookupFunction
import cromwell.binding._
import cromwell.binding.values.{WdlArray, WdlFile, WdlValue}
import cromwell.engine.backend.Backend
import cromwell.engine.backend.Backend.RestartableWorkflow
import cromwell.engine.backend.{StdoutStderr, Backend}
import cromwell.engine.backend.Backend.{RestartableWorkflow, StdoutStderrException}
import cromwell.engine.db.{CallStatus, DataAccess}
import cromwell.engine.{ExecutionStatus, WorkflowId}
import cromwell.parser.BackendType
import cromwell.util.FileUtil
import cromwell.util.FileUtil._
import org.apache.commons.io.FileUtils

import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps
import scala.sys.process._
import scala.util.{Failure, Try}
import scala.util.{Failure, Success, Try}

object LocalBackend {

Expand All @@ -36,6 +37,23 @@ object LocalBackend {
}
}

def findTempFile(root: Path, prefix: String) = {
val regex = s"$prefix.*\\.tmp$$".r.unanchored
val filesWithPrefix = Try(Files.newDirectoryStream(root)).map(
stream => stream.iterator().toIterator.toSeq.collect {
case path:Path if regex.findFirstIn(path.toAbsolutePath.toString).isDefined => path
}
)
filesWithPrefix match {
case Success(paths) => paths match {
case Seq(head) => WdlFile(head.toAbsolutePath.toString)
case Seq() => throw new StdoutStderrException(s"No $prefix file found")
case s:Seq[Path] if s.size > 1 => throw new StdoutStderrException(s"Multiple files matched with prefix $prefix:\n${s.map(_.toString).mkString(", ")}")
}
case Failure(ex) => throw ex
}
}

/**
* {{{cromwell-executions + workflow.name + workflow.id = cromwell-executions/three-step/0f00-ba4}}}
*/
Expand All @@ -60,6 +78,12 @@ class LocalBackend extends Backend with LazyLogging {

import LocalBackend._

override def stdoutStderr(workflowId: WorkflowId, workflowName: String, callName: String): StdoutStderr =
StdoutStderr(
stdout = findTempFile(hostCallPath(workflowName, workflowId, callName), prefix = "stdout"),
stderr = findTempFile(hostCallPath(workflowName, workflowId, callName), prefix = "stderr")
)

/**
* Executes the specified command line, using the supplied lookup function for expression evaluation.
* Returns a `Map[String, Try[WdlValue]]` of output names to values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import com.typesafe.config.ConfigFactory
import cromwell.binding
import cromwell.binding._
import cromwell.engine._
import cromwell.engine.backend.Backend
import cromwell.engine.backend.Backend.RestartableWorkflow
import cromwell.engine.backend.{Backend, StdoutStderr}
import cromwell.engine.db.DataAccess
import cromwell.engine.db.DataAccess.WorkflowInfo
import cromwell.engine.workflow.WorkflowActor.{Restart, Start}
Expand All @@ -32,6 +32,7 @@ object WorkflowManagerActor {
case class WorkflowStatus(id: WorkflowId) extends WorkflowManagerActorMessage
case class WorkflowOutputs(id: WorkflowId) extends WorkflowManagerActorMessage
case class CallOutputs(id: WorkflowId, callFqn: FullyQualifiedName) extends WorkflowManagerActorMessage
case class CallStdoutStderr(id: WorkflowId, callFqn: FullyQualifiedName) extends WorkflowManagerActorMessage
case object Shutdown extends WorkflowManagerActorMessage
case class SubscribeToWorkflow(id: WorkflowId) extends WorkflowManagerActorMessage
case class WorkflowAbort(id: WorkflowId) extends WorkflowManagerActorMessage
Expand Down Expand Up @@ -71,6 +72,7 @@ class WorkflowManagerActor(dataAccess: DataAccess, backend: Backend) extends Act
case Shutdown => context.system.shutdown()
case WorkflowOutputs(id) => workflowOutputs(id) pipeTo sender
case CallOutputs(workflowId, callName) => callOutputs(workflowId, callName) pipeTo sender
case CallStdoutStderr(workflowId, callName) => callStdoutStderr(workflowId, callName) pipeTo sender
case CurrentState(actor, state: WorkflowState) => updateWorkflowState(actor, state)
case Transition(actor, oldState, newState: WorkflowState) => updateWorkflowState(actor, newState)
case SubscribeToWorkflow(id) =>
Expand Down Expand Up @@ -115,6 +117,23 @@ class WorkflowManagerActor(dataAccess: DataAccess, backend: Backend) extends Act
}
}

/* Return value here is a tuple: (workflow name, call name) */
private def assertCallFqnWellFormed(callFqn: FullyQualifiedName): Future[(String, String)] = {
callFqn.split("\\.").toSeq match {
case s:Seq[String] if s.size >= 2 => Future.successful(s.head, s.last)
case _ => throw new UnsupportedOperationException("Expected a fully qualified name to have at least two parts")
}
}

private def callStdoutStderr(workflowId: WorkflowId, callFqn: String): Future[StdoutStderr] = {
for {
_ <- assertWorkflowExistence(workflowId)
_ <- assertCallExistence(workflowId, callFqn)
(wf, call) <- assertCallFqnWellFormed(callFqn)
callStandardOutput <- Future.successful(backend.stdoutStderr(workflowId, wf, call))
} yield callStandardOutput
}

private def submitWorkflow(wdlSource: WdlSource, wdlJson: WdlJson, inputs: WorkflowRawInputs,
maybeWorkflowId: Option[WorkflowId]): Future[WorkflowId] = {
val workflowId = maybeWorkflowId.getOrElse(UUID.randomUUID())
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/cromwell/webservice/ApiDataModels.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cromwell.webservice
import com.wordnik.swagger.annotations.{ApiModel, ApiModelProperty}
import cromwell.binding.FullyQualifiedName
import cromwell.binding.values.WdlValue
import cromwell.engine.backend.StdoutStderr

import scala.annotation.meta.field

Expand Down Expand Up @@ -50,3 +51,10 @@ case class CallOutputResponse(
outputs: Map[FullyQualifiedName, WdlValue]
)

@ApiModel(value = "CallStdoutStderr")
case class CallStdoutStderrResponse(
@(ApiModelProperty@field)(required = true, value = "The identifier of the workflow")
id: String,
@(ApiModelProperty@field)(required = true, value = "The fully qualified name of the call")
logs: Map[String, StdoutStderr]
)
13 changes: 13 additions & 0 deletions src/main/scala/cromwell/webservice/CromwellApiHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.pattern.ask
import akka.util.Timeout
import cromwell.binding.{WdlJson, WdlSource, WorkflowRawInputs}
import cromwell.engine._
import cromwell.engine.backend.{Backend, StdoutStderr}
import cromwell.engine.workflow.WorkflowManagerActor
import cromwell.engine.workflow.WorkflowManagerActor.WorkflowManagerActorMessage
import cromwell.parser.WdlParser.SyntaxError
Expand Down Expand Up @@ -37,6 +38,8 @@ object CromwellApiHandler {
case class WorkflowAbort(id: WorkflowId) extends WorkflowManagerMessage

case class CallOutputs(id: WorkflowId, callFqn: String) extends WorkflowManagerMessage

case class CallStdoutStderr(id: WorkflowId, callFqn: String) extends WorkflowManagerMessage
}

class CromwellApiHandler(workflowManager: ActorRef) extends Actor {
Expand Down Expand Up @@ -108,6 +111,16 @@ class CromwellApiHandler(workflowManager: ActorRef) extends Actor {
case Failure(ex: WorkflowManagerActor.CallNotFoundException) => context.parent ! RequestComplete(StatusCodes.NotFound, s"Call $callFqn not found for workflow '$id'.")
case Failure(ex) => context.parent ! RequestComplete(StatusCodes.InternalServerError, ex.getMessage)
}

case CallStdoutStderr(id, callFqn) =>
val eventualCallLogs = ask(workflowManager, WorkflowManagerActor.CallStdoutStderr(id, callFqn)).mapTo[StdoutStderr]
eventualCallLogs onComplete {
case Success(logs) => context.parent ! RequestComplete(StatusCodes.OK, CallStdoutStderrResponse(id.toString, Map(callFqn -> logs)))
case Failure(ex: WorkflowManagerActor.WorkflowNotFoundException) => context.parent ! RequestComplete(StatusCodes.NotFound, ex.getMessage)
case Failure(ex: WorkflowManagerActor.CallNotFoundException) => context.parent ! RequestComplete(StatusCodes.NotFound, ex.getMessage)
case Failure(ex: Backend.StdoutStderrException) => context.parent ! RequestComplete(StatusCodes.InternalServerError, ex.getMessage)
case Failure(ex) => context.parent ! RequestComplete(StatusCodes.InternalServerError, ex.getMessage)
}
}

/**
Expand Down
36 changes: 34 additions & 2 deletions src/main/scala/cromwell/webservice/CromwellApiService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ object CromwellApiService {
trait CromwellApiService extends HttpService with PerRequestCreator {
val workflowManager: ActorRef

val workflowRoutes = queryRoute ~ workflowOutputsRoute ~ submitRoute ~ callOutputsRoute ~ abortRoute
val workflowRoutes = queryRoute ~ workflowOutputsRoute ~ submitRoute ~ callOutputsRoute ~ callStdoutStderrRoute ~ abortRoute

@Path("/{version}/{id}/status")
@ApiOperation(
Expand Down Expand Up @@ -231,7 +231,39 @@ trait CromwellApiService extends HttpService with PerRequestCreator {
case Success(w) =>
// This currently does not attempt to parse the call name for conformation to any pattern.
requestContext => perRequest(requestContext, CromwellApiHandler.props(workflowManager), CromwellApiHandler.CallOutputs(w, callFqn))
case Failure(ex) =>
case Failure(_) =>
complete(StatusCodes.BadRequest, s"Invalid workflow ID: '$workflowId'.")
}
}

@Path("/{version}/{workflowId}/logs/{callFqn}")
@ApiOperation(
value = "Query for the standard output and standard error of a 'call' from its fully qualified name (e.g. my_workflow.my_call).",
nickname = "call-logs",
httpMethod = "GET",
produces = "application/json"
//,response = classOf[CallOutputs]
)
@ApiImplicitParams(Array(
new ApiImplicitParam(name = "version", required = true, dataType = "string", paramType = "path", value = "API Version", allowableValues = CromwellApiService.VersionAllowableValues),
new ApiImplicitParam(name = "workflowId", required = true, dataType = "string", paramType = "path", value = "Workflow ID"),
new ApiImplicitParam(name = "callFqn", required = true, dataType = "string", paramType = "path", value = "Call fully qualified name")
))
@ApiResponses(Array(
// See the note above regarding Swagger brokenness for why there is no response classOf here.
new ApiResponse(code = 200, message = "Successful Request"),
new ApiResponse(code = 404, message = "Workflow ID Not Found"),
new ApiResponse(code = 404, message = "Call Fully Qualified Name Not Found"),
new ApiResponse(code = 400, message = "Malformed Workflow ID"),
new ApiResponse(code = 500, message = "Internal Error")
))
def callStdoutStderrRoute =
path("workflows" / Segment / Segment / "logs" / Segment) { (version, workflowId, callFqn) =>
Try(UUID.fromString(workflowId)) match {
case Success(w) =>
// This currently does not attempt to parse the call name for conformation to any pattern.
requestContext => perRequest(requestContext, CromwellApiHandler.props(workflowManager), CromwellApiHandler.CallStdoutStderr(w, callFqn))
case Failure(_) =>
complete(StatusCodes.BadRequest, s"Invalid workflow ID: '$workflowId'.")
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/scala/cromwell/webservice/WorkflowJsonSupport.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package cromwell.webservice

import spray.json.DefaultJsonProtocol
import cromwell.binding.values.WdlValueJsonFormatter._

import cromwell.binding.values.WdlFileJsonFormatter._
import cromwell.engine.backend.StdoutStderr
import spray.json.DefaultJsonProtocol

object WorkflowJsonSupport extends DefaultJsonProtocol {
implicit val workflowStatusResponseProtocol = jsonFormat2(WorkflowStatusResponse)
implicit val workflowAbortResponseProtocol = jsonFormat2(WorkflowAbortResponse)
implicit val workflowSubmitResponseProtocol = jsonFormat2(WorkflowSubmitResponse)
implicit val workflowOutputResponseProtocol = jsonFormat2(WorkflowOutputResponse)
implicit val callOutputResponseProtocol = jsonFormat3(CallOutputResponse)
implicit val callLogsResponseProtocol = jsonFormat2(StdoutStderr)
implicit val callStdoutStderrResponse = jsonFormat2(CallStdoutStderrResponse)
}

0 comments on commit 69f80f5

Please sign in to comment.