Skip to content

Commit

Permalink
BCBIO improvements part 1 (#4358)
Browse files Browse the repository at this point in the history
* BCBIO improvements part 1
  • Loading branch information
Horneth committed Nov 6, 2018
1 parent f0c5b16 commit 36ba349
Show file tree
Hide file tree
Showing 16 changed files with 248 additions and 85 deletions.
9 changes: 6 additions & 3 deletions backend/src/main/scala/cromwell/backend/backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import cromwell.services.keyvalue.KeyValueServiceActor.KvResponse
import wom.callable.{ExecutableCallable, MetaValueElement}
import wom.graph.CommandCallNode
import wom.graph.GraphNodePort.OutputPort
import wom.runtime.WomOutputRuntimeExtractor
import wom.values.WomArray.WomArrayLike
import wom.values._

Expand Down Expand Up @@ -67,8 +68,9 @@ object BackendWorkflowDescriptor {
callable: ExecutableCallable,
knownValues: Map[OutputPort, WomValue],
workflowOptions: WorkflowOptions,
customLabels: Labels) = {
new BackendWorkflowDescriptor(id, callable, knownValues, workflowOptions, customLabels, List.empty)
customLabels: Labels,
outputRuntimeExtractor: Option[WomOutputRuntimeExtractor] = None) = {
new BackendWorkflowDescriptor(id, callable, knownValues, workflowOptions, customLabels, List.empty, outputRuntimeExtractor)
}
}

Expand All @@ -80,7 +82,8 @@ case class BackendWorkflowDescriptor(id: WorkflowId,
knownValues: Map[OutputPort, WomValue],
workflowOptions: WorkflowOptions,
customLabels: Labels,
breadCrumbs: List[BackendJobBreadCrumb]) {
breadCrumbs: List[BackendJobBreadCrumb],
outputRuntimeExtractor: Option[WomOutputRuntimeExtractor]) {

val rootWorkflow = breadCrumbs.headOption.map(_.callable).getOrElse(callable)
val possiblyNotRootWorkflowId = id.toPossiblyNotRoot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import scala.util.{Failure, Success}
* `withDefault` can be used to create a validation that defaults to a particular size.
*/
object InformationValidation {
def instance(attributeName: String = RuntimeAttributesKeys.MemoryKey, defaultUnit: MemoryUnit): RuntimeAttributesValidation[MemorySize] =
new InformationValidation(attributeName, defaultUnit)
def optional(attributeName: String = RuntimeAttributesKeys.MemoryKey, defaultUnit: MemoryUnit): OptionalRuntimeAttributesValidation[MemorySize] =
instance(attributeName, defaultUnit).optional
def configDefaultString(attributeName: String = RuntimeAttributesKeys.MemoryKey, config: Option[Config], defaultUnit: MemoryUnit): Option[String] =
instance(attributeName, defaultUnit).configDefaultValue(config)
def withDefaultMemory(attributeName: String = RuntimeAttributesKeys.MemoryKey, memorySize: String, defaultUnit: MemoryUnit): RuntimeAttributesValidation[MemorySize] = {
def instance(attributeName: String = RuntimeAttributesKeys.MemoryKey, defaultUnit: MemoryUnit, allowZero: Boolean = false): RuntimeAttributesValidation[MemorySize] =
new InformationValidation(attributeName, defaultUnit, allowZero)
def optional(attributeName: String = RuntimeAttributesKeys.MemoryKey, defaultUnit: MemoryUnit, allowZero: Boolean = false): OptionalRuntimeAttributesValidation[MemorySize] =
instance(attributeName, defaultUnit, allowZero).optional
def configDefaultString(attributeName: String = RuntimeAttributesKeys.MemoryKey, config: Option[Config], defaultUnit: MemoryUnit, allowZero: Boolean = false): Option[String] =
instance(attributeName, defaultUnit, allowZero).configDefaultValue(config)
def withDefaultMemory(attributeName: String = RuntimeAttributesKeys.MemoryKey, memorySize: String, defaultUnit: MemoryUnit, allowZero: Boolean = false): RuntimeAttributesValidation[MemorySize] = {
MemorySize.parse(memorySize) match {
case Success(memory) => instance(attributeName, defaultUnit).withDefault(WomInteger(memory.bytes.toInt))
case Failure(_) => instance(attributeName, defaultUnit).withDefault(BadDefaultAttribute(WomString(memorySize.toString)))
case Success(memory) => instance(attributeName, defaultUnit, allowZero).withDefault(WomInteger(memory.bytes.toInt))
case Failure(_) => instance(attributeName, defaultUnit, allowZero).withDefault(BadDefaultAttribute(WomString(memorySize.toString)))
}
}

Expand All @@ -44,12 +44,12 @@ object InformationValidation {
"Expecting %s runtime attribute to be an Integer or String with format '8 GB'." +
" Exception: %s"

private[validation] def validateString(attributeName: String, wdlString: WomString): ErrorOr[MemorySize] =
validateString(attributeName, wdlString.value)
private[validation] def validateString(attributeName: String, wdlString: WomString, allowZero: Boolean): ErrorOr[MemorySize] =
validateString(attributeName, wdlString.value, allowZero)

private[validation] def validateString(attributeName: String, value: String): ErrorOr[MemorySize] = {
private[validation] def validateString(attributeName: String, value: String, allowZero: Boolean): ErrorOr[MemorySize] = {
MemorySize.parse(value) match {
case scala.util.Success(memorySize: MemorySize) if memorySize.amount > 0 =>
case scala.util.Success(memorySize: MemorySize) if memorySize.amount > 0 || (memorySize.amount == 0 && allowZero) =>
memorySize.to(MemoryUnit.GB).validNel
case scala.util.Success(memorySize: MemorySize) =>
wrongAmountFormat.format(attributeName, memorySize.amount).invalidNel
Expand All @@ -58,25 +58,25 @@ object InformationValidation {
}
}

private[validation] def validateInteger(attributeName: String, wdlInteger: WomInteger, defaultUnit: MemoryUnit): ErrorOr[MemorySize] =
validateInteger(attributeName, wdlInteger.value, defaultUnit)
private[validation] def validateInteger(attributeName: String, wdlInteger: WomInteger, defaultUnit: MemoryUnit, allowZero: Boolean): ErrorOr[MemorySize] =
validateInteger(attributeName, wdlInteger.value, defaultUnit, allowZero)

private[validation] def validateInteger(attributeName: String, value: Int, defaultUnit: MemoryUnit): ErrorOr[MemorySize] = {
if (value <= 0)
private[validation] def validateInteger(attributeName: String, value: Int, defaultUnit: MemoryUnit, allowZero: Boolean): ErrorOr[MemorySize] = {
if (value < 0 || (value == 0 && !allowZero))
wrongAmountFormat.format(attributeName, value).invalidNel
else
MemorySize(value.toDouble, defaultUnit).to(MemoryUnit.GB).validNel
}

def validateLong(attributeName: String, value: Long, defaultUnit: MemoryUnit): ErrorOr[MemorySize] = {
if (value <= 0)
def validateLong(attributeName: String, value: Long, defaultUnit: MemoryUnit, allowZero: Boolean): ErrorOr[MemorySize] = {
if (value < 0 || (value == 0 && !allowZero))
wrongAmountFormat.format(attributeName, value).invalidNel
else
MemorySize(value.toDouble, defaultUnit).to(MemoryUnit.GB).validNel
}
}

class InformationValidation(attributeName: String, defaultUnit: MemoryUnit) extends RuntimeAttributesValidation[MemorySize] {
class InformationValidation(attributeName: String, defaultUnit: MemoryUnit, allowZero: Boolean = false) extends RuntimeAttributesValidation[MemorySize] {

import InformationValidation._

Expand All @@ -85,9 +85,9 @@ class InformationValidation(attributeName: String, defaultUnit: MemoryUnit) exte
override def coercion = Seq(WomIntegerType, WomLongType, WomStringType)

override protected def validateValue: PartialFunction[WomValue, ErrorOr[MemorySize]] = {
case WomLong(value) => InformationValidation.validateLong(key, value, defaultUnit)
case WomInteger(value) => InformationValidation.validateInteger(key, value, defaultUnit)
case WomString(value) => InformationValidation.validateString(key, value)
case WomLong(value) => InformationValidation.validateLong(key, value, defaultUnit, allowZero)
case WomInteger(value) => InformationValidation.validateInteger(key, value, defaultUnit, allowZero)
case WomString(value) => InformationValidation.validateString(key, value, allowZero)
}

override def missingValueMessage: String = wrongTypeFormat.format(key, "Not supported WDL type value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ object RuntimeAttributesValidation {
}

def parseMemoryString(k: String, s: WomString): ErrorOr[MemorySize] = {
InformationValidation.validateString(k, s)
InformationValidation.validateString(k, s, allowZero = false)
}

def withDefault[ValidatedType](validation: RuntimeAttributesValidation[ValidatedType],
Expand Down
35 changes: 35 additions & 0 deletions common/src/main/scala/common/util/StringUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,47 @@ package common.util

import mouse.all._
import org.apache.commons.lang3.StringUtils
import org.apache.commons.text.StringEscapeUtils

object StringUtil {
implicit class EnhancedString(val string: String) extends AnyVal {

/**
* Ensure string ends with a /
*/
def ensureSlashed: String = StringUtils.appendIfMissing(string, "/")

/**
* Ensure string does not end with a /
*/
def ensureUnslashed: String = string.ensureSlashed |> StringUtils.chop

/**
* Ensure string does not start with a /
*/
def ensureNoLeadingSlash: String = string.stripPrefix("/")

/**
* Escape for shell use
*/
def ensureShellEscaped: String = StringEscapeUtils.escapeXSI(string)

/**
* Escape / with \/
*/
def ensureSlashEscaped: String = string.replaceAll("/", "\\\\/")

/**
* Escape the string for use in shell and also escape slashes so it can be used in a sed expression while keeping
* / as a sed separator
*/
def ensureSedEscaped: String = string.ensureShellEscaped.ensureSlashEscaped

/**
* Makes the string look like a relative directory.
* i.e no slash prefix and a slash suffix
* e.g: /root/some/dir -> root/some/dir/
*/
def relativeDirectory = string.ensureNoLeadingSlash.ensureSlashed
}
}
11 changes: 11 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,17 @@ languages {
config {
strict-validation: false
enabled: true

# Optional section to define a command returning paths to output files that
# can only be known after the user's command has been run
output-runtime-extractor {
# Optional docker image on which the command should be run - Must have bash if provided
docker-image = "stedolan/jq@sha256:a61ed0bca213081b64be94c5e1b402ea58bc549f457c2682a86704dd55231e09"
# Single line command executed after the tool has run.
# It should write to stdout the file paths that are referenced in the cwl.output.json (or any other file path
# not already defined in the outputs of the tool and therefore unknown when the tool is run)
command = "cat cwl.output.json 2>/dev/null | jq -r '.. | .path? // .location? // empty'"
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import akka.pattern.pipe
import cats.Monad
import cats.data.EitherT._
import cats.data.Validated.{Invalid, Valid}
import cats.data.{EitherT, NonEmptyList}
import cats.data.NonEmptyList
import cats.effect.IO
import cats.syntax.apply._
import cats.syntax.either._
Expand Down Expand Up @@ -43,6 +43,7 @@ import wom.core.{WorkflowSource, WorkflowUrl}
import wom.expression.{NoIoFunctionSet, WomExpression}
import wom.graph.CommandCallNode
import wom.graph.GraphNodePort.OutputPort
import wom.runtime.WomOutputRuntimeExtractor
import wom.values.{WomString, WomValue}

import scala.concurrent.Future
Expand Down Expand Up @@ -246,7 +247,7 @@ class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef,
}
}

def buildValidatedNamespace(workflowSource: WorkflowSource, importResolvers: List[ImportResolver]): EitherT[IO, NonEmptyList[String], ValidatedWomNamespace] = {
def findFactory(workflowSource: WorkflowSource): ErrorOr[LanguageFactory] = {
def chooseFactory(factories: List[LanguageFactory]): Option[LanguageFactory] = factories.find(_.looksParsable(workflowSource))

val factory: ErrorOr[LanguageFactory] = sourceFiles.workflowType match {
Expand All @@ -267,16 +268,20 @@ class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef,
workflowLogger.info(s"Parsing workflow as ${validFactory.languageName} ${validFactory.languageVersionName}")
pushLanguageToMetadata(validFactory.languageName, validFactory.languageVersionName)
}

factory
}

errorOrParse(factory).flatMap(_.validateNamespace(
def buildValidatedNamespace(factory: LanguageFactory, workflowSource: WorkflowSource, importResolvers: List[ImportResolver]): Parse[ValidatedWomNamespace] = {
factory.validateNamespace(
sourceFiles,
workflowSource,
workflowOptions,
importLocalFilesystem,
workflowId,
engineIoFunctions,
importResolvers
))
)
}

val localFilesystemResolvers =
Expand All @@ -296,9 +301,11 @@ class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef,
importResolvers = zippedImportResolver.toList ++ localFilesystemResolvers :+ HttpResolver(None, Map.empty)
sourceAndResolvers <- fromEither[IO](findWorkflowSource(sourceFiles.workflowSource, sourceFiles.workflowUrl, importResolvers))
_ = if(sourceFiles.workflowUrl.isDefined) publishWorkflowSourceToMetadata(id, sourceAndResolvers._1)
validatedNamespace <- buildValidatedNamespace(sourceAndResolvers._1, sourceAndResolvers._2)
factory <- errorOrParse(findFactory(sourceAndResolvers._1))
outputRuntimeExtractor <- errorOrParse(factory.womOutputRuntimeExtractor.toValidated)
validatedNamespace <- buildValidatedNamespace(factory, sourceAndResolvers._1, sourceAndResolvers._2)
_ = pushNamespaceMetadata(validatedNamespace)
ewd <- fromEither[IO](buildWorkflowDescriptor(id, validatedNamespace, workflowOptions, labels, conf, pathBuilders).toEither)
ewd <- fromEither[IO](buildWorkflowDescriptor(id, validatedNamespace, workflowOptions, labels, conf, pathBuilders, outputRuntimeExtractor).toEither)
} yield ewd
}

Expand Down Expand Up @@ -384,7 +391,8 @@ class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef,
workflowOptions: WorkflowOptions,
labels: Labels,
conf: Config,
pathBuilders: List[PathBuilder]): ErrorOr[EngineWorkflowDescriptor] = {
pathBuilders: List[PathBuilder],
outputRuntimeExtractor: Option[WomOutputRuntimeExtractor]): ErrorOr[EngineWorkflowDescriptor] = {
val taskCalls = womNamespace.executable.graph.allNodes collect { case taskNode: CommandCallNode => taskNode }
val defaultBackendName = conf.as[Option[String]]("backend.default")

Expand All @@ -396,7 +404,7 @@ class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef,
(failureModeValidation, backendAssignmentsValidation, callCachingModeValidation) mapN {
case (failureMode, backendAssignments, callCachingMode) =>
val callable = womNamespace.executable.entryPoint
val backendDescriptor = BackendWorkflowDescriptor(id, callable, womNamespace.womValueInputs, workflowOptions, labels)
val backendDescriptor = BackendWorkflowDescriptor(id, callable, womNamespace.womValueInputs, workflowOptions, labels, outputRuntimeExtractor)
EngineWorkflowDescriptor(callable, backendDescriptor, backendAssignments, failureMode, pathBuilders, callCachingMode)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import cromwell.languages.util.ImportResolver.ImportResolver
import wom.core._
import wom.executable.WomBundle
import wom.expression.IoFunctionSet
import wom.runtime.WomOutputRuntimeExtractor

trait LanguageFactory {

Expand All @@ -27,6 +28,11 @@ trait LanguageFactory {

lazy val strictValidation: Boolean = !config.as[Option[Boolean]]("strict-validation").contains(false)

lazy val womOutputRuntimeExtractor: Checked[Option[WomOutputRuntimeExtractor]] = config.getAs[Config]("output-runtime-extractor") match {
case Some(c) => WomOutputRuntimeExtractor.fromConfig(c).map(Option.apply).toEither
case _ => None.validNelCheck
}

def getWomBundle(workflowSource: WorkflowSource,
workflowOptionsJson: WorkflowOptionsJson,
importResolvers: List[ImportResolver],
Expand Down
5 changes: 3 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object Dependencies {
private val commonsIoV = "2.6"
private val commonsLang3V = "3.8"
private val commonsLoggingV = "1.2"
private val commonsTextV = "1.4"
private val commonsTextV = "1.6"
private val configsV = "0.4.4"
private val delightRhinoSandboxV = "0.0.9"
private val errorProneAnnotationsV = "2.0.19"
Expand Down Expand Up @@ -142,6 +142,7 @@ object Dependencies {
"io.spray" %% "spray-json" % sprayJsonV,
"joda-time" % "joda-time" % jodaTimeV,
"org.apache.commons" % "commons-lang3" % commonsLang3V,
"org.apache.commons" % "commons-text" % commonsTextV,
"org.apache.httpcomponents" % "httpclient" % apacheHttpClientV,
"org.apache.httpcomponents" % "httpcore" % apacheHttpCoreV,
"org.mongodb" % "mongo-java-driver" % mongoJavaDriverV,
Expand Down Expand Up @@ -341,6 +342,7 @@ object Dependencies {
"org.slf4j" % "slf4j-api" % slf4jV,
"org.typelevel" %% "cats-effect" % catsEffectV,
"org.apache.commons" % "commons-lang3" % commonsLang3V,
"org.apache.commons" % "commons-text" % commonsTextV,
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV,
"ch.qos.logback" % "logback-classic" % logbackV,
"ch.qos.logback" % "logback-access" % logbackV
Expand Down Expand Up @@ -378,7 +380,6 @@ object Dependencies {
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV,
"io.spray" %% "spray-json" % sprayJsonV,
"org.scalacheck" %% "scalacheck" % scalacheckV % Test,
"org.apache.commons" % "commons-text" % commonsTextV,
"com.github.mpilquist" %% "simulacrum" % simulacrumV,
"commons-codec" % "commons-codec" % commonsCodecV,
"eu.timepit" %% "refined" % refinedV
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
backendLabels,
preemptible,
pipelinesConfiguration.jobShell,
dockerKeyAndToken
dockerKeyAndToken,
jobDescriptor.workflowDescriptor.outputRuntimeExtractor
)
case Some(other) =>
throw new RuntimeException(s"Unexpected initialization data: $other")
Expand Down
Loading

0 comments on commit 36ba349

Please sign in to comment.