Skip to content

Commit

Permalink
Add support for cwl.output.json (#3152)
Browse files Browse the repository at this point in the history
* Add support for cwl.output.json
  • Loading branch information
Horneth committed Jan 18, 2018
1 parent 6fd20e7 commit ed90b8d
Show file tree
Hide file tree
Showing 19 changed files with 338 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import cromwell.core._
import wom.expression.IoFunctionSet
import wom.values.WomValue

import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Success, Try}

object BackendJobExecutionActor {
Expand Down Expand Up @@ -107,7 +108,7 @@ trait BackendJobExecutionActor extends BackendJobLifecycleActor with ActorLoggin
}

def evaluateOutputs(wdlFunctions: IoFunctionSet,
postMapper: WomValue => Try[WomValue] = v => Success(v)): EvaluatedJobOutputs = {
OutputEvaluator.evaluateOutputs(jobDescriptor, wdlFunctions, postMapper)
postMapper: WomValue => Try[WomValue] = v => Success(v))(implicit ec: ExecutionContext): EvaluatedJobOutputs = {
Await.result(OutputEvaluator.evaluateOutputs(jobDescriptor, wdlFunctions, postMapper), Duration.Inf)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ trait GlobFunctions extends IoFunctionSet with AsyncIoFunctions {

def callContext: CallContext

def findGlobOutputs(call: TaskCallNode, jobDescriptor: BackendJobDescriptor): ErrorOr[List[WomGlobFile]] =
call.callable.outputs.flatTraverse[ErrorOr, WomGlobFile] { outputDefinition =>
def findGlobOutputs(call: TaskCallNode, jobDescriptor: BackendJobDescriptor): ErrorOr[List[WomGlobFile]] = {
def fromOutputs = call.callable.outputs.flatTraverse[ErrorOr, WomGlobFile] { outputDefinition =>
outputDefinition.expression.evaluateFiles(jobDescriptor.localInputs, this, outputDefinition.womType) map {
_.toList collect { case glob: WomGlobFile => glob }
}
}
fromOutputs.map(_ ++ call.callable.additionalGlob)
}

/**
* Returns a list of paths from the glob.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,24 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
val globDir = GlobFunctions.globName(globFile.value)
val globDirectory = parentDirectory./(globDir)
val globList = parentDirectory./(s"$globDir.list")
val controlFileName = "cromwell_glob_control_file"
val controlFileContent =
"""This file is used by Cromwell to allow for globs that would not match any file.
|By its presence it works around the limitation of some backends that do not allow empty globs.
|Regardless of the outcome of the glob, this file will not be part of the final list of globbed files.
""".stripMargin

s"""|# make the directory which will keep the matching files
|mkdir $globDirectory
|
|# create the glob control file that will allow for the globbing to succeed even if there is 0 match
|echo "${controlFileContent.trim}" > $globDirectory/$controlFileName
|
|# symlink all the files into the glob directory
|( ln -L ${globFile.value} $globDirectory 2> /dev/null ) || ( ln ${globFile.value} $globDirectory )
|
|# list all the files that match the glob into a file called glob-[md5 of glob].list
|ls -1 $globDirectory > $globList
|# list all the files (except the control file) that match the glob into a file called glob-[md5 of glob].list
|ls -1 $globDirectory | grep -v $controlFileName > $globList
|""".stripMargin
}

Expand Down Expand Up @@ -514,7 +523,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
*
* @return A Future wrapping the evaluated outputs.
*/
def evaluateOutputs: EvaluatedJobOutputs = {
def evaluateOutputs()(implicit ec: ExecutionContext): Future[EvaluatedJobOutputs] = {
OutputEvaluator.evaluateOutputs(jobDescriptor, backendEngineFunctions, outputValueMapper)
}

Expand Down Expand Up @@ -566,8 +575,8 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
*/
def handleExecutionSuccess(runStatus: StandardAsyncRunStatus,
handle: StandardAsyncPendingExecutionHandle,
returnCode: Int): ExecutionHandle = {
evaluateOutputs match {
returnCode: Int)(implicit ec: ExecutionContext): Future[ExecutionHandle] = {
evaluateOutputs() map {
case ValidJobOutputs(outputs) =>
SuccessfulExecutionHandle(outputs, returnCode, jobPaths.detritusPaths, getTerminalEvents(runStatus))
case InvalidJobOutputs(errors) =>
Expand Down Expand Up @@ -768,7 +777,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
case Success(returnCodeAsInt) if !continueOnReturnCode.continueFor(returnCodeAsInt) =>
Future.successful(FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption), Option(returnCodeAsInt)))
case Success(returnCodeAsInt) =>
Future.successful(handleExecutionSuccess(status, oldHandle, returnCodeAsInt))
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
case Failure(_) =>
Future.successful(FailedNonRetryableExecutionHandle(ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption)))
}
Expand Down
35 changes: 32 additions & 3 deletions backend/src/main/scala/cromwell/backend/wdl/OutputEvaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import cats.instances.try_._
import cats.syntax.apply._
import cats.syntax.either._
import cats.syntax.validated._
import cromwell.backend.BackendJobDescriptor
import common.validation.Checked._
import common.validation.ErrorOr.ErrorOr
import cromwell.backend.BackendJobDescriptor
import cromwell.core.CallOutputs
import wom.expression.IoFunctionSet
import wom.graph.GraphNodePort.{ExpressionBasedOutputPort, OutputPort}
import wom.types.WomType
import wom.values.WomValue

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object OutputEvaluator {
Expand All @@ -28,7 +29,7 @@ object OutputEvaluator {

def evaluateOutputs(jobDescriptor: BackendJobDescriptor,
ioFunctions: IoFunctionSet,
postMapper: WomValue => Try[WomValue] = v => Success(v)): EvaluatedJobOutputs = {
postMapper: WomValue => Try[WomValue] = v => Success(v))(implicit ec: ExecutionContext): Future[EvaluatedJobOutputs] = {
val taskInputValues: Map[String, WomValue] = jobDescriptor.localInputs

def foldFunction(accumulatedOutputs: Try[ErrorOr[List[(OutputPort, WomValue)]]], output: ExpressionBasedOutputPort) = accumulatedOutputs flatMap { accumulated =>
Expand Down Expand Up @@ -75,10 +76,38 @@ object OutputEvaluator {
val emptyValue = Success(List.empty[(OutputPort, WomValue)].validNel): Try[ErrorOr[List[(OutputPort, WomValue)]]]

// Fold over the outputs to evaluate them in order, map the result to an EvaluatedJobOutputs
jobDescriptor.taskCall.expressionBasedOutputPorts.foldLeft(emptyValue)(foldFunction) match {
def fromOutputPorts: EvaluatedJobOutputs = jobDescriptor.taskCall.expressionBasedOutputPorts.foldLeft(emptyValue)(foldFunction) match {
case Success(Valid(outputs)) => ValidJobOutputs(CallOutputs(outputs.toMap))
case Success(Invalid(errors)) => InvalidJobOutputs(errors)
case Failure(exception) => JobOutputsEvaluationException(exception)
}

/*
* Because Cromwell doesn't trust anyone, if custom evaluation is provided,
* still make sure that all the output ports have been filled with values
*/
def validateCustomEvaluation(outputs: Map[OutputPort, WomValue]): EvaluatedJobOutputs = {
  // Output ports declared by the task call that the custom evaluation did not provide a value for
  val unfilledPorts = jobDescriptor.taskCall.expressionBasedOutputPorts.diff(outputs.keySet.toList)

  // One error message per unfilled port, in the same order as the ports
  val missingValueErrors = unfilledPorts map { port =>
    s"Missing output value for ${port.identifier.fullyQualifiedName.value}"
  }

  missingValueErrors match {
    // At least one port is missing a value: report all of them together
    case firstError :: otherErrors => InvalidJobOutputs(NonEmptyList(firstError, otherErrors))
    // Every port has a value: the custom outputs are accepted as they are
    case Nil => ValidJobOutputs(CallOutputs(outputs))
  }
}

/*
* See if the task definition has a "short-circuit" for the default output evaluation.
* In the case of CWL for example, this gives a chance to look for cwl.output.json and use it as the output of the tool,
* instead of the default behavior of going over each output port of the task and evaluating its expression.
* If the "customOutputEvaluation" returns None (which will happen if the cwl.output.json is not there, as well as for all WDL workflows),
* we fall back to the default behavior.
*/
jobDescriptor.taskCall.customOutputEvaluation(taskInputValues, ioFunctions, ec).value
.map({
case Some(Right(outputs)) => validateCustomEvaluation(outputs)
case Some(Left(errors)) => InvalidJobOutputs(errors)
// If it returns an empty value, fall back to the canonical output evaluation
case None => fromOutputPorts
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import wom.graph.WomIdentifier
import wom.types.{WomIntegerType, WomType}
import wom.values.{WomFile, WomInteger, WomValue}

import scala.concurrent.Await
import scala.concurrent.duration._

class OutputEvaluatorSpec extends FlatSpec with Matchers with Mockito {
behavior of "OutputEvaluator"

Expand Down Expand Up @@ -97,8 +100,8 @@ class OutputEvaluatorSpec extends FlatSpec with Matchers with Mockito {
val call = WomMocks.mockTaskCall(WomIdentifier("call"), WomMocks.EmptyTaskDefinition.copy(outputs = mockOutputs))
val key = BackendJobDescriptorKey(call, None, 1)
val jobDescriptor = BackendJobDescriptor(null, key, null, mockInputs, null, null)

OutputEvaluator.evaluateOutputs(jobDescriptor, NoIoFunctionSet) match {
Await.result(OutputEvaluator.evaluateOutputs(jobDescriptor, NoIoFunctionSet)(scala.concurrent.ExecutionContext.global), 2.seconds) match {
case ValidJobOutputs(outputs) => outputs shouldBe CallOutputs(Map(
jobDescriptor.taskCall.outputPorts.find(_.name == "o1").get -> WomInteger(5),
jobDescriptor.taskCall.outputPorts.find(_.name == "o2").get -> WomInteger(5)
Expand All @@ -118,7 +121,7 @@ class OutputEvaluatorSpec extends FlatSpec with Matchers with Mockito {
val key = BackendJobDescriptorKey(call, None, 1)
val jobDescriptor = BackendJobDescriptor(null, key, null, mockInputs, null, null)

OutputEvaluator.evaluateOutputs(jobDescriptor, NoIoFunctionSet) match {
Await.result(OutputEvaluator.evaluateOutputs(jobDescriptor, NoIoFunctionSet)(scala.concurrent.ExecutionContext.global), 2.seconds) match {
case InvalidJobOutputs(errors) => errors shouldBe NonEmptyList.of(
"Bad output 'invalid1': Invalid expression 1", "Bad output 'invalid2': Invalid expression 2"
)
Expand All @@ -136,7 +139,7 @@ class OutputEvaluatorSpec extends FlatSpec with Matchers with Mockito {
val key = BackendJobDescriptorKey(call, None, 1)
val jobDescriptor = BackendJobDescriptor(null, key, null, mockInputs, null, null)

OutputEvaluator.evaluateOutputs(jobDescriptor, NoIoFunctionSet) match {
Await.result(OutputEvaluator.evaluateOutputs(jobDescriptor, NoIoFunctionSet)(scala.concurrent.ExecutionContext.global), 2.seconds) match {
case JobOutputsEvaluationException(e) => e shouldBe exception
case _ => fail("Output evaluation should have failed")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
cwlVersion: v1.0
$graph:
- id: outputJson
# slightly modified version of conformance test #2 to remove the default File that doesn't work on PAPI
# It is instead being passed as an input through the input json file
class: CommandLineTool
cwlVersion: v1.0
hints:
- class: DockerRequirement
dockerPull: python:2-slim
inputs:
- id: reference
type: File
inputBinding: { position: 2 }

- id: reads
type:
type: array
items: File
inputBinding: { prefix: "-YYY" }
inputBinding: { position: 3, prefix: "-XXX" }

- id: "args.py"
type: File
inputBinding:
position: -1

outputs:
# note the absence of any sort of valueFrom.
# The output value is generated from the "cwl.output.json" file created by the python script
args: string[]

baseCommand: python
arguments: ["bwa", "mem"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"args.py": {
"class": "File",
"location": "gs://centaur-cwl-conformance/cwl-inputs/args.py"
},
"reference": {
"class": "File",
"location": "gs://centaur-cwl-conformance/cwl-inputs/chr20.fa",
"size": 123,
"checksum": "sha1$hash"
},
"reads": [
{
"class": "File",
"location": "gs://centaur-cwl-conformance/cwl-inputs/example_human_Illumina.pe_1.fastq"
},
{
"class": "File",
"location": "gs://centaur-cwl-conformance/cwl-inputs/example_human_Illumina.pe_2.fastq"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: cwl_output_json_jes
testFormat: workflowsuccess
backends: [Jes]

files {
wdl: cwl_output_json/cwl_output_json.cwl
inputs: cwl_output_json/cwl_output_json.inputs
}

metadata {
status: Succeeded
"outputs.outputJson.args.0": "bwa"
"outputs.outputJson.args.1": "mem"
"outputs.outputJson.args.2": "chr20.fa"
"outputs.outputJson.args.3": "-XXX"
"outputs.outputJson.args.4": "-YYY"
"outputs.outputJson.args.5": "example_human_Illumina.pe_1.fastq"
"outputs.outputJson.args.6": "-YYY"
"outputs.outputJson.args.7": "example_human_Illumina.pe_2.fastq"
}

workflowType: CWL
workflowTypeVersion: v1.0
workflowRoot: outputJson
13 changes: 13 additions & 0 deletions centaur/src/main/resources/standardTestCases/wdl_empty_glob.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name: wdl_empty_glob
testFormat: workflowsuccess

files {
wdl: wdl_empty_glob/wdl_empty_glob.wdl
}

metadata {
workflowName: wdl_empty_glob
status: Succeeded
"calls.wdl_empty_glob.empty_glob.executionStatus": Done
"outputs.wdl_empty_glob.wf_out": 0
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Task whose command only echoes "hello" to stdout, then globs for *.txt files.
# It exposes both the globbed files and how many of them were found.
task empty_glob {
command {
echo "hello"
}
runtime {
docker: "ubuntu:latest"
}
output {
# Files matching *.txt; the command above does not write any file itself,
# so this is presumably expected to be an empty array
Array[File] empty_glob = glob("*.txt")
# Number of files matched by the glob
Int nbGlobbed = length(empty_glob)
}
}

# Workflow that runs the empty_glob task once and exposes the number of
# globbed files as its single output.
workflow wdl_empty_glob {
call empty_glob
output {
# Count of files matched by the task's glob
Int wf_out = empty_glob.nbGlobbed
}
}
12 changes: 2 additions & 10 deletions cwl/src/main/scala-2.11/cwl/CwlExecutableValidation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,8 @@ package cwl

import cats.syntax.validated._
import cats.syntax.either._
import io.circe._
import io.circe.shapes._
import io.circe.generic.auto._
import eu.timepit.refined.string._
import io.circe.refined._
import io.circe.yaml
import io.circe.literal._
import io.circe.{Json, yaml}
import common.Checked
import common.validation.Checked._
import common.validation.ErrorOr.ErrorOr
Expand All @@ -21,15 +16,12 @@ import wom.executable.Executable.{InputParsingFunction, ParsedInputMap}
// (ExecutableValidation.scala has more info on why this was necessary)
object CwlExecutableValidation {

implicit val fileDecoder = implicitly[Decoder[File]]
implicit val directoryDecoder = implicitly[Decoder[Directory]]

// Decodes the input file and builds the ParsedInputMap
private val inputCoercionFunction: InputParsingFunction =
inputFile => {
yaml.parser.parse(inputFile).flatMap(_.as[Map[String, Json]]) match {
case Left(error) => error.getMessage.invalidNelCheck[ParsedInputMap]
case Right(inputValue) => inputValue.map({ case (key, value) => key -> value.foldWith(CwlInputCoercion) }).validNelCheck
case Right(inputValue) => inputValue.map({ case (key, value) => key -> value.foldWith(CwlJsonToDelayedCoercionFunction) }).validNelCheck
}
}

Expand Down
9 changes: 2 additions & 7 deletions cwl/src/main/scala-2.12/cwl/CwlExecutableValidation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package cwl
import common.Checked
import common.validation.Checked._
import common.validation.ErrorOr.ErrorOr
import io.circe.generic.auto._
import io.circe.literal._
import io.circe.shapes._
import io.circe.{Decoder, Json, yaml}
import io.circe.{Json, yaml}
import wom.callable.{CallableTaskDefinition, ExecutableCallable, ExecutableTaskDefinition}
import wom.executable.Executable
import wom.executable.Executable.{InputParsingFunction, ParsedInputMap}
Expand All @@ -16,15 +14,12 @@ import wom.executable.Executable.{InputParsingFunction, ParsedInputMap}
// (ExecutableValidation.scala has more info on why this was necessary)
object CwlExecutableValidation {

implicit val fileDecoder = implicitly[Decoder[File]]
implicit val directoryDecoder = implicitly[Decoder[Directory]]

// Decodes the input file and builds the ParsedInputMap
private val inputCoercionFunction: InputParsingFunction =
inputFile => {
yaml.parser.parse(inputFile).flatMap(_.as[Map[String, Json]]) match {
case Left(error) => error.getMessage.invalidNelCheck[ParsedInputMap]
case Right(inputValue) => inputValue.map({ case (key, value) => key -> value.foldWith(CwlInputCoercion) }).validNelCheck
case Right(inputValue) => inputValue.map({ case (key, value) => key -> value.foldWith(CwlJsonToDelayedCoercionFunction) }).validNelCheck
}
}

Expand Down

0 comments on commit ed90b8d

Please sign in to comment.