Skip to content

Commit

Permalink
Merge 6e6c5f8 into e48c26b
Browse files Browse the repository at this point in the history
  • Loading branch information
jaeddy committed Aug 30, 2018
2 parents e48c26b + 6e6c5f8 commit 9bdefdb
Show file tree
Hide file tree
Showing 31 changed files with 1,036 additions and 549 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,5 @@ ENV/
# local config files
*.config
*.evals

.DS_Store
43 changes: 24 additions & 19 deletions synorchestrator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import logging
import os

from synorchestrator.util import get_yaml, save_yaml, heredoc

logging.basicConfig(level=logging.INFO)
Expand All @@ -14,8 +15,8 @@
config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')


def eval_config():
    """
    Return the 'evals' section of the YAML document stored at
    ``config_path``.
    """
    orchestrator_config = get_yaml(config_path)
    return orchestrator_config['evals']
def queue_config():
    """
    Return the 'queues' section of the YAML document stored at
    ``config_path``.
    """
    orchestrator_config = get_yaml(config_path)
    return orchestrator_config['queues']


def trs_config():
Expand All @@ -26,40 +27,44 @@ def wes_config():
return get_yaml(config_path)['workflowservices']


def add_eval(wf_name,
wf_type,
wf_url,
wf_jsonyaml,
wf_attachments,
submission_type='params',
trs_id='dockstore',
version_id='develop',
wf_id=''):
def add_queue(queue_id,
              wf_type,
              trs_id='dockstore',
              wf_id=None,
              version_id='local',
              wf_url=None,
              wf_attachments=None,
              wes_default='local',
              wes_opts=None):
    """
    Register a workflow queue to the orchestrator's scope of work.

    The settings are stored under the 'queues' section of the
    configuration, keyed by ``queue_id``.

    :param queue_id: key under which the queue settings are stored
    :param wf_type: stored as 'workflow_type' (e.g., 'CWL')
    :param trs_id: stored as 'trs_id'
    :param wf_id: stored as 'workflow_id'
    :param version_id: stored as 'version_id'
    :param wf_url: stored as 'workflow_url'
    :param wf_attachments: stored as 'workflow_attachments'
    :param wes_default: stored as 'wes_default'
    :param wes_opts: stored as 'wes_opts'; when None, a one-item list
        holding ``wes_default`` is stored instead
    """
    # TODO: require either workflow url/attachments OR
    # TRS information for retrieval
    if wes_opts is None:
        wes_opts = [wes_default]

    queue_settings = {}
    queue_settings['workflow_type'] = wf_type
    queue_settings['trs_id'] = trs_id
    queue_settings['workflow_id'] = wf_id
    queue_settings['version_id'] = version_id
    queue_settings['workflow_url'] = wf_url
    queue_settings['workflow_attachments'] = wf_attachments
    queue_settings['wes_default'] = wes_default
    queue_settings['wes_opts'] = wes_opts
    set_yaml('queues', queue_id, queue_settings)


def add_toolregistry(service, auth, host, proto):
def add_toolregistry(service, auth, auth_type, host, proto):
    """
    Register a Tool Registry Service endpoint to the orchestrator's
    search space for workflows.

    The settings are stored under the 'toolregistries' section of the
    configuration, keyed by ``service``.

    :param service: string ID of TRS endpoint (e.g., 'dockstore')
    :param auth: stored as 'auth'
    :param auth_type: stored as 'auth_type'
    :param host: stored as 'host' (e.g., 'dockstore.org:8443')
    :param proto: stored as 'proto' (e.g., 'https')
    """
    # 'auth_type' must carry the auth_type argument; it previously
    # duplicated the 'auth' value, so the argument was never saved.
    config = {'auth': auth,
              'auth_type': auth_type,
              'host': host,
              'proto': proto}
    set_yaml('toolregistries', service, config)
Expand Down Expand Up @@ -90,7 +95,7 @@ def show():
Show current application configuration.
"""
orchestrator_config = get_yaml(config_path)
evals = '\n'.join('{}:\t{}\t[{}]'.format(k, orchestrator_config['evals'][k]['workflow_id'], orchestrator_config['evals'][k]['workflow_type']) for k in orchestrator_config['evals'])
workflows = '\n'.join('{}:\t{}\t[{}]'.format(k, orchestrator_config['queues'][k]['workflow_id'], orchestrator_config['queues'][k]['workflow_type']) for k in orchestrator_config['queues'])
trs = '\n'.join('{}: {}'.format(k, orchestrator_config['toolregistries'][k]['host']) for k in orchestrator_config['toolregistries'])
wes = '\n'.join('{}: {}'.format(k, orchestrator_config['workflowservices'][k]['host']) for k in orchestrator_config['workflowservices'])
display = heredoc('''
Expand Down
62 changes: 32 additions & 30 deletions synorchestrator/config.yaml
Original file line number Diff line number Diff line change
@@ -1,45 +1,47 @@
evals:
cwl_md5sum:
submission_type: params
trs_id: dockstore
queues:
queue_1:
trs_id: ''
version_id: develop
wes_default: local
wes_opts:
- local
workflow_attachments:
- file:///home/quokka/git/workflow-service/testdata/md5sum.input
- file:///home/quokka/git/workflow-service/testdata/dockstore-tool-md5sum.cwl
- file://tests/testdata/md5sum.input
- file://tests/testdata/dockstore-tool-md5sum.cwl
workflow_id: ''
workflow_jsonyaml: file:///home/quokka/git/workflow-service/testdata/md5sum.cwl.json
workflow_type: CWL
workflow_url: /home/quokka/git/workflow-service/testdata/md5sum.cwl
wdl_UoM_align:
submission_type: params
workflow_url: file://tests/testdata/md5sum.cwl
queue_2:
trs_id: dockstore
version_id: develop
workflow_attachments: []
workflow_id: ''
workflow_jsonyaml: file:///home/quokka/Desktop/topmed-workflows/aligner/u_of_michigan_aligner/u_of_michigan_aligner.json
workflow_type: WDL
workflow_url: /home/quokka/Desktop/topmed-workflows/aligner/u_of_michigan_aligner/u_of_michigan_aligner.wdl
wes_default: local
wes_opts:
- local
workflow_attachments:
- !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/md5sum/md5sum-tool.cwl'
workflow_id: github.com/dockstore-testing/md5sum-checker
workflow_type: CWL
workflow_url: !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/md5sum/md5sum-workflow.cwl'
queue_2_checker:
trs_id: dockstore
version_id: develop
wes_default: local
wes_opts:
- local
workflow_attachments:
- !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/checker/md5sum-checker.cwl'
- !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/md5sum/md5sum-tool.cwl'
- !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/md5sum/md5sum-workflow.cwl'
workflow_id: !!python/unicode 'github.com/dockstore-testing/md5sum-checker/_cwl_checker'
workflow_type: CWL
workflow_url: !!python/unicode 'https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/develop/checker-workflow-wrapping-workflow.cwl'
toolregistries:
dockstore:
auth: ''
auth_type: ''
host: dockstore.org:8443
proto: https
workflowservices:
arvados-wes:
auth: ''
auth_type: ''
host: wes.qr1hi.arvadosapi.com
proto: http
aws-toil-server:
auth: ''
auth_type: ''
host: 54.193.12.111:8080
proto: http
hca-cromwell:
auth: ''
auth_type: ''
host: g0n2qjnu94.execute-api.us-east-1.amazonaws.com/test
proto: http
local:
auth: ''
auth_type: ''
Expand Down
17 changes: 11 additions & 6 deletions synorchestrator/eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@

logger = logging.getLogger(__name__)


submission_queue = os.path.join(os.path.dirname(__file__), 'submission_queue.json')


def create_submission(wes_id, submission_data, wf_type='cwl', wf_name='wflow0'):
def create_queue():
    """
    Placeholder for queue creation; currently does nothing.
    """
    return None


def create_submission(queue_id, submission_data, wes_id=None):
"""
Submit a new job request to an evaluation queue.
Expand All @@ -18,12 +23,12 @@ def create_submission(wes_id, submission_data, wf_type='cwl', wf_name='wflow0'):
submissions = get_json(submission_queue)
submission_id = dt.datetime.now().strftime('%d%m%d%H%M%S%f')

submissions.setdefault(wes_id, {})[submission_id] = {'status': 'RECEIVED',
'data': submission_data,
'wf_id': wf_name,
'type': wf_type}
submission = {'status': 'RECEIVED',
'data': submission_data,
'wes_id': wes_id}
submissions.setdefault(queue_id, {})[submission_id] = submission
save_json(submission_queue, submissions)
logger.info(" Queueing Job for '{}' endpoint:"
logger.info(" Queueing job for '{}' endpoint:"
"\n - submission ID: {}".format(wes_id, submission_id))
return submission_id

Expand Down
125 changes: 28 additions & 97 deletions synorchestrator/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
import datetime as dt
import re

from StringIO import StringIO
from requests.exceptions import ConnectionError
from IPython.display import display, clear_output

from synorchestrator.config import wes_config, trs_config
from synorchestrator.config import eval_config as queue_config
from synorchestrator.config import queue_config
from synorchestrator.util import get_json, ctime2datetime, convert_timedelta
from synorchestrator.wes.wrapper import WES
from synorchestrator.trs.wrapper import TRS
from synorchestrator.eval import create_submission
from trs2wes import fetch_queue_workflow
from synorchestrator.eval import get_submission_bundle
from synorchestrator.eval import get_submissions
from synorchestrator.eval import update_submission
Expand All @@ -32,27 +31,27 @@
logger = logging.getLogger(__name__)


def no_queue_run(service, wf_name):
def run_job(queue_id, wes_id, wf_jsonyaml, add_attachments=None):
    """
    Submit a workflow run for a queue to a WES endpoint and report
    its initial state.

    :param queue_id: key of the queue in the 'queues' configuration
    :param wes_id: ID handed to ``WES`` to select the endpoint
    :param wf_jsonyaml: job parameters, sent as 'workflow_params'
    :param add_attachments: optional list of extra attachments to send
        in addition to the queue's configured 'workflow_attachments'
    :return: the run data returned by ``WES.run_workflow``, with
        'start_time' and 'status' entries added
    """
    wf_config = queue_config()[queue_id]
    if wf_config['workflow_url'] is None:
        # No workflow URL configured for the queue; presumably this
        # resolves the workflow details through the queue's tool
        # registry -- confirm against trs2wes.fetch_queue_workflow.
        wf_config = fetch_queue_workflow(queue_id)

    wf_attachments = wf_config['workflow_attachments']
    if add_attachments is not None:
        # Work on a copy so the loaded config is not mutated, tolerate
        # a queue with no configured attachments (None), and drop
        # duplicates while keeping a stable, first-seen order.
        merged = []
        seen = set()
        for attachment in list(wf_attachments or []) + list(add_attachments):
            if attachment not in seen:
                seen.add(attachment)
                merged.append(attachment)
        wf_attachments = merged

    wes_instance = WES(wes_id)
    request = {'workflow_url': wf_config['workflow_url'],
               'workflow_params': wf_jsonyaml,
               'attachment': wf_attachments}
    run_data = wes_instance.run_workflow(request)
    run_data['start_time'] = dt.datetime.now().ctime()
    run_status = wes_instance.get_run_status(run_data['run_id'])['state']
    run_data['status'] = run_status
    return run_data


def run_submission(queue_id, submission_id, wes_id=None):
Expand All @@ -67,15 +66,13 @@ def run_submission(queue_id, submission_id, wes_id=None):
logger.info(" Submitting to WES endpoint '{}':"
" \n - submission ID: {}"
.format(wes_id, submission_id))
wf_jsonyaml = submission['data']
logger.info(" Job parameters: '{}'".format(wf_jsonyaml))

wes_instance = WES(wes_config()[wes_id])
run_data = wes_instance.run_workflow(submission['data']['wf'],
submission['data']['jsonyaml'],
submission['data']['attachments'])
run_data['start_time'] = dt.datetime.now().ctime()
run_data['status'] = wes_instance.get_run_status(run_data['run_id'])
update_submission(wes_id, submission_id, 'run', run_data)
update_submission(wes_id, submission_id, 'status', 'SUBMITTED')
run_data = run_job(queue_id, wes_id, wf_jsonyaml)

update_submission(queue_id, submission_id, 'run', run_data)
update_submission(queue_id, submission_id, 'status', 'SUBMITTED')
return run_data


Expand All @@ -88,7 +85,6 @@ def run_queue(queue_id, wes_id=None):
for submission_id in get_submissions(queue_id, status='RECEIVED'):
run_data = run_submission(queue_id, submission_id, wes_id)
log_entry = {'queue_id': queue_id,
'job': run_data['type'],
'wes_id': wes_id,
'run_id': run_data['run_id'],
'status': run_data['status'],
Expand All @@ -110,71 +106,6 @@ def run_next_queued(queue_id):
return run_submission(queue_id, sub_id)


def fetch_checker(trs, workflow_id, workflow_config=None):
    """
    Look up the checker workflow registered for a workflow and fetch
    its descriptor and tests.

    :param trs: object exposing ``get_workflow_checker``,
        ``get_workflow_descriptor`` and ``get_workflow_tests``
    :param workflow_id: ID passed to ``trs.get_workflow_checker``
    :param workflow_config: mapping with 'version_id' and
        'workflow_type' entries used for both lookups
    :return: tuple ``(checker_descriptor, checker_tests)``
    :raises ValueError: if ``workflow_config`` is not supplied
    """
    # The version and type used to be read from an undefined name;
    # they are now taken from an explicit argument.
    if workflow_config is None:
        raise ValueError("workflow_config with 'version_id' and "
                         "'workflow_type' is required")

    checker_workflow = trs.get_workflow_checker(workflow_id)

    checker_descriptor = trs.get_workflow_descriptor(
        id=checker_workflow['id'],
        version_id=workflow_config['version_id'],
        type=workflow_config['workflow_type']
    )

    checker_tests = trs.get_workflow_tests(
        id=checker_workflow['id'],
        version_id=workflow_config['version_id'],
        type=workflow_config['workflow_type']
    )
    return checker_descriptor, checker_tests


def build_checker_request(checker_descriptor, checker_tests):
    """
    Build a WES run request for a checker workflow.

    :param checker_descriptor: mapping with 'type', 'content' and
        'url' entries
    :param checker_tests: sequence whose first item has a 'url' entry,
        used as the workflow parameters
    :return: the value returned by ``build_wes_request``
    """
    # A CWL descriptor containing 'run:' has its content replaced with
    # the packed form obtained from its URL.
    if (checker_descriptor['type'] == 'CWL' and
            re.search('run:', checker_descriptor['content'])):
        checker_descriptor['content'] = get_packed_cwl(checker_descriptor['url'])

    checker_request = build_wes_request(
        workflow_descriptor=checker_descriptor['content'],
        workflow_params=checker_tests[0]['url'],
        workflow_type=checker_descriptor['type']
    )
    return checker_request


def check_queue(queue_id, wes_id):
    """
    Run checker workflow for an evaluation queue in a single
    environment.

    :param queue_id: key of the queue in the 'queues' configuration
    :param wes_id: ID of the workflow service endpoint to run on
    :return: the value returned by ``run_queue``
    """
    wf_config = queue_config()[queue_id]
    wf_config['id'] = wf_config['workflow_id']
    logger.info("Preparing checker workflow run request for '{}' from '{}'"
                .format(wf_config['id'], wf_config['trs_id']))

    trs_instance = TRS(**trs_config()[wf_config['trs_id']])

    checker_descriptor, checker_tests = fetch_checker(
        trs=trs_instance,
        workflow_id=wf_config['id'],
        workflow_config=wf_config
    )
    checker_request = build_checker_request(checker_descriptor, checker_tests)

    # create_submission takes (queue_id, submission_data, wes_id); it
    # has no 'type' keyword, so none is passed.
    create_submission(queue_id, checker_request, wes_id)
    # Pass wes_id through so the queue runs on the requested endpoint.
    return run_queue(queue_id, wes_id)


def check_all(queue_wes_map):
    """
    Run the checker workflow for every (queue, endpoint) pair in a
    mapping of queue IDs to workflow service endpoint IDs.

    :param queue_wes_map: mapping of queue ID to an iterable of
        workflow service endpoint IDs
    :return: list with one ``check_queue`` result per pair, in
        iteration order
    """
    submission_logs = []
    for queue_id in queue_wes_map:
        for wes_id in queue_wes_map[queue_id]:
            submission_logs.append(check_queue(queue_id, wes_id))
    return submission_logs


def run_all():
"""
Run all jobs with the status: RECEIVED across all evaluation queues.
Expand All @@ -197,7 +128,7 @@ def monitor_queue(queue_handle, submission_log):
log_entry = submission_log[queue_handle]
queue_status = {}
for sub_id, sub_log in log_entry.items():
wes_instance = WES(**wes_config()[sub_log['wes_id']])
wes_instance = WES(sub_log['wes_id'])
run_status = wes_instance.get_run_status(sub_log['run_id'])
if run_status['state'] in ['QUEUED', 'INITIALIZING', 'RUNNING']:
etime = convert_timedelta(
Expand Down

0 comments on commit 9bdefdb

Please sign in to comment.