Skip to content

Commit

Permalink
Handle globs in cwl output expressions
Browse files Browse the repository at this point in the history
  • Loading branch information
danbills committed Nov 8, 2017
1 parent 3797778 commit 05c8dfd
Show file tree
Hide file tree
Showing 21 changed files with 102 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object RuntimeEnvironmentBuilder {
def apply(runtimeAttributes: Map[String, WomValue], jobPaths: JobPaths): MinimumRuntimeSettings => RuntimeEnvironment = {
minimums =>

val outputPath: String = jobPaths.callRoot.pathAsString
val outputPath: String = jobPaths.callExecutionRoot.pathAsString

val tempPath: String = jobPaths.callRoot.pathAsString

Expand Down
12 changes: 7 additions & 5 deletions backend/src/main/scala/cromwell/backend/io/GlobFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ trait GlobFunctions extends IoFunctionSet {
}
}

def globDirectory(glob: String): String = globName(glob) + "/"
def globName(glob: String) = s"glob-${glob.md5Sum}"

def globDirectory(glob: String): String = GlobFunctions.globName(glob) + "/"
/**
* Returns a path to the glob.
*
Expand All @@ -40,11 +38,11 @@ trait GlobFunctions extends IoFunctionSet {
*
* The paths are currently read from a list file based on the pattern, and the path parameter is not used.
*
* @param path The path string returned by globPath. This isn't currently used.
* @param pattern The pattern of the glob. This is the same "glob" passed to globPath().
* @return The paths that match the pattern.
*/
override def glob(path: String, pattern: String): Seq[String] = {
override def glob(pattern: String): Seq[String] = {
import GlobFunctions._
val globPatternName = globName(pattern)
val listFilePath = callContext.root.resolve(s"${globName(pattern)}.list")
// This "lines" is technically a read file and hence should use the readFile IO method
Expand All @@ -53,3 +51,7 @@ trait GlobFunctions extends IoFunctionSet {
}
}
}

object GlobFunctions {
def globName(glob: String) = s"glob-${glob.md5Sum}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@ import cromwell.backend.async.AsyncBackendJobExecutionActor._
import cromwell.backend.async.{AbortedExecutionHandle, AsyncBackendJobExecutionActor, ExecutionHandle, FailedNonRetryableExecutionHandle, FailedRetryableExecutionHandle, PendingExecutionHandle, ReturnCodeIsNotAnInt, StderrNonEmpty, SuccessfulExecutionHandle, WrongReturnCode}
import cromwell.backend.validation._
import cromwell.backend.wdl.OutputEvaluator._
import cromwell.backend.wdl.{Command, OutputEvaluator, WdlFileMapper}
import cromwell.backend.wdl.{Command, OutputEvaluator}
import cromwell.backend._
import cromwell.core.io.{AsyncIo, DefaultIoCommandBuilder}
import cromwell.core.path.Path
import cromwell.core.{CromwellAggregatedException, CromwellFatalExceptionMarker, ExecutionEvent}
import cromwell.services.keyvalue.KeyValueServiceActor._
import cromwell.services.keyvalue.KvClient
import cromwell.services.metadata.CallMetadataKeys
import common.exception.MessageAggregation
import common.util.TryUtil
import common.validation.ErrorOr.ErrorOr
import cromwell.backend.io.GlobFunctions
import net.ceedubs.ficus.Ficus._
import wom.WomFileMapper
import wom.values._

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, Promise}
Expand Down Expand Up @@ -77,7 +82,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
override lazy val configurationDescriptor: BackendConfigurationDescriptor = standardParams.configurationDescriptor

override lazy val completionPromise: Promise[BackendJobExecutionResponse] = standardParams.completionPromise

override lazy val ioActor = standardParams.ioActor

/** Backend initialization data created by the a factory initializer. */
Expand Down Expand Up @@ -116,9 +121,11 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta

/** @see [[Command.instantiate]] */
final lazy val commandLinePreProcessor: WomEvaluatedCallInputs => Try[WomEvaluatedCallInputs] = {
inputs => TryUtil.sequenceMap(inputs mapValues WdlFileMapper.mapWdlFiles(preProcessWdlFile)) recoverWith {
case e => Failure(new IOException(e.getMessage) with CromwellFatalExceptionMarker)
}
inputs =>
TryUtil.sequenceMap(inputs mapValues WomFileMapper.mapWomFiles(preProcessWdlFile)).
recoverWith {
case e => Failure(new IOException(e.getMessage) with CromwellFatalExceptionMarker)
}
}

/**
Expand All @@ -132,7 +139,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta

/** @see [[Command.instantiate]] */
final lazy val commandLineValueMapper: WomValue => WomValue = {
womValue => WdlFileMapper.mapWdlFiles(mapCommandLineWdlFile)(womValue).get
womValue => WomFileMapper.mapWomFiles(mapCommandLineWdlFile)(womValue).get
}

/**
Expand All @@ -156,22 +163,28 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
* @param globFiles The globs.
* @return The shell scripting.
*/
def globManipulations(globFiles: Traversable[WomGlobFile]): String = globFiles map globManipulation mkString "\n"
def globScripts(globFiles: Traversable[WomGlobFile]): String =
globFiles map globScript mkString "\n"

/**
* Returns the shell scripting for hard linking a glob results using ln.
*
* @param globFile The glob.
* @return The shell scripting.
*/
def globManipulation(globFile: WomGlobFile): String = {
def globScript(globFile: WomGlobFile): String = {
val parentDirectory = globParentDirectory(globFile)
val globDir = backendEngineFunctions.globName(globFile.value)
val globDir = GlobFunctions.globName(globFile.value)
val globDirectory = parentDirectory./(globDir)
val globList = parentDirectory./(s"$globDir.list")

s"""|mkdir $globDirectory
s"""|# make the directory which will keep the matching files
|mkdir $globDirectory
|
|# symlink all the files into the glob directory
|( ln -L ${globFile.value} $globDirectory 2> /dev/null ) || ( ln ${globFile.value} $globDirectory )
|
|# list all the files that match the glob into a file called glob-[md5 of glob].list
|ls -1 $globDirectory > $globList
|""".stripMargin
}
Expand All @@ -187,7 +200,8 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
val rcPath = cwd./(jobPaths.returnCodeFilename)
val rcTmpPath = rcPath.plusExt("tmp")

val globFiles: ErrorOr[List[WomGlobFile]] = backendEngineFunctions.findGlobOutputs(call, jobDescriptor)
val globFiles: ErrorOr[List[WomGlobFile]] =
backendEngineFunctions.findGlobOutputs(call, jobDescriptor)

globFiles.map(globFiles =>
s"""|#!/bin/bash
Expand All @@ -211,7 +225,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
|echo $$? > $rcTmpPath
|(
|cd $cwd
|${globManipulations(globFiles)}
|${globScripts(globFiles)}
|)
|(
|cd $cwd
Expand Down Expand Up @@ -300,9 +314,9 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
* This method is needed in case Cromwell restarts while a workflow was being aborted. The engine cannot guarantee
* that all backend actors will have time to receive and process the abort command before the server shuts down.
* For that reason, upon restart, this method should always try to abort the job.
*
*
* The default implementation returns a JobReconnectionNotSupportedException failure which will result in a job failure.
*
*
* @param jobId The previously recorded job id.
* @return the execution handle for the job.
*/
Expand All @@ -312,9 +326,9 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
* This is in spirit similar to recover except it does not defaults back to running the job if not implemented.
* This is used after a server restart to reconnect to jobs while the workflow was Failing (because another job failed). The workflow is bound
* to fail eventually for that reason but in the meantime we want to reconnect to running jobs to update their status.
*
*
* The default implementation returns a JobReconnectionNotSupportedException failure which will result in a job failure.
*
*
* @param jobId The previously recorded job id.
* @return the execution handle for the job.
*/
Expand Down Expand Up @@ -435,7 +449,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
* @return The Try wrapped and mapped wdl value.
*/
final def outputValueMapper(womValue: WomValue): Try[WomValue] = {
WdlFileMapper.mapWdlFiles(mapOutputWdlFile)(womValue)
WomFileMapper.mapWomFiles(mapOutputWdlFile)(womValue)
}

/**
Expand Down Expand Up @@ -686,7 +700,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
def handleExecutionResult(status: StandardAsyncRunStatus,
oldHandle: StandardAsyncPendingExecutionHandle): Future[ExecutionHandle] = {
lazy val stderrAsOption: Option[Path] = Option(jobPaths.stderr)

val stderrSizeAndReturnCode = for {
returnCodeAsString <- contentAsStringAsync(jobPaths.returnCode)
// Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
Expand All @@ -697,7 +711,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
stderrSizeAndReturnCode flatMap {
case (stderrSize, returnCodeAsString) =>
val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)

if (isSuccess(status)) {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
Expand All @@ -715,7 +729,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
handleExecutionFailure(status, oldHandle, tryReturnCodeAsInt.toOption)
}
} recoverWith {
case exception =>
case exception =>
if (isSuccess(status)) Future.successful(FailedNonRetryableExecutionHandle(exception))
else handleExecutionFailure(status, oldHandle, None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,21 @@ object OutputEvaluator {
ioFunctions: IoFunctionSet,
postMapper: WomValue => Try[WomValue] = v => Success(v)): EvaluatedJobOutputs = {
val knownValues: Map[String, WomValue] = jobDescriptor.localInputs

def foldFunction(accumulatedOutputs: Try[ErrorOr[List[(OutputPort, WomValue)]]], output: ExpressionBasedOutputPort) = accumulatedOutputs flatMap { accumulated =>
// Extract the valid pairs from the job outputs accumulated so far, and add to it the inputs (outputs can also reference inputs)
val allKnownValues: Map[String, WomValue] = accumulated match {
case Valid(outputs) =>
case Valid(outputs) =>
// The evaluateValue methods needs a Map[String, WomValue], use the output port name for already computed outputs
outputs.toMap[OutputPort, WomValue].map({ case (port, value) => port.name -> value }) ++ knownValues
case Invalid(_) => knownValues
}

// Attempt to evaluate the expression using all known values
def evaluateOutputExpression: OutputResult[WomValue] = {
EitherT { Try(output.expression.evaluateValue(allKnownValues, ioFunctions)).map(_.toEither) }
}
def evaluateOutputExpression: OutputResult[WomValue] =
EitherT.fromEither[Try] {
output.expression.evaluateValue(allKnownValues, ioFunctions).toEither
}

// Attempt to coerce the womValue to the desired output type
def coerceOutputValue(womValue: WomValue, coerceTo: WomType): OutputResult[WomValue] = {
Expand All @@ -68,7 +69,7 @@ object OutputEvaluator {
(accumulated, evaluatedOutput) mapN { _ :+ _ }
}
}

val emptyValue = Success(List.empty[(OutputPort, WomValue)].validNel): Try[ErrorOr[List[(OutputPort, WomValue)]]]

// Fold over the outputs to evaluate them in order, map the result to an EvaluatedJobOutputs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class TestReadLikeFunctions(sizeResult: Try[Double]) extends IoFunctionSet {

override def stderr(params: Seq[Try[WomValue]]): Try[WomFile] = ???

override def glob(path: String, pattern: String): Seq[String] = ???
override def glob(pattern: String): Seq[String] = ???

override def size(params: Seq[Try[WomValue]]): Try[WomFloat] = sizeResult.map(WomFloat.apply)
}
2 changes: 1 addition & 1 deletion core/src/main/scala/cromwell/core/NoIoFunctionSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ case object NoIoFunctionSet extends IoFunctionSet {

override def stderr(params: Seq[Try[WomValue]]): Try[WomFile] = Failure(new NotImplementedError("stderr is not available here"))

override def glob(path: String, pattern: String): Seq[String] = throw new NotImplementedError("glob is not available here")
override def glob(pattern: String): Seq[String] = throw new NotImplementedError("glob is not available here")

override def size(params: Seq[Try[WomValue]]): Try[WomFloat] = Failure(new NotImplementedError("size is not available here"))
}
2 changes: 1 addition & 1 deletion cwl/src/main/scala/cwl/CommandOutputBinding.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ case class CommandOutputBinding(
http://www.commonwl.org/v1.0/CommandLineTool.html#CommandOutputBinding
*/
def commandOutputBindingToWdlValue(parameterContext: ParameterContext,
def commandOutputBindingToWomValue(parameterContext: ParameterContext,
ioFunctionSet: IoFunctionSet): WomValue = {

val paths: Seq[String] = glob map { globValue =>
Expand Down
28 changes: 23 additions & 5 deletions cwl/src/main/scala/cwl/CwlWomExpression.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,34 @@ case class CommandOutputExpression(outputBinding: CommandOutputBinding,

// TODO WOM: outputBinding.toString is probably not be the best representation of the outputBinding
override def sourceString = outputBinding.toString

override def evaluateValue(inputValues: Map[String, WomValue], ioFunctionSet: IoFunctionSet): ErrorOr[WomValue] = {
val parameterContext = ParameterContext.Empty.withInputs(inputValues, ioFunctionSet)

val wdlValue: WomValue = outputBinding.commandOutputBindingToWdlValue(parameterContext, ioFunctionSet)
val extractFile: WomValue =
wdlValue match {
//To facilitate ECMAScript evaluation, filenames are stored in a map under the key "location"
val womValue = outputBinding.
commandOutputBindingToWomValue(parameterContext, ioFunctionSet) match {
case WomArray(_, Seq(WomMap(WomMapType(WomStringType, WomStringType), map))) => map(WomString("location"))
case other => other
}
cwlExpressionType.coerceRawValue(extractFile).toErrorOr

//If the value is a string but the output is expecting a file, we consider that string a POSIX "glob" and apply
//it accordingly to retrieve the file list to which it expands.
val globbedIfFile =
(womValue, cwlExpressionType) match {

//In the case of a single file being expected, we must enforce that the glob only represents a single file
case (WomString(glob), WomFileType) =>
ioFunctionSet.glob(glob) match {
case head :: Nil => WomString(head)
case list => throw new RuntimeException(s"expecting a single File glob but instead got $list")
}

case _ => womValue
}

//CWL tells us the type this output is expected to be. Attempt to coerce the actual output into this type.
cwlExpressionType.coerceRawValue(globbedIfFile).toErrorOr
}

/*
Expand All @@ -41,7 +59,7 @@ case class CommandOutputExpression(outputBinding: CommandOutputBinding,
*/
override def evaluateFiles(inputs: Map[String, WomValue], ioFunctionSet: IoFunctionSet, coerceTo: WomType): ErrorOr[Set[WomFile]] ={

val pc = ParameterContext.Empty.withInputs(inputs, ioFunctionSet)
val pc = ParameterContext().withInputs(inputs, ioFunctionSet)

val files = for {
globValue <- outputBinding.glob.toList
Expand Down
4 changes: 3 additions & 1 deletion cwl/src/main/scala/cwl/GlobEvaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ object GlobEvaluator {
}

implicit def caseArrayString: Case.Aux[Array[String], GlobHandler] = {
at[Array[String]] { const(_) }
at[Array[String]] { _ =>
throw new NotImplementedError("The Array[String] case of Glob evaluator has not yet been implemented")
}
}

implicit def caseString: Case.Aux[String, GlobHandler] = {
Expand Down
2 changes: 1 addition & 1 deletion cwl/src/test/scala/cwl/CommandOutputExpressionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class CommandOutputExpressionSpec extends FlatSpec with Matchers {
override def writeFile(path: String, content: String) = throw new Exception("writeFile should not be used in this test")
override def stdout(params: Seq[Try[WomValue]]) = throw new Exception("stdout should not be used in this test")
override def stderr(params: Seq[Try[WomValue]]) = throw new Exception("stderr should not be used in this test")
override def glob(path: String, pattern: String) = throw new Exception("glob should not be used in this test")
override def glob(pattern: String): Seq[String] = throw new Exception("glob should not be used in this test")
override def size(params: Seq[Try[WomValue]]) = throw new Exception("size should not be used in this test")
}

Expand Down
2 changes: 1 addition & 1 deletion engine/src/main/scala/cromwell/engine/WdlFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class WdlFunctions(val pathBuilders: List[PathBuilder]) extends ReadLikeFunction

override def stdout(params: Seq[Try[WomValue]]): Try[WomFile] = fail("stdout")
override def stderr(params: Seq[Try[WomValue]]): Try[WomFile] = fail("stderr")
override def glob(path: String, pattern: String): Seq[String] = throw new NotImplementedError(s"glob(path, pattern) not implemented yet")
override def glob(pattern: String): Seq[String] = throw new NotImplementedError(s"glob(path, pattern) not implemented yet")

// TODO WOM: fix this
override def writeFile(path: String, content: String): Future[WomFile] = Future.failed(new Exception("Can't write files"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import cromwell.filesystems.gcs.batch.GcsBatchCommandBuilder
import cromwell.services.keyvalue.KeyValueServiceActor._
import cromwell.services.keyvalue.KvClient
import common.validation.ErrorOr.ErrorOr
import cromwell.backend.io.GlobFunctions
import org.slf4j.LoggerFactory
import wom.callable.Callable.OutputDefinition
import wom.core.FullyQualifiedName
Expand Down Expand Up @@ -234,7 +235,7 @@ class JesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
}

private def generateJesGlobFileOutputs(wdlFile: WomGlobFile): List[JesFileOutput] = {
val globName = backendEngineFunctions.globName(wdlFile.value)
val globName = GlobFunctions.globName(wdlFile.value)
val globDirectory = globName + "/"
val globListFile = globName + ".list"
val gcsGlobDirectoryDestinationPath = callRootPath.resolve(globDirectory).pathAsString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import cromwell.backend.impl.jes.RunStatus.UnsuccessfulRunStatus
import cromwell.backend.impl.jes.io.{DiskType, JesWorkingDisk}
import cromwell.backend.impl.jes.statuspolling.JesApiQueryManager.DoPoll
import cromwell.backend.standard.{DefaultStandardAsyncExecutionActorParams, StandardAsyncExecutionActorParams, StandardAsyncJob, StandardExpressionFunctionsParams}
import cromwell.backend.wdl.WdlFileMapper
import cromwell.cloudsupport.gcp.gcs.GcsStorage
import cromwell.core.Tags.PostWomTest
import cromwell.core._
Expand All @@ -34,6 +33,7 @@ import org.scalatest.prop.Tables.Table
import org.slf4j.Logger
import org.specs2.mock.Mockito
import spray.json._
import wom.WomFileMapper
import wom.graph.TaskCallNode
import wom.types._
import wom.values._
Expand Down Expand Up @@ -381,7 +381,7 @@ class JesAsyncBackendJobExecutionActorSpec extends TestKitSuite("JesAsyncBackend


def gcsPathToLocal(womValue: WomValue): WomValue = {
WdlFileMapper.mapWdlFiles(testActorRef.underlyingActor.mapCommandLineWdlFile)(womValue).get
WomFileMapper.mapWomFiles(testActorRef.underlyingActor.mapCommandLineWdlFile)(womValue).get
}

val mappedInputs = jobDescriptor.localInputs mapValues gcsPathToLocal
Expand Down Expand Up @@ -652,7 +652,7 @@ class JesAsyncBackendJobExecutionActorSpec extends TestKitSuite("JesAsyncBackend
props, s"TestableJesJobExecutionActor-${jobDescriptor.workflowDescriptor.id}")

def wdlValueToGcsPath(jesOutputs: Set[JesFileOutput])(womValue: WomValue): WomValue = {
WdlFileMapper.mapWdlFiles(testActorRef.underlyingActor.wdlFileToGcsPath(jesOutputs))(womValue).get
WomFileMapper.mapWomFiles(testActorRef.underlyingActor.wdlFileToGcsPath(jesOutputs))(womValue).get
}

val result = outputValues map wdlValueToGcsPath(jesOutputs)
Expand Down

0 comments on commit 05c8dfd

Please sign in to comment.