Merge pull request #1306 from lldelisle/lint_tool_shed_tools
workflow_lint: Lint tools in public shed tools
mvdbeek committed Oct 31, 2022
2 parents 96de718 + 0465f8a commit 6f16888
Showing 6 changed files with 183 additions and 14 deletions.
3 changes: 2 additions & 1 deletion docs/best_practices_workflows.rst
@@ -67,7 +67,8 @@ Tools

The tools used within a workflow should be packaged with Galaxy by default or published
to the main Galaxy ToolShed. Using private tool sheds or the test tool shed limits the
-ability of other Galaxy's to use the workflow.
+ability of other Galaxy's to use the workflow. In addition, ``workflow_lint`` will check
+that the tools starting with ``toolshed.g2.bx.psu.edu`` are available in the toolshed.

Syntax
~~~~~~
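As an aside (not part of this commit), the check described in the documentation above hinges on the form of a step's tool id: tools shipped with Galaxy have short ids, while ToolShed tools embed the shed host, owner, repository, tool name and version. A minimal sketch of that distinction, with example ids:

    # Illustration only: telling built-in tool ids apart from main-ToolShed tool ids.
    # Both example ids below are just that -- examples.
    MAIN_TOOLSHED_URL = "https://toolshed.g2.bx.psu.edu"

    builtin_tool_id = "cat1"
    shed_tool_id = "toolshed.g2.bx.psu.edu/repos/bgruening/text_processing/tp_head_tool/0.1.0"

    for tool_id in (builtin_tool_id, shed_tool_id):
        # MAIN_TOOLSHED_URL[8:] drops the "https://" scheme, leaving the host prefix
        # that main-ToolShed tool ids start with.
        if tool_id.startswith(MAIN_TOOLSHED_URL[8:]):
            print(f"{tool_id}: would be checked against the main ToolShed")
        else:
            print(f"{tool_id}: assumed to be a built-in tool and skipped")
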
23 changes: 10 additions & 13 deletions planemo/autoupdate.py
@@ -29,14 +29,16 @@
error,
info,
)
+from planemo.workflow_lint import (
+    find_repos_from_tool_id,
+    MAIN_TOOLSHED_URL,
+)

if TYPE_CHECKING:
from planemo.cli import PlanemoCliContext
from planemo.galaxy.config import LocalGalaxyConfig
from planemo.runnable import Runnable

-AUTOUPDATE_TOOLSHED_URL = "https://toolshed.g2.bx.psu.edu"
-

def find_macros(xml_tree: ElementTree) -> List[Any]:
"""
@@ -288,16 +290,11 @@ def outdated_tools(
ctx: "PlanemoCliContext", wf_dict: Dict[str, Any], ts: ToolShedInstance
) -> Dict[str, Dict[str, str]]:
def check_tool_step(step, ts): # return a dict with current and newest tool version, in case they don't match
-        if not step["tool_id"].startswith(AUTOUPDATE_TOOLSHED_URL[8:]):
-            return {}  # assume a built in tool
-        try:
-            repos = ts.repositories._get(params={"tool_ids": step["tool_id"]})
-        except Exception:
-            ctx.log(f"The ToolShed returned an error when searching for the most recent version of {step['tool_id']}")
-            return {}
+        warning_msg, repos = find_repos_from_tool_id(step["tool_id"], ts)
+        if warning_msg != "":
+            ctx.log(warning_msg)
         if len(repos) == 0:
-            ctx.log(f"The tool {step['tool_id']} is not in the toolshed (may have been tagged as invalid).")
-            return {}
+            return repos
base_id = "/".join(step["tool_id"].split("/")[:-1])
tool_ids_found = {
tool["guid"] for repo in repos.values() if type(repo) == dict for tool in repo.get("tools", [])
@@ -334,7 +331,7 @@ def get_tools_to_update(
with open(workflow.path) as f:
wf_dict = yaml.load(f, Loader=yaml.SafeLoader)

-    ts = toolshed.ToolShedInstance(url=AUTOUPDATE_TOOLSHED_URL)
+    ts = toolshed.ToolShedInstance(url=MAIN_TOOLSHED_URL)
tools_to_update = outdated_tools(ctx, wf_dict, ts)
return {tool: versions for tool, versions in tools_to_update.items() if tool not in tools_to_skip}

@@ -388,7 +385,7 @@ def fix_workflow_gxformat2(original_wf: Dict[str, Any], updated_wf: Dict[str, An
if (
updated_wf["steps"][str(step_index + len(original_wf["inputs"]))]
.get("tool_id", "")
-            .startswith(AUTOUPDATE_TOOLSHED_URL[8:])
+            .startswith(MAIN_TOOLSHED_URL[8:])
):
step["tool_version"] = updated_wf["steps"][str(step_index + len(original_wf["inputs"]))]["tool_version"]
step["tool_id"] = updated_wf["steps"][str(step_index + len(original_wf["inputs"]))]["tool_id"]
54 changes: 54 additions & 0 deletions planemo/workflow_lint.py
@@ -10,11 +10,14 @@
Iterator,
List,
Optional,
Tuple,
TYPE_CHECKING,
Union,
)

import yaml
from bioblend import toolshed
from bioblend.toolshed import ToolShedInstance
from galaxy.tool_util.lint import LintContext
from galaxy.tool_util.loader_directory import EXCLUDE_WALK_DIRS
from galaxy.tool_util.verify import asserts
@@ -46,6 +49,8 @@
POTENTIAL_WORKFLOW_FILES = re.compile(r"^.*(\.yml|\.yaml|\.ga)$")
DOCKSTORE_REGISTRY_CONF_VERSION = "1.2"

MAIN_TOOLSHED_URL = "https://toolshed.g2.bx.psu.edu"


class WorkflowLintContext(LintContext):
# Setup training topic for linting - probably should pass this through
@@ -106,6 +111,7 @@ def structure(path, lint_context):
lint_context.lint("lint_structure", structure, potential_workflow_artifact_path)
lint_context.lint("lint_best_practices", _lint_best_practices, potential_workflow_artifact_path)
lint_context.lint("lint_tests", _lint_tsts, potential_workflow_artifact_path)
lint_context.lint("lint_tool_ids", _lint_tool_ids, potential_workflow_artifact_path)
else:
# Allow linting ro crates and such also
pass
@@ -387,3 +393,51 @@ def find_potential_workflow_files(directory: str) -> List[str]:
else:
matches.append(directory)
return matches


def find_repos_from_tool_id(tool_id: str, ts: ToolShedInstance) -> Tuple[str, Dict[str, Any]]:
"""
Return a string which indicates what failed and dict with all revisions for a given tool id
"""
if not tool_id.startswith(MAIN_TOOLSHED_URL[8:]):
return ("", {}) # assume a built in tool
try:
repos = ts.repositories._get(params={"tool_ids": tool_id})
except Exception:
return (f"The ToolShed returned an error when searching for the most recent version of {tool_id}", {})
if len(repos) == 0:
return (f"The tool {tool_id} is not in the toolshed (may have been tagged as invalid).", {})
else:
return ("", repos)


def _lint_tool_ids(path: str, lint_context: WorkflowLintContext) -> None:
def _lint_tool_ids_steps(lint_context: WorkflowLintContext, wf_dict: Dict, ts: ToolShedInstance) -> bool:
"""Returns whether a single tool_id was invalid"""
failed = False
steps = wf_dict.get("steps", {})
for step in steps.values():
if step.get("type", "tool") == "tool" and not step.get("run", {}).get("class") == "GalaxyWorkflow":
warning_msg, _ = find_repos_from_tool_id(step["tool_id"], ts)
if warning_msg != "":
lint_context.error(warning_msg)
failed = True
elif step.get("type") == "subworkflow": # GA SWF
sub_failed = _lint_tool_ids_steps(lint_context, step["subworkflow"], ts)
if sub_failed:
failed = True
elif step.get("run", {}).get("class") == "GalaxyWorkflow": # gxformat2 SWF
sub_failed = _lint_tool_ids_steps(lint_context, step["run"], ts)
if sub_failed:
failed = True
else:
continue
return failed

with open(path) as f:
workflow_dict = ordered_load(f)
ts = toolshed.ToolShedInstance(url=MAIN_TOOLSHED_URL)
failed = _lint_tool_ids_steps(lint_context, workflow_dict, ts)
if not failed:
lint_context.valid("All tools_id appear to be valid.")
return None
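
For orientation (not part of the diff), the traversal above recurses into both native .ga subworkflows (a "subworkflow" key on the step) and gxformat2 subworkflows (a "run" entry whose class is GalaxyWorkflow). A self-contained toy version, with an invented workflow dict and a stub in place of the live ToolShed lookup:

    from typing import Dict

    def fake_lookup(tool_id: str) -> str:
        # Stub standing in for find_repos_from_tool_id: flag ids containing "unexisting".
        return f"The tool {tool_id} is not in the toolshed." if "unexisting" in tool_id else ""

    def walk_steps(wf_dict: Dict) -> bool:
        failed = False
        for step in wf_dict.get("steps", {}).values():
            if step.get("type", "tool") == "tool" and step.get("run", {}).get("class") != "GalaxyWorkflow":
                msg = fake_lookup(step["tool_id"])
                if msg:
                    print("ERROR:", msg)
                    failed = True
            elif step.get("type") == "subworkflow":  # native .ga subworkflow
                failed = walk_steps(step["subworkflow"]) or failed
            elif step.get("run", {}).get("class") == "GalaxyWorkflow":  # gxformat2 subworkflow
                failed = walk_steps(step["run"]) or failed
        return failed

    toy_wf = {
        "steps": {
            "0": {"type": "tool", "tool_id": "toolshed.g2.bx.psu.edu/repos/bgruening/unexisting/unexisting/0.1.0"},
            "1": {"type": "subworkflow", "subworkflow": {"steps": {"0": {"type": "tool", "tool_id": "cat1"}}}},
        }
    }
    print("failed:", walk_steps(toy_wf))  # flags the "unexisting" tool id
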
92 changes: 92 additions & 0 deletions tests/data/wf_repos/autoupdate_tests/workflow_with_unexisting_tool.ga
@@ -0,0 +1,92 @@
{
"a_galaxy_workflow": "true",
"annotation": "",
"format-version": "0.1",
"name": "Workflow with unexisting tool",
"steps": {
"0": {
"annotation": "",
"content_id": null,
"errors": null,
"id": 0,
"input_connections": {},
"inputs": [
{
"description": "",
"name": "input"
}
],
"label": "input",
"name": "Input dataset",
"outputs": [],
"position": {
"left": 0,
"top": 0
},
"tool_id": null,
"tool_state": "{\"optional\": false, \"tag\": \"\"}",
"tool_version": null,
"type": "data_input",
"uuid": "62da4422-2462-40d8-a4f6-35b0fd1156f4",
"workflow_outputs": [
{
"label": null,
"output_name": "output",
"uuid": "070bf2da-a153-4b1e-a38c-55fc164ca4f9"
}
]
},
"1": {
"annotation": "",
"content_id": "toolshed.g2.bx.psu.edu/repos/bgruening/unexisting/unexisting/0.1.0",
"errors": null,
"id": 1,
"input_connections": {
"infile": {
"id": 0,
"output_name": "output"
}
},
"inputs": [
{
"description": "runtime parameter for tool Select first",
"name": "infile"
}
],
"label": "fake tool",
"name": "Select first",
"outputs": [
{
"name": "outfile",
"type": "input"
}
],
"position": {
"left": 281,
"top": 47
},
"post_job_actions": {},
"tool_id": "toolshed.g2.bx.psu.edu/repos/bgruening/unexisting/unexisting/0.1.0",
"tool_shed_repository": {
"changeset_revision": "adf54b12c295",
"name": "unexisting",
"owner": "bgruening",
"tool_shed": "toolshed.g2.bx.psu.edu"
},
"tool_state": "{\"complement\": \"\", \"count\": \"10\", \"infile\": {\"__class__\": \"RuntimeValue\"}, \"__page__\": null, \"__rerun_remap_job_id__\": null}",
"tool_version": "0.1.0",
"type": "tool",
"uuid": "c15c0342-6bde-4c83-9399-3b8597de6a83",
"workflow_outputs": [
{
"label": null,
"output_name": "outfile",
"uuid": "61762374-8db1-46bc-8446-0eaca9ab8e92"
}
]
}
},
"tags": [],
"uuid": "e75d0ddc-b376-456f-b5e3-a8afbb7b85ed",
"version": 1
}
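
The fixture above points step 1 at a repository (bgruening/unexisting) that does not exist in the main ToolShed, which is what the new lint and autoupdate tests exercise. A small sketch, assuming the fixture lives under tests/data/wf_repos/autoupdate_tests/ in the repository, of listing the tool ids it declares with plain JSON parsing:

    import json

    # Assumed location of the new fixture; adjust if it lives elsewhere.
    path = "tests/data/wf_repos/autoupdate_tests/workflow_with_unexisting_tool.ga"
    with open(path) as f:
        wf = json.load(f)

    for step in wf["steps"].values():
        if step.get("type") == "tool":
            print(step["tool_id"])
    # Expected output for this fixture:
    # toolshed.g2.bx.psu.edu/repos/bgruening/unexisting/unexisting/0.1.0
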
8 changes: 8 additions & 0 deletions tests/test_cmd_autoupdate.py
@@ -110,3 +110,11 @@ def test_autoupdate_workflow_unexisting_version(self):
# We just want to be sure planemo autoupdate does not raise an error
# Currently it would write to the output that no updates are available
# In future versions it would be great if it gave the last valid version.

def test_autoupdate_workflow_unexisting_tool(self):
"""Test autoupdate command for a workflow where the tool is not in the toolshed."""
with self._isolate_with_test_data("wf_repos/autoupdate_tests") as f:
wf_file = os.path.join(f, "workflow_with_unexisting_tool.ga")
autoupdate_command = ["autoupdate", wf_file]
result = self._runner.invoke(self._cli.planemo, autoupdate_command)
assert "No newer tool versions were found, so the workflow was not updated." in result.output
17 changes: 17 additions & 0 deletions tests/test_cmd_workflow_lint.py
@@ -179,6 +179,23 @@ def test_assertion_linting(self):
in result.output
)

def test_tool_id_linting_wrong_version(self):
workflow_path = "/".join(
(TEST_DATA_DIR, "wf_repos", "autoupdate_tests", "workflow_with_unexisting_version_of_tool.ga")
)
lint_cmd = ["workflow_lint", workflow_path]
result = self._runner.invoke(self._cli.planemo, lint_cmd)
assert (
"ERROR: The tool toolshed.g2.bx.psu.edu/repos/bgruening/text_processing/tp_head_tool/0.1.0 is not in the toolshed"
in result.output
)

def test_tool_id_linting_wrong_tool(self):
workflow_path = "/".join((TEST_DATA_DIR, "wf_repos", "autoupdate_tests", "workflow_with_unexisting_tool.ga"))
lint_cmd = ["workflow_lint", workflow_path]
result = self._runner.invoke(self._cli.planemo, lint_cmd)
assert "ERROR: The ToolShed returned an error when searching" in result.output


def _wf_repo(rel_path):
return os.path.join(TEST_DATA_DIR, "wf_repos", rel_path)
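
For reference, the invocation pattern in these tests can be reproduced outside the test suite with click's CliRunner. A sketch, assuming planemo's click command group is importable as planemo.cli.planemo (mirroring the self._cli.planemo attribute used above) and using a placeholder workflow path:

    from click.testing import CliRunner

    from planemo.cli import planemo  # assumed import path for the CLI entry point

    runner = CliRunner()
    result = runner.invoke(planemo, ["workflow_lint", "path/to/workflow.ga"])  # placeholder path
    print(result.exit_code)
    print(result.output)
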
