diff --git a/synorchestrator/config.py b/synorchestrator/config.py index 3b8d7c3..ee6f89e 100644 --- a/synorchestrator/config.py +++ b/synorchestrator/config.py @@ -69,7 +69,8 @@ def add_queue(queue_id, wf_url=None, wf_attachments=None, wes_default='local', - wes_opts=None): + wes_opts=None, + target_queue=None): """ Register a workflow evaluation queue to the orchestrator's scope of work. @@ -86,7 +87,8 @@ def add_queue(queue_id, 'workflow_url': wf_url, 'workflow_attachments': wf_attachments, 'wes_default': wes_default, - 'wes_opts': wes_opts} + 'wes_opts': wes_opts, + 'target_queue': target_queue} set_yaml('queues', queue_id, config) diff --git a/synorchestrator/orchestrator.py b/synorchestrator/orchestrator.py index 61b95d7..1220f8d 100644 --- a/synorchestrator/orchestrator.py +++ b/synorchestrator/orchestrator.py @@ -19,9 +19,11 @@ from synorchestrator.config import queue_config from synorchestrator.util import get_json, ctime2datetime, convert_timedelta from synorchestrator.wes.wrapper import WES -from trs2wes import fetch_queue_workflow +from synorchestrator.trs2wes import fetch_queue_workflow +from synorchestrator.trs2wes import store_verification from synorchestrator.queue import get_submission_bundle from synorchestrator.queue import get_submissions +from synorchestrator.queue import create_submission from synorchestrator.queue import update_submission from synorchestrator.queue import submission_queue @@ -41,6 +43,9 @@ def run_job(queue_id, wes_id, wf_jsonyaml, add_attachments=None): wf_attachments += add_attachments wf_attachments = list(set(wf_attachments)) + submission_id = create_submission(queue_id=queue_id, + submission_data=wf_jsonyaml, + wes_id=wes_id) wes_instance = WES(wes_id) request = {'workflow_url': wf_config['workflow_url'], 'workflow_params': wf_jsonyaml, @@ -49,6 +54,9 @@ def run_job(queue_id, wes_id, wf_jsonyaml, add_attachments=None): run_log['start_time'] = dt.datetime.now().ctime() run_status = wes_instance.get_run_status(run_log['run_id'])['state'] run_log['status'] = run_status + + update_submission(queue_id, submission_id, 'run_log', run_log) + update_submission(queue_id, submission_id, 'status', 'SUBMITTED') return run_log @@ -129,7 +137,13 @@ def monitor_queue(queue_id): update_submission(queue_id, sub_id, 'run_log', run_log) if run_log['status'] == 'COMPLETE': - update_submission(queue_id, sub_id, 'status', 'VALIDATED') + wf_config = queue_config()[queue_id] + sub_status = run_log['status'] + if 'target_queue' in wf_config: + store_verification(wf_config['target_queue'], + submission['wes_id']) + sub_status = 'VALIDATED' + update_submission(queue_id, sub_id, 'status', sub_status) run_log['wes_id'] = submission['wes_id'] queue_log[sub_id] = run_log @@ -142,26 +156,33 @@ def monitor(): Monitor progress of workflow jobs. """ import pandas as pd - pd.set_option('display.width', 100) + pd.set_option('display.width', 1000) + pd.set_option('display.max_columns', 10) + pd.set_option('display.expand_frame_repr', False) + + try: + while True: + statuses = [] + + clear_output(wait=True) + os.system('clear') + + for queue_id in queue_config(): + statuses.append(monitor_queue(queue_id)) + if all([status == {} for status in statuses]): + print("No jobs running...") + else: + status_tracker = pd.DataFrame.from_dict( + {i: status[i] + for status in statuses + for i in status}, + orient='index') + + display(status_tracker) + sys.stdout.flush() + + time.sleep(1) + except KeyboardInterrupt: + print("\nDone") + return - statuses = [] - - for queue_id in queue_config(): - statuses.append(monitor_queue(queue_id)) - status_tracker = pd.DataFrame.from_dict( - {i: status[i] - for status in statuses - for i in status}, - orient='index') - - clear_output(wait=True) - os.system('clear') - display(status_tracker) - sys.stdout.flush() - if any(status_tracker['status'] - .isin(['QUEUED', 'INITIALIZING', 'RUNNING'])): - time.sleep(1) - monitor() - else: - print("Done") - return statuses diff --git a/synorchestrator/testbed.py b/synorchestrator/testbed.py index 7f75080..fcaca9a 100644 --- a/synorchestrator/testbed.py +++ b/synorchestrator/testbed.py @@ -11,6 +11,7 @@ from synorchestrator.config import queue_config from synorchestrator.config import trs_config from synorchestrator.config import wes_config +from synorchestrator.config import set_yaml from synorchestrator.trs.wrapper import TRS from synorchestrator.wes.wrapper import WES from synorchestrator.queue import create_submission @@ -80,7 +81,8 @@ def check_workflow(queue_id, wes_id): wf_id=checker_id, version_id=wf_config['version_id'], wes_default=wf_config['wes_default'], - wes_opts=wf_config['wes_opts']) + wes_opts=wf_config['wes_opts'], + target_queue=queue_id) checker_job = trs_instance.get_workflow_tests(id=checker_id, version_id=wf_config['version_id'], @@ -103,12 +105,3 @@ def check_all(workflow_wes_map): return submission_logs -# def post_verification(self, id, version_id, type, relative_path, requests): -# """ -# Annotate test JSON with information on whether it ran successfully on particular platforms plus metadata -# """ -# id = _format_workflow_id(id) -# endpoint ='extended/{}/versions/{}/{}/tests/{}'.format( -# id, version_id, type, relative_path -# ) -# return _post_to_endpoint(self, endpoint, requests) \ No newline at end of file diff --git a/synorchestrator/trs/client.py b/synorchestrator/trs/client.py index cfa0fc8..f93e704 100644 --- a/synorchestrator/trs/client.py +++ b/synorchestrator/trs/client.py @@ -24,7 +24,7 @@ def _init_http_client(service_id=None, opts=None): """ auth_header = {'token': 'Authorization', 'api_key': 'X-API-KEY', - '': ''} + None: ''} if service_id: opts = _get_trs_opts(service_id) diff --git a/synorchestrator/trs2wes.py b/synorchestrator/trs2wes.py index 6f384ea..0d60b68 100644 --- a/synorchestrator/trs2wes.py +++ b/synorchestrator/trs2wes.py @@ -33,4 +33,22 @@ def fetch_queue_workflow(queue_id): set_yaml('queues', queue_id, wf_config) return wf_config - \ No newline at end of file + +def store_verification(queue_id, wes_id): + """ + Record checker status for selected workflow and environment. + """ + wf_config = queue_config()[queue_id] + wf_config.setdefault('wes_verified', []).append(wes_id) + set_yaml('queues', queue_id, wf_config) + + +# def post_verification(self, id, version_id, type, relative_path, requests): +# """ +# Annotate test JSON with information on whether it ran successfully on particular platforms plus metadata +# """ +# id = _format_workflow_id(id) +# endpoint ='extended/{}/versions/{}/{}/tests/{}'.format( +# id, version_id, type, relative_path +# ) +# return _post_to_endpoint(self, endpoint, requests) \ No newline at end of file diff --git a/synorchestrator/wes/client.py b/synorchestrator/wes/client.py index be79a3e..319d23d 100644 --- a/synorchestrator/wes/client.py +++ b/synorchestrator/wes/client.py @@ -24,7 +24,7 @@ def _init_http_client(service_id=None, opts=None): """ auth_header = {'token': 'Authorization', 'api_key': 'X-API-KEY', - '': ''} + None: ''} if service_id: opts = _get_wes_opts(service_id) diff --git a/tests/conftest.py b/tests/conftest.py index 345b268..7cf211f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,7 +23,8 @@ def mock_queue_config(): 'workflow_url': None, 'workflow_attachments': None, 'wes_default': 'local', - 'wes_opts': ['local'] + 'wes_opts': ['local'], + 'target_queue': None }, 'mock_queue_2': { 'trs_id': 'mock_trs', @@ -33,7 +34,8 @@ def mock_queue_config(): 'workflow_url': None, 'workflow_attachments': None, 'wes_default': 'local', - 'wes_opts': ['local'] + 'wes_opts': ['local'], + 'target_queue': None } } yield mock_queue_config diff --git a/tests/test_config.py b/tests/test_config.py index 52b5610..9c461b6 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -73,7 +73,8 @@ def test_add_queue(mock_orchestratorconfig, monkeypatch): 'workflow_url': None, 'workflow_attachments': None, 'wes_default': 'local', - 'wes_opts': ['local']} + 'wes_opts': ['local'], + 'target_queue': None} # THEN the evaluation queue config should be stored in the config file with open(str(mock_orchestratorconfig), 'r') as f: diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index d75bc95..e3831d8 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -20,10 +20,12 @@ def test_run_job(mock_queue_config, mock_submission, mock_wes, monkeypatch): lambda: mock_queue_config) monkeypatch.setattr('synorchestrator.orchestrator.fetch_queue_workflow', lambda x: mock_queue_config[x]) - monkeypatch.setattr('synorchestrator.orchestrator.get_submission_bundle', - lambda x,y: mock_submission['mock_sub']) + monkeypatch.setattr('synorchestrator.testbed.create_submission', + lambda **kwargs: None) monkeypatch.setattr('synorchestrator.orchestrator.WES', lambda wes_id: mock_wes) + monkeypatch.setattr('synorchestrator.orchestrator.update_submission', + lambda w,x,y,z: None) mock_request = {'workflow_url': None, 'workflow_params': mock_submission['mock_sub']['data'], @@ -144,15 +146,5 @@ def test_monitor_queue(mock_submission, mock_queue_log, mock_wes, monkeypatch): assert test_queue_log == mock_queue_log -def test_monitor(mock_queue_config, mock_queue_log, monkeypatch): - monkeypatch.setattr('synorchestrator.orchestrator.queue_config', - lambda: mock_queue_config) - mock_queue_log['mock_sub']['status'] = 'COMPLETE' - monkeypatch.setattr('synorchestrator.orchestrator.monitor_queue', - lambda x: mock_queue_log) - - test_statuses = monitor() - assert test_statuses == [mock_queue_log, mock_queue_log] - diff --git a/tests/test_testbed.py b/tests/test_testbed.py index c07eb02..7900348 100644 --- a/tests/test_testbed.py +++ b/tests/test_testbed.py @@ -44,6 +44,8 @@ def test_check_workflow(mock_queue_config, lambda x,y: 'mock_wf_checker') monkeypatch.setattr('synorchestrator.testbed.add_queue', lambda **kwargs: None) + monkeypatch.setattr('synorchestrator.testbed.create_submission', + lambda **kwargs: None) mock_trs.get_workflow_tests.return_value = [{'content': '', 'url': ''}] mock_submission_log = { diff --git a/tests/test_trs2wes.py b/tests/test_trs2wes.py index 37d4766..1bea41e 100644 --- a/tests/test_trs2wes.py +++ b/tests/test_trs2wes.py @@ -34,7 +34,8 @@ def test_fetch_queue_workflow(mock_orchestratorconfig, 'workflow_url': 'mock_wf_url', 'workflow_attachments': ['mock_file_url'], 'wes_default': 'local', - 'wes_opts': ['local']} + 'wes_opts': ['local'], + 'target_queue': None} with open(str(mock_orchestratorconfig), 'r') as f: test_config = yaml.load(f)['queues']