diff --git a/.pullapprove.yml b/.pullapprove.yml index 67ea4ac58..c26c94ba2 100644 --- a/.pullapprove.yml +++ b/.pullapprove.yml @@ -1,16 +1,24 @@ -approve_by_comment: true -approve_regex: ':\+1:' -reset_on_push: false -author_approval: ignored -reviewers: +# enabling version 2 turns github reviews on by default +version: 2 +group_defaults: + approve_by_comment: + enabled: true + approve_regex: ':\+1:' + reset_on_push: + enabled: false +groups: + reviewers: required: 2 - members: + github_reviews: + enabled: true + author_approval: + ignored: true + users: - Horneth - cjllanwarne - francares - gauravs90 - geoffjentry - - jainh - jsotobroad - katevoss - kcibul diff --git a/.travis.yml b/.travis.yml index 9c5941c5c..7b8dfaa2f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,10 @@ sudo: required dist: trusty +services: + - docker language: scala scala: - - 2.11.8 + - 2.12.2 jdk: - oraclejdk8 cache: @@ -18,7 +20,6 @@ before_cache: before_install: # https://github.com/travis-ci/travis-ci/issues/7940#issuecomment-310759657 - sudo rm -f /etc/boto.cfg - - openssl aes-256-cbc -K "$encrypted_5ebd3ff04788_key" -iv "$encrypted_5ebd3ff04788_iv" -in src/bin/travis/resources/jesConf.tar.enc -out jesConf.tar -d || true env: global: - CENTAUR_BRANCH=develop diff --git a/CHANGELOG.md b/CHANGELOG.md index 82540c66b..794a378b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,26 @@ # Cromwell Change Log +## 29 + +### Breaking Changes + +* Request timeouts for HTTP requests on the REST API now return a 503 status code instead of 500. The response for a request timeout is no longer in JSON format. +* The metadata endpoint no longer returns gzipped responses by default. This now needs to be explicitly requested with an `Accept-Encoding: gzip` header + +* Command line usage has been extensively revised for Cromwell 29. Please see the +[README](https://github.com/broadinstitute/cromwell#command-line-usage) for details. + +* The engine endpoints are now served under `/engine`. Previousely engine endpoints were available under +`/api/engine`. Workflow endpoints are still served under `/api/workflows`. The setting `api.routeUnwrapped` has been +retired at the same time. + +* The response format of the [callcaching/diff](https://github.com/broadinstitute/cromwell#get-apiworkflowsversioncallcachingdiff) endpoint has been updated. + +### Cromwell Server + +* Cromwell now attempts to gracefully shutdown when running in server mode and receiving a `SIGINT` (`Ctrl-C`) or `SIGTERM` (`kill`) signal. This includes waiting for all pending Database writes before exiting. +A detailed explanation and information about how to configure this feature can be found in the [Cromwell Wiki](https://github.com/broadinstitute/cromwell/wiki/DevZone#graceful-server-shutdown). + ## 28 ### Bug Fixes diff --git a/NOTICE b/NOTICE deleted file mode 100644 index 15d3c2b54..000000000 --- a/NOTICE +++ /dev/null @@ -1,4 +0,0 @@ -cromwell.webservice/PerRequest.scala (https://github.com/NET-A-PORTER/spray-actor-per-request) -is distributed with this software under the Apache License, Version 2.0 (see the LICENSE-ASL file). In accordance -with that license, that software comes with the following notices: -    Copyright (C) 2011-2012 Ian Forsey diff --git a/README.md b/README.md index 362c5e2fe..97040254e 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ A [Workflow Management System](https://en.wikipedia.org/wiki/Workflow_management * [Building](#building) * [Installing](#installing) * [Upgrading from 0.19 to 0.21](#upgrading-from-019-to-021) -* [**NEW** Command Line Usage](http://gatkforums.broadinstitute.org/wdl/discussion/8782/command-line-cromwell) (on the WDL/Cromwell Website) +* [Command Line Usage](#command-line-usage) * [Getting Started with WDL](#getting-started-with-wdl) * [WDL Support](#wdl-support) * [Configuring Cromwell](#configuring-cromwell) @@ -94,8 +94,8 @@ A [Workflow Management System](https://en.wikipedia.org/wiki/Workflow_management * [POST /api/workflows/:version/:id/abort](#post-apiworkflowsversionidabort) * [GET /api/workflows/:version/backends](#get-apiworkflowsversionbackends) * [GET /api/workflows/:version/callcaching/diff](#get-apiworkflowsversioncallcachingdiff) - * [GET /api/engine/:version/stats](#get-apiengineversionstats) - * [GET /api/engine/:version/version](#get-apiengineversionversion) + * [GET /engine/:version/stats](#get-engineversionstats) + * [GET /engine/:version/version](#get-engineversionversion) * [Error handling](#error-handling) * [Developer](#developer) * [Generating table of contents on Markdown files](#generating-table-of-contents-on-markdown-files) @@ -120,13 +120,13 @@ There is a [Cromwell gitter channel](https://gitter.im/broadinstitute/cromwell) The following is the toolchain used for development of Cromwell. Other versions may work, but these are recommended. -* [Scala 2.11.8](http://www.scala-lang.org/news/2.11.8/) +* [Scala 2.12.2](http://www.scala-lang.org/news/2.12.1#scala-212-notes) * [SBT 0.13.12](https://github.com/sbt/sbt/releases/tag/v0.13.12) * [Java 8](http://www.oracle.com/technetwork/java/javase/overview/java8-2100321.html) # Building -`sbt assembly` will build a runnable JAR in `target/scala-2.11/` +`sbt assembly` will build a runnable JAR in `target/scala-2.12/` Tests are run via `sbt test`. Note that the tests do require Docker to be running. To test this out while downloading the Ubuntu image that is required for tests, run `docker pull ubuntu:latest` prior to running `sbt test` @@ -138,6 +138,206 @@ OS X users can install Cromwell with Homebrew: `brew install cromwell`. See the [migration document](MIGRATION.md) for more details. +# Command Line Usage + +For built-in documentation of Cromwell command line usage, run the Cromwell JAR file with no arguments: + +``` +$ java -jar cromwell-.jar +``` + +For example, `$ java -jar cromwell-29.jar`. You will get a usage message like the following: + +``` +cromwell 29 +Usage: java -jar /path/to/cromwell.jar [server|run] [options] ... + + --help Cromwell - Workflow Execution Engine + --version +Command: server +Starts a web server on port 8000. See the web server documentation for more details about the API endpoints. +Command: run [options] workflow-source +Run the workflow and print out the outputs in JSON format. + workflow-source Workflow source file. + -i, --inputs Workflow inputs file. + -o, --options Workflow options file. + -t, --type Workflow type. + -v, --type-version + Workflow type version. + -l, --labels Workflow labels file. + -p, --imports A directory or zipfile to search for workflow imports. + -m, --metadata-output + An optional directory path to output metadata. +``` + +## --version + +The `--version` option prints the version of Cromwell and exits. + +## --help + +The `--help` option prints the full help text above and exits. + +## server + +The `server` command runs Cromwell as a web server. No arguments are accepted. +See the documentation for Cromwell's REST endpoints [here](#rest-api). + +## run + +The `run` command executes a single workflow in Cromwell. + +### workflow-source +The `run` command requires a single argument for the workflow source file. + +### --inputs +An optional file of workflow inputs. Although optional, it is a best practice to use an inputs file to satisfy workflow +requirements rather than hardcoding inputs directly into a workflow source file. + +### --options +An optional file of workflow options. Some options are global (supported by all backends), while others are backend-specific. +See the [workflow options](#workflow-options) documentation for more details. + +### --type +An optional parameter to specify the language for the workflow source. Any value specified for this parameter is currently +ignored and internally the value `WDL` is used. + +### --type-version +An optional parameter to specify the version of the language for the workflow source. Currently any specified value is ignored. + +### --labels +An optional parameter to specify a file of JSON key-value label pairs to associate with the workflow. + +### --imports +You have the option of importing WDL workflows or tasks to use within your workflow, known as sub-workflows. +If you use sub-workflows within your primary workflow then you must include a zip file with the WDL import files. + +For example, say you have a directory of WDL files: + +``` +wdl_library +└──cgrep.wdl +└──ps.wdl +└──wc.wdl +``` + +If you zip that directory into `wdl_library.zip`, then you can reference and use these WDLs within your primary WDL. + +This could be your primary WDL: + +``` +import "ps.wdl" as ps +import "cgrep.wdl" +import "wc.wdl" as wordCount + +workflow my_wf { + +call ps.ps as getStatus +call cgrep.cgrep { input: str = getStatus.x } +call wordCount { input: str = ... } + +} +``` + +Then to run this WDL without any inputs, workflow options, or metadata files, you would enter: + +`$ java -jar cromwell-.jar run my_wf.wdl --imports /path/to/wdl_library.zip` + +### --metadata-output + +You can include a path where Cromwell will write the workflow metadata JSON, such as start/end timestamps, status, inputs, and outputs. By default, Cromwell does not write workflow metadata. + +This example includes a metadata path called `/path/to/my_wf.metadata`: + +``` +$ java -jar cromwell-.jar run my_wf.wdl --metadata-output /path/to/my_wf.metadata +``` + +Again, Cromwell is very verbose. Here is the metadata output in my_wf.metadata: + +``` +{ + "workflowName": "my_wf", + "submittedFiles": { + "inputs": "{\"my_wf.hello.addressee\":\"m'Lord\"}", + "workflow": "\ntask hello {\n String addressee\n command {\n echo \"Hello ${addressee}!\"\n }\n output {\n String salutation = read_string(stdout())\n }\n runtime {\n +\n }\n}\n\nworkflow my_wf {\n call hello\n output {\n hello.salutation\n }\n}\n", + "options": "{\n\n}" + }, + "calls": { + "my_wf.hello": [ + { + "executionStatus": "Done", + "stdout": "/Users/jdoe/Documents/cromwell-executions/my_wf/cd0fe94a-984e-4a19-ab4c-8f7f07038068/call-hello/execution/stdout", + "backendStatus": "Done", + "shardIndex": -1, + "outputs": { + "salutation": "Hello m'Lord!" + }, + "runtimeAttributes": { + "continueOnReturnCode": "0", + "failOnStderr": "false" + }, + "callCaching": { + "allowResultReuse": false, + "effectiveCallCachingMode": "CallCachingOff" + }, + "inputs": { + "addressee": "m'Lord" + }, + "returnCode": 0, + "jobId": "28955", + "backend": "Local", + "end": "2017-04-19T10:53:25.045-04:00", + "stderr": "/Users/jdoe/Documents/cromwell-executions/my_wf/cd0fe94a-984e-4a19-ab4c-8f7f07038068/call-hello/execution/stderr", + "callRoot": "/Users/jdoe/Documents/cromwell-executions/my_wf/cd0fe94a-984e-4a19-ab4c-8f7f07038068/call-hello", + "attempt": 1, + "executionEvents": [ + { + "startTime": "2017-04-19T10:53:23.570-04:00", + "description": "PreparingJob", + "endTime": "2017-04-19T10:53:23.573-04:00" + }, + { + "startTime": "2017-04-19T10:53:23.569-04:00", + "description": "Pending", + "endTime": "2017-04-19T10:53:23.570-04:00" + }, + { + "startTime": "2017-04-19T10:53:25.040-04:00", + "description": "UpdatingJobStore", + "endTime": "2017-04-19T10:53:25.045-04:00" + }, + { + "startTime": "2017-04-19T10:53:23.570-04:00", + "description": "RequestingExecutionToken", + "endTime": "2017-04-19T10:53:23.570-04:00" + }, + { + "startTime": "2017-04-19T10:53:23.573-04:00", + "description": "RunningJob", + "endTime": "2017-04-19T10:53:25.040-04:00" + } + ], + "start": "2017-04-19T10:53:23.569-04:00" + } + ] + }, + "outputs": { + "my_wf.hello.salutation": "Hello m'Lord!" + }, + "workflowRoot": "/Users/jdoe/Documents/cromwell-executions/my_wf/cd0fe94a-984e-4a19-ab4c-8f7f07038068", + "id": "cd0fe94a-984e-4a19-ab4c-8f7f07038068", + "inputs": { + "my_wf.hello.addressee": "m'Lord" + }, + "submission": "2017-04-19T10:53:19.565-04:00", + "status": "Succeeded", + "end": "2017-04-19T10:53:25.063-04:00", + "start": "2017-04-19T10:53:23.535-04:00" +} +``` + # Getting Started with WDL For many examples on how to use WDL see [the WDL site](https://github.com/broadinstitute/wdl#getting-started-with-wdl) @@ -3439,6 +3639,16 @@ The `call` and `workflow` may optionally contain failures shaped like this: ] ``` +### Compressing the metadata response + +The response from the metadata endpoint can be quite large depending on the workflow. To help with this Cromwell supports gzip encoding the metadata prior to sending it back to the client. In order to enable this, make sure your client is sending the `Accept-Encoding: gzip` header. + +For instance, with cURL: + +``` +$ curl -H "Accept-Encoding: gzip" http://localhost:8000/api/workflows/v1/b3e45584-9450-4e73-9523-fc3ccf749848/metadata +``` + ## POST /api/workflows/:version/:id/abort cURL: @@ -3553,28 +3763,24 @@ Server: spray-can/1.3.3 }, "hashDifferential": [ { - "command template": { - "callA": "4EAADE3CD5D558C5A6CFA4FD101A1486", - "callB": "3C7A0CA3D7A863A486DBF3F7005D4C95" - } + "hashKey": "command template", + "callA": "4EAADE3CD5D558C5A6CFA4FD101A1486", + "callB": "3C7A0CA3D7A863A486DBF3F7005D4C95" }, { - "input count": { - "callA": "C4CA4238A0B923820DCC509A6F75849B", - "callB": "C81E728D9D4C2F636F067F89CC14862C" - } + "hashKey": "input count", + "callA": "C4CA4238A0B923820DCC509A6F75849B", + "callB": "C81E728D9D4C2F636F067F89CC14862C" }, { - "input: String addressee": { - "callA": "D4CC65CB9B5F22D8A762532CED87FE8D", - "callB": "7235E005510D99CB4D5988B21AC97B6D" - } + "hashKey": "input: String addressee", + "callA": "D4CC65CB9B5F22D8A762532CED87FE8D", + "callB": "7235E005510D99CB4D5988B21AC97B6D" }, { - "input: String addressee2": { - "callA": "116C7E36B4AE3EAFD07FA4C536CE092F", - "callB": null - } + "hashKey": "input: String addressee2", + "callA": "116C7E36B4AE3EAFD07FA4C536CE092F", + "callB": null } ] } @@ -3594,9 +3800,10 @@ Differences can be of 3 kinds: For instance, in the example above, ```json -"input: String addressee": { - "callA": "D4CC65CB9B5F22D8A762532CED87FE8D", - "callB": "7235E005510D99CB4D5988B21AC97B6D" +{ + "hashKey": "input: String addressee", + "callA": "D4CC65CB9B5F22D8A762532CED87FE8D", + "callB": "7235E005510D99CB4D5988B21AC97B6D" } ``` @@ -3606,9 +3813,10 @@ indicates that both `callA` and `callB` have a `String` input called `addressee` For instance, in the example above, ```json -"input: String addressee2": { - "callA": "116C7E36B4AE3EAFD07FA4C536CE092F", - "callB": null +{ + "hashKey": "input: String addressee2", + "callA": "116C7E36B4AE3EAFD07FA4C536CE092F", + "callB": null } ``` @@ -3627,7 +3835,7 @@ Server: spray-can/1.3.3 { "status": "error", - "message": "Cannot find a cache entry for 479f8a8-efa4-46e4-af0d-802addc66e5d:wf_hello.hello:-1" + "message": "Cannot find call 479f8a8-efa4-46e4-af0d-802addc66e5d:wf_hello.hello:-1" } ``` @@ -3643,7 +3851,7 @@ Server: spray-can/1.3.3 { "status": "error", - "message": "Cannot find cache entries for 5174842-4a44-4355-a3a9-3a711ce556f1:wf_hello.hello:-1, 479f8a8-efa4-46e4-af0d-802addc66e5d:wf_hello.hello:-1" + "message": "Cannot find calls 5174842-4a44-4355-a3a9-3a711ce556f1:wf_hello.hello:-1, 479f8a8-efa4-46e4-af0d-802addc66e5d:wf_hello.hello:-1" } ``` @@ -3665,18 +3873,18 @@ Server: spray-can/1.3.3 } ``` -## GET /api/engine/:version/stats +## GET /engine/:version/stats This endpoint returns some basic statistics on the current state of the engine. At the moment that includes the number of running workflows and the number of active jobs. cURL: ``` -$ curl http://localhost:8000/api/engine/v1/stats +$ curl http://localhost:8000/engine/v1/stats ``` HTTPie: ``` -$ http http://localhost:8000/api/engine/v1/stats +$ http http://localhost:8000/engine/v1/stats ``` Response: @@ -3692,18 +3900,18 @@ Response: } ``` -## GET /api/engine/:version/version +## GET /engine/:version/version This endpoint returns the version of the Cromwell engine. cURL: ``` -$ curl http://localhost:8000/api/engine/v1/version +$ curl http://localhost:8000/engine/v1/version ``` HTTPie: ``` -$ http http://localhost:8000/api/engine/v1/version +$ http http://localhost:8000/engine/v1/version ``` Response: @@ -3771,25 +3979,3 @@ e.g. The `message` field contains a short description of the error. The `errors` field is optional and may contain additional information about why the request failed. - -# Developer - -## Generating table of contents on Markdown files - -``` -$ pip install mdtoc -$ mdtoc --check-links README.md -``` - -## Generating and Hosting ScalaDoc - -Essentially run `sbt doc` then commit the generated code into the `gh-pages` branch on this repository - -``` -$ sbt doc -$ git co gh-pages -$ mv target/scala-2.11/api scaladoc -$ git add scaladoc -$ git commit -m "API Docs" -$ git push origin gh-pages -``` diff --git a/backend/src/main/scala/cromwell/backend/BackendJobBreadCrumb.scala b/backend/src/main/scala/cromwell/backend/BackendJobBreadCrumb.scala index 3fa6e9c19..c6ddcbcc4 100644 --- a/backend/src/main/scala/cromwell/backend/BackendJobBreadCrumb.scala +++ b/backend/src/main/scala/cromwell/backend/BackendJobBreadCrumb.scala @@ -3,9 +3,9 @@ package cromwell.backend import cromwell.backend.io.JobPaths import cromwell.core.path.Path import cromwell.core.{JobKey, WorkflowId} -import wdl4s.Workflow +import wdl4s.wdl.WdlWorkflow -case class BackendJobBreadCrumb(workflow: Workflow, id: WorkflowId, jobKey: JobKey) { +case class BackendJobBreadCrumb(workflow: WdlWorkflow, id: WorkflowId, jobKey: JobKey) { def toPath(root: Path): Path = { val workflowPart = root.resolve(workflow.unqualifiedName).resolve(id.toString) JobPaths.callPathBuilder(workflowPart, jobKey) diff --git a/backend/src/main/scala/cromwell/backend/BackendJobExecutionActor.scala b/backend/src/main/scala/cromwell/backend/BackendJobExecutionActor.scala index fa93810cb..068859172 100644 --- a/backend/src/main/scala/cromwell/backend/BackendJobExecutionActor.scala +++ b/backend/src/main/scala/cromwell/backend/BackendJobExecutionActor.scala @@ -7,8 +7,8 @@ import cromwell.backend.BackendLifecycleActor._ import cromwell.backend.wdl.OutputEvaluator import cromwell.core.path.Path import cromwell.core.{CallOutputs, ExecutionEvent, JobKey} -import wdl4s.expression.WdlStandardLibraryFunctions -import wdl4s.values.WdlValue +import wdl4s.wdl.expression.WdlStandardLibraryFunctions +import wdl4s.wdl.values.WdlValue import scala.concurrent.Future import scala.util.{Success, Try} diff --git a/backend/src/main/scala/cromwell/backend/BackendLifecycleActor.scala b/backend/src/main/scala/cromwell/backend/BackendLifecycleActor.scala index a6a09cff4..5c1031726 100644 --- a/backend/src/main/scala/cromwell/backend/BackendLifecycleActor.scala +++ b/backend/src/main/scala/cromwell/backend/BackendLifecycleActor.scala @@ -3,7 +3,7 @@ package cromwell.backend import akka.actor.{Actor, ActorRef} import cromwell.backend.BackendLifecycleActor._ import cromwell.core.logging.{JobLogging, WorkflowLogging} -import wdl4s.TaskCall +import wdl4s.wdl.WdlTaskCall import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -65,7 +65,7 @@ trait BackendWorkflowLifecycleActor extends BackendLifecycleActor with WorkflowL /** * The subset of calls which this backend will be expected to run */ - protected def calls: Set[TaskCall] + protected def calls: Set[WdlTaskCall] } trait BackendJobLifecycleActor extends BackendLifecycleActor with JobLogging { diff --git a/backend/src/main/scala/cromwell/backend/BackendLifecycleActorFactory.scala b/backend/src/main/scala/cromwell/backend/BackendLifecycleActorFactory.scala index d0da2c2c5..c255fdc12 100644 --- a/backend/src/main/scala/cromwell/backend/BackendLifecycleActorFactory.scala +++ b/backend/src/main/scala/cromwell/backend/BackendLifecycleActorFactory.scala @@ -6,8 +6,8 @@ import cromwell.backend.io.WorkflowPathsWithDocker import cromwell.core.CallOutputs import cromwell.core.JobExecutionToken.JobExecutionTokenType import cromwell.core.path.Path -import wdl4s.TaskCall -import wdl4s.expression.{PureStandardLibraryFunctions, WdlStandardLibraryFunctions} +import wdl4s.wdl.WdlTaskCall +import wdl4s.wdl.expression.{PureStandardLibraryFunctions, WdlStandardLibraryFunctions} trait BackendLifecycleActorFactory { @@ -18,8 +18,9 @@ trait BackendLifecycleActorFactory { def workflowInitializationActorProps(workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, - calls: Set[TaskCall], - serviceRegistryActor: ActorRef): Option[Props] = None + calls: Set[WdlTaskCall], + serviceRegistryActor: ActorRef, + restarting: Boolean): Option[Props] = None /* ****************************** */ /* Job Execution */ @@ -39,7 +40,7 @@ trait BackendLifecycleActorFactory { def workflowFinalizationActorProps(workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, - calls: Set[TaskCall], + calls: Set[WdlTaskCall], jobExecutionMap: JobExecutionMap, workflowOutputs: CallOutputs, initializationData: Option[BackendInitializationData]): Option[Props] = None diff --git a/backend/src/main/scala/cromwell/backend/BackendWorkflowInitializationActor.scala b/backend/src/main/scala/cromwell/backend/BackendWorkflowInitializationActor.scala index a7ad6f5c7..18e341ad7 100644 --- a/backend/src/main/scala/cromwell/backend/BackendWorkflowInitializationActor.scala +++ b/backend/src/main/scala/cromwell/backend/BackendWorkflowInitializationActor.scala @@ -9,10 +9,10 @@ import cromwell.backend.validation.ContinueOnReturnCodeValidation import cromwell.core.{WorkflowMetadataKeys, WorkflowOptions} import cromwell.services.metadata.MetadataService.PutMetadataAction import cromwell.services.metadata.{MetadataEvent, MetadataKey, MetadataValue} -import wdl4s._ -import wdl4s.expression.PureStandardLibraryFunctions -import wdl4s.types._ -import wdl4s.values.WdlValue +import wdl4s.wdl._ +import wdl4s.wdl.expression.PureStandardLibraryFunctions +import wdl4s.wdl.types._ +import wdl4s.wdl.values.WdlValue import scala.concurrent.Future import scala.util.{Failure, Success, Try} @@ -38,7 +38,7 @@ object BackendWorkflowInitializationActor { trait BackendWorkflowInitializationActor extends BackendWorkflowLifecycleActor with ActorLogging { def serviceRegistryActor: ActorRef - def calls: Set[TaskCall] + def calls: Set[WdlTaskCall] /** * This method is meant only as a "pre-flight check" validation of runtime attribute expressions during workflow @@ -125,7 +125,7 @@ trait BackendWorkflowInitializationActor extends BackendWorkflowLifecycleActor w defaultRuntimeAttributes.get(name) } - def badRuntimeAttrsForTask(task: Task) = { + def badRuntimeAttrsForTask(task: WdlTask) = { runtimeAttributeValidators map { case (attributeName, validator) => val value = task.runtimeAttributes.attrs.get(attributeName) orElse defaultRuntimeAttribute(attributeName) attributeName -> ((value, validator(value))) diff --git a/backend/src/main/scala/cromwell/backend/RuntimeAttributeDefinition.scala b/backend/src/main/scala/cromwell/backend/RuntimeAttributeDefinition.scala index 506bbe4eb..7e548df2f 100644 --- a/backend/src/main/scala/cromwell/backend/RuntimeAttributeDefinition.scala +++ b/backend/src/main/scala/cromwell/backend/RuntimeAttributeDefinition.scala @@ -3,9 +3,9 @@ package cromwell.backend import cromwell.core.WorkflowOptions import cromwell.util.JsonFormatting.WdlValueJsonFormatter import lenthall.util.TryUtil -import wdl4s.{WdlExpressionException, _} -import wdl4s.expression.WdlStandardLibraryFunctions -import wdl4s.values.WdlValue +import wdl4s.wdl.{WdlExpressionException, _} +import wdl4s.wdl.expression.WdlStandardLibraryFunctions +import wdl4s.wdl.values.WdlValue import scala.util.{Success, Try} diff --git a/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala b/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala index 79503c927..dd6091f42 100644 --- a/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala +++ b/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala @@ -2,7 +2,7 @@ package cromwell.backend.async import cromwell.core.path.Path import lenthall.exception.ThrowableAggregation -import wdl4s.values.WdlValue +import wdl4s.wdl.values.WdlValue abstract class KnownJobFailureException extends Exception { def stderrPath: Option[Path] diff --git a/backend/src/main/scala/cromwell/backend/backend.scala b/backend/src/main/scala/cromwell/backend/backend.scala index aad2b1b55..527a514f9 100644 --- a/backend/src/main/scala/cromwell/backend/backend.scala +++ b/backend/src/main/scala/cromwell/backend/backend.scala @@ -6,15 +6,15 @@ import cromwell.core.callcaching.MaybeCallCachingEligible import cromwell.core.labels.Labels import cromwell.core.{CallKey, WorkflowId, WorkflowOptions} import cromwell.services.keyvalue.KeyValueServiceActor.KvResponse -import wdl4s._ -import wdl4s.values.WdlValue +import wdl4s.wdl._ +import wdl4s.wdl.values.WdlValue import scala.util.Try /** * For uniquely identifying a job which has been or will be sent to the backend. */ -case class BackendJobDescriptorKey(call: TaskCall, index: Option[Int], attempt: Int) extends CallKey { +case class BackendJobDescriptorKey(call: WdlTaskCall, index: Option[Int], attempt: Int) extends CallKey { def scope = call private val indexString = index map { _.toString } getOrElse "NA" val tag = s"${call.fullyQualifiedName}:$indexString:$attempt" @@ -37,7 +37,7 @@ case class BackendJobDescriptor(workflowDescriptor: BackendWorkflowDescriptor, object BackendWorkflowDescriptor { def apply(id: WorkflowId, - workflow: Workflow, + workflow: WdlWorkflow, knownValues: Map[FullyQualifiedName, WdlValue], workflowOptions: WorkflowOptions, customLabels: Labels) = { @@ -49,7 +49,7 @@ object BackendWorkflowDescriptor { * For passing to a BackendActor construction time */ case class BackendWorkflowDescriptor(id: WorkflowId, - workflow: Workflow, + workflow: WdlWorkflow, knownValues: Map[FullyQualifiedName, WdlValue], workflowOptions: WorkflowOptions, customLabels: Labels, diff --git a/backend/src/main/scala/cromwell/backend/io/GlobFunctions.scala b/backend/src/main/scala/cromwell/backend/io/GlobFunctions.scala index 9ece33cdd..7af985153 100644 --- a/backend/src/main/scala/cromwell/backend/io/GlobFunctions.scala +++ b/backend/src/main/scala/cromwell/backend/io/GlobFunctions.scala @@ -2,15 +2,15 @@ package cromwell.backend.io import cromwell.backend.BackendJobDescriptor import cromwell.core.CallContext -import wdl4s.TaskCall -import wdl4s.expression.{NoFunctions, PureStandardLibraryFunctionsLike} -import wdl4s.values._ +import wdl4s.wdl.WdlTaskCall +import wdl4s.wdl.expression.{NoFunctions, PureStandardLibraryFunctionsLike} +import wdl4s.wdl.values._ trait GlobFunctions extends PureStandardLibraryFunctionsLike { def callContext: CallContext - def findGlobOutputs(call: TaskCall, jobDescriptor: BackendJobDescriptor): Set[WdlGlobFile] = { + def findGlobOutputs(call: WdlTaskCall, jobDescriptor: BackendJobDescriptor): Set[WdlGlobFile] = { val globOutputs = call.task.findOutputFiles(jobDescriptor.fullyQualifiedInputs, NoFunctions) collect { case glob: WdlGlobFile => glob } diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala index c33c1733f..0d572de10 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala @@ -19,8 +19,8 @@ import cromwell.services.keyvalue.KvClient import cromwell.services.metadata.CallMetadataKeys import lenthall.util.TryUtil import net.ceedubs.ficus.Ficus._ -import wdl4s._ -import wdl4s.values.{WdlFile, WdlGlobFile, WdlSingleFile, WdlValue} +import wdl4s.wdl._ +import wdl4s.wdl.values._ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, Promise} import scala.util.{Failure, Success, Try} @@ -251,9 +251,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta * * @return the execution handle for the job. */ - def executeAsync()(implicit ec: ExecutionContext): Future[ExecutionHandle] = { - Future.fromTry(Try(execute())) - } + def executeAsync(): Future[ExecutionHandle] = Future.fromTry(Try(execute())) /** * Recovers the specified job id, or starts a new job. The default implementation simply calls execute(). @@ -269,9 +267,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta * @param jobId The previously recorded job id. * @return the execution handle for the job. */ - def recoverAsync(jobId: StandardAsyncJob)(implicit ec: ExecutionContext): Future[ExecutionHandle] = { - Future.fromTry(Try(recover(jobId))) - } + def recoverAsync(jobId: StandardAsyncJob): Future[ExecutionHandle] = Future.fromTry(Try(recover(jobId))) /** * Returns the run status for the job. @@ -289,10 +285,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta * @param handle The handle of the running job. * @return The status of the job. */ - def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle) - (implicit ec: ExecutionContext): Future[StandardAsyncRunStatus] = { - Future.fromTry(Try(pollStatus(handle))) - } + def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle): Future[StandardAsyncRunStatus] = Future.fromTry(Try(pollStatus(handle))) /** * Adds custom behavior invoked when polling fails due to some exception. By default adds nothing. diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardCachingActorHelper.scala b/backend/src/main/scala/cromwell/backend/standard/StandardCachingActorHelper.scala index 7b259ccdc..c6c0b5183 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardCachingActorHelper.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardCachingActorHelper.scala @@ -8,7 +8,7 @@ import cromwell.backend.validation.{RuntimeAttributesValidation, ValidatedRuntim import cromwell.core.logging.JobLogging import cromwell.core.path.Path import cromwell.services.metadata.CallMetadataKeys -import wdl4s.TaskCall +import wdl4s.wdl.WdlTaskCall import scala.util.Try @@ -46,7 +46,7 @@ trait StandardCachingActorHelper extends JobCachingActorHelper { */ lazy val workflowDescriptor: BackendWorkflowDescriptor = jobDescriptor.workflowDescriptor - lazy val call: TaskCall = jobDescriptor.key.call + lazy val call: WdlTaskCall = jobDescriptor.key.call lazy val standardInitializationData: StandardInitializationData = BackendInitializationData. as[StandardInitializationData](backendInitializationDataOption) @@ -79,7 +79,7 @@ trait StandardCachingActorHelper extends JobCachingActorHelper { } /** - * Returns any custom medatata for the backend. + * Returns any custom metadata for the backend. * * @return any custom metadata for the backend. */ diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardExpressionFunctions.scala b/backend/src/main/scala/cromwell/backend/standard/StandardExpressionFunctions.scala index 3b29b796d..010240319 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardExpressionFunctions.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardExpressionFunctions.scala @@ -4,8 +4,8 @@ import cromwell.backend.io.GlobFunctions import cromwell.backend.wdl.{ReadLikeFunctions, WriteFunctions} import cromwell.core.CallContext import cromwell.core.path.{Path, PathBuilder} -import wdl4s.expression.PureStandardLibraryFunctionsLike -import wdl4s.values.{WdlFile, WdlValue} +import wdl4s.wdl.expression.PureStandardLibraryFunctionsLike +import wdl4s.wdl.values.{WdlFile, WdlValue} import scala.util.{Success, Try} diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardFinalizationActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardFinalizationActor.scala index 6acbe6cb7..09d80a77e 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardFinalizationActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardFinalizationActor.scala @@ -6,14 +6,14 @@ import cromwell.backend.io.WorkflowPaths import cromwell.core.CallOutputs import cromwell.core.Dispatcher.IoDispatcher import cromwell.core.path.{Path, PathCopier} -import wdl4s.TaskCall +import wdl4s.wdl.WdlTaskCall import scala.concurrent.Future trait StandardFinalizationActorParams { def workflowDescriptor: BackendWorkflowDescriptor - def calls: Set[TaskCall] + def calls: Set[WdlTaskCall] def jobExecutionMap: JobExecutionMap @@ -27,7 +27,7 @@ trait StandardFinalizationActorParams { case class DefaultStandardFinalizationActorParams ( workflowDescriptor: BackendWorkflowDescriptor, - calls: Set[TaskCall], + calls: Set[WdlTaskCall], jobExecutionMap: JobExecutionMap, workflowOutputs: CallOutputs, initializationDataOption: Option[BackendInitializationData], @@ -45,7 +45,7 @@ class StandardFinalizationActor(val standardParams: StandardFinalizationActorPar extends BackendWorkflowFinalizationActor { override lazy val workflowDescriptor: BackendWorkflowDescriptor = standardParams.workflowDescriptor - override lazy val calls: Set[TaskCall] = standardParams.calls + override lazy val calls: Set[WdlTaskCall] = standardParams.calls lazy val initializationDataOption: Option[BackendInitializationData] = standardParams.initializationDataOption lazy val jobExecutionMap: JobExecutionMap = standardParams.jobExecutionMap lazy val workflowOutputs: CallOutputs = standardParams.workflowOutputs diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala index 73dd47925..c5f2c5ed8 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala @@ -7,8 +7,8 @@ import cromwell.backend.wfs.WorkflowPathBuilder import cromwell.backend.{BackendConfigurationDescriptor, BackendInitializationData, BackendWorkflowDescriptor, BackendWorkflowInitializationActor} import cromwell.core.WorkflowOptions import cromwell.core.path.{DefaultPathBuilder, PathBuilder} -import wdl4s.TaskCall -import wdl4s.values.WdlValue +import wdl4s.wdl.WdlTaskCall +import wdl4s.wdl.values.WdlValue import scala.concurrent.Future import scala.util.Try @@ -16,7 +16,7 @@ import scala.util.Try trait StandardInitializationActorParams { def workflowDescriptor: BackendWorkflowDescriptor - def calls: Set[TaskCall] + def calls: Set[WdlTaskCall] def serviceRegistryActor: ActorRef @@ -27,9 +27,10 @@ case class DefaultInitializationActorParams ( workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, - calls: Set[TaskCall], + calls: Set[WdlTaskCall], serviceRegistryActor: ActorRef, - configurationDescriptor: BackendConfigurationDescriptor + configurationDescriptor: BackendConfigurationDescriptor, + restarting: Boolean ) extends StandardInitializationActorParams /** @@ -44,7 +45,7 @@ class StandardInitializationActor(val standardParams: StandardInitializationActo override lazy val serviceRegistryActor: ActorRef = standardParams.serviceRegistryActor - override lazy val calls: Set[TaskCall] = standardParams.calls + override lazy val calls: Set[WdlTaskCall] = standardParams.calls override def beforeAll(): Future[Option[BackendInitializationData]] = { initializationData map Option.apply diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardLifecycleActorFactory.scala b/backend/src/main/scala/cromwell/backend/standard/StandardLifecycleActorFactory.scala index 6b3b8eb63..9c692c9a2 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardLifecycleActorFactory.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardLifecycleActorFactory.scala @@ -7,8 +7,8 @@ import cromwell.backend.standard.callcaching._ import cromwell.core.Dispatcher.BackendDispatcher import cromwell.core.path.Path import cromwell.core.{CallOutputs, Dispatcher} -import wdl4s.TaskCall -import wdl4s.expression.WdlStandardLibraryFunctions +import wdl4s.wdl.WdlTaskCall +import wdl4s.wdl.expression.WdlStandardLibraryFunctions /** * May be extended for using the standard sync/async backend pattern. @@ -73,16 +73,16 @@ trait StandardLifecycleActorFactory extends BackendLifecycleActorFactory { */ lazy val finalizationActorClassOption: Option[Class[_ <: StandardFinalizationActor]] = Option(classOf[StandardFinalizationActor]) - override def workflowInitializationActorProps(workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, calls: Set[TaskCall], - serviceRegistryActor: ActorRef): Option[Props] = { - val params = workflowInitializationActorParams(workflowDescriptor, ioActor, calls, serviceRegistryActor) + override def workflowInitializationActorProps(workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, calls: Set[WdlTaskCall], + serviceRegistryActor: ActorRef, restart: Boolean): Option[Props] = { + val params = workflowInitializationActorParams(workflowDescriptor, ioActor, calls, serviceRegistryActor, restart) val props = Props(initializationActorClass, params).withDispatcher(Dispatcher.BackendDispatcher) Option(props) } - def workflowInitializationActorParams(workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, calls: Set[TaskCall], - serviceRegistryActor: ActorRef): StandardInitializationActorParams = { - DefaultInitializationActorParams(workflowDescriptor, ioActor, calls, serviceRegistryActor, configurationDescriptor) + def workflowInitializationActorParams(workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, calls: Set[WdlTaskCall], + serviceRegistryActor: ActorRef, restarting: Boolean): StandardInitializationActorParams = { + DefaultInitializationActorParams(workflowDescriptor, ioActor, calls, serviceRegistryActor, configurationDescriptor, restarting) } override def jobExecutionActorProps(jobDescriptor: BackendJobDescriptor, @@ -152,7 +152,7 @@ trait StandardLifecycleActorFactory extends BackendLifecycleActorFactory { jobDescriptor, initializationDataOption, serviceRegistryActor, ioActor, configurationDescriptor) } - override def workflowFinalizationActorProps(workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, calls: Set[TaskCall], + override def workflowFinalizationActorProps(workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, calls: Set[WdlTaskCall], jobExecutionMap: JobExecutionMap, workflowOutputs: CallOutputs, initializationData: Option[BackendInitializationData]): Option[Props] = { finalizationActorClassOption map { finalizationActorClass => @@ -162,7 +162,7 @@ trait StandardLifecycleActorFactory extends BackendLifecycleActorFactory { } } - def workflowFinalizationActorParams(workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, calls: Set[TaskCall], + def workflowFinalizationActorParams(workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, calls: Set[WdlTaskCall], jobExecutionMap: JobExecutionMap, workflowOutputs: CallOutputs, initializationDataOption: Option[BackendInitializationData]): StandardFinalizationActorParams = { diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardSyncExecutionActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardSyncExecutionActor.scala index c3cb05143..180f4d345 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardSyncExecutionActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardSyncExecutionActor.scala @@ -10,7 +10,6 @@ import cromwell.core.Dispatcher import cromwell.services.keyvalue.KeyValueServiceActor._ import scala.concurrent.{Future, Promise} -import scala.language.existentials trait StandardSyncExecutionActorParams extends StandardJobExecutionActorParams { /** The class for creating an async backend. */ diff --git a/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala b/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala index 588028967..e79f2e863 100644 --- a/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala @@ -19,7 +19,7 @@ import cromwell.core.io._ import cromwell.core.logging.JobLogging import cromwell.core.path.{Path, PathCopier} import cromwell.core.simpleton.{WdlValueBuilder, WdlValueSimpleton} -import wdl4s.values.WdlFile +import wdl4s.wdl.values.WdlFile import scala.util.{Failure, Success, Try} diff --git a/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardFileHashingActor.scala b/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardFileHashingActor.scala index 53a05ae8d..bcb08d333 100644 --- a/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardFileHashingActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardFileHashingActor.scala @@ -9,7 +9,7 @@ import cromwell.core.JobKey import cromwell.core.callcaching._ import cromwell.core.io._ import cromwell.core.logging.JobLogging -import wdl4s.values.WdlFile +import wdl4s.wdl.values.WdlFile import scala.util.{Failure, Success, Try} @@ -72,11 +72,11 @@ abstract class StandardFileHashingActor(standardParams: StandardFileHashingActor } // Hash Success - case (fileHashRequest: SingleFileHashRequest, response @ IoSuccess(_, result: String)) => + case (fileHashRequest: SingleFileHashRequest, IoSuccess(_, result: String)) => context.parent ! FileHashResponse(HashResult(fileHashRequest.hashKey, HashValue(result))) // Hash Failure - case (fileHashRequest: SingleFileHashRequest, response @ IoFailure(_, failure: Throwable)) => + case (fileHashRequest: SingleFileHashRequest, IoFailure(_, failure: Throwable)) => context.parent ! HashingFailedMessage(fileHashRequest.file.value, failure) case other => diff --git a/backend/src/main/scala/cromwell/backend/validation/ContinueOnReturnCode.scala b/backend/src/main/scala/cromwell/backend/validation/ContinueOnReturnCode.scala index 0639b4f42..ed2bcfad0 100644 --- a/backend/src/main/scala/cromwell/backend/validation/ContinueOnReturnCode.scala +++ b/backend/src/main/scala/cromwell/backend/validation/ContinueOnReturnCode.scala @@ -1,6 +1,6 @@ package cromwell.backend.validation -import wdl4s.types.{WdlArrayType, WdlBooleanType, WdlIntegerType, WdlType} +import wdl4s.wdl.types._ object ContinueOnReturnCode { val validWdlTypes = Set[WdlType](WdlArrayType(WdlIntegerType), WdlBooleanType, WdlIntegerType) diff --git a/backend/src/main/scala/cromwell/backend/validation/ContinueOnReturnCodeValidation.scala b/backend/src/main/scala/cromwell/backend/validation/ContinueOnReturnCodeValidation.scala index 03e9d5b1a..27ae8170e 100644 --- a/backend/src/main/scala/cromwell/backend/validation/ContinueOnReturnCodeValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/ContinueOnReturnCodeValidation.scala @@ -7,8 +7,8 @@ import cats.syntax.validated._ import com.typesafe.config.Config import cromwell.backend.validation.RuntimeAttributesValidation._ import lenthall.validation.ErrorOr._ -import wdl4s.types.{WdlArrayType, WdlIntegerType, WdlStringType, WdlType} -import wdl4s.values.{WdlArray, WdlBoolean, WdlInteger, WdlString, WdlValue} +import wdl4s.wdl.types._ +import wdl4s.wdl.values._ import scala.util.Try diff --git a/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala b/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala index 0d398a324..60950ed00 100644 --- a/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala @@ -3,8 +3,8 @@ package cromwell.backend.validation import cats.syntax.validated._ import com.typesafe.config.Config import lenthall.validation.ErrorOr.ErrorOr -import wdl4s.types.WdlIntegerType -import wdl4s.values.{WdlInteger, WdlValue} +import wdl4s.wdl.types.WdlIntegerType +import wdl4s.wdl.values.{WdlInteger, WdlValue} /** * Validates the "cpu" runtime attribute an Integer greater than 0, returning the value as an `Int`. diff --git a/backend/src/main/scala/cromwell/backend/validation/DockerValidation.scala b/backend/src/main/scala/cromwell/backend/validation/DockerValidation.scala index f793d1397..788f7a7e6 100644 --- a/backend/src/main/scala/cromwell/backend/validation/DockerValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/DockerValidation.scala @@ -2,7 +2,7 @@ package cromwell.backend.validation import cats.syntax.validated._ import lenthall.validation.ErrorOr.ErrorOr -import wdl4s.values.{WdlString, WdlValue} +import wdl4s.wdl.values.{WdlString, WdlValue} /** * Validates the "docker" runtime attribute as a String, returning it as `String`. diff --git a/backend/src/main/scala/cromwell/backend/validation/FailOnStderrValidation.scala b/backend/src/main/scala/cromwell/backend/validation/FailOnStderrValidation.scala index c57aaa246..bf4f024c6 100644 --- a/backend/src/main/scala/cromwell/backend/validation/FailOnStderrValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/FailOnStderrValidation.scala @@ -1,7 +1,7 @@ package cromwell.backend.validation import com.typesafe.config.Config -import wdl4s.values.{WdlBoolean, WdlValue} +import wdl4s.wdl.values.{WdlBoolean, WdlValue} /** * Validates the "failOnStderr" runtime attribute as a Boolean or a String 'true' or 'false', returning the value as a diff --git a/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala b/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala index 142e541d9..c2c97343a 100644 --- a/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala @@ -5,8 +5,8 @@ import com.typesafe.config.Config import cromwell.backend.MemorySize import lenthall.validation.ErrorOr._ import wdl4s.parser.MemoryUnit -import wdl4s.types.{WdlIntegerType, WdlStringType} -import wdl4s.values.{WdlInteger, WdlString, WdlValue} +import wdl4s.wdl.types.{WdlIntegerType, WdlStringType} +import wdl4s.wdl.values.{WdlInteger, WdlString, WdlValue} import scala.util.{Failure, Success} diff --git a/backend/src/main/scala/cromwell/backend/validation/PrimitiveRuntimeAttributesValidation.scala b/backend/src/main/scala/cromwell/backend/validation/PrimitiveRuntimeAttributesValidation.scala index aa07afcdc..477f9f9c7 100644 --- a/backend/src/main/scala/cromwell/backend/validation/PrimitiveRuntimeAttributesValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/PrimitiveRuntimeAttributesValidation.scala @@ -2,8 +2,8 @@ package cromwell.backend.validation import cats.syntax.validated._ import lenthall.validation.ErrorOr.ErrorOr -import wdl4s.types._ -import wdl4s.values.{WdlBoolean, WdlFloat, WdlInteger, WdlPrimitive, WdlString, WdlValue} +import wdl4s.wdl.types._ +import wdl4s.wdl.values._ /** * Validates one of the wdl primitive types: Boolean, Float, Integer, or String. WdlFile is not supported. diff --git a/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesDefault.scala b/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesDefault.scala index 30ffe1043..a111fe554 100644 --- a/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesDefault.scala +++ b/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesDefault.scala @@ -4,8 +4,8 @@ import cats.data.ValidatedNel import cats.syntax.validated._ import cromwell.core.{EvaluatedRuntimeAttributes, OptionNotFoundException, WorkflowOptions} import lenthall.util.TryUtil -import wdl4s.types.WdlType -import wdl4s.values.WdlValue +import wdl4s.wdl.types.WdlType +import wdl4s.wdl.values.WdlValue import scala.util.{Failure, Try} diff --git a/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala b/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala index 907cf6ea9..ffba5bbd8 100644 --- a/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala @@ -6,10 +6,10 @@ import com.typesafe.config.Config import cromwell.backend.{MemorySize, RuntimeAttributeDefinition} import lenthall.validation.ErrorOr._ import org.slf4j.Logger -import wdl4s.expression.PureStandardLibraryFunctions -import wdl4s.types.{WdlBooleanType, WdlIntegerType, WdlStringType, WdlType} -import wdl4s.values._ -import wdl4s.{NoLookup, WdlExpression} +import wdl4s.wdl.expression.PureStandardLibraryFunctions +import wdl4s.wdl.types.{WdlBooleanType, WdlIntegerType, WdlStringType, WdlType} +import wdl4s.wdl.values._ +import wdl4s.wdl.{NoLookup, WdlExpression} import scala.util.{Failure, Success} diff --git a/backend/src/main/scala/cromwell/backend/validation/ValidatedRuntimeAttributesBuilder.scala b/backend/src/main/scala/cromwell/backend/validation/ValidatedRuntimeAttributesBuilder.scala index ed3981817..c125e24b8 100644 --- a/backend/src/main/scala/cromwell/backend/validation/ValidatedRuntimeAttributesBuilder.scala +++ b/backend/src/main/scala/cromwell/backend/validation/ValidatedRuntimeAttributesBuilder.scala @@ -6,8 +6,8 @@ import cromwell.backend.RuntimeAttributeDefinition import lenthall.exception.MessageAggregation import lenthall.validation.ErrorOr._ import org.slf4j.Logger -import wdl4s.types.WdlType -import wdl4s.values.WdlValue +import wdl4s.wdl.types.WdlType +import wdl4s.wdl.values.WdlValue final case class ValidatedRuntimeAttributes(attributes: Map[String, Any]) diff --git a/backend/src/main/scala/cromwell/backend/wdl/Command.scala b/backend/src/main/scala/cromwell/backend/wdl/Command.scala index b0e3c9294..5e0d1d424 100644 --- a/backend/src/main/scala/cromwell/backend/wdl/Command.scala +++ b/backend/src/main/scala/cromwell/backend/wdl/Command.scala @@ -1,9 +1,9 @@ package cromwell.backend.wdl import cromwell.backend.BackendJobDescriptor -import wdl4s.EvaluatedTaskInputs -import wdl4s.expression.WdlFunctions -import wdl4s.values.WdlValue +import wdl4s.wdl.EvaluatedTaskInputs +import wdl4s.wdl.expression.WdlFunctions +import wdl4s.wdl.values.WdlValue import scala.util.{Success, Try} diff --git a/backend/src/main/scala/cromwell/backend/wdl/OutputEvaluator.scala b/backend/src/main/scala/cromwell/backend/wdl/OutputEvaluator.scala index d41d25b19..93d3563b6 100644 --- a/backend/src/main/scala/cromwell/backend/wdl/OutputEvaluator.scala +++ b/backend/src/main/scala/cromwell/backend/wdl/OutputEvaluator.scala @@ -2,9 +2,9 @@ package cromwell.backend.wdl import cromwell.backend.BackendJobDescriptor import cromwell.core.JobOutput -import wdl4s.LocallyQualifiedName -import wdl4s.expression.WdlStandardLibraryFunctions -import wdl4s.values.WdlValue +import wdl4s.wdl.LocallyQualifiedName +import wdl4s.wdl.expression.WdlStandardLibraryFunctions +import wdl4s.wdl.values.WdlValue import scala.util.{Success, Try} diff --git a/backend/src/main/scala/cromwell/backend/wdl/ReadLikeFunctions.scala b/backend/src/main/scala/cromwell/backend/wdl/ReadLikeFunctions.scala index 073bb6a91..349cefbe6 100644 --- a/backend/src/main/scala/cromwell/backend/wdl/ReadLikeFunctions.scala +++ b/backend/src/main/scala/cromwell/backend/wdl/ReadLikeFunctions.scala @@ -2,10 +2,10 @@ package cromwell.backend.wdl import cromwell.backend.MemorySize import cromwell.core.path.PathFactory -import wdl4s.expression.WdlStandardLibraryFunctions +import wdl4s.wdl.expression.WdlStandardLibraryFunctions import wdl4s.parser.MemoryUnit -import wdl4s.types.{WdlArrayType, WdlFileType, WdlObjectType, WdlStringType} -import wdl4s.values._ +import wdl4s.wdl.types._ +import wdl4s.wdl.values._ import scala.util.{Failure, Success, Try} @@ -54,7 +54,7 @@ trait ReadLikeFunctions extends PathFactory { this: WdlStandardLibraryFunctions fileSize <- fileSize(fileName) _ = if (fileSize > limit) { val errorMsg = s"Use of $fileName failed because the file was too big ($fileSize bytes when only files of up to $limit bytes are permissible" - throw new FileSizeTooBig(errorMsg) + throw FileSizeTooBig(errorMsg) } } yield () @@ -119,14 +119,39 @@ trait ReadLikeFunctions extends PathFactory { this: WdlStandardLibraryFunctions override def read_boolean(params: Seq[Try[WdlValue]]): Try[WdlBoolean] = read_string(params) map { s => WdlBoolean(java.lang.Boolean.parseBoolean(s.value.trim.toLowerCase)) } + protected def size(file: WdlValue): Try[Double] = Try(buildPath(file.valueString).size.toDouble) + + /** + * Gets the size of a file. + * + * @param params First parameter must be a File or File? or coerceable to one. The second is an optional string containing the size unit (eg "MB", "GiB") + */ override def size(params: Seq[Try[WdlValue]]): Try[WdlFloat] = { + // Inner function: get the memory unit from the second (optional) parameter def toUnit(wdlValue: Try[WdlValue]) = wdlValue flatMap { unit => Try(MemoryUnit.fromSuffix(unit.valueString)) } + // Inner function: is this a file type, or an optional containing a file type? + def isOptionalOfFileType(wdlType: WdlType): Boolean = wdlType match { + case f if WdlFileType.isCoerceableFrom(f) => true + case WdlOptionalType(inner) => isOptionalOfFileType(inner) + case _ => false + } + + // Inner function: Get the file size, allowing for unpacking of optionals + def optionalSafeFileSize(value: WdlValue): Try[Double] = value match { + case f if f.isInstanceOf[WdlFile] || WdlFileType.isCoerceableFrom(f.wdlType) => size(f) + case WdlOptionalValue(_, Some(o)) => optionalSafeFileSize(o) + case WdlOptionalValue(f, None) if isOptionalOfFileType(f) => Success(0d) + case _ => Failure(new Exception(s"The 'size' method expects a 'File' or 'File?' argument but instead got ${value.wdlType.toWdlString}.")) + } + + // Inner function: get the file size and convert into the requested memory unit def fileSize(wdlValue: Try[WdlValue], convertTo: Try[MemoryUnit] = Success(MemoryUnit.Bytes)) = { for { value <- wdlValue unit <- convertTo - } yield MemorySize(buildPath(value.valueString).size.toDouble, MemoryUnit.Bytes).to(unit).amount + fileSize <- optionalSafeFileSize(value) + } yield MemorySize(fileSize, MemoryUnit.Bytes).to(unit).amount } params match { diff --git a/backend/src/main/scala/cromwell/backend/wdl/WdlFileMapper.scala b/backend/src/main/scala/cromwell/backend/wdl/WdlFileMapper.scala index 1c39c4d89..746b71ecd 100644 --- a/backend/src/main/scala/cromwell/backend/wdl/WdlFileMapper.scala +++ b/backend/src/main/scala/cromwell/backend/wdl/WdlFileMapper.scala @@ -1,7 +1,7 @@ package cromwell.backend.wdl import lenthall.util.TryUtil -import wdl4s.values.{WdlArray, WdlFile, WdlMap, WdlOptionalValue, WdlPair, WdlValue} +import wdl4s.wdl.values._ import scala.util.{Success, Try} diff --git a/backend/src/main/scala/cromwell/backend/wdl/WriteFunctions.scala b/backend/src/main/scala/cromwell/backend/wdl/WriteFunctions.scala index b8568f840..28ff91145 100644 --- a/backend/src/main/scala/cromwell/backend/wdl/WriteFunctions.scala +++ b/backend/src/main/scala/cromwell/backend/wdl/WriteFunctions.scala @@ -1,10 +1,10 @@ package cromwell.backend.wdl import cromwell.core.path.Path -import wdl4s.TsvSerializable -import wdl4s.expression.WdlStandardLibraryFunctions -import wdl4s.types._ -import wdl4s.values._ +import wdl4s.wdl.TsvSerializable +import wdl4s.wdl.expression.WdlStandardLibraryFunctions +import wdl4s.wdl.types._ +import wdl4s.wdl.values._ import scala.util.{Failure, Try} diff --git a/backend/src/test/scala/cromwell/backend/BackendSpec.scala b/backend/src/test/scala/cromwell/backend/BackendSpec.scala index 0e20c15cd..ec7d92949 100644 --- a/backend/src/test/scala/cromwell/backend/BackendSpec.scala +++ b/backend/src/test/scala/cromwell/backend/BackendSpec.scala @@ -11,9 +11,9 @@ import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Millis, Seconds, Span} import org.specs2.mock.Mockito import spray.json.{JsObject, JsValue} -import wdl4s._ -import wdl4s.expression.NoFunctions -import wdl4s.values.WdlValue +import wdl4s.wdl._ +import wdl4s.wdl.expression.NoFunctions +import wdl4s.wdl.values.WdlValue trait BackendSpec extends ScalaFutures with Matchers with Mockito { @@ -23,7 +23,7 @@ trait BackendSpec extends ScalaFutures with Matchers with Mockito { executeJobAndAssertOutputs(backend, workflow.expectedResponse) } - def buildWorkflowDescriptor(workflowSource: WdlSource, + def buildWorkflowDescriptor(workflowSource: WorkflowSource, inputs: Map[String, WdlValue] = Map.empty, options: WorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue])), runtime: String = "") = { @@ -58,7 +58,7 @@ trait BackendSpec extends ScalaFutures with Matchers with Mockito { BackendJobDescriptor(workflowDescriptor, jobKey, runtimeAttributes, inputDeclarations, NoDocker, Map.empty) } - def jobDescriptorFromSingleCallWorkflow(wdl: WdlSource, + def jobDescriptorFromSingleCallWorkflow(wdl: WorkflowSource, options: WorkflowOptions, runtimeAttributeDefinitions: Set[RuntimeAttributeDefinition]): BackendJobDescriptor = { val workflowDescriptor = buildWorkflowDescriptor(wdl) @@ -70,7 +70,7 @@ trait BackendSpec extends ScalaFutures with Matchers with Mockito { BackendJobDescriptor(workflowDescriptor, jobKey, runtimeAttributes, inputDeclarations, NoDocker, Map.empty) } - def jobDescriptorFromSingleCallWorkflow(wdl: WdlSource, + def jobDescriptorFromSingleCallWorkflow(wdl: WorkflowSource, runtime: String, attempt: Int, options: WorkflowOptions, diff --git a/backend/src/test/scala/cromwell/backend/BackendWorkflowInitializationActorSpec.scala b/backend/src/test/scala/cromwell/backend/BackendWorkflowInitializationActorSpec.scala index d26df12f6..7438282ba 100644 --- a/backend/src/test/scala/cromwell/backend/BackendWorkflowInitializationActorSpec.scala +++ b/backend/src/test/scala/cromwell/backend/BackendWorkflowInitializationActorSpec.scala @@ -7,9 +7,9 @@ import cromwell.backend.validation.{ContinueOnReturnCodeFlag, ContinueOnReturnCo import cromwell.core.{TestKitSuite, WorkflowOptions} import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.{FlatSpecLike, Matchers} -import wdl4s.types._ -import wdl4s.values.{WdlArray, WdlBoolean, WdlFloat, WdlInteger, WdlString, WdlValue} -import wdl4s.{TaskCall, WdlExpression} +import wdl4s.wdl.types._ +import wdl4s.wdl.values.{WdlArray, WdlBoolean, WdlFloat, WdlInteger, WdlString, WdlValue} +import wdl4s.wdl.{WdlTaskCall, WdlExpression} import scala.concurrent.Future import scala.util.Try @@ -183,7 +183,7 @@ class TestPredicateBackendWorkflowInitializationActor extends BackendWorkflowIni override val serviceRegistryActor: ActorRef = context.system.deadLetters - override def calls: Set[TaskCall] = throw new NotImplementedError("calls") + override def calls: Set[WdlTaskCall] = throw new NotImplementedError("calls") override protected def runtimeAttributeValidators: Map[String, (Option[WdlValue]) => Boolean] = throw new NotImplementedError("runtimeAttributeValidators") diff --git a/backend/src/test/scala/cromwell/backend/io/JobPathsSpec.scala b/backend/src/test/scala/cromwell/backend/io/JobPathsSpec.scala index 793680b6c..eeccc39e1 100644 --- a/backend/src/test/scala/cromwell/backend/io/JobPathsSpec.scala +++ b/backend/src/test/scala/cromwell/backend/io/JobPathsSpec.scala @@ -4,7 +4,7 @@ import com.typesafe.config.ConfigFactory import cromwell.backend.{BackendConfigurationDescriptor, BackendJobDescriptorKey, BackendSpec, TestConfig} import cromwell.core.path.DefaultPathBuilder import org.scalatest.{FlatSpec, Matchers} -import wdl4s.TaskCall +import wdl4s.wdl.WdlTaskCall class JobPathsSpec extends FlatSpec with Matchers with BackendSpec { @@ -29,7 +29,7 @@ class JobPathsSpec extends FlatSpec with Matchers with BackendSpec { "JobPaths" should "provide correct paths for a job" in { val wd = buildWorkflowDescriptor(TestWorkflows.HelloWorld) - val call: TaskCall = wd.workflow.taskCalls.head + val call: WdlTaskCall = wd.workflow.taskCalls.head val jobKey = BackendJobDescriptorKey(call, None, 1) val workflowPaths = new WorkflowPathsWithDocker(wd, backendConfig) val jobPaths = new JobPathsWithDocker(workflowPaths, jobKey) diff --git a/backend/src/test/scala/cromwell/backend/io/WorkflowPathsSpec.scala b/backend/src/test/scala/cromwell/backend/io/WorkflowPathsSpec.scala index c16348da9..66b48467b 100644 --- a/backend/src/test/scala/cromwell/backend/io/WorkflowPathsSpec.scala +++ b/backend/src/test/scala/cromwell/backend/io/WorkflowPathsSpec.scala @@ -6,7 +6,7 @@ import cromwell.core.path.DefaultPathBuilder import cromwell.core.{JobKey, WorkflowId} import org.mockito.Mockito._ import org.scalatest.{FlatSpec, Matchers} -import wdl4s.{Call, Workflow} +import wdl4s.wdl.{WdlCall, WdlWorkflow} class WorkflowPathsSpec extends FlatSpec with Matchers with BackendSpec { @@ -29,22 +29,22 @@ class WorkflowPathsSpec extends FlatSpec with Matchers with BackendSpec { when(backendConfig.getString(any[String])).thenReturn("local-cromwell-executions") // This is the folder defined in the config as the execution root dir val rootWd = mock[BackendWorkflowDescriptor] - val rootWorkflow = mock[Workflow] + val rootWorkflow = mock[WdlWorkflow] val rootWorkflowId = WorkflowId.randomId() rootWorkflow.unqualifiedName returns "rootWorkflow" rootWd.workflow returns rootWorkflow rootWd.id returns rootWorkflowId val subWd = mock[BackendWorkflowDescriptor] - val subWorkflow = mock[Workflow] + val subWorkflow = mock[WdlWorkflow] val subWorkflowId = WorkflowId.randomId() subWorkflow.unqualifiedName returns "subWorkflow" subWd.workflow returns subWorkflow subWd.id returns subWorkflowId - val call1 = mock[Call] + val call1 = mock[WdlCall] call1.unqualifiedName returns "call1" - val call2 = mock[Call] + val call2 = mock[WdlCall] call2.unqualifiedName returns "call2" val jobKey = new JobKey { diff --git a/backend/src/test/scala/cromwell/backend/standard/StandardValidatedRuntimeAttributesBuilderSpec.scala b/backend/src/test/scala/cromwell/backend/standard/StandardValidatedRuntimeAttributesBuilderSpec.scala index e4e18bbfb..7c78177b3 100644 --- a/backend/src/test/scala/cromwell/backend/standard/StandardValidatedRuntimeAttributesBuilderSpec.scala +++ b/backend/src/test/scala/cromwell/backend/standard/StandardValidatedRuntimeAttributesBuilderSpec.scala @@ -8,7 +8,7 @@ import org.scalatest.{Matchers, WordSpecLike} import org.slf4j.{Logger, LoggerFactory} import org.specs2.mock.Mockito import spray.json.{JsArray, JsBoolean, JsNumber, JsObject, JsValue} -import wdl4s.values.{WdlBoolean, WdlInteger, WdlString, WdlValue} +import wdl4s.wdl.values.{WdlBoolean, WdlInteger, WdlString, WdlValue} class StandardValidatedRuntimeAttributesBuilderSpec extends WordSpecLike with Matchers with Mockito { diff --git a/backend/src/test/scala/cromwell/backend/validation/RuntimeAttributesDefaultSpec.scala b/backend/src/test/scala/cromwell/backend/validation/RuntimeAttributesDefaultSpec.scala index 7420c2d89..05edd089e 100644 --- a/backend/src/test/scala/cromwell/backend/validation/RuntimeAttributesDefaultSpec.scala +++ b/backend/src/test/scala/cromwell/backend/validation/RuntimeAttributesDefaultSpec.scala @@ -4,8 +4,8 @@ import cromwell.backend.validation.RuntimeAttributesDefault._ import cromwell.core.WorkflowOptions import org.scalatest.{FlatSpec, Matchers} import spray.json._ -import wdl4s.types._ -import wdl4s.values.{WdlArray, WdlBoolean, WdlInteger, WdlString} +import wdl4s.wdl.types._ +import wdl4s.wdl.values.{WdlArray, WdlBoolean, WdlInteger, WdlString} class RuntimeAttributesDefaultSpec extends FlatSpec with Matchers { diff --git a/backend/src/test/scala/cromwell/backend/validation/RuntimeAttributesValidationSpec.scala b/backend/src/test/scala/cromwell/backend/validation/RuntimeAttributesValidationSpec.scala index 4cd60eef7..76e47ce2a 100644 --- a/backend/src/test/scala/cromwell/backend/validation/RuntimeAttributesValidationSpec.scala +++ b/backend/src/test/scala/cromwell/backend/validation/RuntimeAttributesValidationSpec.scala @@ -5,8 +5,8 @@ import cats.syntax.validated._ import com.typesafe.config.{Config, ConfigFactory} import cromwell.backend.TestConfig import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} -import wdl4s.types.{WdlArrayType, WdlIntegerType, WdlStringType} -import wdl4s.values.{WdlArray, WdlBoolean, WdlInteger, WdlString} +import wdl4s.wdl.types.{WdlArrayType, WdlIntegerType, WdlStringType} +import wdl4s.wdl.values.{WdlArray, WdlBoolean, WdlInteger, WdlString} class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { @@ -37,7 +37,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateDocker(dockerValue, "Failed to get Docker mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Failed to get Docker mandatory key from runtime attributes") } } @@ -47,7 +47,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateDocker(dockerValue, "Failed to get Docker mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Expecting docker runtime attribute to be a String") } } @@ -87,7 +87,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateFailOnStderr(failOnStderrValue, "Failed to get failOnStderr mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Expecting failOnStderr runtime attribute to be a Boolean or a String with values of 'true' or 'false'") } } @@ -146,7 +146,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateContinueOnReturnCode(continueOnReturnCodeValue, "Failed to get continueOnReturnCode mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Expecting continueOnReturnCode runtime attribute to be either a Boolean, a String 'true' or 'false', or an Array[Int]") } @@ -167,7 +167,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateContinueOnReturnCode(continueOnReturnCodeValue, "Failed to get continueOnReturnCode mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Expecting continueOnReturnCode runtime attribute to be either a Boolean, a String 'true' or 'false', or an Array[Int]") } } @@ -197,7 +197,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateMemory(memoryValue, "Failed to get memory mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Expecting memory runtime attribute value greater than 0 but got -1") } } @@ -218,7 +218,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateMemory(memoryValue, "Failed to get memory mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Expecting memory runtime attribute value greater than 0 but got 0.0") } } @@ -228,7 +228,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateMemory(memoryValue, "Failed to get memory mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Expecting memory runtime attribute to be an Integer or String with format '8 GB'. Exception: value should be of the form 'X Unit' where X is a number, e.g. 8 GB") } } @@ -238,7 +238,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateMemory(memoryValue, "Failed to get memory mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Expecting memory runtime attribute to be an Integer or String with format '8 GB'. Exception: Not supported WDL type value") } } @@ -248,7 +248,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateMemory(memoryValue, "Failed to get memory mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Failed to get memory mandatory key from runtime attributes") } } @@ -268,7 +268,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateCpu(cpuValue, "Failed to get cpu mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Expecting cpu runtime attribute value greater than 0") } } @@ -278,7 +278,7 @@ class RuntimeAttributesValidationSpec extends WordSpecLike with Matchers with Be val result = RuntimeAttributesValidation.validateMemory(cpuValue, "Failed to get cpu mandatory key from runtime attributes".invalidNel) result match { - case Valid(x) => fail("A failure was expected.") + case Valid(_) => fail("A failure was expected.") case Invalid(e) => assert(e.head == "Failed to get cpu mandatory key from runtime attributes") } } diff --git a/backend/src/test/scala/cromwell/backend/wdl/FileSizeSpec.scala b/backend/src/test/scala/cromwell/backend/wdl/FileSizeSpec.scala index b0cc1166c..bc265b9aa 100644 --- a/backend/src/test/scala/cromwell/backend/wdl/FileSizeSpec.scala +++ b/backend/src/test/scala/cromwell/backend/wdl/FileSizeSpec.scala @@ -8,7 +8,7 @@ import cromwell.backend.standard.{DefaultStandardExpressionFunctionsParams, Stan import cromwell.core.CallContext import cromwell.core.path.DefaultPathBuilder import org.scalatest.{FlatSpec, Matchers} -import wdl4s.values._ +import wdl4s.wdl.values._ import com.google.common.io.Files import fs2.{Task, Stream} @@ -73,7 +73,7 @@ class FileSizeSpec extends FlatSpec with Matchers { def testOver() = { testInner(n + 1, { - case Failure(s: FileSizeTooBig) => //success + case Failure(_: FileSizeTooBig) => //success case t => throw new RuntimeException(s"should not have eaten this file that is too big! msg: $t") }) } @@ -81,8 +81,8 @@ class FileSizeSpec extends FlatSpec with Matchers { def testUnder() = { testInner(n - 1, { case Success(_) => - case Failure(nfe: NumberFormatException) => //we're not testing parsing - case Failure(uoe: UnsupportedOperationException) => //we're not testing tsv compatibility + case Failure(_: NumberFormatException) => //we're not testing parsing + case Failure(_: UnsupportedOperationException) => //we're not testing tsv compatibility case Failure(t) => throw t }) } diff --git a/backend/src/test/scala/cromwell/backend/wdl/ReadLikeFunctionsSpec.scala b/backend/src/test/scala/cromwell/backend/wdl/ReadLikeFunctionsSpec.scala new file mode 100644 index 000000000..ad4716b95 --- /dev/null +++ b/backend/src/test/scala/cromwell/backend/wdl/ReadLikeFunctionsSpec.scala @@ -0,0 +1,99 @@ +package cromwell.backend.wdl + +import cromwell.core.path.PathBuilder +import org.apache.commons.lang3.NotImplementedException +import org.scalatest.{FlatSpec, Matchers} +import wdl4s.wdl.expression.PureStandardLibraryFunctionsLike +import wdl4s.wdl.types.{WdlFileType, WdlIntegerType, WdlOptionalType} +import wdl4s.wdl.values.{WdlFloat, WdlInteger, WdlOptionalValue, WdlSingleFile, WdlString, WdlValue} + +import scala.util.{Failure, Success, Try} + +class ReadLikeFunctionsSpec extends FlatSpec with Matchers { + + behavior of "ReadLikeFunctions.size" + + it should "correctly report a 2048 byte file, in bytes by default" in { + val readLike = new TestReadLikeFunctions(Success(2048d)) + readLike.size(Seq(Success(WdlSingleFile("blah")))) should be(Success(WdlFloat(2048d))) + } + + it should "correctly report a 2048 byte file, in bytes" in { + val readLike = new TestReadLikeFunctions(Success(2048d)) + readLike.size(Seq(Success(WdlSingleFile("blah")), Success(WdlString("B")))) should be(Success(WdlFloat(2048d))) + } + + it should "correctly report a 2048 byte file, in KB" in { + val readLike = new TestReadLikeFunctions(Success(2048d)) + readLike.size(Seq(Success(WdlSingleFile("blah")), Success(WdlString("KB")))) should be(Success(WdlFloat(2.048d))) + } + + it should "correctly report a 2048 byte file, in KiB" in { + val readLike = new TestReadLikeFunctions(Success(2048d)) + readLike.size(Seq(Success(WdlSingleFile("blah")), Success(WdlString("Ki")))) should be(Success(WdlFloat(2d))) + } + + it should "correctly report the size of a supplied, optional, 2048 byte file" in { + val readLike = new TestReadLikeFunctions(Success(2048d)) + readLike.size(Seq(Success(WdlOptionalValue(WdlFileType, Some(WdlSingleFile("blah")))))) should be(Success(WdlFloat(2048d))) + } + + it should "correctly report the size of a supplied, optional optional, 2048 byte file" in { + val readLike = new TestReadLikeFunctions(Success(2048d)) + readLike.size(Seq(Success(WdlOptionalValue(WdlOptionalType(WdlFileType), Some(WdlOptionalValue(WdlFileType, Some(WdlSingleFile("blah")))))))) should be(Success(WdlFloat(2048d))) + } + + it should "correctly report the size of a supplied, optional, 2048 byte file, in MB" in { + val readLike = new TestReadLikeFunctions(Success(2048d)) + readLike.size(Seq(Success(WdlOptionalValue(WdlFileType, Some(WdlSingleFile("blah")))), Success(WdlString("MB")))) should be(Success(WdlFloat(0.002048d))) + } + + it should "correctly report that an unsupplied optional file is empty" in { + val readLike = new TestReadLikeFunctions(Success(2048d)) + readLike.size(Seq(Success(WdlOptionalValue(WdlFileType, None)))) should be(Success(WdlFloat(0d))) + } + + it should "correctly report that an unsupplied File?? is empty" in { + val readLike = new TestReadLikeFunctions(Success(2048d)) + readLike.size(Seq(Success(WdlOptionalValue(WdlOptionalType(WdlFileType), None)))) should be(Success(WdlFloat(0d))) + } + + it should "correctly report that an unsupplied optional file is empty, even in MB" in { + val readLike = new TestReadLikeFunctions(Success(2048d)) + readLike.size(Seq(Success(WdlOptionalValue(WdlFileType, None)), Success(WdlString("MB")))) should be(Success(WdlFloat(0d))) + } + + it should "refuse to report file sizes for Ints" in { + val readLike = new TestReadLikeFunctions(Failure(new Exception("Bad result: WdlIntegers shouldn't even be tried for getting file size"))) + val oops = readLike.size(Seq(Success(WdlInteger(7)))) + oops match { + case Success(x) => fail(s"Expected a string to not have a file length but instead got $x") + case Failure(e) => e.getMessage should be("The 'size' method expects a 'File' or 'File?' argument but instead got Int.") + } + } + + it should "refuse to report file sizes for Int?s" in { + val readLike = new TestReadLikeFunctions(Failure(new Exception("Bad result: WdlIntegers shouldn't even be tried for getting file size"))) + val oops = readLike.size(Seq(Success(WdlOptionalValue(WdlIntegerType, None)))) + oops match { + case Success(x) => fail(s"Expected a string to not have a file length but instead got $x") + case Failure(e) => e.getMessage should be("The 'size' method expects a 'File' or 'File?' argument but instead got Int?.") + } + } + + it should "pass on underlying size reading errors" in { + val readLike = new TestReadLikeFunctions(Failure(new Exception("'size' inner exception, expect me to be passed on"))) + val oops = readLike.size(Seq(Success(WdlSingleFile("blah")))) + oops match { + case Success(_) => fail(s"The 'size' engine function didn't return the error generated in the inner 'size' method") + case Failure(e) => e.getMessage should be("'size' inner exception, expect me to be passed on") + } + } +} + + +class TestReadLikeFunctions(sizeResult: Try[Double]) extends PureStandardLibraryFunctionsLike with ReadLikeFunctions { + override protected def size(file: WdlValue): Try[Double] = sizeResult + override def pathBuilders: List[PathBuilder] = throw new NotImplementedException("Didn't expect ReadLikefunctionsSpec to need pathBuilders") +} + diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 5439efbdf..865559f87 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -59,11 +59,33 @@ akka { # Note that without further configuration, all other actors run on the default dispatcher } + + coordinated-shutdown.phases { + abort-all-workflows { + # This phase is used to give time to Cromwell to abort all workflows upon shutdown. + # It's only used if system.abort-jobs-on-terminate = true + # This timeout can be adusted to give more or less time to Cromwell to abort workflows + timeout = 1 hour + depends-on = [service-unbind] + } + + stop-io-activity{ + # Adjust this timeout according to the maximum amount of time Cromwell + # should be allowed to spend flushing its database queues + timeout = 30 minutes + depends-on = [service-stop] + } + } } system { # If 'true', a SIGINT will trigger Cromwell to attempt to abort all currently running jobs before exiting - #abort-jobs-on-terminate = false + abort-jobs-on-terminate = false + + # If 'true', a SIGTERM or SIGINT will trigger Cromwell to attempt to gracefully shutdown in server mode, + # in particular clearing up all queued database writes before letting the JVM shut down. + # The shutdown is a multi-phase process, each phase having its own configurable timeout. See the Dev Wiki for more details. + graceful-server-shutdown = true # If 'true' then when Cromwell starts up, it tries to restart incomplete workflows workflow-restart = true @@ -82,13 +104,13 @@ system { # Default number of cache read workers number-of-cache-read-workers = 25 - + io { # Global Throttling - This is mostly useful for GCS and can be adjusted to match # the quota availble on the GCS API number-of-requests = 100000 per = 100 seconds - + # Number of times an I/O operation should be attempted before giving up and failing it. number-of-attempts = 5 } @@ -200,11 +222,11 @@ engine { # You will need to provide the engine with a gcs filesystem # Note that the default filesystem (local) is always available. filesystems { - # gcs { - # auth = "application-default" - # } + # gcs { + # auth = "application-default" + # } local { - enabled: true + enabled: true } } } @@ -242,7 +264,7 @@ backend { ${docker} ${script} """ - + # Root directory where Cromwell writes job results. This directory must be # visible and writeable by the Cromwell process as well as the jobs that Cromwell @@ -342,7 +364,7 @@ backend { # job-id-regex = "Job <(\\d+)>.*" # } #} - + #SLURM { # actor-factory = "cromwell.backend.impl.sfs.config.ConfigBackendLifecycleActorFactory" # config { @@ -498,6 +520,10 @@ backend { # # # Endpoint for APIs, no reason to change this unless directed by Google. # endpoint-url = "https://genomics.googleapis.com/" + # + # # Restrict access to VM metadata. Useful in cases when untrusted containers are running under a service + # # account not owned by the submitting user + # restrict-metadata-access = false # } # # filesystems { diff --git a/core/src/main/scala/cromwell/core/CallKey.scala b/core/src/main/scala/cromwell/core/CallKey.scala index 547eadabd..02a987431 100644 --- a/core/src/main/scala/cromwell/core/CallKey.scala +++ b/core/src/main/scala/cromwell/core/CallKey.scala @@ -1,7 +1,7 @@ package cromwell.core -import wdl4s.Call +import wdl4s.wdl.WdlCall trait CallKey extends JobKey { - def scope: Call + def scope: WdlCall } diff --git a/core/src/main/scala/cromwell/core/ConfigUtil.scala b/core/src/main/scala/cromwell/core/ConfigUtil.scala index 881fec686..40dae4784 100644 --- a/core/src/main/scala/cromwell/core/ConfigUtil.scala +++ b/core/src/main/scala/cromwell/core/ConfigUtil.scala @@ -7,7 +7,7 @@ import cats.syntax.validated._ import com.typesafe.config.{Config, ConfigException, ConfigValue} import org.slf4j.LoggerFactory -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.reflect.{ClassTag, classTag} object ConfigUtil { @@ -15,7 +15,7 @@ object ConfigUtil { val validationLogger = LoggerFactory.getLogger("ConfigurationValidation") implicit class EnhancedConfig(val config: Config) extends AnyVal { - def keys = config.entrySet().toSet map { v: java.util.Map.Entry[String, ConfigValue] => v.getKey } + def keys = config.entrySet().asScala.toSet map { v: java.util.Map.Entry[String, ConfigValue] => v.getKey } /** * For keys that are in the configuration but not in the reference keySet, log a warning. @@ -37,14 +37,14 @@ object ConfigUtil { def validateString(key: String): ValidatedNel[String, String] = try { config.getString(key).validNel } catch { - case e: ConfigException.Missing => s"Could not find key: $key".invalidNel + case _: ConfigException.Missing => s"Could not find key: $key".invalidNel } def validateConfig(key: String): ValidatedNel[String, Config] = try { config.getConfig(key).validNel } catch { - case e: ConfigException.Missing => s"Could not find key: $key".invalidNel - case e: ConfigException.WrongType => s"key $key cannot be parsed to a Config".invalidNel + case _: ConfigException.Missing => s"Could not find key: $key".invalidNel + case _: ConfigException.WrongType => s"key $key cannot be parsed to a Config".invalidNel } } diff --git a/core/src/main/scala/cromwell/core/CromwellUserGuardianStrategy.scala b/core/src/main/scala/cromwell/core/CromwellUserGuardianStrategy.scala index 52af92e5c..79e1621ca 100644 --- a/core/src/main/scala/cromwell/core/CromwellUserGuardianStrategy.scala +++ b/core/src/main/scala/cromwell/core/CromwellUserGuardianStrategy.scala @@ -5,7 +5,7 @@ import akka.actor.{ActorInitializationException, OneForOneStrategy, SupervisorSt class CromwellUserGuardianStrategy extends SupervisorStrategyConfigurator { override def create(): SupervisorStrategy = OneForOneStrategy() { - case aie: ActorInitializationException => Escalate + case _: ActorInitializationException => Escalate case t => akka.actor.SupervisorStrategy.defaultDecider.applyOrElse(t, (_: Any) => Escalate) } } diff --git a/core/src/main/scala/cromwell/core/ExecutionStatus.scala b/core/src/main/scala/cromwell/core/ExecutionStatus.scala index 25319404c..b4ee84f9e 100644 --- a/core/src/main/scala/cromwell/core/ExecutionStatus.scala +++ b/core/src/main/scala/cromwell/core/ExecutionStatus.scala @@ -3,7 +3,8 @@ package cromwell.core object ExecutionStatus extends Enumeration { type ExecutionStatus = Value val NotStarted, QueuedInCromwell, Starting, Running, Failed, RetryableFailure, Done, Bypassed, Aborted = Value - val TerminalStatuses = Set(Failed, Done, Aborted, RetryableFailure, Bypassed) + val TerminalStatuses = Set(Failed, Done, Aborted, Bypassed) + val TerminalOrRetryableStatuses = TerminalStatuses + RetryableFailure implicit val ExecutionStatusOrdering = Ordering.by { status: ExecutionStatus => status match { @@ -20,9 +21,9 @@ object ExecutionStatus extends Enumeration { } implicit class EnhancedExecutionStatus(val status: ExecutionStatus) extends AnyVal { - def isTerminal: Boolean = { - TerminalStatuses contains status - } + def isTerminal: Boolean = TerminalStatuses contains status + + def isTerminalOrRetryable: Boolean = TerminalOrRetryableStatuses contains status def isDoneOrBypassed: Boolean = status == Done || status == Bypassed } diff --git a/core/src/main/scala/cromwell/core/JobKey.scala b/core/src/main/scala/cromwell/core/JobKey.scala index 9fd22b31e..f230134ce 100644 --- a/core/src/main/scala/cromwell/core/JobKey.scala +++ b/core/src/main/scala/cromwell/core/JobKey.scala @@ -1,9 +1,9 @@ package cromwell.core -import wdl4s.{GraphNode, Scope} +import wdl4s.wdl.{Scope, WdlGraphNode} trait JobKey { - def scope: Scope with GraphNode + def scope: Scope with WdlGraphNode def index: Option[Int] def attempt: Int def tag: String diff --git a/core/src/main/scala/cromwell/core/MonitoringCompanionActor.scala b/core/src/main/scala/cromwell/core/MonitoringCompanionActor.scala new file mode 100644 index 000000000..53f0e4de6 --- /dev/null +++ b/core/src/main/scala/cromwell/core/MonitoringCompanionActor.scala @@ -0,0 +1,46 @@ +package cromwell.core + +import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import cromwell.core.MonitoringCompanionActor._ +import cromwell.util.GracefulShutdownHelper.ShutdownCommand + +import scala.concurrent.duration._ +import scala.language.postfixOps + +object MonitoringCompanionActor { + sealed trait MonitoringCompanionCommand + private [core] case object AddWork extends MonitoringCompanionCommand + private [core] case object RemoveWork extends MonitoringCompanionCommand + private [core] def props(actorToMonitor: ActorRef) = Props(new MonitoringCompanionActor(actorToMonitor)) +} + +private [core] class MonitoringCompanionActor(actorToMonitor: ActorRef) extends Actor with ActorLogging { + private var workCount: Int = 0 + + override def receive = { + case AddWork => workCount += 1 + case RemoveWork => workCount -= 1 + case ShutdownCommand if workCount <= 0 => + context stop actorToMonitor + context stop self + case ShutdownCommand => + log.info(s"{} is still processing {} messages", actorToMonitor.path.name, workCount) + context.system.scheduler.scheduleOnce(1 second, self, ShutdownCommand)(context.dispatcher) + () + } +} + +trait MonitoringCompanionHelper { this: Actor => + private val monitoringActor = context.actorOf(MonitoringCompanionActor.props(self)) + private var shuttingDown: Boolean = false + + def addWork() = monitoringActor ! AddWork + def removeWork() = monitoringActor ! RemoveWork + + val monitoringReceive: Receive = { + case ShutdownCommand if !shuttingDown => + shuttingDown = true + monitoringActor ! ShutdownCommand + case ShutdownCommand => // Ignore if we're already shutting down + } +} diff --git a/core/src/main/scala/cromwell/core/WorkflowOptions.scala b/core/src/main/scala/cromwell/core/WorkflowOptions.scala index 709e667e1..701ea5428 100644 --- a/core/src/main/scala/cromwell/core/WorkflowOptions.scala +++ b/core/src/main/scala/cromwell/core/WorkflowOptions.scala @@ -152,7 +152,7 @@ case class WorkflowOptions(jsObject: JsObject) { } lazy val defaultRuntimeOptions = jsObject.fields.get(defaultRuntimeOptionKey) match { - case Some(jsObj: JsObject) => TryUtil.sequenceMap(jsObj.fields map { case (k, v) => k -> WorkflowOptions.getAsJson(k, jsObj) }) + case Some(jsObj: JsObject) => TryUtil.sequenceMap(jsObj.fields map { case (k, _) => k -> WorkflowOptions.getAsJson(k, jsObj) }) case Some(jsVal) => Failure(new IllegalArgumentException(s"Unsupported JsValue for $defaultRuntimeOptionKey: $jsVal. Expected a JSON object.")) case None => Failure(OptionNotFoundException(s"Cannot find definition for default runtime attributes")) } diff --git a/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala b/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala index 5025690af..281cdf653 100644 --- a/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala +++ b/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala @@ -1,16 +1,16 @@ package cromwell.core -import wdl4s.{WdlJson, WdlSource} +import wdl4s.wdl.{WorkflowJson, WorkflowSource} /** * Represents the collection of source files that a user submits to run a workflow */ sealed trait WorkflowSourceFilesCollection { - def workflowSource: WdlSource - def inputsJson: WdlJson + def workflowSource: WorkflowSource + def inputsJson: WorkflowJson def workflowOptionsJson: WorkflowOptionsJson - def labelsJson: WdlJson + def labelsJson: WorkflowJson def workflowType: Option[WorkflowType] def workflowTypeVersion: Option[WorkflowTypeVersion] @@ -26,12 +26,12 @@ sealed trait WorkflowSourceFilesCollection { } object WorkflowSourceFilesCollection { - def apply(workflowSource: WdlSource, + def apply(workflowSource: WorkflowSource, workflowType: Option[WorkflowType], workflowTypeVersion: Option[WorkflowTypeVersion], - inputsJson: WdlJson, + inputsJson: WorkflowJson, workflowOptionsJson: WorkflowOptionsJson, - labelsJson: WdlJson, + labelsJson: WorkflowJson, importsFile: Option[Array[Byte]]): WorkflowSourceFilesCollection = importsFile match { case Some(imports) => WorkflowSourceFilesWithDependenciesZip(workflowSource, workflowType, workflowTypeVersion, inputsJson, workflowOptionsJson, labelsJson, imports) @@ -40,19 +40,19 @@ object WorkflowSourceFilesCollection { } } -final case class WorkflowSourceFilesWithoutImports(workflowSource: WdlSource, +final case class WorkflowSourceFilesWithoutImports(workflowSource: WorkflowSource, workflowType: Option[WorkflowType], workflowTypeVersion: Option[WorkflowTypeVersion], - inputsJson: WdlJson, + inputsJson: WorkflowJson, workflowOptionsJson: WorkflowOptionsJson, - labelsJson: WdlJson) extends WorkflowSourceFilesCollection + labelsJson: WorkflowJson) extends WorkflowSourceFilesCollection -final case class WorkflowSourceFilesWithDependenciesZip(workflowSource: WdlSource, +final case class WorkflowSourceFilesWithDependenciesZip(workflowSource: WorkflowSource, workflowType: Option[WorkflowType], workflowTypeVersion: Option[WorkflowTypeVersion], - inputsJson: WdlJson, + inputsJson: WorkflowJson, workflowOptionsJson: WorkflowOptionsJson, - labelsJson: WdlJson, + labelsJson: WorkflowJson, importsZip: Array[Byte]) extends WorkflowSourceFilesCollection { override def toString = s"WorkflowSourceFilesWithDependenciesZip($workflowSource, $inputsJson, $workflowOptionsJson, $labelsJson, <>)" } diff --git a/core/src/main/scala/cromwell/core/actor/BatchingDbWriter.scala b/core/src/main/scala/cromwell/core/actor/BatchingDbWriter.scala index 75be80654..db5be437b 100644 --- a/core/src/main/scala/cromwell/core/actor/BatchingDbWriter.scala +++ b/core/src/main/scala/cromwell/core/actor/BatchingDbWriter.scala @@ -1,7 +1,9 @@ package cromwell.core.actor -import akka.actor.ActorRef +import akka.actor.{ActorRef, Cancellable, FSM} import cats.data.NonEmptyVector +import cromwell.core.actor.BatchingDbWriter._ +import cromwell.util.GracefulShutdownHelper.ShutdownCommand import org.slf4j.LoggerFactory import scala.util.{Failure, Success, Try} @@ -52,3 +54,53 @@ object BatchingDbWriter { case class CommandAndReplyTo[C](command: C, replyTo: ActorRef) } + +/** + * Trait that contains some common batch-related and graceful shutdown logic. + * Be careful NOT to add a custom whenUnhandled state function when mixing in this trait as it will override the + * graceful shutdown handling logic. + * + * Note that there is more common logic that could be abstracted here. + */ +trait BatchingDbWriterActor { this: FSM[BatchingDbWriterState, BatchingDbWriterData] => + import scala.concurrent.duration._ + + private var shuttingDown: Boolean = false + + def isShuttingDown: Boolean = shuttingDown + def dbFlushRate: FiniteDuration + var periodicFlush: Option[Cancellable] = None + + override def preStart(): Unit = { + periodicFlush = Option(context.system.scheduler.schedule(0.seconds, dbFlushRate, self, ScheduledFlushToDb)(context.dispatcher)) + } + + /** + * WhenUnhandled state function that handles reception of ShutdownCommand and acts appropriately + */ + private val whenUnhandledFunction: StateFunction = { + case Event(ShutdownCommand, NoData) if stateName == WaitingToWrite => + periodicFlush foreach { _.cancel() } + context stop self + stay() + case Event(ShutdownCommand, _) if stateName == WaitingToWrite => + logger.info("{} flushing database writes...", self.path.name) + shuttingDown = true + // transitioning to WritingToDb triggers a FlushBatchToDb to be sent to self + goto(WritingToDb) + case Event(ShutdownCommand, _) if stateName == WritingToDb => + logger.info("{} waiting for database writes to be flushed...", self.path.name) + shuttingDown = true + stay() + } + + whenUnhandled(whenUnhandledFunction) + + onTransition { + case WaitingToWrite -> WritingToDb => self ! FlushBatchToDb + // When transitioning back to WaitingToWrite, if there's no data left to process, and we're trying to shutdown, then stop + case _ -> WaitingToWrite if shuttingDown && nextStateData == NoData => + periodicFlush foreach { _.cancel() } + context stop self + } +} diff --git a/core/src/main/scala/cromwell/core/actor/StreamActorHelper.scala b/core/src/main/scala/cromwell/core/actor/StreamActorHelper.scala index 1e87ea59b..cb4dd6e67 100644 --- a/core/src/main/scala/cromwell/core/actor/StreamActorHelper.scala +++ b/core/src/main/scala/cromwell/core/actor/StreamActorHelper.scala @@ -7,6 +7,7 @@ import akka.stream.QueueOfferResult.{Dropped, Enqueued, QueueClosed} import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete} import cromwell.core.actor.StreamActorHelper.{ActorRestartException, StreamCompleted, StreamFailed} import cromwell.core.actor.StreamIntegration._ +import cromwell.util.GracefulShutdownHelper.ShutdownCommand import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -46,10 +47,8 @@ trait StreamActorHelper[T <: StreamContext] { this: Actor with ActorLogging => override def preStart(): Unit = { stream.watchCompletion() onComplete { - case Success(_) => - self ! StreamCompleted - case Failure(failure) => - self ! StreamFailed(failure) + case Success(_) => self ! StreamCompleted + case Failure(failure) => self ! StreamFailed(failure) } } @@ -71,17 +70,17 @@ trait StreamActorHelper[T <: StreamContext] { this: Actor with ActorLogging => } private def streamReceive: Receive = { - case EnqueueResponse(Enqueued, commandContext: T @unchecked) => // Good ! + case ShutdownCommand => stream.complete() + case EnqueueResponse(Enqueued, _: T @unchecked) => // Good ! case EnqueueResponse(Dropped, commandContext) => backpressure(commandContext) // In any of the cases below, the stream is in a failed state, which will be caught by the watchCompletion hook and the // actor will be restarted case EnqueueResponse(QueueClosed, commandContext) => backpressure(commandContext) - case EnqueueResponse(QueueOfferResult.Failure(failure), commandContext) => backpressure(commandContext) - case FailedToEnqueue(throwable, commandContext) => backpressure(commandContext) + case EnqueueResponse(QueueOfferResult.Failure(_), commandContext) => backpressure(commandContext) + case FailedToEnqueue(_, commandContext) => backpressure(commandContext) - // Those 2 cases should never happen, as long as the strategy is Resume, but in case it does... - case StreamCompleted => restart(new IllegalStateException("Stream was completed unexpectedly")) + case StreamCompleted => context stop self case StreamFailed(failure) => restart(failure) } diff --git a/core/src/main/scala/cromwell/core/core.scala b/core/src/main/scala/cromwell/core/core.scala index 8cfe7c2f1..7a5831bbe 100644 --- a/core/src/main/scala/cromwell/core/core.scala +++ b/core/src/main/scala/cromwell/core/core.scala @@ -2,7 +2,7 @@ package cromwell.core import cromwell.core.path.Path import lenthall.exception.ThrowableAggregation -import wdl4s.values.WdlValue +import wdl4s.wdl.values.WdlValue case class CallContext(root: Path, stdout: String, stderr: String) diff --git a/core/src/main/scala/cromwell/core/package.scala b/core/src/main/scala/cromwell/core/package.scala index 625fab0d5..e94dbc809 100644 --- a/core/src/main/scala/cromwell/core/package.scala +++ b/core/src/main/scala/cromwell/core/package.scala @@ -3,7 +3,7 @@ package cromwell import cats.data.Validated._ import cats.syntax.validated._ import lenthall.validation.ErrorOr.ErrorOr -import wdl4s.values.WdlValue +import wdl4s.wdl.values.WdlValue import scala.util.{Failure, Success, Try} diff --git a/core/src/main/scala/cromwell/core/path/BetterFileMethods.scala b/core/src/main/scala/cromwell/core/path/BetterFileMethods.scala index 5346ec70c..94c780a84 100644 --- a/core/src/main/scala/cromwell/core/path/BetterFileMethods.scala +++ b/core/src/main/scala/cromwell/core/path/BetterFileMethods.scala @@ -214,11 +214,11 @@ trait BetterFileMethods { betterFile.bufferedReader(codec) final def newBufferedWriter(implicit codec: Codec, openOptions: OpenOptions = OpenOptions.default): BufferedWriter = - betterFile.newBufferedWriter(codec) + betterFile.newBufferedWriter(codec, openOptions) final def bufferedWriter(implicit codec: Codec, openOptions: OpenOptions = OpenOptions.default): ManagedResource[BufferedWriter] = - betterFile.bufferedWriter(codec) + betterFile.bufferedWriter(codec, openOptions) final def newFileReader: FileReader = betterFile.newFileReader diff --git a/core/src/main/scala/cromwell/core/simpleton/WdlValueBuilder.scala b/core/src/main/scala/cromwell/core/simpleton/WdlValueBuilder.scala index 774c1b5ce..238553f2a 100644 --- a/core/src/main/scala/cromwell/core/simpleton/WdlValueBuilder.scala +++ b/core/src/main/scala/cromwell/core/simpleton/WdlValueBuilder.scala @@ -1,12 +1,12 @@ package cromwell.core.simpleton -import wdl4s.TaskOutput -import wdl4s.types._ -import wdl4s.values.{WdlArray, WdlMap, WdlOptionalValue, WdlPair, WdlValue} +import cromwell.core.simpleton.WdlValueSimpleton._ +import cromwell.core.{CallOutputs, JobOutput} +import wdl4s.wdl.TaskOutput +import wdl4s.wdl.types._ +import wdl4s.wdl.values.{WdlArray, WdlMap, WdlOptionalValue, WdlPair, WdlValue} import scala.language.postfixOps -import cromwell.core.{CallOutputs, JobOutput} -import cromwell.core.simpleton.WdlValueSimpleton._ /** @@ -88,7 +88,7 @@ object WdlValueBuilder { // Group tuples by key using a Map with key type `K`. def group[K](tuples: Traversable[(K, SimpletonComponent)]): Map[K, Traversable[SimpletonComponent]] = { - tuples groupBy { case (i, _) => i } mapValues { _ map { case (i, s) => s} } + tuples groupBy { case (i, _) => i } mapValues { _ map { case (_, s) => s} } } outputType match { diff --git a/core/src/main/scala/cromwell/core/simpleton/WdlValueSimpleton.scala b/core/src/main/scala/cromwell/core/simpleton/WdlValueSimpleton.scala index 1f5e04375..aa5397d55 100644 --- a/core/src/main/scala/cromwell/core/simpleton/WdlValueSimpleton.scala +++ b/core/src/main/scala/cromwell/core/simpleton/WdlValueSimpleton.scala @@ -1,6 +1,6 @@ package cromwell.core.simpleton -import wdl4s.values._ +import wdl4s.wdl.values._ case class WdlValueSimpleton(simpletonKey: String, simpletonValue: WdlPrimitive) diff --git a/core/src/main/scala/cromwell/util/GracefulShutdownHelper.scala b/core/src/main/scala/cromwell/util/GracefulShutdownHelper.scala new file mode 100644 index 000000000..5ed66fb5e --- /dev/null +++ b/core/src/main/scala/cromwell/util/GracefulShutdownHelper.scala @@ -0,0 +1,34 @@ +package cromwell.util + +import akka.actor.{Actor, ActorLogging, ActorRef, Terminated} +import akka.pattern.GracefulStopSupport +import cats.data.NonEmptyList +import cromwell.util.GracefulShutdownHelper.ShutdownCommand + +object GracefulShutdownHelper { + case object ShutdownCommand +} + +trait GracefulShutdownHelper extends GracefulStopSupport { this: Actor with ActorLogging => + private var shuttingDown: Boolean = false + private var shutdownList: Set[ActorRef] = Set.empty + + def isShuttingDown: Boolean = shuttingDown + + def waitForActorsAndShutdown(actorsLists: NonEmptyList[ActorRef]): Unit = { + if (shuttingDown) { + log.error("Programmer error, this actor has already initiated its shutdown. Only call this once per actor !") + } else { + shuttingDown = true + shutdownList = actorsLists.toList.toSet + shutdownList foreach context.watch + shutdownList foreach { _ ! ShutdownCommand } + + context become { + case Terminated(actor) if shuttingDown && shutdownList.contains(actor) => + shutdownList = shutdownList - actor + if (shutdownList.isEmpty) context stop self + } + } + } +} diff --git a/core/src/main/scala/cromwell/util/JsonFormatting/WdlValueJsonFormatter.scala b/core/src/main/scala/cromwell/util/JsonFormatting/WdlValueJsonFormatter.scala index dc7f55fe5..53bdd4293 100644 --- a/core/src/main/scala/cromwell/util/JsonFormatting/WdlValueJsonFormatter.scala +++ b/core/src/main/scala/cromwell/util/JsonFormatting/WdlValueJsonFormatter.scala @@ -1,9 +1,10 @@ package cromwell.util.JsonFormatting import spray.json._ -import wdl4s.WdlExpression -import wdl4s.types.{WdlArrayType, WdlMapType, WdlStringType} -import wdl4s.values._ +import wdl4s.wdl.WdlExpression +import wdl4s.wdl.types._ +import wdl4s.wdl.values._ +import wdl4s.wdl.values.{WdlBoolean, WdlFloat, WdlInteger, WdlString, WdlValue} object WdlValueJsonFormatter extends DefaultJsonProtocol { implicit object WdlValueJsonFormat extends RootJsonFormat[WdlValue] { diff --git a/core/src/main/scala/cromwell/util/PromiseActor.scala b/core/src/main/scala/cromwell/util/PromiseActor.scala index 58aea267a..bd5efa5b0 100644 --- a/core/src/main/scala/cromwell/util/PromiseActor.scala +++ b/core/src/main/scala/cromwell/util/PromiseActor.scala @@ -1,7 +1,7 @@ package cromwell.util import akka.actor._ - +import cromwell.core.Dispatcher.EngineDispatcher import scala.concurrent.{Future, Promise} private class PromiseActor(promise: Promise[Any], sendTo: ActorRef, msg: Any) extends Actor with ActorLogging { @@ -42,7 +42,7 @@ object PromiseActor { promise.future } - def props(promise: Promise[Any], sendTo: ActorRef, msg: Any): Props = Props(new PromiseActor(promise, sendTo, msg)) + def props(promise: Promise[Any], sendTo: ActorRef, msg: Any): Props = Props(new PromiseActor(promise, sendTo, msg)).withDispatcher(EngineDispatcher) implicit class EnhancedActorRef(val actorRef: ActorRef) extends AnyVal { def askNoTimeout(message: Any)(implicit actorRefFactory: ActorRefFactory): Future[Any] = { diff --git a/core/src/test/scala/cromwell/core/retry/RetrySpec.scala b/core/src/test/scala/cromwell/core/retry/RetrySpec.scala index 27f24076c..f62b49d47 100644 --- a/core/src/test/scala/cromwell/core/retry/RetrySpec.scala +++ b/core/src/test/scala/cromwell/core/retry/RetrySpec.scala @@ -34,7 +34,7 @@ class RetrySpec extends TestKitSuite("retry-spec") with FlatSpecLike with Matche isFatal: Throwable => Boolean = Retry.throwableToFalse): Future[Int] = { withRetry( - f = work.doIt, + f = () => work.doIt(), maxRetries = Option(retries), isTransient = isTransient, isFatal = isFatal diff --git a/core/src/test/scala/cromwell/core/simpleton/WdlValueBuilderSpec.scala b/core/src/test/scala/cromwell/core/simpleton/WdlValueBuilderSpec.scala index 1e558ceb1..514051e22 100644 --- a/core/src/test/scala/cromwell/core/simpleton/WdlValueBuilderSpec.scala +++ b/core/src/test/scala/cromwell/core/simpleton/WdlValueBuilderSpec.scala @@ -4,9 +4,9 @@ import cromwell.core.simpleton.WdlValueBuilderSpec._ import org.scalatest.{FlatSpec, Matchers} import org.specs2.mock.Mockito import wdl4s.parser.WdlParser.Ast -import wdl4s.types.{WdlArrayType, WdlIntegerType, WdlMapType, WdlStringType} -import wdl4s.values.{WdlArray, WdlInteger, WdlMap, WdlPair, WdlString, WdlValue} -import wdl4s.{TaskOutput, WdlExpression} +import wdl4s.wdl.types.{WdlArrayType, WdlIntegerType, WdlMapType, WdlStringType} +import wdl4s.wdl.values.{WdlArray, WdlInteger, WdlMap, WdlPair, WdlString, WdlValue} +import wdl4s.wdl.{TaskOutput, WdlExpression} object WdlValueBuilderSpec { // WdlValueBuilder doesn't care about this expression, but something needs to be passed to the TaskOutput constructor. @@ -115,9 +115,9 @@ class WdlValueBuilderSpec extends FlatSpec with Matchers with Mockito { it should "round trip everything together with no losses" in { - val wdlValues = (simpletonConversions map { case SimpletonConversion(name, wdlValue, simpletons) => name -> wdlValue }).toMap + val wdlValues = (simpletonConversions map { case SimpletonConversion(name, wdlValue, _) => name -> wdlValue }).toMap val taskOutputs = wdlValues map { case (k, wv) => TaskOutput(k, wv.wdlType, IgnoredExpression, mock[Ast], None) } - val allSimpletons = simpletonConversions flatMap { case SimpletonConversion(name, wdlValue, simpletons) => simpletons } + val allSimpletons = simpletonConversions flatMap { case SimpletonConversion(_, _, simpletons) => simpletons } import WdlValueSimpleton._ diff --git a/core/src/test/scala/cromwell/util/AkkaTestUtil.scala b/core/src/test/scala/cromwell/util/AkkaTestUtil.scala index 10b05dc2b..1633a7d1d 100644 --- a/core/src/test/scala/cromwell/util/AkkaTestUtil.scala +++ b/core/src/test/scala/cromwell/util/AkkaTestUtil.scala @@ -8,7 +8,7 @@ object AkkaTestUtil { implicit class EnhancedTestProbe(probe: TestProbe) { def props = Props(new Actor with ActorLogging { def receive = { - case outbound if sender == probe.ref => + case outbound @ _ if sender == probe.ref => val msg = "Unexpected outbound message from Probe. You're doing something wrong!" log.error(msg) throw new RuntimeException(msg) diff --git a/core/src/test/scala/cromwell/util/GracefulShutdownHelperSpec.scala b/core/src/test/scala/cromwell/util/GracefulShutdownHelperSpec.scala new file mode 100644 index 000000000..4d93073dc --- /dev/null +++ b/core/src/test/scala/cromwell/util/GracefulShutdownHelperSpec.scala @@ -0,0 +1,42 @@ +package cromwell.util + +import akka.actor.{Actor, ActorLogging, Props} +import akka.testkit.TestProbe +import cats.data.NonEmptyList +import cromwell.core.TestKitSuite +import cromwell.util.GracefulShutdownHelper.ShutdownCommand +import org.scalatest.{FlatSpecLike, Matchers} + +class GracefulShutdownHelperSpec extends TestKitSuite with FlatSpecLike with Matchers { + behavior of "GracefulShutdownHelper" + + it should "send ShutdownCommand to actors, wait for them to shutdown, then shut itself down" in { + val testProbeA = TestProbe() + val testProbeB = TestProbe() + + val testActor = system.actorOf(Props(new Actor with GracefulShutdownHelper with ActorLogging { + override def receive: Receive = { + case ShutdownCommand => waitForActorsAndShutdown(NonEmptyList.of(testProbeA.ref, testProbeB.ref)) + } + })) + + watch(testActor) + + testActor ! ShutdownCommand + + testProbeA.expectMsg(ShutdownCommand) + testProbeB.expectMsg(ShutdownCommand) + + // Make sure it's still alive + expectNoMsg() + + system stop testProbeA.ref + + // Make sure it's still alive + expectNoMsg() + + system stop testProbeB.ref + + expectTerminated(testActor) + } +} diff --git a/core/src/test/scala/cromwell/util/SampleWdl.scala b/core/src/test/scala/cromwell/util/SampleWdl.scala index 06bc98283..b7d07fee9 100644 --- a/core/src/test/scala/cromwell/util/SampleWdl.scala +++ b/core/src/test/scala/cromwell/util/SampleWdl.scala @@ -4,16 +4,15 @@ import java.util.UUID import cromwell.core.WorkflowSourceFilesWithoutImports import cromwell.core.path.{DefaultPathBuilder, Path} -import cromwell.core.WorkflowSourceFilesWithoutImports import spray.json._ -import wdl4s._ -import wdl4s.types.{WdlArrayType, WdlStringType} -import wdl4s.values._ +import wdl4s.wdl.types.{WdlArrayType, WdlStringType} +import wdl4s.wdl.values._ +import wdl4s.wdl.{WorkflowJson, WorkflowRawInputs, WorkflowSource} import scala.language.postfixOps trait SampleWdl extends TestFileUtil { - def workflowSource(runtime: String = ""): WdlSource + def workflowSource(runtime: String = ""): WorkflowSource def asWorkflowSources(runtime: String = "", workflowOptions: String = "{}", labels: String = "{}", @@ -21,7 +20,7 @@ trait SampleWdl extends TestFileUtil { workflowTypeVersion: Option[String] = None) = { WorkflowSourceFilesWithoutImports( workflowSource = workflowSource(runtime), - inputsJson = wdlJson, + inputsJson = workflowJson, workflowOptionsJson = workflowOptions, labelsJson = labels, workflowType = workflowType, @@ -66,7 +65,7 @@ trait SampleWdl extends TestFileUtil { def read(value: JsValue) = throw new NotImplementedError(s"Reading JSON not implemented: $value") } - def wdlJson: WdlJson = rawInputs.toJson.prettyPrint + def workflowJson: WorkflowJson = rawInputs.toJson.prettyPrint def deleteFile(path: Path) = path.delete() } @@ -399,7 +398,7 @@ object SampleWdl { object DeclarationsWorkflow extends SampleWdl { - override def workflowSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WorkflowSource = s""" |task cat { | File file @@ -453,7 +452,7 @@ object SampleWdl { } trait ZeroOrMorePostfixQuantifier extends SampleWdl { - override def workflowSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WorkflowSource = s""" |task hello { | Array[String] person @@ -484,7 +483,7 @@ object SampleWdl { } trait OneOrMorePostfixQuantifier extends SampleWdl { - override def workflowSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WorkflowSource = s""" |task hello { | Array[String]+ person @@ -853,7 +852,7 @@ object SampleWdl { } object FilePassingWorkflow extends SampleWdl { - override def workflowSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WorkflowSource = s"""task a { | File in | String out_name = "out" @@ -893,7 +892,7 @@ object SampleWdl { * different */ case class CallCachingWorkflow(salt: String) extends SampleWdl { - override def workflowSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WorkflowSource = s"""task a { | File in | String out_name = "out" @@ -945,7 +944,7 @@ object SampleWdl { |k3\tv3 """.stripMargin.trim - override def workflowSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WorkflowSource = s""" |task a { | Array[String] array @@ -1021,7 +1020,7 @@ object SampleWdl { } object CallCachingHashingWdl extends SampleWdl { - override def workflowSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WorkflowSource = s"""task t { | Int a | Float b diff --git a/core/src/test/scala/cromwell/util/TestFileUtil.scala b/core/src/test/scala/cromwell/util/TestFileUtil.scala index a6bedd490..6f0d08a1f 100644 --- a/core/src/test/scala/cromwell/util/TestFileUtil.scala +++ b/core/src/test/scala/cromwell/util/TestFileUtil.scala @@ -3,7 +3,7 @@ package cromwell.util import java.nio.file.attribute.PosixFilePermission import cromwell.core.path.{DefaultPathBuilder, Path} -import wdl4s.values._ +import wdl4s.wdl.values._ trait TestFileUtil { def createCannedFile(prefix: String, contents: String, dir: Option[Path] = None): Path = { diff --git a/core/src/test/scala/cromwell/util/WdlValueJsonFormatterSpec.scala b/core/src/test/scala/cromwell/util/WdlValueJsonFormatterSpec.scala index 91d678c01..7924f30ab 100644 --- a/core/src/test/scala/cromwell/util/WdlValueJsonFormatterSpec.scala +++ b/core/src/test/scala/cromwell/util/WdlValueJsonFormatterSpec.scala @@ -1,14 +1,10 @@ package cromwell.util -import scala.Vector - -import org.scalatest.FlatSpec -import org.scalatest.Matchers - -import JsonFormatting.WdlValueJsonFormatter.WdlValueJsonFormat -import spray.json.{ JsObject, pimpString } -import wdl4s.types.{ WdlArrayType, WdlStringType } -import wdl4s.values.{ WdlArray, WdlPair, WdlString } +import cromwell.util.JsonFormatting.WdlValueJsonFormatter.WdlValueJsonFormat +import org.scalatest.{FlatSpec, Matchers} +import spray.json.{JsObject, pimpString} +import wdl4s.wdl.types.{WdlArrayType, WdlStringType} +import wdl4s.wdl.values.{WdlArray, WdlPair, WdlString} class WdlValueJsonFormatterSpec extends FlatSpec with Matchers { diff --git a/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala b/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala index b8bbb4dd8..71c0d813f 100644 --- a/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala +++ b/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala @@ -21,7 +21,7 @@ import scala.util.{Failure, Success, Try} class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit actorSystem: ActorSystem, materializer: ActorMaterializer) { - lazy val engineEndpoint = s"$cromwellUrl/api/engine/$apiVersion" + lazy val engineEndpoint = s"$cromwellUrl/engine/$apiVersion" lazy val submitEndpoint = s"$cromwellUrl/api/workflows/$apiVersion" // Everything else is a suffix off the submit endpoint: lazy val batchSubmitEndpoint = s"$submitEndpoint/batch" @@ -29,12 +29,21 @@ class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit acto def abortEndpoint(workflowId: WorkflowId): String = workflowSpecificEndpoint(workflowId, "abort") def statusEndpoint(workflowId: WorkflowId): String = workflowSpecificEndpoint(workflowId, "status") def metadataEndpoint(workflowId: WorkflowId): String = workflowSpecificEndpoint(workflowId, "metadata") + def outputsEndpoint(workflowId: WorkflowId): String = workflowSpecificEndpoint(workflowId, "outputs") + def logsEndpoint(workflowId: WorkflowId): String = workflowSpecificEndpoint(workflowId, "logs") + def diffEndpoint(workflowA: WorkflowId, callA: String, indexA: ShardIndex, workflowB: WorkflowId, callB: String, indexB: ShardIndex): String = { + def shardParam(aOrB: String, s: ShardIndex) = s.index.map(i => s"&index$aOrB=$i.toString").getOrElse("") + s"$submitEndpoint/callcaching/diff?workflowA=$workflowA&callA=$callA&workflowB=$workflowB&callB=$callB${shardParam("A", indexA)}${shardParam("B", indexB)}" + } lazy val backendsEndpoint = s"$submitEndpoint/backends" lazy val versionEndpoint = s"$engineEndpoint/version" import model.CromwellStatusJsonSupport._ + import model.WorkflowOutputsJsonSupport._ + import model.WorkflowLogsJsonSupport._ import model.CromwellBackendsJsonSupport._ import model.CromwellVersionJsonSupport._ + import model.CallCacheDiffJsonSupport._ private def requestEntityForSubmit(workflowSubmission: WorkflowSubmission) = { import cromwell.api.model.LabelsJsonFormatter._ @@ -87,39 +96,38 @@ class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit acto } } - def abort(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowStatus] = getRequest[CromwellStatus](abortEndpoint(workflowId)) map WorkflowStatus.apply - def status(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowStatus] = getRequest[CromwellStatus](statusEndpoint(workflowId)) map WorkflowStatus.apply - def metadata(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowMetadata] = getRequest[String](metadataEndpoint(workflowId)) map WorkflowMetadata - def backends(implicit ec: ExecutionContext): Future[CromwellBackends] = getRequest[CromwellBackends](backendsEndpoint) - def version(implicit ec: ExecutionContext): Future[CromwellVersion] = getRequest[CromwellVersion](versionEndpoint) + def abort(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowStatus] = simpleRequest[CromwellStatus](uri = abortEndpoint(workflowId), method = HttpMethods.POST) map WorkflowStatus.apply + def status(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowStatus] = simpleRequest[CromwellStatus](statusEndpoint(workflowId)) map WorkflowStatus.apply + def metadata(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowMetadata] = simpleRequest[String](metadataEndpoint(workflowId)) map WorkflowMetadata + def outputs(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowOutputs] = simpleRequest[WorkflowOutputs](outputsEndpoint(workflowId)) + def logs(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowLogs] = simpleRequest[WorkflowLogsStruct](outputsEndpoint(workflowId)) map WorkflowLogs.apply + def callCacheDiff(workflowA: WorkflowId, callA: String, shardIndexA: ShardIndex, workflowB: WorkflowId, callB: String, shardIndexB: ShardIndex)(implicit ec: ExecutionContext): Future[CallCacheDiff] = + simpleRequest[CallCacheDiff](diffEndpoint(workflowA, callA, shardIndexA, workflowB, callB, shardIndexB)) + def backends(implicit ec: ExecutionContext): Future[CromwellBackends] = simpleRequest[CromwellBackends](backendsEndpoint) + def version(implicit ec: ExecutionContext): Future[CromwellVersion] = simpleRequest[CromwellVersion](versionEndpoint) + + private [api] def executeRequest(request: HttpRequest) = Http().singleRequest(request) /** * * @tparam A The type of response expected. Must be supported by an implicit unmarshaller from ResponseEntity. */ private def makeRequest[A](request: HttpRequest)(implicit um: Unmarshaller[ResponseEntity, A], ec: ExecutionContext): Future[A] = for { - response <- Http().singleRequest(request) + response <- executeRequest(request) decoded <- Future.fromTry(decodeResponse(response)) entity <- Future.fromTry(decoded.toEntity) - unmarshalled <- entity.to[A] + unmarshalled <- unmarshall(response, entity)(um, ec) } yield unmarshalled - private def getRequest[A](uri: String)(implicit um: Unmarshaller[ResponseEntity, A], ec: ExecutionContext): Future[A] = makeRequest[A](HttpRequest(uri = uri)) - - private def insertSecrets(options: Option[String], refreshToken: Option[String]): Option[String] = { - import DefaultJsonProtocol._ - val tokenKey = "refresh_token" - - def addToken(optionsMap: Map[String, JsValue]): Map[String, JsValue] = { - refreshToken match { - case Some(token) if optionsMap.get(tokenKey).isDefined => optionsMap + (tokenKey -> JsString(token)) - case _ => optionsMap - } - } + private def unmarshall[A](response: HttpResponse, entity: Unmarshal[ResponseEntity])(implicit um: Unmarshaller[ResponseEntity, A], ec: ExecutionContext): Future[A] = { + import CromwellFailedResponseExceptionJsonSupport._ - options map (o => addToken(o.parseJson.asJsObject.convertTo[Map[String, JsValue]]).toJson.toString) + if (response.status.isSuccess()) entity.to[A] + else entity.to[CromwellFailedResponseException] flatMap Future.failed } + private def simpleRequest[A](uri: String, method: HttpMethod = HttpMethods.GET)(implicit um: Unmarshaller[ResponseEntity, A], ec: ExecutionContext): Future[A] = makeRequest[A](HttpRequest(uri = uri, method = method)) + private val decoders = Map( HttpEncodings.gzip -> Gzip, HttpEncodings.deflate -> Deflate, @@ -128,7 +136,7 @@ class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit acto private def decodeResponse(response: HttpResponse): Try[HttpResponse] = { decoders.get(response.encoding) map { decoder => - Try(decoder.decode(response)) + Try(decoder.decodeMessage(response)) } getOrElse Failure(UnsuccessfulRequestException(s"No decoder for ${response.encoding}", response)) } } @@ -138,6 +146,7 @@ object CromwellClient { def toEntity: Try[Unmarshal[ResponseEntity]] = response match { case HttpResponse(_: StatusCodes.Success, _, entity, _) => Success(Unmarshal(entity)) + case HttpResponse(_: StatusCodes.ServerError, _, entity, _) => Success(Unmarshal(entity)) case other => Failure(UnsuccessfulRequestException("Unmarshalling error", other)) } } @@ -145,4 +154,20 @@ object CromwellClient { final case class UnsuccessfulRequestException(message: String, httpResponse: HttpResponse) extends Exception { override def getMessage: String = message + ": " + httpResponse.toString } + + private[api] def insertSecrets(options: Option[String], refreshToken: Option[String]): Option[String] = { + import DefaultJsonProtocol._ + val tokenKey = "refresh_token" + + val secretOptions = for { + refreshTokenValue <- refreshToken + optionsValue <- options + optionsMap = optionsValue.parseJson.asJsObject.convertTo[Map[String, JsValue]] + if optionsMap.contains(tokenKey) + secretMap = optionsMap.updated(tokenKey, JsString(refreshTokenValue)) + secretValue = secretMap.toJson.toString + } yield secretValue + + secretOptions orElse options + } } diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/CallCacheDiff.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/CallCacheDiff.scala new file mode 100644 index 000000000..fa4e7fb91 --- /dev/null +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/CallCacheDiff.scala @@ -0,0 +1,15 @@ +package cromwell.api.model + +import ShardIndexFormatter._ +import WorkflowIdJsonFormatter._ +import spray.json.DefaultJsonProtocol + +case class CallCacheDiffCallDescription(executionStatus: String, allowResultReuse: Boolean, callFqn: String, jobIndex: ShardIndex, workflowId: WorkflowId) +case class HashDifference(hashKey: String, callA: Option[String], callB: Option[String]) +case class CallCacheDiff(callA: CallCacheDiffCallDescription, callB: CallCacheDiffCallDescription, hashDifferential: List[HashDifference]) + +object CallCacheDiffJsonSupport extends DefaultJsonProtocol { + implicit val CallCacheDiffCallDescriptionFormat = jsonFormat5(CallCacheDiffCallDescription) + implicit val HashDifferenceFormat = jsonFormat3(HashDifference) + implicit val CallCacheDiffFormat = jsonFormat3(CallCacheDiff) +} diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/CromwellFailedResponseException.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/CromwellFailedResponseException.scala new file mode 100644 index 000000000..6f58fe3bd --- /dev/null +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/CromwellFailedResponseException.scala @@ -0,0 +1,9 @@ +package cromwell.api.model + +import spray.json.DefaultJsonProtocol + +object CromwellFailedResponseExceptionJsonSupport extends DefaultJsonProtocol { + implicit val CromwellFailedResponseExceptionFormat = jsonFormat2(CromwellFailedResponseException) +} + +case class CromwellFailedResponseException(status: String, message: String) extends Exception(message) diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/CromwellQueryResult.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/CromwellQueryResult.scala new file mode 100644 index 000000000..7fcb0dcba --- /dev/null +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/CromwellQueryResult.scala @@ -0,0 +1,15 @@ +package cromwell.api.model + +import java.time.OffsetDateTime +import spray.json.DefaultJsonProtocol +import cromwell.api.model.WorkflowIdJsonFormatter._ +import cromwell.api.model.WorkflowStatusJsonFormatter._ + +case class CromwellQueryResults(results: Seq[CromwellQueryResult]) + +case class CromwellQueryResult(name: String, id: WorkflowId, status: WorkflowStatus, end: OffsetDateTime, start: OffsetDateTime) + +object CromwellQueryResultJsonFormatter extends DefaultJsonProtocol { + implicit val CromwellQueryResultJsonFormat = jsonFormat5(CromwellQueryResult) + implicit val CromwellQueryResultsJsonFormat = jsonFormat1(CromwellQueryResults) +} diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala index fb5e97669..fd9d88d21 100644 --- a/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala @@ -1,6 +1,7 @@ package cromwell.api.model import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, RootJsonFormat} +import scala.language.postfixOps object LabelsJsonFormatter extends DefaultJsonProtocol { implicit object LabelJsonFormat extends RootJsonFormat[List[Label]] { diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/OutputResponse.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/OutputResponse.scala deleted file mode 100644 index 657aca668..000000000 --- a/cromwellApiClient/src/main/scala/cromwell/api/model/OutputResponse.scala +++ /dev/null @@ -1,9 +0,0 @@ -package cromwell.api.model - -import spray.json.DefaultJsonProtocol - -object OutputResponseJsonSupport extends DefaultJsonProtocol { - implicit val OutputResponseFormat = jsonFormat2(OutputResponse) -} - -case class OutputResponse(id: String, outputs: Map[String, String]) diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/ShardIndex.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/ShardIndex.scala new file mode 100644 index 000000000..405305b8a --- /dev/null +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/ShardIndex.scala @@ -0,0 +1,18 @@ +package cromwell.api.model + +import spray.json.{DefaultJsonProtocol, JsNumber, JsValue, RootJsonFormat} + +case class ShardIndex(index: Option[Int]) extends AnyVal { + override def toString: String = index.getOrElse(-1).toString +} + +object ShardIndexFormatter extends DefaultJsonProtocol { + implicit object ShardIndexJsonFormat extends RootJsonFormat[ShardIndex] { + def write(si: ShardIndex) = JsNumber(si.index.getOrElse(-1)) + def read(value: JsValue) = value match { + case JsNumber(i) if i.equals(-1) => ShardIndex(None) + case JsNumber(i) if i.isValidInt && i.intValue > 0 => ShardIndex(Option(i.intValue())) + case other => throw new UnsupportedOperationException(s"Cannot deserialize $other into a ShardIndex") + } + } +} diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowId.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowId.scala index 2ad4760bb..f52495136 100644 --- a/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowId.scala +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowId.scala @@ -2,6 +2,8 @@ package cromwell.api.model import java.util.UUID +import spray.json.{DefaultJsonProtocol, JsString, JsValue, RootJsonFormat} + // ********* !!!!!!!!!! ******** // // WARNING! This is the Cromwell API version of WorkflowId. If you aren't changing the API client, you probably @@ -9,7 +11,7 @@ import java.util.UUID // // ********* !!!!!!!!!! ******** -case class WorkflowId(id: UUID) { +final case class WorkflowId(id: UUID) extends AnyVal { override def toString = id.toString def shortString = id.toString.split("-")(0) } @@ -18,3 +20,14 @@ object WorkflowId { def fromString(id: String): WorkflowId = new WorkflowId(UUID.fromString(id)) def randomId() = WorkflowId(UUID.randomUUID()) } + +object WorkflowIdJsonFormatter extends DefaultJsonProtocol { + implicit object WorkflowIdJsonFormat extends RootJsonFormat[WorkflowId] { + def write(id: WorkflowId) = JsString(id.id.toString) + def read(value: JsValue) = value match { + case JsString(s) => WorkflowId.fromString(s) + case other => throw new UnsupportedOperationException(s"Cannot deserialize $other into a ShardIndex") + } + } +} + diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowLogs.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowLogs.scala new file mode 100644 index 000000000..b08d6299a --- /dev/null +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowLogs.scala @@ -0,0 +1,39 @@ +package cromwell.api.model + +import spray.json.DefaultJsonProtocol +import cromwell.api.model.ShardIndexFormatter._ + +private[api] case class CallLogStruct(stdout: String, stderr: String, backendLogs: Map[String, String], shardIndex: ShardIndex, attempt: Int) +private[api] case class WorkflowLogsStruct(calls: Map[String, List[CallLogStruct]], id: String) + + +object WorkflowLogsJsonSupport extends DefaultJsonProtocol { + implicit val CallLogStructFormat = jsonFormat5(CallLogStruct) + implicit val WorkflowLogsStructFormat = jsonFormat2(WorkflowLogsStruct) +} + +/** + * @param logs Mapping from shard index and attempt + */ +case class CallLogs(logs: Map[JobLogsKey, JobLogs]) +case class JobLogsKey(shardIndex: ShardIndex, attempt: Int) +case class JobLogs(stdout: String, stderr: String, backendLogs: Map[String, String]) + +/** + * @param logs Mapping from call name to all logs for that call (including all shards and attempts) + */ +case class WorkflowLogs(logs: Map[String, CallLogs]) + +object WorkflowLogs { + def callStructsToCallLogs(structs: List[CallLogStruct]): CallLogs = { + val callLogs = structs map { struct => + JobLogsKey(struct.shardIndex, struct.attempt) -> JobLogs(struct.stdout, struct.stderr, struct.backendLogs) + } + CallLogs(callLogs.toMap) + } + + def apply(struct: WorkflowLogsStruct): WorkflowLogs = { + val workflowLogs = struct.calls map { case (callName, structs) => callName -> callStructsToCallLogs(structs)} + WorkflowLogs(workflowLogs) + } +} diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowOutputs.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowOutputs.scala new file mode 100644 index 000000000..4eb4cd0c8 --- /dev/null +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowOutputs.scala @@ -0,0 +1,9 @@ +package cromwell.api.model + +import spray.json.DefaultJsonProtocol + +object WorkflowOutputsJsonSupport extends DefaultJsonProtocol { + implicit val OutputResponseFormat = jsonFormat2(WorkflowOutputs) +} + +case class WorkflowOutputs(id: String, outputs: Map[String, String]) diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowStatus.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowStatus.scala index adadea912..6da1282d2 100644 --- a/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowStatus.scala +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowStatus.scala @@ -1,5 +1,7 @@ package cromwell.api.model +import spray.json.{DefaultJsonProtocol, JsString, JsValue, RootJsonFormat} + // ********* !!!!!!!!!! ******** // // WARNING! This is a Cromwell API class. If you aren't changing the API client, you probably @@ -37,3 +39,13 @@ object WorkflowStatus { def apply(workflowStatus: CromwellStatus): WorkflowStatus = apply(workflowStatus.status) } + +object WorkflowStatusJsonFormatter extends DefaultJsonProtocol { + implicit object WorkflowStatusJsonFormat extends RootJsonFormat[WorkflowStatus] { + def write(status: WorkflowStatus) = new JsString(status.toString) + def read(value: JsValue) = value match { + case JsString(string) => WorkflowStatus(string) + case other => throw new UnsupportedOperationException(s"Cannot deserialize $other into a WorkflowStatus") + } + } +} diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/package.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/package.scala new file mode 100644 index 000000000..5dd6bffb1 --- /dev/null +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/package.scala @@ -0,0 +1,20 @@ +package cromwell.api + +import java.time.OffsetDateTime + +import spray.json.{DefaultJsonProtocol, JsString, JsValue, RootJsonFormat} + +package object model { + + implicit val OffsetDateTimeJsonFormat = OffsetDateTimeJsonFormatter.OffsetDateTimeFormat + + object OffsetDateTimeJsonFormatter extends DefaultJsonProtocol { + object OffsetDateTimeFormat extends RootJsonFormat[OffsetDateTime] { + def write(odt: OffsetDateTime) = new JsString(odt.toString) + def read(value: JsValue) = value match { + case JsString(string) => OffsetDateTime.parse(string) + case other => throw new UnsupportedOperationException(s"Cannot deserialize $other into an OffsetDateTime") + } + } + } +} diff --git a/cromwellApiClient/src/test/scala/cromwell/api/CromwellClientSpec.scala b/cromwellApiClient/src/test/scala/cromwell/api/CromwellClientSpec.scala new file mode 100644 index 000000000..5c0b43628 --- /dev/null +++ b/cromwellApiClient/src/test/scala/cromwell/api/CromwellClientSpec.scala @@ -0,0 +1,36 @@ +package cromwell.api + +import org.scalatest.prop.TableDrivenPropertyChecks +import org.scalatest.{FlatSpec, Matchers} +import spray.json.JsonParser.ParsingException + +class CromwellClientSpec extends FlatSpec with Matchers with TableDrivenPropertyChecks { + behavior of "CromwellClient" + + val table = Table( + ("description", "optionsOption", "refreshTokenOption", "expected"), + ("ignore bad json when refresh token not provided", Option("{"), None, Option("{")), + ("not format json when refresh token key not found", Option("{ }"), Option("myToken"), Option("{ }")), + ("replace token when found", Option("""{"refresh_token" : "replace_me"}"""), Option("myToken"), + Option("""{"refresh_token":"myToken"}""")), + ) + + forAll(table) { (description, optionsOption, refreshTokenOption, expected) => + it should description in { + val actual = CromwellClient.insertSecrets(optionsOption, refreshTokenOption) + actual should be(expected) + } + } + + it should "throw an exception when inserting a refresh token into bad json" in { + val optionsOption = Option("{") + val refreshTokenOption = Option("myToken") + val actual = intercept[ParsingException](CromwellClient.insertSecrets(optionsOption, refreshTokenOption)) + actual.summary should be("""Unexpected end-of-input at input index 1 (line 1, position 2), expected '"'""") + actual.detail should be( + """| + |{ + | ^ + |""".stripMargin) + } +} diff --git a/cromwellApiClient/src/test/scala/cromwell/api/CromwellResponseFailedSpec.scala b/cromwellApiClient/src/test/scala/cromwell/api/CromwellResponseFailedSpec.scala new file mode 100644 index 000000000..d8b2dd917 --- /dev/null +++ b/cromwellApiClient/src/test/scala/cromwell/api/CromwellResponseFailedSpec.scala @@ -0,0 +1,41 @@ +package cromwell.api + +import java.net.URL + +import akka.actor.ActorSystem +import akka.http.scaladsl.model._ +import akka.stream.ActorMaterializer +import akka.testkit.TestKit +import cromwell.api.model.CromwellFailedResponseException +import org.scalatest.{AsyncFlatSpecLike, BeforeAndAfterAll, Matchers} + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.language.postfixOps + +class CromwellResponseFailedSpec extends TestKit(ActorSystem()) with AsyncFlatSpecLike with Matchers with BeforeAndAfterAll { + override def afterAll(): Unit = { + Await.ready(system.terminate(), 1 second) + super.afterAll() + } + + implicit val materializer = ActorMaterializer() + + "CromwellAPIClient" should "try to fail the Future with a CromwellFailedResponseException if the HttpResponse is unsuccessful" in { + val client = new CromwellClient(new URL("http://fakeurl"), "v1") { + override def executeRequest(request: HttpRequest): Future[HttpResponse] = Future.successful( + new HttpResponse(StatusCodes.ServiceUnavailable, List.empty[HttpHeader], HttpEntity(ContentTypes.`application/json`, + """{ + | "status": "fail", + | "message": "Cromwell service shutting down" + |} + """.stripMargin), HttpProtocols.`HTTP/1.1`) + ) + } + + recoverToExceptionIf[CromwellFailedResponseException] { client.version(scala.concurrent.ExecutionContext.global) } map { exception => + assert(exception.status == "fail") + assert(exception.message == "Cromwell service shutting down") + } + } +} diff --git a/cromwellApiClient/src/test/scala/cromwell/api/model/CromwellQueryResultJsonFormatterSpec.scala b/cromwellApiClient/src/test/scala/cromwell/api/model/CromwellQueryResultJsonFormatterSpec.scala new file mode 100644 index 000000000..f1983e794 --- /dev/null +++ b/cromwellApiClient/src/test/scala/cromwell/api/model/CromwellQueryResultJsonFormatterSpec.scala @@ -0,0 +1,45 @@ +package cromwell.api.model + +import java.time.OffsetDateTime + +import org.scalatest.{FlatSpec, Matchers} +import spray.json._ +import cromwell.api.model.CromwellQueryResultJsonFormatter._ + +class CromwellQueryResultJsonFormatterSpec extends FlatSpec with Matchers { + + behavior of "CromwellQueryResultJsonFormat" + + val sampleQueryResult = CromwellQueryResults(results = List( + CromwellQueryResult("switcheroo", WorkflowId.fromString("bee51f36-396d-4e22-8a81-33dedff66bf6"), Failed, OffsetDateTime.parse("2017-07-24T14:44:34.010-04:00"), OffsetDateTime.parse("2017-07-24T14:44:33.227-04:00")), + CromwellQueryResult("switcheroo", WorkflowId.fromString("0071495e-39eb-478e-bc98-8614b986c91e"), Succeeded, OffsetDateTime.parse("2017-07-24T15:06:45.940-04:00"), OffsetDateTime.parse("2017-07-24T15:04:54.372-04:00")) + )) + + val sampleJson = """|{ + | "results": [ + | { + | "name": "switcheroo", + | "id": "bee51f36-396d-4e22-8a81-33dedff66bf6", + | "status": "Failed", + | "end": "2017-07-24T14:44:34.010-04:00", + | "start": "2017-07-24T14:44:33.227-04:00" + | }, + | { + | "name": "switcheroo", + | "id": "0071495e-39eb-478e-bc98-8614b986c91e", + | "status": "Succeeded", + | "end": "2017-07-24T15:06:45.940-04:00", + | "start": "2017-07-24T15:04:54.372-04:00" + | } + | ] + |}""".stripMargin.parseJson.asJsObject + + it should "write a query result as a structured JsObject" in { + + sampleQueryResult.toJson shouldEqual sampleJson + } + + it should "read a query result as a structured JsObject" in { + sampleJson.convertTo[CromwellQueryResults] shouldBe sampleQueryResult + } +} diff --git a/database/migration/src/main/resources/changelog.xml b/database/migration/src/main/resources/changelog.xml index 03883abb7..e005c5ef4 100644 --- a/database/migration/src/main/resources/changelog.xml +++ b/database/migration/src/main/resources/changelog.xml @@ -66,6 +66,8 @@ + +