Testbed module #38

Merged 17 commits on Aug 31, 2018
2 changes: 2 additions & 0 deletions .gitignore
@@ -112,3 +112,5 @@ ENV/
# local config files
*.config
*.evals

.DS_Store
75 changes: 13 additions & 62 deletions README.md
@@ -1,77 +1,28 @@
# Synapse Workflow Orchestrator
# GA4GH Cloud Orchestrator

| **Service \ Branch** | **master** | **develop** |
| -: | - | - |
| CI status | [![Travis-CI Build Status](https://travis-ci.org/Sage-Bionetworks/synapse-orchestrator.svg?branch=master)](https://travis-ci.org/Sage-Bionetworks/synapse-orchestrator) | [![Travis-CI Build Status](https://travis-ci.org/Sage-Bionetworks/synapse-orchestrator.svg?branch=develop)](https://travis-ci.org/Sage-Bionetworks/synapse-orchestrator) |
| Test coverage | [![Coverage Status](https://coveralls.io/repos/github/Sage-Bionetworks/synapse-orchestrator/badge.svg?branch=master)](https://coveralls.io/Sage-Bionetworks/synapse-orchestrator?branch=master) | [![Coverage Status](https://coveralls.io/repos/github/Sage-Bionetworks/synapse-orchestrator/badge.svg?branch=develop)](https://coveralls.io/Sage-Bionetworks/synapse-orchestrator?branch=develop) |
| Docs status | *pending* | *pending* |

This application serves as a "workflow orchestrator" for GA4GH-style workflows, using the Synapse [**Evaluation Services**](http://docs.synapse.org/rest/#org.sagebionetworks.repo.web.controller.EvaluationController) to manage queues and submissions.
The initial use case for this app will be to act as a workflow orchestrator and bridge between TRS and WES endpoints for the [**Testbed Interoperability Platform**](https://docs.google.com/document/d/12Mq4v7o5VKF-DkFTQwsUQ-aWZ5aBeIcl_5YrhbaSv7M/edit?usp=sharing), a core deliverable of the GA4GH Cloud Workstream for 2018.

## GA4GH Workflow Portability Testbed
## Overview

The initial use case for this app will be to act as a workflow orchestrator for the [**Testbed Interoperability Platform**](https://docs.google.com/document/d/12Mq4v7o5VKF-DkFTQwsUQ-aWZ5aBeIcl_5YrhbaSv7M/edit?usp=sharing), a core deliverable of the GA4GH Cloud Workstream for 2018. This platform and the orchestrator will also support the next round of the **GA4GH/DREAM Workflow Execution Challenge**.
In the context of the testbed, the orchestrator performs 3 primary tasks:

Per the requirements linked in the document above, the orchestrator needs to perform at least 3 basic tasks:
1. Look up a workflow registered in a TRS implementation, identify its corresponding "checker" workflow, and retrieve any data required to run the checker workflow;
2. Format checker workflow data and initiate new workflow runs on one or more WES endpoints;
3. Report results.
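
The three tasks above can be sketched roughly as follows. This is a minimal illustration, not the orchestrator's actual code: `run_checker`, `fake_fetch`, and `fake_submit` are hypothetical names, the checker lookup is reduced to a single call, and the TRS and WES calls are passed in as plain callables so the sketch runs without network access.

```python
def run_checker(workflow_id, wes_ids, fetch_descriptor, submit_run):
    """Look up a checker workflow, run it on each WES endpoint, report results.

    All names are hypothetical: `fetch_descriptor` stands in for a TRS
    client call and `submit_run` for a WES client call.
    """
    # Task 1: retrieve what is needed to run the checker workflow.
    descriptor = fetch_descriptor(workflow_id)

    # Task 2: start one run per WES endpoint.
    results = {}
    for wes_id in wes_ids:
        results[wes_id] = submit_run(wes_id, descriptor)

    # Task 3: report results, keyed by endpoint.
    return results


# Stub clients, used only to show the call pattern.
def fake_fetch(workflow_id):
    return {'id': workflow_id, 'type': 'CWL'}


def fake_submit(wes_id, descriptor):
    return {'run_id': '{}-run'.format(wes_id), 'state': 'QUEUED'}


report = run_checker('md5sum-checker', ['local'], fake_fetch, fake_submit)
```

Passing the clients in as arguments keeps the orchestration logic testable on its own; real TRS and WES clients would replace the two stubs.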

1. Makes TRS call to fetch a workflow
2. Makes WES call to run (and check) a workflow
3. Reports results
Additionally, the application supports the following operations:

Some other obvious functionality that we'll want to include (that we haven't necessarily implemented yet):
+ Register and configure new TRS endpoints;
+ Register and configure new WES endpoints;
+ Onboard/register a new workflow (by creating and configuring a queue with workflow details).
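
As a rough picture of what these three registration operations produce, the sketch below stores entries in a plain dict instead of `config.yaml`. The `register` helper is hypothetical, and `localhost:8080` is a placeholder host, not a value taken from this repository; the section and field names follow the `config.yaml` changes in this pull request.

```python
def register(config, section, key, entry):
    """Add `entry` under config[section][key]; `config` is a plain dict."""
    config.setdefault(section, {})[key] = entry
    return config


config = {}

# Register a TRS endpoint and a WES endpoint.
register(config, 'toolregistries', 'dockstore',
         {'auth': '', 'auth_type': '',
          'host': 'dockstore.org:8443', 'proto': 'https'})
register(config, 'workflowservices', 'local',
         {'auth': '', 'auth_type': '',
          'host': 'localhost:8080', 'proto': 'http'})  # placeholder host

# Onboard a workflow by creating a queue that points at both.
register(config, 'queues', 'queue_1',
         {'workflow_type': 'CWL', 'trs_id': 'dockstore',
          'wes_default': 'local', 'wes_opts': ['local']})
```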

+ The ability to onboard/register a new workflow (~ create and configure a queue)
+ The ability to register a new WES endpoint
+ The ability to submit or trigger a new workflow execution job (run)
+ A central mechanism for reporting results across workflows, WES endpoints, and specific runs/parameters
## Installation & Usage

These latter features are where existing Synapse systems come into play. Most of what we plan to do (at least for a proof of concept demonstration) can be accomplished with existing functionality of the `evaluation` API, the Synapse leaderboards, and various scripts/functions that have been designed to use these services to manage DREAM Challenges.
*Coming soon...*

### Organization

Current modules:
+ **`eval`**:
+ creates, configures, and gets information about Synapse evaluation queues
+ retrieves and manages information (e.g. status) about individual submissions to an evaluation queue
+ currently just stubs and stores submissions and queues locally — Synapse integration coming soon
+ **`trs`**:
+ acts as a lightweight client for the TRS API
+ retrieves (and possibly updates) information about a workflow from a tool registry service (e.g. Dockstore)
+ submits `requests` based on a subset of the Swagger spec for the GA4GH Tool Registry Service schema (as opposed to any CLI that might exist for a TRS implementation)
+ **`wes`**:
+ acts as a lightweight client for the WES API
+ manages interactions with a workflow execution service endpoint, including submitting new workflow jobs, monitoring workflow run progress, and collecting results
+ submits `requests` based on a subset of the Swagger spec for the GA4GH Workflow Execution Service schema
+ **`orchestrator`**:
+ functions to glue together the various services above
+ currently able to (1) take a given ID/URL for a workflow registered in a given TRS implementation; (2) prepare the workflow run request, including retrieval (and formatting) of parameters, if not provided; (3) post the workflow run request to a given WES implementation; (4) monitor and report results of the workflow run
+ authentication/authorization might need to be handled here as well
+ **`config`**:
+ includes functions for registering TRS and WES endpoints (adding them to the scope of options for the `orchestrator`)
+ should eventually include more functionality for specifying the parameters of individual evaluation queues and endpoints

### Development

This software is still in pre-alpha phase, with frequent changes being made to the ["development" branch](https://github.com/Sage-Bionetworks/synapse-orchestrator/tree/develop). To work with or contribute to the latest version, clone this repo, check out the `develop` branch (if not active by default), and install from source. If you plan to make changes to the code, use the `-e` mode to make the installation follow the head without having to reinstall (using `conda` or `virtualenv` to create an isolated test environment is recommended).

(example environment setup)
```
conda create -n synorchestrator python=2.7
source activate synorchestrator
```

```
git clone git://github.com/Sage-Bionetworks/synapse-orchestrator.git
cd synapse-orchestrator
pip install -e .
```

### Contribute changes

Switch back to the top level (`synapse-orchestrator`) folder, check out a new branch off of `develop`, edit the code, commit changes, open a pull request.

### TODO

+ add functions/arguments for configuring (registering TRS/WES endpoints, authenticating, etc.) the `orchestrator`
+ figure out how to configure and connect Synapse evaluation queues and submissions to `orchestrator`
+ update `travis.yml` to build `synapse-orchestrator` (and `workflow-service`) before running tests
43 changes: 24 additions & 19 deletions synorchestrator/config.py
@@ -5,6 +5,7 @@
"""
import logging
import os

from synorchestrator.util import get_yaml, save_yaml, heredoc

logging.basicConfig(level=logging.INFO)
@@ -14,8 +15,8 @@
config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')


def eval_config():
    return get_yaml(config_path)['evals']
def queue_config():
    return get_yaml(config_path)['queues']


def trs_config():
@@ -26,40 +27,44 @@ def wes_config():
    return get_yaml(config_path)['workflowservices']


def add_eval(wf_name,
             wf_type,
             wf_url,
             wf_jsonyaml,
             wf_attachments,
             submission_type='params',
             trs_id='dockstore',
             version_id='develop',
             wf_id=''):
def add_queue(queue_id,
              wf_type,
              trs_id='dockstore',
              wf_id=None,
              version_id='local',
              wf_url=None,
              wf_attachments=None,
              wes_default='local',
              wes_opts=None):
    """
    Register a Synapse evaluation queue to the orchestrator's
    scope of work.

    :param eval_id: integer ID of a Synapse evaluation queue
    """
    config = {'submission_type': submission_type,
    # TODO: require either workflow url/attachments OR
    # TRS information for retrieval
    wes_opts = [wes_default] if wes_opts is None else wes_opts
    config = {'workflow_type': wf_type,
              'trs_id': trs_id,
              'version_id': version_id,
              'workflow_id': wf_id,
              'workflow_type': wf_type,
              'version_id': version_id,
              'workflow_url': wf_url,
              'workflow_jsonyaml': wf_jsonyaml,
              'workflow_attachments': wf_attachments}
    set_yaml('evals', wf_name, config)
              'workflow_attachments': wf_attachments,
              'wes_default': wes_default,
              'wes_opts': wes_opts}
    set_yaml('queues', queue_id, config)


def add_toolregistry(service, auth, host, proto):
def add_toolregistry(service, auth, auth_type, host, proto):
    """
    Register a Tool Registry Service endpoint to the orchestrator's
    search space for workflows.

    :param trs_id: string ID of TRS endpoint (e.g., 'Dockstore')
    """
    config = {'auth': auth,
              'auth_type': auth_type,
              'host': host,
              'proto': proto}
    set_yaml('toolregistries', service, config)
@@ -90,7 +95,7 @@ def show():
    Show current application configuration.
    """
    orchestrator_config = get_yaml(config_path)
    evals = '\n'.join('{}:\t{}\t[{}]'.format(k, orchestrator_config['evals'][k]['workflow_id'], orchestrator_config['evals'][k]['workflow_type']) for k in orchestrator_config['evals'])
    workflows = '\n'.join('{}:\t{}\t[{}]'.format(k, orchestrator_config['queues'][k]['workflow_id'], orchestrator_config['queues'][k]['workflow_type']) for k in orchestrator_config['queues'])
    trs = '\n'.join('{}: {}'.format(k, orchestrator_config['toolregistries'][k]['host']) for k in orchestrator_config['toolregistries'])
    wes = '\n'.join('{}: {}'.format(k, orchestrator_config['workflowservices'][k]['host']) for k in orchestrator_config['workflowservices'])
    display = heredoc('''
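
The TODO in `add_queue` ("require either workflow url/attachments OR TRS information for retrieval") could be enforced with a small check along these lines. This is only one possible reading of the TODO: `check_queue_source` is a hypothetical helper that is not part of this pull request, and it treats a workflow URL alone as sufficient, leaving attachments optional.

```python
def check_queue_source(wf_url=None, trs_id=None, wf_id=None):
    """Return which source a queue would use, or raise if neither is complete.

    Hypothetical helper; the rule encoded here is an assumption about
    what the TODO intends.
    """
    # A direct workflow URL takes precedence when present.
    if wf_url:
        return 'url'
    # Otherwise both a registry and a workflow ID are needed for lookup.
    if trs_id and wf_id:
        return 'trs'
    raise ValueError('provide wf_url, or both trs_id and wf_id')
```

Because `add_queue` defaults `trs_id` to `'dockstore'`, the presence of `wf_id` would in practice decide whether the TRS route is usable.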
62 changes: 32 additions & 30 deletions synorchestrator/config.yaml
@@ -1,45 +1,47 @@
evals:
  cwl_md5sum:
    submission_type: params
    trs_id: dockstore
queues:
  queue_1:
    trs_id: ''
    version_id: develop
    wes_default: local
    wes_opts:
    - local
    workflow_attachments:
    - file:///home/quokka/git/workflow-service/testdata/md5sum.input
    - file:///home/quokka/git/workflow-service/testdata/dockstore-tool-md5sum.cwl
    - file://tests/testdata/md5sum.input
    - file://tests/testdata/dockstore-tool-md5sum.cwl
    workflow_id: ''
    workflow_jsonyaml: file:///home/quokka/git/workflow-service/testdata/md5sum.cwl.json
    workflow_type: CWL
    workflow_url: /home/quokka/git/workflow-service/testdata/md5sum.cwl
  wdl_UoM_align:
    submission_type: params
    workflow_url: file://tests/testdata/md5sum.cwl
  queue_2:
    trs_id: dockstore
    version_id: develop
    workflow_attachments: []
    workflow_id: ''
    workflow_jsonyaml: file:///home/quokka/Desktop/topmed-workflows/aligner/u_of_michigan_aligner/u_of_michigan_aligner.json
    workflow_type: WDL
    workflow_url: /home/quokka/Desktop/topmed-workflows/aligner/u_of_michigan_aligner/u_of_michigan_aligner.wdl
    wes_default: local
    wes_opts:
    - local
    workflow_attachments:
    - !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/md5sum/md5sum-tool.cwl'
    workflow_id: github.com/dockstore-testing/md5sum-checker
    workflow_type: CWL
    workflow_url: !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/md5sum/md5sum-workflow.cwl'
  queue_2_checker:
    trs_id: dockstore
    version_id: develop
    wes_default: local
    wes_opts:
    - local
    workflow_attachments:
    - !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/checker/md5sum-checker.cwl'
    - !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/md5sum/md5sum-tool.cwl'
    - !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/md5sum/md5sum-workflow.cwl'
    workflow_id: !!python/unicode 'github.com/dockstore-testing/md5sum-checker/_cwl_checker'
    workflow_type: CWL
    workflow_url: !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/checker-workflow-wrapping-workflow.cwl'
toolregistries:
  dockstore:
    auth: ''
    auth_type: ''
    host: dockstore.org:8443
    proto: https
workflowservices:
  arvados-wes:
    auth: ''
    auth_type: ''
    host: wes.qr1hi.arvadosapi.com
    proto: http
  aws-toil-server:
    auth: ''
    auth_type: ''
    host: 54.193.12.111:8080
    proto: http
  hca-cromwell:
    auth: ''
    auth_type: ''
    host: g0n2qjnu94.execute-api.us-east-1.amazonaws.com/test
    proto: http
  local:
    auth: ''
    auth_type: ''
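
To illustrate how the `wes_default`, `wes_opts`, `host`, and `proto` fields might fit together, the sketch below resolves a queue to a WES base URL. It rests on assumptions: `wes_base_url` is a hypothetical helper, treating `wes_opts` as the list of endpoints allowed for a queue is an interpretation rather than documented behavior, and `localhost:8080` is a placeholder because the `local` entry's host is not visible in this diff.

```python
def wes_base_url(config, queue_id, wes_id=None):
    """Build '<proto>://<host>' for the WES endpoint a queue should use."""
    queue = config['queues'][queue_id]
    # Use the queue's default endpoint unless the caller names one.
    wes_id = queue['wes_default'] if wes_id is None else wes_id
    # Assumption: only endpoints listed in wes_opts may be used.
    if wes_id not in queue['wes_opts']:
        raise ValueError(
            "'{}' is not enabled for queue '{}'".format(wes_id, queue_id))
    service = config['workflowservices'][wes_id]
    return '{}://{}'.format(service['proto'], service['host'])


config = {
    'queues': {'queue_2': {'wes_default': 'local', 'wes_opts': ['local']}},
    'workflowservices': {
        'arvados-wes': {'host': 'wes.qr1hi.arvadosapi.com', 'proto': 'http'},
        # Placeholder host: not taken from the repository.
        'local': {'host': 'localhost:8080', 'proto': 'http'},
    },
}
url = wes_base_url(config, 'queue_2')
```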
25 changes: 17 additions & 8 deletions synorchestrator/eval.py
@@ -6,10 +6,15 @@

logger = logging.getLogger(__name__)


submission_queue = os.path.join(os.path.dirname(__file__), 'submission_queue.json')


def create_submission(wes_id, submission_data, wf_type='cwl', wf_name='wflow0'):
def create_queue():
    pass


def create_submission(queue_id, submission_data, wes_id=None):
    """
    Submit a new job request to an evaluation queue.

@@ -18,20 +23,24 @@ def create_submission(wes_id, submission_data, wf_type='cwl', wf_name='wflow0'):
    submissions = get_json(submission_queue)
    submission_id = dt.datetime.now().strftime('%d%m%d%H%M%S%f')

    submissions.setdefault(wes_id, {})[submission_id] = {'status': 'RECEIVED',
                                                         'data': submission_data,
                                                         'wf_id': wf_name,
                                                         'type': wf_type}
    submission = {'status': 'RECEIVED',
                  'data': submission_data,
                  'wes_id': wes_id}
    submissions.setdefault(queue_id, {})[submission_id] = submission
    save_json(submission_queue, submissions)
    logger.info(" Queueing Job for '{}' endpoint:"
    logger.info(" Queueing job for '{}' endpoint:"
                "\n - submission ID: {}".format(wes_id, submission_id))
    return submission_id


def get_submissions(wes_id, status='RECEIVED'):
def get_submissions(queue_id, status='RECEIVED'):
    """Return all ids with the requested status."""
    submissions = get_json(submission_queue)
    return [id for id, bundle in submissions[wes_id].items() if bundle['status'] == status]
    try:
        return [id for id, bundle in submissions[queue_id].items()
                if bundle['status'] == status]
    except KeyError:
        return []


def get_submission_bundle(wes_id, submission_id):
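
The queue-keyed layout introduced in `eval.py` can be exercised with an in-memory stand-in like the one below. It is a simplified sketch, not the module itself: the submissions dict is passed in explicitly instead of being read from `submission_queue.json`, IDs come from `uuid4` rather than a timestamp, and `dict.get` with a default replaces the `try`/`except KeyError` used in the diff.

```python
import uuid


def create_submission(submissions, queue_id, submission_data, wes_id=None):
    """Add a submission under `queue_id` and return its generated ID."""
    submission_id = uuid.uuid4().hex  # swapped in for the timestamp ID
    submissions.setdefault(queue_id, {})[submission_id] = {
        'status': 'RECEIVED',
        'data': submission_data,
        'wes_id': wes_id,
    }
    return submission_id


def get_submissions(submissions, queue_id, status='RECEIVED'):
    """Return all IDs in a queue with the requested status."""
    # An unknown queue yields an empty list instead of raising KeyError.
    return [sub_id
            for sub_id, bundle in submissions.get(queue_id, {}).items()
            if bundle['status'] == status]


submissions = {}
first = create_submission(submissions, 'queue_1', {'input': 'md5sum.input'})
```

Asking for a queue that has no submissions yet returns an empty list, which matches the behavior the new `except KeyError` branch adds.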