Skip to content

Commit

Permalink
EnvVarRequirement
Browse files Browse the repository at this point in the history
  • Loading branch information
mcovarr committed Jan 18, 2018
1 parent 9c2528a commit 1f831bc
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import cromwell.services.keyvalue.KeyValueServiceActor._
import cromwell.services.keyvalue.KvClient
import cromwell.services.metadata.CallMetadataKeys
import net.ceedubs.ficus.Ficus._
import wom.expression.WomExpression
import wom.values._
import wom.{CommandSetupSideEffectFile, InstantiatedCommand, WomFileMapper}

Expand Down Expand Up @@ -205,6 +206,8 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
val globFiles: ErrorOr[List[WomGlobFile]] =
backendEngineFunctions.findGlobOutputs(call, jobDescriptor)

// Renders the instantiated command's environment variables as shell statements, one per line, for insertion
// into the job script just before the command itself. An empty map still yields a single newline.
// `export` is required here: a bare `NAME="value"` line only sets a shell variable, which the processes started
// by the instantiated command do not inherit.
// NOTE(review): values are interpolated inside double quotes without escaping, so `"`, `$` and backticks in a
// value are still interpreted by the shell -- confirm whether that is intended.
lazy val environmentVariables = instantiatedCommand.environmentVariables map { case (k, v) => s"""export $k="$v"""" } mkString("", "\n", "\n")

// The `tee` trickery below is to be able to redirect to known filenames for CWL while also streaming
// stdout and stderr for PAPI to periodically upload to cloud storage.
// https://stackoverflow.com/questions/692000/how-do-i-write-stderr-to-a-file-while-using-tee-with-a-pipe
Expand All @@ -225,6 +228,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
|)
|(
|cd $cwd
|ENVIRONMENT_VARIABLES
|INSTANTIATED_COMMAND
|) > >(tee $stdoutPath) 2> >(tee $stderrPath >&2)
|echo $$? > $rcTmpPath
Expand All @@ -236,6 +240,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
|mv $rcTmpPath $rcPath
|""".stripMargin
.replace("SCRIPT_PREAMBLE", scriptPreamble)
.replace("ENVIRONMENT_VARIABLES", environmentVariables)
.replace("INSTANTIATED_COMMAND", instantiatedCommand.commandString)
.replace("SCRIPT_EPILOGUE", scriptEpilogue))
}
Expand Down Expand Up @@ -271,9 +276,16 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
runtimeEnvironment
)

/**
  * Evaluates one environment variable expression and pairs the variable name with the resulting string.
  *
  * @param nameAndExpression the variable name together with the expression that produces its value
  * @return the name paired with the `valueString` of the evaluated value, or the evaluation errors
  */
def evaluateEnvironmentExpression(nameAndExpression: (String, WomExpression)): ErrorOr[(String, String)] =
  nameAndExpression match {
    case (variableName, valueExpression) =>
      valueExpression
        .evaluateValue(adHocFileCreationInputs, backendEngineFunctions)
        .map(evaluated => variableName -> evaluated.valueString)
  }

// Evaluate every environment expression declared on the task's callable. `traverse` combines the per-variable
// results into a single ErrorOr holding the list of (name, value) pairs.
val environmentVariables = jobDescriptor.taskCall.callable.environmentExpressions.toList traverse evaluateEnvironmentExpression

// TODO CWL: toTry.get here. Is throwing an exception the best way to indicate command generation failure?
((adHocFileCreationSideEffectFiles, instantiatedCommandValidation) mapN { (adHocFiles, command) =>
command.copy(createdFiles = command.createdFiles ++ adHocFiles)
((adHocFileCreationSideEffectFiles, instantiatedCommandValidation, environmentVariables) mapN { (adHocFiles, command, env) =>
command.copy(createdFiles = command.createdFiles ++ adHocFiles, environmentVariables = env.toMap)
}).toTry match {
case Success(ic) => ic
case Failure(e) => throw new Exception("Failed to evaluate ad hoc files", e)
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/cromwell/util/WomMocks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import wom.values.WomValue

object WomMocks {
val EmptyTaskDefinition = CallableTaskDefinition("emptyTask", Function.const(List.empty.validNel), RuntimeAttributes(Map.empty),
Map.empty, Map.empty, List.empty, List.empty, Set.empty)
Map.empty, Map.empty, List.empty, List.empty, Set.empty, Map.empty)

val EmptyWorkflowDefinition = mockWorkflowDefinition("emptyWorkflow")

Expand All @@ -30,7 +30,7 @@ object WomMocks {

def mockTaskDefinition(name: String) = {
CallableTaskDefinition(name, Function.const(List.empty.validNel), RuntimeAttributes(Map.empty),
Map.empty, Map.empty, List.empty, List.empty, Set.empty)
Map.empty, Map.empty, List.empty, List.empty, Set.empty, Map.empty)
}

def mockOutputPort(name: String, womType: WomType = WomStringType): OutputPort = {
Expand Down
202 changes: 138 additions & 64 deletions cwl/src/main/scala/cwl/CommandLineTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,14 @@ case class CommandLineTool private(
inputValues
.collectFirst({ case (inputDefinition, womValue) if inputDefinition.name == parsedName => womValue.validNel })
.orElse(defaultValue) match {
case Some(Valid(value)) =>
// See http://www.commonwl.org/v1.0/CommandLineTool.html#Input_binding
lazy val initialKey = CommandBindingSortingKey.empty
.append(inputParameter.inputBinding, Coproduct[StringOrInt](parsedName))

inputParameter.`type`.toList.flatMap(_.fold(MyriadInputTypeToSortedCommandParts).apply(inputParameter.inputBinding, value, initialKey.asNewKey, expressionLib)).validNel
case Some(Invalid(errors)) => Invalid(errors)
case None => s"Could not find an input value for input $parsedName in ${inputValues.prettyString}".invalidNel
case Some(Valid(value)) =>
// See http://www.commonwl.org/v1.0/CommandLineTool.html#Input_binding
lazy val initialKey = CommandBindingSortingKey.empty
.append(inputParameter.inputBinding, Coproduct[StringOrInt](parsedName))

inputParameter.`type`.toList.flatMap(_.fold(MyriadInputTypeToSortedCommandParts).apply(inputParameter.inputBinding, value, initialKey.asNewKey, expressionLib)).validNel
case Some(Invalid(errors)) => Invalid(errors)
case None => s"Could not find an input value for input $parsedName in ${inputValues.prettyString}".invalidNel
}
})

Expand All @@ -150,38 +150,102 @@ case class CommandLineTool private(
}
}


/**
  * Works out the environment variables that apply to this tool from every `EnvVarRequirement` found in the
  * supplied requirements and hints.
  *
  * @param requirementsAndHints requirements and hints to search for `EnvVarRequirement`s
  * @param expressionLib        expression library passed along when converting each definition to a `WomExpression`
  * @return the environment variable names mapped to `WomExpression`s, or an error listing every variable whose
  *         expression is not also defined on this tool
  */
private def environmentDefs(requirementsAndHints: List[Requirement], expressionLib: ExpressionLib): ErrorOr[Map[String, WomExpression]] = {
  // Environment variable expressions may originate from a containing workflow step, its containing workflow, or
  // anything further up. This code is not currently able to evaluate those, so they have to be detected and
  // rejected. The same limitation applies to runtime attributes, which are not checked at all.
  // TODO CWL: for runtime attributes, detect unevaluatable expressions in the containment hierarchy.

  // Flatten every `EnvironmentDef` of every `EnvVarRequirement`. The spec does not appear to say what to do with
  // duplicate `envName`s inside one array of defs; each array is reversed here so that, once the flattened list is
  // consumed from its end below, the last occurrence within an array is the one that survives.
  val orderedEnvironmentDefs = requirementsAndHints flatMap { requirement =>
    requirement.select[EnvVarRequirement].toList flatMap { _.envDef.toList.reverse }
  }

  // Collapse to one value per name, walking the flattened list from its end so that entries nearer the front
  // overwrite entries nearer the back. The values stay as `StringOrExpression`s for now because they still need
  // to be compared against the definitions made on this tool.
  val effectiveDefs = orderedEnvironmentDefs.reverse.foldLeft(Map.empty[String, StringOrExpression]) { (collected, environmentDef) =>
    collected + (environmentDef.envName -> environmentDef.envValue)
  }

  // The effective definitions whose value is an expression, wherever in the Run / WorkflowStep / Workflow
  // containment hierarchy they were found.
  val expressionBearingDefs = effectiveDefs filter { case (_, value) => value.select[Expression].isDefined }

  // Every environment variable expression written on this tool itself (requirements and hints).
  val cltRequirements = requirements.toList.flatten ++ hints.toList.flatten.flatMap(_.select[Requirement])
  val toolDefinedExpressions = cltRequirements
    .flatMap(_.select[EnvVarRequirement])
    .flatMap(_.envDef.toList)
    .flatMap(_.envValue.select[Expression].toList)
    .toSet

  // An effective expression that was not defined on this tool cannot currently be evaluated; collect the names
  // of all such variables.
  val unevaluatableNames = expressionBearingDefs.toList flatMap { case (name, value) =>
    value.select[Expression].toList collect { case expression if !toolDefinedExpressions.contains(expression) => name }
  }

  if (unevaluatableNames.isEmpty) {
    // Everything can be evaluated: convert each effective definition to a `WomExpression`.
    effectiveDefs.toList.reverse.foldLeft(Map.empty[String, WomExpression]) { case (womExpressions, (envName, envValue)) =>
      womExpressions + (envName -> envValue.fold(StringOrExpressionToWomExpression).apply(inputNames, expressionLib))
    }.validNel
  } else {
    s"Could not evaluate environment variable expressions defined in the call hierarchy of tool $id: ${unevaluatableNames.mkString(", ")}.".invalidNel
  }
}

def buildTaskDefinition(validator: RequirementsValidator, parentExpressionLib: ExpressionLib): ErrorOr[CallableTaskDefinition] = {
validateRequirementsAndHints(validator) map { requirementsAndHints: Seq[cwl.Requirement] =>
val id = this.id

val expressionLib: ExpressionLib =
parentExpressionLib ++ inlineJavascriptRequirements(requirementsAndHints)

// This is basically doing a `foldMap` but can't actually be a `foldMap` because:
// - There is no monoid instance for `WomExpression`s.
// - We want to fold from the right so the hints and requirements with the lowest precedence are processed first
// and later overridden if there are duplicate hints or requirements of the same type with higher precedence.
val finalAttributesMap: Map[String, WomExpression] = (requirementsAndHints ++ DefaultDockerRequirement).foldRight(Map.empty[String, WomExpression])({
case (requirement, attributesMap) => attributesMap ++ processRequirement(requirement, expressionLib)
})
for {
requirementsAndHints <- validateRequirementsAndHints(validator)
environment <- environmentDefs(requirementsAndHints, parentExpressionLib)
} yield buildCallableTaskDefinition(requirementsAndHints, parentExpressionLib, environment)
}

def buildCallableTaskDefinition(requirementsAndHints: List[cwl.Requirement],
parentExpressionLib: ExpressionLib,
environmentExpressions: Map[String, WomExpression]): CallableTaskDefinition = {

val runtimeAttributes: RuntimeAttributes = RuntimeAttributes(finalAttributesMap)
val id = this.id

val meta: Map[String, String] = Map.empty
val parameterMeta: Map[String, String] = Map.empty
val expressionLib: ExpressionLib =
parentExpressionLib ++ inlineJavascriptRequirements(requirementsAndHints)

/*
quoted from: http://www.commonwl.org/v1.0/CommandLineTool.html#CommandOutputBinding :
// This is basically doing a `foldMap` but can't actually be a `foldMap` because:
// - There is no monoid instance for `WomExpression`s.
// - We want to fold from the right so the hints and requirements with the lowest precedence are processed first
// and later overridden if there are duplicate hints or requirements of the same type with higher precedence.
val finalAttributesMap: Map[String, WomExpression] = (requirementsAndHints ++ DefaultDockerRequirement).foldRight(Map.empty[String, WomExpression])({
case (requirement, attributesMap) => attributesMap ++ processRequirement(requirement, expressionLib)
})

For inputs and outputs, we only keep the variable name in the definition
*/
val runtimeAttributes: RuntimeAttributes = RuntimeAttributes(finalAttributesMap)

val outputs: List[Callable.OutputDefinition] = this.outputs.map {
case p @ CommandOutputParameter(cop_id, _, _, _, _, _, _, Some(tpe)) =>
val womType = tpe.fold(MyriadOutputTypeToWomType)
OutputDefinition(FullyQualifiedName(cop_id).id, womType, CommandOutputParameterExpression(p, womType, inputNames, expressionLib))
case other => throw new NotImplementedError(s"Command output parameters such as $other are not yet supported")
}.toList
val meta: Map[String, String] = Map.empty
val parameterMeta: Map[String, String] = Map.empty

/*
quoted from: http://www.commonwl.org/v1.0/CommandLineTool.html#CommandOutputBinding :
For inputs and outputs, we only keep the variable name in the definition
*/

val outputs: List[Callable.OutputDefinition] = this.outputs.map {
case p @ CommandOutputParameter(cop_id, _, _, _, _, _, _, Some(tpe)) =>
val womType = tpe.fold(MyriadOutputTypeToWomType)
OutputDefinition(FullyQualifiedName(cop_id).id, womType, CommandOutputParameterExpression(p, womType, inputNames, expressionLib))
case other => throw new NotImplementedError(s"Command output parameters such as $other are not yet supported")
}.toList

val inputDefinitions: List[_ <: Callable.InputDefinition] =
this.inputs.map {
Expand All @@ -197,37 +261,37 @@ case class CommandLineTool private(
case other => throw new NotImplementedError(s"command input parameters such as $other are not yet supported")
}.toList

def stringOrExpressionToString(soe: Option[StringOrExpression]): Option[String] = soe flatMap {
case StringOrExpression.String(str) => Some(str)
case StringOrExpression.Expression(_) => None // ... for now!
}

// The try will succeed if this is a task within a step. If it's a standalone file, the ID will be the file,
// so the filename is the fallback.
def taskName = Try(FullyQualifiedName(id).id).getOrElse(Paths.get(id).getFileName.toString)

val adHocFileCreations: Set[WomExpression] = (for {
requirements <- requirements.getOrElse(Array.empty[Requirement])
initialWorkDirRequirement <- requirements.select[InitialWorkDirRequirement].toArray
listing <- initialWorkDirRequirement.listings
} yield InitialWorkDirFileGeneratorExpression(listing, expressionLib)).toSet[WomExpression]

CallableTaskDefinition(
taskName,
buildCommandTemplate(expressionLib),
runtimeAttributes,
meta,
parameterMeta,
outputs,
inputDefinitions,
// TODO: This doesn't work in all cases and it feels clunky anyway - find a way to sort that out
prefixSeparator = "#",
commandPartSeparator = " ",
stdoutRedirection = stringOrExpressionToString(stdout),
stderrRedirection = stringOrExpressionToString(stderr),
adHocFileCreation = adHocFileCreations
)
def stringOrExpressionToString(soe: Option[StringOrExpression]): Option[String] = soe flatMap {
case StringOrExpression.String(str) => Some(str)
case StringOrExpression.Expression(_) => None // ... for now!
}

// The try will succeed if this is a task within a step. If it's a standalone file, the ID will be the file,
// so the filename is the fallback.
def taskName = Try(FullyQualifiedName(id).id).getOrElse(Paths.get(id).getFileName.toString)

val adHocFileCreations: Set[WomExpression] = (for {
requirements <- requirements.getOrElse(Array.empty[Requirement])
initialWorkDirRequirement <- requirements.select[InitialWorkDirRequirement].toArray
listing <- initialWorkDirRequirement.listings
} yield InitialWorkDirFileGeneratorExpression(listing, expressionLib)).toSet[WomExpression]

CallableTaskDefinition(
taskName,
buildCommandTemplate(expressionLib),
runtimeAttributes,
meta,
parameterMeta,
outputs,
inputDefinitions,
// TODO: This doesn't work in all cases and it feels clunky anyway - find a way to sort that out
prefixSeparator = "#",
commandPartSeparator = " ",
stdoutRedirection = stringOrExpressionToString(stdout),
stderrRedirection = stringOrExpressionToString(stderr),
adHocFileCreation = adHocFileCreations,
environmentExpressions = environmentExpressions
)
}

def asCwl = Coproduct[Cwl](this)
Expand Down Expand Up @@ -553,3 +617,13 @@ object CommandLineTool {
)) } toList

}

/**
  * Polymorphic function turning either alternative of a `StringOrExpression` into a function that, given the
  * input names and an expression library, produces a `WomExpression`.
  */
object StringOrExpressionToWomExpression extends Poly1 {
  /** A plain string becomes a constant expression wrapping a `WomString`; the two arguments are not used. */
  implicit def string: Case.Aux[String, (Set[String], ExpressionLib) => WomExpression] =
    at[String] { literal =>
      (_, _) => ValueAsAnExpression(WomString(literal))
    }

  /** An expression becomes a `JobPreparationExpression` built from the input names and expression library. */
  implicit def expression: Case.Aux[Expression, (Set[String], ExpressionLib) => WomExpression] =
    at[Expression] { cwlExpression =>
      (names, lib) => cwl.JobPreparationExpression(cwlExpression, names, lib)
    }
}
2 changes: 1 addition & 1 deletion cwl/src/main/scala/cwl/CwlExpressionCommandPart.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ abstract class CommandLineBindingCommandPart(commandLineBinding: CommandLineBind
case _ => List.empty
}

// Can't flatMap Either in 2.11..
// Can't flatMap Either in 2.11.
evaluatedWomValue match {
case Right(womValue) => applyShellQuote(womValue).map(processValue).map(_.map(InstantiatedCommand(_))).toValidated
case Left(e) => Invalid(e)
Expand Down
10 changes: 5 additions & 5 deletions src/bin/travis/resources/conformance_expected_failures.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
27
#28
29
30
#30
#31
#32
#33
Expand All @@ -39,9 +39,9 @@
#39
40
41
42
43
44
#42
#43
#44
45
46
47
Expand Down Expand Up @@ -94,7 +94,7 @@
94
95
96
97
#97
#98
99
100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ sealed trait ConfigAsyncJobExecutionActor extends SharedFileSystemAsyncJobExecut
val allInputs = providedWomInputs ++ optionalsForciblyInitializedToNone
val womInstantiation = taskDefinition.instantiateCommand(allInputs, NoIoFunctionSet, identity, runtimeEnvironment)

val InstantiatedCommand(command, _) = womInstantiation.toTry.get
val InstantiatedCommand(command, _, _) = womInstantiation.toTry.get
jobLogger.info(s"executing: $command")
val scriptBody =
s"""|#!/bin/bash
Expand Down
3 changes: 2 additions & 1 deletion wdl/src/main/scala/wdl/WdlTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ case class WdlTask(name: String,
parameterMeta = parameterMeta,
outputs = outputs.map(_.womOutputDefinition).toList,
inputs = buildWomInputs,
adHocFileCreation = Set.empty
adHocFileCreation = Set.empty,
environmentExpressions = Map.empty
)

private def buildWomInputs: List[Callable.InputDefinition] = declarations collect {
Expand Down
2 changes: 1 addition & 1 deletion wdl/src/main/scala/wdl/command/ParameterCommandPart.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ case class ParameterCommandPart(attributes: Map[String, String], expression: Wdl
// Files generated by writeFiles have "engine-relative" paths which will be different from the container paths
// calculated by `valueMapper`. "engine-relative" may mean either the non-Docker container path on the host
// running Cromwell, or a cloud path. Capture these newly created files and their engine paths here.
InstantiatedCommand(f.valueString, List(CommandSetupSideEffectFile(unmappedFile))).validNel
InstantiatedCommand(commandString = f.valueString, createdFiles = List(CommandSetupSideEffectFile(unmappedFile))).validNel
case (f: WomFile, _) =>
InstantiatedCommand(f.valueString).validNel
case (p: WomPrimitive, _) => InstantiatedCommand(p.valueString).validNel
Expand Down
3 changes: 3 additions & 0 deletions wom/src/main/scala/wom/callable/TaskDefinition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sealed trait TaskDefinition extends Callable {
def stdoutRedirection: Option[String]
def stderrRedirection: Option[String]
def adHocFileCreation: Set[WomExpression]
def environmentExpressions: Map[String, WomExpression]

lazy val unqualifiedName: LocallyQualifiedName = name

Expand Down Expand Up @@ -82,6 +83,7 @@ final case class CallableTaskDefinition(name: String,
outputs: List[Callable.OutputDefinition],
inputs: List[_ <: Callable.InputDefinition],
adHocFileCreation: Set[WomExpression],
environmentExpressions: Map[String, WomExpression],
prefixSeparator: String = ".",
commandPartSeparator: String = "",
stdoutRedirection: Option[String] = None,
Expand All @@ -108,6 +110,7 @@ final case class ExecutableTaskDefinition private (callableTaskDefinition: Calla
override def stdoutRedirection = callableTaskDefinition.stdoutRedirection
override def stderrRedirection = callableTaskDefinition.stderrRedirection
override def adHocFileCreation = callableTaskDefinition.adHocFileCreation
override def environmentExpressions = callableTaskDefinition.environmentExpressions
}

object ExecutableTaskDefinition {
Expand Down

0 comments on commit 1f831bc

Please sign in to comment.