diff --git a/lib/galaxy/dependencies/pipfiles/default/Pipfile b/lib/galaxy/dependencies/pipfiles/default/Pipfile
index 25084d86dbc3..615d75c0edfe 100644
--- a/lib/galaxy/dependencies/pipfiles/default/Pipfile
+++ b/lib/galaxy/dependencies/pipfiles/default/Pipfile
@@ -73,6 +73,7 @@ paramiko = "*"
 python-genomespaceclient = "<2.0"
 social_auth_core = {version = "==1.5.0", extras = ['openidconnect']}
 cloudauthz = "<=0.2.0"
+gxformat2 = "*"
 
 [requires]
 python_version = "2.7"
diff --git a/lib/galaxy/dependencies/pipfiles/default/pinned-requirements.txt b/lib/galaxy/dependencies/pipfiles/default/pinned-requirements.txt
index 8bc340b0c4aa..45c32b80c5e0 100644
--- a/lib/galaxy/dependencies/pipfiles/default/pinned-requirements.txt
+++ b/lib/galaxy/dependencies/pipfiles/default/pinned-requirements.txt
@@ -55,6 +55,7 @@ functools32==3.2.3.post2; python_version == '2.7'
 future==0.16.0
 futures==3.2.0; python_version == '2.6' or python_version == '2.7'
 galaxy-sequence-utils==1.1.3
+gxformat2==0.3.0
 h5py==2.8.0
 html5lib==1.0.1
 idna==2.7
diff --git a/test/base/populators.py b/test/base/populators.py
index 49e9aaa2fc95..d66c473234dc 100644
--- a/test/base/populators.py
+++ b/test/base/populators.py
@@ -16,16 +16,16 @@ def nottest(x):
     return x
 
 import requests
 import yaml
+from gxformat2 import (
+    convert_and_import_workflow,
+    ImporterGalaxyInterface,
+)
 from pkg_resources import resource_string
 from six import StringIO
 
 from galaxy.tools.verify.test_data import TestDataResolver
 from galaxy.util import unicodify
 
 from . import api_asserts
-from .workflows_format_2 import (
-    convert_and_import_workflow,
-    ImporterGalaxyInterface,
-)
 
 
 # Simple workflow that takes an input and calls the cat wrapper on it.
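With the vendored package gone, test helpers come from the gxformat2 distribution instead. Below is a minimal sketch of the new import path, assuming a running Galaxy instance reachable with bioblend; the URL, API key, and file name are placeholders:

```python
# A minimal sketch of the new import path; assumes a running Galaxy
# instance and bioblend. URL, API key, and file name are placeholders.
from bioblend.galaxy import GalaxyInstance
from gxformat2 import convert_and_import_workflow

gi = GalaxyInstance("http://localhost:8080", key="<api key>")
with open("example.gxwf.yml") as f:
    # A single `gi` serves as both the admin and user connection when no
    # separate instances are supplied.
    response = convert_and_import_workflow(f.read(), gi=gi)
print(response)
```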
-""" -from .interface import ImporterGalaxyInterface -from .main import convert_and_import_workflow - - -__all__ = ( - 'convert_and_import_workflow', - 'ImporterGalaxyInterface', -) diff --git a/test/base/workflows_format_2/converter.py b/test/base/workflows_format_2/converter.py deleted file mode 100644 index 6072bf2a6108..000000000000 --- a/test/base/workflows_format_2/converter.py +++ /dev/null @@ -1,547 +0,0 @@ -"""Functionality for converting a Format 2 workflow into a standard Galaxy workflow.""" -from __future__ import print_function - -import json -import os -import sys -import uuid -from collections import OrderedDict - -import yaml - - -STEP_TYPES = [ - "subworkflow", - "data_input", - "data_collection_input", - "tool", - "pause", - "parameter_input", -] - -STEP_TYPE_ALIASES = { - 'input': 'data_input', - 'input_collection': 'data_collection_input', - 'parameter': 'parameter_input', -} - -RUN_ACTIONS_TO_STEPS = { - 'GalaxyWorkflow': 'run_workflow_to_step', -} - - -def yaml_to_workflow(has_yaml, galaxy_interface, workflow_directory): - """Convert a Format 2 workflow into standard Galaxy format from supplied stream.""" - as_python = yaml.safe_load(has_yaml) - return python_to_workflow(as_python, galaxy_interface, workflow_directory) - - -def python_to_workflow(as_python, galaxy_interface, workflow_directory): - """Convert a Format 2 workflow into standard Galaxy format from supplied dictionary.""" - if workflow_directory is None: - workflow_directory = os.path.abspath(".") - - conversion_context = ConversionContext( - galaxy_interface, - workflow_directory, - ) - return _python_to_workflow(as_python, conversion_context) - - -def _python_to_workflow(as_python, conversion_context): - - if not isinstance(as_python, dict): - raise Exception("This is not a not a valid Galaxy workflow definition.") - - if "class" not in as_python: - raise Exception("This is not a not a valid Galaxy workflow definition, must define a class.") - - if as_python["class"] != "GalaxyWorkflow": - raise Exception("This is not a not a valid Galaxy workflow definition, 'class' must be 'GalaxyWorkflow'.") - - _ensure_defaults(as_python, { - "a_galaxy_workflow": "true", - "format-version": "0.1", - "annotation": "", - "name": "Workflow", - "uuid": str(uuid.uuid4()), - }) - - steps = as_python["steps"] - - # If an inputs section is defined, build steps for each - # and add to steps array. - if "inputs" in as_python: - inputs = as_python["inputs"] - convert_inputs_to_steps(inputs, steps) - - if isinstance(steps, list): - steps_as_dict = OrderedDict() - for i, step in enumerate(steps): - steps_as_dict[str(i)] = step - if "id" not in step: - step["id"] = i - - if "label" in step: - label = step["label"] - conversion_context.labels[label] = i - - if "position" not in step: - # TODO: this really should be optional in Galaxy API. 
diff --git a/test/base/workflows_format_2/__init__.py b/test/base/workflows_format_2/__init__.py
deleted file mode 100644
index c5e73c59a2dd..000000000000
--- a/test/base/workflows_format_2/__init__.py
+++ /dev/null
@@ -1,11 +0,0 @@
-"""This module defines the public interface or entry point for the
-Format 2 workflow code.
-"""
-from .interface import ImporterGalaxyInterface
-from .main import convert_and_import_workflow
-
-
-__all__ = (
-    'convert_and_import_workflow',
-    'ImporterGalaxyInterface',
-)
diff --git a/test/base/workflows_format_2/converter.py b/test/base/workflows_format_2/converter.py
deleted file mode 100644
index 6072bf2a6108..000000000000
--- a/test/base/workflows_format_2/converter.py
+++ /dev/null
@@ -1,547 +0,0 @@
-"""Functionality for converting a Format 2 workflow into a standard Galaxy workflow."""
-from __future__ import print_function
-
-import json
-import os
-import sys
-import uuid
-from collections import OrderedDict
-
-import yaml
-
-
-STEP_TYPES = [
-    "subworkflow",
-    "data_input",
-    "data_collection_input",
-    "tool",
-    "pause",
-    "parameter_input",
-]
-
-STEP_TYPE_ALIASES = {
-    'input': 'data_input',
-    'input_collection': 'data_collection_input',
-    'parameter': 'parameter_input',
-}
-
-RUN_ACTIONS_TO_STEPS = {
-    'GalaxyWorkflow': 'run_workflow_to_step',
-}
-
-
-def yaml_to_workflow(has_yaml, galaxy_interface, workflow_directory):
-    """Convert a Format 2 workflow into standard Galaxy format from supplied stream."""
-    as_python = yaml.safe_load(has_yaml)
-    return python_to_workflow(as_python, galaxy_interface, workflow_directory)
-
-
-def python_to_workflow(as_python, galaxy_interface, workflow_directory):
-    """Convert a Format 2 workflow into standard Galaxy format from supplied dictionary."""
-    if workflow_directory is None:
-        workflow_directory = os.path.abspath(".")
-
-    conversion_context = ConversionContext(
-        galaxy_interface,
-        workflow_directory,
-    )
-    return _python_to_workflow(as_python, conversion_context)
-
-
-def _python_to_workflow(as_python, conversion_context):
-
-    if not isinstance(as_python, dict):
-        raise Exception("This is not a valid Galaxy workflow definition.")
-
-    if "class" not in as_python:
-        raise Exception("This is not a valid Galaxy workflow definition, must define a class.")
-
-    if as_python["class"] != "GalaxyWorkflow":
-        raise Exception("This is not a valid Galaxy workflow definition, 'class' must be 'GalaxyWorkflow'.")
-
-    _ensure_defaults(as_python, {
-        "a_galaxy_workflow": "true",
-        "format-version": "0.1",
-        "annotation": "",
-        "name": "Workflow",
-        "uuid": str(uuid.uuid4()),
-    })
-
-    steps = as_python["steps"]
-
-    # If an inputs section is defined, build steps for each
-    # and add to steps array.
-    if "inputs" in as_python:
-        inputs = as_python["inputs"]
-        convert_inputs_to_steps(inputs, steps)
-
-    if isinstance(steps, list):
-        steps_as_dict = OrderedDict()
-        for i, step in enumerate(steps):
-            steps_as_dict[str(i)] = step
-            if "id" not in step:
-                step["id"] = i
-
-            if "label" in step:
-                label = step["label"]
-                conversion_context.labels[label] = i
-
-            if "position" not in step:
-                # TODO: this really should be optional in Galaxy API.
-                step["position"] = {
-                    "left": 10 * i,
-                    "top": 10 * i
-                }
-
-        as_python["steps"] = steps_as_dict
-        steps = steps_as_dict
-
-    for step in steps.values():
-        step_type = step.get("type", None)
-        if "run" in step:
-            if step_type is not None:
-                raise Exception("Steps specified as run actions cannot specify a type.")
-            run_action = step.get("run")
-            if "@import" in run_action:
-                if len(run_action) > 1:
-                    raise Exception("@import must be only key if present.")
-
-                run_action_path = run_action["@import"]
-                runnable_path = os.path.join(conversion_context.workflow_directory, run_action_path)
-                with open(runnable_path, "r") as f:
-                    runnable_description = yaml.safe_load(f)
-                    run_action = runnable_description
-
-            run_class = run_action["class"]
-            run_to_step_function = eval(RUN_ACTIONS_TO_STEPS[run_class])
-
-            run_to_step_function(conversion_context, step, run_action)
-            del step["run"]
-
-    for step in steps.values():
-        step_type = step.get("type", "tool")
-        step_type = STEP_TYPE_ALIASES.get(step_type, step_type)
-        if step_type not in STEP_TYPES:
-            raise Exception("Unknown step type encountered %s" % step_type)
-        step["type"] = step_type
-        eval("transform_%s" % step_type)(conversion_context, step)
-
-    for output in as_python.get("outputs", []):
-        assert isinstance(output, dict), "Output definition must be dictionary"
-        assert "source" in output, "Output definition must specify source"
-
-        if "label" in output and "id" in output:
-            raise Exception("label and id are aliases for outputs, may only define one")
-        if "label" not in output and "id" not in output:
-            label = ""
-
-        raw_label = output.pop("label", None)
-        raw_id = output.pop("id", None)
-        label = raw_label or raw_id
-
-        source = output["source"]
-        id, output_name = conversion_context.step_output(source)
-        step = steps[str(id)]
-        if "workflow_outputs" not in step:
-            step["workflow_outputs"] = []
-
-        step["workflow_outputs"].append({
-            "output_name": output_name,
-            "label": label,
-            "uuid": output.get("uuid", None)
-        })
-
-    return as_python
-
-
-def convert_inputs_to_steps(inputs, steps):
-    new_steps = []
-    for input_def_raw in inputs:
-        input_def = input_def_raw.copy()
-
-        if "label" in input_def and "id" in input_def:
-            raise Exception("label and id are aliases for inputs, may only define one")
-        if "label" not in input_def and "id" not in input_def:
-            raise Exception("Input must define a label.")
-
-        raw_label = input_def.pop("label", None)
-        raw_id = input_def.pop("id", None)
-        label = raw_label or raw_id
-
-        if not label:
-            raise Exception("Input label must not be empty.")
-
-        input_type = input_def.pop("type", "data")
-        if input_type in ["File", "data", "data_input"]:
-            step_type = "data_input"
-        elif input_type in ["collection", "data_collection", "data_collection_input"]:
-            step_type = "data_collection_input"
-        elif input_type in ["text", "integer", "float", "color", "boolean"]:
-            step_type = "parameter_input"
-            input_def["parameter_type"] = input_type
-        else:
-            raise Exception("Input type must be a data file or collection.")
-
-        step_def = input_def
-        step_def.update({
-            "type": step_type,
-            "label": label,
-        })
-        new_steps.append(step_def)
-
-    for i, new_step in enumerate(new_steps):
-        steps.insert(i, new_step)
-
-
-def run_workflow_to_step(conversion_context, step, run_action):
-    subworkflow_conversion_context = conversion_context.get_subworkflow_conversion_context(step)
-
-    step["type"] = "subworkflow"
-    step["subworkflow"] = _python_to_workflow(
-        run_action,
-        subworkflow_conversion_context,
-    )
-
-
-def transform_data_input(context, step):
-    transform_input(context, step, default_name="Input dataset")
-
-
-def transform_data_collection_input(context, step):
-    transform_input(context, step, default_name="Input dataset collection")
-
-
-def transform_parameter_input(context, step):
-    transform_input(context, step, default_name="input_parameter")
-
-
-def transform_input(context, step, default_name):
-    default_name = step.get("label", default_name)
-    _ensure_defaults(step, {
-        "annotation": "",
-    })
-
-    _ensure_inputs_connections(step)
-
-    if "inputs" not in step:
-        step["inputs"] = [{}]
-
-    step_inputs = step["inputs"][0]
-    if "name" in step_inputs:
-        name = step_inputs["name"]
-    else:
-        name = default_name
-
-    _ensure_defaults(step_inputs, {
-        "name": name,
-        "description": "",
-    })
-    tool_state = {
-        "name": name
-    }
-    for attrib in ["collection_type", "parameter_type", "optional"]:
-        if attrib in step:
-            tool_state[attrib] = step[attrib]
-
-    _populate_tool_state(step, tool_state)
-
-
-def transform_pause(context, step, default_name="Pause for dataset review"):
-    default_name = step.get("label", default_name)
-    _ensure_defaults(step, {
-        "annotation": "",
-    })
-
-    _ensure_inputs_connections(step)
-
-    if "inputs" not in step:
-        step["inputs"] = [{}]
-
-    step_inputs = step["inputs"][0]
-    if "name" in step_inputs:
-        name = step_inputs["name"]
-    else:
-        name = default_name
-
-    _ensure_defaults(step_inputs, {
-        "name": name,
-    })
-    tool_state = {
-        "name": name
-    }
-
-    connect = _init_connect_dict(step)
-    _populate_input_connections(context, step, connect)
-    _populate_tool_state(step, tool_state)
-
-
-def transform_subworkflow(context, step):
-    _ensure_defaults(step, {
-        "annotation": "",
-    })
-
-    _ensure_inputs_connections(step)
-
-    tool_state = {}
-
-    connect = _init_connect_dict(step)
-    _populate_input_connections(context, step, connect)
-    _populate_tool_state(step, tool_state)
-
-
-def _runtime_value():
-    return {"__class__": "RuntimeValue"}
-
-
-def transform_tool(context, step):
-    if "tool_id" not in step:
-        raise Exception("Tool steps must define a tool_id.")
-
-    _ensure_defaults(step, {
-        "annotation": "",
-        "name": step['tool_id'],
-        "post_job_actions": {},
-        "tool_version": None,
-    })
-    post_job_actions = step["post_job_actions"]
-
-    tool_state = {
-        # TODO: Galaxy should not require tool state actually specify a __page__.
-        "__page__": 0,
-    }
-
-    connect = _init_connect_dict(step)
-
-    def append_link(key, value):
-        if key not in connect:
-            connect[key] = []
-        connect[key].append(value["$link"])
-
-    def replace_links(value, key=""):
-        if _is_link(value):
-            append_link(key, value)
-            # Filled in by the connection, so to force late
-            # validation of the field just mark as RuntimeValue.
-            # It would be better I guess if this were some other
-            # value dedicated to this purpose (e.g. a fictitious
-            # {"__class__": "ConnectedValue"}) that could be further
-            # validated by Galaxy.
-            return _runtime_value()
-        if isinstance(value, dict):
-            new_values = {}
-            for k, v in value.items():
-                new_key = _join_prefix(key, k)
-                new_values[k] = replace_links(v, new_key)
-            return new_values
-        elif isinstance(value, list):
-            new_values = []
-            for i, v in enumerate(value):
-                # If we are a repeat we need to modify the key
-                # but not if values are actually $links.
-                if _is_link(v):
-                    append_link(key, v)
-                    new_values.append(None)
-                else:
-                    new_key = "%s_%d" % (key, i)
-                    new_values.append(replace_links(v, new_key))
-            return new_values
-        else:
-            return value
-
-    runtime_inputs = step.get("runtime_inputs", [])
-
-    if "state" in step or runtime_inputs:
-        step_state = step.pop("state", {})
-        step_state = replace_links(step_state)
-
-        for key, value in step_state.items():
-            tool_state[key] = json.dumps(value)
-        for runtime_input in runtime_inputs:
-            tool_state[runtime_input] = json.dumps(_runtime_value())
-
-    # Fill in input connections
-    _populate_input_connections(context, step, connect)
-
-    _populate_tool_state(step, tool_state)
-
-    # Handle outputs.
-    if "outputs" in step:
-        for name, output in step.get("outputs", {}).items():
-            if output.get("hide", False):
-                action_name = "HideDatasetAction%s" % name
-                action = _action(
-                    "HideDatasetAction",
-                    name,
-                )
-                post_job_actions[action_name] = action
-
-            if output.get("rename", None):
-                new_name = output.get("rename")
-                action_name = "RenameDatasetAction%s" % name
-                arguments = dict(newname=new_name)
-                action = _action(
-                    "RenameDatasetAction",
-                    name,
-                    arguments,
-                )
-                post_job_actions[action_name] = action
-
-            if output.get("delete_intermediate_datasets", None):
-                action_name = "DeleteIntermediatesAction%s" % name
-                arguments = dict()
-                action = _action(
-                    "DeleteIntermediatesAction",
-                    name,
-                    arguments,
-                )
-                post_job_actions[action_name] = action
-
-            add_tags = output.get("add_tags", [])
-            if add_tags:
-                action_name = "TagDatasetAction%s" % name
-                arguments = dict(tags=",".join(add_tags))
-                action = _action(
-                    "TagDatasetAction",
-                    name,
-                    arguments
-                )
-                post_job_actions[action_name] = action
-
-            remove_tags = output.get("remove_tags", [])
-            if remove_tags:
-                action_name = "RemoveTagDatasetAction%s" % name
-                arguments = dict(tags=",".join(remove_tags))
-                action = _action(
-                    "RemoveTagDatasetAction",
-                    name,
-                    arguments
-                )
-                post_job_actions[action_name] = action
-
-        del step["outputs"]
-
-
-def run_tool_to_step(conversion_context, step, run_action):
-    tool_description = conversion_context.galaxy_interface.import_tool(
-        run_action
-    )
-    step["type"] = "tool"
-    step["tool_id"] = tool_description["tool_id"]
-    step["tool_version"] = tool_description["tool_version"]
-    step["tool_hash"] = tool_description["tool_hash"]
-
-
-class ConversionContext(object):
-
-    def __init__(self, galaxy_interface, workflow_directory):
-        self.labels = {}
-        self.subworkflow_conversion_contexts = {}
-        self.galaxy_interface = galaxy_interface
-        self.workflow_directory = workflow_directory
-
-    def step_id(self, label_or_id):
-        if label_or_id in self.labels:
-            id = self.labels[label_or_id]
-        else:
-            id = label_or_id
-        return int(id)
-
-    def step_output(self, value):
-        value_parts = str(value).split("#")
-        if len(value_parts) == 1:
-            value_parts.append("output")
-        id = self.step_id(value_parts[0])
-        return id, value_parts[1]
-
-    def get_subworkflow_conversion_context(self, step):
-        step_id = step["id"]
-        if step_id not in self.subworkflow_conversion_contexts:
-            subworkflow_conversion_context = ConversionContext(
-                self.galaxy_interface,
-                self.workflow_directory,
-            )
-            self.subworkflow_conversion_contexts[step_id] = subworkflow_conversion_context
-        return self.subworkflow_conversion_contexts[step_id]
-
-
-def _action(type, name, arguments={}):
-    return {
-        "action_arguments": arguments,
-        "action_type": type,
-        "output_name": name,
-    }
-
-
-def _is_link(value):
-    return isinstance(value, dict) and "$link" in value
-
-
-def _join_prefix(prefix, key):
-    if prefix:
-        new_key = "%s|%s" % (prefix, key)
-    else:
-        new_key = key
-    return new_key
-
-
-def _init_connect_dict(step):
-    if "connect" not in step:
-        step["connect"] = {}
-
-    connect = step["connect"]
-    del step["connect"]
-    return connect
-
-
-def _populate_input_connections(context, step, connect):
-    _ensure_inputs_connections(step)
-    input_connections = step["input_connections"]
-    is_subworkflow_step = step.get("type") == "subworkflow"
-
-    for key, values in connect.items():
-        input_connection_value = []
-        if not isinstance(values, list):
-            values = [values]
-        for value in values:
-            if not isinstance(value, dict):
-                if key == "$step":
-                    value += "#__NO_INPUT_OUTPUT_NAME__"
-                id, output_name = context.step_output(value)
-                value = {"id": id, "output_name": output_name}
-                if is_subworkflow_step:
-                    subworkflow_conversion_context = context.get_subworkflow_conversion_context(step)
-                    input_subworkflow_step_id = subworkflow_conversion_context.step_id(key)
-                    value["input_subworkflow_step_id"] = input_subworkflow_step_id
-            input_connection_value.append(value)
-        if key == "$step":
-            key = "__NO_INPUT_OUTPUT_NAME__"
-        input_connections[key] = input_connection_value
-
-
-def _ensure_inputs_connections(step):
-    if "input_connections" not in step:
-        step["input_connections"] = {}
-
-
-def _ensure_defaults(in_dict, defaults):
-    for key, value in defaults.items():
-        if key not in in_dict:
-            in_dict[key] = value
-
-
-def _populate_tool_state(step, tool_state):
-    step["tool_state"] = json.dumps(tool_state)
-
-
-def main(argv):
-    # yaml_to_workflow also needs a galaxy_interface and a workflow
-    # directory; neither is required for simple workflows, so pass None
-    # for both and treat the lone argument as a path.
-    with open(argv[0], "r") as f:
-        print(json.dumps(yaml_to_workflow(f, None, None)))
-
-
-if __name__ == "__main__":
-    main(sys.argv[1:])
-
-__all__ = (
-    'yaml_to_workflow',
-    'python_to_workflow',
-)
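The same conversion entry points now ship in the gxformat2 package. Below is a short sketch of driving the converter directly, assuming gxformat2 keeps the module layout of this vendored copy; no Galaxy connection is needed because the workflow has no nested `run` actions, so `None` is passed for both `galaxy_interface` and `workflow_directory`:

```python
# A sketch of calling the converter directly; assumes gxformat2 keeps the
# module layout of this vendored copy. No Galaxy interface is consulted
# for a workflow without nested `run` actions, so None is passed twice.
from gxformat2.converter import yaml_to_workflow

FORMAT_2_WORKFLOW = """
class: GalaxyWorkflow
inputs:
  - label: input_fastq
    type: data
steps:
  - label: cat
    tool_id: cat1
    connect:
      input1: input_fastq
outputs:
  - label: duplicated
    source: cat#out_file1
"""

native = yaml_to_workflow(FORMAT_2_WORKFLOW, None, None)
assert native["a_galaxy_workflow"] == "true"
# The input becomes a data_input step; untyped steps default to tool steps.
assert [step["type"] for step in native["steps"].values()] == ["data_input", "tool"]
```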
diff --git a/test/base/workflows_format_2/interface.py b/test/base/workflows_format_2/interface.py
deleted file mode 100644
index c734867383bc..000000000000
--- a/test/base/workflows_format_2/interface.py
+++ /dev/null
@@ -1,75 +0,0 @@
-"""This module contains an interface and implementation describing Galaxy interactions used by gxformat2.
-
-The interface is :class:`ImporterGalaxyInterface` and the default
-implementation based on `BioBlend `__
-is :class:`BioBlendImporterGalaxyInterface`.
-"""
-import abc
-
-import bioblend
-import six
-
-
-@six.add_metaclass(abc.ABCMeta)
-class ImporterGalaxyInterface(object):
-    """An abstract interface describing Galaxy operations used by gxformat2.
-
-    Specifically containing definitions of operations required to load
-    workflows into Galaxy.
-    """
-
-    @abc.abstractmethod
-    def import_workflow(self, workflow, **kwds):
-        """Import a workflow via POST /api/workflows or comparable interface into Galaxy."""
-
-
-class BioBlendImporterGalaxyInterface(object):
-    """Implementation of :class:`ImporterGalaxyInterface` using bioblend."""
-
-    def __init__(self, **kwds):
-        """Build a :class:`bioblend.GalaxyInstance` from supplied arguments."""
-        url = None
-
-        admin_key = None
-        admin_gi = None
-        if "admin_gi" in kwds:
-            admin_gi = kwds["admin_gi"]
-        elif "gi" in kwds:
-            admin_gi = kwds["gi"]
-        elif "url" in kwds and "admin_key" in kwds:
-            url = kwds["url"]
-            admin_key = kwds["admin_key"]
-
-        if admin_gi is None:
-            assert url is not None
-            assert admin_key is not None
-            admin_gi = bioblend.GalaxyInstance(url=url, key=admin_key)
-
-        user_key = None
-        user_gi = None
-        if "user_gi" in kwds:
-            user_gi = kwds["user_gi"]
-        elif "gi" in kwds:
-            user_gi = kwds["gi"]
-        elif "url" in kwds and "user_key" in kwds:
-            url = kwds["url"]
-            user_key = kwds["user_key"]
-
-        if user_gi is None:
-            assert url is not None
-            assert user_key is not None
-            user_gi = bioblend.GalaxyInstance(url=url, key=user_key)
-
-        self._admin_gi = admin_gi
-        self._user_gi = user_gi
-
-    def import_workflow(self, workflow, **kwds):
-        """Import Galaxy workflow using instance :class:`bioblend.GalaxyInstance` object."""
-        return self._user_gi.workflows.import_workflow_json(
-            workflow,
-            **kwds
-        )
-
-    def import_tool(self, tool_representation):
-        """Import Galaxy tool using instance :class:`bioblend.GalaxyInstance` object."""
-        pass
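Any object implementing `import_workflow` can stand in for the BioBlend-backed default. A hedged sketch of a custom implementation built on `requests`, assuming Galaxy's `POST /api/workflows` endpoint and the `{"workflow": ...}` payload shape that bioblend's `import_workflow_json` sends:

```python
# Sketch of a custom ImporterGalaxyInterface built on requests instead of
# bioblend; assumes Galaxy's POST /api/workflows endpoint and key auth.
import requests

from gxformat2 import ImporterGalaxyInterface, convert_and_import_workflow


class RequestsImporterInterface(ImporterGalaxyInterface):

    def __init__(self, url, key):
        self._url = url.rstrip("/")
        self._key = key

    def import_workflow(self, workflow, **kwds):
        # `workflow` is the already-converted native dictionary; extra
        # keywords (publish, fill_defaults, ...) ride along in the payload.
        payload = {"workflow": workflow}
        payload.update(kwds)
        response = requests.post(
            "%s/api/workflows" % self._url,
            params={"key": self._key},
            json=payload,
        )
        response.raise_for_status()
        return response.json()


interface = RequestsImporterInterface("http://localhost:8080", "<api key>")
convert_and_import_workflow({"class": "GalaxyWorkflow", "steps": []},
                            galaxy_interface=interface)
```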
diff --git a/test/base/workflows_format_2/main.py b/test/base/workflows_format_2/main.py
deleted file mode 100644
index e0af7e40470e..000000000000
--- a/test/base/workflows_format_2/main.py
+++ /dev/null
@@ -1,51 +0,0 @@
-"""Module containing :func:`convert_and_import_workflow`."""
-import os
-
-import yaml
-
-from .converter import python_to_workflow, yaml_to_workflow
-from .interface import BioBlendImporterGalaxyInterface
-
-
-def convert_and_import_workflow(has_workflow, **kwds):
-    """Main entry point for conversion and import of Format 2 workflows."""
-    galaxy_interface = kwds.get("galaxy_interface", None)
-    if galaxy_interface is None:
-        galaxy_interface = BioBlendImporterGalaxyInterface(**kwds)
-
-    source_type = kwds.get("source_type", None)
-    workflow_directory = kwds.get("workflow_directory", None)
-    if source_type == "path":
-        workflow_path = has_workflow
-        if workflow_directory is None:
-            workflow_directory = os.path.dirname(has_workflow)
-        with open(workflow_path, "r") as f:
-            has_workflow = yaml.safe_load(f)
-
-    if workflow_directory is not None:
-        workflow_directory = os.path.abspath(workflow_directory)
-
-    if isinstance(has_workflow, dict):
-        workflow = python_to_workflow(has_workflow, galaxy_interface, workflow_directory)
-    else:
-        workflow = yaml_to_workflow(has_workflow, galaxy_interface, workflow_directory)
-
-    name = kwds.get("name", None)
-    if name is not None:
-        workflow["name"] = name
-    publish = kwds.get("publish", False)
-    exact_tools = kwds.get("exact_tools", False)
-    fill_defaults = kwds.get("fill_defaults", True)
-    import_kwds = {
-        "fill_defaults": fill_defaults
-    }
-    if publish:
-        import_kwds["publish"] = True
-    if exact_tools:
-        import_kwds["exact_tools"] = True
-    return galaxy_interface.import_workflow(workflow, **import_kwds)
-
-
-__all__ = (
-    'convert_and_import_workflow',
-)
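Tying the removed modules together, the keyword handling in `convert_and_import_workflow` supports path-based sources and a few import options. A sketch with placeholder URL, keys, and path:

```python
# Sketch of the keyword arguments convert_and_import_workflow understands;
# the URL, API keys, and path below are placeholders.
from gxformat2 import convert_and_import_workflow

convert_and_import_workflow(
    "workflows/example.gxwf.yml",
    source_type="path",             # read the file and yaml.safe_load it
    name="Example (nightly test)",  # overrides the name in the document
    publish=True,                   # forwarded to import_workflow
    url="http://localhost:8080",    # used to build the default BioBlend
    admin_key="<admin api key>",    # interface when no galaxy_interface,
    user_key="<user api key>",      # gi, user_gi, or admin_gi is supplied
)
```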
- """ - - @abc.abstractmethod - def import_workflow(self, workflow, **kwds): - """Import a workflow via POST /api/workflows or comparable interface into Galaxy.""" - - -class BioBlendImporterGalaxyInterface(object): - """Implementation of :class:`ImporterGalaxyInterface` using bioblend.""" - - def __init__(self, **kwds): - """Build a :class:`bioblend.GalaxyInstance` from supplied arguments.""" - url = None - - admin_key = None - admin_gi = None - if "admin_gi" in kwds: - admin_gi = kwds["admin_gi"] - elif "gi" in kwds: - admin_gi = kwds["gi"] - elif "url" in kwds and "admin_key" in kwds: - url = kwds["url"] - admin_key = kwds["admin_key"] - - if admin_gi is None: - assert url is not None - assert admin_key is not None - admin_gi = bioblend.GalaxyInstance(url=url, key=admin_key) - - user_key = None - user_gi = None - if "user_gi" in kwds: - user_gi = kwds["user_gi"] - elif "gi" in kwds: - user_gi = kwds["gi"] - elif "url" in kwds and "user_key" in kwds: - url = kwds["url"] - user_key = kwds["user_key"] - - if user_gi is None: - assert url is not None - assert user_key is not None - user_gi = bioblend.GalaxyInstance(url=url, key=user_key) - - self._admin_gi = admin_gi - self._user_gi = user_gi - - def import_workflow(self, workflow, **kwds): - """Import Galaxy workflow using instance :class:`bioblend.GalaxyInstance` object.""" - return self._user_gi.workflows.import_workflow_json( - workflow, - **kwds - ) - - def import_tool(self, tool_representation): - """Import Galaxy tool using instance :class:`bioblend.GalaxyInstance` object.""" - pass diff --git a/test/base/workflows_format_2/main.py b/test/base/workflows_format_2/main.py deleted file mode 100644 index e0af7e40470e..000000000000 --- a/test/base/workflows_format_2/main.py +++ /dev/null @@ -1,51 +0,0 @@ -"""Module containing :func:`convert_and_import_workflow`.""" -import os - -import yaml - -from .converter import python_to_workflow, yaml_to_workflow -from .interface import BioBlendImporterGalaxyInterface - - -def convert_and_import_workflow(has_workflow, **kwds): - """Function is main entry for conversion and import of Format 2 workflows.""" - galaxy_interface = kwds.get("galaxy_interface", None) - if galaxy_interface is None: - galaxy_interface = BioBlendImporterGalaxyInterface(**kwds) - - source_type = kwds.get("source_type", None) - workflow_directory = kwds.get("workflow_directory", None) - if source_type == "path": - workflow_path = has_workflow - if workflow_directory is None: - workflow_directory = os.path.dirname(has_workflow) - with open(workflow_path, "r") as f: - has_workflow = yaml.safe_load(f) - - if workflow_directory is not None: - workflow_directory = os.path.abspath(workflow_directory) - - if isinstance(has_workflow, dict): - workflow = python_to_workflow(has_workflow, galaxy_interface, workflow_directory) - else: - workflow = yaml_to_workflow(has_workflow, galaxy_interface, workflow_directory) - - name = kwds.get("name", None) - if name is not None: - workflow["name"] = name - publish = kwds.get("publish", False) - exact_tools = kwds.get("exact_tools", False) - fill_defaults = kwds.get("fill_defaults", True) - import_kwds = { - "fill_defaults": fill_defaults - } - if publish: - import_kwds["publish"] = True - if exact_tools: - import_kwds["exact_tools"] = True - return galaxy_interface.import_workflow(workflow, **import_kwds) - - -__all__ = ( - 'convert_and_import_workflow', -)