diff --git a/graph_net/analysis_util.py b/graph_net/analysis_util.py index e4f192391..d5150f8c8 100644 --- a/graph_net/analysis_util.py +++ b/graph_net/analysis_util.py @@ -41,34 +41,32 @@ def extract_speedup_data_from_subdirs(benchmark_path: str) -> dict: # but os.walk is also robust for nested directories if needed in the future. for root, _, files in os.walk(current_dir_path): for file in files: - if file.endswith(".json"): - json_file = os.path.join(root, file) - try: - with open(json_file, "r") as f: - data = json.load(f) - performance = data.get("performance", {}) - if not performance: - continue - - speedup_data = performance.get("speedup") - if isinstance(speedup_data, dict): - # Prioritize 'e2e' speedup, fallback to 'gpu' - if "e2e" in speedup_data: - data_by_subdir[subdir_name].append( - speedup_data["e2e"] - ) - elif "gpu" in speedup_data: - data_by_subdir[subdir_name].append( - speedup_data["gpu"] - ) - elif isinstance(speedup_data, (float, int)): - data_by_subdir[subdir_name].append(speedup_data) - - except (json.JSONDecodeError, KeyError) as e: - print( - f"Warning: Failed to read or parse file -> {json_file}, Error: {e}" - ) - continue + if not file.endswith(".json"): + continue + + json_file = os.path.join(root, file) + try: + with open(json_file, "r") as f: + data = json.load(f) + performance = data.get("performance", {}) + if not performance: + continue + + speedup_data = performance.get("speedup") + if isinstance(speedup_data, dict): + # Prioritize 'e2e' speedup, fallback to 'gpu' + if "e2e" in speedup_data: + data_by_subdir[subdir_name].append(speedup_data["e2e"]) + elif "gpu" in speedup_data: + data_by_subdir[subdir_name].append(speedup_data["gpu"]) + elif isinstance(speedup_data, (float, int)): + data_by_subdir[subdir_name].append(speedup_data) + + except (json.JSONDecodeError, KeyError) as e: + print( + f"Warning: Failed to read or parse file -> {json_file}, Error: {e}" + ) + continue return data_by_subdir @@ -85,6 +83,72 @@ def load_json_file(filepath: str) -> dict: return {} +def detect_sample_error_code(log_text: str) -> str: + """ + Detect the error code for a single sample from log text. + + This function is used for bug subgraph detection. It analyzes log text + (which can be generated from a single sample) and returns an error code. + + Args: + log_text: Log text content (can be a string or list of lines) + + Returns: + Error code string. Possible values: + - "correct": Sample executed successfully + - "eager_fail": Eager model execution failed + - "compile_fail": Compiled model compilation failed + - "runtime_fail": Runtime error during execution + - "unknown": Unable to determine error type + """ + if isinstance(log_text, str): + lines = log_text.split("\n") + else: + lines = log_text + + # Define regex patterns for error detection + patterns = { + "result_status": re.compile(r"\[Result\] status: (.+)"), + "failure": re.compile(r"\[Fail due to (.+)\.\]"), + } + + # Error type mapping based on failure reason keywords + error_keywords = { + "eager": "eager_fail", + "compiled": "compile_fail", + } + + for i, line in enumerate(lines): + result_status_match = patterns["result_status"].search(line) + if not result_status_match: + continue + + status = result_status_match.group(1).strip() + if status == "success": + return "correct" + + if status != "failed": + continue + + # Check the next line for failure reason + if (i + 1) >= len(lines): + return "runtime_fail" + + error_reason_match = patterns["failure"].search(lines[i + 1]) + if not error_reason_match: + return "runtime_fail" + + reason = error_reason_match.group(1).lower() + # Check for specific error keywords + for keyword, error_code in error_keywords.items(): + if keyword in reason: + return error_code + + return "runtime_fail" + + return "unknown" + + def parse_logs_to_data(log_file: str) -> list: """ Parse a structured log file generated by the benchmark script and @@ -189,32 +253,39 @@ def parse_logs_to_data(log_file: str) -> list: data["correctness"][key.strip()] = values continue - # Look for the status, and if it's "failed", look ahead to the next line. - result_status_match = patterns["result_status"].search(line) - if result_status_match: - status = result_status_match.group(1).strip() - data["result"]["status"] = status - if status == "failed" and (i + 1) < len(lines): - error_reason_match = patterns["failure"].search(lines[i + 1]) - if error_reason_match: - reason = error_reason_match.group(1).lower() - if "eager" in reason: - data["performance"]["failure"] = "eager" - data["result"]["status"] = "eager_fail" - elif "compiled" in reason: - data["performance"]["failure"] = "compiled" - data["result"]["status"] = "compile_fail" - else: - data["performance"]["failure"] = "other" - data["result"]["status"] = "runtime_fail" - continue - + # Check for speedup speedup_match = patterns["speedup"].search(line) if speedup_match: key, value_str = speedup_match.groups() data["performance"]["speedup"][key.strip()] = float(value_str) continue + # Look for the status, and if it's "failed", look ahead to the next line. + result_status_match = patterns["result_status"].search(line) + if not result_status_match: + continue + + status = result_status_match.group(1).strip() + data["result"]["status"] = status + if status != "failed" or (i + 1) >= len(lines): + continue + + error_reason_match = patterns["failure"].search(lines[i + 1]) + if not error_reason_match: + continue + + reason = error_reason_match.group(1).lower() + if "eager" in reason: + data["performance"]["failure"] = "eager" + data["result"]["status"] = "eager_fail" + elif "compiled" in reason: + data["performance"]["failure"] = "compiled" + data["result"]["status"] = "compile_fail" + else: + data["performance"]["failure"] = "other" + data["result"]["status"] = "runtime_fail" + continue + # After parsing all lines, process the results if not all_runs_data: print("No processable log entries found in the file.") @@ -223,30 +294,24 @@ def parse_logs_to_data(log_file: str) -> list: samples = [] for run_key, data in all_runs_data.items(): try: + speedup_dict = data["performance"].get("speedup", {}) + # Build result field with status and speedup (for compatibility with log2json output format) - if data["result"]["status"] == "success": + if data["result"]["status"] == "success" and speedup_dict: speedup_data = {} - if "e2e" in data["performance"]["speedup"]: - e2e_value = data["performance"]["speedup"]["e2e"] - speedup_data["e2e"] = {"mean": e2e_value} - if "gpu" in data["performance"]["speedup"]: - gpu_value = data["performance"]["speedup"]["gpu"] - speedup_data["gpu"] = {"mean": gpu_value} + for key in ["e2e", "gpu"]: + if key in speedup_dict: + speedup_data[key] = {"mean": speedup_dict[key]} if speedup_data: data["result"]["speedup"] = speedup_data - # Ensure performance.speedup.e2e is a direct value (not nested dict) + # Ensure performance.speedup.e2e/gpu are direct values (not nested dict) # This is required by calculate_s_scores which uses performance_data.get("speedup", {}).get("e2e") - if "speedup" in data["performance"]: - speedup_dict = data["performance"]["speedup"] - if "e2e" in speedup_dict: - e2e_val = speedup_dict["e2e"] - if isinstance(e2e_val, dict) and "mean" in e2e_val: - speedup_dict["e2e"] = e2e_val["mean"] - if "gpu" in speedup_dict: - gpu_val = speedup_dict["gpu"] - if isinstance(gpu_val, dict) and "mean" in gpu_val: - speedup_dict["gpu"] = gpu_val["mean"] + for key in ["e2e", "gpu"]: + if key in speedup_dict: + val = speedup_dict[key] + if isinstance(val, dict) and "mean" in val: + speedup_dict[key] = val["mean"] samples.append(data) @@ -261,53 +326,31 @@ def parse_logs_to_data(log_file: str) -> list: return samples -def load_one_folder(folder_path: str) -> list: - """ - Traverse all .json files in a *single* folder and load all raw data. - Returns a list of raw data dictionaries. - """ - if not os.path.isdir(folder_path): - return [] - - folder_name = os.path.basename(folder_path) - samples = [] - print(f" - Loading JSON files from folder: {folder_path}") - - for filename in os.listdir(folder_path): - if filename.endswith(".json"): - filepath = os.path.join(folder_path, filename) - data = load_json_file(filepath) - if data: - samples.append(data) - return samples - - def scan_all_folders(benchmark_path: str) -> dict: """ - Unified entry point that supports both log files and JSON directories: - - If benchmark_path is a log file → parse it directly and return data as a single curve. - - If benchmark_path is a directory with .json files directly under it → treat them as a single curve. - - Otherwise, fallback to the old logic where subdirectories represent curves. - Returns dict[folder_name] -> list_of_samples + Unified entry point that supports log files and directories: + - If benchmark_path is a log file (.log or .txt) → parse it directly and return data as a single curve. + - If benchmark_path is a directory → scan for .log and .txt files in the directory, + each log file becomes a curve. + Returns dict[curve_name] -> list_of_samples """ - # Check if the path is a log file + # Handle single log file if os.path.isfile(benchmark_path): print(f"Detected log file: '{benchmark_path}'") samples = parse_logs_to_data(benchmark_path) - if samples: - # Use the log file name (without extension) as the curve name - folder_name = ( - os.path.splitext(os.path.basename(benchmark_path))[0] or "benchmark" - ) - print( - f" - Parsed log file → 1 curve '{folder_name}' " - f"with {len(samples)} samples." - ) - return {folder_name: samples} - else: + if not samples: print(f" - No valid data found in log file.") return {} + folder_name = ( + os.path.splitext(os.path.basename(benchmark_path))[0] or "benchmark" + ) + print( + f" - Parsed log file → 1 curve '{folder_name}' " + f"with {len(samples)} samples." + ) + return {folder_name: samples} + # Check if it's a directory if not os.path.isdir(benchmark_path): print( @@ -317,27 +360,38 @@ def scan_all_folders(benchmark_path: str) -> dict: print(f"Scanning '{benchmark_path}' ...") - # Try flat structure, directly read JSON - flat_samples = load_one_folder(benchmark_path) - if flat_samples: # ≥1 JSON loaded successfully - folder_name = os.path.basename(benchmark_path) or "benchmark" - print( - f" - Detected flat structure → 1 curve '{folder_name}' " - f"with {len(flat_samples)} samples." - ) - return {folder_name: flat_samples} + # Find .log and .txt files in the directory + log_files = sorted( + [ + f + for f in os.listdir(benchmark_path) + if os.path.isfile(os.path.join(benchmark_path, f)) + and f.endswith((".log", ".txt")) + ] + ) - # Fall back to subdirectories as curves logic + if not log_files: + print(" - No log files (.log or .txt) found in directory.") + return {} + + # Process log files, each becomes a curve all_results = {} - print(" - No JSON files found at top level → scanning sub-folders.") - for entry in os.listdir(benchmark_path): - folder_full_path = os.path.join(benchmark_path, entry) - if os.path.isdir(folder_full_path): - samples = load_one_folder(folder_full_path) - if samples: - all_results[entry] = samples - print(f" - Folder '{entry}' loaded {len(samples)} samples.") - print(f"Total folders loaded: {len(all_results)}") + print(f" - Found {len(log_files)} log file(s) → each becomes a curve.") + for log_file in log_files: + log_file_path = os.path.join(benchmark_path, log_file) + samples = parse_logs_to_data(log_file_path) + if not samples: + continue + + curve_name = os.path.splitext(log_file)[0] or "benchmark" + all_results[curve_name] = samples + print(f" - Curve '{curve_name}': {len(samples)} samples.") + + if not all_results: + print(" - No valid data found in any log file.") + return {} + + print(f"Total curves loaded: {len(all_results)}") return all_results @@ -526,7 +580,6 @@ def print_stat_info( rectified_speedups.append(regularized_speedup) # ES(t) calculation: based on state change - rec_speedup_fake_degrad = 0 if t_key < 1: if fail_type is not None or speedup is None: rec_speedup_fake_degrad = fpdb