diff --git a/benchmarks/swe_bench/eval_infer.py b/benchmarks/swe_bench/eval_infer.py index c7fc0fe9..2883a799 100644 --- a/benchmarks/swe_bench/eval_infer.py +++ b/benchmarks/swe_bench/eval_infer.py @@ -212,8 +212,8 @@ def main() -> None: parser.add_argument( "--model-name", - default="OpenHands", - help="Model name to use in the model_name_or_path field (default: OpenHands)", + default="openhands", + help="Model name to use in the model_name_or_path field (default: openhands)", ) parser.add_argument( diff --git a/benchmarks/utils/evaluation.py b/benchmarks/utils/evaluation.py index cf862a37..7cf5d97b 100644 --- a/benchmarks/utils/evaluation.py +++ b/benchmarks/utils/evaluation.py @@ -4,8 +4,10 @@ import json import os +import sys from abc import ABC, abstractmethod from concurrent.futures import ProcessPoolExecutor, as_completed +from contextlib import contextmanager from typing import Callable, List, Optional, Tuple from pydantic import BaseModel, Field @@ -35,6 +37,17 @@ class Evaluation(ABC, BaseModel): metadata: EvalMetadata num_workers: int = Field(default=1, ge=1) + def model_post_init(self, __context) -> None: + """Save metadata to output directory after initialization.""" + # Ensure output directory exists + os.makedirs(self.metadata.eval_output_dir, exist_ok=True) + + # Save metadata to JSON file + metadata_file = os.path.join(self.metadata.eval_output_dir, "metadata.json") + with open(metadata_file, "w", encoding="utf-8") as f: + f.write(self.metadata.model_dump_json(indent=2)) + logger.info(f"Saved metadata to {metadata_file}") + @property def output_path(self) -> str: return os.path.join(self.metadata.eval_output_dir, OUTPUT_FILENAME) @@ -247,9 +260,9 @@ def attempt_on_result(instance: EvalInstance, out: EvalOutput) -> None: logger.warning("on_result callback failed: %s", cb_err) # Run evaluation for this attempt - with ProcessPoolExecutor( - max_workers=self.num_workers, initializer=_child_init - ) as pool: + pool = ProcessPoolExecutor(max_workers=self.num_workers) + futures = [] + try: futures = [ pool.submit(self._process_one_mp, inst) for inst in instances_to_process @@ -271,6 +284,17 @@ def attempt_on_result(instance: EvalInstance, out: EvalOutput) -> None: stack_info=True, ) + # Normal completion - shutdown gracefully + pool.shutdown(wait=True) + except KeyboardInterrupt: + logger.warning("KeyboardInterrupt received, shutting down workers...") + self._cleanup_pool(pool, futures, wait=False) + logger.info("All workers terminated") + raise + except Exception: + self._cleanup_pool(pool, futures, wait=False) + raise + # Restore original temperature if attempt > 1 and original_temperature == 0.0: self.metadata.llm.temperature = original_temperature @@ -296,6 +320,34 @@ def attempt_on_result(instance: EvalInstance, out: EvalOutput) -> None: ) return all_outputs + def _cleanup_pool( + self, + pool: ProcessPoolExecutor, + futures: list, + wait: bool = False, + ) -> None: + """Clean up pool by canceling futures, terminating workers, and shutting down. + + Args: + pool: The ProcessPoolExecutor to clean up + futures: List of futures to cancel + wait: Whether to wait for workers to finish (True) or terminate immediately (False) + """ + # Cancel all pending futures + for fut in futures: + fut.cancel() + + # Forcefully terminate all worker processes if not waiting + if not wait and hasattr(pool, "_processes") and pool._processes: + for process in pool._processes.values(): + try: + process.terminate() + except Exception: + pass + + # Shutdown the pool + pool.shutdown(wait=wait, cancel_futures=True) + # --- Worker-side method (executed in child processes) --------------------------- def _process_one_mp( self, instance: EvalInstance @@ -307,67 +359,157 @@ def _process_one_mp( - Ensures proper context-managed cleanup - Returns (instance, output) so the parent can stream results """ - logger.info("[child] start id=%s", instance.id) + # Set up instance-specific logging + log_dir = os.path.join(self.metadata.eval_output_dir, "logs") + reset_logger_for_multiprocessing(log_dir, instance.id) - retry_count = 0 - last_error = None - max_retries = self.metadata.max_retries + # Get log file path for stdout/stderr redirection + log_file = os.path.join(log_dir, f"instance_{instance.id}.output.log") - while retry_count <= max_retries: - workspace = None - try: - workspace = self.prepare_workspace(instance) - out = self.evaluate_instance(instance, workspace) - logger.info("[child] done id=%s", instance.id) - return instance, out - except Exception as e: - last_error = e - retry_count += 1 - - if retry_count <= max_retries: - logger.warning( - f"[child] Instance {instance.id} failed " - f"(attempt {retry_count}/{max_retries}): " - f"{str(e)[:50]}" - ) - else: - logger.error( - f"[child] Instance {instance.id} failed after " - f"{max_retries} retries. Last error: {str(e)[:50]}", - exc_info=True, - ) - # Create error output for final failure - error_output = self._create_error_output( - instance, last_error, max_retries - ) - return instance, error_output - finally: - # Ensure workspace cleanup happens regardless of success or failure - if workspace is not None: - try: - # Use the context manager protocol for cleanup - workspace.__exit__(None, None, None) - logger.debug( - "[child] cleaned up workspace for id=%s", instance.id - ) - except Exception as cleanup_error: + # Redirect stdout/stderr to capture all output (SDK visualizations, etc.) + with redirect_stdout_stderr(log_file): + logger.info("[child] start id=%s", instance.id) + + retry_count = 0 + last_error = None + max_retries = self.metadata.max_retries + + while retry_count <= max_retries: + workspace = None + try: + workspace = self.prepare_workspace(instance) + out = self.evaluate_instance(instance, workspace) + logger.info("[child] done id=%s", instance.id) + return instance, out + except Exception as e: + last_error = e + retry_count += 1 + + if retry_count <= max_retries: logger.warning( - f"[child] Failed to cleanup workspace for {instance.id}: " - f"{str(cleanup_error)[:50]}" + f"[child] Instance {instance.id} failed " + f"(attempt {retry_count}/{max_retries}): " + f"{str(e)[:50]}" + ) + else: + logger.error( + f"[child] Instance {instance.id} failed after " + f"{max_retries} retries. Last error: {str(e)[:50]}", + exc_info=True, + ) + # Create error output for final failure + error_output = self._create_error_output( + instance, last_error, max_retries ) + return instance, error_output + finally: + # Ensure workspace cleanup happens regardless of success or failure + if workspace is not None: + try: + # Use the context manager protocol for cleanup + workspace.__exit__(None, None, None) + logger.debug( + "[child] cleaned up workspace for id=%s", instance.id + ) + except Exception as cleanup_error: + logger.warning( + f"[child] Failed to cleanup workspace for {instance.id}: " + f"{str(cleanup_error)[:50]}" + ) + + # This should never be reached, but added for type safety + error_output = self._create_error_output( + instance, Exception("Unexpected error: no attempts made"), max_retries + ) + return instance, error_output - # This should never be reached, but added for type safety - error_output = self._create_error_output( - instance, Exception("Unexpected error: no attempts made"), max_retries - ) - return instance, error_output + +# ---------- Multiprocessing logging helper --------------------------------------- -# ---------- Optional per-process initializer --------------------------------------- +def reset_logger_for_multiprocessing(log_dir: str, instance_id: str) -> None: + """Reset the logger for multiprocessing with instance-specific logging. + Save logs to a separate file for each instance, instead of trying to write to the + same file/console from multiple processes. This provides: + - One INFO line to console at start with tail hint + - All subsequent logs go to instance-specific file + - Only WARNING+ messages go to console after initial message -def _child_init() -> None: - """Per-process initializer (placeholder). - Put signal handlers or per-process setup here if needed. + Args: + log_dir: Directory to store log files + instance_id: Unique identifier for the instance being processed """ - pass + import logging + + # Set up logger + log_file = os.path.join(log_dir, f"instance_{instance_id}.log") + + # Get root logger and remove all existing handlers + root_logger = logging.getLogger() + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + # Create console handler for initial message + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + console_handler.setFormatter( + logging.Formatter( + f"Instance {instance_id} - " + "%(asctime)s - %(levelname)s - %(message)s" + ) + ) + root_logger.addHandler(console_handler) + root_logger.setLevel(logging.DEBUG) + + # Print one INFO line with helpful hint + root_logger.info( + f"Starting evaluation for instance {instance_id}.\n" + f'Hint: run "tail -f {log_file}" to see live logs in a separate shell' + ) + + # Now set console to WARNING+ only + console_handler.setLevel(logging.WARNING) + + # Add file handler for detailed logs + os.makedirs(log_dir, exist_ok=True) + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + file_handler.setLevel(logging.INFO) + root_logger.addHandler(file_handler) + + +@contextmanager +def redirect_stdout_stderr(log_file_path: str): + """Context manager to redirect stdout/stderr to a log file. + + This captures all print() statements, SDK visualizations, and any other + output that goes to stdout/stderr. + + Args: + log_file_path: Path to the log file where output should be redirected + """ + # Save original stdout/stderr + original_stdout = sys.stdout + original_stderr = sys.stderr + log_file = None + + try: + # Open log file in append mode with line buffering + log_file = open(log_file_path, "a", buffering=1, encoding="utf-8") + + # Redirect stdout and stderr + sys.stdout = log_file + sys.stderr = log_file + + yield + + finally: + # Restore original stdout/stderr + sys.stdout = original_stdout + sys.stderr = original_stderr + + # Close the log file if it was opened + if log_file is not None and not log_file.closed: + log_file.close() diff --git a/tests/test_keyboard_interrupt.py b/tests/test_keyboard_interrupt.py new file mode 100644 index 00000000..7dc743f0 --- /dev/null +++ b/tests/test_keyboard_interrupt.py @@ -0,0 +1,280 @@ +"""Tests for KeyboardInterrupt handling in the evaluation module.""" + +import os +import signal +import subprocess +import sys +import tempfile +import time + +import psutil +import pytest + + +# Helper script that will be run as subprocess +EVALUATION_SCRIPT = """ +import os +import time +import sys +from typing import List +from unittest.mock import Mock + +# Add parent directory to path +sys.path.insert(0, "{project_root}") + +from benchmarks.utils.evaluation import Evaluation +from benchmarks.utils.models import EvalInstance, EvalMetadata, EvalOutput +from openhands.sdk import LLM +from openhands.sdk.workspace import RemoteWorkspace + + +class TestEvaluation(Evaluation): + def prepare_instances(self) -> List[EvalInstance]: + return [ + EvalInstance(id=f"test_instance_{{i}}", data={{"test": "data"}}) + for i in range(10) + ] + + def prepare_workspace(self, instance: EvalInstance) -> RemoteWorkspace: + mock_workspace = Mock(spec=RemoteWorkspace) + mock_workspace.__enter__ = Mock(return_value=mock_workspace) + mock_workspace.__exit__ = Mock(return_value=None) + return mock_workspace + + def evaluate_instance( + self, instance: EvalInstance, workspace: RemoteWorkspace + ) -> EvalOutput: + # Simulate long-running task + time.sleep(60) # Long sleep + return EvalOutput( + instance_id=instance.id, + test_result={{"success": True}}, + instruction="test instruction", + error=None, + history=[], + instance=instance.data, + ) + + +if __name__ == "__main__": + llm = LLM(model="test-model") + metadata = EvalMetadata( + llm=llm, + dataset="test", + dataset_split="test", + max_iterations=10, + eval_output_dir="{tmpdir}", + details={{}}, + eval_limit=0, + max_attempts=1, + max_retries=0, + ) + + evaluation = TestEvaluation(metadata=metadata, num_workers=4) + + print("PID:{{}}".format(os.getpid()), flush=True) + + try: + evaluation.run() + except KeyboardInterrupt: + print("KeyboardInterrupt caught", flush=True) + sys.exit(0) +""" + + +def get_child_processes(parent_pid: int) -> list: + """Get all child processes of a parent process recursively.""" + try: + parent = psutil.Process(parent_pid) + children = parent.children(recursive=True) + return children + except psutil.NoSuchProcess: + return [] + + +def test_keyboard_interrupt_cleanup(): + """Test that all child processes are properly cleaned up on KeyboardInterrupt.""" + # Get project root + project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create the test script + script_path = os.path.join(tmpdir, "test_eval.py") + with open(script_path, "w") as f: + f.write(EVALUATION_SCRIPT.format(project_root=project_root, tmpdir=tmpdir)) + + # Start the evaluation in a subprocess + print("\n=== Starting evaluation subprocess ===") + process = subprocess.Popen( + [sys.executable, script_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + ) + + # Wait for the process to start and get its PID + eval_pid = None + start_time = time.time() + stdout_lines = [] + + assert process.stdout is not None, "Process stdout is None" + + while time.time() - start_time < 10: + # Check if process is still running + if process.poll() is not None: + # Process died, get all output + stdout_rest, stderr_rest = process.communicate() + print(f"Process died with code: {process.returncode}") + print(f"STDOUT: {stdout_rest}") + print(f"STDERR: {stderr_rest}") + break + + try: + # Try to read the PID from stdout + line = process.stdout.readline() + if line: + print(f"Got line: {line.strip()}") + stdout_lines.append(line) + if line.startswith("PID:"): + eval_pid = int(line.split(":")[1].strip()) + print(f"Evaluation process PID: {eval_pid}") + break + except Exception as e: + print(f"Error reading PID: {e}") + time.sleep(0.1) + + if eval_pid is None and process.stderr is not None: + # Try to get any error output + try: + stderr_content = process.stderr.read() + print(f"\nSTDERR output:\n{stderr_content}") + except Exception: + pass + + assert eval_pid is not None, ( + f"Could not get evaluation process PID. Stdout: {stdout_lines}" + ) + + # Wait for worker processes to start + print("Waiting for workers to start...") + time.sleep(3) + + # Get child processes before interrupt + children_before = get_child_processes(eval_pid) + python_workers_before = [ + p for p in children_before if "python" in p.name().lower() + ] + print(f"Worker processes before interrupt: {len(python_workers_before)}") + print(f"Worker PIDs: {[p.pid for p in python_workers_before]}") + + # Verify we have worker processes + assert len(python_workers_before) > 0, ( + f"No worker processes found. All children: {[(p.pid, p.name()) for p in children_before]}" + ) + + # Send SIGINT to the subprocess + print("\n=== Sending SIGINT ===") + process.send_signal(signal.SIGINT) + + # Wait for process to exit + try: + process.wait(timeout=10) + print(f"Process exited with code: {process.returncode}") + except subprocess.TimeoutExpired: + print("Process did not exit in time, force killing") + process.kill() + process.wait() + + # Give a moment for cleanup + time.sleep(2) + + # Check if all worker processes are gone + remaining_workers = [] + for worker in python_workers_before: + try: + if psutil.pid_exists(worker.pid): + proc = psutil.Process(worker.pid) + # Check if it's still the same process (not reused PID) + if proc.create_time() == worker.create_time(): + remaining_workers.append(worker.pid) + except psutil.NoSuchProcess: + pass # Process is gone, which is what we want + + print("\n=== Results ===") + print(f"Worker processes before: {len(python_workers_before)}") + print(f"Remaining workers: {len(remaining_workers)}") + if remaining_workers: + print(f"Remaining PIDs: {remaining_workers}") + + # Assert all workers are cleaned up + assert len(remaining_workers) == 0, ( + f"Worker processes still running after SIGINT: {remaining_workers}" + ) + + print("✓ All child processes cleaned up successfully") + + +def test_keyboard_interrupt_immediate(): + """Test cleanup when interrupt happens very early.""" + project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + + with tempfile.TemporaryDirectory() as tmpdir: + script_path = os.path.join(tmpdir, "test_eval.py") + with open(script_path, "w") as f: + f.write(EVALUATION_SCRIPT.format(project_root=project_root, tmpdir=tmpdir)) + + print("\n=== Testing immediate interrupt ===") + process = subprocess.Popen( + [sys.executable, script_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + # Get PID + eval_pid = None + assert process.stdout is not None, "Process stdout is None" + for _ in range(50): + try: + line = process.stdout.readline() + if line.startswith("PID:"): + eval_pid = int(line.split(":")[1].strip()) + break + except Exception: + pass + time.sleep(0.1) + + assert eval_pid is not None, "Could not get PID" + + # Send interrupt almost immediately + time.sleep(0.5) + process.send_signal(signal.SIGINT) + + # Wait for cleanup + try: + process.wait(timeout=10) + except subprocess.TimeoutExpired: + process.kill() + + time.sleep(1) + + # Verify no zombie processes + try: + parent = psutil.Process(eval_pid) + remaining = parent.children(recursive=True) + except psutil.NoSuchProcess: + remaining = [] + + python_workers = [p for p in remaining if "python" in p.name().lower()] + + assert len(python_workers) == 0, ( + f"Worker processes still running: {[p.pid for p in python_workers]}" + ) + + print("✓ Immediate interrupt handled correctly") + + +if __name__ == "__main__": + # Run tests with verbose output + pytest.main([__file__, "-v", "-s"])