Skip to content

Commit

Permalink
Yaml inputs (#2832)
Browse files Browse the repository at this point in the history
* YAML inputs
  • Loading branch information
Horneth committed Nov 10, 2017
1 parent 17e4c5d commit 1f63bd5
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 77 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

### Other changes

* **API**
+ Cromwell now supports input files in the yaml format (json format is still supported).

* **Database**
You have the option of storing the metadata in a separate SQL database than the database containing the internal engine
data. See the [README](https://github.com/broadinstitute/cromwell#database) or the `database` section in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ trait AsyncBackendJobExecutionActor { this: Actor with ActorLogging =>
case _ => false
}

// This seems dangerous as any exception that is not one of the above (fatal) will trigger infinite retries !
def isTransient(throwable: Throwable): Boolean = !isFatal(throwable)
def isTransient(throwable: Throwable): Boolean = false

private def withRetry[A](work: () => Future[A], backOff: SimpleExponentialBackoff): Future[A] = {
Retry.withRetry(work, isTransient = isTransient, isFatal = isFatal, backoff = backOff)(context.system)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"wf_hello.hello.addressee": m'Lord
16 changes: 16 additions & 0 deletions centaur/src/main/resources/standardTestCases/hello_yaml.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
name: hello_yaml
testFormat: workflowsuccess

files {
wdl: hello/hello.wdl
inputs: hello/hello_yaml.inputs
}

metadata {
workflowName: wf_hello
status: Succeeded
"calls.wf_hello.hello.executionStatus": Done
"calls.wf_hello.hello.runtimeAttributes.docker": "ubuntu@sha256:71cd81252a3563a03ad8daee81047b62ab5d892ebbfbf71cf53415f29c130950"
"outputs.wf_hello.hello.salutation": "Hello m'Lord!"
"inputs.wf_hello.hello.addressee": "m'Lord"
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ submit {
statusCode: 400
message: """{
"status": "fail",
"message": "Error(s): Unexpected end-of-input at input index 63 (line 3, position 1), expected '}':\n\n^\n"
"message": "Error(s): Input file is not valid yaml nor json: while parsing a flow mapping\n in 'reader', line 1, column 1:\n {\n ^\nexpected ',' or '}', but got StreamEnd\n in 'reader', line 3, column 1:\n \n ^\n"
}"""
}
7 changes: 6 additions & 1 deletion common/src/main/scala/common/validation/Validation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import cats.data.{NonEmptyList, Validated}
import cats.syntax.validated._
import cats.syntax.either._
import common.Checked
import common.exception.AggregatedMessageException
import common.validation.ErrorOr.ErrorOr
import net.ceedubs.ficus.readers.{StringReader, ValueReader}
import org.slf4j.Logger
Expand Down Expand Up @@ -41,7 +42,11 @@ object Validation {
implicit class ValidationTry[A](val e: ErrorOr[A]) extends AnyVal {
def toTry: Try[A] = e match {
case Valid(options) => Success(options)
case Invalid(err) => Failure(new RuntimeException(s"Error(s): ${err.toList.mkString(",")}"))
case Invalid(err) => Failure(AggregatedMessageException("Error(s)", err.toList))
}
def toTry(context: String): Try[A] = e match {
case Valid(options) => Success(options)
case Invalid(err) => Failure(AggregatedMessageException(context, err.toList))
}
}
}
14 changes: 8 additions & 6 deletions engine/src/main/resources/swagger/cromwell.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,27 @@ paths:
type: file
in: formData
- name: workflowInputs
description: JSON file containing the inputs as an object. For WDL workflows a skeleton file can be generated from wdltool using the "inputs" subcommand. When multiple files are specified, in case of key conflicts between multiple input JSON files, higher values of x in workflowInputs_x override lower values. For example, an input specified in workflowInputs_3 will override an input with the same name in workflowInputs or workflowInputs_2. Similarly, an input key specified in workflowInputs_5 will override an identical input key in any other input file.
description: JSON or YAML file containing the inputs as an object. For WDL workflows a skeleton file can be generated from wdltool using the "inputs" subcommand. When multiple files are specified, in case of key conflicts between multiple input JSON files, higher values of x in workflowInputs_x override lower values. For example, an input specified in workflowInputs_3 will override an input with the same name in workflowInputs or workflowInputs_2. Similarly, an input key specified in workflowInputs_5 will override an identical input key in any other input file.
required: false
type: file
in: formData
- name: workflowInputs_2
description: A second JSON file containing inputs.
description: A second JSON or YAML file containing inputs.
required: false
type: file
in: formData
- name: workflowInputs_3
description: A third JSON file containing inputs.
description: A third JSON or YAML file containing inputs.
required: false
type: file
in: formData
- name: workflowInputs_4
description: A fourth JSON file containing inputs.
description: A fourth JSON or YAML file containing inputs.
required: false
type: file
in: formData
- name: workflowInputs_5
description: A fifth JSON file containing inputs.
description: A fifth JSON or YAML file containing inputs.
required: false
type: file
in: formData
Expand Down Expand Up @@ -77,7 +77,7 @@ paths:
schema:
$ref: '#/definitions/WorkflowIdAndStatus'
'400':
$ref: '#/responses/BadRequest'
$ref: '#/responses/InvalidSubmission'
'500':
$ref: '#/responses/ServerError'
security:
Expand Down Expand Up @@ -565,6 +565,8 @@ responses:
description: Successful Request
schema:
$ref: '#/definitions/WorkflowIdAndStatus'
InvalidSubmission:
description: Invalid submission request
BadRequest:
description: Malformed Workflow ID
Forbidden:
Expand Down
176 changes: 111 additions & 65 deletions engine/src/main/scala/cromwell/webservice/PartialWorkflowSources.scala
Original file line number Diff line number Diff line change
@@ -1,84 +1,132 @@
package cromwell.webservice

import _root_.io.circe.yaml
import akka.util.ByteString
import cats.data.NonEmptyList
import cats.data.Validated.{Invalid, Valid}
import cats.syntax.apply._
import cats.syntax.validated._
import cromwell.core._
import common.validation.ErrorOr.ErrorOr
import common.validation.Validation._
import cromwell.core._
import org.slf4j.LoggerFactory
import spray.json.{JsObject, JsValue}
import wdl.WorkflowJson
import wom.core._

import scala.util.Try
import scala.util.{Failure, Success, Try}

final case class PartialWorkflowSources(workflowSource: Option[WorkflowSource],
workflowType: Option[WorkflowType],
workflowTypeVersion: Option[WorkflowTypeVersion],
workflowInputs: Vector[WorkflowJson],
workflowInputsAux: Map[Int, WorkflowJson],
workflowOptions: Option[WorkflowOptionsJson],
customLabels: Option[WorkflowJson],
zippedImports: Option[Array[Byte]],
warnings: Seq[String])
final case class PartialWorkflowSources(workflowSource: WorkflowSource,
workflowType: Option[WorkflowType] = None,
workflowTypeVersion: Option[WorkflowTypeVersion] = None,
workflowInputs: Vector[WorkflowJson] = Vector.empty,
workflowInputsAux: Map[Int, WorkflowJson] = Map.empty,
workflowOptions: Option[WorkflowOptionsJson] = None,
customLabels: Option[WorkflowJson] = None,
zippedImports: Option[Array[Byte]] = None,
warnings: Seq[String] = List.empty)

object PartialWorkflowSources {
val log = LoggerFactory.getLogger(classOf[PartialWorkflowSources])

def empty = PartialWorkflowSources(
workflowSource = None,
workflowType = None,
workflowTypeVersion = None,
workflowInputs = Vector.empty,
workflowInputsAux = Map.empty,
workflowOptions = None,
customLabels = None,
zippedImports = None,
warnings = Vector.empty
)


val WdlSourceKey = "wdlSource"
val WorkflowSourceKey = "workflowSource"
val WorkflowTypeKey = "workflowType"
val WorkflowTypeVersionKey = "workflowTypeVersion"
val WorkflowInputsKey = "workflowInputs"
val WorkflowInputsAuxPrefix = "workflowInputs_"
val WorkflowOptionsKey = "workflowOptions"
val CustomLabelsKey = "customLabels"
val WdlDependenciesKey = "wdlDependencies"
val WorkflowDependenciesKey = "workflowDependencies"

val allKeys = List(WdlSourceKey, WorkflowSourceKey, WorkflowTypeKey, WorkflowTypeVersionKey, WorkflowInputsKey,
WorkflowOptionsKey, CustomLabelsKey, WdlDependenciesKey, WorkflowDependenciesKey)

val allPrefixes = List(WorkflowInputsAuxPrefix)

def fromSubmitRoute(formData: Map[String, ByteString],
allowNoInputs: Boolean): Try[Seq[WorkflowSourceFilesCollection]] = {
val partialSources: Try[PartialWorkflowSources] = Try {
formData.foldLeft(PartialWorkflowSources.empty) {
(partialSources: PartialWorkflowSources, kv: (String, ByteString)) =>
val (name, data) = kv

name match {
case "wdlSource" =>
val warning = deprecationWarning(out = "wdlSource", in = "workflowSource")
val warnings = warning +: partialSources.warnings
partialSources.copy(workflowSource = Option(data.utf8String), warnings = warnings)
case "workflowSource" => partialSources.copy(workflowSource = Option(data.utf8String))
case "workflowType" => partialSources.copy(workflowType = Option(data.utf8String))
case "workflowTypeVersion" => partialSources.copy(workflowTypeVersion = Option(data.utf8String))
case "workflowInputs" => partialSources.copy(workflowInputs = workflowInputs(data.utf8String))
case _ if name.startsWith("workflowInputs_") =>
val index = name.stripPrefix("workflowInputs_").toInt
partialSources.copy(workflowInputsAux = partialSources.workflowInputsAux + (index -> data.utf8String))
case "workflowOptions" => partialSources.copy(workflowOptions = Option(data.utf8String))
case "wdlDependencies" =>
val warning = deprecationWarning(out = "wdlDependencies", in = "workflowDependencies")
val warnings = warning +: partialSources.warnings
partialSources.copy(zippedImports = Option(data.toArray), warnings = warnings)
case "workflowDependencies" => partialSources.copy(zippedImports = Option(data.toArray))
case "customLabels" => partialSources.copy(customLabels = Option(data.utf8String))
case _ => throw new IllegalArgumentException(s"Unexpected body part name: $name")
}
import cats.instances.list._
import cats.syntax.apply._
import cats.syntax.traverse._
import cats.syntax.validated._

val partialSources: ErrorOr[PartialWorkflowSources] = {
def getStringValue(key: String) = formData.get(key).map(_.utf8String)
def getArrayValue(key: String) = formData.get(key).map(_.toArray)

// unrecognized keys
val unrecognized: ErrorOr[Unit] = formData.keySet
.filterNot(name => allKeys.contains(name) || allPrefixes.exists(name.startsWith))
.toList
.map(name => s"Unexpected body part name: $name") match {
case Nil => ().validNel
case head :: tail => NonEmptyList.of(head, tail: _*).invalid
}

// workflow source
val wdlSource = getStringValue(WdlSourceKey)
val workflowSource = getStringValue(WorkflowSourceKey)
val wdlSourceWarning = wdlSource.map(_ => wdlSourceDeprecationWarning)
val workflowSourceFinal: ErrorOr[String] = (wdlSource, workflowSource) match {
case (Some(source), None) => source.validNel
case (None, Some(source)) => source.validNel
case (Some(_), Some(_)) => "wdlSource and workflowSource can't both be supplied".invalidNel
case (None, None) => "workflowSource needs to be supplied".invalidNel
}

// workflow inputs
val workflowInputs: ErrorOr[Vector[WorkflowJson]] = getStringValue(WorkflowInputsKey) match {
case Some(inputs) => workflowInputsValidation(inputs)
case None => Vector.empty.validNel
}
val workflowInputsAux: ErrorOr[Map[Int, String]] = formData.toList.flatTraverse[ErrorOr, (Int, String)]({
case (name, value) if name.startsWith(WorkflowInputsAuxPrefix) =>
Try(name.stripPrefix(WorkflowInputsAuxPrefix).toInt).toErrorOr.map(index => List((index, value.utf8String)))
case _ => List.empty.validNel
}).map(_.toMap)

// dependencies
val wdlDependencies = getArrayValue(WdlDependenciesKey)
val workflowDependencies = getArrayValue(WorkflowDependenciesKey)
val wdlDependenciesWarning = wdlDependencies.map(_ => wdlDependenciesDeprecationWarning)
val workflowDependenciesFinal: ErrorOr[Option[Array[Byte]]] = (wdlDependencies, workflowDependencies) match {
case (Some(dep), None) => Option(dep).validNel
case (None, Some(dep)) => Option(dep).validNel
case (Some(_), Some(_)) => "wdlDependencies and workflowDependencies can't both be supplied".invalidNel
case (None, None) => None.validNel
}

(unrecognized, workflowSourceFinal, workflowInputs, workflowInputsAux, workflowDependenciesFinal) mapN {
case (_, source, inputs, aux, dep) => PartialWorkflowSources(
workflowSource = source,
workflowType = getStringValue(WorkflowTypeKey),
workflowTypeVersion = getStringValue(WorkflowTypeVersionKey),
workflowInputs = inputs,
workflowInputsAux= aux,
workflowOptions = getStringValue(WorkflowOptionsKey),
customLabels = getStringValue(CustomLabelsKey),
zippedImports = dep,
warnings = wdlSourceWarning.toVector ++ wdlDependenciesWarning.toVector
)
}
}

partialSourcesToSourceCollections(partialSources.toErrorOr, allowNoInputs).toTry
partialSourcesToSourceCollections(partialSources, allowNoInputs) match {
case Valid(source) => Success(source)
case Invalid(errors) => Failure(new RuntimeException(s"Error(s): ${errors.toList.mkString("\n")}"))
}
}

private def workflowInputs(data: String): Vector[WorkflowJson] = {
import spray.json._
data.parseJson match {
case JsArray(Seq(x, xs@_*)) => (Vector(x) ++ xs).map(_.compactPrint)
case JsArray(_) => Vector.empty
case v: JsValue => Vector(v.compactPrint)
private def workflowInputsValidation(data: String): ErrorOr[Vector[WorkflowJson]] = {
import _root_.io.circe.Printer
import cats.syntax.validated._

yaml.parser.parse(data) match {
// If it's an array, treat each element as an individual input object, otherwise simply toString the whole thing
case Right(json) => json.asArray.map(_.map(_.toString())).getOrElse(Vector(json.pretty(Printer.noSpaces))).validNel
case Left(error) => s"Input file is not valid yaml nor json: ${error.getMessage}".invalidNel
}
}

Expand All @@ -96,11 +144,6 @@ object PartialWorkflowSources {
def validateOptions(options: Option[WorkflowOptionsJson]): ErrorOr[WorkflowOptions] =
WorkflowOptions.fromJsonString(options.getOrElse("{}")).toErrorOr leftMap { _ map { i => s"Invalid workflow options provided: $i" } }

def validateWorkflowSource(partialSource: PartialWorkflowSources): ErrorOr[WorkflowJson] = partialSource.workflowSource match {
case Some(src) => src.validNel
case _ => s"Incomplete workflow submission: $partialSource".invalidNel
}

def validateWorkflowType(partialSource: PartialWorkflowSources): ErrorOr[Option[WorkflowType]] = {
partialSource.workflowType match {
case Some(_) => partialSource.workflowType.validNel
Expand All @@ -117,12 +160,12 @@ object PartialWorkflowSources {

partialSources match {
case Valid(partialSource) =>
(validateWorkflowSource(partialSource), validateInputs(partialSource),
(validateInputs(partialSource),
validateOptions(partialSource.workflowOptions), validateWorkflowType(partialSource),
validateWorkflowTypeVersion(partialSource)) mapN {
case (wfSource, wfInputs, wfOptions, workflowType, workflowTypeVersion) =>
case (wfInputs, wfOptions, workflowType, workflowTypeVersion) =>
wfInputs.map(inputsJson => WorkflowSourceFilesCollection(
workflowSource = wfSource,
workflowSource = partialSource.workflowSource,
workflowType = workflowType,
workflowTypeVersion = workflowTypeVersion,
inputsJson = inputsJson,
Expand All @@ -134,6 +177,9 @@ object PartialWorkflowSources {
case Invalid(err) => err.invalid
}
}

private val wdlSourceDeprecationWarning = deprecationWarning(out = "wdlSource", in = "workflowSource")
private val wdlDependenciesDeprecationWarning = deprecationWarning(out = "wdlDependencies", in = "workflowDependencies")

private def deprecationWarning(out: String, in: String): String = {
val warning =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,18 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
}

it should "return 400 for an unrecognized form data request parameter " in {
val formData = Multipart.FormData(Multipart.FormData.BodyPart("incorrectParameter", HttpEntity(MediaTypes.`application/json`, HelloWorld.workflowSource()))).toEntity()
val formData = Multipart.FormData(Map(
"incorrectParameter" -> HttpEntity(MediaTypes.`application/json`, HelloWorld.workflowSource()),
"incorrectParameter2" -> HttpEntity(MediaTypes.`application/json`, HelloWorld.workflowSource())
)).toEntity

Post(s"/workflows/$version", formData) ~>
akkaHttpService.workflowRoutes ~>
check {
assertResult(
s"""{
| "status": "fail",
| "message": "Error(s): Unexpected body part name: incorrectParameter"
| "message": "Error(s): Unexpected body part name: incorrectParameter\\nUnexpected body part name: incorrectParameter2\\nworkflowSource needs to be supplied"
|}""".stripMargin) {
responseAs[String]
}
Expand Down

0 comments on commit 1f63bd5

Please sign in to comment.