diff --git a/fluster/fluster.py b/fluster/fluster.py index 216c825e..877cdcf6 100644 --- a/fluster/fluster.py +++ b/fluster/fluster.py @@ -16,7 +16,6 @@ # License along with this library. If not, see . import csv -import io import json import os import os.path @@ -358,7 +357,30 @@ def _show_summary_if_needed(self, ctx: Context, results: Dict[str, List[Tuple[De self._generate_md_summary(ctx, results) @staticmethod - def _generate_junit_summary(ctx: Context, results: Dict[str, List[Tuple[Decoder, TestSuite]]]) -> None: + def _calculate_profile_stats(test_vectors: Dict[str, TestVector]) -> Dict[str, Dict[str, int]]: + """Calculate profile statistics from test vectors""" + profile_stats: Dict[str, Dict[str, int]] = {} + for test_vector in test_vectors.values(): + if test_vector.profile is not None: + profile_name = test_vector.profile.name + if profile_name not in profile_stats: + profile_stats[profile_name] = {"passed": 0, "total": 0} + profile_stats[profile_name]["total"] += 1 + if test_vector.test_result == TestVectorResult.SUCCESS: + profile_stats[profile_name]["passed"] += 1 + return profile_stats + + @staticmethod + def _calculate_timeout_adjustment(ctx: Context, test_suite: TestSuite) -> float: + """Calculate timeout adjustment for test suite timing""" + if ctx.jobs == 1: + return sum( + (tv.test_time for tv in test_suite.test_vectors.values() if tv.test_result == TestVectorResult.TIMEOUT), + 0.0, + ) + return 0.0 + + def _generate_junit_summary(self, ctx: Context, results: Dict[str, List[Tuple[Decoder, TestSuite]]]) -> None: try: import junitparser as junitp # type: ignore except ImportError: @@ -391,8 +413,6 @@ def _parse_suite_results( test_suite_name, test_suite_results = test_suite_tuple for suite_decoder_res in test_suite_results: - timeouts = 0 - jsuite = junitp.TestSuite(test_suite_name) jsuite.add_property("decoder", suite_decoder_res[0].name) jsuite.add_property("os", f"{system_info.os_name} {system_info.os_version}") @@ -416,10 +436,9 @@ def _parse_suite_results( jsuite.add_testcase(jcase) - if vector.test_result is TestVectorResult.TIMEOUT and ctx.jobs == 1: - timeouts += ctx.timeout + timeout_time = self._calculate_timeout_adjustment(ctx, suite_decoder_res[1]) - jsuite.time = round(suite_decoder_res[1].time_taken - timeouts, 3) + jsuite.time = round(suite_decoder_res[1].time_taken - timeout_time, 3) jsuites.append(jsuite) @@ -449,212 +468,268 @@ def _generate_csv_summary(self, ctx: Context, results: Dict[str, List[Tuple[Deco ] if system_info.backend_info: - for backend, info in system_info.backend_info.items(): - rows.append([f"Backend-{backend}", info, "", ""]) - rows.append(["", "", "", ""]) + rows.extend([[f"Backend-{backend}", info, "", ""] for backend, info in system_info.backend_info.items()]) - rows.append(["TEST RESULTS", "", "", ""]) + rows.extend([["", "", "", ""], ["TEST RESULTS", "", "", ""]]) for test_suite_name, test_suite_results in results.items(): - rows.append(["", "", "", ""]) - rows.append([f"Test Suite: {test_suite_name}", "", "", ""]) + rows.extend([["", "", "", ""], [f"Test Suite: {test_suite_name}", "", "", ""]]) for decoder, test_suite in test_suite_results: - rows.append(["", "", "", ""]) - rows.append([f"Decoder: {decoder.name}", "", "", ""]) + tv_total = len(test_suite.test_vectors) + tv_passed = test_suite.test_vectors_success + tv_not_run = test_suite.test_vectors_not_run + tv_not_supported = test_suite.test_vectors_not_supported + tv_failed = tv_total - tv_passed - tv_not_run - tv_not_supported + timeout_time = self._calculate_timeout_adjustment(ctx, test_suite) + + # Start building the decoder summary block + rows.extend( + [ + ["", "", "", ""], + [f"Decoder: {decoder.name}", "", "", ""], + ["Total Tests", str(tv_total), "", ""], + ["Passed", str(tv_passed), "", ""], + ] + ) - timeouts = self._calculate_timeout_adjustment(ctx, test_suite) - rows.append(["Total Vectors", str(len(test_suite.test_vectors)), "", ""]) - rows.append(["Success Vectors", str(test_suite.test_vectors_success), "", ""]) - rows.append(["Total Time (s)", f"{test_suite.time_taken - timeouts:.3f}", "", ""]) + # Conditional rows: Only add if they contain data + if tv_not_run > 0: + rows.append(["Not Run", str(tv_not_run), "", ""]) + if tv_not_supported > 0: + rows.append(["Not Supported", str(tv_not_supported), "", ""]) + + # Remaining summary items + rows.extend( + [ + ["Failed\\Error", str(tv_failed), "", ""], + ["Total Time (s)", f"{test_suite.time_taken - timeout_time:.3f}", "", ""], + ] + ) profile_stats = self._calculate_profile_stats(test_suite.test_vectors) if profile_stats: - rows.append(["", "", "", ""]) - rows.append(["Profile", "Success", "Total", ""]) - for profile_name, stats in sorted(profile_stats.items()): - rows.append([profile_name, str(stats["success"]), str(stats["total"]), ""]) - - rows.append(["", "", "", ""]) - rows.append(["Vector Name", "Result", "Time (s)", "Profile"]) - for vector_name, test_vector in sorted(test_suite.test_vectors.items()): - profile_name = test_vector.profile.name if test_vector.profile else "" - time_str = f"{test_vector.test_time:.3f}" if test_vector.test_time else "0" - rows.append([vector_name, RESULT_MAP[test_vector.test_result], time_str, profile_name]) - - rows.append(["", "", "", ""]) - rows.append(["GLOBAL SUMMARY", "", "", ""]) - - all_decoders = [] - decoder_names = set() - for test_suite_results in results.values(): - for decoder, _ in test_suite_results: - if decoder.name not in decoder_names: - all_decoders.append(decoder) - decoder_names.add(decoder.name) + rows.extend([["", "", "", ""], ["Profile", "Passed", "Total", ""]]) + rows.extend( + [ + [profile_name, str(stats["passed"]), str(stats["total"]), ""] + for profile_name, stats in sorted(profile_stats.items()) + ] + ) + + # Detailed Vector Results + rows.extend([["", "", "", ""], ["Vector Name", "Result", "Time (s)", "Profile"]]) + + # Using list comprehension for performance + rows.extend( + [ + [ + tv_name, + RESULT_MAP[tv.test_result], + f"{tv.test_time:.3f}" if tv.test_time else "0", + tv.profile.name if tv.profile else "", + ] + for tv_name, tv in sorted(test_suite.test_vectors.items()) + ] + ) - for decoder in all_decoders: - total_success = 0 - total_vectors = 0 - total_time = 0.0 - decoder_profile_stats: Dict[str, Dict[str, int]] = {} + should_show_summary = len(results) > 1 or any(len(res) > 1 for res in results.values()) + + if should_show_summary: + rows.extend([["", "", "", ""], ["GLOBAL SUMMARY", "", "", ""]]) + + stats_map: Dict[str, Dict[str, Any]] = {} for test_suite_results in results.values(): - for dec, test_suite in test_suite_results: - if dec.name == decoder.name: - total_success += test_suite.test_vectors_success - total_vectors += len(test_suite.test_vectors) - timeouts = self._calculate_timeout_adjustment(ctx, test_suite) - total_time += test_suite.time_taken - timeouts - - test_suite_profile_stats = self._calculate_profile_stats(test_suite.test_vectors) - for profile_name, profile_data in test_suite_profile_stats.items(): - if profile_name not in decoder_profile_stats: - decoder_profile_stats[profile_name] = {"success": 0, "total": 0} - decoder_profile_stats[profile_name]["total"] += profile_data["total"] - decoder_profile_stats[profile_name]["success"] += profile_data["success"] - - rows.append(["", "", "", ""]) - rows.append([f"Decoder: {decoder.name}", "", "", ""]) - rows.append(["Total Success", str(total_success), "", ""]) - rows.append(["Total Vectors", str(total_vectors), "", ""]) - rows.append(["Total Time (s)", f"{total_time:.3f}", "", ""]) - - if decoder_profile_stats: - rows.append(["", "", "", ""]) - rows.append(["Profile", "Success", "Total", ""]) - for profile_name, profile_data in sorted(decoder_profile_stats.items()): - rows.append([profile_name, str(profile_data["success"]), str(profile_data["total"]), ""]) - - # Ensure all rows have exactly 4 columns and format as CSV - csv_lines = [] - for row in rows: - while len(row) < 4: - row.append("") - csv_lines.append(row[:4]) + for decoder, test_suite in test_suite_results: + name = decoder.name + if name not in stats_map: + stats_map[name] = { + "passed": 0, + "not_run": 0, + "not_supported": 0, + "failed_error": 0, + "total": 0, + "time": 0.0, + "profiles": {}, + } + + entry: Dict[str, Any] = stats_map[name] + ts = test_suite + + # Update core counts + entry["total"] += len(ts.test_vectors) + entry["passed"] += ts.test_vectors_success + entry["not_run"] += ts.test_vectors_not_run + entry["not_supported"] += ts.test_vectors_not_supported + + # Update time + timeout_time = self._calculate_timeout_adjustment(ctx, ts) + entry["time"] += ts.time_taken - timeout_time + + # Update profiles + ts_profiles = self._calculate_profile_stats(ts.test_vectors) + for profile_name, profile_data in ts_profiles.items(): + profile_entry: Dict[str, int] = entry["profiles"].setdefault( + profile_name, {"passed": 0, "total": 0} + ) + profile_entry["passed"] += profile_data["passed"] + profile_entry["total"] += profile_data["total"] + + for name, data in stats_map.items(): + failed = data["total"] - data["passed"] - data["not_run"] - data["not_supported"] + + rows.extend( + [ + ["", "", "", ""], + [f"Decoder: {name}", "", "", ""], + ["Total Tests", str(data["total"]), "", ""], + ["Passed", str(data["passed"]), "", ""], + ] + ) - if ctx.summary_output: - with open(ctx.summary_output, mode="w", encoding="utf8", newline="") as file: - writer = csv.writer(file) - writer.writerows(csv_lines) - else: - output = io.StringIO() - writer = csv.writer(output) - writer.writerows(csv_lines) - print(output.getvalue(), end="") + if data["not_run"] > 0: + rows.append(["Not Run", str(data["not_run"]), "", ""]) + if data["not_supported"] > 0: + rows.append(["Not Supported", str(data["not_supported"]), "", ""]) - @staticmethod - def _calculate_profile_stats(test_vectors: Dict[str, TestVector]) -> Dict[str, Dict[str, int]]: - """Calculate profile statistics from test vectors""" - profile_stats: Dict[str, Dict[str, int]] = {} - for test_vector in test_vectors.values(): - if test_vector.profile is not None: - profile_name = test_vector.profile.name - if profile_name not in profile_stats: - profile_stats[profile_name] = {"success": 0, "total": 0} - profile_stats[profile_name]["total"] += 1 - if test_vector.test_result == TestVectorResult.SUCCESS: - profile_stats[profile_name]["success"] += 1 - return profile_stats + rows.extend([["Failed\\Error", str(failed), "", ""], ["Total Time (s)", f"{data['time']:.3f}", "", ""]]) - @staticmethod - def _calculate_timeout_adjustment(ctx: Context, test_suite: TestSuite) -> float: - """Calculate timeout adjustment for test suite timing""" - if ctx.jobs == 1: - return sum( - ctx.timeout for tv in test_suite.test_vectors.values() if tv.test_result == TestVectorResult.TIMEOUT - ) - return 0.0 + if data["profiles"]: + rows.extend([["", "", "", ""], ["Profile", "Passed", "Total", ""]]) + rows.extend( + [ + [profile_name, str(profile_stats["passed"]), str(profile_stats["total"]), ""] + for profile_name, profile_stats in sorted(data["profiles"].items()) + ] + ) + + # Use a generator to normalize rows on the fly + # This ensures exactly 4 columns: (row + 4 empty strings) truncated to 4 + formatted_rows = ((row + [""] * 4)[:4] for row in rows) + + if ctx.summary_output: + with open(ctx.summary_output, "w", encoding="utf-8", newline="") as file: + csv.writer(file).writerows(formatted_rows) + else: + csv.writer(sys.stdout).writerows(formatted_rows) def _generate_json_summary(self, ctx: Context, results: Dict[str, List[Tuple[Decoder, TestSuite]]]) -> None: """Generate JSON summary report with system information""" system_info = SystemInfo() + test_suites_data: Dict[str, Any] = {} + global_summary_data: Dict[str, Any] = {} + json_output = { + "system_info": system_info.to_dict(), + "test_suites": test_suites_data, + "global_summary": global_summary_data, + } - json_output: Dict[str, Any] = {"system_info": system_info.to_dict(), "test_suites": {}} + global_stats: Dict[str, Dict[str, Any]] = {} for test_suite_name, test_suite_results in results.items(): suite_data: Dict[str, Any] = {"decoders": {}} - for decoder, test_suite in test_suite_results: - timeouts = self._calculate_timeout_adjustment(ctx, test_suite) + ts = test_suite + name = decoder.name + tv_total = len(ts.test_vectors) + tv_passed = ts.test_vectors_success + tv_not_run = ts.test_vectors_not_run + tv_not_supported = ts.test_vectors_not_supported + tv_failed = tv_total - tv_passed - tv_not_run - tv_not_supported + timeout_time = self._calculate_timeout_adjustment(ctx, ts) + time_taken = ts.time_taken - timeout_time + + if name not in global_stats: + global_stats[name] = { + "total_tests": 0, + "passed": 0, + "not_run": 0, + "not_supported": 0, + "failed_error": 0, + "time": 0.0, + "profiles": {}, + } + global_entry: Dict[str, Any] = global_stats[name] + global_entry["total_tests"] += tv_total + global_entry["passed"] += tv_passed + global_entry["not_run"] += tv_not_run + global_entry["not_supported"] += tv_not_supported + global_entry["failed_error"] += tv_failed + global_entry["time"] += time_taken + + # Build Decoder Data decoder_data: Dict[str, Any] = { - "decoder_name": decoder.name, - "total_vectors": len(test_suite.test_vectors), - "success_vectors": test_suite.test_vectors_success, - "not_supported_vectors": test_suite.test_vectors_not_supported, - "total_time": round(test_suite.time_taken - timeouts, 3), - "vectors": {}, + "decoder_name": name, + "total_tests": tv_total, + "passed": tv_passed, } - profile_stats = self._calculate_profile_stats(test_suite.test_vectors) + if tv_not_run > 0: + decoder_data["not_run"] = tv_not_run + if tv_not_supported > 0: + decoder_data["not_supported"] = tv_not_supported + + decoder_data["failed_error"] = tv_failed + decoder_data["total_time"] = round(time_taken, 3) + + # Profile Stats + profile_stats = self._calculate_profile_stats(ts.test_vectors) if profile_stats: decoder_data["profile_stats"] = profile_stats - - for vector_name, test_vector in test_suite.test_vectors.items(): - vector_data: Dict[str, Any] = { - "result": RESULT_MAP[test_vector.test_result], - "time": round(test_vector.test_time, 3) if test_vector.test_time else 0, + for profile_name, profile_data in profile_stats.items(): + if profile_name not in global_entry["profiles"]: + global_entry["profiles"][profile_name] = {"passed": 0, "total": 0} + global_profile_entry: Dict[str, int] = global_entry["profiles"][profile_name] + global_profile_entry["passed"] += profile_data["passed"] + global_profile_entry["total"] += profile_data["total"] + + # Vector Details + test_vectors_dict = {} + for tv_name, tv in ts.test_vectors.items(): + vector_data = { + "result": RESULT_MAP[tv.test_result], + "time": round(tv.test_time, 3) if tv.test_time else 0, } + if tv.profile: + vector_data["profile"] = tv.profile.name - if test_vector.profile: - vector_data["profile"] = test_vector.profile.name - - if test_vector.errors: - vector_data["errors"] = [err[0] for err in test_vector.errors] + test_vectors_dict[tv_name] = vector_data - decoder_data["vectors"][vector_name] = vector_data + decoder_data["vectors"] = test_vectors_dict + suite_data["decoders"][name] = decoder_data - suite_data["decoders"][decoder.name] = decoder_data + test_suites_data[test_suite_name] = suite_data - json_output["test_suites"][test_suite_name] = suite_data - - all_decoders = [] - decoder_names = set() - for test_suite_results in results.values(): - for decoder, _ in test_suite_results: - if decoder.name not in decoder_names: - all_decoders.append(decoder) - decoder_names.add(decoder.name) + # Global Summary + if len(results) > 1 or any(len(res) > 1 for res in results.values()): + for name, data in global_stats.items(): + summary_entry: Dict[str, Any] = { + "total_tests": data["total_tests"], + "passed": data["passed"], + } - global_summary: Dict[str, Any] = {} - for decoder in all_decoders: - total_success = 0 - total_vectors = 0 - total_time = 0.0 - decoder_profile_stats: Dict[str, Dict[str, int]] = {} + if data["not_run"] > 0: + summary_entry["not_run"] = data["not_run"] + if data["not_supported"] > 0: + summary_entry["not_supported"] = data["not_supported"] - for test_suite_results in results.values(): - for dec, test_suite in test_suite_results: - if dec.name == decoder.name: - total_success += test_suite.test_vectors_success - total_vectors += len(test_suite.test_vectors) - - timeouts = self._calculate_timeout_adjustment(ctx, test_suite) - total_time += test_suite.time_taken - timeouts - - test_suite_profile_stats = self._calculate_profile_stats(test_suite.test_vectors) - for profile_name, profile_data in test_suite_profile_stats.items(): - if profile_name not in decoder_profile_stats: - decoder_profile_stats[profile_name] = {"success": 0, "total": 0} - decoder_profile_stats[profile_name]["total"] += profile_data["total"] - decoder_profile_stats[profile_name]["success"] += profile_data["success"] - - global_summary[decoder.name] = { - "total_success": total_success, - "total_vectors": total_vectors, - "total_time": round(total_time, 3), - } + summary_entry["failed_error"] = data["failed_error"] + summary_entry["total_time"] = round(data["time"], 3) - if decoder_profile_stats: - global_summary[decoder.name]["profile_stats"] = decoder_profile_stats + if data["profiles"]: + summary_entry["profile_stats"] = data["profiles"] - json_output["global_summary"] = global_summary + global_summary_data[name] = summary_entry + else: + del json_output["global_summary"] + # Output if ctx.summary_output: - with open(ctx.summary_output, "w+", encoding="utf-8") as summary_file: - json.dump(json_output, summary_file, indent=2) - summary_file.write("\n") + with open(ctx.summary_output, "w", encoding="utf-8") as f: + json.dump(json_output, f, indent=2) + f.write("\n") else: print(json.dumps(json_output, indent=2)) @@ -666,35 +741,62 @@ def _global_stats( test_suites: List[TestSuite], ) -> str: separator = f"|-|{'-|' * len(results)}" - output = "|Test|" + output = f"**Test Suite: {test_suites[0].name}**" + "\n\n" + output += "|Test|" for decoder, _ in results: output += f"{decoder.name}|" output += "\n" + separator - output += "\n|TOTAL|" - for test_suite in test_suites: - output += f"{test_suite.test_vectors_success}/{len(test_suite.test_vectors)}|" - output += "\n|NOT SUPPORTED|" - for test_suite in test_suites: - output += f"{test_suite.test_vectors_not_supported}/{len(test_suite.test_vectors)}|" - output += "\n|FAIL/ERROR|" - for test_suite in test_suites: - failed = ( - len(test_suite.test_vectors) - - test_suite.test_vectors_success - - test_suite.test_vectors_not_supported - ) - output += f"{failed}/{len(test_suite.test_vectors)}|" - output += "\n|TOTAL TIME|" - for test_suite in test_suites: + + rows: Dict[str, List[str]] = { + "PASSED": [], + "NOT RUN": [], + "NOT SUPPORTED": [], + "FAILED\\ERROR": [], + "TOTAL TIME": [], + } + + # Flags to track if we need to show these rows + show_not_run = False + show_not_supported = False + + for ts in test_suites: + tv_total = len(ts.test_vectors) + tv_passed = ts.test_vectors_success + tv_not_run = ts.test_vectors_not_run + tv_not_supported = ts.test_vectors_not_supported + tv_failed = tv_total - tv_passed - tv_not_run - tv_not_supported + + # Track if we ever encounter a non-zero value + if tv_not_run > 0: + show_not_run = True + if tv_not_supported > 0: + show_not_supported = True + + # Store formatted strings for each column + rows["PASSED"].append(f"{tv_passed}/{tv_total}") + rows["NOT RUN"].append(f"{tv_not_run}/{tv_total}") + rows["NOT SUPPORTED"].append(f"{tv_not_supported}/{tv_total}") + rows["FAILED\\ERROR"].append(f"{tv_failed}/{tv_total}") + + timeout_time = self._calculate_timeout_adjustment(ctx, ts) # Substract from the total time that took running a test suite on a decoder # the timeouts. This is not ideal since we won't be comparing decoding the # same number of test vectors, but at least it is much better than comparing # total times when timeouts are such a huge part of the global time taken. # Note: we only do this when the number of parallel jobs is 1, because # whenever there are actual parallel jobs, this gets much more complicated. - timeouts = self._calculate_timeout_adjustment(ctx, test_suite) - total_time = test_suite.time_taken - timeouts - output += f"{total_time:.3f}s|" + rows["TOTAL TIME"].append(f"{ts.time_taken - timeout_time:.3f}s") + + # Define the order and filter which rows to actually include + labels_to_include = ["PASSED"] + if show_not_run: + labels_to_include.append("NOT RUN") + if show_not_supported: + labels_to_include.append("NOT SUPPORTED") + labels_to_include.extend(["FAILED\\ERROR", "TOTAL TIME"]) + + # Construct the final string using a join for performance + output += "".join(f"\n|{label}|{'|'.join(rows[label])}|" for label in labels_to_include) return output def _profile_stats( @@ -752,42 +854,53 @@ def _generate_global_summary(results: Dict[str, List[Tuple[Decoder, TestSuite]]] all_decoders.append(decoder) decoder_names.add(decoder.name) - decoder_totals = {dec.name: {"success": 0, "total": 0, "not_supported": 0} for dec in all_decoders} + decoder_totals = { + dec.name: {"passed": 0, "not_run": 0, "not_supported": 0, "total": 0} for dec in all_decoders + } decoder_times = {dec.name: 0.0 for dec in all_decoders} global_profile_stats: Dict[str, Dict[str, Dict[str, int]]] = {dec.name: {} for dec in all_decoders} for test_suite_results in results.values(): for decoder, test_suite in test_suite_results: totals = decoder_totals[decoder.name] - totals["success"] += test_suite.test_vectors_success + totals["passed"] += test_suite.test_vectors_success + totals["not_run"] += test_suite.test_vectors_not_run totals["not_supported"] += test_suite.test_vectors_not_supported totals["total"] += len(test_suite.test_vectors) - timeouts = self._calculate_timeout_adjustment(ctx, test_suite) - decoder_times[decoder.name] += test_suite.time_taken - timeouts + timeout_time = self._calculate_timeout_adjustment(ctx, test_suite) + decoder_times[decoder.name] += test_suite.time_taken - timeout_time test_suite_profile_stats = self._calculate_profile_stats(test_suite.test_vectors) for profile_name, profile_data in test_suite_profile_stats.items(): - stats = global_profile_stats[decoder.name].setdefault(profile_name, {"success": 0, "total": 0}) + stats = global_profile_stats[decoder.name].setdefault(profile_name, {"passed": 0, "total": 0}) + stats["passed"] += profile_data["passed"] stats["total"] += profile_data["total"] - stats["success"] += profile_data["success"] separator = f"|-|{'-|' * len(all_decoders)}" output = "\n# GLOBAL SUMMARY" - output += "\n|TOTALS|" + "".join(f"{dec.name}|" for dec in all_decoders) + "\n" + separator - output += "\n|TOTAL|" + "".join( - f"{decoder_totals[dec.name]['success']}/{decoder_totals[dec.name]['total']}|" for dec in all_decoders - ) - output += "\n|NOT SUPPORTED|" + "".join( - f"{decoder_totals[dec.name]['not_supported']}/{decoder_totals[dec.name]['total']}|" - for dec in all_decoders + output += "\n|Total Tests|" + "".join(f"{dec.name}|" for dec in all_decoders) + "\n" + separator + output += "\n|PASSED|" + "".join( + f"{decoder_totals[dec.name]['passed']}/{decoder_totals[dec.name]['total']}|" for dec in all_decoders ) + # Only add NOT RUN if at least one decoder has a 'not_run' count > 0 + if any(decoder_totals[dec.name]["not_run"] > 0 for dec in all_decoders): + output += "\n|NOT RUN|" + "".join( + f"{decoder_totals[dec.name]['not_run']}/{decoder_totals[dec.name]['total']}|" + for dec in all_decoders + ) + # Only add NOT SUPPORTED if at least one decoder has a 'not_supported' count > 0 + if any(decoder_totals[dec.name]["not_supported"] > 0 for dec in all_decoders): + output += "\n|NOT SUPPORTED|" + "".join( + f"{decoder_totals[dec.name]['not_supported']}/{decoder_totals[dec.name]['total']}|" + for dec in all_decoders + ) fail_error_parts = [] for dec in all_decoders: totals = decoder_totals[dec.name] - failed = totals["total"] - totals["success"] - totals["not_supported"] + failed = totals["total"] - totals["passed"] - totals["not_run"] - totals["not_supported"] fail_error_parts.append(f"{failed}/{totals['total']}|") - output += "\n|FAIL/ERROR|" + "".join(fail_error_parts) + output += "\n|FAILED\\ERROR|" + "".join(fail_error_parts) output += "\n|TOTAL TIME|" + "".join(f"{decoder_times[dec.name]:.3f}s|" for dec in all_decoders) all_profiles: Set[str] = set() @@ -800,8 +913,8 @@ def _generate_global_summary(results: Dict[str, List[Tuple[Decoder, TestSuite]]] for profile in sorted(all_profiles): output += f"\n|{profile}|" for dec in all_decoders: - stats = global_profile_stats[dec.name].get(profile, {"success": 0, "total": 0}) - output += f"{stats['success']}/{stats['total']}|" + stats = global_profile_stats[dec.name].get(profile, {"passed": 0, "total": 0}) + output += f"{stats['passed']}/{stats['total']}|" return output @@ -814,6 +927,10 @@ def _generate_global_summary(results: Dict[str, List[Tuple[Decoder, TestSuite]]] output += _global_stats(test_suite_results, test_suites) output += "\n\n" + profile_output = _profile_stats(test_suite_results) + if profile_output: + output += profile_output + "\n\n" + separator = f"|-|{'-|' * len(test_suite_results)}" output += "|Test|" for decoder, _ in test_suite_results: @@ -826,16 +943,10 @@ def _generate_global_summary(results: Dict[str, List[Tuple[Decoder, TestSuite]]] output += self.emoji[tvector.test_result] + "|" output += "\n\n" - output += _global_stats(test_suite_results, test_suites) - output += "\n\n" - - profile_output = _profile_stats(test_suite_results) - if profile_output: - output += profile_output + "\n\n" - - global_summary = _generate_global_summary(results) - if global_summary: - output += global_summary + "\n\n" + if len(results.keys()) > 1 or any(len(test_suite_res) > 1 for test_suite_res in results.values()): + global_summary = _generate_global_summary(results) + if global_summary: + output += global_summary + "\n\n" if ctx.summary_output: with open(ctx.summary_output, "w+", encoding="utf-8") as summary_file: diff --git a/fluster/test_suite.py b/fluster/test_suite.py index bf8f370b..0fc3eab6 100644 --- a/fluster/test_suite.py +++ b/fluster/test_suite.py @@ -157,6 +157,7 @@ def __init__( self.filename = filename self.resources_dir = resources_dir self.test_vectors_success = 0 + self.test_vectors_not_run = 0 self.test_vectors_not_supported = 0 self.time_taken = 0.0 @@ -177,6 +178,7 @@ def from_json_file(cls: Type["TestSuite"], filename: str, resources_dir: str) -> data["test_method"] = TestMethod(data["test_method"]) # Remove runtime-only fields if present in malformed JSON data.pop("test_vectors_success", None) + data.pop("test_vectors_not_run", None) data.pop("test_vectors_not_supported", None) data.pop("time_taken", None) return cls(filename, resources_dir, **data) @@ -188,6 +190,7 @@ def to_json_file(self, filename: str) -> None: data.pop("resources_dir") data.pop("filename") data.pop("test_vectors_success") + data.pop("test_vectors_not_run") data.pop("test_vectors_not_supported") data.pop("time_taken") if self.failing_test_vectors is None: @@ -388,6 +391,13 @@ def _run_worker(self, test: Test) -> TestVector: test(test_result) self._collect_results(test_result) + + if self.negative_test and not test.skip: + if test.test_vector.test_result == TestVectorResult.SUCCESS: + test.test_vector.test_result = TestVectorResult.FAIL + elif test.test_vector.test_result == TestVectorResult.FAIL: + test.test_vector.test_result = TestVectorResult.SUCCESS + self._rename_test(test, module_orig, qualname_orig) return test.test_vector @@ -439,11 +449,6 @@ def run_test_suite_in_parallel(self, jobs: int, tests: List[Test], failfast: boo with Pool(jobs) as pool: def _callback(test_result: TestVector) -> None: - if self.negative_test: - if test_result.errors: - test_result.test_result = TestVectorResult.SUCCESS - else: - test_result.test_result = TestVectorResult.FAIL print( self._get_result_line( self.name, @@ -465,28 +470,30 @@ def _callback(test_result: TestVector) -> None: self.time_taken = perf_counter() - start print("\n") self.test_vectors_success = 0 + self.test_vectors_not_run = 0 self.test_vectors_not_supported = 0 for test_vector_res in test_vector_results: - if test_vector_res.test_result == TestVectorResult.NOT_SUPPORTED: + if test_vector_res.test_result == TestVectorResult.SUCCESS: + self.test_vectors_success += 1 + elif test_vector_res.test_result == TestVectorResult.NOT_SUPPORTED: self.test_vectors_not_supported += 1 - elif test_vector_res.errors: - if self.negative_test: - self.test_vectors_success += 1 - else: - for error in test_vector_res.errors: - # Use same format to report errors as TextTestRunner - print(f"{'=' * 71}\nFAIL: {error[0]}\n{'-' * 70}") - for line in error[1:]: - print(line) - else: - if not self.negative_test: - self.test_vectors_success += 1 + elif test_vector_res.test_result == TestVectorResult.NOT_RUN: + self.test_vectors_not_run += 1 + + if test_vector_res.errors and not self.negative_test: + for error in test_vector_res.errors: + # Use same format to report errors as TextTestRunner + print(f"{'=' * 71}\nFAIL: {error[0]}\n{'-' * 70}") + for line in error[1:]: + print(line) # Collect the test vector results and failures since they come # from a different process self.test_vectors[test_vector_res.name] = test_vector_res status_parts = [f"{self.test_vectors_success}/{len(tests)} tests successfully"] + if self.test_vectors_not_run > 0: + status_parts.append(f"{self.test_vectors_not_run} not run") if self.test_vectors_not_supported > 0: status_parts.append(f"{self.test_vectors_not_supported} not supported") status_parts.append(f"in {self.time_taken:.3f} secs")