From 50019e503adf811a54b54011d64ef92b8a610b13 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Tue, 18 Sep 2018 11:04:37 -0400
Subject: [PATCH 1/2] Refactor method for running format 2 workflows into
 populators.py for reuse.

---
 test/api/test_workflows.py | 61 ++-------------------------------
 test/base/populators.py    | 70 ++++++++++++++++++++++++++++++++++++--
 2 files changed, 70 insertions(+), 61 deletions(-)

diff --git a/test/api/test_workflows.py b/test/api/test_workflows.py
index fab6a838fce6..b9ac6ef438c4 100644
--- a/test/api/test_workflows.py
+++ b/test/api/test_workflows.py
@@ -2,11 +2,9 @@
 import json
 import time
-from collections import namedtuple
 from json import dumps
 from uuid import uuid4
 
-import yaml
 from requests import delete, put
 
 from base import api  # noqa: I100,I202
@@ -14,7 +12,6 @@
 from base.populators import (  # noqa: I100
     DatasetCollectionPopulator,
     DatasetPopulator,
-    load_data_dict,
     skip_without_tool,
     wait_on,
     WorkflowPopulator
@@ -31,7 +28,6 @@
     WORKFLOW_WITH_RULES_1,
 )
 from galaxy.exceptions import error_codes  # noqa: I201
-from galaxy.tools.verify.test_data import TestDataResolver
 
 
 SIMPLE_NESTED_WORKFLOW_YAML = """
@@ -208,59 +204,11 @@ def _invocation_details(self, workflow_id, invocation_id, **kwds):
         invocation_details = invocation_details_response.json()
         return invocation_details
 
-    def _run_jobs(self, has_workflow, history_id=None, wait=True, source_type=None, jobs_descriptions=None, expected_response=200, assert_ok=True):
-        def read_test_data(test_dict):
-            test_data_resolver = TestDataResolver()
-            filename = test_data_resolver.get_filename(test_dict["value"])
-            content = open(filename, "r").read()
-            return content
-
+    def _run_jobs(self, has_workflow, history_id=None, **kwds):
         if history_id is None:
             history_id = self.history_id
-        workflow_id = self._upload_yaml_workflow(
-            has_workflow, source_type=source_type
-        )
-        if jobs_descriptions is None:
-            assert source_type != "path"
-            jobs_descriptions = yaml.safe_load(has_workflow)
-
-        test_data = jobs_descriptions.get("test_data", {})
-        parameters = test_data.pop('step_parameters', {})
-        replacement_parameters = test_data.pop("replacement_parameters", {})
-        inputs, label_map, has_uploads = load_data_dict(history_id, test_data, self.dataset_populator, self.dataset_collection_populator)
-        workflow_request = dict(
-            history="hist_id=%s" % history_id,
-            workflow_id=workflow_id,
-        )
-        workflow_request["inputs"] = dumps(label_map)
-        workflow_request["inputs_by"] = 'name'
-        if parameters:
-            workflow_request["parameters"] = dumps(parameters)
-            workflow_request["parameters_normalized"] = True
-        if replacement_parameters:
-            workflow_request["replacement_params"] = dumps(replacement_parameters)
-        if has_uploads:
-            self.dataset_populator.wait_for_history(history_id, assert_ok=True)
-        url = "workflows/%s/usage" % (workflow_id)
-        invocation_response = self._post(url, data=workflow_request)
-        self._assert_status_code_is(invocation_response, expected_response)
-        invocation = invocation_response.json()
-        invocation_id = invocation.get('id')
-        if invocation_id:
-            # Wait for workflow to become fully scheduled and then for all jobs
-            # complete.
-            if wait:
-                self.workflow_populator.wait_for_workflow(workflow_id, invocation_id, history_id, assert_ok=assert_ok)
-        jobs = self._history_jobs(history_id)
-        return RunJobsSummary(
-            history_id=history_id,
-            workflow_id=workflow_id,
-            invocation_id=invocation_id,
-            inputs=inputs,
-            jobs=jobs,
-            invocation=invocation,
-            workflow_request=workflow_request
-        )
+
+        return self.workflow_populator.run_workflow(has_workflow, history_id=history_id, **kwds)
 
     def _history_jobs(self, history_id):
         return self._get("jobs", {"history_id": history_id, "order_by": "create_time"}).json()
@@ -3386,6 +3334,3 @@ def _all_user_invocation_ids(self):
         self._assert_status_code_is(all_invocations_for_user, 200)
         invocation_ids = [i["id"] for i in all_invocations_for_user.json()]
         return invocation_ids
-
-
-RunJobsSummary = namedtuple('RunJobsSummary', ['history_id', 'workflow_id', 'invocation_id', 'inputs', 'jobs', 'invocation', 'workflow_request'])
diff --git a/test/base/populators.py b/test/base/populators.py
index ed0bb33a864c..ab02e4222c24 100644
--- a/test/base/populators.py
+++ b/test/base/populators.py
@@ -5,6 +5,7 @@
 import string
 import time
 import unittest
+from collections import namedtuple
 from functools import wraps
 from operator import itemgetter
 
@@ -14,6 +15,7 @@
     def nottest(x):
         return x
 
 import requests
+import yaml
 from pkg_resources import resource_string
 from six import StringIO
@@ -239,7 +241,7 @@ def cancel_history_jobs(self, history_id, wait=True):
             self.cancel_job(active_job["id"])
 
     def history_jobs(self, history_id):
-        query_params = {"history_id": history_id}
+        query_params = {"history_id": history_id, "order_by": "create_time"}
         jobs_response = self._get("jobs", query_params)
         assert jobs_response.status_code == 200
         return jobs_response.json()
@@ -558,13 +560,17 @@ def wait_for_workflow(self, workflow_id, invocation_id, history_id, assert_ok=Tr
         self.wait_for_invocation(workflow_id, invocation_id, timeout=timeout)
         self.dataset_populator.wait_for_history_jobs(history_id, assert_ok=assert_ok, timeout=timeout)
 
+    def invoke_workflow_raw(self, workflow_id, request):
+        url = "workflows/%s/usage" % (workflow_id)
+        invocation_response = self._post(url, data=request)
+        return invocation_response
+
     def invoke_workflow(self, history_id, workflow_id, inputs={}, request={}, assert_ok=True):
         request["history"] = "hist_id=%s" % history_id,
         if inputs:
             request["inputs"] = json.dumps(inputs)
             request["inputs_by"] = 'step_index'
-        url = "workflows/%s/usage" % (workflow_id)
-        invocation_response = self._post(url, data=request)
+        invocation_response = self.invoke_workflow_raw(request)
         if assert_ok:
             api_asserts.assert_status_code_is(invocation_response, 200)
             invocation_id = invocation_response.json()["id"]
@@ -572,12 +578,69 @@ def invoke_workflow(self, history_id, workflow_id, inputs={}, request={}, assert
         else:
             return invocation_response
 
+    def run_workflow(self, has_workflow, test_data=None, history_id=None, wait=True, source_type=None, jobs_descriptions=None, expected_response=200, assert_ok=True):
+        """High-level wrapper around workflow API, etc. to invoke format 2 workflows."""
+        workflow_populator = self
+
+        def read_test_data(test_dict):
+            test_data_resolver = TestDataResolver()
+            filename = test_data_resolver.get_filename(test_dict["value"])
+            content = open(filename, "r").read()
+            return content
+
+        workflow_id = workflow_populator.upload_yaml_workflow(has_workflow, source_type=source_type)
+        if jobs_descriptions is None:
+            assert source_type != "path"
+            jobs_descriptions = yaml.safe_load(has_workflow)
+
+        if test_data is None:
+            test_data = jobs_descriptions.get("test_data", {})
+        parameters = test_data.pop('step_parameters', {})
+        replacement_parameters = test_data.pop("replacement_parameters", {})
+        inputs, label_map, has_uploads = load_data_dict(history_id, test_data, self.dataset_populator, self.dataset_collection_populator)
+        workflow_request = dict(
+            history="hist_id=%s" % history_id,
+            workflow_id=workflow_id,
+        )
+        workflow_request["inputs"] = json.dumps(label_map)
+        workflow_request["inputs_by"] = 'name'
+        if parameters:
+            workflow_request["parameters"] = json.dumps(parameters)
+            workflow_request["parameters_normalized"] = True
+        if replacement_parameters:
+            workflow_request["replacement_params"] = json.dumps(replacement_parameters)
+        if has_uploads:
+            self.dataset_populator.wait_for_history(history_id, assert_ok=True)
+        invocation_response = workflow_populator.invoke_workflow_raw(workflow_id, workflow_request)
+        api_asserts.assert_status_code_is(invocation_response, expected_response)
+        invocation = invocation_response.json()
+        invocation_id = invocation.get('id')
+        if invocation_id:
+            # Wait for workflow to become fully scheduled and then for all jobs
+            # to complete.
+            if wait:
+                workflow_populator.wait_for_workflow(workflow_id, invocation_id, history_id, assert_ok=assert_ok)
+        jobs = self.dataset_populator.history_jobs(history_id)
+        return RunJobsSummary(
+            history_id=history_id,
+            workflow_id=workflow_id,
+            invocation_id=invocation_id,
+            inputs=inputs,
+            jobs=jobs,
+            invocation=invocation,
+            workflow_request=workflow_request
+        )
+
+
+RunJobsSummary = namedtuple('RunJobsSummary', ['history_id', 'workflow_id', 'invocation_id', 'inputs', 'jobs', 'invocation', 'workflow_request'])
+
 
 class WorkflowPopulator(BaseWorkflowPopulator, ImporterGalaxyInterface):
 
     def __init__(self, galaxy_interactor):
         self.galaxy_interactor = galaxy_interactor
         self.dataset_populator = DatasetPopulator(galaxy_interactor)
+        self.dataset_collection_populator = DatasetCollectionPopulator(galaxy_interactor)
 
     def _post(self, route, data={}):
         return self.galaxy_interactor.post(route, data)
@@ -1124,6 +1187,7 @@ def __init__(self, gi):
         """Construct a dataset collection populator from a bioblend GalaxyInstance."""
         self._gi = gi
         self.dataset_populator = GiDatasetPopulator(gi)
+        self.dataset_collection_populator = GiDatasetCollectionPopulator(gi)
 
     def _create_collection(self, payload):
         create_response = self._post("dataset_collections", data=payload)

From 3e774713a902fb59b4164e4deac2688b9678f0ce Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Tue, 18 Sep 2018 11:05:06 -0400
Subject: [PATCH 2/2] Fill in needed state defaults when importing manually
 curated workflows.
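
Hand-curated workflow definitions (including ones written in the format 2
YAML syntax) frequently omit pieces of tool state that the workflow editor
would normally fill in, leaving the imported workflows unrunnable. This adds
an opt-in fill_defaults option to workflow import and update that computes
the missing state server-side.

A minimal sketch of opting in through the API. The endpoint and the
workflow, fill_defaults, and exact_tools payload keys follow this patch;
the URL, API key, and workflow dict are illustrative placeholders only:

    import requests

    GALAXY_URL = "http://localhost:8080"
    API_KEY = "<your API key>"  # placeholder, not part of this change
    workflow_dict = {"name": "example", "steps": {}}  # hand-curated definition

    payload = {
        "workflow": workflow_dict,
        "fill_defaults": True,  # new in this patch; the server default is False
        "exact_tools": True,    # unchanged: do not upgrade tool versions on import
    }
    response = requests.post(
        "%s/api/workflows?key=%s" % (GALAXY_URL, API_KEY),
        json=payload,
    )
    response.raise_for_status()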
---
 lib/galaxy/managers/workflows.py            |  5 ++-
 lib/galaxy/web/base/controller.py           |  3 +-
 lib/galaxy/webapps/galaxy/api/workflows.py  | 29 +++++++++++++-----
 lib/galaxy/workflow/modules.py              | 14 ++++++++-
 test/api/test_workflows_from_yaml.py        | 34 +++++++++++++++++++++
 test/base/populators.py                     |  2 +-
 test/base/workflows_format_2/main.py        |  5 ++-
 test/functional/tools/disambiguate_cond.xml |  7 ++++-
 8 files changed, 86 insertions(+), 13 deletions(-)

diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py
index 86984d1b9ed6..cb4081469166 100644
--- a/lib/galaxy/managers/workflows.py
+++ b/lib/galaxy/managers/workflows.py
@@ -244,6 +244,7 @@ def build_workflow_from_dict(
         publish=False,
         create_stored_workflow=True,
         exact_tools=True,
+        fill_defaults=False,
     ):
         # Put parameters in workflow mode
         trans.workflow_building_mode = workflow_building_modes.ENABLED
@@ -257,6 +258,7 @@
             data,
             name=name,
             exact_tools=exact_tools,
+            fill_defaults=fill_defaults,
         )
         if 'uuid' in data:
             workflow.uuid = data['uuid']
@@ -298,7 +300,7 @@
             missing_tools=missing_tool_tups
         )
 
-    def update_workflow_from_dict(self, trans, stored_workflow, workflow_data):
+    def update_workflow_from_dict(self, trans, stored_workflow, workflow_data, **kwds):
         # Put parameters in workflow mode
         trans.workflow_building_mode = workflow_building_modes.ENABLED
 
@@ -306,6 +308,7 @@
             trans,
             workflow_data,
             name=stored_workflow.name,
+            **kwds
         )
 
         if missing_tool_tups:
diff --git a/lib/galaxy/web/base/controller.py b/lib/galaxy/web/base/controller.py
index 14bf58dda185..8b60c1ab0701 100644
--- a/lib/galaxy/web/base/controller.py
+++ b/lib/galaxy/web/base/controller.py
@@ -1237,7 +1237,7 @@ def _import_shared_workflow(self, trans, stored):
         session.flush()
         return imported_stored
 
-    def _workflow_from_dict(self, trans, data, source=None, add_to_menu=False, publish=False, exact_tools=True):
+    def _workflow_from_dict(self, trans, data, source=None, add_to_menu=False, publish=False, exact_tools=True, fill_defaults=False):
         """
         Creates a workflow from a dict. Created workflow is stored in the database and returned.
         """
@@ -1250,6 +1250,7 @@
             add_to_menu=add_to_menu,
             publish=publish,
             exact_tools=exact_tools,
+            fill_defaults=fill_defaults,
         )
         return created_workflow.stored_workflow, created_workflow.missing_tools
 
diff --git a/lib/galaxy/webapps/galaxy/api/workflows.py b/lib/galaxy/webapps/galaxy/api/workflows.py
index 2043480838fe..8f965777d993 100644
--- a/lib/galaxy/webapps/galaxy/api/workflows.py
+++ b/lib/galaxy/webapps/galaxy/api/workflows.py
@@ -515,10 +515,12 @@ def update(self, trans, id, payload, **kwds):
 
         if 'steps' in workflow_dict:
             try:
+                from_dict_kwds = self.__import_or_update_kwds(payload)
                 workflow, errors = self.workflow_contents_manager.update_workflow_from_dict(
                     trans,
                     stored_workflow,
                     workflow_dict,
+                    **from_dict_kwds
                 )
             except workflows.MissingToolsException:
                 raise exceptions.MessageException("This workflow contains missing tools. It cannot be saved until they have been removed from the workflow or installed.")
@@ -578,18 +580,17 @@ def __api_import_new_workflow(self, trans, payload, **kwd):
         import_tools = util.string_as_bool(payload.get("import_tools", False))
         if import_tools and not trans.user_is_admin():
             raise exceptions.AdminRequiredException()
+
+        from_dict_kwds = self.__import_or_update_kwds(payload)
+
         publish = util.string_as_bool(payload.get("publish", False))
         # If 'publish' set, default to importable.
         importable = util.string_as_bool(payload.get("importable", publish))
-        # Galaxy will try to upgrade tool versions that don't match exactly during import,
-        # this prevents that.
-        exact_tools = util.string_as_bool(payload.get("exact_tools", True))
+
         if publish and not importable:
             raise exceptions.RequestParameterInvalidException("Published workflow must be importable.")
-        from_dict_kwds = dict(
-            publish=publish,
-            exact_tools=exact_tools,
-        )
+
+        from_dict_kwds["publish"] = publish
         workflow, missing_tool_tups = self._workflow_from_dict(trans, data, **from_dict_kwds)
         if importable:
             self._make_item_accessible(trans.sa_session, workflow)
@@ -629,6 +630,20 @@
             payload)
         return item
 
+    def __import_or_update_kwds(self, payload):
+        # Galaxy will try to upgrade tool versions that don't match exactly during import,
+        # this prevents that.
+        exact_tools = util.string_as_bool(payload.get("exact_tools", True))
+
+        # Fill in missing tool state for hand-built workflows so they can run; this
+        # default should probably become True at some point in the future.
+        fill_defaults = util.string_as_bool(payload.get("fill_defaults", False))
+
+        return {
+            'exact_tools': exact_tools,
+            'fill_defaults': fill_defaults,
+        }
+
     @expose_api
     def import_shared_workflow_deprecated(self, trans, payload, **kwd):
         """
diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py
index d1170086c4ca..a02dad02ba10 100644
--- a/lib/galaxy/workflow/modules.py
+++ b/lib/galaxy/workflow/modules.py
@@ -77,7 +77,7 @@ def __init__(self, trans, content_id=None, **kwds):
 
     @classmethod
     def from_dict(Class, trans, d, **kwds):
         module = Class(trans, **kwds)
-        module.recover_state(d.get("tool_state"))
+        module.recover_state(d.get("tool_state"), **kwds)
         module.label = d.get("label")
         return module
@@ -848,6 +848,18 @@ def get_post_job_actions(self, incoming):
 
     # ---- Run time ---------------------------------------------------------
 
+    def recover_state(self, state, **kwds):
+        """ Recover state `dict` from a simple dictionary describing configuration
+        state (potentially from persisted step state).
+
+        Sub-classes should supply a `default_state` method which contains the
+        initial state `dict` with key, value pairs for all available attributes.
+ """ + super(ToolModule, self).recover_state(state, **kwds) + if kwds.get("fill_defaults", False) and self.tool: + self.compute_runtime_state(self.trans, step_updates=None) + self.tool.check_and_update_param_values(self.state.inputs, self.trans, workflow_building_mode=True) + def get_runtime_state(self): state = DefaultToolState() state.inputs = self.state.inputs diff --git a/test/api/test_workflows_from_yaml.py b/test/api/test_workflows_from_yaml.py index 71b66c40c3b0..9140064cdfdb 100644 --- a/test/api/test_workflows_from_yaml.py +++ b/test/api/test_workflows_from_yaml.py @@ -3,6 +3,7 @@ import json import os +from base.populators import uses_test_history from base.workflow_fixtures import ( WORKFLOW_RUNTIME_PARAMETER_SIMPLE, ) @@ -300,6 +301,39 @@ def test_implicit_connections(self): workflow = self._get("workflows/%s/download" % workflow_id).json() print(workflow) + @uses_test_history() + def test_conditional_ints(self, history_id): + self._run_jobs(""" +class: GalaxyWorkflow +steps: + - label: test_input + tool_id: disambiguate_cond + state: + p3: + use: true + files: + attach_files: false +""", test_data={}, history_id=history_id) + content = self.dataset_populator.get_history_dataset_content(history_id) + assert "no file specified" in content + assert "7 7 4" in content + + self._run_jobs(""" +class: GalaxyWorkflow +steps: + - label: test_input + tool_id: disambiguate_cond + state: + p3: + use: true + p3v: 5 + files: + attach_files: false +""", test_data={}, history_id=history_id) + content = self.dataset_populator.get_history_dataset_content(history_id) + assert "no file specified" in content + assert "7 7 5" in content + def _steps_by_label(self, workflow_as_dict): by_label = {} assert "steps" in workflow_as_dict, workflow_as_dict diff --git a/test/base/populators.py b/test/base/populators.py index ab02e4222c24..49e9aaa2fc95 100644 --- a/test/base/populators.py +++ b/test/base/populators.py @@ -570,7 +570,7 @@ def invoke_workflow(self, history_id, workflow_id, inputs={}, request={}, assert if inputs: request["inputs"] = json.dumps(inputs) request["inputs_by"] = 'step_index' - invocation_response = self.invoke_workflow_raw(request) + invocation_response = self.invoke_workflow_raw(workflow_id, request) if assert_ok: api_asserts.assert_status_code_is(invocation_response, 200) invocation_id = invocation_response.json()["id"] diff --git a/test/base/workflows_format_2/main.py b/test/base/workflows_format_2/main.py index 79411dfcac9d..e0af7e40470e 100644 --- a/test/base/workflows_format_2/main.py +++ b/test/base/workflows_format_2/main.py @@ -35,7 +35,10 @@ def convert_and_import_workflow(has_workflow, **kwds): workflow["name"] = name publish = kwds.get("publish", False) exact_tools = kwds.get("exact_tools", False) - import_kwds = {} + fill_defaults = kwds.get("fill_defaults", True) + import_kwds = { + "fill_defaults": fill_defaults + } if publish: import_kwds["publish"] = True if exact_tools: diff --git a/test/functional/tools/disambiguate_cond.xml b/test/functional/tools/disambiguate_cond.xml index 2465721b6d4c..a85e91ce7c04 100644 --- a/test/functional/tools/disambiguate_cond.xml +++ b/test/functional/tools/disambiguate_cond.xml @@ -1,6 +1,11 @@ - echo "$p1.p1v $p2.p2v $p3.p3v" > $out_file1; cat "$files.p4.file" >> $out_file1; + echo "$p1.p1v $p2.p2v $p3.p3v" > $out_file1; + #if $files.attach_files + cat "$files.p4.file" >> $out_file1; + #else + echo "no file specified" >> $out_file1; + #end if