Permalink
Browse files

Rework Galaxy testing to use structured data instead of XUnit data.

This switches planemo in a subtle way from to depending more on the sturctured data json than the XUnit - this will be benefical for non-Galaxy tool testing when XUnit will be a product of the reporting stuff and not an input to it. This refactoring/reworking should allow most of planemo/galaxy/test/actions to be used with the newer planemo-driven testing stuff.

Adds tests and doc strings.
  • Loading branch information...
jmchilton committed May 12, 2016
1 parent e38c436 commit 4d29bf10882f4b4c6f1560badd201bb1de401c7f
@@ -70,13 +70,17 @@ quick-test: ## run quickest tests with the default Python
tox: ## run tests with tox in the specified ENV, defaults to py27
$(IN_VENV) tox -e $(ENV) -- $(ARGS)

coverage: ## check code coverage quickly with the default Python
_coverage-report: ## build coverage report with the default Python
coverage run --source $(SOURCE_DIR) setup.py $(TEST_DIR)
coverage report -m
coverage html

_open-coverage: ## open coverage report using open
open htmlcov/index.html || xdg-open htmlcov/index.html

ready-docs:
coverage: _coverage-report open-coverage ## check code coverage quickly with the default Python

ready-docs: ## rebuild docs folder ahead of running docs or lint-docs
rm -f docs/$(SOURCE_DIR).rst
rm -f docs/planemo_ext.rst
rm -f docs/modules.rst
@@ -1,6 +1,13 @@
"""Entry point and interface for ``planemo.galaxy.test`` package."""
from .actions import run_in_config
from .structures import StructuredData
from .actions import handle_reports
from .actions import handle_reports_and_summary


__all__ = ["run_in_config", "StructuredData", "handle_reports"]
__all__ = [
"handle_reports",
"handle_reports_and_summary",
"run_in_config",
"StructuredData",
]
@@ -5,6 +5,11 @@

from . import structures as test_structures
from planemo.io import error, info, warn, shell_join
from planemo.exit_codes import (
EXIT_CODE_OK,
EXIT_CODE_GENERIC_FAILURE,
EXIT_CODE_NO_SUCH_TARGET,
)
from planemo.galaxy.run import (
run_galaxy_command,
setup_venv,
@@ -91,31 +96,52 @@ def run_in_config(ctx, config, run=run_galaxy_command, **kwds):
shell('cp -r "%s"/* "%s"' % update_cp_args)

_check_test_outputs(xunit_report_file_tracker, structured_report_file_tracker)

test_results = test_structures.GalaxyTestResults(
structured_report_file,
xunit_report_file,
html_report_file,
return_code,
)

test_data = test_results.structured_data
handle_reports(ctx, test_data, kwds)
_handle_summary(
test_results,
**kwds
structured_data = test_results.structured_data
return handle_reports_and_summary(
ctx,
structured_data,
exit_code=test_results.exit_code,
kwds=kwds
)

return return_code

def handle_reports_and_summary(ctx, structured_data, exit_code=None, kwds={}):
"""Produce reports and print summary, return 0 if tests passed.
If ``exit_code`` is set - use underlying test source for return
code and test success determination, otherwise infer from supplied
test data.
"""
handle_reports(ctx, structured_data, kwds)
summary_exit_code = _handle_summary(
structured_data,
**kwds
)
return exit_code if exit_code is not None else summary_exit_code


def handle_reports(ctx, test_data, kwds):
def handle_reports(ctx, structured_data, kwds):
"""Write reports based on user specified kwds."""
exceptions = []
structured_report_file = kwds.get("test_output_json", None)
if structured_report_file and not os.path.exists(structured_report_file):
try:
with open(structured_report_file, "w") as f:
json.dump(structured_report_file, f)
except Exception as e:
exceptions.append(e)

for report_type in ["html", "markdown", "text"]:
try:
_handle_test_output_file(
ctx, report_type, test_data, kwds
ctx, report_type, structured_data, kwds
)
except Exception as e:
exceptions.append(e)
@@ -159,48 +185,48 @@ def _handle_test_output_file(ctx, report_type, test_data, kwds):


def _handle_summary(
test_results,
structured_data,
**kwds
):
summary_dict = _get_dict_value("summary", structured_data)
num_tests = _get_dict_value("num_tests", summary_dict)
num_failures = _get_dict_value("num_failures", summary_dict)
num_errors = _get_dict_value("num_errors", summary_dict)
num_problems = num_failures + num_errors

summary_exit_code = EXIT_CODE_OK
if num_problems > 0:
summary_exit_code = EXIT_CODE_GENERIC_FAILURE
elif num_tests == 0:
summary_exit_code = EXIT_CODE_NO_SUCH_TARGET

summary_style = kwds.get("summary")
if summary_style == "none":
return
if summary_style != "none":
if num_tests == 0:
warn(NO_TESTS_MESSAGE)
elif num_problems == 0:
info(ALL_TESTS_PASSED_MESSAGE % num_tests)
elif num_problems:
html_report_file = kwds.get("test_output")
message_args = (num_problems, num_tests, html_report_file)
message = PROBLEM_COUNT_MESSAGE % message_args
warn(message)

if test_results.has_details:
_summarize_tests_full(
test_results,
structured_data,
**kwds
)
else:
if test_results.exit_code:
warn(GENERIC_PROBLEMS_MESSAGE % test_results.output_html_path)
else:
info(GENERIC_TESTS_PASSED_MESSAGE)

return summary_exit_code


def _summarize_tests_full(
test_results,
structured_data,
**kwds
):
num_tests = test_results.num_tests
num_problems = test_results.num_problems

if num_tests == 0:
warn(NO_TESTS_MESSAGE)
return

if num_problems == 0:
info(ALL_TESTS_PASSED_MESSAGE % num_tests)

if num_problems:
html_report_file = test_results.output_html_path
message_args = (num_problems, num_tests, html_report_file)
message = PROBLEM_COUNT_MESSAGE % message_args
warn(message)

for testcase_el in test_results.xunit_testcase_elements:
structured_data_tests = test_results.structured_data_tests
_summarize_test_case(structured_data_tests, testcase_el, **kwds)
tests = _get_dict_value("tests", structured_data)
for test_case_data in tests:
_summarize_test_case(test_case_data, **kwds)


def passed(xunit_testcase_el):
@@ -211,10 +237,16 @@ def passed(xunit_testcase_el):
return did_pass


def _summarize_test_case(structured_data, testcase_el, **kwds):
def _summarize_test_case(structured_data, **kwds):
summary_style = kwds.get("summary")
test_id = test_structures.case_id(testcase_el)
if not passed(testcase_el):
test_id = test_structures.case_id(
raw_id=_get_dict_value("id", structured_data)
)
status = _get_dict_value(
"status",
_get_dict_value("data", structured_data)
)
if status != "success":
state = click.style("failed", bold=True, fg='red')
else:
state = click.style("passed", bold=True, fg='green')
@@ -223,14 +255,7 @@ def _summarize_test_case(structured_data, testcase_el, **kwds):
_print_command_line(structured_data, test_id)


def _print_command_line(structured_data, test_id):
try:
test = [d for d in structured_data if d["id"] == test_id.id][0]["data"]
except (KeyError, IndexError):
# Failed to find structured data for this test - likely targetting
# and older Galaxy version.
return

def _print_command_line(test, test_id):
execution_problem = test.get("execution_problem", None)
if execution_problem:
click.echo("| command: *could not execute job, no command generated* ")
@@ -245,6 +270,13 @@ def _print_command_line(structured_data, test_id):
click.echo("| command: %s" % command)


def _get_dict_value(key, data):
try:
return data[key]
except (KeyError, TypeError):
raise KeyError("No key [%s] in [%s]" % (key, data))


def _check_test_outputs(
xunit_report_file_tracker,
structured_report_file_tracker
@@ -321,4 +353,5 @@ def changed(self):
__all__ = [
"run_in_config",
"handle_reports",
"handle_reports_and_summary",
]
@@ -172,11 +172,9 @@ def __init__(
self.structured_data_tests = sd.structured_data_tests
self.structured_data_by_id = sd.structured_data_by_id

if output_xml_path:
self.xunit_tree = parse_xunit_report(output_xml_path)
sd.merge_xunit(self._xunit_root)
else:
self.xunit_tree = ET.fromstring("<testsuite />")
self.xunit_tree = parse_xunit_report(output_xml_path)
sd.merge_xunit(self._xunit_root)

self.sd.set_exit_code(exit_code)
self.sd.read_summary()
self.sd.update()
@@ -223,13 +221,15 @@ def find_cases(xunit_root):
return xunit_root.findall("testcase")


def case_id(testcase_el):
name_raw = testcase_el.attrib["name"]
if "TestForTool_" in name_raw:
raw_id = name_raw
else:
class_name = testcase_el.attrib["classname"]
raw_id = "{0}.{1}".format(class_name, name_raw)
def case_id(testcase_el=None, raw_id=None):
if raw_id is None:
assert testcase_el is not None
name_raw = testcase_el.attrib["name"]
if "TestForTool_" in name_raw:
raw_id = name_raw
else:
class_name = testcase_el.attrib["classname"]
raw_id = "{0}.{1}".format(class_name, name_raw)

name = None
num = None
@@ -72,6 +72,7 @@ def _echo(message, err=False):


def shell_join(*args):
"""Join potentially empty commands together with ;."""
return "; ".join([c for c in args if c])


@@ -46,6 +46,7 @@ def get_var(var_name):
'planemo.reports',
'planemo.shed',
'planemo.shed2tap',
'planemo.test',
'planemo.xml',
]
ENTRY_POINTS = '''
Oops, something went wrong.

0 comments on commit 4d29bf1

Please sign in to comment.