Skip to content

Commit

Permalink
Merge 70e03b0 into df7005b
Browse files Browse the repository at this point in the history
  • Loading branch information
jaeddy committed Sep 3, 2018
2 parents df7005b + 70e03b0 commit b82b278
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 55 deletions.
6 changes: 4 additions & 2 deletions synorchestrator/config.py
Expand Up @@ -69,7 +69,8 @@ def add_queue(queue_id,
wf_url=None,
wf_attachments=None,
wes_default='local',
wes_opts=None):
wes_opts=None,
target_queue=None):
"""
Register a workflow evaluation queue to the orchestrator's
scope of work.
Expand All @@ -86,7 +87,8 @@ def add_queue(queue_id,
'workflow_url': wf_url,
'workflow_attachments': wf_attachments,
'wes_default': wes_default,
'wes_opts': wes_opts}
'wes_opts': wes_opts,
'target_queue': target_queue}
set_yaml('queues', queue_id, config)


Expand Down
69 changes: 45 additions & 24 deletions synorchestrator/orchestrator.py
Expand Up @@ -19,9 +19,11 @@
from synorchestrator.config import queue_config
from synorchestrator.util import get_json, ctime2datetime, convert_timedelta
from synorchestrator.wes.wrapper import WES
from trs2wes import fetch_queue_workflow
from synorchestrator.trs2wes import fetch_queue_workflow
from synorchestrator.trs2wes import store_verification
from synorchestrator.queue import get_submission_bundle
from synorchestrator.queue import get_submissions
from synorchestrator.queue import create_submission
from synorchestrator.queue import update_submission
from synorchestrator.queue import submission_queue

Expand All @@ -41,6 +43,9 @@ def run_job(queue_id, wes_id, wf_jsonyaml, add_attachments=None):
wf_attachments += add_attachments
wf_attachments = list(set(wf_attachments))

submission_id = create_submission(queue_id=queue_id,
submission_data=wf_jsonyaml,
wes_id=wes_id)
wes_instance = WES(wes_id)
request = {'workflow_url': wf_config['workflow_url'],
'workflow_params': wf_jsonyaml,
Expand All @@ -49,6 +54,9 @@ def run_job(queue_id, wes_id, wf_jsonyaml, add_attachments=None):
run_log['start_time'] = dt.datetime.now().ctime()
run_status = wes_instance.get_run_status(run_log['run_id'])['state']
run_log['status'] = run_status

update_submission(queue_id, submission_id, 'run_log', run_log)
update_submission(queue_id, submission_id, 'status', 'SUBMITTED')
return run_log


Expand Down Expand Up @@ -129,7 +137,13 @@ def monitor_queue(queue_id):
update_submission(queue_id, sub_id, 'run_log', run_log)

if run_log['status'] == 'COMPLETE':
update_submission(queue_id, sub_id, 'status', 'VALIDATED')
wf_config = queue_config()[queue_id]
sub_status = run_log['status']
if 'target_queue' in wf_config:
store_verification(wf_config['target_queue'],
submission['wes_id'])
sub_status = 'VALIDATED'
update_submission(queue_id, sub_id, 'status', sub_status)

run_log['wes_id'] = submission['wes_id']
queue_log[sub_id] = run_log
Expand All @@ -142,26 +156,33 @@ def monitor():
Monitor progress of workflow jobs.
"""
import pandas as pd
pd.set_option('display.width', 100)
pd.set_option('display.width', 1000)
pd.set_option('display.max_columns', 10)
pd.set_option('display.expand_frame_repr', False)

try:
while True:
statuses = []

clear_output(wait=True)
os.system('clear')

for queue_id in queue_config():
statuses.append(monitor_queue(queue_id))
if all([status == {} for status in statuses]):
print("No jobs running...")
else:
status_tracker = pd.DataFrame.from_dict(
{i: status[i]
for status in statuses
for i in status},
orient='index')

display(status_tracker)
sys.stdout.flush()

time.sleep(1)
except KeyboardInterrupt:
print("\nDone")
return

statuses = []

for queue_id in queue_config():
statuses.append(monitor_queue(queue_id))
status_tracker = pd.DataFrame.from_dict(
{i: status[i]
for status in statuses
for i in status},
orient='index')

clear_output(wait=True)
os.system('clear')
display(status_tracker)
sys.stdout.flush()
if any(status_tracker['status']
.isin(['QUEUED', 'INITIALIZING', 'RUNNING'])):
time.sleep(1)
monitor()
else:
print("Done")
return statuses
13 changes: 3 additions & 10 deletions synorchestrator/testbed.py
Expand Up @@ -11,6 +11,7 @@
from synorchestrator.config import queue_config
from synorchestrator.config import trs_config
from synorchestrator.config import wes_config
from synorchestrator.config import set_yaml
from synorchestrator.trs.wrapper import TRS
from synorchestrator.wes.wrapper import WES
from synorchestrator.queue import create_submission
Expand Down Expand Up @@ -80,7 +81,8 @@ def check_workflow(queue_id, wes_id):
wf_id=checker_id,
version_id=wf_config['version_id'],
wes_default=wf_config['wes_default'],
wes_opts=wf_config['wes_opts'])
wes_opts=wf_config['wes_opts'],
target_queue=queue_id)

checker_job = trs_instance.get_workflow_tests(id=checker_id,
version_id=wf_config['version_id'],
Expand All @@ -103,12 +105,3 @@ def check_all(workflow_wes_map):
return submission_logs


# def post_verification(self, id, version_id, type, relative_path, requests):
# """
# Annotate test JSON with information on whether it ran successfully on particular platforms plus metadata
# """
# id = _format_workflow_id(id)
# endpoint ='extended/{}/versions/{}/{}/tests/{}'.format(
# id, version_id, type, relative_path
# )
# return _post_to_endpoint(self, endpoint, requests)
2 changes: 1 addition & 1 deletion synorchestrator/trs/client.py
Expand Up @@ -24,7 +24,7 @@ def _init_http_client(service_id=None, opts=None):
"""
auth_header = {'token': 'Authorization',
'api_key': 'X-API-KEY',
'': ''}
None: ''}
if service_id:
opts = _get_trs_opts(service_id)

Expand Down
20 changes: 19 additions & 1 deletion synorchestrator/trs2wes.py
Expand Up @@ -33,4 +33,22 @@ def fetch_queue_workflow(queue_id):
set_yaml('queues', queue_id, wf_config)
return wf_config



def store_verification(queue_id, wes_id):
"""
Record checker status for selected workflow and environment.
"""
wf_config = queue_config()[queue_id]
wf_config.setdefault('wes_verified', []).append(wes_id)
set_yaml('queues', queue_id, wf_config)


# def post_verification(self, id, version_id, type, relative_path, requests):
# """
# Annotate test JSON with information on whether it ran successfully on particular platforms plus metadata
# """
# id = _format_workflow_id(id)
# endpoint ='extended/{}/versions/{}/{}/tests/{}'.format(
# id, version_id, type, relative_path
# )
# return _post_to_endpoint(self, endpoint, requests)
2 changes: 1 addition & 1 deletion synorchestrator/wes/client.py
Expand Up @@ -24,7 +24,7 @@ def _init_http_client(service_id=None, opts=None):
"""
auth_header = {'token': 'Authorization',
'api_key': 'X-API-KEY',
'': ''}
None: ''}
if service_id:
opts = _get_wes_opts(service_id)

Expand Down
6 changes: 4 additions & 2 deletions tests/conftest.py
Expand Up @@ -23,7 +23,8 @@ def mock_queue_config():
'workflow_url': None,
'workflow_attachments': None,
'wes_default': 'local',
'wes_opts': ['local']
'wes_opts': ['local'],
'target_queue': None
},
'mock_queue_2': {
'trs_id': 'mock_trs',
Expand All @@ -33,7 +34,8 @@ def mock_queue_config():
'workflow_url': None,
'workflow_attachments': None,
'wes_default': 'local',
'wes_opts': ['local']
'wes_opts': ['local'],
'target_queue': None
}
}
yield mock_queue_config
Expand Down
3 changes: 2 additions & 1 deletion tests/test_config.py
Expand Up @@ -73,7 +73,8 @@ def test_add_queue(mock_orchestratorconfig, monkeypatch):
'workflow_url': None,
'workflow_attachments': None,
'wes_default': 'local',
'wes_opts': ['local']}
'wes_opts': ['local'],
'target_queue': None}

# THEN the evaluation queue config should be stored in the config file
with open(str(mock_orchestratorconfig), 'r') as f:
Expand Down
16 changes: 4 additions & 12 deletions tests/test_orchestrator.py
Expand Up @@ -20,10 +20,12 @@ def test_run_job(mock_queue_config, mock_submission, mock_wes, monkeypatch):
lambda: mock_queue_config)
monkeypatch.setattr('synorchestrator.orchestrator.fetch_queue_workflow',
lambda x: mock_queue_config[x])
monkeypatch.setattr('synorchestrator.orchestrator.get_submission_bundle',
lambda x,y: mock_submission['mock_sub'])
monkeypatch.setattr('synorchestrator.testbed.create_submission',
lambda **kwargs: None)
monkeypatch.setattr('synorchestrator.orchestrator.WES',
lambda wes_id: mock_wes)
monkeypatch.setattr('synorchestrator.orchestrator.update_submission',
lambda w,x,y,z: None)

mock_request = {'workflow_url': None,
'workflow_params': mock_submission['mock_sub']['data'],
Expand Down Expand Up @@ -144,15 +146,5 @@ def test_monitor_queue(mock_submission, mock_queue_log, mock_wes, monkeypatch):
assert test_queue_log == mock_queue_log


def test_monitor(mock_queue_config, mock_queue_log, monkeypatch):
monkeypatch.setattr('synorchestrator.orchestrator.queue_config',
lambda: mock_queue_config)
mock_queue_log['mock_sub']['status'] = 'COMPLETE'
monkeypatch.setattr('synorchestrator.orchestrator.monitor_queue',
lambda x: mock_queue_log)

test_statuses = monitor()
assert test_statuses == [mock_queue_log, mock_queue_log]



2 changes: 2 additions & 0 deletions tests/test_testbed.py
Expand Up @@ -44,6 +44,8 @@ def test_check_workflow(mock_queue_config,
lambda x,y: 'mock_wf_checker')
monkeypatch.setattr('synorchestrator.testbed.add_queue',
lambda **kwargs: None)
monkeypatch.setattr('synorchestrator.testbed.create_submission',
lambda **kwargs: None)
mock_trs.get_workflow_tests.return_value = [{'content': '', 'url': ''}]

mock_submission_log = {
Expand Down
3 changes: 2 additions & 1 deletion tests/test_trs2wes.py
Expand Up @@ -34,7 +34,8 @@ def test_fetch_queue_workflow(mock_orchestratorconfig,
'workflow_url': 'mock_wf_url',
'workflow_attachments': ['mock_file_url'],
'wes_default': 'local',
'wes_opts': ['local']}
'wes_opts': ['local'],
'target_queue': None}

with open(str(mock_orchestratorconfig), 'r') as f:
test_config = yaml.load(f)['queues']
Expand Down

0 comments on commit b82b278

Please sign in to comment.