Commit

Merge pull request #43 from Sage-Bionetworks/debug-testbed
Debug testbed. In lieu of integration tests... fixing stuff while trying to run various example scenarios.
jaeddy committed Sep 3, 2018
2 parents df7005b + 0365680 commit acc4a3c
Showing 12 changed files with 271 additions and 59 deletions.
188 changes: 184 additions & 4 deletions README.md
@@ -1,12 +1,12 @@
-# GA4GH Cloud Orchestrator
+# GA4GH Workflow Interoperability

| **Service \ Branch** | **master** | **develop** |
| -: | - | - |
| CI status | [![Travis-CI Build Status](https://travis-ci.org/Sage-Bionetworks/synapse-orchestrator.svg?branch=master)](https://travis-ci.org/Sage-Bionetworks/synapse-orchestrator) | [![Travis-CI Build Status](https://travis-ci.org/Sage-Bionetworks/synapse-orchestrator.svg?branch=develop)](https://travis-ci.org/Sage-Bionetworks/synapse-orchestrator) |
| Test coverage | [![Coverage Status](https://coveralls.io/repos/github/Sage-Bionetworks/synapse-orchestrator/badge.svg?branch=master)](https://coveralls.io/Sage-Bionetworks/synapse-orchestrator?branch=master) | [![Coverage Status](https://coveralls.io/repos/github/Sage-Bionetworks/synapse-orchestrator/badge.svg?branch=develop)](https://coveralls.io/Sage-Bionetworks/synapse-orchestrator?branch=develop) |
| Docs status | *pending* | *pending* |

-The initial use case for this app will be to act as a workflow orchestrator and bridge between TRS and WES endpoints for the [**Testbed Interoperability Platform**](https://docs.google.com/document/d/12Mq4v7o5VKF-DkFTQwsUQ-aWZ5aBeIcl_5YrhbaSv7M/edit?usp=sharing), a core deliverable of the GA4GH Cloud Workstream for 2018.
+The initial use case for this app will be to act as a workflow orchestrator and bridge between **Tool Registry Service (TRS)** and **Workflow Execution Service (WES)** endpoints for the [**Testbed Interoperability Platform**](https://docs.google.com/document/d/12Mq4v7o5VKF-DkFTQwsUQ-aWZ5aBeIcl_5YrhbaSv7M/edit?usp=sharing), a core deliverable of the GA4GH Cloud Workstream for 2018.

## Overview

@@ -22,7 +22,187 @@ Additionally, the application supports the following operations:
+ Register and configure new WES endpoints;
+ Onboard/register a new workflow (by creating and configuring a queue with workflow details).

-## Installation & Usage
+## Installation

*Coming soon...*


## Usage

### Default settings

```python
from synorchestrator import config

config.show()
```

```console
Orchestrator options:

Workflow Evaluation Queues
(queue ID: workflow ID [version])
---------------------------------------------------------------------------
queue_1: None (None)
> workflow URL: file://tests/testdata/md5sum.cwl
> workflow type: CWL
> from TRS: None
> WES options: ['local']

Tool Registries
(TRS ID: host address)
---------------------------------------------------------------------------
dockstore: dockstore.org:8443

Workflow Services
(WES ID: host address)
---------------------------------------------------------------------------
local: 0.0.0.0:8080
```

<details>

<summary>View YAML</summary>

```yaml
queues:
queue_1:
trs_id: null
version_id: null
wes_default: local
wes_opts:
- local
workflow_attachments:
- file://tests/testdata/md5sum.input
- file://tests/testdata/dockstore-tool-md5sum.cwl
workflow_id: null
workflow_type: CWL
workflow_url: file://tests/testdata/md5sum.cwl
toolregistries:
dockstore:
auth: null
auth_type: null
host: dockstore.org:8443
proto: https
workflowservices:
local:
auth: null
auth_type: null
host: 0.0.0.0:8080
proto: http
```

</details>
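Each queue in the YAML above carries a `wes_default` plus a list of allowed `wes_opts`. As an illustration of how those two fields interact, here is a small pure-Python sketch; `resolve_wes` is a hypothetical helper written for this example, not part of the `synorchestrator` API:

```python
# Sketch: resolve which WES endpoint a queue submission should target.
# The dict mirrors the "queues" section of the YAML config shown above;
# resolve_wes() is a hypothetical helper, not a synorchestrator function.

queues = {
    "queue_1": {
        "wes_default": "local",
        "wes_opts": ["local"],
        "workflow_type": "CWL",
        "workflow_url": "file://tests/testdata/md5sum.cwl",
    },
}

def resolve_wes(queue_id, wes_id=None):
    """Return the requested WES ID if the queue allows it, else the default."""
    queue = queues[queue_id]
    if wes_id is not None and wes_id in queue["wes_opts"]:
        return wes_id
    return queue["wes_default"]

print(resolve_wes("queue_1"))           # -> 'local' (falls back to the default)
print(resolve_wes("queue_1", "local"))  # -> 'local' (explicitly allowed option)
```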

### Add a workflow

```python
config.add_queue(queue_id='queue_2',
wf_type='CWL',
wf_id='github.com/dockstore-testing/md5sum-checker',
version_id='develop',
trs_id='dockstore')
```

```console
Workflow Evaluation Queues
(queue ID: workflow ID [version])
---------------------------------------------------------------------------
queue_2: github.com/dockstore-testing/md5sum-checker (develop)
> workflow URL: None
> workflow type: CWL
> from TRS: dockstore
> WES options: ['local']
queue_1: None (None)
> workflow URL: file://tests/testdata/md5sum.cwl
> workflow type: CWL
> from TRS: None
> WES options: ['local']

...
```

<details>

<summary>View YAML</summary>

```yaml
queue_2:
trs_id: dockstore
version_id: develop
wes_default: local
wes_opts:
- local
workflow_attachments: null
workflow_id: github.com/dockstore-testing/md5sum-checker
workflow_type: CWL
workflow_url: null
```

</details>
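Note that `queue_2` has `workflow_url: null` but a `trs_id`: the workflow descriptor will be resolved from the registry rather than a local file (the actual lookup is done by `synorchestrator.trs2wes.fetch_queue_workflow`, visible in the diff below). A self-contained sketch of that distinction, with `needs_trs_fetch` as a hypothetical helper:

```python
# Sketch: decide how a queue's workflow will be sourced, based on the
# fields in the YAML above. needs_trs_fetch() is a hypothetical helper;
# the real lookup lives in synorchestrator.trs2wes.fetch_queue_workflow.

def needs_trs_fetch(queue):
    """True when the workflow must be resolved via a TRS endpoint."""
    return queue.get("workflow_url") is None and queue.get("trs_id") is not None

queue_2 = {"trs_id": "dockstore", "workflow_url": None,
           "workflow_id": "github.com/dockstore-testing/md5sum-checker"}
queue_1 = {"trs_id": None, "workflow_url": "file://tests/testdata/md5sum.cwl"}

print(needs_trs_fetch(queue_2))  # -> True  (fetch descriptor from Dockstore)
print(needs_trs_fetch(queue_1))  # -> False (workflow URL given directly)
```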

### Add a WES endpoint

```python
config.add_workflowservice(service='arvados-wes',
host='wes.qr1hi.arvadosapi.com',
auth='<my-api-token>',
auth_type='token',
proto='https')
```

```console
Workflow Services
(WES ID: host address)
---------------------------------------------------------------------------
arvados-wes: wes.qr1hi.arvadosapi.com
local: 0.0.0.0:8080
```

#### Connect a WES endpoint to a workflow queue

```python
config.add_wes_opt(queue_ids='queue_2', wes_ids='arvados-wes')
```

```console
Workflow Evaluation Queues
(queue ID: workflow ID [version])
---------------------------------------------------------------------------
queue_2: github.com/dockstore-testing/md5sum-checker (develop)
> workflow URL: None
> workflow type: CWL
> from TRS: dockstore
> WES options: ['local', 'arvados-wes']
```

### Running workflows

In a separate terminal window or notebook, you can start a `monitor` process to keep track of any active workflow jobs.

```python
from synorchestrator import orchestrator

orchestrator.monitor()
```
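Conceptually, `monitor` is a polling loop over queue statuses. Here is a generic, stand-alone sketch of that pattern (not the actual `synorchestrator` implementation; the active-state names are taken from the orchestrator source in the diff below, and the stubbed status source is invented for the example):

```python
import time

def poll(get_statuses, interval=1.0, max_polls=None, sleep=time.sleep):
    """Repeatedly fetch job statuses until none are active (or max_polls hit)."""
    active = {"QUEUED", "INITIALIZING", "RUNNING"}
    polls = 0
    while True:
        statuses = get_statuses()
        polls += 1
        if not any(s in active for s in statuses):
            return statuses
        if max_polls is not None and polls >= max_polls:
            return statuses
        sleep(interval)

# Stubbed status source: one job that finishes on the second poll.
history = iter([["RUNNING"], ["COMPLETE"]])
print(poll(lambda: next(history), sleep=lambda _: None))  # -> ['COMPLETE']
```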
#### Check a workflow

To check a workflow in the testbed...

```python
from synorchestrator import testbed

testbed.check_workflow(queue_id='queue_2', wes_id='local')
```
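When the checker run completes, the orchestrator records which WES endpoint was verified for the target workflow queue. A minimal sketch of that bookkeeping, mirroring the `store_verification` helper added in `synorchestrator/trs2wes.py` in this commit (the real helper also persists the config back to YAML via `set_yaml`, which is omitted here):

```python
# Sketch of verification bookkeeping: append the verified WES ID to the
# workflow queue's config dict under 'wes_verified'.

def store_verification(wf_config, wes_id):
    """Record that wes_id passed the checker for this workflow queue."""
    wf_config.setdefault('wes_verified', []).append(wes_id)
    return wf_config

print(store_verification({}, 'local'))  # -> {'wes_verified': ['local']}
```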

#### Run a workflow job

To run a workflow using a given set of parameters...

```python
from synorchestrator import orchestrator

orchestrator.run_job(queue_id='queue_1',
wes_id='local',
wf_jsonyaml='file://tests/testdata/md5sum.cwl.json')
```
6 changes: 4 additions & 2 deletions synorchestrator/config.py
@@ -69,7 +69,8 @@ def add_queue(queue_id,
wf_url=None,
wf_attachments=None,
wes_default='local',
-wes_opts=None):
+wes_opts=None,
+target_queue=None):
"""
Register a workflow evaluation queue to the orchestrator's
scope of work.
@@ -86,7 +87,8 @@ def add_queue(queue_id,
'workflow_url': wf_url,
'workflow_attachments': wf_attachments,
'wes_default': wes_default,
-'wes_opts': wes_opts}
+'wes_opts': wes_opts,
+'target_queue': target_queue}
set_yaml('queues', queue_id, config)


69 changes: 45 additions & 24 deletions synorchestrator/orchestrator.py
@@ -19,9 +19,11 @@
from synorchestrator.config import queue_config
from synorchestrator.util import get_json, ctime2datetime, convert_timedelta
from synorchestrator.wes.wrapper import WES
-from trs2wes import fetch_queue_workflow
+from synorchestrator.trs2wes import fetch_queue_workflow
from synorchestrator.trs2wes import store_verification
from synorchestrator.queue import get_submission_bundle
from synorchestrator.queue import get_submissions
from synorchestrator.queue import create_submission
from synorchestrator.queue import update_submission
from synorchestrator.queue import submission_queue

@@ -41,6 +43,9 @@ def run_job(queue_id, wes_id, wf_jsonyaml, add_attachments=None):
wf_attachments += add_attachments
wf_attachments = list(set(wf_attachments))

submission_id = create_submission(queue_id=queue_id,
submission_data=wf_jsonyaml,
wes_id=wes_id)
wes_instance = WES(wes_id)
request = {'workflow_url': wf_config['workflow_url'],
'workflow_params': wf_jsonyaml,
@@ -49,6 +54,9 @@ def run_job(queue_id, wes_id, wf_jsonyaml, add_attachments=None):
run_log['start_time'] = dt.datetime.now().ctime()
run_status = wes_instance.get_run_status(run_log['run_id'])['state']
run_log['status'] = run_status

update_submission(queue_id, submission_id, 'run_log', run_log)
update_submission(queue_id, submission_id, 'status', 'SUBMITTED')
return run_log


@@ -129,7 +137,13 @@ def monitor_queue(queue_id):
update_submission(queue_id, sub_id, 'run_log', run_log)

if run_log['status'] == 'COMPLETE':
-update_submission(queue_id, sub_id, 'status', 'VALIDATED')
+wf_config = queue_config()[queue_id]
+sub_status = run_log['status']
+if 'target_queue' in wf_config:
+    store_verification(wf_config['target_queue'],
+                       submission['wes_id'])
+    sub_status = 'VALIDATED'
+update_submission(queue_id, sub_id, 'status', sub_status)

run_log['wes_id'] = submission['wes_id']
queue_log[sub_id] = run_log
@@ -142,26 +156,33 @@ def monitor():
Monitor progress of workflow jobs.
"""
import pandas as pd
-pd.set_option('display.width', 100)
+pd.set_option('display.width', 1000)
pd.set_option('display.max_columns', 10)
pd.set_option('display.expand_frame_repr', False)

try:
while True:
statuses = []

clear_output(wait=True)
os.system('clear')

for queue_id in queue_config():
statuses.append(monitor_queue(queue_id))
if all([status == {} for status in statuses]):
print("No jobs running...")
else:
status_tracker = pd.DataFrame.from_dict(
{i: status[i]
for status in statuses
for i in status},
orient='index')

display(status_tracker)
sys.stdout.flush()

time.sleep(1)
except KeyboardInterrupt:
print("\nDone")
return

statuses = []

for queue_id in queue_config():
statuses.append(monitor_queue(queue_id))
status_tracker = pd.DataFrame.from_dict(
{i: status[i]
for status in statuses
for i in status},
orient='index')

clear_output(wait=True)
os.system('clear')
display(status_tracker)
sys.stdout.flush()
if any(status_tracker['status']
.isin(['QUEUED', 'INITIALIZING', 'RUNNING'])):
time.sleep(1)
monitor()
else:
print("Done")
return statuses
13 changes: 3 additions & 10 deletions synorchestrator/testbed.py
@@ -11,6 +11,7 @@
from synorchestrator.config import queue_config
from synorchestrator.config import trs_config
from synorchestrator.config import wes_config
from synorchestrator.config import set_yaml
from synorchestrator.trs.wrapper import TRS
from synorchestrator.wes.wrapper import WES
from synorchestrator.queue import create_submission
@@ -80,7 +81,8 @@ def check_workflow(queue_id, wes_id):
wf_id=checker_id,
version_id=wf_config['version_id'],
wes_default=wf_config['wes_default'],
-wes_opts=wf_config['wes_opts'])
+wes_opts=wf_config['wes_opts'],
+target_queue=queue_id)

checker_job = trs_instance.get_workflow_tests(id=checker_id,
version_id=wf_config['version_id'],
@@ -103,12 +105,3 @@ def check_all(workflow_wes_map):
return submission_logs


-# def post_verification(self, id, version_id, type, relative_path, requests):
-#     """
-#     Annotate test JSON with information on whether it ran successfully on particular platforms plus metadata
-#     """
-#     id = _format_workflow_id(id)
-#     endpoint ='extended/{}/versions/{}/{}/tests/{}'.format(
-#         id, version_id, type, relative_path
-#     )
-#     return _post_to_endpoint(self, endpoint, requests)
2 changes: 1 addition & 1 deletion synorchestrator/trs/client.py
@@ -24,7 +24,7 @@ def _init_http_client(service_id=None, opts=None):
"""
auth_header = {'token': 'Authorization',
'api_key': 'X-API-KEY',
-'': ''}
+None: ''}
if service_id:
opts = _get_trs_opts(service_id)

20 changes: 19 additions & 1 deletion synorchestrator/trs2wes.py
@@ -33,4 +33,22 @@ def fetch_queue_workflow(queue_id):
set_yaml('queues', queue_id, wf_config)
return wf_config



def store_verification(queue_id, wes_id):
"""
Record checker status for selected workflow and environment.
"""
wf_config = queue_config()[queue_id]
wf_config.setdefault('wes_verified', []).append(wes_id)
set_yaml('queues', queue_id, wf_config)


# def post_verification(self, id, version_id, type, relative_path, requests):
# """
# Annotate test JSON with information on whether it ran successfully on particular platforms plus metadata
# """
# id = _format_workflow_id(id)
# endpoint ='extended/{}/versions/{}/{}/tests/{}'.format(
# id, version_id, type, relative_path
# )
# return _post_to_endpoint(self, endpoint, requests)
