From 5cd34360add57d6f764d354f977b74102158810c Mon Sep 17 00:00:00 2001 From: jvm Date: Sun, 18 May 2025 15:37:28 +0200 Subject: [PATCH] Reformatted code with $ black openevolve tests examples --- examples/function_minimization/evaluator.py | 131 ++++---- .../function_minimization/initial_program.py | 18 +- openevolve/cli.py | 106 +++---- openevolve/config.py | 69 +++-- openevolve/controller.py | 208 +++++++------ openevolve/database.py | 289 +++++++++--------- openevolve/evaluator.py | 164 +++++----- openevolve/llm/__init__.py | 1 + openevolve/llm/base.py | 10 +- openevolve/llm/ensemble.py | 39 +-- openevolve/llm/openai.py | 36 +-- openevolve/prompt/__init__.py | 1 + openevolve/prompt/sampler.py | 162 +++++----- openevolve/prompt/templates.py | 11 +- openevolve/utils/__init__.py | 9 +- openevolve/utils/async_utils.py | 63 ++-- openevolve/utils/code_utils.py | 89 +++--- tests/test_basic.py | 57 ++-- 18 files changed, 736 insertions(+), 727 deletions(-) diff --git a/examples/function_minimization/evaluator.py b/examples/function_minimization/evaluator.py index efffaee6f..d21d567d0 100644 --- a/examples/function_minimization/evaluator.py +++ b/examples/function_minimization/evaluator.py @@ -1,6 +1,7 @@ """ Evaluator for the function minimization example """ + import importlib.util import numpy as np import time @@ -9,16 +10,17 @@ import traceback import sys + def run_with_timeout(func, args=(), kwargs={}, timeout_seconds=5): """ Run a function with a timeout using concurrent.futures - + Args: func: Function to run args: Arguments to pass to the function kwargs: Keyword arguments to pass to the function timeout_seconds: Timeout in seconds - + Returns: Result of the function or raises TimeoutError """ @@ -27,7 +29,10 @@ def run_with_timeout(func, args=(), kwargs={}, timeout_seconds=5): try: return future.result(timeout=timeout_seconds) except concurrent.futures.TimeoutError: - raise TimeoutError(f"Function {func.__name__} timed out after {timeout_seconds} seconds") + raise TimeoutError( + f"Function {func.__name__} timed out after {timeout_seconds} seconds" + ) + def safe_float(value): """Convert a value to float safely""" @@ -37,14 +42,15 @@ def safe_float(value): print(f"Warning: Could not convert {value} of type {type(value)} to float") return 0.0 + def evaluate(program_path): """ Evaluate the program by running it multiple times and checking how close it gets to the known global minimum. 
- + Args: program_path: Path to the program file - + Returns: Dictionary of metrics """ @@ -52,13 +58,13 @@ def evaluate(program_path): GLOBAL_MIN_X = -1.76 GLOBAL_MIN_Y = -1.03 GLOBAL_MIN_VALUE = -2.104 - + try: # Load the program spec = importlib.util.spec_from_file_location("program", program_path) program = importlib.util.module_from_spec(spec) spec.loader.exec_module(program) - + # Check if the required function exists if not hasattr(program, "run_search"): print(f"Error: program does not have 'run_search' function") @@ -67,67 +73,77 @@ def evaluate(program_path): "distance_score": 0.0, "speed_score": 0.0, "combined_score": 0.0, - "error": "Missing run_search function" + "error": "Missing run_search function", } - + # Run multiple trials num_trials = 10 values = [] distances = [] times = [] success_count = 0 - + for trial in range(num_trials): try: start_time = time.time() - + # Run with timeout result = run_with_timeout(program.run_search, timeout_seconds=5) - + # Check if we got a tuple of 3 values if not isinstance(result, tuple) or len(result) != 3: - print(f"Trial {trial}: Invalid result format, expected tuple of 3 values but got {type(result)}") + print( + f"Trial {trial}: Invalid result format, expected tuple of 3 values but got {type(result)}" + ) continue - + x, y, value = result - + end_time = time.time() - + # Ensure all values are float x = safe_float(x) y = safe_float(y) value = safe_float(value) - + # Check if the result is valid (not NaN or infinite) - if (np.isnan(x) or np.isnan(y) or np.isnan(value) or - np.isinf(x) or np.isinf(y) or np.isinf(value)): + if ( + np.isnan(x) + or np.isnan(y) + or np.isnan(value) + or np.isinf(x) + or np.isinf(y) + or np.isinf(value) + ): print(f"Trial {trial}: Invalid result, got x={x}, y={y}, value={value}") continue - + # Calculate metrics x_diff = safe_float(x) - GLOBAL_MIN_X y_diff = safe_float(y) - GLOBAL_MIN_Y distance_to_global = np.sqrt(x_diff**2 + y_diff**2) value_difference = abs(value - GLOBAL_MIN_VALUE) - + values.append(float(value)) distances.append(float(distance_to_global)) times.append(float(end_time - start_time)) success_count += 1 - + except TimeoutError as e: print(f"Trial {trial}: {str(e)}") continue except IndexError as e: # Specifically handle IndexError which often happens with early termination checks print(f"Trial {trial}: IndexError - {str(e)}") - print("This is likely due to a list index check before the list is fully populated.") + print( + "This is likely due to a list index check before the list is fully populated." 
+ ) continue except Exception as e: print(f"Trial {trial}: Error - {str(e)}") print(traceback.format_exc()) continue - + # If all trials failed, return zero scores if success_count == 0: return { @@ -135,31 +151,33 @@ def evaluate(program_path): "distance_score": 0.0, "speed_score": 0.0, "combined_score": 0.0, - "error": "All trials failed" + "error": "All trials failed", } - + # Calculate metrics avg_value = float(np.mean(values)) avg_distance = float(np.mean(distances)) avg_time = float(np.mean(times)) if times else 1.0 - + # Convert to scores (higher is better) value_score = float(1.0 / (1.0 + abs(avg_value - GLOBAL_MIN_VALUE))) # Normalize and invert distance_score = float(1.0 / (1.0 + avg_distance)) speed_score = float(1.0 / avg_time) if avg_time > 0 else 0.0 - + # Normalize speed score (so it doesn't dominate) speed_score = float(min(speed_score, 10.0) / 10.0) - + # Add reliability score based on success rate reliability_score = float(success_count / num_trials) - + # Calculate a single combined score that prioritizes finding good solutions # over secondary metrics like speed and reliability # Value and distance scores (quality of solution) get 90% of the weight # Speed and reliability get only 10% combined - combined_score = float(0.6 * value_score + 0.3 * distance_score + 0.05 * speed_score + 0.05 * reliability_score) - + combined_score = float( + 0.6 * value_score + 0.3 * distance_score + 0.05 * speed_score + 0.05 * reliability_score + ) + # Also compute an "overall" score that will be the primary metric for selection # This adds a bonus for finding solutions close to the global minimum # and heavily penalizes solutions that aren't finding the right region @@ -169,10 +187,10 @@ def evaluate(program_path): solution_quality = 0.5 else: # Not finding the right region solution_quality = 0.1 - + # Overall score is dominated by solution quality but also factors in the combined score overall_score = 0.8 * solution_quality + 0.2 * combined_score - + return { "value_score": value_score, "distance_score": distance_score, @@ -180,7 +198,7 @@ def evaluate(program_path): "reliability_score": reliability_score, "combined_score": combined_score, "overall_score": overall_score, # This will be the primary selection metric - "success_rate": reliability_score + "success_rate": reliability_score, } except Exception as e: print(f"Evaluation failed completely: {str(e)}") @@ -190,9 +208,10 @@ def evaluate(program_path): "distance_score": 0.0, "speed_score": 0.0, "combined_score": 0.0, - "error": str(e) + "error": str(e), } + # Stage-based evaluation for cascade evaluation def evaluate_stage1(program_path): """First stage evaluation with fewer trials""" @@ -200,49 +219,58 @@ def evaluate_stage1(program_path): GLOBAL_MIN_X = float(-1.76) GLOBAL_MIN_Y = float(-1.03) GLOBAL_MIN_VALUE = float(-2.104) - + # Quick check to see if the program runs without errors try: # Load the program spec = importlib.util.spec_from_file_location("program", program_path) program = importlib.util.module_from_spec(spec) spec.loader.exec_module(program) - + # Check if the required function exists if not hasattr(program, "run_search"): print(f"Stage 1 validation: Program does not have 'run_search' function") return {"runs_successfully": 0.0, "error": "Missing run_search function"} - + try: # Run a single trial with timeout result = run_with_timeout(program.run_search, timeout_seconds=5) - + # Check if we got a tuple of 3 values if not isinstance(result, tuple) or len(result) != 3: - print(f"Stage 1: Invalid result format, expected 
tuple of 3 values but got {type(result)}") + print( + f"Stage 1: Invalid result format, expected tuple of 3 values but got {type(result)}" + ) return {"runs_successfully": 0.0, "error": "Invalid result format"} - + x, y, value = result - + # Ensure all values are float x = safe_float(x) y = safe_float(y) value = safe_float(value) - + # Check if the result is valid - if np.isnan(x) or np.isnan(y) or np.isnan(value) or np.isinf(x) or np.isinf(y) or np.isinf(value): + if ( + np.isnan(x) + or np.isnan(y) + or np.isnan(value) + or np.isinf(x) + or np.isinf(y) + or np.isinf(value) + ): print(f"Stage 1 validation: Invalid result, got x={x}, y={y}, value={value}") return {"runs_successfully": 0.5, "error": "Invalid result values"} - + # Calculate distance safely x_diff = float(x) - GLOBAL_MIN_X y_diff = float(y) - GLOBAL_MIN_Y distance = float(np.sqrt(x_diff**2 + y_diff**2)) - + # Calculate value-based score value_score = float(1.0 / (1.0 + abs(value - GLOBAL_MIN_VALUE))) distance_score = float(1.0 / (1.0 + distance)) - + # Calculate solution quality metric if distance < 1.0: # Very close to the correct solution solution_quality = 1.0 @@ -250,7 +278,7 @@ def evaluate_stage1(program_path): solution_quality = 0.5 else: # Not finding the right region solution_quality = 0.1 - + # Basic metrics with overall score return { "runs_successfully": 1.0, @@ -258,7 +286,7 @@ def evaluate_stage1(program_path): "distance": distance, "value_score": value_score, "distance_score": distance_score, - "overall_score": solution_quality # This becomes a strong guiding metric + "overall_score": solution_quality, # This becomes a strong guiding metric } except TimeoutError as e: print(f"Stage 1 evaluation timed out: {e}") @@ -272,12 +300,13 @@ def evaluate_stage1(program_path): print(f"Stage 1 evaluation failed: {e}") print(traceback.format_exc()) return {"runs_successfully": 0.0, "error": str(e)} - + except Exception as e: print(f"Stage 1 evaluation failed: {e}") print(traceback.format_exc()) return {"runs_successfully": 0.0, "error": str(e)} + def evaluate_stage2(program_path): """Second stage evaluation with more thorough testing""" # Full evaluation as in the main evaluate function diff --git a/examples/function_minimization/initial_program.py b/examples/function_minimization/initial_program.py index 41733beab..652d03fdf 100644 --- a/examples/function_minimization/initial_program.py +++ b/examples/function_minimization/initial_program.py @@ -2,14 +2,15 @@ """Function minimization example for OpenEvolve""" import numpy as np + def search_algorithm(iterations=1000, bounds=(-5, 5)): """ A simple random search algorithm that often gets stuck in local minima. 
- + Args: iterations: Number of iterations to run bounds: Bounds for the search space (min, max) - + Returns: Tuple of (best_x, best_y, best_value) """ @@ -17,29 +18,34 @@ def search_algorithm(iterations=1000, bounds=(-5, 5)): best_x = np.random.uniform(bounds[0], bounds[1]) best_y = np.random.uniform(bounds[0], bounds[1]) best_value = evaluate_function(best_x, best_y) - + for _ in range(iterations): # Simple random search x = np.random.uniform(bounds[0], bounds[1]) y = np.random.uniform(bounds[0], bounds[1]) value = evaluate_function(x, y) - + if value < best_value: best_value = value best_x, best_y = x, y - + return best_x, best_y, best_value + def evaluate_function(x, y): """The complex function we're trying to minimize""" - return np.sin(x) * np.cos(y) + np.sin(x*y) + (x**2 + y**2)/20 + return np.sin(x) * np.cos(y) + np.sin(x * y) + (x**2 + y**2) / 20 + + # EVOLVE-BLOCK-END + # This part remains fixed (not evolved) def run_search(): x, y, value = search_algorithm() return x, y, value + if __name__ == "__main__": x, y, value = run_search() print(f"Found minimum at ({x}, {y}) with value {value}") diff --git a/openevolve/cli.py b/openevolve/cli.py index 6ca7bcfe6..4666eb032 100644 --- a/openevolve/cli.py +++ b/openevolve/cli.py @@ -1,6 +1,7 @@ """ Command-line interface for OpenEvolve """ + import argparse import asyncio import logging @@ -17,113 +18,79 @@ def parse_args() -> argparse.Namespace: """Parse command-line arguments""" parser = argparse.ArgumentParser(description="OpenEvolve - Evolutionary coding agent") - - parser.add_argument( - "initial_program", - help="Path to the initial program file" - ) - - parser.add_argument( - "evaluation_file", - help="Path to the evaluation file containing an 'evaluate' function" - ) - - parser.add_argument( - "--config", - "-c", - help="Path to configuration file (YAML)", - default=None - ) - + + parser.add_argument("initial_program", help="Path to the initial program file") + parser.add_argument( - "--output", - "-o", - help="Output directory for results", - default=None + "evaluation_file", help="Path to the evaluation file containing an 'evaluate' function" ) - + + parser.add_argument("--config", "-c", help="Path to configuration file (YAML)", default=None) + + parser.add_argument("--output", "-o", help="Output directory for results", default=None) + parser.add_argument( - "--iterations", - "-i", - help="Maximum number of iterations", - type=int, - default=None + "--iterations", "-i", help="Maximum number of iterations", type=int, default=None ) - + parser.add_argument( - "--target-score", - "-t", - help="Target score to reach", - type=float, - default=None + "--target-score", "-t", help="Target score to reach", type=float, default=None ) - + parser.add_argument( "--log-level", "-l", help="Logging level", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], - default="INFO" - ) - - parser.add_argument( - "--api-base", - help="Base URL for the LLM API", - default=None - ) - - parser.add_argument( - "--primary-model", - help="Primary LLM model name", - default=None - ) - - parser.add_argument( - "--secondary-model", - help="Secondary LLM model name", - default=None + default="INFO", ) - + + parser.add_argument("--api-base", help="Base URL for the LLM API", default=None) + + parser.add_argument("--primary-model", help="Primary LLM model name", default=None) + + parser.add_argument("--secondary-model", help="Secondary LLM model name", default=None) + return parser.parse_args() async def main_async() -> int: """ Main asynchronous entry point - 
+ Returns: Exit code """ args = parse_args() - + # Check if files exist if not os.path.exists(args.initial_program): print(f"Error: Initial program file '{args.initial_program}' not found") return 1 - + if not os.path.exists(args.evaluation_file): print(f"Error: Evaluation file '{args.evaluation_file}' not found") return 1 - + # Create config object with command-line overrides config = None if args.api_base or args.primary_model or args.secondary_model: # Load base config from file or defaults config = load_config(args.config) - + # Apply command-line overrides if args.api_base: config.llm.api_base = args.api_base print(f"Using API base: {config.llm.api_base}") - + if args.primary_model: config.llm.primary_model = args.primary_model print(f"Using primary model: {config.llm.primary_model}") - + if args.secondary_model: config.llm.secondary_model = args.secondary_model print(f"Using secondary model: {config.llm.secondary_model}") - + # Initialize OpenEvolve try: openevolve = OpenEvolve( @@ -133,27 +100,28 @@ async def main_async() -> int: config_path=args.config if config is None else None, output_dir=args.output, ) - + # Override log level if specified if args.log_level: logging.getLogger().setLevel(getattr(logging, args.log_level)) - + # Run evolution best_program = await openevolve.run( iterations=args.iterations, target_score=args.target_score, ) - + print(f"\nEvolution complete!") print(f"Best program metrics:") for name, value in best_program.metrics.items(): print(f" {name}: {value:.4f}") - + return 0 - + except Exception as e: print(f"Error: {str(e)}") import traceback + traceback.print_exc() return 1 @@ -161,7 +129,7 @@ async def main_async() -> int: def main() -> int: """ Main entry point - + Returns: Exit code """ diff --git a/openevolve/config.py b/openevolve/config.py index acd9e2a04..b04dc7c72 100644 --- a/openevolve/config.py +++ b/openevolve/config.py @@ -1,6 +1,7 @@ """ Configuration handling for OpenEvolve """ + import os from dataclasses import dataclass, field from pathlib import Path @@ -12,24 +13,24 @@ @dataclass class LLMConfig: """Configuration for LLM models""" - + # Primary model primary_model: str = "gemini-2.0-flash-lite" primary_model_weight: float = 0.8 - + # Secondary model secondary_model: str = "gemini-2.0-flash" secondary_model_weight: float = 0.2 - + # API configuration api_base: str = "https://api.openai.com/v1" api_key: Optional[str] = None - + # Generation parameters temperature: float = 0.7 top_p: float = 0.95 max_tokens: int = 4096 - + # Request parameters timeout: int = 60 retries: int = 3 @@ -39,18 +40,18 @@ class LLMConfig: @dataclass class PromptConfig: """Configuration for prompt generation""" - + template_dir: Optional[str] = None system_message: str = "You are an expert coder helping to improve programs through evolution." 
- + # Number of examples to include in the prompt num_top_programs: int = 3 num_diverse_programs: int = 2 - + # Template stochasticity use_template_stochasticity: bool = True template_variations: Dict[str, List[str]] = field(default_factory=dict) - + # Meta-prompting use_meta_prompting: bool = False meta_prompt_weight: float = 0.1 @@ -59,22 +60,22 @@ class PromptConfig: @dataclass class DatabaseConfig: """Configuration for the program database""" - + # General settings db_path: Optional[str] = None # Path to store database on disk in_memory: bool = True - + # Evolutionary parameters population_size: int = 1000 archive_size: int = 100 num_islands: int = 5 - + # Selection parameters elite_selection_ratio: float = 0.1 exploration_ratio: float = 0.2 exploitation_ratio: float = 0.7 diversity_metric: str = "edit_distance" # Options: "edit_distance", "feature_based" - + # Feature map dimensions for MAP-Elites feature_dimensions: List[str] = field(default_factory=lambda: ["score", "complexity"]) feature_bins: int = 10 @@ -83,23 +84,23 @@ class DatabaseConfig: @dataclass class EvaluatorConfig: """Configuration for program evaluation""" - + # General settings timeout: int = 300 # Maximum evaluation time in seconds max_retries: int = 3 - + # Resource limits for evaluation memory_limit_mb: Optional[int] = None cpu_limit: Optional[float] = None - + # Evaluation strategies cascade_evaluation: bool = True cascade_thresholds: List[float] = field(default_factory=lambda: [0.5, 0.75, 0.9]) - + # Parallel evaluation parallel_evaluations: int = 4 distributed: bool = False - + # LLM-based feedback use_llm_feedback: bool = False llm_feedback_weight: float = 0.1 @@ -108,43 +109,43 @@ class EvaluatorConfig: @dataclass class Config: """Master configuration for OpenEvolve""" - + # General settings max_iterations: int = 10000 checkpoint_interval: int = 100 log_level: str = "INFO" log_dir: Optional[str] = None random_seed: Optional[int] = None - + # Component configurations llm: LLMConfig = field(default_factory=LLMConfig) prompt: PromptConfig = field(default_factory=PromptConfig) database: DatabaseConfig = field(default_factory=DatabaseConfig) evaluator: EvaluatorConfig = field(default_factory=EvaluatorConfig) - + # Evolution settings diff_based_evolution: bool = True allow_full_rewrites: bool = False max_code_length: int = 10000 - + @classmethod def from_yaml(cls, path: Union[str, Path]) -> "Config": """Load configuration from a YAML file""" - with open(path, 'r') as f: + with open(path, "r") as f: config_dict = yaml.safe_load(f) return cls.from_dict(config_dict) - + @classmethod def from_dict(cls, config_dict: Dict[str, Any]) -> "Config": """Create configuration from a dictionary""" # Handle nested configurations config = Config() - + # Update top-level fields for key, value in config_dict.items(): if key not in ["llm", "prompt", "database", "evaluator"] and hasattr(config, key): setattr(config, key, value) - + # Update nested configs if "llm" in config_dict: config.llm = LLMConfig(**config_dict["llm"]) @@ -154,9 +155,9 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> "Config": config.database = DatabaseConfig(**config_dict["database"]) if "evaluator" in config_dict: config.evaluator = EvaluatorConfig(**config_dict["evaluator"]) - + return config - + def to_dict(self) -> Dict[str, Any]: """Convert configuration to a dictionary""" return { @@ -166,7 +167,6 @@ def to_dict(self) -> Dict[str, Any]: "log_level": self.log_level, "log_dir": self.log_dir, "random_seed": self.random_seed, - # Component configurations 
"llm": { "primary_model": self.llm.primary_model, @@ -216,16 +216,15 @@ def to_dict(self) -> Dict[str, Any]: "use_llm_feedback": self.evaluator.use_llm_feedback, "llm_feedback_weight": self.evaluator.llm_feedback_weight, }, - # Evolution settings "diff_based_evolution": self.diff_based_evolution, "allow_full_rewrites": self.allow_full_rewrites, "max_code_length": self.max_code_length, } - + def to_yaml(self, path: Union[str, Path]) -> None: """Save configuration to a YAML file""" - with open(path, 'w') as f: + with open(path, "w") as f: yaml.dump(self.to_dict(), f, default_flow_style=False) @@ -233,15 +232,15 @@ def load_config(config_path: Optional[Union[str, Path]] = None) -> Config: """Load configuration from a YAML file or use defaults""" if config_path and os.path.exists(config_path): return Config.from_yaml(config_path) - + # Use environment variables if available api_key = os.environ.get("OPENAI_API_KEY") api_base = os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1") - + config = Config() if api_key: config.llm.api_key = api_key if api_base: config.llm.api_base = api_base - + return config diff --git a/openevolve/controller.py b/openevolve/controller.py index 575d2c4f7..8569bd677 100644 --- a/openevolve/controller.py +++ b/openevolve/controller.py @@ -1,6 +1,7 @@ """ Main controller for OpenEvolve """ + import asyncio import logging import os @@ -30,17 +31,17 @@ class OpenEvolve: """ Main controller for OpenEvolve - + Orchestrates the evolution process, coordinating between the prompt sampler, LLM ensemble, evaluator, and program database. - + Features: - Tracks the absolute best program across evolution steps - Ensures the best solution is not lost during the MAP-Elites process - Always includes the best program in the selection process for inspiration - Maintains detailed logs and metadata about improvements """ - + def __init__( self, initial_program_path: str, @@ -56,21 +57,21 @@ def __init__( else: # Load from file or use defaults self.config = load_config(config_path) - + # Set up output directory self.output_dir = output_dir or os.path.join( os.path.dirname(initial_program_path), "openevolve_output" ) os.makedirs(self.output_dir, exist_ok=True) - + # Set up logging self._setup_logging() - + # Load initial program self.initial_program_path = initial_program_path self.initial_program_code = self._load_initial_program() self.language = extract_code_language(self.initial_program_code) - + # Extract file extension from initial program self.file_extension = os.path.splitext(initial_program_path)[1] if not self.file_extension: @@ -80,53 +81,44 @@ def __init__( # Make sure it starts with a dot if not self.file_extension.startswith("."): self.file_extension = f".{self.file_extension}" - + # Initialize components self.llm_ensemble = LLMEnsemble(self.config.llm) self.prompt_sampler = PromptSampler(self.config.prompt) self.database = ProgramDatabase(self.config.database) - self.evaluator = Evaluator( - self.config.evaluator, - evaluation_file, - self.llm_ensemble - ) - - logger.info( - f"Initialized OpenEvolve with {initial_program_path} " - f"and {evaluation_file}" - ) - + self.evaluator = Evaluator(self.config.evaluator, evaluation_file, self.llm_ensemble) + + logger.info(f"Initialized OpenEvolve with {initial_program_path} " f"and {evaluation_file}") + def _setup_logging(self) -> None: """Set up logging""" log_dir = self.config.log_dir or os.path.join(self.output_dir, "logs") os.makedirs(log_dir, exist_ok=True) - + # Set up root logger root_logger = logging.getLogger() 
root_logger.setLevel(getattr(logging, self.config.log_level)) - + # Add file handler log_file = os.path.join(log_dir, f"openevolve_{time.strftime('%Y%m%d_%H%M%S')}.log") file_handler = logging.FileHandler(log_file) - file_handler.setFormatter(logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s" - )) + file_handler.setFormatter( + logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + ) root_logger.addHandler(file_handler) - + # Add console handler console_handler = logging.StreamHandler() - console_handler.setFormatter(logging.Formatter( - "%(asctime)s - %(levelname)s - %(message)s" - )) + console_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) root_logger.addHandler(console_handler) - + logger.info(f"Logging to {log_file}") - + def _load_initial_program(self) -> str: """Load the initial program from file""" with open(self.initial_program_path, "r") as f: return f.read() - + async def run( self, iterations: Optional[int] = None, @@ -134,40 +126,40 @@ async def run( ) -> Program: """ Run the evolution process - + Args: iterations: Maximum number of iterations (uses config if None) target_score: Target score to reach (continues until reached if specified) - + Returns: Best program found """ max_iterations = iterations or self.config.max_iterations - + # Initialize the database with the initial program initial_program_id = str(uuid.uuid4()) - + # Evaluate the initial program initial_metrics = await self.evaluator.evaluate_program( self.initial_program_code, initial_program_id ) - + initial_program = Program( id=initial_program_id, code=self.initial_program_code, language=self.language, metrics=initial_metrics, ) - + self.database.add(initial_program) - + # Main evolution loop for i in range(max_iterations): iteration_start = time.time() - + # Sample parent and inspirations parent, inspirations = self.database.sample() - + # Build prompt prompt = self.prompt_sampler.build_prompt( current_program=parent.code, @@ -179,36 +171,36 @@ async def run( evolution_round=i, allow_full_rewrite=self.config.allow_full_rewrites, ) - + # Generate code modification try: llm_response = await self.llm_ensemble.generate_with_context( system_message=prompt["system"], messages=[{"role": "user", "content": prompt["user"]}], ) - + # Parse the response if self.config.diff_based_evolution: diff_blocks = extract_diffs(llm_response) - + if not diff_blocks: logger.warning(f"Iteration {i+1}: No valid diffs found in response") continue - + # Apply the diffs child_code = apply_diff(parent.code, llm_response) changes_summary = format_diff_summary(diff_blocks) else: # Parse full rewrite new_code = parse_full_rewrite(llm_response, self.language) - + if not new_code: logger.warning(f"Iteration {i+1}: No valid code found in response") continue - + child_code = new_code changes_summary = "Full rewrite" - + # Check code length if len(child_code) > self.config.max_code_length: logger.warning( @@ -216,13 +208,11 @@ async def run( f"({len(child_code)} > {self.config.max_code_length})" ) continue - + # Evaluate the child program child_id = str(uuid.uuid4()) - child_metrics = await self.evaluator.evaluate_program( - child_code, child_id - ) - + child_metrics = await self.evaluator.evaluate_program(child_code, child_id) + # Create a child program child_program = Program( id=child_id, @@ -236,71 +226,84 @@ async def run( "parent_metrics": parent.metrics, }, ) - + # Add to database self.database.add(child_program) - + # Log progress iteration_time = 
time.time() - iteration_start self._log_iteration(i, parent, child_program, iteration_time) - + # Specifically check if this is the new best program if self.database.best_program_id == child_program.id: - logger.info(f"🌟 New best solution found at iteration {i+1}: {child_program.id}") - logger.info(f"Metrics: {', '.join(f'{name}={value:.4f}' for name, value in child_program.metrics.items())}") - + logger.info( + f"🌟 New best solution found at iteration {i+1}: {child_program.id}" + ) + logger.info( + f"Metrics: {', '.join(f'{name}={value:.4f}' for name, value in child_program.metrics.items())}" + ) + # Save checkpoint if (i + 1) % self.config.checkpoint_interval == 0: self._save_checkpoint(i + 1) - + # Check if target score reached if target_score is not None: avg_score = sum(child_metrics.values()) / max(1, len(child_metrics)) if avg_score >= target_score: - logger.info( - f"Target score {target_score} reached after {i+1} iterations" - ) + logger.info(f"Target score {target_score} reached after {i+1} iterations") break - + except Exception as e: logger.error(f"Error in iteration {i+1}: {str(e)}") continue - + # Get the best program using our tracking mechanism best_program = None if self.database.best_program_id: best_program = self.database.get(self.database.best_program_id) logger.info(f"Using tracked best program: {self.database.best_program_id}") - + # Fallback to calculating best program if tracked program not found if best_program is None: best_program = self.database.get_best_program() logger.info("Using calculated best program (tracked program not found)") - + # Check if there's a better program by combined_score that wasn't tracked if "combined_score" in best_program.metrics: best_by_combined = self.database.get_best_program(metric="combined_score") - if best_by_combined and best_by_combined.id != best_program.id and "combined_score" in best_by_combined.metrics: + if ( + best_by_combined + and best_by_combined.id != best_program.id + and "combined_score" in best_by_combined.metrics + ): # If the combined_score of this program is significantly better, use it instead - if best_by_combined.metrics["combined_score"] > best_program.metrics["combined_score"] + 0.02: - logger.warning(f"Found program with better combined_score: {best_by_combined.id}") - logger.warning(f"Score difference: {best_program.metrics['combined_score']:.4f} vs {best_by_combined.metrics['combined_score']:.4f}") + if ( + best_by_combined.metrics["combined_score"] + > best_program.metrics["combined_score"] + 0.02 + ): + logger.warning( + f"Found program with better combined_score: {best_by_combined.id}" + ) + logger.warning( + f"Score difference: {best_program.metrics['combined_score']:.4f} vs {best_by_combined.metrics['combined_score']:.4f}" + ) best_program = best_by_combined - + if best_program: logger.info( f"Evolution complete. 
Best program has metrics: " f"{', '.join(f'{name}={value:.4f}' for name, value in best_program.metrics.items())}" ) - + # Save the best program (using our tracked best program) self._save_best_program() - + return best_program else: logger.warning("No valid programs found during evolution") return initial_program - + def _log_iteration( self, iteration: int, @@ -310,7 +313,7 @@ def _log_iteration( ) -> None: """ Log iteration progress - + Args: iteration: Iteration number parent: Parent program @@ -323,38 +326,36 @@ def _log_iteration( if metric in parent.metrics: diff = value - parent.metrics[metric] improvement[metric] = diff - - improvement_str = ", ".join( - f"{name}={diff:+.4f}" for name, diff in improvement.items() - ) - + + improvement_str = ", ".join(f"{name}={diff:+.4f}" for name, diff in improvement.items()) + logger.info( f"Iteration {iteration+1}: Child {child.id} from parent {parent.id} " f"in {elapsed_time:.2f}s. Metrics: " f"{', '.join(f'{name}={value:.4f}' for name, value in child.metrics.items())} " f"(Δ: {improvement_str})" ) - + def _save_checkpoint(self, iteration: int) -> None: """ Save a checkpoint - + Args: iteration: Current iteration number """ checkpoint_dir = os.path.join(self.output_dir, "checkpoints") os.makedirs(checkpoint_dir, exist_ok=True) - + # Save the database checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_{iteration}") self.database.save(checkpoint_path) - + logger.info(f"Saved checkpoint at iteration {iteration} to {checkpoint_path}") - + def _save_best_program(self, program: Optional[Program] = None) -> None: """ Save the best program - + Args: program: Best program (if None, uses the tracked best program) """ @@ -365,33 +366,38 @@ def _save_best_program(self, program: Optional[Program] = None) -> None: else: # Fallback to calculating best program if no tracked best program program = self.database.get_best_program() - + if not program: logger.warning("No best program found to save") return - + best_dir = os.path.join(self.output_dir, "best") os.makedirs(best_dir, exist_ok=True) - + # Use the extension from the initial program file filename = f"best_program{self.file_extension}" code_path = os.path.join(best_dir, filename) - + with open(code_path, "w") as f: f.write(program.code) - + # Save complete program info including metrics info_path = os.path.join(best_dir, "best_program_info.json") with open(info_path, "w") as f: import json - json.dump({ - "id": program.id, - "generation": program.generation, - "timestamp": program.timestamp, - "parent_id": program.parent_id, - "metrics": program.metrics, - "language": program.language, - "saved_at": time.time() - }, f, indent=2) - + + json.dump( + { + "id": program.id, + "generation": program.generation, + "timestamp": program.timestamp, + "parent_id": program.parent_id, + "metrics": program.metrics, + "language": program.language, + "saved_at": time.time(), + }, + f, + indent=2, + ) + logger.info(f"Saved best program to {code_path} with program info to {info_path}") diff --git a/openevolve/database.py b/openevolve/database.py index 008098179..92aff460a 100644 --- a/openevolve/database.py +++ b/openevolve/database.py @@ -1,6 +1,7 @@ """ Program database for OpenEvolve """ + import json import logging import os @@ -21,31 +22,31 @@ @dataclass class Program: """Represents a program in the database""" - + # Program identification id: str code: str language: str = "python" - + # Evolution information parent_id: Optional[str] = None generation: int = 0 timestamp: float = 
field(default_factory=time.time) - + # Performance metrics metrics: Dict[str, float] = field(default_factory=dict) - + # Derived features complexity: float = 0.0 diversity: float = 0.0 - + # Metadata metadata: Dict[str, Any] = field(default_factory=dict) - + def to_dict(self) -> Dict[str, Any]: """Convert to dictionary representation""" return asdict(self) - + @classmethod def from_dict(cls, data: Dict[str, Any]) -> "Program": """Create from dictionary representation""" @@ -55,138 +56,136 @@ def from_dict(cls, data: Dict[str, Any]) -> "Program": class ProgramDatabase: """ Database for storing and sampling programs during evolution - + The database implements a combination of MAP-Elites algorithm and island-based population model to maintain diversity during evolution. It also tracks the absolute best program separately to ensure it's never lost. """ - + def __init__(self, config: DatabaseConfig): self.config = config - + # In-memory program storage self.programs: Dict[str, Program] = {} - + # Feature grid for MAP-Elites self.feature_map: Dict[str, str] = {} self.feature_bins = config.feature_bins - + # Island populations self.islands: List[Set[str]] = [set() for _ in range(config.num_islands)] - + # Archive of elite programs self.archive: Set[str] = set() - + # Track the absolute best program separately self.best_program_id: Optional[str] = None - + # Load database from disk if path is provided if config.db_path and os.path.exists(config.db_path): self.load(config.db_path) - + logger.info(f"Initialized program database with {len(self.programs)} programs") - + def add(self, program: Program) -> str: """ Add a program to the database - + Args: program: Program to add - + Returns: Program ID """ # Store the program self.programs[program.id] = program - + # Calculate feature coordinates for MAP-Elites feature_coords = self._calculate_feature_coords(program) - + # Add to feature map (replacing existing if better) feature_key = self._feature_coords_to_key(feature_coords) if feature_key not in self.feature_map or self._is_better( program, self.programs[self.feature_map[feature_key]] ): self.feature_map[feature_key] = program.id - + # Add to an island (randomly) island_idx = random.randint(0, len(self.islands) - 1) self.islands[island_idx].add(program.id) - + # Update archive self._update_archive(program) - + # Update the absolute best program tracking self._update_best_program(program) - + # Save to disk if configured if self.config.db_path: self._save_program(program) - + logger.debug(f"Added program {program.id} to database") return program.id - + def get(self, program_id: str) -> Optional[Program]: """ Get a program by ID - + Args: program_id: Program ID - + Returns: Program or None if not found """ return self.programs.get(program_id) - + def sample(self) -> Tuple[Program, List[Program]]: """ Sample a program and inspirations for the next evolution step - + Returns: Tuple of (parent_program, inspiration_programs) """ # Select parent program parent = self._sample_parent() - + # Select inspirations inspirations = self._sample_inspirations(parent, n=5) - + logger.debug(f"Sampled parent {parent.id} and {len(inspirations)} inspirations") return parent, inspirations - + def get_best_program(self, metric: Optional[str] = None) -> Optional[Program]: """ Get the best program based on a metric - + Args: metric: Metric to use for ranking (uses combined_score or average if None) - + Returns: Best program or None if database is empty """ if not self.programs: return None - + # If no specific metric and we 
have a tracked best program, return it if metric is None and self.best_program_id and self.best_program_id in self.programs: logger.debug(f"Using tracked best program: {self.best_program_id}") return self.programs[self.best_program_id] - + if metric: # Sort by specific metric sorted_programs = sorted( [p for p in self.programs.values() if metric in p.metrics], key=lambda p: p.metrics[metric], - reverse=True + reverse=True, ) if sorted_programs: logger.debug(f"Found best program by metric '{metric}': {sorted_programs[0].id}") elif self.programs and all("combined_score" in p.metrics for p in self.programs.values()): # Sort by combined_score if it exists (preferred method) sorted_programs = sorted( - self.programs.values(), - key=lambda p: p.metrics["combined_score"], - reverse=True + self.programs.values(), key=lambda p: p.metrics["combined_score"], reverse=True ) if sorted_programs: logger.debug(f"Found best program by combined_score: {sorted_programs[0].id}") @@ -195,66 +194,69 @@ def get_best_program(self, metric: Optional[str] = None) -> Optional[Program]: sorted_programs = sorted( self.programs.values(), key=lambda p: sum(p.metrics.values()) / max(1, len(p.metrics)), - reverse=True + reverse=True, ) if sorted_programs: logger.debug(f"Found best program by average metrics: {sorted_programs[0].id}") - + # Update the best program tracking if we found a better program - if sorted_programs and (self.best_program_id is None or - sorted_programs[0].id != self.best_program_id): + if sorted_programs and ( + self.best_program_id is None or sorted_programs[0].id != self.best_program_id + ): old_id = self.best_program_id self.best_program_id = sorted_programs[0].id logger.info(f"Updated best program tracking from {old_id} to {self.best_program_id}") - + # Also log the scores to help understand the update - if old_id and old_id in self.programs and "combined_score" in self.programs[old_id].metrics \ - and "combined_score" in self.programs[self.best_program_id].metrics: + if ( + old_id + and old_id in self.programs + and "combined_score" in self.programs[old_id].metrics + and "combined_score" in self.programs[self.best_program_id].metrics + ): old_score = self.programs[old_id].metrics["combined_score"] new_score = self.programs[self.best_program_id].metrics["combined_score"] - logger.info(f"Score change: {old_score:.4f} → {new_score:.4f} ({new_score-old_score:+.4f})") - + logger.info( + f"Score change: {old_score:.4f} → {new_score:.4f} ({new_score-old_score:+.4f})" + ) + return sorted_programs[0] if sorted_programs else None - - def get_top_programs( - self, - n: int = 10, - metric: Optional[str] = None - ) -> List[Program]: + + def get_top_programs(self, n: int = 10, metric: Optional[str] = None) -> List[Program]: """ Get the top N programs based on a metric - + Args: n: Number of programs to return metric: Metric to use for ranking (uses average if None) - + Returns: List of top programs """ if not self.programs: return [] - + if metric: # Sort by specific metric sorted_programs = sorted( [p for p in self.programs.values() if metric in p.metrics], key=lambda p: p.metrics[metric], - reverse=True + reverse=True, ) else: # Sort by average of all metrics sorted_programs = sorted( self.programs.values(), key=lambda p: sum(p.metrics.values()) / max(1, len(p.metrics)), - reverse=True + reverse=True, ) - + return sorted_programs[:n] - + def save(self, path: Optional[str] = None) -> None: """ Save the database to disk - + Args: path: Path to save to (uses config.db_path if None) """ @@ -262,14 +264,14 @@ 
def save(self, path: Optional[str] = None) -> None: if not save_path: logger.warning("No database path specified, skipping save") return - + # Create directory if it doesn't exist os.makedirs(save_path, exist_ok=True) - + # Save each program for program in self.programs.values(): self._save_program(program, save_path) - + # Save metadata metadata = { "feature_map": self.feature_map, @@ -277,34 +279,34 @@ def save(self, path: Optional[str] = None) -> None: "archive": list(self.archive), "best_program_id": self.best_program_id, } - + with open(os.path.join(save_path, "metadata.json"), "w") as f: json.dump(metadata, f) - + logger.info(f"Saved database with {len(self.programs)} programs to {save_path}") - + def load(self, path: str) -> None: """ Load the database from disk - + Args: path: Path to load from """ if not os.path.exists(path): logger.warning(f"Database path {path} does not exist, skipping load") return - + # Load metadata metadata_path = os.path.join(path, "metadata.json") if os.path.exists(metadata_path): with open(metadata_path, "r") as f: metadata = json.load(f) - + self.feature_map = metadata.get("feature_map", {}) self.islands = [set(island) for island in metadata.get("islands", [])] self.archive = set(metadata.get("archive", [])) self.best_program_id = metadata.get("best_program_id") - + # Load programs programs_dir = os.path.join(path, "programs") if os.path.exists(programs_dir): @@ -314,18 +316,18 @@ def load(self, path: str) -> None: try: with open(program_path, "r") as f: program_data = json.load(f) - + program = Program.from_dict(program_data) self.programs[program.id] = program except Exception as e: logger.warning(f"Error loading program {program_file}: {str(e)}") - + logger.info(f"Loaded database with {len(self.programs)} programs from {path}") - + def _save_program(self, program: Program, base_path: Optional[str] = None) -> None: """ Save a program to disk - + Args: program: Program to save base_path: Base path to save to (uses config.db_path if None) @@ -333,50 +335,48 @@ def _save_program(self, program: Program, base_path: Optional[str] = None) -> No save_path = base_path or self.config.db_path if not save_path: return - + # Create programs directory if it doesn't exist programs_dir = os.path.join(save_path, "programs") os.makedirs(programs_dir, exist_ok=True) - + # Save program program_path = os.path.join(programs_dir, f"{program.id}.json") with open(program_path, "w") as f: json.dump(program.to_dict(), f) - + def _calculate_feature_coords(self, program: Program) -> List[int]: """ Calculate feature coordinates for the MAP-Elites grid - + Args: program: Program to calculate features for - + Returns: List of feature coordinates """ coords = [] - + for dim in self.config.feature_dimensions: if dim == "complexity": # Use code length as complexity measure complexity = len(program.code) - bin_idx = min( - int(complexity / 1000 * self.feature_bins), - self.feature_bins - 1 - ) + bin_idx = min(int(complexity / 1000 * self.feature_bins), self.feature_bins - 1) coords.append(bin_idx) elif dim == "diversity": # Use average edit distance to other programs if len(self.programs) < 5: bin_idx = 0 else: - sample_programs = random.sample(list(self.programs.values()), min(5, len(self.programs))) + sample_programs = random.sample( + list(self.programs.values()), min(5, len(self.programs)) + ) avg_distance = sum( - calculate_edit_distance(program.code, other.code) + calculate_edit_distance(program.code, other.code) for other in sample_programs ) / len(sample_programs) bin_idx = min( 
- int(avg_distance / 1000 * self.feature_bins), - self.feature_bins - 1 + int(avg_distance / 1000 * self.feature_bins), self.feature_bins - 1 ) coords.append(bin_idx) elif dim == "score": @@ -385,72 +385,66 @@ def _calculate_feature_coords(self, program: Program) -> List[int]: bin_idx = 0 else: avg_score = sum(program.metrics.values()) / len(program.metrics) - bin_idx = min( - int(avg_score * self.feature_bins), - self.feature_bins - 1 - ) + bin_idx = min(int(avg_score * self.feature_bins), self.feature_bins - 1) coords.append(bin_idx) elif dim in program.metrics: # Use specific metric score = program.metrics[dim] - bin_idx = min( - int(score * self.feature_bins), - self.feature_bins - 1 - ) + bin_idx = min(int(score * self.feature_bins), self.feature_bins - 1) coords.append(bin_idx) else: # Default to middle bin if feature not found coords.append(self.feature_bins // 2) - + return coords - + def _feature_coords_to_key(self, coords: List[int]) -> str: """ Convert feature coordinates to a string key - + Args: coords: Feature coordinates - + Returns: String key """ return "-".join(str(c) for c in coords) - + def _is_better(self, program1: Program, program2: Program) -> bool: """ Determine if program1 is better than program2 - + Args: program1: First program program2: Second program - + Returns: True if program1 is better than program2 """ # If no metrics, use newest if not program1.metrics and not program2.metrics: return program1.timestamp > program2.timestamp - + # If only one has metrics, it's better if program1.metrics and not program2.metrics: return True if not program1.metrics and program2.metrics: return False - + # Check for combined_score first (this is the preferred metric) if "combined_score" in program1.metrics and "combined_score" in program2.metrics: return program1.metrics["combined_score"] > program2.metrics["combined_score"] - + # Fallback to average of all metrics avg1 = sum(program1.metrics.values()) / len(program1.metrics) avg2 = sum(program2.metrics.values()) / len(program2.metrics) - + return avg1 > avg2 - + def _update_archive(self, program: Program) -> None: """ Update the archive of elite programs - + Args: program: Program to consider for archive """ @@ -458,23 +452,22 @@ def _update_archive(self, program: Program) -> None: if len(self.archive) < self.config.archive_size: self.archive.add(program.id) return - + # Otherwise, find worst program in archive archive_programs = [self.programs[pid] for pid in self.archive] worst_program = min( - archive_programs, - key=lambda p: sum(p.metrics.values()) / max(1, len(p.metrics)) + archive_programs, key=lambda p: sum(p.metrics.values()) / max(1, len(p.metrics)) ) - + # Replace if new program is better if self._is_better(program, worst_program): self.archive.remove(worst_program.id) self.archive.add(program.id) - + def _update_best_program(self, program: Program) -> None: """ Update the absolute best program tracking - + Args: program: Program to consider as the new best """ @@ -483,28 +476,30 @@ def _update_best_program(self, program: Program) -> None: self.best_program_id = program.id logger.debug(f"Set initial best program to {program.id}") return - + # Compare with current best program current_best = self.programs[self.best_program_id] - + # Update if the new program is better if self._is_better(program, current_best): old_id = self.best_program_id self.best_program_id = program.id - + # Log the change if "combined_score" in program.metrics and "combined_score" in current_best.metrics: old_score = 
current_best.metrics["combined_score"] new_score = program.metrics["combined_score"] score_diff = new_score - old_score - logger.info(f"New best program {program.id} replaces {old_id} (combined_score: {old_score:.4f} → {new_score:.4f}, +{score_diff:.4f})") + logger.info( + f"New best program {program.id} replaces {old_id} (combined_score: {old_score:.4f} → {new_score:.4f}, +{score_diff:.4f})" + ) else: logger.info(f"New best program {program.id} replaces {old_id}") - + def _sample_parent(self) -> Program: """ Sample a parent program for the next evolution step - + Returns: Parent program """ @@ -513,52 +508,48 @@ def _sample_parent(self) -> Program: # Exploitation: Use elite program from archive parent_id = random.choice(list(self.archive)) return self.programs[parent_id] - + # Exploration: Sample from an island island_idx = random.randint(0, len(self.islands) - 1) - + if not self.islands[island_idx]: # If island is empty, use best program return self.get_best_program() or next(iter(self.programs.values())) - + parent_id = random.choice(list(self.islands[island_idx])) return self.programs[parent_id] - - def _sample_inspirations( - self, - parent: Program, - n: int = 5 - ) -> List[Program]: + + def _sample_inspirations(self, parent: Program, n: int = 5) -> List[Program]: """ Sample inspiration programs for the next evolution step - + Args: parent: Parent program n: Number of inspirations to sample - + Returns: List of inspiration programs """ inspirations = [] - + # Always include the absolute best program if available and different from parent if self.best_program_id is not None and self.best_program_id != parent.id: best_program = self.programs[self.best_program_id] inspirations.append(best_program) logger.debug(f"Including best program {self.best_program_id} in inspirations") - + # Add top programs as inspirations top_n = max(1, int(n * self.config.elite_selection_ratio)) top_programs = self.get_top_programs(n=top_n) for program in top_programs: if program.id not in [p.id for p in inspirations] and program.id != parent.id: inspirations.append(program) - + # Add diverse programs if len(self.programs) > n and len(inspirations) < n: # Sample from different feature cells feature_coords = self._calculate_feature_coords(parent) - + # Get programs from nearby feature cells nearby_programs = [] for _ in range(n - len(inspirations)): @@ -567,26 +558,30 @@ def _sample_inspirations( max(0, min(self.feature_bins - 1, c + random.randint(-1, 1))) for c in feature_coords ] - + # Try to get program from this cell cell_key = self._feature_coords_to_key(perturbed_coords) if cell_key in self.feature_map: program_id = self.feature_map[cell_key] if program_id != parent.id and program_id not in [p.id for p in inspirations]: nearby_programs.append(self.programs[program_id]) - + # If we need more, add random programs if len(inspirations) + len(nearby_programs) < n: remaining = n - len(inspirations) - len(nearby_programs) all_ids = set(self.programs.keys()) - excluded_ids = {parent.id}.union(p.id for p in inspirations).union(p.id for p in nearby_programs) + excluded_ids = ( + {parent.id} + .union(p.id for p in inspirations) + .union(p.id for p in nearby_programs) + ) available_ids = list(all_ids - excluded_ids) - + if available_ids: random_ids = random.sample(available_ids, min(remaining, len(available_ids))) random_programs = [self.programs[pid] for pid in random_ids] nearby_programs.extend(random_programs) - + inspirations.extend(nearby_programs) - + return inspirations[:n] diff --git a/openevolve/evaluator.py 
b/openevolve/evaluator.py index 410e642f6..4b111f326 100644 --- a/openevolve/evaluator.py +++ b/openevolve/evaluator.py @@ -1,6 +1,7 @@ """ Evaluation system for OpenEvolve """ + import asyncio import importlib.util import json @@ -24,13 +25,13 @@ class Evaluator: """ Evaluates programs and assigns scores - + The evaluator is responsible for executing programs, measuring their performance, and assigning scores based on the evaluation criteria. """ - + def __init__( - self, + self, config: EvaluatorConfig, evaluation_file: str, llm_ensemble: Optional[LLMEnsemble] = None, @@ -38,38 +39,40 @@ def __init__( self.config = config self.evaluation_file = evaluation_file self.llm_ensemble = llm_ensemble - + # Create a task pool for parallel evaluation self.task_pool = TaskPool(max_concurrency=config.parallel_evaluations) - + # Set up evaluation function if file exists self._load_evaluation_function() - + logger.info(f"Initialized evaluator with {evaluation_file}") - + def _load_evaluation_function(self) -> None: """Load the evaluation function from the evaluation file""" if not os.path.exists(self.evaluation_file): raise ValueError(f"Evaluation file {self.evaluation_file} not found") - + try: spec = importlib.util.spec_from_file_location("evaluation_module", self.evaluation_file) if spec is None or spec.loader is None: raise ImportError(f"Failed to load spec from {self.evaluation_file}") - + module = importlib.util.module_from_spec(spec) sys.modules["evaluation_module"] = module spec.loader.exec_module(module) - + if not hasattr(module, "evaluate"): - raise AttributeError(f"Evaluation file {self.evaluation_file} does not contain an 'evaluate' function") - + raise AttributeError( + f"Evaluation file {self.evaluation_file} does not contain an 'evaluate' function" + ) + self.evaluate_function = module.evaluate logger.info(f"Successfully loaded evaluation function from {self.evaluation_file}") except Exception as e: logger.error(f"Error loading evaluation function: {str(e)}") raise - + async def evaluate_program( self, program_code: str, @@ -77,21 +80,21 @@ async def evaluate_program( ) -> Dict[str, float]: """ Evaluate a program and return scores - + Args: program_code: Code to evaluate program_id: Optional ID for logging - + Returns: Dictionary of metric name to score """ start_time = time.time() - + # Create a temporary file for the program with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_file: temp_file.write(program_code.encode("utf-8")) temp_file_path = temp_file.name - + try: # Run evaluation if self.config.cascade_evaluation: @@ -100,167 +103,171 @@ async def evaluate_program( else: # Run direct evaluation metrics = await self._direct_evaluate(temp_file_path) - + # Add LLM feedback if configured if self.config.use_llm_feedback and self.llm_ensemble: feedback_metrics = await self._llm_evaluate(program_code) - + # Combine metrics for name, value in feedback_metrics.items(): metrics[f"llm_{name}"] = value * self.config.llm_feedback_weight - + elapsed = time.time() - start_time program_id_str = f" {program_id}" if program_id else "" logger.info( f"Evaluated program{program_id_str} in {elapsed:.2f}s: " f"{', '.join(f'{name}={value:.4f}' for name, value in metrics.items())}" ) - + return metrics - + except Exception as e: logger.error(f"Error evaluating program: {str(e)}") return {"error": 0.0} - + finally: # Clean up temporary file if os.path.exists(temp_file_path): os.unlink(temp_file_path) - + @run_in_executor def _direct_evaluate(self, program_path: str) -> Dict[str, float]: 
""" Directly evaluate a program using the evaluation function - + Args: program_path: Path to the program file - + Returns: Dictionary of metric name to score """ try: # Run the evaluation with timeout result = self.evaluate_function(program_path) - + # Validate result if not isinstance(result, dict): logger.warning(f"Evaluation returned non-dictionary result: {result}") return {"error": 0.0} - + return result - + except Exception as e: logger.error(f"Error in direct evaluation: {str(e)}") return {"error": 0.0} - + async def _cascade_evaluate(self, program_path: str) -> Dict[str, float]: """ Run cascade evaluation with increasingly challenging test cases - + Args: program_path: Path to the program file - + Returns: Dictionary of metric name to score """ # Import the evaluation module to get cascade functions if they exist try: - spec = importlib.util.spec_from_file_location( - "evaluation_module", self.evaluation_file - ) + spec = importlib.util.spec_from_file_location("evaluation_module", self.evaluation_file) if spec is None or spec.loader is None: return await self._direct_evaluate(program_path) - + module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) - + # Check if cascade functions exist if not hasattr(module, "evaluate_stage1"): return await self._direct_evaluate(program_path) - + # Run first stage try: stage1_result = await run_in_executor(module.evaluate_stage1)(program_path) if not isinstance(stage1_result, dict): - logger.warning(f"Stage 1 evaluation returned non-dictionary result: {stage1_result}") + logger.warning( + f"Stage 1 evaluation returned non-dictionary result: {stage1_result}" + ) return {"error": 0.0} except Exception as e: logger.error(f"Error in stage 1 evaluation: {str(e)}") return {"error": 0.0} - + # Check threshold if not self._passes_threshold(stage1_result, self.config.cascade_thresholds[0]): return stage1_result - + # Check if second stage exists if not hasattr(module, "evaluate_stage2"): return stage1_result - + # Run second stage try: stage2_result = await run_in_executor(module.evaluate_stage2)(program_path) if not isinstance(stage2_result, dict): - logger.warning(f"Stage 2 evaluation returned non-dictionary result: {stage2_result}") + logger.warning( + f"Stage 2 evaluation returned non-dictionary result: {stage2_result}" + ) return stage1_result except Exception as e: logger.error(f"Error in stage 2 evaluation: {str(e)}") return stage1_result - + # Merge results result = {} # Convert all values to float to avoid type errors for name, value in stage1_result.items(): if isinstance(value, (int, float)) and name != "error": result[name] = float(value) - + for name, value in stage2_result.items(): if isinstance(value, (int, float)) and name != "error": result[name] = float(value) - + # Check threshold if len(self.config.cascade_thresholds) < 2 or not self._passes_threshold( result, self.config.cascade_thresholds[1] ): return result - + # Check if third stage exists if not hasattr(module, "evaluate_stage3"): return result - + # Run third stage try: stage3_result = await run_in_executor(module.evaluate_stage3)(program_path) if not isinstance(stage3_result, dict): - logger.warning(f"Stage 3 evaluation returned non-dictionary result: {stage3_result}") + logger.warning( + f"Stage 3 evaluation returned non-dictionary result: {stage3_result}" + ) return result except Exception as e: logger.error(f"Error in stage 3 evaluation: {str(e)}") return result - + # Merge results for name, value in stage3_result.items(): if isinstance(value, (int, 
float)) and name != "error": result[name] = float(value) - + return result - + except Exception as e: logger.error(f"Error in cascade evaluation: {str(e)}") return {"error": 0.0} - + async def _llm_evaluate(self, program_code: str) -> Dict[str, float]: """ Use LLM to evaluate code quality - + Args: program_code: Code to evaluate - + Returns: Dictionary of metric name to score """ if not self.llm_ensemble: return {} - + try: # Create prompt for LLM prompt = f""" @@ -284,17 +291,18 @@ async def _llm_evaluate(self, program_code: str) -> Dict[str, float]: "reasoning": "[brief explanation of scores]" }} """ - + # Get LLM response response = await self.llm_ensemble.generate(prompt) - + # Extract JSON from response try: # Try to find JSON block - json_pattern = r'```json\n(.*?)\n```' + json_pattern = r"```json\n(.*?)\n```" import re + json_match = re.search(json_pattern, response, re.DOTALL) - + if json_match: json_str = json_match.group(1) else: @@ -305,75 +313,73 @@ async def _llm_evaluate(self, program_code: str) -> Dict[str, float]: end_idx = json_str.rfind("}") + 1 if start_idx >= 0 and end_idx > start_idx: json_str = json_str[start_idx:end_idx] - + # Parse JSON result = json.loads(json_str) - + # Extract metrics metrics = {} for key in ["readability", "maintainability", "efficiency"]: if key in result: metrics[key] = float(result[key]) - + return metrics - + except Exception as e: logger.warning(f"Error parsing LLM response: {str(e)}") return {} - + except Exception as e: logger.error(f"Error in LLM evaluation: {str(e)}") return {} - + def _passes_threshold(self, metrics: Dict[str, float], threshold: float) -> bool: """ Check if metrics pass a threshold - + Args: metrics: Dictionary of metric name to score threshold: Threshold to pass - + Returns: True if metrics pass threshold """ if not metrics: return False - + # Calculate average score, skipping non-numeric values and 'error' key valid_metrics = [] for name, value in metrics.items(): # Skip 'error' keys and ensure values are numeric - if name != 'error' and isinstance(value, (int, float)): + if name != "error" and isinstance(value, (int, float)): try: valid_metrics.append(float(value)) except (TypeError, ValueError): logger.warning(f"Skipping non-numeric metric: {name}={value}") continue - + if not valid_metrics: return False - + avg_score = sum(valid_metrics) / len(valid_metrics) return avg_score >= threshold - + async def evaluate_multiple( self, programs: List[Tuple[str, str]], ) -> List[Dict[str, float]]: """ Evaluate multiple programs in parallel - + Args: programs: List of (program_code, program_id) tuples - + Returns: List of metric dictionaries """ tasks = [ - self.task_pool.create_task( - self.evaluate_program, program_code, program_id - ) + self.task_pool.create_task(self.evaluate_program, program_code, program_id) for program_code, program_id in programs ] - + return await asyncio.gather(*tasks) diff --git a/openevolve/llm/__init__.py b/openevolve/llm/__init__.py index 7aa42ac34..26bbef567 100644 --- a/openevolve/llm/__init__.py +++ b/openevolve/llm/__init__.py @@ -1,6 +1,7 @@ """ LLM module initialization """ + from openevolve.llm.base import LLMInterface from openevolve.llm.ensemble import LLMEnsemble from openevolve.llm.openai import OpenAILLM diff --git a/openevolve/llm/base.py b/openevolve/llm/base.py index 6aede09af..1cbf436e9 100644 --- a/openevolve/llm/base.py +++ b/openevolve/llm/base.py @@ -1,24 +1,22 @@ """ Base LLM interface """ + from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional 
class LLMInterface(ABC): """Abstract base class for LLM interfaces""" - + @abstractmethod async def generate(self, prompt: str, **kwargs) -> str: """Generate text from a prompt""" pass - + @abstractmethod async def generate_with_context( - self, - system_message: str, - messages: List[Dict[str, str]], - **kwargs + self, system_message: str, messages: List[Dict[str, str]], **kwargs ) -> str: """Generate text using a system message and conversational context""" pass diff --git a/openevolve/llm/ensemble.py b/openevolve/llm/ensemble.py index 68d93d747..0c518ccad 100644 --- a/openevolve/llm/ensemble.py +++ b/openevolve/llm/ensemble.py @@ -1,6 +1,7 @@ """ Model ensemble for LLMs """ + import asyncio import logging import random @@ -15,66 +16,54 @@ class LLMEnsemble: """Ensemble of LLMs for generating diverse code modifications""" - + def __init__(self, config: LLMConfig): self.config = config - + # Initialize primary and secondary models self.primary_model = OpenAILLM(config, model=config.primary_model) self.secondary_model = OpenAILLM(config, model=config.secondary_model) - + # Model weights for sampling self._weights = [ config.primary_model_weight, config.secondary_model_weight, ] - + # Normalize weights total = sum(self._weights) self._weights = [w / total for w in self._weights] - + logger.info( f"Initialized LLM ensemble with models: " f"{config.primary_model} (weight: {self._weights[0]:.2f}), " f"{config.secondary_model} (weight: {self._weights[1]:.2f})" ) - + async def generate(self, prompt: str, **kwargs) -> str: """Generate text using a randomly selected model based on weights""" model = self._sample_model() return await model.generate(prompt, **kwargs) - + async def generate_with_context( - self, - system_message: str, - messages: List[Dict[str, str]], - **kwargs + self, system_message: str, messages: List[Dict[str, str]], **kwargs ) -> str: """Generate text using a system message and conversational context""" model = self._sample_model() return await model.generate_with_context(system_message, messages, **kwargs) - + def _sample_model(self) -> LLMInterface: """Sample a model from the ensemble based on weights""" models = [self.primary_model, self.secondary_model] index = random.choices(range(len(models)), weights=self._weights, k=1)[0] return models[index] - - async def generate_multiple( - self, - prompt: str, - n: int, - **kwargs - ) -> List[str]: + + async def generate_multiple(self, prompt: str, n: int, **kwargs) -> List[str]: """Generate multiple texts in parallel""" tasks = [self.generate(prompt, **kwargs) for _ in range(n)] return await asyncio.gather(*tasks) - - async def parallel_generate( - self, - prompts: List[str], - **kwargs - ) -> List[str]: + + async def parallel_generate(self, prompts: List[str], **kwargs) -> List[str]: """Generate responses for multiple prompts in parallel""" tasks = [self.generate(prompt, **kwargs) for prompt in prompts] return await asyncio.gather(*tasks) diff --git a/openevolve/llm/openai.py b/openevolve/llm/openai.py index ec5425e31..fda41b9ef 100644 --- a/openevolve/llm/openai.py +++ b/openevolve/llm/openai.py @@ -1,6 +1,7 @@ """ OpenAI API interface for LLMs """ + import asyncio import logging import time @@ -16,7 +17,7 @@ class OpenAILLM(LLMInterface): """LLM interface using OpenAI-compatible APIs""" - + def __init__( self, config: LLMConfig, @@ -24,34 +25,31 @@ def __init__( ): self.config = config self.model = model or config.primary_model - + # Set up API client self.client = openai.OpenAI( api_key=config.api_key, base_url=config.api_base, 
) - + logger.info(f"Initialized OpenAI LLM with model: {self.model}") - + async def generate(self, prompt: str, **kwargs) -> str: """Generate text from a prompt""" return await self.generate_with_context( system_message=self.config.system_message, messages=[{"role": "user", "content": prompt}], - **kwargs + **kwargs, ) - + async def generate_with_context( - self, - system_message: str, - messages: List[Dict[str, str]], - **kwargs + self, system_message: str, messages: List[Dict[str, str]], **kwargs ) -> str: """Generate text using a system message and conversational context""" # Prepare messages with system message formatted_messages = [{"role": "system", "content": system_message}] formatted_messages.extend(messages) - + # Set up generation parameters params = { "model": self.model, @@ -60,18 +58,15 @@ async def generate_with_context( "top_p": kwargs.get("top_p", self.config.top_p), "max_tokens": kwargs.get("max_tokens", self.config.max_tokens), } - + # Attempt the API call with retries retries = kwargs.get("retries", self.config.retries) retry_delay = kwargs.get("retry_delay", self.config.retry_delay) timeout = kwargs.get("timeout", self.config.timeout) - + for attempt in range(retries + 1): try: - response = await asyncio.wait_for( - self._call_api(params), - timeout=timeout - ) + response = await asyncio.wait_for(self._call_api(params), timeout=timeout) return response except asyncio.TimeoutError: if attempt < retries: @@ -89,15 +84,14 @@ async def generate_with_context( else: logger.error(f"All {retries + 1} attempts failed with error: {str(e)}") raise - + async def _call_api(self, params: Dict[str, Any]) -> str: """Make the actual API call""" # Use asyncio to run the blocking API call in a thread pool loop = asyncio.get_event_loop() response = await loop.run_in_executor( - None, - lambda: self.client.chat.completions.create(**params) + None, lambda: self.client.chat.completions.create(**params) ) - + # Extract the response content return response.choices[0].message.content diff --git a/openevolve/prompt/__init__.py b/openevolve/prompt/__init__.py index 3ac05f637..725577bfd 100644 --- a/openevolve/prompt/__init__.py +++ b/openevolve/prompt/__init__.py @@ -1,6 +1,7 @@ """ Prompt module initialization """ + from openevolve.prompt.sampler import PromptSampler from openevolve.prompt.templates import TemplateManager diff --git a/openevolve/prompt/sampler.py b/openevolve/prompt/sampler.py index 9a9b236a0..8d59220c7 100644 --- a/openevolve/prompt/sampler.py +++ b/openevolve/prompt/sampler.py @@ -1,6 +1,7 @@ """ Prompt sampling for OpenEvolve """ + import logging import random from typing import Any, Dict, List, Optional, Tuple, Union @@ -13,28 +14,26 @@ class PromptSampler: """Generates prompts for code evolution""" - + def __init__(self, config: PromptConfig): self.config = config self.template_manager = TemplateManager(config.template_dir) - + # Initialize the random number generator random.seed() - + # Store custom template mappings self.system_template_override = None self.user_template_override = None - + logger.info("Initialized prompt sampler") - + def set_templates( - self, - system_template: Optional[str] = None, - user_template: Optional[str] = None + self, system_template: Optional[str] = None, user_template: Optional[str] = None ) -> None: """ Set custom templates to use for this sampler - + Args: system_template: Template name for system message user_template: Template name for user message @@ -42,7 +41,7 @@ def set_templates( self.system_template_override = system_template 
self.user_template_override = user_template logger.info(f"Set custom templates: system={system_template}, user={user_template}") - + def build_prompt( self, current_program: str, @@ -57,7 +56,7 @@ def build_prompt( ) -> Dict[str, str]: """ Build a prompt for the LLM - + Args: current_program: Current program code parent_program: Parent program from which current was derived @@ -68,7 +67,7 @@ def build_prompt( evolution_round: Current evolution round allow_full_rewrite: Whether to allow a full rewrite template_key: Optional override for template key - + Returns: Dictionary with 'system' and 'user' keys """ @@ -82,10 +81,10 @@ def build_prompt( else: # Default behavior user_template_key = "full_rewrite_user" if allow_full_rewrite else "diff_user" - + # Get the template user_template = self.template_manager.get_template(user_template_key) - + # Use system template override if set if self.system_template_override: system_message = self.template_manager.get_template(self.system_template_override) @@ -94,24 +93,24 @@ def build_prompt( # If system_message is a template name rather than content, get the template if system_message in self.template_manager.templates: system_message = self.template_manager.get_template(system_message) - + # Format metrics metrics_str = self._format_metrics(program_metrics) - + # Identify areas for improvement improvement_areas = self._identify_improvement_areas( current_program, parent_program, program_metrics, previous_programs ) - + # Format evolution history evolution_history = self._format_evolution_history( previous_programs, top_programs, language ) - + # Apply stochastic template variations if enabled if self.config.use_template_stochasticity: user_template = self._apply_template_variations(user_template) - + # Format the final user message user_message = user_template.format( metrics=metrics_str, @@ -120,16 +119,16 @@ def build_prompt( current_program=current_program, language=language, ) - + return { "system": system_message, "user": user_message, } - + def _format_metrics(self, metrics: Dict[str, float]) -> str: """Format metrics for the prompt""" return "\n".join([f"- {name}: {value:.4f}" for name, value in metrics.items()]) - + def _identify_improvement_areas( self, current_program: str, @@ -140,54 +139,56 @@ def _identify_improvement_areas( """Identify potential areas for improvement""" # This method could be expanded to include more sophisticated analysis # For now, we'll use a simple approach - + improvement_areas = [] - + # Check program length if len(current_program) > 500: - improvement_areas.append("Consider simplifying the code to improve readability and maintainability") - + improvement_areas.append( + "Consider simplifying the code to improve readability and maintainability" + ) + # Check for performance patterns in previous attempts if len(previous_programs) >= 2: recent_attempts = previous_programs[-2:] metrics_improved = [] metrics_regressed = [] - + for metric, value in metrics.items(): improved = True regressed = True - + for attempt in recent_attempts: if attempt["metrics"].get(metric, 0) <= value: regressed = False if attempt["metrics"].get(metric, 0) >= value: improved = False - + if improved and metric not in metrics_improved: metrics_improved.append(metric) if regressed and metric not in metrics_regressed: metrics_regressed.append(metric) - + if metrics_improved: improvement_areas.append( f"Metrics showing improvement: {', '.join(metrics_improved)}. " "Consider continuing with similar changes." 
) - + if metrics_regressed: improvement_areas.append( f"Metrics showing regression: {', '.join(metrics_regressed)}. " "Consider reverting or revising recent changes in these areas." ) - + # If we don't have specific improvements to suggest if not improvement_areas: improvement_areas.append( "Focus on optimizing the code for better performance on the target metrics" ) - + return "\n".join([f"- {area}" for area in improvement_areas]) - + def _format_evolution_history( self, previous_programs: List[Dict[str, Any]], @@ -199,85 +200,96 @@ def _format_evolution_history( history_template = self.template_manager.get_template("evolution_history") previous_attempt_template = self.template_manager.get_template("previous_attempt") top_program_template = self.template_manager.get_template("top_program") - + # Format previous attempts (most recent first) previous_attempts_str = "" - selected_previous = previous_programs[-min(3, len(previous_programs)):] - + selected_previous = previous_programs[-min(3, len(previous_programs)) :] + for i, program in enumerate(reversed(selected_previous)): attempt_number = len(previous_programs) - i changes = program.get("changes", "Unknown changes") - + # Format performance metrics - performance_str = ", ".join([ - f"{name}: {value:.4f}" - for name, value in program.get("metrics", {}).items() - ]) - + performance_str = ", ".join( + [f"{name}: {value:.4f}" for name, value in program.get("metrics", {}).items()] + ) + # Determine outcome based on comparison with parent parent_metrics = program.get("parent_metrics", {}) outcome = "Mixed results" - - if all(program.get("metrics", {}).get(m, 0) >= parent_metrics.get(m, 0) - for m in program.get("metrics", {})): + + if all( + program.get("metrics", {}).get(m, 0) >= parent_metrics.get(m, 0) + for m in program.get("metrics", {}) + ): outcome = "Improvement in all metrics" - elif all(program.get("metrics", {}).get(m, 0) <= parent_metrics.get(m, 0) - for m in program.get("metrics", {})): + elif all( + program.get("metrics", {}).get(m, 0) <= parent_metrics.get(m, 0) + for m in program.get("metrics", {}) + ): outcome = "Regression in all metrics" - - previous_attempts_str += previous_attempt_template.format( - attempt_number=attempt_number, - changes=changes, - performance=performance_str, - outcome=outcome, - ) + "\n\n" - + + previous_attempts_str += ( + previous_attempt_template.format( + attempt_number=attempt_number, + changes=changes, + performance=performance_str, + outcome=outcome, + ) + + "\n\n" + ) + # Format top programs top_programs_str = "" - selected_top = top_programs[:min(self.config.num_top_programs, len(top_programs))] - + selected_top = top_programs[: min(self.config.num_top_programs, len(top_programs))] + for i, program in enumerate(selected_top): # Extract a snippet (first 10 lines) for display program_code = program.get("code", "") program_snippet = "\n".join(program_code.split("\n")[:10]) if len(program_code.split("\n")) > 10: program_snippet += "\n# ... 
(truncated for brevity)" - + # Calculate a composite score - score = sum(program.get("metrics", {}).values()) / max(1, len(program.get("metrics", {}))) - + score = sum(program.get("metrics", {}).values()) / max( + 1, len(program.get("metrics", {})) + ) + # Extract key features (this could be more sophisticated) key_features = program.get("key_features", []) if not key_features: key_features = [ - f"Performs well on {name} ({value:.4f})" + f"Performs well on {name} ({value:.4f})" for name, value in program.get("metrics", {}).items() ] - + key_features_str = ", ".join(key_features) - - top_programs_str += top_program_template.format( - program_number=i + 1, - score=f"{score:.4f}", - language=language, - program_snippet=program_snippet, - key_features=key_features_str, - ) + "\n\n" - + + top_programs_str += ( + top_program_template.format( + program_number=i + 1, + score=f"{score:.4f}", + language=language, + program_snippet=program_snippet, + key_features=key_features_str, + ) + + "\n\n" + ) + # Combine into full history return history_template.format( previous_attempts=previous_attempts_str.strip(), top_programs=top_programs_str.strip(), ) - + def _apply_template_variations(self, template: str) -> str: """Apply stochastic variations to the template""" result = template - + # Apply variations defined in the config for key, variations in self.config.template_variations.items(): if variations and f"{{{key}}}" in result: chosen_variation = random.choice(variations) result = result.replace(f"{{{key}}}", chosen_variation) - + return result diff --git a/openevolve/prompt/templates.py b/openevolve/prompt/templates.py index 49fa9acbe..b50e34e9b 100644 --- a/openevolve/prompt/templates.py +++ b/openevolve/prompt/templates.py @@ -1,6 +1,7 @@ """ Prompt templates for OpenEvolve """ + import os from pathlib import Path from typing import Dict, List, Optional, Union @@ -118,27 +119,27 @@ class TemplateManager: """Manages templates for prompt generation""" - + def __init__(self, template_dir: Optional[str] = None): self.templates = DEFAULT_TEMPLATES.copy() - + # Load templates from directory if provided if template_dir and os.path.isdir(template_dir): self._load_templates_from_dir(template_dir) - + def _load_templates_from_dir(self, template_dir: str) -> None: """Load templates from a directory""" for file_path in Path(template_dir).glob("*.txt"): template_name = file_path.stem with open(file_path, "r") as f: self.templates[template_name] = f.read() - + def get_template(self, template_name: str) -> str: """Get a template by name""" if template_name not in self.templates: raise ValueError(f"Template '{template_name}' not found") return self.templates[template_name] - + def add_template(self, template_name: str, template: str) -> None: """Add or update a template""" self.templates[template_name] = template diff --git a/openevolve/utils/__init__.py b/openevolve/utils/__init__.py index eb5679336..6d16167af 100644 --- a/openevolve/utils/__init__.py +++ b/openevolve/utils/__init__.py @@ -1,11 +1,12 @@ """ Utilities module initialization """ + from openevolve.utils.async_utils import ( - TaskPool, - gather_with_concurrency, - retry_async, - run_in_executor + TaskPool, + gather_with_concurrency, + retry_async, + run_in_executor, ) from openevolve.utils.code_utils import ( apply_diff, diff --git a/openevolve/utils/async_utils.py b/openevolve/utils/async_utils.py index dfd3b5b0d..872f0e5ee 100644 --- a/openevolve/utils/async_utils.py +++ b/openevolve/utils/async_utils.py @@ -1,6 +1,7 @@ """ Async utilities for 
OpenEvolve """ + import asyncio import functools import logging @@ -9,54 +10,50 @@ logger = logging.getLogger(__name__) -T = TypeVar('T') +T = TypeVar("T") def run_in_executor(f: Callable) -> Callable: """ Decorator to run a synchronous function in an executor - + Args: f: Function to decorate - + Returns: Decorated function that runs in an executor """ + @functools.wraps(f) async def wrapper(*args: Any, **kwargs: Any) -> Any: loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - functools.partial(f, *args, **kwargs) - ) + return await loop.run_in_executor(None, functools.partial(f, *args, **kwargs)) + return wrapper async def gather_with_concurrency( - n: int, - *tasks: asyncio.Future, - return_exceptions: bool = False + n: int, *tasks: asyncio.Future, return_exceptions: bool = False ) -> List[Any]: """ Run tasks with a concurrency limit - + Args: n: Maximum number of tasks to run concurrently *tasks: Tasks to run return_exceptions: Whether to return exceptions instead of raising them - + Returns: List of task results """ semaphore = asyncio.Semaphore(n) - + async def sem_task(task: asyncio.Future) -> Any: async with semaphore: return await task - + return await asyncio.gather( - *(sem_task(task) for task in tasks), - return_exceptions=return_exceptions + *(sem_task(task) for task in tasks), return_exceptions=return_exceptions ) @@ -67,11 +64,11 @@ async def retry_async( delay: float = 1.0, backoff: float = 2.0, exceptions: Union[Exception, tuple] = Exception, - **kwargs: Any + **kwargs: Any, ) -> Any: """ Retry an async function with exponential backoff - + Args: coro: Coroutine function to retry *args: Arguments to pass to the coroutine @@ -80,16 +77,16 @@ async def retry_async( backoff: Multiplier for delay between retries exceptions: Exception(s) to catch **kwargs: Keyword arguments to pass to the coroutine - + Returns: Result of the coroutine - + Raises: The last exception caught if all retries fail """ last_exception = None current_delay = delay - + for i in range(retries + 1): try: return await coro(*args, **kwargs) @@ -106,10 +103,10 @@ async def retry_async( logger.error( f"All {retries+1} attempts failed. 
Last error: {type(e).__name__}: {str(e)}" ) - + if last_exception: raise last_exception - + return None # Should never reach here @@ -117,35 +114,35 @@ class TaskPool: """ A simple task pool for managing and limiting concurrent tasks """ - + def __init__(self, max_concurrency: int = 10): self.semaphore = asyncio.Semaphore(max_concurrency) self.tasks: List[asyncio.Task] = [] - + async def run(self, coro: Callable, *args: Any, **kwargs: Any) -> Any: """ Run a coroutine in the pool - + Args: coro: Coroutine function to run *args: Arguments to pass to the coroutine **kwargs: Keyword arguments to pass to the coroutine - + Returns: Result of the coroutine """ async with self.semaphore: return await coro(*args, **kwargs) - + def create_task(self, coro: Callable, *args: Any, **kwargs: Any) -> asyncio.Task: """ Create and track a task in the pool - + Args: coro: Coroutine function to run *args: Arguments to pass to the coroutine **kwargs: Keyword arguments to pass to the coroutine - + Returns: Task object """ @@ -153,16 +150,16 @@ def create_task(self, coro: Callable, *args: Any, **kwargs: Any) -> asyncio.Task self.tasks.append(task) task.add_done_callback(lambda t: self.tasks.remove(t)) return task - + async def wait_all(self) -> None: """Wait for all tasks in the pool to complete""" if self.tasks: await asyncio.gather(*self.tasks) - + async def cancel_all(self) -> None: """Cancel all tasks in the pool""" for task in self.tasks: task.cancel() - + if self.tasks: await asyncio.gather(*self.tasks, return_exceptions=True) diff --git a/openevolve/utils/code_utils.py b/openevolve/utils/code_utils.py index 44c8e922a..397465fb6 100644 --- a/openevolve/utils/code_utils.py +++ b/openevolve/utils/code_utils.py @@ -1,6 +1,7 @@ """ Utilities for code parsing, diffing, and manipulation """ + import re from typing import Dict, List, Optional, Tuple, Union @@ -8,20 +9,20 @@ def parse_evolve_blocks(code: str) -> List[Tuple[int, int, str]]: """ Parse evolve blocks from code - + Args: code: Source code with evolve blocks - + Returns: List of tuples (start_line, end_line, block_content) """ lines = code.split("\n") blocks = [] - + in_block = False start_line = -1 block_content = [] - + for i, line in enumerate(lines): if "# EVOLVE-BLOCK-START" in line: in_block = True @@ -32,51 +33,51 @@ def parse_evolve_blocks(code: str) -> List[Tuple[int, int, str]]: blocks.append((start_line, i, "\n".join(block_content))) elif in_block: block_content.append(line) - + return blocks def apply_diff(original_code: str, diff_text: str) -> str: """ Apply a diff to the original code - + Args: original_code: Original source code diff_text: Diff in the SEARCH/REPLACE format - + Returns: Modified code """ # Split into lines for easier processing original_lines = original_code.split("\n") result_lines = original_lines.copy() - + # Extract diff blocks diff_pattern = r"<<<<<<< SEARCH\n(.*?)\n=======\n(.*?)\n>>>>>>> REPLACE" diff_blocks = re.findall(diff_pattern, diff_text, re.DOTALL) - + # Apply each diff block for search_text, replace_text in diff_blocks: search_lines = search_text.split("\n") replace_lines = replace_text.split("\n") - + # Find where the search pattern starts in the original code for i in range(len(result_lines) - len(search_lines) + 1): - if result_lines[i:i+len(search_lines)] == search_lines: + if result_lines[i : i + len(search_lines)] == search_lines: # Replace the matched section - result_lines[i:i+len(search_lines)] = replace_lines + result_lines[i : i + len(search_lines)] = replace_lines break - + return 
"\n".join(result_lines) def extract_diffs(diff_text: str) -> List[Tuple[str, str]]: """ Extract diff blocks from the diff text - + Args: diff_text: Diff in the SEARCH/REPLACE format - + Returns: List of tuples (search_text, replace_text) """ @@ -88,100 +89,104 @@ def extract_diffs(diff_text: str) -> List[Tuple[str, str]]: def parse_full_rewrite(llm_response: str, language: str = "python") -> Optional[str]: """ Extract a full rewrite from an LLM response - + Args: llm_response: Response from the LLM language: Programming language - + Returns: Extracted code or None if not found """ code_block_pattern = r"```" + language + r"\n(.*?)```" matches = re.findall(code_block_pattern, llm_response, re.DOTALL) - + if matches: return matches[0].strip() - + # Fallback to any code block code_block_pattern = r"```(.*?)```" matches = re.findall(code_block_pattern, llm_response, re.DOTALL) - + if matches: return matches[0].strip() - + return None def format_diff_summary(diff_blocks: List[Tuple[str, str]]) -> str: """ Create a human-readable summary of the diff - + Args: diff_blocks: List of (search_text, replace_text) tuples - + Returns: Summary string """ summary = [] - + for i, (search_text, replace_text) in enumerate(diff_blocks): search_lines = search_text.strip().split("\n") replace_lines = replace_text.strip().split("\n") - + # Create a short summary if len(search_lines) == 1 and len(replace_lines) == 1: summary.append(f"Change {i+1}: '{search_lines[0]}' to '{replace_lines[0]}'") else: - search_summary = f"{len(search_lines)} lines" if len(search_lines) > 1 else search_lines[0] - replace_summary = f"{len(replace_lines)} lines" if len(replace_lines) > 1 else replace_lines[0] + search_summary = ( + f"{len(search_lines)} lines" if len(search_lines) > 1 else search_lines[0] + ) + replace_summary = ( + f"{len(replace_lines)} lines" if len(replace_lines) > 1 else replace_lines[0] + ) summary.append(f"Change {i+1}: Replace {search_summary} with {replace_summary}") - + return "\n".join(summary) def calculate_edit_distance(code1: str, code2: str) -> int: """ Calculate the Levenshtein edit distance between two code snippets - + Args: code1: First code snippet code2: Second code snippet - + Returns: Edit distance (number of operations needed to transform code1 into code2) """ if code1 == code2: return 0 - + # Simple implementation of Levenshtein distance m, n = len(code1), len(code2) dp = [[0 for _ in range(n + 1)] for _ in range(m + 1)] - + for i in range(m + 1): dp[i][0] = i - + for j in range(n + 1): dp[0][j] = j - + for i in range(1, m + 1): for j in range(1, n + 1): - cost = 0 if code1[i-1] == code2[j-1] else 1 + cost = 0 if code1[i - 1] == code2[j - 1] else 1 dp[i][j] = min( - dp[i-1][j] + 1, # deletion - dp[i][j-1] + 1, # insertion - dp[i-1][j-1] + cost, # substitution + dp[i - 1][j] + 1, # deletion + dp[i][j - 1] + 1, # insertion + dp[i - 1][j - 1] + cost, # substitution ) - + return dp[m][n] def extract_code_language(code: str) -> str: """ Try to determine the language of a code snippet - + Args: code: Code snippet - + Returns: Detected language or "unknown" """ @@ -198,5 +203,5 @@ def extract_code_language(code: str) -> str: return "rust" elif re.search(r"^(SELECT|CREATE TABLE|INSERT INTO)", code, re.MULTILINE): return "sql" - + return "unknown" diff --git a/tests/test_basic.py b/tests/test_basic.py index 55c55168b..7a746e337 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -1,6 +1,7 @@ """ Basic tests for OpenEvolve components """ + import asyncio import os import tempfile @@ -17,7 +18,7 
@@ class TestCodeUtils(unittest.TestCase): """Tests for code utilities""" - + def test_extract_diffs(self): """Test extracting diffs from a response""" diff_text = """ @@ -39,14 +40,14 @@ def hello(): x = 2 >>>>>>> REPLACE """ - + diffs = extract_diffs(diff_text) self.assertEqual(len(diffs), 2) - self.assertEqual(diffs[0][0].strip(), "def hello():\n print(\"Hello\")") - self.assertEqual(diffs[0][1].strip(), "def hello():\n print(\"Hello, World!\")") + self.assertEqual(diffs[0][0].strip(), 'def hello():\n print("Hello")') + self.assertEqual(diffs[0][1].strip(), 'def hello():\n print("Hello, World!")') self.assertEqual(diffs[1][0].strip(), "x = 1") self.assertEqual(diffs[1][1].strip(), "x = 2") - + def test_apply_diff(self): """Test applying diffs to code""" original_code = """ @@ -56,7 +57,7 @@ def hello(): x = 1 y = 2 """ - + diff_text = """ <<<<<<< SEARCH def hello(): @@ -72,7 +73,7 @@ def hello(): x = 2 >>>>>>> REPLACE """ - + expected_code = """ def hello(): print("Hello, World!") @@ -80,25 +81,25 @@ def hello(): x = 2 y = 2 """ - + result = apply_diff(original_code, diff_text) - + # Normalize whitespace for comparison self.assertEqual( - result.replace(" ", "").replace("\n", ""), - expected_code.replace(" ", "").replace("\n", "") + result.replace(" ", "").replace("\n", ""), + expected_code.replace(" ", "").replace("\n", ""), ) class TestProgramDatabase(unittest.TestCase): """Tests for program database""" - + def setUp(self): """Set up test database""" config = Config() config.database.in_memory = True self.db = ProgramDatabase(config.database) - + def test_add_and_get(self): """Test adding and retrieving a program""" program = Program( @@ -107,15 +108,15 @@ def test_add_and_get(self): language="python", metrics={"score": 0.5}, ) - + self.db.add(program) - + retrieved = self.db.get("test1") self.assertIsNotNone(retrieved) self.assertEqual(retrieved.id, "test1") self.assertEqual(retrieved.code, "def test(): pass") self.assertEqual(retrieved.metrics["score"], 0.5) - + def test_get_best_program(self): """Test getting the best program""" program1 = Program( @@ -124,21 +125,21 @@ def test_get_best_program(self): language="python", metrics={"score": 0.5}, ) - + program2 = Program( id="test2", code="def test2(): pass", language="python", metrics={"score": 0.7}, ) - + self.db.add(program1) self.db.add(program2) - + best = self.db.get_best_program() self.assertIsNotNone(best) self.assertEqual(best.id, "test2") - + def test_sample(self): """Test sampling from the database""" program1 = Program( @@ -147,31 +148,31 @@ def test_sample(self): language="python", metrics={"score": 0.5}, ) - + program2 = Program( id="test2", code="def test2(): pass", language="python", metrics={"score": 0.7}, ) - + self.db.add(program1) self.db.add(program2) - + parent, inspirations = self.db.sample() - + self.assertIsNotNone(parent) self.assertIn(parent.id, ["test1", "test2"]) class TestPromptSampler(unittest.TestCase): """Tests for prompt sampler""" - + def setUp(self): """Set up test prompt sampler""" config = Config() self.prompt_sampler = PromptSampler(config.prompt) - + def test_build_prompt(self): """Test building a prompt""" current_program = "def test(): pass" @@ -191,7 +192,7 @@ def test_build_prompt(self): "metrics": {"score": 0.6}, } ] - + prompt = self.prompt_sampler.build_prompt( current_program=current_program, parent_program=parent_program, @@ -199,7 +200,7 @@ def test_build_prompt(self): previous_programs=previous_programs, top_programs=top_programs, ) - + self.assertIn("system", prompt) 
self.assertIn("user", prompt) self.assertIn("def test(): pass", prompt["user"])