diff --git a/.travis.yml b/.travis.yml index 1be54097a..9c5941c5c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,10 +16,13 @@ before_cache: - find $HOME/.ivy2 -name "ivydata-*.properties" -delete - find $HOME/.sbt -name "*.lock" -delete before_install: + # https://github.com/travis-ci/travis-ci/issues/7940#issuecomment-310759657 + - sudo rm -f /etc/boto.cfg - openssl aes-256-cbc -K "$encrypted_5ebd3ff04788_key" -iv "$encrypted_5ebd3ff04788_iv" -in src/bin/travis/resources/jesConf.tar.enc -out jesConf.tar -d || true env: global: - CENTAUR_BRANCH=develop + - INTEGRATION_TESTS_DIR=src/main/resources/integrationTestCases matrix: # Setting this variable twice will cause the 'script' section to run twice with the respective env var invoked - BUILD_TYPE=sbt @@ -36,3 +39,12 @@ deploy: script: src/bin/travis/publishRelease.sh on: tags: true +notifications: + slack: + rooms: + - secure: B5KYcnhk/ujAUWlHsjzP7ROLm6MtYhaGikdYf6JYINovhMbVKnZCTlZEy7rqT3L2T5uJ25iefD500VQGk1Gn7puQ1sNq50wqjzQaj20PWEiBwoWalcV/nKBcQx1TyFT13LJv8fbFnVPxFCkC3YXoHedx8qAhDs8GH/tT5J8XOC8= + template: + - "Build <%{build_url}|#%{build_number}> (<%{compare_url}|%{commit}>) of %{repository}@%{branch} by %{author} %{result} in %{duration}" + on_success: change + on_failure: change + on_pull_requests: false diff --git a/CHANGELOG.md b/CHANGELOG.md index d6b4a0938..82540c66b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,113 @@ # Cromwell Change Log +## 28 + +### Bug Fixes + +#### WDL write_* functions add a final newline + +The following WDL functions now add a newline after the final line of output (the previous behavior of not adding this +newline was inadvertent): +- `write_lines` +- `write_map` +- `write_object` +- `write_objects` +- `write_tsv` + +For example: + +``` +task writer { + Array[String] a = ["foo", "bar"] + command { + # used to output: "foo\nbar" + # now outputs: "foo\nbar\n" + cat write_lines(a) + } +} +``` + +#### `ContinueWhilePossible` + +A workflow utilizing the 
WorkflowFailureMode Workflow Option `ContinueWhilePossible` will now successfully reach a terminal state once all runnable jobs have completed. +#### `FailOnStderr` +When `FailOnStderr` is set to false, Cromwell no longer checks for the existence of a stderr file for that task. + +### WDL Functions + +#### New functions: floor, ceil and round: + +Enables the `floor`, `ceil` and `round` functions in WDL to convert floating point numbers to integers. + +For example, we can now use the size of an input file to influence the amount of memory the task is given. In the example below a 500MB input file will result in a request for a VM with 2GB of memory: + +``` +task foo { + File in_file + command { ... } + runtime { + docker: "..." + memory: ceil(size(in_file)) * 4 + } +} +``` + +### Call Caching + +* Hash values calculated by Cromwell for a call when call caching is enabled are now published to the metadata. +It is published even if the call failed. However, if the call is attempted multiple times (because it has been preempted for example), +since hash values are strictly identical for all attempts, they will only be published in the last attempt section of the metadata for this call. +If the hashes fail to be calculated, the reason is indicated in a `hashFailures` field in the `callCaching` section of the call metadata. +*Important*: Hashes are not retroactively published to the metadata. This means only workflows run on Cromwell 28+ will have hashes in their metadata. + +See the [README](https://github.com/broadinstitute/cromwell#get-apiworkflowsversionidmetadata) for an example metadata response. + +* New endpoint returning the hash differential for 2 calls. + +`GET /api/workflows/:version/callcaching/diff` + +See the [README](https://github.com/broadinstitute/cromwell#get-apiworkflowsversioncallcachingdiff) for more details. 
+ +### Workflow Submission + +* The workflow submission parameters `wdlSource` and `wdlDependencies` have been deprecated in favor of `workflowSource` and +`workflowDependencies` respectively. The older names are still supported in Cromwell 28 with deprecation warnings but will +be removed in a future version of Cromwell. + +### Labels +* A new `/labels` endpoint has been added to update labels for an existing workflow. See the [README](README.md#patch-apiworkflowsversionidlabels) for more information. +* Label formatting requirements have been updated, please check the [README](README.md#label-format) for more detailed documentation. + + +### JES Backend + +The JES backend now supports a `filesystems.gcs.caching.duplication-strategy` configuration entry. +It can be set to specify the desired behavior of Cromwell regarding call outputs when a call finds a hit in the cache. +The default value is `copy` which will copy all output files to the new call directory. +A second value is allowed, `reference`, that will instead point to the original output files, without copying them. + + +```hocon +filesystems { + gcs { + auth = "application-default" + + caching { + duplication-strategy = "reference" + } + } +} +``` + +A placeholder file will be placed in the execution folder of the cached call to explain the absence of output files and point to the location of the original ones. + + +### Metadata Write Batching + +Metadata write batching works the same as in previous versions of Cromwell, but the default batch size has been changed from 1 to 200. It's possible that 200 is too high in some environments, but 200 is more likely to be an appropriate value +than the previous default. 
+ + ## 27 ### Migration diff --git a/README.md b/README.md index 1c228c7af..362c5e2fe 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,7 @@ A [Workflow Management System](https://en.wikipedia.org/wiki/Workflow_management * [POST /api/workflows/:version/batch](#post-apiworkflowsversionbatch) * [GET /api/workflows/:version/query](#get-apiworkflowsversionquery) * [POST /api/workflows/:version/query](#post-apiworkflowsversionquery) + * [PATCH /api/workflows/:version/:id/labels](#patch-apiworkflowsversionidlabels) * [GET /api/workflows/:version/:id/status](#get-apiworkflowsversionidstatus) * [GET /api/workflows/:version/:id/outputs](#get-apiworkflowsversionidoutputs) * [GET /api/workflows/:version/:id/timing](#get-apiworkflowsversionidtiming) @@ -92,6 +93,7 @@ A [Workflow Management System](https://en.wikipedia.org/wiki/Workflow_management * [GET /api/workflows/:version/:id/metadata](#get-apiworkflowsversionidmetadata) * [POST /api/workflows/:version/:id/abort](#post-apiworkflowsversionidabort) * [GET /api/workflows/:version/backends](#get-apiworkflowsversionbackends) + * [GET /api/workflows/:version/callcaching/diff](#get-apiworkflowsversioncallcachingdiff) * [GET /api/engine/:version/stats](#get-apiengineversionstats) * [GET /api/engine/:version/version](#get-apiengineversionversion) * [Error handling](#error-handling) @@ -361,7 +363,7 @@ system { Or, via `-Dsystem.abort-jobs-on-terminate=true` command line option. -By default, this value is false when running `java -jar cromwell.jar server`, and true when running `java -jar cromwell.jar run `. +By default, this value is false when running `java -jar cromwell.jar server`, and true when running `java -jar cromwell.jar run `. # Security @@ -477,7 +479,7 @@ When Cromwell runs a workflow, it first creates a directory `//call-`. This is the ``. For example, having a `stdout` and `stderr` file is common among both backends and they both write a shell script file to the `` as well. 
See the descriptions below for details about backend-specific files that are written to these directories. -An example of a workflow output directory for a three-step WDL file might look like this: +An example of a workflow output directory for a three-step workflow might look like this: ``` cromwell-executions/ @@ -841,7 +843,7 @@ backend { TES { actor-factory = "cromwell.backend.impl.tes.TesBackendLifecycleActorFactory" config { - endpoint = "https:///v1/jobs" + endpoint = "https:///v1/tasks" root = "cromwell-executions" dockerRoot = "/cromwell-executions" concurrent-job-limit = 1000 @@ -859,16 +861,13 @@ This backend supports the following optional runtime attributes / workflow optio * docker: Docker image to use such as "Ubuntu". * dockerWorkingDir: defines the working directory in the container. -Outputs: -It will use `dockerOutputDir` runtime attribute / workflow option to resolve the folder in which the execution results will placed. If there is no `dockerWorkingDir` defined it will use `/cromwell-executions//call-/execution`. - ### CPU, Memory and Disk This backend supports CPU, memory and disk size configuration through the use of the following runtime attributes / workflow options: -* cpu: defines the amount of CPU to use. Default value: 1. Type: Integer. Ex: 4. -* memory: defines the amount of memory to use. Default value: "2 GB". Type: String. Ex: "4 GB" or "4096 MB" -* disk: defines the amount of disk to use. Default value: "2 GB". Type: String. Ex: "1 GB" or "1024 MB" +* cpu: defines the amount of CPU to use. Type: Integer. Ex: 4. +* memory: defines the amount of memory to use. Type: String. Ex: "4 GB" or "4096 MB" +* disk: defines the amount of disk to use. Type: String. Ex: "1 GB" or "1024 MB" -It they are not set, the TES backend will use default values. +If they are not set, the TES backend may use default values. ## Sun GridEngine Backend @@ -1230,7 +1229,7 @@ nativeSpecs attribute needs to be specified as String. 
## Spark Backend -This backend adds support for execution of spark jobs in a workflow using the existing wdl format. +This backend adds support for execution of spark jobs in a workflow. It supports the following Spark deploy modes: @@ -1304,7 +1303,7 @@ Supported runtime attributes for a Spark Job is as follows: * appMainClass ( Spark app/job entry point) * numberOfExecutors ( Specific to cluster deploy mode) -Sample usage : +Sample usage: ```wdl task sparkjob_with_yarn_cluster { @@ -1330,8 +1329,8 @@ Supported File Systems as follows: * Network File System * Distributed file system -### Sample Wdl -Next, create a Wdl, and it's json input like so: +### Sample WDL +Next, create a WDL, and its json input like so: ```wdl task sparkjob_with_yarn_cluster { @@ -1685,16 +1684,15 @@ Valid keys and their meanings: * **google_project** - (JES backend only) Specifies which google project to execute this workflow. * **refresh_token** - (JES backend only) Only used if `localizeWithRefreshToken` is specified in the [configuration file](#configuring-cromwell). * **auth_bucket** - (JES backend only) defaults to the the value in **jes_gcs_root**. This should represent a GCS URL that only Cromwell can write to. The Cromwell account is determined by the `google.authScheme` (and the corresponding `google.userAuth` and `google.serviceAuth`) - * **monitoring_script** - (JES backend only) Specifies a GCS URL to a script that will be invoked prior to the WDL command being run. For example, if the value for monitoring_script is "gs://bucket/script.sh", it will be invoked as `./script.sh > monitoring.log &`. The value `monitoring.log` file will be automatically de-localized. + * **monitoring_script** - (JES backend only) Specifies a GCS URL to a script that will be invoked prior to the user command being run. For example, if the value for monitoring_script is "gs://bucket/script.sh", it will be invoked as `./script.sh > monitoring.log &`. 
The value `monitoring.log` file will be automatically de-localized. # Labels -Every call in Cromwell is labelled by Cromwell so that it can be queried about later. The current label set automatically applied is: +Every call run on the JES backend is given certain labels by default, so that Google resources can be queried by these labels later. The current default label set automatically applied is: | Key | Value | Example | Notes | |-----|-------|---------|-------| | cromwell-workflow-id | The Cromwell ID given to the root workflow (i.e. the ID returned by Cromwell on submission) | cromwell-d4b412c5-bf3d-4169-91b0-1b635ce47a26 | To fit the required [format](#label-format), we prefix with 'cromwell-' | -| cromwell-workflow-name | The name of the root workflow | my-root-workflow | | | cromwell-sub-workflow-name | The name of this job's sub-workflow | my-sub-workflow | Only present if the task is called in a subworkflow. | | wdl-task-name | The name of the WDL task | my-task | | | wdl-call-alias | The alias of the WDL call that created this job | my-task-1 | Only present if the task was called with an alias. | @@ -1712,10 +1710,15 @@ Custom labels can also be applied to every call in a workflow by specifying a cu ## Label Format -To fit in with the Google schema for labels, label key and value strings must match the regex `[a-z]([-a-z0-9]*[a-z0-9])?` and be between 1 and 63 characters in length. - -For custom labels, Cromwell will reject any request which is made containing invalid label strings. For automatically applied labels, Cromwell will modify workflow/task/call names to fit the schema, according to the following rules: +When labels are supplied to Cromwell, it will fail any request containing invalid label strings. Below are the requirements for a valid label key/value pair in Cromwell: +- Label keys and values can't contain characters other than `[a-z]`, `[0-9]` or `-`. +- Label keys must start with `[a-z]` and end with `[a-z]` or `[0-9]`. 
+- Label values must start and end with `[a-z]` or `[0-9]`. +- Label keys may not be empty but label values may be empty. +- Label keys and values have a max char limit of 63. +Google has a different schema for labels, where label key and value strings must match the regex `[a-z]([-a-z0-9]*[a-z0-9])?` and be no more than 63 characters in length. +For automatically applied labels, Cromwell will modify workflow/task/call names to fit the schema, according to the following rules: - Any capital letters are lowercased. - Any character which is not one of `[a-z]`, `[0-9]` or `-` will be replaced with `-`. - If the start character does not match `[a-z]` then prefix with `x--` @@ -1760,12 +1763,12 @@ Cromwell also accepts two [workflow option](#workflow-options) related to call c Docker tags are a convenient way to point to a version of an image (ubuntu:14.04), or even the latest version (ubuntu:latest). For that purpose, tags are mutable, meaning that the image they point to can change, while the tag name stays the same. -While this is very convenient in some cases, using mutable, or "floating" tags in WDL affects the reproducibility of the WDL file: the same WDL using "ubuntu:latest" run now, and a year, or even a month from now will actually run with different docker images. +While this is very convenient in some cases, using mutable, or "floating" tags in tasks affects the reproducibility of a workflow: the same workflow using "ubuntu:latest" run now, and a year, or even a month from now will actually run with different docker images. This has an even bigger impact when Call Caching is turned on in Cromwell, and could lead to unpredictable behaviors if a tag is updated in the middle of a workflow or even a scatter for example. Docker provides another way of identifying an image version, using the specific digest of the image. The digest is guaranteed to be different if 2 images have different byte content. 
For more information see https://docs.docker.com/registry/spec/api/#/content-digests A docker image with digest can be referenced as follows : **ubuntu@sha256:71cd81252a3563a03ad8daee81047b62ab5d892ebbfbf71cf53415f29c130950** The above image refers to a specific image of ubuntu, that does not depend on a floating tag. -A WDL containing this Docker image run now and a year from now will run in the exact same container. +A workflow containing this Docker image run now and a year from now will run in the exact same container. In order to remove unpredictable behaviors, Cromwell takes the following approach regarding floating docker tags. @@ -1837,7 +1840,7 @@ When running a job on the Config (Shared Filesystem) backend, Cromwell provides ``` # Imports -Import statements inside of a WDL file are supported by Cromwell when running in Server mode as well as Single Workflow Runner Mode. +Import statements inside of a workflow file are supported by Cromwell when running in Server mode as well as Single Workflow Runner Mode. In Single Workflow Runner Mode, you pass in a zip file which includes the WDL files referenced by the import statements. Cromwell requires the zip file to be passed in as a command line argument, as explained by the section [run](#run). @@ -1846,7 +1849,7 @@ For example, given a workflow `wf.wdl` and an imports directory `WdlImports.zip` java -jar cromwell.jar wf.wdl wf.inputs - - WdlImports.zip ``` -In Server Mode, you pass in a zip file using the parameter `wdlDependencies` via the [POST /api/workflows/:version](#post-apiworkflowsversion) endpoint. +In Server Mode, you pass in a zip file using the parameter `workflowDependencies` via the [POST /api/workflows/:version](#post-apiworkflowsversion) endpoint. 
# Sub Workflows @@ -2307,7 +2310,7 @@ It's also possible to set the URL query parameter `expandSubWorkflows` to `true` # REST API -The `server` subcommand on the executable JAR will start an HTTP server which can accept WDL files to run as well as check status and output of existing workflows. +The `server` subcommand on the executable JAR will start an HTTP server which can accept workflow files to run as well as check status and output of existing workflows. The following sub-sections define which HTTP Requests the web server can accept and what they will return. Example HTTP requests are given in [HTTPie](https://github.com/jkbrzt/httpie) and [cURL](https://curl.haxx.se/) @@ -2319,12 +2322,12 @@ All web server requests include an API version in the url. The current version i This endpoint accepts a POST request with a `multipart/form-data` encoded body. The form fields that may be included are: -* `wdlSource` - *Required* Contains the WDL file to submit for execution. -* `workflowInputs` - *Optional* JSON file containing the inputs. A skeleton file can be generated from [wdltool](https://github.com/broadinstitute/wdltool) using the "inputs" subcommand. +* `workflowSource` - *Required* Contains the workflow source file to submit for execution. +* `workflowInputs` - *Optional* JSON file containing the inputs. For WDL workflows a skeleton file can be generated from [wdltool](https://github.com/broadinstitute/wdltool) using the "inputs" subcommand. * `workflowInputs_n` - *Optional* Where `n` is an integer. JSON file containing the 'n'th set of auxiliary inputs. * `workflowOptions` - *Optional* JSON file containing options for this workflow execution. See the [run](#run) CLI sub-command for some more information about this. * `customLabels` - *Optional* JSON file containing a set of custom labels to apply to this workflow. See [Labels](#labels) for the expected format. 
-* `wdlDependencies` - *Optional* ZIP file containing WDL files that are used to resolve import statements. +* `workflowDependencies` - *Optional* ZIP file containing workflow source files that are used to resolve import statements. Regarding the workflowInputs parameter, in case of key conflicts between multiple input JSON files, higher values of x in workflowInputs_x override lower values. For example, an input specified in workflowInputs_3 will override an input with the same name in workflowInputs or workflowInputs_2. Similarly, an input key specified in workflowInputs_5 will override an identical input key in any other input file. @@ -2335,13 +2338,13 @@ Additionally, although Swagger has a limit of 5 JSON input files, the REST endpo cURL: ``` -$ curl -v "localhost:8000/api/workflows/v1" -F wdlSource=@src/main/resources/3step.wdl -F workflowInputs=@test.json +$ curl -v "localhost:8000/api/workflows/v1" -F workflowSource=@src/main/resources/3step.wdl -F workflowInputs=@test.json ``` HTTPie: ``` -$ http --print=hbHB --form POST localhost:8000/api/workflows/v1 wdlSource=@src/main/resources/3step.wdl workflowInputs@inputs.json +$ http --print=hbHB --form POST localhost:8000/api/workflows/v1 workflowSource=@src/main/resources/3step.wdl workflowInputs@inputs.json ``` Request: @@ -2357,7 +2360,7 @@ Host: localhost:8000 User-Agent: HTTPie/0.9.2 --64128d499e9e4616adea7d281f695dca -Content-Disposition: form-data; name="wdlSource" +Content-Disposition: form-data; name="workflowSource" task ps { command { @@ -2427,13 +2430,13 @@ To specify workflow options as well: cURL: ``` -$ curl -v "localhost:8000/api/workflows/v1" -F wdlSource=@wdl/jes0.wdl -F workflowInputs=@wdl/jes0.json -F workflowOptions=@options.json +$ curl -v "localhost:8000/api/workflows/v1" -F workflowSource=@wdl/jes0.wdl -F workflowInputs=@wdl/jes0.json -F workflowOptions=@options.json ``` HTTPie: ``` -http --print=HBhb --form POST http://localhost:8000/api/workflows/v1 wdlSource=@wdl/jes0.wdl 
workflowInputs@wdl/jes0.json workflowOptions@options.json +http --print=HBhb --form POST http://localhost:8000/api/workflows/v1 workflowSource=@wdl/jes0.wdl workflowInputs@wdl/jes0.json workflowOptions@options.json ``` Request (some parts truncated for brevity): @@ -2449,7 +2452,7 @@ Host: localhost:8000 User-Agent: HTTPie/0.9.2 --f3fd038395644de596c460257626edd7 -Content-Disposition: form-data; name="wdlSource" +Content-Disposition: form-data; name="workflowSource" task x { ... } task y { ... } @@ -2485,28 +2488,28 @@ Content-Disposition: form-data; name="workflowOptions"; filename="options.json" This endpoint accepts a POST request with a `multipart/form-data` encoded body. The form fields that may be included are: -* `wdlSource` - *Required* Contains the WDL file to submit for +* `workflowSource` - *Required* Contains the workflow source file to submit for execution. * `workflowInputs` - *Required* JSON file containing the inputs in a -JSON array. A skeleton file for a single inputs json element can be +JSON array. For WDL workflows a skeleton file for a single inputs json element can be generated from [wdltool](https://github.com/broadinstitute/wdltool) using the "inputs" subcommand. The orderded endpoint responses will contain one workflow submission response for each input, respectively. * `workflowOptions` - *Optional* JSON file containing options for this workflow execution. See the [run](#run) CLI sub-command for some more information about this. -* `wdlDependencies` - *Optional* ZIP file containing WDL files that are used to resolve import statements. Applied equally to all workflowInput sets. +* `workflowDependencies` - *Optional* ZIP file containing workflow source files that are used to resolve import statements. Applied equally to all workflowInput sets. 
cURL: ``` -$ curl -v "localhost:8000/api/workflows/v1/batch" -F wdlSource=@src/main/resources/3step.wdl -F workflowInputs=@test_array.json +$ curl -v "localhost:8000/api/workflows/v1/batch" -F workflowSource=@src/main/resources/3step.wdl -F workflowInputs=@test_array.json ``` HTTPie: ``` -$ http --print=hbHB --form POST localhost:8000/api/workflows/v1/batch wdlSource=@src/main/resources/3step.wdl workflowInputs@inputs_array.json +$ http --print=hbHB --form POST localhost:8000/api/workflows/v1/batch workflowSource=@src/main/resources/3step.wdl workflowInputs@inputs_array.json ``` Request: @@ -2522,7 +2525,7 @@ Host: localhost:8000 User-Agent: HTTPie/0.9.2 --64128d499e9e4616adea7d281f695dcb -Content-Disposition: form-data; name="wdlSource" +Content-Disposition: form-data; name="workflowSource" task ps { command { @@ -2603,13 +2606,13 @@ To specify workflow options as well: cURL: ``` -$ curl -v "localhost:8000/api/workflows/v1/batch" -F wdlSource=@wdl/jes0.wdl -F workflowInputs=@wdl/jes0_array.json -F workflowOptions=@options.json +$ curl -v "localhost:8000/api/workflows/v1/batch" -F workflowSource=@wdl/jes0.wdl -F workflowInputs=@wdl/jes0_array.json -F workflowOptions=@options.json ``` HTTPie: ``` -http --print=HBhb --form POST http://localhost:8000/api/workflows/v1/batch wdlSource=@wdl/jes0.wdl workflowInputs@wdl/jes0_array.json workflowOptions@options.json +http --print=HBhb --form POST http://localhost:8000/api/workflows/v1/batch workflowSource=@wdl/jes0.wdl workflowInputs@wdl/jes0_array.json workflowOptions@options.json ``` Request (some parts truncated for brevity): @@ -2625,7 +2628,7 @@ Host: localhost:8000 User-Agent: HTTPie/0.9.2 --f3fd038395644de596c460257626edd8 -Content-Disposition: form-data; name="wdlSource" +Content-Disposition: form-data; name="workflowSource" task x { ... } task y { ... 
} @@ -2855,6 +2858,37 @@ Server: spray-can/1.3.3 } ``` +## PATCH /api/workflows/:version/:id/labels + +This endpoint is used to update multiple labels for an existing workflow. When supplying a label with a key unique to the workflow submission, a new label key/value entry is appended to that workflow's metadata. When supplying a label with a key that is already associated to the workflow submission, the original label value is updated with the new value for that workflow's metadata. + +The [labels](#labels) must be a mapping of key/value pairs in JSON format that are sent via the PATCH body. The request content type must be +`application/json`. + +cURL: + +``` +$ curl -X PATCH --header "Content-Type: application/json" -d "{\"label-key-1\":\"label-value-1\", \"label-key-2\": \"label-value-2\"}" "http://localhost:8000/api/workflows/v1/c4c6339c-8cc9-47fb-acc5-b5cb8d2809f5/labels" +``` + +HTTPie: + +``` +$ echo '{"label-key-1":"label-value-1", "label-key-2": "label-value-2"}' | http PATCH "http://localhost:8000/api/workflows/v1/c4c6339c-8cc9-47fb-acc5-b5cb8d2809f5/labels" +``` + +Response: +``` +{ "id": "c4c6339c-8cc9-47fb-acc5-b5cb8d2809f5", + "labels": + { + "label-key-1": "label-value-1", + "label-key-2": "label-value-2" + } +} +``` + + ## GET /api/workflows/:version/:id/status cURL: @@ -3017,11 +3051,18 @@ Content-Type: application/json; charset=UTF-8 Content-Length: 7286 { "workflowName": "sc_test", + "submittedFiles": { + "inputs": "{}", + "workflow": "task do_prepare {\n File input_file\n command {\n split -l 1 ${input_file} temp_ && ls -1 temp_?? 
> files.list\n }\n output {\n Array[File] split_files = read_lines(\"files.list\")\n }\n}\n# count the number of words in the input file, writing the count to an output file overkill in this case, but simulates a real scatter-gather that would just return an Int (map)\ntask do_scatter {\n File input_file\n command {\n wc -w ${input_file} > output.txt\n }\n output {\n File count_file = \"output.txt\"\n }\n}\n# aggregate the results back together (reduce)\ntask do_gather {\n Array[File] input_files\n command <<<\n cat ${sep = ' ' input_files} | awk '{s+=$$1} END {print s}'\n >>>\n output {\n Int sum = read_int(stdout())\n }\n}\nworkflow sc_test {\n call do_prepare\n scatter(f in do_prepare.split_files) {\n call do_scatter {\n input: input_file = f\n }\n }\n call do_gather {\n input: input_files = do_scatter.count_file\n }\n}", + "options": "{\n\n}", + "workflowType": "WDL" + }, "calls": { "sc_test.do_prepare": [ { "executionStatus": "Done", "stdout": "/home/jdoe/cromwell/cromwell-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_prepare/stdout", + "backendStatus": "Done", "shardIndex": -1, "outputs": { "split_files": [ @@ -3036,6 +3077,30 @@ Content-Length: 7286 "failOnStderr": "true", "continueOnReturnCode": "0" }, + "callCaching": { + "allowResultReuse": true, + "hit": false, + "result": "Cache Miss", + "hashes": { + "output count": "C4CA4238A0B923820DCC509A6F75849B", + "runtime attribute": { + "docker": "N/A", + "continueOnReturnCode": "CFCD208495D565EF66E7DFF9F98764DA", + "failOnStderr": "68934A3E9455FA72420237EB05902327" + }, + "output expression": { + "Array": "D856082E6599CF6EC9F7F42013A2EC4C" + }, + "input count": "C4CA4238A0B923820DCC509A6F75849B", + "backend name": "509820290D57F333403F490DDE7316F4", + "command template": "9F5F1F24810FACDF917906BA4EBA807D", + "input": { + "File input_file": "11fa6d7ed15b42f2f73a455bf5864b49" + } + }, + "effectiveCallCachingMode": "ReadAndWriteCache" + }, + "jobId": "34479", "returnCode": 0, "backend": "Local", 
"end": "2016-02-04T13:47:56.000-05:00", @@ -3049,15 +3114,40 @@ Content-Length: 7286 { "executionStatus": "Preempted", "stdout": "/home/jdoe/cromwell/cromwell-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_scatter/shard-0/stdout", + "backendStatus": "Preempted", "shardIndex": 0, "outputs": {}, "runtimeAttributes": { "failOnStderr": "true", "continueOnReturnCode": "0" }, + "callCaching": { + "allowResultReuse": true, + "hit": false, + "result": "Cache Miss", + "hashes": { + "output count": "C4CA4238A0B923820DCC509A6F75849B", + "runtime attribute": { + "docker": "N/A", + "continueOnReturnCode": "CFCD208495D565EF66E7DFF9F98764DA", + "failOnStderr": "68934A3E9455FA72420237EB05902327" + }, + "output expression": { + "File count_file": "EF1B47FFA9990E8D058D177073939DF7" + }, + "input count": "C4CA4238A0B923820DCC509A6F75849B", + "backend name": "509820290D57F333403F490DDE7316F4", + "command template": "FD00A1B0AB6A0C97B0737C83F179DDE7", + "input": { + "File input_file": "a53794d214dc5dedbcecdf827bf683a2" + } + }, + "effectiveCallCachingMode": "ReadAndWriteCache" + }, "inputs": { "input_file": "f" }, + "jobId": "34496", "backend": "Local", "end": "2016-02-04T13:47:56.000-05:00", "stderr": "/home/jdoe/cromwell/cromwell-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_scatter/shard-0/stderr", @@ -3068,6 +3158,7 @@ Content-Length: 7286 { "executionStatus": "Done", "stdout": "/home/jdoe/cromwell/cromwell-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_scatter/shard-0/attempt-2/stdout", + "backendStatus": "Done", "shardIndex": 0, "outputs": { "count_file": "/home/jdoe/cromwell/cromwell-test-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_scatter/shard-0/attempt-2/output.txt" @@ -3076,10 +3167,34 @@ Content-Length: 7286 "failOnStderr": "true", "continueOnReturnCode": "0" }, + "callCaching": { + "allowResultReuse": true, + "hit": false, + "result": "Cache Miss", + "hashes": { + "output count": 
"C4CA4238A0B923820DCC509A6F75849B", + "runtime attribute": { + "docker": "N/A", + "continueOnReturnCode": "CFCD208495D565EF66E7DFF9F98764DA", + "failOnStderr": "68934A3E9455FA72420237EB05902327" + }, + "output expression": { + "File count_file": "EF1B47FFA9990E8D058D177073939DF7" + }, + "input count": "C4CA4238A0B923820DCC509A6F75849B", + "backend name": "509820290D57F333403F490DDE7316F4", + "command template": "FD00A1B0AB6A0C97B0737C83F179DDE7", + "input": { + "File input_file": "a53794d214dc5dedbcecdf827bf683a2" + } + }, + "effectiveCallCachingMode": "ReadAndWriteCache" + }, "inputs": { "input_file": "f" }, "returnCode": 0, + "jobId": "34965", "end": "2016-02-04T13:47:56.000-05:00", "stderr": "/home/jdoe/cromwell/cromwell-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_scatter/shard-0/attempt-2/stderr", "attempt": 2, @@ -3089,6 +3204,7 @@ Content-Length: 7286 { "executionStatus": "Done", "stdout": "/home/jdoe/cromwell/cromwell-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_scatter/shard-1/stdout", + "backendStatus": "Done", "shardIndex": 1, "outputs": { "count_file": "/home/jdoe/cromwell/cromwell-test-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_scatter/shard-1/output.txt" @@ -3097,10 +3213,34 @@ Content-Length: 7286 "failOnStderr": "true", "continueOnReturnCode": "0" }, + "callCaching": { + "allowResultReuse": true, + "hit": false, + "result": "Cache Miss", + "hashes": { + "output count": "C4CA4238A0B923820DCC509A6F75849B", + "runtime attribute": { + "docker": "N/A", + "continueOnReturnCode": "CFCD208495D565EF66E7DFF9F98764DA", + "failOnStderr": "68934A3E9455FA72420237EB05902327" + }, + "output expression": { + "File count_file": "EF1B47FFA9990E8D058D177073939DF7" + }, + "input count": "C4CA4238A0B923820DCC509A6F75849B", + "backend name": "509820290D57F333403F490DDE7316F4", + "command template": "FD00A1B0AB6A0C97B0737C83F179DDE7", + "input": { + "File input_file": "d3410ade53df34c78488544285cf743c" + } + }, + 
"effectiveCallCachingMode": "ReadAndWriteCache" + }, "inputs": { "input_file": "f" }, "returnCode": 0, + "jobId": "34495", "backend": "Local", "end": "2016-02-04T13:47:56.000-05:00", "stderr": "/home/jdoe/cromwell/cromwell-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_scatter/shard-1/stderr", @@ -3113,6 +3253,7 @@ Content-Length: 7286 { "executionStatus": "Done", "stdout": "/home/jdoe/cromwell/cromwell-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_gather/stdout", + "backendStatus": "Done", "shardIndex": -1, "outputs": { "sum": 12 @@ -3121,6 +3262,29 @@ Content-Length: 7286 "failOnStderr": "true", "continueOnReturnCode": "0" }, + "callCaching": { + "allowResultReuse": true, + "hit": false, + "result": "Cache Miss", + "hashes": { + "output count": "C4CA4238A0B923820DCC509A6F75849B", + "runtime attribute": { + "docker": "N/A", + "continueOnReturnCode": "CFCD208495D565EF66E7DFF9F98764DA", + "failOnStderr": "68934A3E9455FA72420237EB05902327" + }, + "output expression": { + "File count_file": "EF1B47FFA9990E8D058D177073939DF7" + }, + "input count": "C4CA4238A0B923820DCC509A6F75849B", + "backend name": "509820290D57F333403F490DDE7316F4", + "command template": "FD00A1B0AB6A0C97B0737C83F179DDE7", + "input": { + "File input_file": "e0ef752ab4824939d7947f6012b7c141" + } + }, + "effectiveCallCachingMode": "ReadAndWriteCache" + }, "inputs": { "input_files": [ "/home/jdoe/cromwell/cromwell-test-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_scatter/shard-0/attempt-2/output.txt", @@ -3128,6 +3292,7 @@ Content-Length: 7286 ] }, "returnCode": 0, + "jobId": "34494", "backend": "Local", "end": "2016-02-04T13:47:57.000-05:00", "stderr": "/home/jdoe/cromwell/cromwell-executions/sc_test/8e592ed8-ebe5-4be0-8dcb-4073a41fe180/call-do_gather/stderr", @@ -3152,6 +3317,10 @@ Content-Length: 7286 "inputs": { "sc_test.do_prepare.input_file": "/home/jdoe/cromwell/11.txt" }, + "labels": { + "cromwell-workflow-name": "sc_test", + 
"cromwell-workflow-id": "cromwell-17633f21-11a9-414f-a95b-2e21431bd67d" + }, "submission": "2016-02-04T13:47:55.000-05:00", "status": "Succeeded", "end": "2016-02-04T13:47:57.000-05:00", @@ -3326,6 +3495,176 @@ Server: spray-can/1.3.3 } ``` +## GET /api/workflows/:version/callcaching/diff + +**Disclaimer**: This endpoint depends on hash values being published to the metadata, which only happens as of Cromwell 28. +Workflows run with prior versions of Cromwell cannot be used with this endpoint. +A `404 NotFound` will be returned when trying to use this endpoint if either workflow has been run on a prior version. + +This endpoint returns the hash differences between 2 *completed* (successfully or not) calls. +The following query parameters are supported: + +| Parameter | Description | Required | +|:---------:|:-----------------------------------------------------------------------------------------:|:--------:| +| workflowA | Workflow ID of the first call | yes | +| callA | Fully qualified name of the first call. **Including workflow name**. (see example below) | yes | +| indexA | Shard index of the first call | depends | +| workflowB | Workflow ID of the second call | yes | +| callB | Fully qualified name of the second call. **Including workflow name**. (see example below) | yes | +| indexB | Shard index of the second call | depends | + +About the `indexX` parameters: It is required if the call was in a scatter. Otherwise it should *not* be specified. +If an index parameter is wrongly specified, the call will not be found and the request will result in a 404 response. 
+ +cURL: + +``` +$ curl "http://localhost:8000/api/workflows/v1/callcaching/diff?workflowA=85174842-4a44-4355-a3a9-3a711ce556f1&callA=wf_hello.hello&workflowB=7479f8a8-efa4-46e4-af0d-802addc66e5d&callB=wf_hello.hello" +``` + +HTTPie: + +``` +$ http "http://localhost:8000/api/workflows/v1/callcaching/diff?workflowA=85174842-4a44-4355-a3a9-3a711ce556f1&callA=wf_hello.hello&workflowB=7479f8a8-efa4-46e4-af0d-802addc66e5d&callB=wf_hello.hello" +``` + +Response: +``` +HTTP/1.1 200 OK +Content-Length: 1274 +Content-Type: application/json; charset=UTF-8 +Date: Tue, 06 Jun 2017 16:44:33 GMT +Server: spray-can/1.3.3 + +{ + "callA": { + "executionStatus": "Done", + "workflowId": "85174842-4a44-4355-a3a9-3a711ce556f1", + "callFqn": "wf_hello.hello", + "jobIndex": -1, + "allowResultReuse": true + }, + "callB": { + "executionStatus": "Done", + "workflowId": "7479f8a8-efa4-46e4-af0d-802addc66e5d", + "callFqn": "wf_hello.hello", + "jobIndex": -1, + "allowResultReuse": true + }, + "hashDifferential": [ + { + "command template": { + "callA": "4EAADE3CD5D558C5A6CFA4FD101A1486", + "callB": "3C7A0CA3D7A863A486DBF3F7005D4C95" + } + }, + { + "input count": { + "callA": "C4CA4238A0B923820DCC509A6F75849B", + "callB": "C81E728D9D4C2F636F067F89CC14862C" + } + }, + { + "input: String addressee": { + "callA": "D4CC65CB9B5F22D8A762532CED87FE8D", + "callB": "7235E005510D99CB4D5988B21AC97B6D" + } + }, + { + "input: String addressee2": { + "callA": "116C7E36B4AE3EAFD07FA4C536CE092F", + "callB": null + } + } + ] +} +``` + +The response is a JSON object with 3 fields: + +- `callA` reports information about the first call, including its `allowResultReuse` value that will be used to determine whether or not this call can be cached to. +- `callB` reports information about the second call, including its `allowResultReuse` value that will be used to determine whether or not this call can be cached to. 
+- `hashDifferential` is an array in which each element represents a difference between the hashes of `callA` and `callB`. + +*If this array is empty, `callA` and `callB` have the same hashes*. + +Differences can be of 3 kinds: + +- `callA` and `callB` both have the same hash key but their values are different. +For instance, in the example above, + +```json +"input: String addressee": { + "callA": "D4CC65CB9B5F22D8A762532CED87FE8D", + "callB": "7235E005510D99CB4D5988B21AC97B6D" +} +``` + +indicates that both `callA` and `callB` have a `String` input called `addressee`, but different values were used at runtime, resulting in different MD5 hashes. + +- `callA` has a hash key that `callB` doesn't have +For instance, in the example above, + +```json +"input: String addressee2": { + "callA": "116C7E36B4AE3EAFD07FA4C536CE092F", + "callB": null +} +``` + +indicates that `callA` has a `String` input called `addressee2` that doesn't exist in `callB`. For that reason the value of the second field is `null`. + +- `callB` has a hash key that `callA` doesn't have. This is the same case as above but reversed. 
+
+If no cache entry for `callA` or `callB` can be found, the response will be in the following format:
+
+```
+HTTP/1.1 404 NotFound
+Content-Length: 178
+Content-Type: application/json; charset=UTF-8
+Date: Tue, 06 Jun 2017 17:02:15 GMT
+Server: spray-can/1.3.3
+
+{
+  "status": "error",
+  "message": "Cannot find a cache entry for 7479f8a8-efa4-46e4-af0d-802addc66e5d:wf_hello.hello:-1"
+}
+```
+
+If neither `callA` nor `callB` can be found, the response will be in the following format:
+
+
+```
+HTTP/1.1 404 NotFound
+Content-Length: 178
+Content-Type: application/json; charset=UTF-8
+Date: Tue, 06 Jun 2017 17:02:15 GMT
+Server: spray-can/1.3.3
+
+{
+  "status": "error",
+  "message": "Cannot find cache entries for 85174842-4a44-4355-a3a9-3a711ce556f1:wf_hello.hello:-1, 7479f8a8-efa4-46e4-af0d-802addc66e5d:wf_hello.hello:-1"
+}
+```
+
+If the query is malformed and required parameters are missing, the response will be in the following format:
+
+```
+HTTP/1.1 400 BadRequest
+Content-Length: 178
+Content-Type: application/json; charset=UTF-8
+Date: Tue, 06 Jun 2017 17:02:15 GMT
+Server: spray-can/1.3.3
+{
+  "status": "fail",
+  "message": "Wrong parameters for call cache diff query:\nmissing workflowA query parameter\nmissing callB query parameter",
+  "errors": [
+    "missing workflowA query parameter",
+    "missing callB query parameter"
+  ]
+}
+```
+
 ## GET /api/engine/:version/stats
 
 This endpoint returns some basic statistics on the current state of the engine. At the moment that includes the number of running workflows and the number of active jobs.
@@ -3379,8 +3718,6 @@ Response: } ``` - - ## Error handling Requests that Cromwell can't process return a failure in the form of a JSON response respecting the following JSON schema: diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala index 1446ce92b..c33c1733f 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala @@ -637,7 +637,9 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta val stderrSizeAndReturnCode = for { returnCodeAsString <- contentAsStringAsync(jobPaths.returnCode) - stderrSize <- sizeAsync(jobPaths.stderr) + // Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that + // may fail due to race conditions on quickly-executing jobs. + stderrSize <- if (failOnStdErr) sizeAsync(jobPaths.stderr) else Future.successful(0L) } yield (stderrSize, returnCodeAsString) stderrSizeAndReturnCode flatMap { diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala index 89e3c48b2..73dd47925 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala @@ -47,18 +47,18 @@ class StandardInitializationActor(val standardParams: StandardInitializationActo override lazy val calls: Set[TaskCall] = standardParams.calls override def beforeAll(): Future[Option[BackendInitializationData]] = { - Future.fromTry(Try(Option(initializationData))) + initializationData map Option.apply } - lazy val initializationData: StandardInitializationData = - new StandardInitializationData(workflowPaths, runtimeAttributesBuilder, 
classOf[StandardExpressionFunctions]) + lazy val initializationData: Future[StandardInitializationData] = + workflowPaths map { new StandardInitializationData(_, runtimeAttributesBuilder, classOf[StandardExpressionFunctions]) } lazy val expressionFunctions: Class[_ <: StandardExpressionFunctions] = classOf[StandardExpressionFunctions] - lazy val pathBuilders: List[PathBuilder] = List(DefaultPathBuilder) + lazy val pathBuilders: Future[List[PathBuilder]] = Future.successful(List(DefaultPathBuilder)) - lazy val workflowPaths: WorkflowPaths = - WorkflowPathBuilder.workflowPaths(configurationDescriptor, workflowDescriptor, pathBuilders) + lazy val workflowPaths: Future[WorkflowPaths] = + pathBuilders map { WorkflowPathBuilder.workflowPaths(configurationDescriptor, workflowDescriptor, _) } /** * Returns the runtime attribute builder for this backend. diff --git a/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala b/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala index 700c710e0..588028967 100644 --- a/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala @@ -21,7 +21,6 @@ import cromwell.core.path.{Path, PathCopier} import cromwell.core.simpleton.{WdlValueBuilder, WdlValueSimpleton} import wdl4s.values.WdlFile -import scala.language.postfixOps import scala.util.{Failure, Success, Try} /** @@ -55,15 +54,52 @@ object StandardCacheHitCopyingActor { sealed trait StandardCacheHitCopyingActorState case object Idle extends StandardCacheHitCopyingActorState - case object WaitingForCopyResponses extends StandardCacheHitCopyingActorState + case object WaitingForIoResponses extends StandardCacheHitCopyingActorState + case object FailedState extends StandardCacheHitCopyingActorState + case object WaitingForOnSuccessResponse extends 
StandardCacheHitCopyingActorState - case class StandardCacheHitCopyingActorData(copyCommandsToWaitFor: Set[IoCopyCommand], - copiedJobOutputs: CallOutputs, - copiedDetritus: DetritusMap, + // TODO: this mechanism here is very close to the one in CallCacheHashingJobActorData + // Abstracting it might be valuable + /** + * The head subset of commandsToWaitFor is sent to the IoActor as a bulk. + * When a response comes back, the corresponding command is removed from the head set. + * When the head set is empty, it is removed and the next subset is sent, until there is no subset left. + * If at any point a response comes back as a failure. Other responses for the current set will be awaited for + * but subsequent sets will not be sent and the actor will send back a failure message. + */ + case class StandardCacheHitCopyingActorData(commandsToWaitFor: List[Set[IoCommand[_]]], + newJobOutputs: CallOutputs, + newDetritus: DetritusMap, returnCode: Option[Int] ) { - def remove(copyCommand: IoCopyCommand) = copy(copyCommandsToWaitFor = copyCommandsToWaitFor filterNot { _ == copyCommand }) + + /** + * Removes the command from commandsToWaitFor + * returns a pair of the new state data and CommandSetState giving information about what to do next + */ + def commandComplete(command: IoCommand[_]): (StandardCacheHitCopyingActorData, CommandSetState) = commandsToWaitFor match { + // If everything was already done send back current data and AllCommandsDone + case Nil => (this, AllCommandsDone) + case lastSubset :: Nil => + val updatedSubset = lastSubset - command + // If the last subset is now empty, we're done + if (updatedSubset.isEmpty) (this.copy(commandsToWaitFor = List.empty), AllCommandsDone) + // otherwise update commandsToWaitFor and keep waiting + else (this.copy(commandsToWaitFor = List(updatedSubset)), StillWaiting) + case currentSubset :: otherSubsets => + val updatedSubset = currentSubset - command + // This subset is done but there are other ones, remove it from 
commandsToWaitFor and return the next round of commands + if (updatedSubset.isEmpty) (this.copy(commandsToWaitFor = otherSubsets), NextSubSet(otherSubsets.head)) + // otherwise update the head susbset and keep waiting + else (this.copy(commandsToWaitFor = List(updatedSubset) ++ otherSubsets), StillWaiting) + } } + + // Internal ADT to keep track of command set states + private[callcaching] sealed trait CommandSetState + private[callcaching] case object StillWaiting extends CommandSetState + private[callcaching] case object AllCommandsDone extends CommandSetState + private[callcaching] case class NextSubSet(commands: Set[IoCommand[_]]) extends CommandSetState } class DefaultStandardCacheHitCopyingActor(standardParams: StandardCacheHitCopyingActorParams) extends StandardCacheHitCopyingActor(standardParams) with DefaultIoCommandBuilder @@ -92,37 +128,83 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit when(Idle) { case Event(CopyOutputsCommand(simpletons, jobDetritus, returnCode), None) => - val sourceCallRootPath = lookupSourceCallRootPath(jobDetritus) - - val processed = for { - (callOutputs, simpletonCopyPairs) <- processSimpletons(simpletons, sourceCallRootPath) - (destinationDetritus, detritusCopyPairs) <- processDetritus(jobDetritus) - } yield (callOutputs, destinationDetritus, simpletonCopyPairs ++ detritusCopyPairs) - - processed match { - case Success((callOutputs, destinationDetritus, allCopyPairs)) => - duplicate(allCopyPairs) match { - case Some(Success(_)) => succeedAndStop(returnCode, callOutputs, destinationDetritus) - case Some(Failure(failure)) => failAndStop(failure) - case None => - val allCopyCommands = allCopyPairs map { case (source, destination) => copyCommand(source, destination, overwrite = true) } - - allCopyCommands foreach { sendIoCommand(_) } - goto(WaitingForCopyResponses) using Option(StandardCacheHitCopyingActorData(allCopyCommands, callOutputs, destinationDetritus, returnCode)) + // Try to make a Path 
of the callRootPath from the detritus + lookupSourceCallRootPath(jobDetritus) match { + case Success(sourceCallRootPath) => + + // process simpletons and detritus to get updated paths and corresponding IoCommands + val processed = for { + (destinationCallOutputs, simpletonIoCommands) <- processSimpletons(simpletons, sourceCallRootPath) + (destinationDetritus, detritusIoCommands) <- processDetritus(jobDetritus) + } yield (destinationCallOutputs, destinationDetritus, simpletonIoCommands ++ detritusIoCommands) + + processed match { + case Success((destinationCallOutputs, destinationDetritus, detritusAndOutputsIoCommands)) => + duplicate(ioCommandsToCopyPairs(detritusAndOutputsIoCommands)) match { + // Use the duplicate override if exists + case Some(Success(_)) => succeedAndStop(returnCode, destinationCallOutputs, destinationDetritus) + case Some(Failure(failure)) => failAndStop(failure) + // Otherwise send the first round of IoCommands (file outputs and detritus) if any + case None if detritusAndOutputsIoCommands.nonEmpty => + detritusAndOutputsIoCommands foreach sendIoCommand + + // Add potential additional commands to the list + val additionalCommands = additionalIoCommands(sourceCallRootPath, simpletons, destinationCallOutputs, jobDetritus, destinationDetritus) + val allCommands = List(detritusAndOutputsIoCommands) ++ additionalCommands + + goto(WaitingForIoResponses) using Option(StandardCacheHitCopyingActorData(allCommands, destinationCallOutputs, destinationDetritus, returnCode)) + case _ => succeedAndStop(returnCode, destinationCallOutputs, destinationDetritus) + } + + case Failure(failure) => failAndStop(failure) } case Failure(failure) => failAndStop(failure) } } - when(WaitingForCopyResponses) { - case Event(IoSuccess(copyCommand: IoCopyCommand, _), Some(data)) => - val newData = data.remove(copyCommand) - if (newData.copyCommandsToWaitFor.isEmpty) succeedAndStop(data.returnCode, data.copiedJobOutputs, data.copiedDetritus) - else stay() using 
Option(newData) - case Event(IoFailure(copyCommand: IoCopyCommand, failure), _) => - failAndStop(failure) + when(WaitingForIoResponses) { + case Event(IoSuccess(command: IoCommand[_], _), Some(data)) => + val (newData, commandState) = data.commandComplete(command) + + commandState match { + case StillWaiting => stay() using Option(newData) + case AllCommandsDone => succeedAndStop(newData.returnCode, newData.newJobOutputs, newData.newDetritus) + case NextSubSet(commands) => + commands foreach sendIoCommand + stay() using Option(newData) + } + case Event(IoFailure(command: IoCommand[_], failure), Some(data)) => + // any failure is fatal + context.parent ! JobFailedNonRetryableResponse(jobDescriptor.key, failure, None) + + val (newData, commandState) = data.commandComplete(command) + + commandState match { + // If we're still waiting for some responses, go to failed state + case StillWaiting => goto(FailedState) using Option(newData) + // Otherwise we're done + case _ => + context stop self + stay() + } + // Should not be possible + case Event(IoFailure(_: IoCommand[_], failure), None) => failAndStop(failure) + } + + when(FailedState) { + // At this point success or failure doesn't matter, we've already failed this hit + case Event(response: IoAck[_], Some(data)) => + val (newData, commandState) = data.commandComplete(response.command) + commandState match { + // If we're still waiting for some responses, stay + case StillWaiting => stay() using Option(newData) + // Otherwise we're done + case _ => + context stop self + stay() + } } whenUnhandled { @@ -154,57 +236,79 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit stay() } - private def lookupSourceCallRootPath(sourceJobDetritusFiles: Map[String, String]): Path = { - sourceJobDetritusFiles.get(JobPaths.CallRootPathKey).map(getPath).get recover { - case failure => - throw new RuntimeException(s"${JobPaths.CallRootPathKey} wasn't found for call 
${jobDescriptor.call.fullyQualifiedName}", failure) - } get + protected def lookupSourceCallRootPath(sourceJobDetritusFiles: Map[String, String]): Try[Path] = { + sourceJobDetritusFiles.get(JobPaths.CallRootPathKey) match { + case Some(source) => getPath(source) + case None => Failure(new RuntimeException(s"${JobPaths.CallRootPathKey} wasn't found for call ${jobDescriptor.call.fullyQualifiedName}")) + } + } + + private def ioCommandsToCopyPairs(commands: Set[IoCommand[_]]): Set[PathPair] = commands collect { + case copyCommand: IoCopyCommand => copyCommand.source -> copyCommand.destination } /** * Returns a pair of the list of simpletons with copied paths, and copy commands necessary to perform those copies. */ - private def processSimpletons(wdlValueSimpletons: Seq[WdlValueSimpleton], sourceCallRootPath: Path): Try[(CallOutputs, Set[PathPair])] = Try { - val (destinationSimpletons, ioCommands): (List[WdlValueSimpleton], Set[PathPair]) = wdlValueSimpletons.toList.foldMap({ + protected def processSimpletons(wdlValueSimpletons: Seq[WdlValueSimpleton], sourceCallRootPath: Path): Try[(CallOutputs, Set[IoCommand[_]])] = Try { + val (destinationSimpletons, ioCommands): (List[WdlValueSimpleton], Set[IoCommand[_]]) = wdlValueSimpletons.toList.foldMap({ case WdlValueSimpleton(key, wdlFile: WdlFile) => val sourcePath = getPath(wdlFile.value).get val destinationPath = PathCopier.getDestinationFilePath(sourceCallRootPath, sourcePath, destinationCallRootPath) val destinationSimpleton = WdlValueSimpleton(key, WdlFile(destinationPath.pathAsString)) - List(destinationSimpleton) -> Set(sourcePath -> destinationPath) - case nonFileSimpleton => (List(nonFileSimpleton), Set.empty[PathPair]) + List(destinationSimpleton) -> Set(copyCommand(sourcePath, destinationPath, overwrite = true)) + case nonFileSimpleton => (List(nonFileSimpleton), Set.empty[IoCommand[_]]) }) (WdlValueBuilder.toJobOutputs(jobDescriptor.call.task.outputs, destinationSimpletons), ioCommands) } /** - * Returns a pair 
of the detritus with copied paths, and copy commands necessary to perform those copies. + * Returns the file (and ONLY the file detritus) intersection between the cache hit and this call. */ - private def processDetritus(sourceJobDetritusFiles: Map[String, String]): Try[(Map[String, Path], Set[PathPair])] = Try { + protected final def detritusFileKeys(sourceJobDetritusFiles: Map[String, String]) = { val sourceKeys = sourceJobDetritusFiles.keySet val destinationKeys = destinationJobDetritusPaths.keySet - val fileKeys = sourceKeys.intersect(destinationKeys).filterNot(_ == JobPaths.CallRootPathKey) + sourceKeys.intersect(destinationKeys).filterNot(_ == JobPaths.CallRootPathKey) + } + + /** + * Returns a pair of the detritus with copied paths, and copy commands necessary to perform those copies. + */ + protected def processDetritus(sourceJobDetritusFiles: Map[String, String]): Try[(Map[String, Path], Set[IoCommand[_]])] = Try { + val fileKeys = detritusFileKeys(sourceJobDetritusFiles) - val zero = (Map.empty[String, Path], Set.empty[PathPair]) + val zero = (Map.empty[String, Path], Set.empty[IoCommand[_]]) val (destinationDetritus, ioCommands) = fileKeys.foldLeft(zero)({ case ((detrituses, commands), detritus) => val sourcePath = getPath(sourceJobDetritusFiles(detritus)).get val destinationPath = destinationJobDetritusPaths(detritus) - + val newDetrituses = detrituses + (detritus -> destinationPath) - - (newDetrituses, commands + ((sourcePath, destinationPath))) + + (newDetrituses, commands + copyCommand(sourcePath, destinationPath, overwrite = true)) }) - + (destinationDetritus + (JobPaths.CallRootPathKey -> destinationCallRootPath), ioCommands) } + /** + * Additional IoCommands that will be sent after (and only after) output and detritus commands complete successfully. 
+ * See StandardCacheHitCopyingActorData + */ + protected def additionalIoCommands(sourceCallRootPath: Path, + originalSimpletons: Seq[WdlValueSimpleton], + newOutputs: CallOutputs, + originalDetritus: Map[String, String], + newDetritus: Map[String, Path]): List[Set[IoCommand[_]]] = List.empty + override protected def onTimeout(message: Any, to: ActorRef): Unit = { val exceptionMessage = message match { case copyCommand: IoCopyCommand => s"The Cache hit copying actor timed out waiting for a response to copy ${copyCommand.source.pathAsString} to ${copyCommand.destination.pathAsString}" + case touchCommand: IoTouchCommand => s"The Cache hit copying actor timed out waiting for a response to touch ${touchCommand.file.pathAsString}" case other => s"The Cache hit copying actor timed out waiting for an unknown I/O operation: $other" } diff --git a/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala b/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala index 81cce8d0d..0d398a324 100644 --- a/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala @@ -18,6 +18,7 @@ import wdl4s.values.{WdlInteger, WdlValue} */ object CpuValidation { lazy val instance: RuntimeAttributesValidation[Int] = new CpuValidation + lazy val optional: OptionalRuntimeAttributesValidation[Int] = instance.optional lazy val default: WdlValue = WdlInteger(1) def configDefaultWdlValue(config: Option[Config]): Option[WdlValue] = instance.configDefaultWdlValue(config) } diff --git a/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala b/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala index bc0fb32da..142e541d9 100644 --- a/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala @@ -39,31 +39,31 @@ object MemoryValidation { } 
private[validation] val wrongAmountFormat = - s"Expecting ${RuntimeAttributesKeys.MemoryKey} runtime attribute value greater than 0 but got %s" + "Expecting %s runtime attribute value greater than 0 but got %s" private[validation] val wrongTypeFormat = - s"Expecting ${RuntimeAttributesKeys.MemoryKey} runtime attribute to be an Integer or String with format '8 GB'." + - s" Exception: %s" + "Expecting %s runtime attribute to be an Integer or String with format '8 GB'." + + " Exception: %s" - private[validation] def validateMemoryString(wdlString: WdlString): ErrorOr[MemorySize] = - validateMemoryString(wdlString.value) + private[validation] def validateMemoryString(attributeName: String, wdlString: WdlString): ErrorOr[MemorySize] = + validateMemoryString(attributeName, wdlString.value) - private[validation] def validateMemoryString(value: String): ErrorOr[MemorySize] = { + private[validation] def validateMemoryString(attributeName: String, value: String): ErrorOr[MemorySize] = { MemorySize.parse(value) match { case scala.util.Success(memorySize: MemorySize) if memorySize.amount > 0 => memorySize.to(MemoryUnit.GB).validNel case scala.util.Success(memorySize: MemorySize) => - wrongAmountFormat.format(memorySize.amount).invalidNel + wrongAmountFormat.format(attributeName, memorySize.amount).invalidNel case scala.util.Failure(throwable) => - wrongTypeFormat.format(throwable.getMessage).invalidNel + wrongTypeFormat.format(attributeName, throwable.getMessage).invalidNel } } - private[validation] def validateMemoryInteger(wdlInteger: WdlInteger): ErrorOr[MemorySize] = - validateMemoryInteger(wdlInteger.value) + private[validation] def validateMemoryInteger(attributeName: String, wdlInteger: WdlInteger): ErrorOr[MemorySize] = + validateMemoryInteger(attributeName, wdlInteger.value) - private[validation] def validateMemoryInteger(value: Int): ErrorOr[MemorySize] = { + private[validation] def validateMemoryInteger(attributeName: String, value: Int): ErrorOr[MemorySize] = { if 
(value <= 0) - wrongAmountFormat.format(value).invalidNel + wrongAmountFormat.format(attributeName, value).invalidNel else MemorySize(value.toDouble, MemoryUnit.Bytes).to(MemoryUnit.GB).validNel } @@ -78,9 +78,9 @@ class MemoryValidation(attributeName: String = RuntimeAttributesKeys.MemoryKey) override def coercion = Seq(WdlIntegerType, WdlStringType) override protected def validateValue: PartialFunction[WdlValue, ErrorOr[MemorySize]] = { - case WdlInteger(value) => MemoryValidation.validateMemoryInteger(value) - case WdlString(value) => MemoryValidation.validateMemoryString(value) + case WdlInteger(value) => MemoryValidation.validateMemoryInteger(key, value) + case WdlString(value) => MemoryValidation.validateMemoryString(key, value) } - override def missingValueMessage: String = wrongTypeFormat.format("Not supported WDL type value") + override def missingValueMessage: String = wrongTypeFormat.format(key, "Not supported WDL type value") } diff --git a/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala b/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala index 6635747a1..907cf6ea9 100644 --- a/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala @@ -65,12 +65,12 @@ object RuntimeAttributesValidation { } } - def parseMemoryString(s: WdlString): ErrorOr[MemorySize] = { - MemoryValidation.validateMemoryString(s) + def parseMemoryString(k: String, s: WdlString): ErrorOr[MemorySize] = { + MemoryValidation.validateMemoryString(k, s) } - def parseMemoryInteger(i: WdlInteger): ErrorOr[MemorySize] = { - MemoryValidation.validateMemoryInteger(i) + def parseMemoryInteger(k: String, i: WdlInteger): ErrorOr[MemorySize] = { + MemoryValidation.validateMemoryInteger(k, i) } def withDefault[ValidatedType](validation: RuntimeAttributesValidation[ValidatedType], diff --git 
a/backend/src/test/scala/cromwell/backend/BackendSpec.scala b/backend/src/test/scala/cromwell/backend/BackendSpec.scala index bc0f36a75..0e20c15cd 100644 --- a/backend/src/test/scala/cromwell/backend/BackendSpec.scala +++ b/backend/src/test/scala/cromwell/backend/BackendSpec.scala @@ -23,13 +23,13 @@ trait BackendSpec extends ScalaFutures with Matchers with Mockito { executeJobAndAssertOutputs(backend, workflow.expectedResponse) } - def buildWorkflowDescriptor(wdl: WdlSource, + def buildWorkflowDescriptor(workflowSource: WdlSource, inputs: Map[String, WdlValue] = Map.empty, options: WorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue])), runtime: String = "") = { BackendWorkflowDescriptor( WorkflowId.randomId(), - WdlNamespaceWithWorkflow.load(wdl.replaceAll("RUNTIME", runtime), Seq.empty[ImportResolver]).get.workflow, // Get ok, this is a test! + WdlNamespaceWithWorkflow.load(workflowSource.replaceAll("RUNTIME", runtime), Seq.empty[ImportResolver]).get.workflow, // Get ok, this is a test! inputs, options, Labels.empty diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 9ab8cf2c2..5439efbdf 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -504,6 +504,16 @@ backend { # gcs { # # A reference to a potentially different auth for manipulating files via engine functions. # auth = "application-default" + # + # caching { + # # When a cache hit is found, the following duplication strategy will be followed to use the cached outputs + # # Possible values: "copy", "reference". Defaults to "copy" + # # "copy": Copy the output files + # # "reference": DO NOT copy the output files but point to the original output files instead. + # # Will still make sure than all the original output files exist and are accessible before + # # going forward with the cache hit. 
+ # duplication-strategy = "copy" + # } # } # } # @@ -548,10 +558,10 @@ services { # performance but will both lead to a higher memory usage as well as increase the risk that metadata events # might not have been persisted in the event of a Cromwell crash. # - # For normal usage the default value of 1 (effectively no batching) should be fine but for larger/production - # environments we recommend a value of at least 500. There'll be no one size fits all number here so we recommend - # benchmarking performance and tuning the value to match your environment - # db-batch-size = 1 + # For normal usage the default value of 200 should be fine but for larger/production environments we recommend a + # value of at least 500. There'll be no one size fits all number here so we recommend benchmarking performance and + # tuning the value to match your environment. + # db-batch-size = 200 # # Periodically the stored metadata events will be forcibly written to the DB regardless of if the batch size # has been reached. 
This is to prevent situations where events wind up never being written to an incomplete batch diff --git a/core/src/main/scala/cromwell/core/WorkflowMetadataKeys.scala b/core/src/main/scala/cromwell/core/WorkflowMetadataKeys.scala index 1a7843e6d..00afb2b61 100644 --- a/core/src/main/scala/cromwell/core/WorkflowMetadataKeys.scala +++ b/core/src/main/scala/cromwell/core/WorkflowMetadataKeys.scala @@ -20,5 +20,9 @@ object WorkflowMetadataKeys { val SubmissionSection_Inputs = "inputs" val SubmissionSection_Options = "options" val SubmissionSection_Imports = "imports" + val SubmissionSection_WorkflowType = "workflowType" + val SubmissionSection_Labels = "labels" + val SubmissionSection_WorkflowTypeVersion = "workflowTypeVersion" + val Labels = "labels" } diff --git a/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala b/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala index 55fa68b47..5025690af 100644 --- a/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala +++ b/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala @@ -7,53 +7,52 @@ import wdl4s.{WdlJson, WdlSource} */ sealed trait WorkflowSourceFilesCollection { - def wdlSource: WdlSource + def workflowSource: WdlSource def inputsJson: WdlJson def workflowOptionsJson: WorkflowOptionsJson def labelsJson: WdlJson - + def workflowType: Option[WorkflowType] + def workflowTypeVersion: Option[WorkflowTypeVersion] def importsZipFileOption: Option[Array[Byte]] = this match { case _: WorkflowSourceFilesWithoutImports => None - case WorkflowSourceFilesWithDependenciesZip(_, _, _, _, importsZip) => Option(importsZip) // i.e. Some(importsZip) if our wiring is correct + case w: WorkflowSourceFilesWithDependenciesZip => Option(w.importsZip) // i.e. 
Some(importsZip) if our wiring is correct } def copyOptions(workflowOptions: WorkflowOptionsJson) = this match { - case w: WorkflowSourceFilesWithoutImports => WorkflowSourceFilesWithoutImports( - wdlSource = w.wdlSource, - inputsJson = w.inputsJson, - workflowOptionsJson = workflowOptions, - labelsJson = w.labelsJson) - - case w: WorkflowSourceFilesWithDependenciesZip => WorkflowSourceFilesWithDependenciesZip( - wdlSource = w.wdlSource, - inputsJson = w.inputsJson, - workflowOptionsJson = workflowOptions, - labelsJson = w.labelsJson, - importsZip = w.importsZip) + case w: WorkflowSourceFilesWithoutImports => w.copy(workflowOptionsJson = workflowOptions) + case w: WorkflowSourceFilesWithDependenciesZip => w.copy(workflowOptionsJson = workflowOptions) } } object WorkflowSourceFilesCollection { - def apply(wdlSource: WdlSource, + def apply(workflowSource: WdlSource, + workflowType: Option[WorkflowType], + workflowTypeVersion: Option[WorkflowTypeVersion], inputsJson: WdlJson, workflowOptionsJson: WorkflowOptionsJson, labelsJson: WdlJson, importsFile: Option[Array[Byte]]): WorkflowSourceFilesCollection = importsFile match { - case Some(imports) => WorkflowSourceFilesWithDependenciesZip(wdlSource, inputsJson, workflowOptionsJson, labelsJson, imports) - case None => WorkflowSourceFilesWithoutImports(wdlSource, inputsJson, workflowOptionsJson, labelsJson) + case Some(imports) => + WorkflowSourceFilesWithDependenciesZip(workflowSource, workflowType, workflowTypeVersion, inputsJson, workflowOptionsJson, labelsJson, imports) + case None => + WorkflowSourceFilesWithoutImports(workflowSource, workflowType, workflowTypeVersion, inputsJson, workflowOptionsJson, labelsJson) } } -final case class WorkflowSourceFilesWithoutImports(wdlSource: WdlSource, +final case class WorkflowSourceFilesWithoutImports(workflowSource: WdlSource, + workflowType: Option[WorkflowType], + workflowTypeVersion: Option[WorkflowTypeVersion], inputsJson: WdlJson, workflowOptionsJson: WorkflowOptionsJson, 
labelsJson: WdlJson) extends WorkflowSourceFilesCollection -final case class WorkflowSourceFilesWithDependenciesZip(wdlSource: WdlSource, +final case class WorkflowSourceFilesWithDependenciesZip(workflowSource: WdlSource, + workflowType: Option[WorkflowType], + workflowTypeVersion: Option[WorkflowTypeVersion], inputsJson: WdlJson, workflowOptionsJson: WorkflowOptionsJson, labelsJson: WdlJson, importsZip: Array[Byte]) extends WorkflowSourceFilesCollection { - override def toString = s"WorkflowSourceFilesWithDependenciesZip($wdlSource, $inputsJson, $workflowOptionsJson, $labelsJson, <>)" + override def toString = s"WorkflowSourceFilesWithDependenciesZip($workflowSource, $inputsJson, $workflowOptionsJson, $labelsJson, <>)" } diff --git a/core/src/main/scala/cromwell/core/callcaching/HashResultMessage.scala b/core/src/main/scala/cromwell/core/callcaching/HashResultMessage.scala index 0867d916f..9233061cd 100644 --- a/core/src/main/scala/cromwell/core/callcaching/HashResultMessage.scala +++ b/core/src/main/scala/cromwell/core/callcaching/HashResultMessage.scala @@ -1,6 +1,14 @@ package cromwell.core.callcaching -case class HashKey(key: String, checkForHitOrMiss: Boolean = true) + +object HashKey { + def apply(keyComponents: String*) = new HashKey(true, keyComponents.toList) + def apply(checkForHitOrMiss: Boolean, keyComponents: String*) = new HashKey(checkForHitOrMiss, keyComponents.toList) +} + +case class HashKey(checkForHitOrMiss: Boolean, keyComponents: List[String]) { + val key = keyComponents.mkString(": ") +} case class HashValue(value: String) case class HashResult(hashKey: HashKey, hashValue: HashValue) diff --git a/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala b/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala new file mode 100644 index 000000000..a05961b4a --- /dev/null +++ b/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala @@ -0,0 +1,25 @@ +package cromwell.core.io + +import better.files.File.OpenOptions +import 
cromwell.core.path.Path + +object DefaultIoCommand { + case class DefaultIoCopyCommand(override val source: Path, + override val destination: Path, + override val overwrite: Boolean) extends IoCopyCommand( + source, destination, overwrite + ) + case class DefaultIoContentAsStringCommand(override val file: Path) extends IoContentAsStringCommand(file) + case class DefaultIoSizeCommand(override val file: Path) extends IoSizeCommand(file) + case class DefaultIoWriteCommand(override val file: Path, + override val content: String, + override val openOptions: OpenOptions) extends IoWriteCommand( + file, content, openOptions + ) + case class DefaultIoDeleteCommand(override val file: Path, + override val swallowIOExceptions: Boolean) extends IoDeleteCommand( + file, swallowIOExceptions + ) + case class DefaultIoHashCommand(override val file: Path) extends IoHashCommand(file) + case class DefaultIoTouchCommand(override val file: Path) extends IoTouchCommand(file) +} diff --git a/core/src/main/scala/cromwell/core/io/IoClientHelper.scala b/core/src/main/scala/cromwell/core/io/IoClientHelper.scala index 01cdabf2a..b6f13b50e 100644 --- a/core/src/main/scala/cromwell/core/io/IoClientHelper.scala +++ b/core/src/main/scala/cromwell/core/io/IoClientHelper.scala @@ -19,7 +19,11 @@ trait IoClientHelper extends RobustClientHelper { this: Actor with ActorLogging def ioReceive = robustReceive orElse ioResponseReceive - def sendIoCommand(ioCommand: IoCommand[_], timeout: FiniteDuration = RobustClientHelper.DefaultRequestLostTimeout) = { + def sendIoCommand(ioCommand: IoCommand[_]) = { + sendIoCommandWithCustomTimeout(ioCommand, RobustClientHelper.DefaultRequestLostTimeout) + } + + def sendIoCommandWithCustomTimeout(ioCommand: IoCommand[_], timeout: FiniteDuration) = { robustSend(ioCommand, ioActor, timeout) } diff --git a/core/src/main/scala/cromwell/core/io/IoCommand.scala b/core/src/main/scala/cromwell/core/io/IoCommand.scala index 6244bb79a..fcc2d72a3 100644 --- 
a/core/src/main/scala/cromwell/core/io/IoCommand.scala +++ b/core/src/main/scala/cromwell/core/io/IoCommand.scala @@ -39,21 +39,21 @@ trait IoCommand[+T] { * Copy source -> destination * Will create the destination directory if it doesn't exist. */ -class IoCopyCommand(val source: Path, val destination: Path, val overwrite: Boolean) extends IoCommand[Unit] { +abstract class IoCopyCommand(val source: Path, val destination: Path, val overwrite: Boolean) extends IoCommand[Unit] { override def toString = s"copy ${source.pathAsString} to ${destination.pathAsString} with overwrite = $overwrite" } /** * Read file as a string (load the entire content in memory) */ -class IoContentAsStringCommand(val file: Path) extends IoCommand[String] { +abstract class IoContentAsStringCommand(val file: Path) extends IoCommand[String] { override def toString = s"read content of ${file.pathAsString}" } /** * Return the size of file */ -class IoSizeCommand(val file: Path) extends IoCommand[Long] { +abstract class IoSizeCommand(val file: Path) extends IoCommand[Long] { override def toString = s"get size of ${file.pathAsString}" } @@ -61,20 +61,27 @@ class IoSizeCommand(val file: Path) extends IoCommand[Long] { * Write content in file * Will create the destination directory if it doesn't exist. 
*/ -class IoWriteCommand(val file: Path, val content: String, val openOptions: OpenOptions) extends IoCommand[Unit] { +abstract class IoWriteCommand(val file: Path, val content: String, val openOptions: OpenOptions) extends IoCommand[Unit] { override def toString = s"write to ${file.pathAsString}" } /** * Delete file */ -class IoDeleteCommand(val file: Path, val swallowIOExceptions: Boolean) extends IoCommand[Unit] { +abstract class IoDeleteCommand(val file: Path, val swallowIOExceptions: Boolean) extends IoCommand[Unit] { override def toString = s"delete ${file.pathAsString}" } /** * Get Hash value for file */ -class IoHashCommand(val file: Path) extends IoCommand[String] { +abstract class IoHashCommand(val file: Path) extends IoCommand[String] { override def toString = s"get hash of ${file.pathAsString}" } + +/** + * Touch a file + */ +abstract class IoTouchCommand(val file: Path) extends IoCommand[Unit] { + override def toString = s"touch ${file.pathAsString}" +} diff --git a/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala b/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala index d79e67438..ad26449b4 100644 --- a/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala +++ b/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala @@ -1,5 +1,6 @@ package cromwell.core.io +import cromwell.core.io.DefaultIoCommand._ import cromwell.core.path.BetterFileMethods.OpenOptions import cromwell.core.path.Path @@ -10,13 +11,15 @@ trait IoCommandBuilder { def deleteCommand(path: Path, swallowIoExceptions: Boolean): IoDeleteCommand def copyCommand(src: Path, dest: Path, overwrite: Boolean): IoCopyCommand def hashCommand(file: Path): IoHashCommand + def touchCommand(file: Path): IoTouchCommand } trait DefaultIoCommandBuilder extends IoCommandBuilder { - def contentAsStringCommand(path: Path) = new IoContentAsStringCommand(path) - def writeCommand(path: Path, content: String, options: OpenOptions) = new IoWriteCommand(path, content, options) - def 
sizeCommand(path: Path) = new IoSizeCommand(path) - def deleteCommand(path: Path, swallowIoExceptions: Boolean) = new IoDeleteCommand(path, swallowIoExceptions) - def copyCommand(src: Path, dest: Path, overwrite: Boolean) = new IoCopyCommand(src, dest, overwrite) - def hashCommand(file: Path) = new IoHashCommand(file) + def contentAsStringCommand(path: Path): IoContentAsStringCommand = DefaultIoContentAsStringCommand(path) + def writeCommand(path: Path, content: String, options: OpenOptions): IoWriteCommand = DefaultIoWriteCommand(path, content, options) + def sizeCommand(path: Path): IoSizeCommand = DefaultIoSizeCommand(path) + def deleteCommand(path: Path, swallowIoExceptions: Boolean): IoDeleteCommand = DefaultIoDeleteCommand(path, swallowIoExceptions) + def copyCommand(src: Path, dest: Path, overwrite: Boolean): IoCopyCommand = DefaultIoCopyCommand(src, dest, overwrite) + def hashCommand(file: Path): IoHashCommand = DefaultIoHashCommand(file) + def touchCommand(file: Path): IoTouchCommand = DefaultIoTouchCommand(file) } diff --git a/core/src/main/scala/cromwell/core/labels/Label.scala b/core/src/main/scala/cromwell/core/labels/Label.scala index 4fc1e2959..cee9ac91a 100644 --- a/core/src/main/scala/cromwell/core/labels/Label.scala +++ b/core/src/main/scala/cromwell/core/labels/Label.scala @@ -5,69 +5,40 @@ import cats.data.Validated._ import cats.syntax.cartesian._ import cats.syntax.validated._ +import scala.util.matching.Regex + sealed abstract case class Label(key: String, value: String) object Label { - // Yes, 63. Not a typo for 64. - // See 'labels' in https://cloud.google.com/genomics/reference/rpc/google.genomics.v1alpha2#google.genomics.v1alpha2.RunPipelineArgs - private val MaxLabelLength = 63 - val LabelRegexPattern = "[a-z]([-a-z0-9]*[a-z0-9])?" + val MaxLabelLength = 63 + val LabelKeyRegex = "[a-z]([-a-z0-9]*[a-z0-9])?" + val LabelValueRegex = "([a-z0-9]*[-a-z0-9]*[a-z0-9])?" 
- def validateName(s: String): ErrorOr[String] = { - if (LabelRegexPattern.r.pattern.matcher(s).matches) { - if (s.length <= MaxLabelLength) s.validNel else s"Invalid label: $s was ${s.length} characters. The maximum is $MaxLabelLength".invalidNel - } else { - s"Invalid label: $s did not match the regex $LabelRegexPattern".invalidNel - } - } + val LabelExpectationsMessage = + s"A Label key must match the pattern `$LabelKeyRegex` and a label value must match the pattern `$LabelValueRegex`." - def validateLabel(key: String, value: String): ErrorOr[Label] = { - val validatedKey = validateName(key) - val validatedValue = validateName(value) - - (validatedKey |@| validatedValue) map { case (k, v) => new Label(k, v) {} } + def validateLabelRegex(s: String, regexAllowed: Regex): ErrorOr[String] = { + (regexAllowed.pattern.matcher(s).matches, s.length <= MaxLabelLength) match { + case (true, true) => s.validNel + case (false, false) => s"Invalid label: `$s` did not match regex $regexAllowed and it is ${s.length} characters. The maximum is $MaxLabelLength.".invalidNel + case (false, _) => s"Invalid label: `$s` did not match the regex $regexAllowed.".invalidNel + case (_, false) => s"Invalid label: `$s` is ${s.length} characters. 
The maximum is $MaxLabelLength.".invalidNel + } } - /** - * Change to meet the constraint: - * - To match the regex LabelRegexPattern - * - To be between 1 and MaxLabelLength characters total - */ - def safeName(mainText: String): String = { + def validateLabelKey(key: String): ErrorOr[String] = validateLabelRegex(key, LabelKeyRegex.r) - validateName(mainText) match { - case Valid(labelText) => labelText - case _ => - def appendSafe(current: String, nextChar: Char): String = { - nextChar match { - case c if c.isLetterOrDigit || c == '-' => current + c.toLower - case _ => current + '-' - } - } + def validateLabelValue(key: String): ErrorOr[String] = validateLabelRegex(key, LabelValueRegex.r) - val foldResult = mainText.toCharArray.foldLeft("")(appendSafe) - - val startsValid = foldResult.headOption.exists(_.isLetter) - val endsValid = foldResult.lastOption.exists(_.isLetterOrDigit) - - val validStart = if (startsValid) foldResult else "x--" + foldResult - val validStartAndEnd = if (endsValid) validStart else validStart + "--x" - - val length = validStartAndEnd.length - val tooLong = length > MaxLabelLength + def validateLabel(key: String, value: String): ErrorOr[Label] = { + val validatedKey = validateLabelKey(key) + val validatedValue = validateLabelValue(value) - if (tooLong) { - val middleSeparator = "---" - val subSectionLength = (MaxLabelLength - middleSeparator.length) / 2 - validStartAndEnd.substring(0, subSectionLength) + middleSeparator + validStartAndEnd.substring(length - subSectionLength, length) - } else { - validStartAndEnd - } - } + (validatedKey |@| validatedValue) map { case (k, v) => new Label(k, v) {} } } - def safeLabel(key: String, value: String): Label = { - new Label(safeName(key), safeName(value)) {} + def apply(key: String, value: String) = { + new Label(key, value) {} } } diff --git a/core/src/main/scala/cromwell/core/labels/Labels.scala b/core/src/main/scala/cromwell/core/labels/Labels.scala index 8e891fb51..e5fe3f575 100644 --- 
a/core/src/main/scala/cromwell/core/labels/Labels.scala +++ b/core/src/main/scala/cromwell/core/labels/Labels.scala @@ -1,18 +1,31 @@ package cromwell.core.labels +import cats.data.Validated._ +import cats.instances.vector._ +import cats.syntax.traverse._ +import lenthall.validation.ErrorOr +import lenthall.validation.ErrorOr.ErrorOr + import scala.collection.JavaConverters._ case class Labels(value: Vector[Label]) { - def asJesLabels = (value map { label => label.key -> label.value }).toMap.asJava + def asTuple: Vector[(String, String)] = value.map(label => label.key -> label.value) + + def asMap: Map[String, String] = asTuple.toMap + + def asJavaMap = asMap.asJava def ++(that: Labels) = Labels(value ++ that.value) } object Labels { def apply(values: (String, String)*): Labels = { - val kvps: Seq[(String, String)] = values.toSeq - Labels((kvps map { case (k, v) => Label.safeLabel(k, v) }).to[Vector]) + Labels(values.toVector map (Label.apply _).tupled) + } + + def validateMapOfLabels(labels: Map[String, String]): ErrorOr[Labels] = { + labels.toVector traverse { Label.validateLabel _ }.tupled map Labels.apply } def empty = Labels(Vector.empty) diff --git a/core/src/main/scala/cromwell/core/package.scala b/core/src/main/scala/cromwell/core/package.scala index def878003..625fab0d5 100644 --- a/core/src/main/scala/cromwell/core/package.scala +++ b/core/src/main/scala/cromwell/core/package.scala @@ -1,13 +1,34 @@ package cromwell +import cats.data.Validated._ +import cats.syntax.validated._ +import lenthall.validation.ErrorOr.ErrorOr import wdl4s.values.WdlValue +import scala.util.{Failure, Success, Try} + package object core { type LocallyQualifiedName = String type FullyQualifiedName = String type WorkflowOutputs = Map[FullyQualifiedName, JobOutput] type WorkflowOptionsJson = String + type WorkflowType = String + type WorkflowTypeVersion = String type CallOutputs = Map[LocallyQualifiedName, JobOutput] type HostInputs = Map[String, WdlValue] type 
EvaluatedRuntimeAttributes = Map[String, WdlValue] + + implicit class toErrorOr[A](val trySomething: Try[A]) { + def tryToErrorOr: ErrorOr[A] = trySomething match { + case Success(options) => options.validNel + case Failure(err) => err.getMessage.invalidNel + } + } + + implicit class toTry[A](val validatedSomething: ErrorOr[A]) { + def errorOrToTry: Try[A] = validatedSomething match { + case Valid(options) => Success(options) + case Invalid(err) => Failure(new RuntimeException(s"Error(s): ${err.toList.mkString(",")}")) + } + } } diff --git a/core/src/main/scala/cromwell/core/path/DefaultPathBuilderFactory.scala b/core/src/main/scala/cromwell/core/path/DefaultPathBuilderFactory.scala index 5339fae3c..234a19d79 100644 --- a/core/src/main/scala/cromwell/core/path/DefaultPathBuilderFactory.scala +++ b/core/src/main/scala/cromwell/core/path/DefaultPathBuilderFactory.scala @@ -3,6 +3,8 @@ package cromwell.core.path import akka.actor.ActorSystem import cromwell.core.WorkflowOptions +import scala.concurrent.{ExecutionContext, Future} + case object DefaultPathBuilderFactory extends PathBuilderFactory { - override def withOptions(options: WorkflowOptions)(implicit actorSystem: ActorSystem) = DefaultPathBuilder + override def withOptions(options: WorkflowOptions)(implicit actorSystem: ActorSystem, ec: ExecutionContext) = Future.successful(DefaultPathBuilder) } diff --git a/core/src/main/scala/cromwell/core/path/PathBuilderFactory.scala b/core/src/main/scala/cromwell/core/path/PathBuilderFactory.scala index 7ee20eb2d..63a91e02b 100644 --- a/core/src/main/scala/cromwell/core/path/PathBuilderFactory.scala +++ b/core/src/main/scala/cromwell/core/path/PathBuilderFactory.scala @@ -3,9 +3,11 @@ package cromwell.core.path import akka.actor.ActorSystem import cromwell.core.WorkflowOptions +import scala.concurrent.{ExecutionContext, Future} + /** * Provide a method that can instantiate a path builder with the specified workflow options. 
*/ trait PathBuilderFactory { - def withOptions(options: WorkflowOptions)(implicit actorSystem: ActorSystem): PathBuilder + def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[PathBuilder] } diff --git a/core/src/test/scala/cromwell/core/callcaching/HashKeySpec.scala b/core/src/test/scala/cromwell/core/callcaching/HashKeySpec.scala new file mode 100644 index 000000000..a25b311a6 --- /dev/null +++ b/core/src/test/scala/cromwell/core/callcaching/HashKeySpec.scala @@ -0,0 +1,35 @@ +package cromwell.core.callcaching + +import org.scalatest.{FlatSpec, Matchers} + +class HashKeySpec extends FlatSpec with Matchers { + + "HashKey" should "produce consistent key value" in { + val keys = Set( + HashKey("command template"), + HashKey("backend name"), + HashKey("input count"), + HashKey("output count"), + HashKey("runtime attribute", "failOnStderr"), + HashKey(checkForHitOrMiss = false, "runtime attribute", "cpu"), + HashKey("runtime attribute", "continueOnReturnCode"), + HashKey("input", "String stringInput"), + HashKey("output", "String myOutput"), + HashKey("runtime attribute", "docker") + ) + + keys map { _.key } should contain theSameElementsAs Set( + "command template", + "backend name", + "input count", + "output count", + "runtime attribute: failOnStderr", + "runtime attribute: cpu", + "runtime attribute: continueOnReturnCode", + "input: String stringInput", + "output: String myOutput", + "runtime attribute: docker" + ) + } + +} diff --git a/core/src/test/scala/cromwell/core/io/IoClientHelperSpec.scala b/core/src/test/scala/cromwell/core/io/IoClientHelperSpec.scala index 6f92f3003..1411a47c6 100644 --- a/core/src/test/scala/cromwell/core/io/IoClientHelperSpec.scala +++ b/core/src/test/scala/cromwell/core/io/IoClientHelperSpec.scala @@ -3,6 +3,7 @@ package cromwell.core.io import akka.actor.{Actor, ActorLogging, ActorRef} import akka.testkit.{TestActorRef, TestProbe} import cromwell.core.TestKitSuite +import 
cromwell.core.io.DefaultIoCommand.DefaultIoSizeCommand import cromwell.core.path.Path import org.scalatest.mockito.MockitoSugar import org.scalatest.{FlatSpecLike, Matchers} @@ -22,7 +23,7 @@ class IoClientHelperSpec extends TestKitSuite with FlatSpecLike with Matchers wi val testActor = TestActorRef(new IoClientHelperTestActor(ioActorProbe.ref, delegateProbe.ref, backpressureTimeout, noResponseTimeout)) - val command = new IoSizeCommand(mock[Path]) + val command = DefaultIoSizeCommand(mock[Path]) val response = IoSuccess(command, 5) // Send the command @@ -53,7 +54,7 @@ class IoClientHelperSpec extends TestKitSuite with FlatSpecLike with Matchers wi val testActor = TestActorRef(new IoClientHelperTestActor(ioActorProbe.ref, delegateProbe.ref, backpressureTimeout, noResponseTimeout)) val commandContext = "context" - val command = new IoSizeCommand(mock[Path]) + val command = DefaultIoSizeCommand(mock[Path]) val response = IoSuccess(command, 5) // Send the command @@ -91,7 +92,7 @@ class IoClientHelperSpec extends TestKitSuite with FlatSpecLike with Matchers wi } def sendMessage(command: IoCommand[_]) = { - sendIoCommand(command, noResponseTimeout) + sendIoCommandWithCustomTimeout(command, noResponseTimeout) } def sendMessageWithContext(context: Any, command: IoCommand[_]) = { diff --git a/core/src/test/scala/cromwell/core/labels/LabelSpec.scala b/core/src/test/scala/cromwell/core/labels/LabelSpec.scala index 47060b6d6..52c331bec 100644 --- a/core/src/test/scala/cromwell/core/labels/LabelSpec.scala +++ b/core/src/test/scala/cromwell/core/labels/LabelSpec.scala @@ -10,40 +10,44 @@ class LabelSpec extends FlatSpec with Matchers { /** * In the format 'to validate', 'expected result' */ - val goodLabelStrings = List( + val goodLabelKeys = List( "cromwell-root-workflow-id", "cromwell-11f2468c-39d6-4be3-85c8-32735c01e66b", "just-the-right-length-just-the-right-length-just-the-right-leng" ) - val badLabelConversions = List( - "11f2468c-39d6-4be3-85c8-32735c01e66b" -> 
"x--11f2468c-39d6-4be3-85c8-32735c01e66b", - "0-cromwell-root-workflow-id" -> "x--0-cromwell-root-workflow-id", - "" -> "x----x", - "cromwell-root-workflow-id-" -> "cromwell-root-workflow-id---x", - "0-cromwell-root-workflow-id-" -> "x--0-cromwell-root-workflow-id---x", - "Cromwell-root-workflow-id" -> "cromwell-root-workflow-id", - "cromwell_root_workflow_id" -> "cromwell-root-workflow-id", - "too-long-too-long-too-long-too-long-too-long-too-long-too-long-t" -> "too-long-too-long-too-long-too---g-too-long-too-long-too-long-t", - "0-too-long-and-invalid-too-long-and-invalid-too-long-and-invali+" -> "x--0-too-long-and-invalid-too----nvalid-too-long-and-invali---x" + val goodLabelValues = List( + "11f2468c-39d6-4be3-85c8-32735c01e66b", + "" ) - goodLabelStrings foreach { label => - it should s"validate the good label string '$label'" in { - Label.validateName(label) should be(Valid(label)) + val badLabelKeys = List( + "11f2468c-39d6-4be3-85c8-32735c01e66b", + "0-cromwell-root-workflow-id", + "", + "cromwell-root-workflow-id-", + "0-cromwell-root-workflow-id-", + "Cromwell-root-workflow-id" + ) + + goodLabelKeys foreach { key => + it should s"validate a good label key '$key'" in { + Label.validateLabelKey(key) should be(Valid(key)) } } - badLabelConversions foreach { case (label: String, conversion: String) => - it should s"not validate the bad label string '$label'" in { - Label.validateName(label) match { - case Invalid(_) => // Good! - case Valid(_) => fail(s"Label validation succeeded but should have failed.") - } + goodLabelValues foreach { value => + it should s"validate a good label value '$value'" in { + Label.validateLabelValue(value) should be(Valid(value)) } + } - it should s"convert the bad label string '$label' into the safe label string '$conversion'" in { - Label.safeName(label) should be(conversion) + badLabelKeys foreach { key => + it should s"not validate a bad label key $key" in { + Label.validateLabelKey(key) match { + case Invalid(_) => // Good! 
+ case Valid(_) => fail(s"Label key validation succeeded but should have failed.") + } } } } diff --git a/core/src/test/scala/cromwell/util/SampleWdl.scala b/core/src/test/scala/cromwell/util/SampleWdl.scala index 5a3940b81..06bc98283 100644 --- a/core/src/test/scala/cromwell/util/SampleWdl.scala +++ b/core/src/test/scala/cromwell/util/SampleWdl.scala @@ -13,9 +13,21 @@ import wdl4s.values._ import scala.language.postfixOps trait SampleWdl extends TestFileUtil { - def wdlSource(runtime: String = ""): WdlSource - def asWorkflowSources(runtime: String = "", workflowOptions: String = "{}", labels: String = "{}") = - WorkflowSourceFilesWithoutImports(wdlSource = wdlSource(runtime), inputsJson = wdlJson, workflowOptionsJson = workflowOptions, labelsJson = labels) + def workflowSource(runtime: String = ""): WdlSource + def asWorkflowSources(runtime: String = "", + workflowOptions: String = "{}", + labels: String = "{}", + workflowType: Option[String] = Option("WDL"), + workflowTypeVersion: Option[String] = None) = { + WorkflowSourceFilesWithoutImports( + workflowSource = workflowSource(runtime), + inputsJson = wdlJson, + workflowOptionsJson = workflowOptions, + labelsJson = labels, + workflowType = workflowType, + workflowTypeVersion = workflowTypeVersion) + } + val rawInputs: WorkflowRawInputs def name = getClass.getSimpleName.stripSuffix("$") @@ -62,7 +74,7 @@ trait SampleWdl extends TestFileUtil { object SampleWdl { object HelloWorld extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s""" |task hello { | String addressee @@ -87,7 +99,7 @@ object SampleWdl { } object HelloWorldWithoutWorkflow extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s""" |task hello { | String addressee @@ -107,7 +119,7 @@ object SampleWdl { } object GoodbyeWorld extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def 
workflowSource(runtime: String = "") = """ |task goodbye { | command { @@ -128,7 +140,7 @@ object SampleWdl { } object EmptyString extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s""" |task hello { | command { @@ -170,13 +182,13 @@ object SampleWdl { object EmptyWorkflow extends SampleWdl { - override def wdlSource(runtime: String = "") = "workflow empty_workflow {}" + override def workflowSource(runtime: String = "") = "workflow empty_workflow {}" val rawInputs = Map.empty[String, Any] } object CoercionNotDefined extends SampleWdl { - override def wdlSource(runtime: String = "") = { + override def workflowSource(runtime: String = "") = { s""" |task summary { | String bfile @@ -207,7 +219,7 @@ object SampleWdl { } trait ThreeStepTemplate extends SampleWdl { - override def wdlSource(runtime: String = "") = sourceString().replaceAll("RUNTIME", runtime) + override def workflowSource(runtime: String = "") = sourceString().replaceAll("RUNTIME", runtime) private val outputSectionPlaceholder = "OUTPUTSECTIONPLACEHOLDER" def sourceString(outputsSection: String = "") = { val withPlaceholders = @@ -268,7 +280,7 @@ object SampleWdl { object ThreeStep extends ThreeStepTemplate object ThreeStepWithOutputsSection extends ThreeStepTemplate { - override def wdlSource(runtime: String = "") = sourceString(outputsSection = + override def workflowSource(runtime: String = "") = sourceString(outputsSection = """ |output { | cgrep.count @@ -278,7 +290,7 @@ object SampleWdl { } object ThreeStepWithInputsInTheOutputsSection extends ThreeStepTemplate { - override def wdlSource(runtime: String = "") = sourceString(outputsSection = + override def workflowSource(runtime: String = "") = sourceString(outputsSection = """ |output { | cgrep.pattern @@ -293,7 +305,7 @@ object SampleWdl { object WorkflowOutputsWithFiles extends SampleWdl { // ASCII art from http://www.chris.com/ascii/joan/www.geocities.com/SoHo/7373/flag.html 
with pipes // replaced by exclamation points to keep stripMargin from removing the flagpole. - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = """ task A { command { @@ -360,7 +372,7 @@ object SampleWdl { } object WorkflowScatterOutputsWithFileArrays extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = """ |task A { | command { @@ -387,7 +399,7 @@ object SampleWdl { object DeclarationsWorkflow extends SampleWdl { - override def wdlSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WdlSource = s""" |task cat { | File file @@ -441,7 +453,7 @@ object SampleWdl { } trait ZeroOrMorePostfixQuantifier extends SampleWdl { - override def wdlSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WdlSource = s""" |task hello { | Array[String] person @@ -472,7 +484,7 @@ object SampleWdl { } trait OneOrMorePostfixQuantifier extends SampleWdl { - override def wdlSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WdlSource = s""" |task hello { | Array[String]+ person @@ -499,7 +511,7 @@ object SampleWdl { } object CurrentDirectory extends SampleWdl { - override def wdlSource(runtime: String): String = + override def workflowSource(runtime: String): String = """ |task whereami { | command { @@ -520,7 +532,7 @@ object SampleWdl { } object ArrayIO extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s""" |task serialize { | Array[String] strs @@ -547,7 +559,7 @@ object SampleWdl { createFileArray(catRootDir) def cleanup() = cleanupFileArray(catRootDir) - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s""" |task cat { | Array[File]+ files @@ -572,7 +584,7 @@ object SampleWdl { createFileArray(catRootDir) def cleanup() = 
cleanupFileArray(catRootDir) - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s""" |task write_map { | Map[File, String] file_to_name @@ -661,7 +673,7 @@ object SampleWdl { |} """.stripMargin - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s"""$tasks | |workflow w { @@ -679,7 +691,7 @@ object SampleWdl { } object SiblingsScatterWdl extends ScatterWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s"""$tasks | |workflow w { @@ -700,7 +712,7 @@ object SampleWdl { } object SimpleScatterWdl extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s"""task echo_int { | Int int | command {echo $${int}} @@ -723,7 +735,7 @@ object SampleWdl { } object SimpleScatterWdlWithOutputs extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s"""task echo_int { | Int int | command {echo $${int}} @@ -748,7 +760,7 @@ object SampleWdl { } case class PrepareScatterGatherWdl(salt: String = UUID.randomUUID().toString) extends SampleWdl { - override def wdlSource(runtime: String = "") = { + override def workflowSource(runtime: String = "") = { s""" |# |# Goal here is to split up the input file into files of 1 line each (in the prepare) then in parallel call wc -w on each newly created file and count the words into another file then in the gather, sum the results of each parallel call to come up with @@ -816,7 +828,7 @@ object SampleWdl { } object FileClobber extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s"""task read_line { | File in | command { cat $${in} } @@ -841,7 +853,7 @@ object SampleWdl { } object FilePassingWorkflow extends SampleWdl { - override def wdlSource(runtime: String): WdlSource = + override def 
workflowSource(runtime: String): WdlSource = s"""task a { | File in | String out_name = "out" @@ -881,7 +893,7 @@ object SampleWdl { * different */ case class CallCachingWorkflow(salt: String) extends SampleWdl { - override def wdlSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WdlSource = s"""task a { | File in | String out_name = "out" @@ -933,7 +945,7 @@ object SampleWdl { |k3\tv3 """.stripMargin.trim - override def wdlSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WdlSource = s""" |task a { | Array[String] array @@ -969,7 +981,7 @@ object SampleWdl { } object ArrayOfArrays extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s"""task subtask { | Array[File] a | command { @@ -1009,7 +1021,7 @@ object SampleWdl { } object CallCachingHashingWdl extends SampleWdl { - override def wdlSource(runtime: String): WdlSource = + override def workflowSource(runtime: String): WdlSource = s"""task t { | Int a | Float b @@ -1047,7 +1059,7 @@ object SampleWdl { } object ExpressionsInInputs extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s"""task echo { | String inString | command { @@ -1077,7 +1089,7 @@ object SampleWdl { } object WorkflowFailSlow extends SampleWdl { - override def wdlSource(runtime: String = "") = + override def workflowSource(runtime: String = "") = s""" task shouldCompleteFast { | Int a diff --git a/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala b/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala index c2a0c536b..b8bbb4dd8 100644 --- a/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala +++ b/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala @@ -21,6 +21,7 @@ import scala.util.{Failure, Success, Try} class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit 
actorSystem: ActorSystem, materializer: ActorMaterializer) { + lazy val engineEndpoint = s"$cromwellUrl/api/engine/$apiVersion" lazy val submitEndpoint = s"$cromwellUrl/api/workflows/$apiVersion" // Everything else is a suffix off the submit endpoint: lazy val batchSubmitEndpoint = s"$submitEndpoint/batch" @@ -29,21 +30,25 @@ class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit acto def statusEndpoint(workflowId: WorkflowId): String = workflowSpecificEndpoint(workflowId, "status") def metadataEndpoint(workflowId: WorkflowId): String = workflowSpecificEndpoint(workflowId, "metadata") lazy val backendsEndpoint = s"$submitEndpoint/backends" + lazy val versionEndpoint = s"$engineEndpoint/version" import model.CromwellStatusJsonSupport._ import model.CromwellBackendsJsonSupport._ + import model.CromwellVersionJsonSupport._ private def requestEntityForSubmit(workflowSubmission: WorkflowSubmission) = { import cromwell.api.model.LabelsJsonFormatter._ val sourceBodyParts = Map( - "wdlSource" -> Option(workflowSubmission.wdl), + "workflowSource" -> Option(workflowSubmission.wdl), + "workflowType" -> workflowSubmission.workflowType, + "workflowTypeVersion" -> workflowSubmission.workflowTypeVersion, "workflowInputs" -> workflowSubmission.inputsJson, "workflowOptions" -> insertSecrets(workflowSubmission.options, workflowSubmission.refreshToken), "customLabels" -> Option(workflowSubmission.customLabels.toJson.toString) ) collect { case (name, Some(source: String)) => Multipart.FormData.BodyPart(name, HttpEntity(MediaTypes.`application/json`, ByteString(source))) } val zipBodyParts = Map( - "wdlDependencies" -> workflowSubmission.zippedImports + "workflowDependencies" -> workflowSubmission.zippedImports ) collect { case (name, Some(file)) => Multipart.FormData.BodyPart.fromPath(name, MediaTypes.`application/zip`, file.path) } val multipartFormData = Multipart.FormData((sourceBodyParts ++ zipBodyParts).toSeq : _*) @@ -64,7 +69,15 @@ class CromwellClient(val 
cromwellUrl: URL, val apiVersion: String)(implicit acto val requestEntity = requestEntityForSubmit(workflow) // Make a set of submissions that represent the batch (so we can zip with the results later): - val submissionSet = workflow.inputsBatch.map(inputs => WorkflowSingleSubmission(workflow.wdl, Option(inputs), workflow.options, workflow.customLabels, workflow.zippedImports, workflow.refreshToken)) + val submissionSet = workflow.inputsBatch.map(inputs => WorkflowSingleSubmission( + wdl = workflow.wdl, + workflowType = workflow.workflowType, + workflowTypeVersion = workflow.workflowTypeVersion, + inputsJson = Option(inputs), + options = workflow.options, + customLabels = workflow.customLabels, + zippedImports = workflow.zippedImports, + refreshToken = workflow.refreshToken)) makeRequest[List[CromwellStatus]](HttpRequest(HttpMethods.POST, batchSubmitEndpoint, List.empty[HttpHeader], requestEntity)) map { statuses => val zipped = submissionSet.zip(statuses) @@ -78,6 +91,7 @@ class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit acto def status(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowStatus] = getRequest[CromwellStatus](statusEndpoint(workflowId)) map WorkflowStatus.apply def metadata(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowMetadata] = getRequest[String](metadataEndpoint(workflowId)) map WorkflowMetadata def backends(implicit ec: ExecutionContext): Future[CromwellBackends] = getRequest[CromwellBackends](backendsEndpoint) + def version(implicit ec: ExecutionContext): Future[CromwellVersion] = getRequest[CromwellVersion](versionEndpoint) /** * diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/CromwellVersion.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/CromwellVersion.scala new file mode 100644 index 000000000..d6c71a065 --- /dev/null +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/CromwellVersion.scala @@ -0,0 +1,9 @@ +package cromwell.api.model 
+ +import spray.json.DefaultJsonProtocol + +object CromwellVersionJsonSupport extends DefaultJsonProtocol { + implicit val CromwellVersionFormat = jsonFormat1(CromwellVersion) +} + +case class CromwellVersion(cromwell: String) diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala index 7cb72dd94..fb5e97669 100644 --- a/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala @@ -5,9 +5,10 @@ import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, RootJsonFor object LabelsJsonFormatter extends DefaultJsonProtocol { implicit object LabelJsonFormat extends RootJsonFormat[List[Label]] { def write(l: List[Label]) = JsObject(l map { label => label.key -> JsString(label.value)} :_* ) - def read(value: JsValue) = value match { - case JsObject(x) => x map { case (k, JsString(v)) => Label(k, v) } toList - } + def read(value: JsValue) = value.asJsObject.fields map { + case (k, JsString(v)) => Label(k, v) + case other => throw new UnsupportedOperationException(s"Cannot deserialize $other to a Label") + } toList } } diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowSubmission.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowSubmission.scala index 5a0b69bc6..f20a7aa15 100644 --- a/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowSubmission.scala +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowSubmission.scala @@ -4,6 +4,8 @@ import better.files.File sealed trait WorkflowSubmission { val wdl: String + val workflowType: Option[String] + val workflowTypeVersion: Option[String] val inputsJson: Option[String] val options: Option[String] val customLabels: Option[List[Label]] @@ -12,6 +14,8 @@ sealed trait WorkflowSubmission { } final case class WorkflowSingleSubmission(wdl: String, + workflowType: Option[String], + workflowTypeVersion: 
Option[String], inputsJson: Option[String], options: Option[String], customLabels: Option[List[Label]], @@ -19,11 +23,13 @@ final case class WorkflowSingleSubmission(wdl: String, refreshToken: Option[String]) extends WorkflowSubmission final case class WorkflowBatchSubmission(wdl: String, - inputsBatch: List[String], - options: Option[String], - customLabels: Option[List[Label]], - zippedImports: Option[File], - refreshToken: Option[String]) extends WorkflowSubmission { + workflowType: Option[String], + workflowTypeVersion: Option[String], + inputsBatch: List[String], + options: Option[String], + customLabels: Option[List[Label]], + zippedImports: Option[File], + refreshToken: Option[String]) extends WorkflowSubmission { - override val inputsJson: Option[String] = Option(inputsBatch.mkString(start="[", sep=",", end="]")) + override val inputsJson: Option[String] = Option(inputsBatch.mkString(start = "[", sep = ",", end = "]")) } diff --git a/database/migration/src/main/resources/changelog.xml b/database/migration/src/main/resources/changelog.xml index fc864a848..03883abb7 100644 --- a/database/migration/src/main/resources/changelog.xml +++ b/database/migration/src/main/resources/changelog.xml @@ -65,6 +65,7 @@ +