Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement running tests against existing invocation #1401

Merged
merged 6 commits into from Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/commands.rst
Expand Up @@ -67,5 +67,6 @@ documentation describes these commands.
.. include:: commands/workflow_edit.rst
.. include:: commands/workflow_job_init.rst
.. include:: commands/workflow_lint.rst
.. include:: commands/workflow_test_check.rst
.. include:: commands/workflow_test_init.rst
.. include:: commands/workflow_upload.rst
8 changes: 8 additions & 0 deletions docs/planemo.commands.rst
Expand Up @@ -500,6 +500,14 @@ planemo.commands.cmd\_workflow\_lint module
:undoc-members:
:show-inheritance:

planemo.commands.cmd\_workflow\_test\_check module
--------------------------------------------------

.. automodule:: planemo.commands.cmd_workflow_test_check
:members:
:undoc-members:
:show-inheritance:

planemo.commands.cmd\_workflow\_test\_init module
-------------------------------------------------

Expand Down
42 changes: 42 additions & 0 deletions planemo/commands/cmd_workflow_test_check.py
@@ -0,0 +1,42 @@
"""Module describing the planemo ``workflow_test_check`` command."""
import click

from planemo import options
from planemo.cli import command_function
from planemo.engine.factory import engine_context
from planemo.galaxy.activity import invocation_to_run_response
from planemo.galaxy.test.actions import handle_reports_and_summary
from planemo.runnable import definition_to_test_case
from planemo.runnable_resolve import for_runnable_identifier
from planemo.test.results import StructuredData


@click.command("workflow_test_check")
mvdbeek marked this conversation as resolved.
Show resolved Hide resolved
@options.optional_tools_arg(multiple=False, allow_uris=False, metavar="TEST DEFINITION")
@options.required_workflow_arg()
@options.galaxy_url_option(required=True)
@options.galaxy_user_key_option(required=True)
@options.test_index_option()
@options.test_options()
@command_function
def cli(ctx, path, workflow_identifier, test_index, **kwds):
"""Run defined tests against existing workflow invocation."""
with engine_context(ctx, engine="external_galaxy", **kwds) as engine, engine.ensure_runnables_served([]) as config:
user_gi = config.user_gi
invocation = user_gi.invocations.show_invocation(workflow_identifier)
runnable = for_runnable_identifier(ctx, invocation["workflow_id"], kwds)
test_cases = definition_to_test_case(path, runnable)
assert (
len(test_cases) >= test_index
), f"Selected test case {test_index}, but only found {len(test_cases)} test case(s)."
test_case = test_cases[test_index - 1]
run_response = invocation_to_run_response(ctx, user_gi=config.user_gi, runnable=runnable, invocation=invocation)
structured_data = test_case.structured_test_data(run_response)
test_data = {
"version": "0.1",
"tests": [structured_data],
}
structured_results = StructuredData(data=test_data)
structured_results.calculate_summary_data()
return_value = handle_reports_and_summary(ctx, structured_results.structured_data, kwds=kwds)
ctx.exit(return_value)
93 changes: 59 additions & 34 deletions planemo/galaxy/activity.py
Expand Up @@ -152,6 +152,7 @@ def _execute( # noqa C901
) -> "GalaxyBaseRunResponse":
user_gi = config.user_gi
admin_gi = config.gi
run_response = None

start_datetime = datetime.now()
try:
Expand Down Expand Up @@ -205,46 +206,31 @@ def _execute( # noqa C901
allow_tool_state_corrections=True,
inputs_by="name",
)
invocation_id = invocation["id"]

ctx.vlog("Waiting for invocation [%s]" % invocation_id)
polling_backoff = kwds.get("polling_backoff", 0)
no_wait = kwds.get("no_wait", False)

final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
run_response = invocation_to_run_response(
ctx,
invocation_id=invocation_id,
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
config.user_gi,
runnable,
invocation,
polling_backoff=kwds.get("polling_backoff", 0),
no_wait=kwds.get("no_wait", False),
start_datetime=start_datetime,
log=log_contents_str(config),
)
if final_invocation_state not in ("ok", "skipped"):
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)

response_kwds = {
"workflow_id": workflow_id,
"invocation_id": invocation_id,
"history_state": job_state if not no_wait else None,
"invocation_state": final_invocation_state,
"error_message": error_message,
}

else:
raise NotImplementedError()

run_response = response_class(
ctx=ctx,
runnable=runnable,
user_gi=user_gi,
history_id=history_id,
log=log_contents_str(config),
start_datetime=start_datetime,
end_datetime=datetime.now(),
**response_kwds,
)
if not run_response:
run_response = response_class(
ctx=ctx,
runnable=runnable,
user_gi=user_gi,
history_id=history_id,
log=log_contents_str(config),
start_datetime=start_datetime,
end_datetime=datetime.now(),
**response_kwds,
)
if kwds.get("download_outputs"):
output_directory = kwds.get("output_directory", None)
ctx.vlog("collecting outputs from run...")
Expand All @@ -253,6 +239,45 @@ def _execute( # noqa C901
return run_response


def invocation_to_run_response(
ctx, user_gi, runnable, invocation, polling_backoff=0, no_wait=False, start_datetime=None, log=None
):
start_datetime = start_datetime or datetime.now()
invocation_id = invocation["id"]
history_id = invocation["history_id"]
workflow_id = invocation["workflow_id"]

ctx.vlog("Waiting for invocation [%s]" % invocation_id)

final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
ctx,
invocation_id=invocation_id,
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
)
if final_invocation_state not in ("ok", "skipped"):
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)

return GalaxyWorkflowRunResponse(
ctx,
runnable=runnable,
user_gi=user_gi,
history_id=history_id,
workflow_id=workflow_id,
invocation_id=invocation_id,
history_state=job_state if not no_wait else None,
invocation_state=final_invocation_state,
error_message=error_message,
log=log,
start_datetime=start_datetime,
end_datetime=datetime.now(),
)


def stage_in(
ctx: "PlanemoCliContext", runnable: Runnable, config: "BaseGalaxyConfig", job_path: str, **kwds
) -> Tuple[Dict[str, Any], str]:
Expand Down
15 changes: 10 additions & 5 deletions planemo/options.py
Expand Up @@ -467,12 +467,13 @@ def docker_extra_volume_option():
)


def galaxy_url_option():
def galaxy_url_option(required: bool = False):
return planemo_option(
"--galaxy_url",
use_global_config=True,
extra_global_config_vars=["galaxy_url"],
use_env_var=True,
required=required,
type=str,
help="Remote Galaxy URL to use with external Galaxy engine.",
)
Expand All @@ -489,12 +490,13 @@ def galaxy_admin_key_option():
)


def galaxy_user_key_option():
def galaxy_user_key_option(required: bool = False):
return planemo_option(
"--galaxy_user_key",
use_global_config=True,
extra_global_config_vars=["admin_key"],
use_env_var=True,
required=required,
type=str,
help="User key to use with external Galaxy engine.",
)
Expand Down Expand Up @@ -824,7 +826,7 @@ def convert(self, value, param, ctx):
return super().convert(value, param, ctx)


def optional_tools_arg(multiple=False, allow_uris=False):
def optional_tools_arg(multiple=False, allow_uris=False, metavar="TOOL_PATH"):
"""Decorate click method as optionally taking in the path to a tool
or directory of tools. If no such argument is given the current working
directory will be treated as a directory of tools.
Expand All @@ -844,7 +846,7 @@ def optional_tools_arg(multiple=False, allow_uris=False):
nargs = -1 if multiple else 1
return click.argument(
name,
metavar="TOOL_PATH",
metavar=metavar,
type=arg_type,
nargs=nargs,
callback=_optional_tools_default,
Expand Down Expand Up @@ -1599,6 +1601,10 @@ def profile_database_options():
)


def test_index_option():
return planemo_option("--test_index", default=1, type=int, help="Select which test to run. Counting starts at 1")
mvdbeek marked this conversation as resolved.
Show resolved Hide resolved


def test_options():
return _compose(
planemo_option(
Expand All @@ -1607,7 +1613,6 @@ def test_options():
help="Update test-data directory with job outputs (normally"
" written to directory --job_output_files if specified.)",
),
paste_test_data_paths_option(),
nsoranzo marked this conversation as resolved.
Show resolved Hide resolved
test_report_options(),
planemo_option(
"--test_output_json",
Expand Down
13 changes: 8 additions & 5 deletions planemo/runnable.py
Expand Up @@ -265,6 +265,12 @@ def cases(runnable: Runnable) -> List["AbstractTestCase"]:
cases.append(ExternalGalaxyToolTestCase(runnable, tool_id, tool_version, i, test_dict))
return cases

return definition_to_test_case(tests_path=tests_path, runnable=runnable)


def definition_to_test_case(tests_path, runnable):
Copy link
Member

@nsoranzo nsoranzo Nov 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add type annotation here please?

with open(tests_path) as f:
tests_def = yaml.safe_load(f)
tests_directory = os.path.abspath(os.path.dirname(tests_path))

def normalize_to_tests_path(path: str) -> str:
Expand All @@ -274,13 +280,11 @@ def normalize_to_tests_path(path: str) -> str:
absolute_path = path
return os.path.normpath(absolute_path)

with open(tests_path) as f:
tests_def = yaml.safe_load(f)

if not isinstance(tests_def, list):
message = TEST_FILE_NOT_LIST_MESSAGE % tests_path
raise Exception(message)

cases = []
for i, test_def in enumerate(tests_def):
if "job" not in test_def:
message = TEST_FIELD_MISSING_MESSAGE % (i + 1, tests_path, "job")
Expand All @@ -305,7 +309,6 @@ def normalize_to_tests_path(path: str) -> str:
doc=doc,
)
cases.append(case)

return cases


Expand Down Expand Up @@ -434,7 +437,7 @@ def _test_id(self) -> str:
]:
return get_tool_source(self.runnable.path).parse_id()
else:
return os.path.basename(self.runnable.path)
return os.path.basename(self.runnable.uri)


class ExternalGalaxyToolTestCase(AbstractTestCase):
Expand Down
15 changes: 15 additions & 0 deletions tests/test_cmds_with_workflow_id.py
Expand Up @@ -78,3 +78,18 @@ def test_serve_workflow(self):
with open(output_json_path) as f:
output = json.load(f)
assert "tests" in output
test_index = 1
mvdbeek marked this conversation as resolved.
Show resolved Hide resolved
invocation_id = output["tests"][test_index]["data"]["invocation_details"]["details"]["invocation_id"]
test_path = os.path.join(TEST_DATA_DIR, "wf11-remote.gxwf-test.yml")
workflow_test_check_command = [
"workflow_test_check",
"--galaxy_url",
f"http://localhost:{self._port}",
"--galaxy_user_key",
api.DEFAULT_ADMIN_API_KEY,
"--test_index",
str(test_index),
test_path,
invocation_id,
]
self._check_exit_code(workflow_test_check_command, exit_code=0)