diff --git a/openevolve/cli.py b/openevolve/cli.py index 98b0008f9..dd5d707dd 100644 --- a/openevolve/cli.py +++ b/openevolve/cli.py @@ -126,6 +126,7 @@ async def main_async() -> int: best_program = await openevolve.run( iterations=args.iterations, target_score=args.target_score, + checkpoint_path=args.checkpoint, ) # Get the checkpoint path diff --git a/openevolve/config.py b/openevolve/config.py index 1b0dc6e80..c89766d1c 100644 --- a/openevolve/config.py +++ b/openevolve/config.py @@ -56,7 +56,10 @@ class LLMConfig(LLMModelConfig): retry_delay: int = 5 # n-model configuration for evolution LLM ensemble - models: List[LLMModelConfig] = field(default_factory=lambda: [LLMModelConfig()]) + models: List[LLMModelConfig] = field(default_factory=lambda: [ + LLMModelConfig(name="gpt-4o-mini", weight=0.8), + LLMModelConfig(name="gpt-4o", weight=0.2) + ]) # n-model configuration for evaluator LLM ensemble evaluator_models: List[LLMModelConfig] = field(default_factory=lambda: []) @@ -195,7 +198,7 @@ class EvaluatorConfig: cascade_thresholds: List[float] = field(default_factory=lambda: [0.5, 0.75, 0.9]) # Parallel evaluation - parallel_evaluations: int = 4 + parallel_evaluations: int = 1 distributed: bool = False # LLM-based feedback @@ -217,6 +220,7 @@ class Config: log_level: str = "INFO" log_dir: Optional[str] = None random_seed: Optional[int] = 42 + language: str = None # Component configurations llm: LLMConfig = field(default_factory=LLMConfig) @@ -361,4 +365,4 @@ def load_config(config_path: Optional[Union[str, Path]] = None) -> Config: # Make the system message available to the individual models, in case it is not provided from the prompt sampler config.llm.update_model_params({"system_message": config.prompt.system_message}) - return config + return config \ No newline at end of file diff --git a/openevolve/controller.py b/openevolve/controller.py index 5205561ca..bf4ea683d 100644 --- a/openevolve/controller.py +++ b/openevolve/controller.py @@ -5,25 +5,20 @@ import asyncio import logging import os -import re +import signal import time import uuid from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, Union -import traceback +from typing import Any, Dict, List, Optional, Union from openevolve.config import Config, load_config from openevolve.database import Program, ProgramDatabase from openevolve.evaluator import Evaluator from openevolve.llm.ensemble import LLMEnsemble from openevolve.prompt.sampler import PromptSampler +from openevolve.threaded_parallel import ImprovedParallelController from openevolve.utils.code_utils import ( - apply_diff, extract_code_language, - extract_diffs, - format_diff_summary, - parse_evolve_blocks, - parse_full_rewrite, ) from openevolve.utils.format_utils import ( format_metrics_safe, @@ -129,7 +124,8 @@ def __init__( # Load initial program self.initial_program_path = initial_program_path self.initial_program_code = self._load_initial_program() - self.language = extract_code_language(self.initial_program_code) + if not self.config.language: + self.config.language = extract_code_language(self.initial_program_code) # Extract file extension from initial program self.file_extension = os.path.splitext(initial_program_path)[1] @@ -162,8 +158,12 @@ def __init__( self.evaluator_prompt_sampler, database=self.database, ) + self.evaluation_file = evaluation_file - logger.info(f"Initialized OpenEvolve with {initial_program_path} " f"and {evaluation_file}") + logger.info(f"Initialized OpenEvolve with {initial_program_path}") + + # Initialize 
improved parallel processing components + self.parallel_controller = None def _setup_logging(self) -> None: """Set up logging""" @@ -198,24 +198,31 @@ async def run( self, iterations: Optional[int] = None, target_score: Optional[float] = None, - ) -> Program: + checkpoint_path: Optional[str] = None, + ) -> Optional[Program]: """ - Run the evolution process + Run the evolution process with improved parallel processing Args: iterations: Maximum number of iterations (uses config if None) target_score: Target score to reach (continues until reached if specified) + checkpoint_path: Path to resume from checkpoint Returns: Best program found """ max_iterations = iterations or self.config.max_iterations - - # Define start_iteration before creating the initial program - start_iteration = self.database.last_iteration + + # Determine starting iteration + start_iteration = 0 + if checkpoint_path and os.path.exists(checkpoint_path): + self._load_checkpoint(checkpoint_path) + start_iteration = self.database.last_iteration + 1 + logger.info(f"Resuming from checkpoint at iteration {start_iteration}") + else: + start_iteration = self.database.last_iteration # Only add initial program if starting fresh (not resuming from checkpoint) - # Check if we're resuming AND no program matches initial code to avoid pollution should_add_initial = ( start_iteration == 0 and len(self.database.programs) == 0 @@ -236,7 +243,7 @@ async def run( initial_program = Program( id=initial_program_id, code=self.initial_program_code, - language=self.language, + language=self.config.language, metrics=initial_metrics, iteration_found=start_iteration, ) @@ -244,203 +251,49 @@ async def run( self.database.add(initial_program) else: logger.info( - f"Skipping initial program addition (resuming from iteration {start_iteration} with {len(self.database.programs)} existing programs)" + f"Skipping initial program addition (resuming from iteration {start_iteration} " + f"with {len(self.database.programs)} existing programs)" ) - # Main evolution loop - total_iterations = start_iteration + max_iterations - - logger.info( - f"Starting evolution from iteration {start_iteration} for {max_iterations} iterations (total: {total_iterations})" - ) - - # Island-based evolution variables - programs_per_island = max( - 1, max_iterations // (self.config.database.num_islands * 10) - ) # Dynamic allocation - current_island_counter = 0 - - logger.info(f"Using island-based evolution with {self.config.database.num_islands} islands") - self.database.log_island_status() - - for i in range(start_iteration, total_iterations): - iteration_start = time.time() - - # Manage island evolution - switch islands periodically - if i > start_iteration and current_island_counter >= programs_per_island: - self.database.next_island() - current_island_counter = 0 - logger.debug(f"Switched to island {self.database.current_island}") - - current_island_counter += 1 - - # Sample parent and inspirations from current island - parent, inspirations = self.database.sample() - - # Get artifacts for the parent program if available - parent_artifacts = self.database.get_artifacts(parent.id) - - # Get actual top programs for prompt context (separate from inspirations) - # This ensures the LLM sees only high-performing programs as examples - actual_top_programs = self.database.get_top_programs(5) - - # Build prompt - prompt = self.prompt_sampler.build_prompt( - current_program=parent.code, - parent_program=parent.code, # We don't have the parent's code, use the same - 
program_metrics=parent.metrics, - previous_programs=[p.to_dict() for p in self.database.get_top_programs(3)], - top_programs=[p.to_dict() for p in actual_top_programs], # Use actual top programs - inspirations=[p.to_dict() for p in inspirations], # Pass inspirations separately - language=self.language, - evolution_round=i, - diff_based_evolution=self.config.diff_based_evolution, - program_artifacts=parent_artifacts if parent_artifacts else None, + # Initialize improved parallel processing + try: + self.parallel_controller = ImprovedParallelController( + self.config, self.evaluation_file, self.database ) + + # Set up signal handlers for graceful shutdown + def signal_handler(signum, frame): + logger.info(f"Received signal {signum}, initiating graceful shutdown...") + self.parallel_controller.request_shutdown() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + self.parallel_controller.start() + + # Run evolution with improved parallel processing and checkpoint callback + await self._run_evolution_with_checkpoints( + start_iteration, max_iterations, target_score + ) + + finally: + # Clean up parallel processing resources + if self.parallel_controller: + self.parallel_controller.stop() + self.parallel_controller = None - # Generate code modification - try: - llm_response = await self.llm_ensemble.generate_with_context( - system_message=prompt["system"], - messages=[{"role": "user", "content": prompt["user"]}], - ) - - # Parse the response - if self.config.diff_based_evolution: - diff_blocks = extract_diffs(llm_response) - - if not diff_blocks: - logger.warning(f"Iteration {i+1}: No valid diffs found in response") - continue - - # Apply the diffs - child_code = apply_diff(parent.code, llm_response) - changes_summary = format_diff_summary(diff_blocks) - else: - # Parse full rewrite - new_code = parse_full_rewrite(llm_response, self.language) - - if not new_code: - logger.warning(f"Iteration {i+1}: No valid code found in response") - continue - - child_code = new_code - changes_summary = "Full rewrite" - - # Check code length - if len(child_code) > self.config.max_code_length: - logger.warning( - f"Iteration {i+1}: Generated code exceeds maximum length " - f"({len(child_code)} > {self.config.max_code_length})" - ) - continue - - # Evaluate the child program - child_id = str(uuid.uuid4()) - child_metrics = await self.evaluator.evaluate_program(child_code, child_id) - - # Handle artifacts if they exist - artifacts = self.evaluator.get_pending_artifacts(child_id) - - # Create a child program - child_program = Program( - id=child_id, - code=child_code, - language=self.language, - parent_id=parent.id, - generation=parent.generation + 1, - metrics=child_metrics, - metadata={ - "changes": changes_summary, - "parent_metrics": parent.metrics, - }, - ) - - # Add to database (will be added to current island) - self.database.add(child_program, iteration=i + 1) - - # Log prompts - self.database.log_prompt( - template_key=( - "full_rewrite_user" if not self.config.diff_based_evolution else "diff_user" - ), - program_id=child_id, - prompt=prompt, - responses=[llm_response], - ) - - # Store artifacts if they exist - if artifacts: - self.database.store_artifacts(child_id, artifacts) - - # Log prompts - self.database.log_prompt( - template_key=( - "full_rewrite_user" if not self.config.diff_based_evolution else "diff_user" - ), - program_id=child_id, - prompt=prompt, - responses=[llm_response], - ) - - # Increment generation for current island - 
self.database.increment_island_generation() - - # Check if migration should occur - if self.database.should_migrate(): - logger.info(f"Performing migration at iteration {i+1}") - self.database.migrate_programs() - self.database.log_island_status() - - # Log progress - iteration_time = time.time() - iteration_start - self._log_iteration(i, parent, child_program, iteration_time) - - # Specifically check if this is the new best program - if self.database.best_program_id == child_program.id: - logger.info(f"🌟 New best solution found at iteration {i+1}: {child_program.id}") - logger.info(f"Metrics: {format_metrics_safe(child_program.metrics)}") - - # Save checkpoint - if (i + 1) % self.config.checkpoint_interval == 0: - self._save_checkpoint(i + 1) - # Also log island status at checkpoints - logger.info(f"Island status at checkpoint {i+1}:") - self.database.log_island_status() - - # Check if target score reached - if target_score is not None: - # Only consider numeric metrics for target score calculation - numeric_metrics = [ - v - for v in child_metrics.values() - if isinstance(v, (int, float)) and not isinstance(v, bool) - ] - if numeric_metrics: - avg_score = sum(numeric_metrics) / len(numeric_metrics) - if avg_score >= target_score: - logger.info( - f"Target score {target_score} reached after {i+1} iterations" - ) - break - - except Exception as e: - logger.exception(f"Error in iteration {i+1}: {str(e)}") - continue - - # Get the best program using our tracking mechanism + # Get the best program best_program = None if self.database.best_program_id: best_program = self.database.get(self.database.best_program_id) logger.info(f"Using tracked best program: {self.database.best_program_id}") - # Fallback to calculating best program if tracked program not found if best_program is None: best_program = self.database.get_best_program() logger.info("Using calculated best program (tracked program not found)") # Check if there's a better program by combined_score that wasn't tracked - if "combined_score" in best_program.metrics: + if best_program and "combined_score" in best_program.metrics: best_by_combined = self.database.get_best_program(metric="combined_score") if ( best_by_combined @@ -456,7 +309,8 @@ async def run( f"Found program with better combined_score: {best_by_combined.id}" ) logger.warning( - f"Score difference: {best_program.metrics['combined_score']:.4f} vs {best_by_combined.metrics['combined_score']:.4f}" + f"Score difference: {best_program.metrics['combined_score']:.4f} vs " + f"{best_by_combined.metrics['combined_score']:.4f}" ) best_program = best_by_combined @@ -465,14 +319,10 @@ async def run( f"Evolution complete. 
Best program has metrics: " f"{format_metrics_safe(best_program.metrics)}" ) - - # Save the best program (using our tracked best program) self._save_best_program(best_program) - return best_program else: logger.warning("No valid programs found during evolution") - # Return None if no programs found instead of undefined initial_program return None def _log_iteration( @@ -558,6 +408,35 @@ def _save_checkpoint(self, iteration: int) -> None: logger.info(f"Saved checkpoint at iteration {iteration} to {checkpoint_path}") + def _load_checkpoint(self, checkpoint_path: str) -> None: + """Load state from a checkpoint directory""" + if not os.path.exists(checkpoint_path): + raise FileNotFoundError(f"Checkpoint directory {checkpoint_path} not found") + + logger.info(f"Loading checkpoint from {checkpoint_path}") + self.database.load(checkpoint_path) + logger.info( + f"Checkpoint loaded successfully (iteration {self.database.last_iteration})" + ) + + async def _run_evolution_with_checkpoints( + self, start_iteration: int, max_iterations: int, target_score: Optional[float] + ) -> None: + """Run evolution with checkpoint saving support""" + logger.info(f"Using island-based evolution with {self.config.database.num_islands} islands") + self.database.log_island_status() + + # Run the evolution process with checkpoint callback + await self.parallel_controller.run_evolution( + start_iteration, max_iterations, target_score, + checkpoint_callback=self._save_checkpoint + ) + + # Save final checkpoint if needed + final_iteration = start_iteration + max_iterations - 1 + if final_iteration > 0 and final_iteration % self.config.checkpoint_interval == 0: + self._save_checkpoint(final_iteration) + def _save_best_program(self, program: Optional[Program] = None) -> None: """ Save the best program @@ -607,4 +486,4 @@ def _save_best_program(self, program: Optional[Program] = None) -> None: indent=2, ) - logger.info(f"Saved best program to {code_path} with program info to {info_path}") + logger.info(f"Saved best program to {code_path} with program info to {info_path}") \ No newline at end of file diff --git a/openevolve/database.py b/openevolve/database.py index 6e2cd8492..f33f994a1 100644 --- a/openevolve/database.py +++ b/openevolve/database.py @@ -9,7 +9,7 @@ import random import time from dataclasses import asdict, dataclass, field, fields -from pathlib import Path +# FileLock removed - no longer needed with threaded parallel processing from typing import Any, Dict, List, Optional, Set, Tuple, Union import numpy as np @@ -164,9 +164,6 @@ def add( self.programs[program.id] = program - # Enforce population size limit - self._enforce_population_limit() - # Calculate feature coordinates for MAP-Elites feature_coords = self._calculate_feature_coords(program) @@ -209,6 +206,10 @@ def add( self._save_program(program) logger.debug(f"Added program {program.id} to island {island_idx}") + + # Enforce population size limit + self._enforce_population_limit() + return program.id def get(self, program_id: str) -> Optional[Program]: @@ -350,7 +351,7 @@ def save(self, path: Optional[str] = None, iteration: int = 0) -> None: logger.warning("No database path specified, skipping save") return - # Create directory if it doesn't exist + # create directory if it doesn't exist os.makedirs(save_path, exist_ok=True) # Save each program @@ -604,7 +605,10 @@ def _calculate_feature_coords(self, program: Program) -> List[int]: else: # Default to middle bin if feature not found coords.append(self.feature_bins // 2) - + logging.info( + "MAP-Elites 
coords: %s", + str({self.config.feature_dimensions[i]: coords[i] for i in range(len(coords))}), + ) return coords def _feature_coords_to_key(self, coords: List[int]) -> str: diff --git a/openevolve/iteration.py b/openevolve/iteration.py new file mode 100644 index 000000000..af3d3850d --- /dev/null +++ b/openevolve/iteration.py @@ -0,0 +1,147 @@ +import asyncio +import os +import uuid +import logging +import time +from dataclasses import dataclass + +from openevolve.database import Program, ProgramDatabase +from openevolve.config import Config +from openevolve.evaluator import Evaluator +from openevolve.llm.ensemble import LLMEnsemble +from openevolve.prompt.sampler import PromptSampler +from openevolve.utils.code_utils import ( + apply_diff, + extract_diffs, + format_diff_summary, + parse_full_rewrite, +) + + +@dataclass +class Result: + """Resulting program and metrics from an iteration of OpenEvolve""" + + child_program: str = None + parent: str = None + child_metrics: str = None + iteration_time: float = None + prompt: str = None + llm_response: str = None + artifacts: dict = None + + + + + +async def run_iteration_with_shared_db( + iteration: int, + config: Config, + database: ProgramDatabase, + evaluator: Evaluator, + llm_ensemble: LLMEnsemble, + prompt_sampler: PromptSampler +): + """ + Run a single iteration using shared memory database + + This is optimized for use with persistent worker processes. + """ + logger = logging.getLogger(__name__) + + try: + # Sample parent and inspirations from database + parent, inspirations = database.sample() + + # Get artifacts for the parent program if available + parent_artifacts = database.get_artifacts(parent.id) + + # Get actual top programs for prompt context (separate from inspirations) + actual_top_programs = database.get_top_programs(5) + + # Build prompt + prompt = prompt_sampler.build_prompt( + current_program=parent.code, + parent_program=parent.code, + program_metrics=parent.metrics, + previous_programs=[p.to_dict() for p in database.get_top_programs(3)], + top_programs=[p.to_dict() for p in actual_top_programs], + inspirations=[p.to_dict() for p in inspirations], + language=config.language, + evolution_round=iteration, + diff_based_evolution=config.diff_based_evolution, + program_artifacts=parent_artifacts if parent_artifacts else None, + ) + + result = Result(parent=parent) + iteration_start = time.time() + + # Generate code modification + llm_response = await llm_ensemble.generate_with_context( + system_message=prompt["system"], + messages=[{"role": "user", "content": prompt["user"]}], + ) + + # Parse the response + if config.diff_based_evolution: + diff_blocks = extract_diffs(llm_response) + + if not diff_blocks: + logger.warning(f"Iteration {iteration+1}: No valid diffs found in response") + return None + + # Apply the diffs + child_code = apply_diff(parent.code, llm_response) + changes_summary = format_diff_summary(diff_blocks) + else: + # Parse full rewrite + new_code = parse_full_rewrite(llm_response, config.language) + + if not new_code: + logger.warning(f"Iteration {iteration+1}: No valid code found in response") + return None + + child_code = new_code + changes_summary = "Full rewrite" + + # Check code length + if len(child_code) > config.max_code_length: + logger.warning( + f"Iteration {iteration+1}: Generated code exceeds maximum length " + f"({len(child_code)} > {config.max_code_length})" + ) + return None + + # Evaluate the child program + child_id = str(uuid.uuid4()) + result.child_metrics = await 
evaluator.evaluate_program(child_code, child_id) + + # Handle artifacts if they exist + artifacts = evaluator.get_pending_artifacts(child_id) + + # Create a child program + result.child_program = Program( + id=child_id, + code=child_code, + language=config.language, + parent_id=parent.id, + generation=parent.generation + 1, + metrics=result.child_metrics, + iteration_found=iteration, + metadata={ + "changes": changes_summary, + "parent_metrics": parent.metrics, + }, + ) + + result.prompt = prompt + result.llm_response = llm_response + result.artifacts = artifacts + result.iteration_time = time.time() - iteration_start + result.iteration = iteration + + return result + + except Exception as e: + logger.exception(f"Error in iteration {iteration}: {e}") + return None diff --git a/openevolve/threaded_parallel.py b/openevolve/threaded_parallel.py new file mode 100644 index 000000000..e772d6920 --- /dev/null +++ b/openevolve/threaded_parallel.py @@ -0,0 +1,336 @@ +""" +Improved parallel processing using threads with shared memory +""" + +import asyncio +import logging +import signal +import threading +import time +from concurrent.futures import ThreadPoolExecutor, Future +from typing import Any, Dict, List, Optional + +from openevolve.config import Config +from openevolve.database import ProgramDatabase +from openevolve.evaluator import Evaluator +from openevolve.llm.ensemble import LLMEnsemble +from openevolve.prompt.sampler import PromptSampler +from openevolve.iteration import run_iteration_with_shared_db + +logger = logging.getLogger(__name__) + + +class ThreadedEvaluationPool: + """ + Thread-based parallel evaluation pool for improved performance + + Uses threads instead of processes to avoid pickling issues while + still providing parallelism for I/O-bound LLM calls. 
+ """ + + def __init__(self, config: Config, evaluation_file: str, database: ProgramDatabase): + self.config = config + self.evaluation_file = evaluation_file + self.database = database + + self.num_workers = config.evaluator.parallel_evaluations + self.executor = None + + # Pre-initialize components for each thread + self.thread_local = threading.local() + + logger.info(f"Initializing threaded evaluation pool with {self.num_workers} workers") + + def start(self) -> None: + """Start the thread pool""" + self.executor = ThreadPoolExecutor( + max_workers=self.num_workers, + thread_name_prefix="EvalWorker" + ) + logger.info(f"Started thread pool with {self.num_workers} threads") + + def stop(self) -> None: + """Stop the thread pool""" + if self.executor: + self.executor.shutdown(wait=True) + self.executor = None + logger.info("Stopped thread pool") + + def submit_evaluation(self, iteration: int) -> Future: + """ + Submit an evaluation task to the thread pool + + Args: + iteration: Iteration number to evaluate + + Returns: + Future that will contain the result + """ + if not self.executor: + raise RuntimeError("Thread pool not started") + + return self.executor.submit(self._run_evaluation, iteration) + + def _run_evaluation(self, iteration: int): + """Run evaluation in a worker thread""" + # Get or create thread-local components + if not hasattr(self.thread_local, 'initialized'): + self._initialize_thread_components() + + try: + # Run the iteration + result = asyncio.run(run_iteration_with_shared_db( + iteration, + self.config, + self.database, # Shared database (thread-safe reads) + self.thread_local.evaluator, + self.thread_local.llm_ensemble, + self.thread_local.prompt_sampler + )) + + return result + + except Exception as e: + logger.error(f"Error in thread evaluation {iteration}: {e}") + return None + + def _initialize_thread_components(self) -> None: + """Initialize components for this thread""" + thread_id = threading.get_ident() + logger.debug(f"Initializing components for thread {thread_id}") + + try: + # Initialize LLM components + self.thread_local.llm_ensemble = LLMEnsemble(self.config.llm.models) + self.thread_local.llm_evaluator_ensemble = LLMEnsemble(self.config.llm.evaluator_models) + + # Initialize prompt samplers + self.thread_local.prompt_sampler = PromptSampler(self.config.prompt) + self.thread_local.evaluator_prompt_sampler = PromptSampler(self.config.prompt) + self.thread_local.evaluator_prompt_sampler.set_templates("evaluator_system_message") + + # Initialize evaluator + self.thread_local.evaluator = Evaluator( + self.config.evaluator, + self.evaluation_file, + self.thread_local.llm_evaluator_ensemble, + self.thread_local.evaluator_prompt_sampler, + database=self.database, + ) + + self.thread_local.initialized = True + logger.debug(f"Initialized components for thread {thread_id}") + + except Exception as e: + logger.error(f"Failed to initialize thread components: {e}") + raise + + +class ImprovedParallelController: + """ + Controller for improved parallel processing using shared memory and threads + """ + + def __init__(self, config: Config, evaluation_file: str, database: ProgramDatabase): + self.config = config + self.evaluation_file = evaluation_file + self.database = database + + self.thread_pool = None + self.database_lock = threading.RLock() # For database writes + self.shutdown_flag = threading.Event() # For graceful shutdown + + def start(self) -> None: + """Start the improved parallel system""" + self.thread_pool = ThreadedEvaluationPool( + self.config, 
self.evaluation_file, self.database + ) + self.thread_pool.start() + + logger.info("Started improved parallel controller") + + def stop(self) -> None: + """Stop the improved parallel system""" + self.shutdown_flag.set() # Signal shutdown + + if self.thread_pool: + self.thread_pool.stop() + self.thread_pool = None + + logger.info("Stopped improved parallel controller") + + def request_shutdown(self) -> None: + """Request graceful shutdown (for signal handlers)""" + logger.info("Graceful shutdown requested...") + self.shutdown_flag.set() + + async def run_evolution( + self, start_iteration: int, max_iterations: int, target_score: Optional[float] = None, + checkpoint_callback=None + ): + """ + Run evolution with improved parallel processing + + Args: + start_iteration: Starting iteration number + max_iterations: Maximum number of iterations + target_score: Target score to achieve + + Returns: + Best program found + """ + total_iterations = start_iteration + max_iterations + + logger.info( + f"Starting improved parallel evolution from iteration {start_iteration} " + f"for {max_iterations} iterations (total: {total_iterations})" + ) + + # Submit initial batch of evaluations + pending_futures = {} + batch_size = min(self.config.evaluator.parallel_evaluations * 2, max_iterations) + + for i in range(start_iteration, min(start_iteration + batch_size, total_iterations)): + future = self.thread_pool.submit_evaluation(i) + pending_futures[i] = future + + next_iteration_to_submit = start_iteration + batch_size + completed_iterations = 0 + + # Island management + programs_per_island = max(1, max_iterations // (self.config.database.num_islands * 10)) + current_island_counter = 0 + + # Process results as they complete + while pending_futures and completed_iterations < max_iterations and not self.shutdown_flag.is_set(): + # Find completed futures + completed_iteration = None + for iteration, future in list(pending_futures.items()): + if future.done(): + completed_iteration = iteration + break + + if completed_iteration is None: + # No results ready, wait a bit + await asyncio.sleep(0.01) + continue + + # Process completed result + future = pending_futures.pop(completed_iteration) + + try: + result = future.result() + + if result and hasattr(result, 'child_program') and result.child_program: + # Thread-safe database update + with self.database_lock: + self.database.add(result.child_program, iteration=completed_iteration) + + # Store artifacts if they exist + if result.artifacts: + self.database.store_artifacts(result.child_program.id, result.artifacts) + + # Log prompts + if hasattr(result, 'prompt') and result.prompt: + self.database.log_prompt( + template_key=( + "full_rewrite_user" if not self.config.diff_based_evolution + else "diff_user" + ), + program_id=result.child_program.id, + prompt=result.prompt, + responses=[result.llm_response] if hasattr(result, 'llm_response') else [], + ) + + # Manage island evolution + if completed_iteration > start_iteration and current_island_counter >= programs_per_island: + self.database.next_island() + current_island_counter = 0 + logger.debug(f"Switched to island {self.database.current_island}") + + current_island_counter += 1 + + # Increment generation for current island + self.database.increment_island_generation() + + # Check migration + if self.database.should_migrate(): + logger.info(f"Performing migration at iteration {completed_iteration}") + self.database.migrate_programs() + self.database.log_island_status() + + # Log progress (outside lock) + logger.info( + 
f"Iteration {completed_iteration}: " + f"Program {result.child_program.id} " + f"(parent: {result.parent.id if result.parent else 'None'}) " + f"completed in {result.iteration_time:.2f}s" + ) + + if result.child_program.metrics: + metrics_str = ", ".join([ + f"{k}={v:.4f}" if isinstance(v, (int, float)) else f"{k}={v}" + for k, v in result.child_program.metrics.items() + ]) + logger.info(f"Metrics: {metrics_str}") + + # Check for new best program + if self.database.best_program_id == result.child_program.id: + logger.info( + f"🌟 New best solution found at iteration {completed_iteration}: " + f"{result.child_program.id}" + ) + + # Save checkpoints at intervals + if completed_iteration % self.config.checkpoint_interval == 0: + logger.info(f"Checkpoint interval reached at iteration {completed_iteration}") + self.database.log_island_status() + if checkpoint_callback: + checkpoint_callback(completed_iteration) + + # Check target score + if target_score is not None and result.child_program.metrics: + numeric_metrics = [ + v for v in result.child_program.metrics.values() + if isinstance(v, (int, float)) + ] + if numeric_metrics: + avg_score = sum(numeric_metrics) / len(numeric_metrics) + if avg_score >= target_score: + logger.info( + f"Target score {target_score} reached after {completed_iteration} iterations" + ) + break + else: + logger.warning(f"No valid result from iteration {completed_iteration}") + + except Exception as e: + logger.error(f"Error processing result from iteration {completed_iteration}: {e}") + + completed_iterations += 1 + + # Submit next iteration if available + if next_iteration_to_submit < total_iterations: + future = self.thread_pool.submit_evaluation(next_iteration_to_submit) + pending_futures[next_iteration_to_submit] = future + next_iteration_to_submit += 1 + + # Handle shutdown or completion + if self.shutdown_flag.is_set(): + logger.info("Shutdown requested, canceling remaining evaluations...") + # Cancel remaining futures + for iteration, future in pending_futures.items(): + future.cancel() + logger.debug(f"Canceled iteration {iteration}") + else: + # Wait for any remaining futures if not shutting down + for iteration, future in pending_futures.items(): + try: + future.result(timeout=10.0) + except Exception as e: + logger.warning(f"Error waiting for iteration {iteration}: {e}") + + if self.shutdown_flag.is_set(): + logger.info("Evolution interrupted by shutdown") + else: + logger.info("Evolution completed") \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 94f30a35c..b5de7e6b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "openevolve" -version = "0.0.11" +version = "0.0.12" description = "Open-source implementation of AlphaEvolve" readme = "README.md" requires-python = ">=3.9" diff --git a/setup.py b/setup.py index e7ea0d5bb..2d13b91f2 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="openevolve", - version="0.0.11", + version="0.0.12", packages=find_packages(), include_package_data=True, )