Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 177 additions & 124 deletions graph_net/analysis_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,34 +41,32 @@ def extract_speedup_data_from_subdirs(benchmark_path: str) -> dict:
# but os.walk is also robust for nested directories if needed in the future.
for root, _, files in os.walk(current_dir_path):
for file in files:
if file.endswith(".json"):
json_file = os.path.join(root, file)
try:
with open(json_file, "r") as f:
data = json.load(f)
performance = data.get("performance", {})
if not performance:
continue

speedup_data = performance.get("speedup")
if isinstance(speedup_data, dict):
# Prioritize 'e2e' speedup, fallback to 'gpu'
if "e2e" in speedup_data:
data_by_subdir[subdir_name].append(
speedup_data["e2e"]
)
elif "gpu" in speedup_data:
data_by_subdir[subdir_name].append(
speedup_data["gpu"]
)
elif isinstance(speedup_data, (float, int)):
data_by_subdir[subdir_name].append(speedup_data)

except (json.JSONDecodeError, KeyError) as e:
print(
f"Warning: Failed to read or parse file -> {json_file}, Error: {e}"
)
continue
if not file.endswith(".json"):
continue

json_file = os.path.join(root, file)
try:
with open(json_file, "r") as f:
data = json.load(f)
performance = data.get("performance", {})
if not performance:
continue

speedup_data = performance.get("speedup")
if isinstance(speedup_data, dict):
# Prioritize 'e2e' speedup, fallback to 'gpu'
if "e2e" in speedup_data:
data_by_subdir[subdir_name].append(speedup_data["e2e"])
elif "gpu" in speedup_data:
data_by_subdir[subdir_name].append(speedup_data["gpu"])
elif isinstance(speedup_data, (float, int)):
data_by_subdir[subdir_name].append(speedup_data)

except (json.JSONDecodeError, KeyError) as e:
print(
f"Warning: Failed to read or parse file -> {json_file}, Error: {e}"
)
continue

return data_by_subdir

Expand All @@ -85,6 +83,72 @@ def load_json_file(filepath: str) -> dict:
return {}


def detect_sample_error_code(log_text: str) -> str:
"""
Detect the error code for a single sample from log text.

This function is used for bug subgraph detection. It analyzes log text
(which can be generated from a single sample) and returns an error code.

Args:
log_text: Log text content (can be a string or list of lines)

Returns:
Error code string. Possible values:
- "correct": Sample executed successfully
- "eager_fail": Eager model execution failed
- "compile_fail": Compiled model compilation failed
- "runtime_fail": Runtime error during execution
- "unknown": Unable to determine error type
"""
if isinstance(log_text, str):
lines = log_text.split("\n")
else:
lines = log_text

# Define regex patterns for error detection
patterns = {
"result_status": re.compile(r"\[Result\] status: (.+)"),
"failure": re.compile(r"\[Fail due to (.+)\.\]"),
}

# Error type mapping based on failure reason keywords
error_keywords = {
"eager": "eager_fail",
"compiled": "compile_fail",
}

for i, line in enumerate(lines):
result_status_match = patterns["result_status"].search(line)
if not result_status_match:
continue

status = result_status_match.group(1).strip()
if status == "success":
return "correct"

if status != "failed":
continue

# Check the next line for failure reason
if (i + 1) >= len(lines):
return "runtime_fail"

error_reason_match = patterns["failure"].search(lines[i + 1])
if not error_reason_match:
return "runtime_fail"

reason = error_reason_match.group(1).lower()
# Check for specific error keywords
for keyword, error_code in error_keywords.items():
if keyword in reason:
return error_code

return "runtime_fail"

return "unknown"


def parse_logs_to_data(log_file: str) -> list:
"""
Parse a structured log file generated by the benchmark script and
Expand Down Expand Up @@ -189,32 +253,39 @@ def parse_logs_to_data(log_file: str) -> list:
data["correctness"][key.strip()] = values
continue

# Look for the status, and if it's "failed", look ahead to the next line.
result_status_match = patterns["result_status"].search(line)
if result_status_match:
status = result_status_match.group(1).strip()
data["result"]["status"] = status
if status == "failed" and (i + 1) < len(lines):
error_reason_match = patterns["failure"].search(lines[i + 1])
if error_reason_match:
reason = error_reason_match.group(1).lower()
if "eager" in reason:
data["performance"]["failure"] = "eager"
data["result"]["status"] = "eager_fail"
elif "compiled" in reason:
data["performance"]["failure"] = "compiled"
data["result"]["status"] = "compile_fail"
else:
data["performance"]["failure"] = "other"
data["result"]["status"] = "runtime_fail"
continue

# Check for speedup
speedup_match = patterns["speedup"].search(line)
if speedup_match:
key, value_str = speedup_match.groups()
data["performance"]["speedup"][key.strip()] = float(value_str)
continue

# Look for the status, and if it's "failed", look ahead to the next line.
result_status_match = patterns["result_status"].search(line)
if not result_status_match:
continue

status = result_status_match.group(1).strip()
data["result"]["status"] = status
if status != "failed" or (i + 1) >= len(lines):
continue

error_reason_match = patterns["failure"].search(lines[i + 1])
if not error_reason_match:
continue

reason = error_reason_match.group(1).lower()
if "eager" in reason:
data["performance"]["failure"] = "eager"
data["result"]["status"] = "eager_fail"
elif "compiled" in reason:
data["performance"]["failure"] = "compiled"
data["result"]["status"] = "compile_fail"
else:
data["performance"]["failure"] = "other"
data["result"]["status"] = "runtime_fail"
continue

# After parsing all lines, process the results
if not all_runs_data:
print("No processable log entries found in the file.")
Expand All @@ -223,30 +294,24 @@ def parse_logs_to_data(log_file: str) -> list:
samples = []
for run_key, data in all_runs_data.items():
try:
speedup_dict = data["performance"].get("speedup", {})

# Build result field with status and speedup (for compatibility with log2json output format)
if data["result"]["status"] == "success":
if data["result"]["status"] == "success" and speedup_dict:
speedup_data = {}
if "e2e" in data["performance"]["speedup"]:
e2e_value = data["performance"]["speedup"]["e2e"]
speedup_data["e2e"] = {"mean": e2e_value}
if "gpu" in data["performance"]["speedup"]:
gpu_value = data["performance"]["speedup"]["gpu"]
speedup_data["gpu"] = {"mean": gpu_value}
for key in ["e2e", "gpu"]:
if key in speedup_dict:
speedup_data[key] = {"mean": speedup_dict[key]}
if speedup_data:
data["result"]["speedup"] = speedup_data

# Ensure performance.speedup.e2e is a direct value (not nested dict)
# Ensure performance.speedup.e2e/gpu are direct values (not nested dict)
# This is required by calculate_s_scores which uses performance_data.get("speedup", {}).get("e2e")
if "speedup" in data["performance"]:
speedup_dict = data["performance"]["speedup"]
if "e2e" in speedup_dict:
e2e_val = speedup_dict["e2e"]
if isinstance(e2e_val, dict) and "mean" in e2e_val:
speedup_dict["e2e"] = e2e_val["mean"]
if "gpu" in speedup_dict:
gpu_val = speedup_dict["gpu"]
if isinstance(gpu_val, dict) and "mean" in gpu_val:
speedup_dict["gpu"] = gpu_val["mean"]
for key in ["e2e", "gpu"]:
if key in speedup_dict:
val = speedup_dict[key]
if isinstance(val, dict) and "mean" in val:
speedup_dict[key] = val["mean"]

samples.append(data)

Expand All @@ -261,53 +326,31 @@ def parse_logs_to_data(log_file: str) -> list:
return samples


def load_one_folder(folder_path: str) -> list:
"""
Traverse all .json files in a *single* folder and load all raw data.
Returns a list of raw data dictionaries.
"""
if not os.path.isdir(folder_path):
return []

folder_name = os.path.basename(folder_path)
samples = []
print(f" - Loading JSON files from folder: {folder_path}")

for filename in os.listdir(folder_path):
if filename.endswith(".json"):
filepath = os.path.join(folder_path, filename)
data = load_json_file(filepath)
if data:
samples.append(data)
return samples


def scan_all_folders(benchmark_path: str) -> dict:
"""
Unified entry point that supports both log files and JSON directories:
- If benchmark_path is a log file → parse it directly and return data as a single curve.
- If benchmark_path is a directory with .json files directly under it → treat them as a single curve.
- Otherwise, fallback to the old logic where subdirectories represent curves.
Returns dict[folder_name] -> list_of_samples
Unified entry point that supports log files and directories:
- If benchmark_path is a log file (.log or .txt) → parse it directly and return data as a single curve.
- If benchmark_path is a directory → scan for .log and .txt files in the directory,
each log file becomes a curve.
Returns dict[curve_name] -> list_of_samples
"""
# Check if the path is a log file
# Handle single log file
if os.path.isfile(benchmark_path):
print(f"Detected log file: '{benchmark_path}'")
samples = parse_logs_to_data(benchmark_path)
if samples:
# Use the log file name (without extension) as the curve name
folder_name = (
os.path.splitext(os.path.basename(benchmark_path))[0] or "benchmark"
)
print(
f" - Parsed log file → 1 curve '{folder_name}' "
f"with {len(samples)} samples."
)
return {folder_name: samples}
else:
if not samples:
print(f" - No valid data found in log file.")
return {}

folder_name = (
os.path.splitext(os.path.basename(benchmark_path))[0] or "benchmark"
)
print(
f" - Parsed log file → 1 curve '{folder_name}' "
f"with {len(samples)} samples."
)
return {folder_name: samples}

# Check if it's a directory
if not os.path.isdir(benchmark_path):
print(
Expand All @@ -317,27 +360,38 @@ def scan_all_folders(benchmark_path: str) -> dict:

print(f"Scanning '{benchmark_path}' ...")

# Try flat structure, directly read JSON
flat_samples = load_one_folder(benchmark_path)
if flat_samples: # ≥1 JSON loaded successfully
folder_name = os.path.basename(benchmark_path) or "benchmark"
print(
f" - Detected flat structure → 1 curve '{folder_name}' "
f"with {len(flat_samples)} samples."
)
return {folder_name: flat_samples}
# Find .log and .txt files in the directory
log_files = sorted(
[
f
for f in os.listdir(benchmark_path)
if os.path.isfile(os.path.join(benchmark_path, f))
and f.endswith((".log", ".txt"))
]
)

# Fall back to subdirectories as curves logic
if not log_files:
print(" - No log files (.log or .txt) found in directory.")
return {}

# Process log files, each becomes a curve
all_results = {}
print(" - No JSON files found at top level → scanning sub-folders.")
for entry in os.listdir(benchmark_path):
folder_full_path = os.path.join(benchmark_path, entry)
if os.path.isdir(folder_full_path):
samples = load_one_folder(folder_full_path)
if samples:
all_results[entry] = samples
print(f" - Folder '{entry}' loaded {len(samples)} samples.")
print(f"Total folders loaded: {len(all_results)}")
print(f" - Found {len(log_files)} log file(s) → each becomes a curve.")
for log_file in log_files:
log_file_path = os.path.join(benchmark_path, log_file)
samples = parse_logs_to_data(log_file_path)
if not samples:
continue

curve_name = os.path.splitext(log_file)[0] or "benchmark"
all_results[curve_name] = samples
print(f" - Curve '{curve_name}': {len(samples)} samples.")

if not all_results:
print(" - No valid data found in any log file.")
return {}

print(f"Total curves loaded: {len(all_results)}")
return all_results


Expand Down Expand Up @@ -526,7 +580,6 @@ def print_stat_info(
rectified_speedups.append(regularized_speedup)

# ES(t) calculation: based on state change
rec_speedup_fake_degrad = 0
if t_key < 1:
if fail_type is not None or speedup is None:
rec_speedup_fake_degrad = fpdb
Expand Down