# `optimize_anything`: A Universal API for Text-Based Optimization

**TL;DR**: We introduce `optimize_anything`, a single, declarative API that uses LLMs as intelligent proposers to optimize *anything* representable as text—code, prompts, configurations, agent architectures. The key insight: if it can be serialized to text, an LLM can reason about it and propose improvements. The secret sauce? **A**uxiliary **S**ide **I**nformation (ASI).

---

## Key Takeaways

1. **Unified Interface**: Whether you're optimizing prompts, code, hyperparameters, or agent architectures, the API is the same—just provide a `seed_candidate` (starting point) and a `fitness_fn` (how good are we doing?).

2. **The Convex Hull of Optimization**: `optimize_anything` is designed to be the "convex hull" of all text-based optimization problems. Different libraries optimize different things (Optuna for hyperparameters, evolutionary strategies for algorithms, gradient descent for neural networks). We unify them under one abstraction.

3. **Side Information is Key**: Unlike traditional optimizers that only see scalar scores, GEPA's LLM-based reflection can understand *why* a candidate performed poorly through rich diagnostic information—error messages, execution traces, partial results.

4. **Emergent Capabilities**: GEPA can discover sophisticated strategies (like self-refinement) that weren't explicitly programmed—they emerge from the optimization process itself.

---

## Results Summary

| Domain | Task | Baseline | Optimized | Improvement |
|--------|------|----------|-----------|-------------|
| **Mathematical Optimization** | EvalSet Benchmark | Optuna TPE | GEPA | Outperforms Optuna |
| **Prompt Engineering** | AIME 2025 (GPT-4.1 Mini) | 46.67% | 60.00% | +13.33% absolute |
| **Agent Evolution** | ARC-AGI (GPT-5) | 56.5% | 68.0% | +11.5% absolute |
| **Algorithmic Discovery** | Circle Packing (N=26) | 0.9798 | 2.6359 | Exceeds AlphaEvolve |
| **Systems: Scheduling** | Can't Be Late | Cost 96.48 | Cost 89.86 | **6.9% savings** |
| **Systems: Networking** | Cloudcast | $191 | $120 | **37.3% savings** |

---

## Outline

1. **[The Landscape of Optimization (The "Old" Way)](#section-1)** — The fragmented world of optimization libraries
2. **[The Unifying Abstraction: `optimize_anything`](#section-2)** — One API to rule them all
3. **[The Secret Weapon: Auxiliary Side Information (ASI)](#section-3)** — Why GEPA outperforms traditional optimizers
4. **[Example 1: Mathematical Optimization](#section-4)** — Evolving code to beat Optuna on EvalSet
5. **[Example 2: Prompt Engineering](#section-5)** — Optimizing prompts for AIME 2025
6. **[Example 3: Agent Program Evolution](#section-6)** — Evolving DSPy programs for ARC-AGI
7. **[Example 4: Algorithmic Discovery](#section-7)** — Circle packing that matches AlphaEvolve
8. **[Example 5: Systems Optimization](#section-adrs)** — Cloud infrastructure cost reduction
9. **[How It Works Under the Hood](#section-8)** — The GEPA engine
10. **[Conclusion](#section-9)** — From imperative to declarative optimization

---

<a id="section-1"></a>
# 1. The Landscape of Optimization (The "Old" Way)

The world of optimization is **fragmented**. Each problem domain has its own specialized library with its own API, paradigm, and learning curve. Let's look at the major categories:

### Hyperparameter/Black-Box Optimization (Optuna)

For hyperparameter tuning, you use Bayesian optimization or Tree-structured Parzen Estimators (TPE):

In [None]:
import optuna

def objective(trial):
    # Suggest hyperparameters
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    n_layers = trial.suggest_int("n_layers", 1, 5)
    
    # Train model and return validation score
    model = build_model(lr, n_layers)
    return train_and_evaluate(model)

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100)

### Mathematical Optimization (SciPy)

For continuous optimization of mathematical functions, you use classical algorithms like L-BFGS-B or SLSQP:

In [None]:
from scipy.optimize import minimize

def rosenbrock(x):
    return sum(100*(x[1:] - x[:-1]**2)**2 + (1 - x[:-1])**2)

result = minimize(
    rosenbrock, 
    x0=[0, 0, 0, 0],
    method='L-BFGS-B',
    bounds=[(-5, 5)] * 4
)

### Evolutionary Algorithms (DEAP)

For evolving programs or complex structures, you use genetic algorithms:

In [None]:
from deap import base, creator, tools, algorithms

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
toolbox.register("attr_bool", random.randint, 0, 1)
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, 100)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", eval_func)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)

population = toolbox.population(n=300)
algorithms.eaSimple(population, toolbox, cxpb=0.5, mutpb=0.2, ngen=40)

### The Problem: Fragmentation

**A user needs to learn 3 different paradigms to solve 3 different optimization problems.**

| Library | Domain | Paradigm | What You Must Know |
|---------|--------|----------|-------------------|
| Optuna | Hyperparameters | Bayesian/TPE | Samplers, pruners, search space definition |
| SciPy | Mathematical Functions | Classical Methods | Algorithm selection (L-BFGS, SLSQP, etc.) |
| DEAP | Evolutionary | Genetic Algorithms | Crossover, mutation, selection operators |

Each library has:
- **Different APIs and abstractions** — you can't just swap one for another
- **Different optimization strategies** hard-coded into the implementation
- **Different assumptions** about what can be optimized (differentiable? discrete? continuous?)

### The Insight: Text is the Universal Representation

Here's the key insight: **if something can be represented as text, an LLM can reason about it and propose improvements**.

- **Code** is text → LLMs can write and improve code
- **Prompts** are text → LLMs can refine instructions
- **Configurations** are text → LLMs can tune JSON/YAML
- **Agent architectures** are text → LLMs can evolve program structure

What if we had **one API** that could optimize all of them—by leveraging the LLM's ability to understand and generate text?

<!-- This cell intentionally left empty - placeholder for removal -->

---

<a id="section-2"></a>
# 2. The Unifying Abstraction: `optimize_anything`

We introduce `optimize_anything`—a single entry point for optimizing any text-representable artifact. It's designed to be the **"Convex Hull"** of all optimization problems: every point in the space of text-based optimization can be reached through this API.

## The API Signature

The API is intentionally minimal. You need only two things:
1. **A seed candidate** — your starting point
2. **A fitness function** — how to measure success

In [None]:
from gepa.optimize_anything import optimize_anything, GEPAConfig

def optimize_anything(
    # === REQUIRED ===
    seed_candidate: dict[str, str],           # Your starting point (text parameters to optimize)
    fitness_fn: FitnessFn,                    # How to measure success
    
    # === OPTIONAL: Data ===
    dataset: list[DataInst] | None = None,   # Examples to optimize on (for example, multiple related tasks)
    valset: list[DataInst] | None = None,    # Held-out set for ensuring generalization if required
    
    # === OPTIONAL: Natural Language Guidance ===
    objective: str | None = None,            # What you're trying to achieve (e.g. "Find a prompt that maximizes accuracy")
    background: str | None = None,           # Domain knowledge, constraints, strategies (e.g. Domain knowledge about the framework which the candidate is written in)
    
    # === OPTIONAL: Fine-Grained Control ===
    config: GEPAConfig | None = None,        # Engine, reflection, tracking settings
) -> GEPAResult:
    """
    Optimize any parameterized system using evolutionary algorithms with LLM-based reflection.
    
    Returns:
        GEPAResult containing best_candidate, optimization history, and metrics.
    """
    ...

## The Philosophy: Declare, Don't Implement

With `optimize_anything`, the user **declares** the optimization problem:

| You Provide | Example | Purpose |
|-------------|---------|---------|
| `seed_candidate` | `{"prompt": "Solve this math problem:"}` | Your starting point |
| `fitness_fn` | Returns (score, output, side_info) | How to measure success |
| `dataset` (optional) | List of test cases | Multi-instance generalization |
| `objective` (optional) | "Find a prompt that maximizes accuracy" | Natural language guidance |
| `background` (optional) | "Solutions must handle edge cases" | Domain knowledge |

GEPA handles the **how**: proposing mutations, reflecting on failures, selecting candidates, and tracking the optimization trajectory.

## Two Modes of Operation

### Per-Instance Mode (with `dataset`)

For problems where you want parameters that **generalize** across examples:
- **Prompt optimization**: The same prompt should work on many math problems
- **Agent architecture search**: The same agent should solve many tasks

```python
# dataset is a list of examples
result = optimize_anything(
    seed_candidate={"prompt": "Solve:"},
    fitness_fn=evaluate_prompt,
    dataset=math_problems,  # ← Optimize across these
    valset=held_out_problems,  # ← Test generalization
)
```

### Single-Instance Mode (without `dataset`)

For problems defined by a **single optimization target**:
- **Circle packing**: Maximize sum of radii for N circles
- **Code evolution**: Minimize a mathematical function

```python
# dataset=None triggers single-instance mode
result = optimize_anything(
    seed_candidate={"code": "def solve(): ..."},
    fitness_fn=evaluate_code,
    dataset=None,  # ← Single optimization target
)
```

## The Fitness Function: Your Optimization Signal

The fitness function is where you define *what* you're optimizing for:

In [None]:
from typing import Any

def fitness_fn(
    candidate: dict[str, str],  # The parameters being optimized
    example: Any | None = None  # A single data instance (None for single-instance mode)
) -> tuple[float, Any, dict]:
    """
    Returns:
        score: Higher is better
        output: The actual output produced (for tracking)
        side_info: Diagnostic information for LLM reflection
    """
    # Run your system with the candidate parameters
    output = run_my_system(candidate, example)
    
    # Compute a score (higher is better)
    score = compute_score(output, example)
    
    # Collect diagnostic info for LLM reflection
    side_info = {
        "Input": example["input"],
        "Output": output,
        "Expected": example["expected"],
        "Error": get_error_message(output),
    }
    
    return score, output, side_info

---

<a id="section-3"></a>
# 3. The Secret Weapon: Auxiliary Side Information (ASI)

The `side_info` dictionary is GEPA's secret weapon—we call it **ASI** (**A**uxiliary **S**ide **I**nformation). 

> *While the AI community debates when we'll achieve ASI (Artificial Superintelligence), you can achieve **your** ASI today—just return rich diagnostic information from your fitness function.*

## Why ASI Matters

Traditional optimizers only see a **scalar score**:

```
Candidate A → Score: 0.73  (Why did it fail? No idea.)
Candidate B → Score: 0.85  (What made it better? Unknown.)
```

GEPA's LLM-based reflection can understand **why** a candidate performed the way it did:

```
Candidate A → Score: 0.73
  side_info: {
    "Error": "Circle 3 and Circle 7 overlap by 0.02 units",
    "Boundary violations": ["Circle 12 extends past x=1.0"],
    "Best score achieved": 2.847
  }
```

Now the LLM knows *exactly* what to fix.

## What to Include in ASI

| Information Type | Example | Purpose |
|-----------------|---------|----------|
| **Error messages** | `"SyntaxError: invalid syntax on line 42"` | Helps LLM fix code bugs |
| **Execution traces** | `"Called API 3x, timeout on 3rd call"` | Helps LLM understand behavior |
| **Partial results** | `"3/5 test cases passed"` | Helps LLM identify failure patterns |
| **Expected vs Actual** | `"Expected: [1,2,3], Got: [1,2,4]"` | Helps LLM understand what went wrong |
| **Domain feedback** | `"Circles overlap at positions (0.5, 0.3)"` | Helps LLM make domain-aware improvements |
| **Reasoning traces** | `"Model's chain-of-thought: ..."` | Helps LLM understand failure modes |

## The ASI Design Principle

**Be generous with information.** Include anything that would help a human expert understand why the candidate succeeded or failed. The LLM will use this to make targeted, intelligent improvements rather than random mutations.

```python
# Good ASI
side_info = {
    "Input": problem_description,
    "Output": model_output,
    "Expected": correct_answer,
    "Reasoning": model_reasoning_trace,
    "Error": "Division by zero on line 15",
    "Partial scores": {"accuracy": 0.8, "efficiency": 0.3},
}

# Bad ASI (not enough information)
side_info = {"score": 0.73}  # LLM can't help with just this!
```

---

<a id="section-4"></a>
# 4. Example 1: Mathematical Optimization — Beating Optuna

**Result: GEPA outperforms Optuna on the EvalSet benchmark.**

This example demonstrates how `optimize_anything` can evolve **code** that implements optimization algorithms—essentially using LLMs to discover optimization strategies automatically.

## The Challenge

Optuna is the industry standard for black-box optimization. But using Optuna effectively requires:
- Choosing sampling algorithms (TPE, CMA-ES, Random, etc.)
- Defining search spaces manually
- Tuning algorithm-specific hyperparameters
- Deep knowledge of optimization theory

(Luke: the claims above are too strong? Optuna is actually very simplistic)

**What if we could just write code that finds minima, and let GEPA evolve the strategy?**

## The Task

**Given**: A black-box function `objective_function(x) → float` with bounds
**Find**: Python code that discovers the minimum

**What GEPA optimizes**: The Python code itself—algorithm choice, implementation, hyperparameters, heuristics.

<img src="./assets/blog/mathematical_optimization.png" width="60%">

*GEPA starts below Optuna but progressively discovers better strategies, eventually surpassing it.*

## Setting Up the Problem

We use the [EvalSet benchmark](https://github.com/sigopt/evalset)—a collection of challenging optimization test functions (Ackley, Rosenbrock, Rastrigin, etc.).

In [None]:
from examples.polynomial.evalset.problems import problems, problem_configs

problem_index = 0
problems[problem_index], problem_configs[problem_index]

(Ackley(11), {'name': 'Ackley', 'dim': 11, 'int': None, 'res': None})

## The Seed Candidate

We start with a trivial baseline—random sampling:

In [8]:
seed_candidate = {
    "code": """import numpy as np

def solve(objective_function, config, prev_best_x=None):
    bounds = np.array(config['bounds'])
    x = np.random.uniform(bounds[:, 0], bounds[:, 1])
    y = objective_function(x)
    return x
"""
}

## The Fitness Function

The fitness function executes the code in a sandbox and captures rich diagnostic information:

In [None]:
# Luke: need to either remove the whole custom objective tracking feature or abstract it away into GEPA

from typing import Any
import numpy as np
import json
from pathlib import Path

from gepa.optimize_anything import SideInfo
from gepa.utils.code_execution import execute_code as _execute_code, ExecutionMode


class FitnessEvaluator:
    """Fitness evaluator for GEPA blackbox optimization."""

    def __init__(
        self,
        problem_index: int,
        timeout: int = 300,
        evaluation_budget: int = 100,
        log_dir: str = None,
        seed: int = 0,
    ):
        self.problem_index = problem_index
        self.timeout = timeout
        self.evaluation_budget = evaluation_budget
        self.log_dir = Path(log_dir) if log_dir else None
        self.seed = seed

        # State tracking for warm-start (minimization: lower is better)
        self.evaluation_history = []
        self.best_score = float("inf")
        self.best_x = None

    def evaluate(self, candidate: dict[str, str], **kwargs) -> tuple[float, Any, SideInfo]:
        """Evaluate code candidate on a single problem."""
        code = candidate["code"]
        function = problems[self.problem_index]
        problem_config = problem_configs[self.problem_index]

        # Track state for this candidate
        eval_count = 0
        best_candidate_score = float("inf")
        errors = []

        def objective_function(x):
            nonlocal eval_count, best_candidate_score
            if eval_count >= self.evaluation_budget:
                raise ValueError(f"Evaluation budget exceeded: {eval_count} >= {self.evaluation_budget}")
            eval_count += 1

            score = function.do_evaluate(np.array(x))

            if score < best_candidate_score:
                best_candidate_score = score
            if score < self.best_score:
                self.best_score = score
                self.best_x = np.array(x).copy()

            self.evaluation_history.append({
                "score": score,
                "best_score": self.best_score,
            })
            return score

        # Execute code
        result = _execute_code(
            code=code,
            timeout=self.timeout,
            mode=ExecutionMode.IN_PROCESS,
            entry_point="solve",
            entry_point_kwargs={
                "objective_function": objective_function,
                "config": {"bounds": function.bounds, "dim": function.dim, "budget": self.evaluation_budget},
                "prev_best_x": self.best_x,
            },
            seed=self.seed,
        )

        x = result.variables.get("__return__")
        stdout = self._truncate(result.stdout)
        stderr = self._truncate(result.stderr)

        if result.error:
            errors.append(result.error)
        if result.traceback and result.traceback not in (result.error or ""):
            errors.append(result.traceback)
        if x is None or not isinstance(x, np.ndarray):
            errors.append("Code did not return a valid numpy array.")
        if eval_count == 0:
            errors.append("No objective_function calls were made.")

        # Use best score found, or inf if none
        score = best_candidate_score if best_candidate_score < float("inf") else float("inf")
        print(f"Best score from {eval_count} calls: {score}")

        side_info = {
            "score": score,
            "Input": problem_config["name"],
            "Prints": stdout,
            "Logs": stderr,
            "Error": "\n".join(errors) if errors else "",
        }

        output = {
            **side_info,
            "code": code,
            "X": " ".join(map(str, x.ravel())) if x is not None else "not found",
        }

        self.save()
        gepa_score = -score if score < float("inf") else -1e9
        return (gepa_score, output, side_info)

    def save(self, verbose: bool = False):
        """Save evaluation history to JSON."""
        if not self.log_dir:
            return
        self.log_dir.mkdir(parents=True, exist_ok=True)
        filename = self.log_dir / f"evaluation_history.json"
        try:
            with open(filename, "w") as f:
                json.dump(self.evaluation_history, f, indent=2, default=lambda o: o.tolist() if isinstance(o, np.ndarray) else o)
            if verbose:
                print(f"Saved to {filename}")
        except Exception as e:
            print(f"Warning: Failed to save: {e}")
            
    def  _truncate(self, text: str, limit: int = 4000) -> str:
        """Truncate text to avoid token limits."""
        if len(text) <= limit:
            return text
        half = limit // 2
        return text[:half] + "\n...[truncated]...\n" + text[-half:]


total_evaluation_budgets = 8000
num_proposals = 10
evaluation_budget_per_proposal = total_evaluation_budgets // num_proposals
seconds_per_trial=2
timeout_per_candidate = evaluation_budget_per_proposal * seconds_per_trial

# Create evaluator
evaluator = FitnessEvaluator(
    problem_index=problem_index,
    timeout=timeout_per_candidate,
    evaluation_budget=evaluation_budget_per_proposal,
    seed=0
)

## Running GEPA Optimization

In [None]:
from examples.polynomial.prompt import BACKGROUND
from gepa.optimize_anything import (
    optimize_anything,
    GEPAConfig,
    EngineConfig,
    ReflectionConfig,
)

gepa_config = GEPAConfig(
    engine=EngineConfig(
        max_metric_calls=num_proposals,
        track_best_outputs=True,
        cache_evaluation=True,
    ),
    reflection=ReflectionConfig(
        reflection_lm="openai/gpt-5",
        reflection_minibatch_size=1,
    )
)

result = optimize_anything(
    seed_candidate=seed_candidate,
    fitness_fn=evaluator.evaluate,
    config=gepa_config,
    objective="Evolve Python code that minimizes a blackbox objective function using the available evaluation budget efficiently.",
    background=BACKGROUND,
)

print("Optimized code:")
print(result.best_candidate["code"])

Best score from 1 calls: 21.109047957197003
Iteration 0: Base program full valset score: -21.109047957197003 over 1 / 1 examples
Iteration 1: Selected program 0 score: -21.109047957197003
Best score from 1 calls: 21.109047957197003


KeyboardInterrupt: 

## What GEPA Discovered

GEPA evolved the trivial random sampler into a sophisticated optimization strategy. Here's a snippet from the evolved code:

In [None]:
# Evolved by GEPA - combines multiple strategies:

evolved_code = '''
import numpy as np                                                                                                                                                                                   
                                                                                                                                                                                                       
def solve(objective_function, config, prev_best_x=None):                                                                                                                                             
    """                                                                                                                                                                                              
    Hybrid global-local blackbox minimization with adaptive budget allocation.                                                                                                                       
                                                                                                                                                                                                    
    Strategy:                                                                                                                                                                                        
    - Warm-start from prev_best_x when available and compatible                                                                                                                                      
    - Global search with quasi-random exploration and sampling around current best                                                                                                                   
    - Local search via adaptive coordinate-wise pattern search with random                                                                                                                           
    perturbations and occasional global kicks                                                                                                                                                      
    - Budget and dimension aware: adapts exploration / exploitation split                                                                                                                            
    and step sizes                                                                                                                                                                                 
    """                                                                                                                                                                                              
                                                                                                                                                                                                    
    bounds = np.array(config["bounds"], dtype=float)                                                                                                                                                 
    dim = int(config.get("dim", len(bounds)))                                                                                                                                                        
    budget = int(config.get("budget", 1))                                                                                                                                                            
                                                                                                                                                                                                    
    if dim <= 0 or budget <= 0:                                                                                                                                                                      
        mid = bounds[:, 0] + 0.5 * (bounds[:, 1] - bounds[:, 0])                                                                                                                                     
        return np.asarray(mid, dtype=float)                                                                                                                                                          
                                                                                                                                                                                                    
    lower = bounds[:, 0].astype(float)                                                                                                                                                               
    upper = bounds[:, 1].astype(float)                                                                                                                                                               
    span = upper - lower                                                                                                                                                                             
                                                                                                                                                                                                    
    fixed_mask = span <= 0.0                                                                                                                                                                         
    span_safe = np.where(fixed_mask, 1.0, span)                                                                                                                                                      
                                                                                                                                                                                                    
    def clamp(x):                                                                                                                                                                                    
        return np.clip(x, lower, upper)                                                                                                                                                              
                                                                                                                                                                                                    
    evals_used = 0                                                                                                                                                                                   
    best_x = None                                                                                                                                                                                    
    best_y = None                                                                                                                                                                                    
                                                                                                                                                                                                    
    def eval_point(x):                                                                                                                                                                               
        nonlocal evals_used, best_x, best_y                                                                                                                                                          
        if evals_used >= budget:                                                                                                                                                                     
            return best_y if best_y is not None else np.inf                                                                                                                                          
        y = objective_function(x)                                                                                                                                                                    
        evals_used += 1                                                                                                                                                                              
        if best_x is None or y < best_y:                                                                                                                                                             
            best_x = np.array(x, copy=True)                                                                                                                                                          
            best_y = float(y)                                                                                                                                                                        
        return y                                                                                                                                                                                     
                                                                                                                                                                                                    
    # ---- Initialization: warm start + LHS-like starts ----                                                                                                                                         
    if prev_best_x is not None:                                                                                                                                                                      
        x0 = np.asarray(prev_best_x, dtype=float)                                                                                                                                                    
        if x0.shape[0] == dim:                                                                                                                                                                       
            x0 = clamp(x0)                                                                                                                                                                           
            eval_point(x0)                                                                                                                                                                           
                                                                                                                                                                                                    
    remaining = budget - evals_used                                                                                                                                                                  
    if remaining > 0:                                                                                                                                                                                
        if budget < 40:                                                                                                                                                                              
            init_trials = min(remaining, 6)                                                                                                                                                          
        else:                                                                                                                                                                                        
            init_trials = min(max(10, budget // 20), 25)                                                                                                                                             
                                                                                                                                                                                                    
        n_init = init_trials                                                                                                                                                                         
        if n_init > 0:                                                                                                                                                                               
            cut = np.linspace(0.0, 1.0, n_init + 1)                                                                                                                                                  
            u = np.random.rand(n_init, dim)                                                                                                                                                          
            a = cut[:n_init]                                                                                                                                                                         
            b = cut[1:n_init + 1]                                                                                                                                                                    
            u = a[:, None] + (b - a)[:, None] * u                                                                                                                                                    
            for d in range(dim):                                                                                                                                                                     
                np.random.shuffle(u[:, d])                                                                                                                                                           
            xs = lower + u * span_safe                                                                                                                                                               
            for k in range(n_init):                                                                                                                                                                  
                if evals_used >= budget:                                                                                                                                                             
                    break                                                                                                                                                                            
                eval_point(xs[k])                                                                                                                                                                    
                                                                                                                                                                                                    
    if best_x is None:                                                                                                                                                                               
        x_mid = clamp(lower + 0.5 * span)                                                                                                                                                            
        eval_point(x_mid)                                                                                                                                                                            
                                                                                                                                                                                                    
    if evals_used >= budget:                                                                                                                                                                         
        return best_x                                                                                                                                                                                
                                                                                                                                                                                                    
    remaining = budget - evals_used                                                                                                                                                                  
                                                                                                                                                                                                    
    # ---- Budget split: global vs local ----                                                                                                                                                        
    if remaining < 40 or dim > 20:                                                                                                                                                                   
        global_frac = 0.7                                                                                                                                                                            
    else:                                                                                                                                                                                            
        global_frac = 0.55                                                                                                                                                                           
                                                                                                                                                                                                    
    global_budget = max(1, int(global_frac * remaining))                                                                                                                                             
    global_budget = min(global_budget, remaining)                                                                                                                                                    
    local_budget = remaining - global_budget                                                                                                                                                         
                                                                                                                                                                                                    
    # ---- Quasi-random utilities ----                                                                                                                                                               
    def van_der_corput(n, base=2):                                                                                                                                                                   
        if n <= 0:                                                                                                                                                                                   
            return np.empty(0, dtype=float)                                                                                                                                                          
        seq = np.empty(n, dtype=float)                                                                                                                                                               
        for i in range(n):                                                                                                                                                                           
            v = i                                                                                                                                                                                    
            denom = 1.0                                                                                                                                                                              
            x = 0.0                                                                                                                                                                                  
            while v:                                                                                                                                                                                 
                v, r = divmod(v, base)                                                                                                                                                               
                denom *= base                                                                                                                                                                        
                x += r / denom                                                                                                                                                                       
            seq[i] = x                                                                                                                                                                               
        return seq                                                                                                                                                                                   
                                                                                                                                                                                                    
    def quasi_random_points(n_points, dim_):                                                                                                                                                         
        if n_points <= 0:                                                                                                                                                                            
            return np.empty((0, dim_), dtype=float)                                                                                                                                                  
        base_sequence = van_der_corput(n_points + 16, 2)[8:8 + n_points]                                                                                                                             
        pts = np.empty((n_points, dim_), dtype=float)                                                                                                                                                
        for d in range(dim_):                                                                                                                                                                        
            perm = np.random.permutation(n_points)                                                                                                                                                   
            pts[:, d] = base_sequence[perm]                                                                                                                                                          
        jitter = (np.random.rand(n_points, dim_) - 0.5) / (4.0 * max(n_points, 1))                                                                                                                   
        u = np.clip(pts + jitter, 0.0, 1.0)                                                                                                                                                          
        return lower + u * span_safe                                                                                                                                                                 
                                                                                                                                                                                                    
    # ---- Global search ----                                                                                                                                                                        
    n_global = global_budget                                                                                                                                                                         
    refine_budget = max(0, int(0.3 * n_global))                                                                                                                                                      
    pure_explore_budget = max(0, n_global - refine_budget)                                                                                                                                           
                                                                                                                                                                                                    
    if pure_explore_budget > 0 and evals_used < budget:                                                                                                                                              
        xs = quasi_random_points(pure_explore_budget, dim)                                                                                                                                           
        for i in range(pure_explore_budget):                                                                                                                                                         
            if evals_used >= budget:                                                                                                                                                                 
                break                                                                                                                                                                                
            eval_point(xs[i])                                                                                                                                                                        
                                                                                                                                                                                                    
    if refine_budget > 0 and evals_used < budget and best_x is not None:                                                                                                                             
        base_sigma = 0.10 * span_safe                                                                                                                                                                
        base_sigma[fixed_mask] = 0.0                                                                                                                                                                 
        for t in range(refine_budget):                                                                                                                                                               
            if evals_used >= budget:                                                                                                                                                                 
                break                                                                                                                                                                                
            frac = 1.0 - (t / max(refine_budget - 1, 1))                                                                                                                                             
            sigma = np.maximum(base_sigma * (0.5 + 0.5 * frac), 1e-16)                                                                                                                               
            noise = np.random.randn(dim) * sigma                                                                                                                                                     
            cand = clamp(best_x + noise)                                                                                                                                                             
            eval_point(cand)                                                                                                                                                                         
                                                                                                                                                                                                    
    if evals_used >= budget or local_budget <= 0:                                                                                                                                                    
        return best_x                                                                                                                                                                                
                                                                                                                                                                                                    
    # ---- Local search: adaptive coordinate-wise + global kicks ----                                                                                                                                
    remaining = budget - evals_used                                                                                                                                                                  
                                                                                                                                                                                                    
    if remaining < 40:                                                                                                                                                                               
        step_frac = 0.16                                                                                                                                                                             
    elif dim <= 5:                                                                                                                                                                                   
        step_frac = 0.15                                                                                                                                                                             
    else:                                                                                                                                                                                            
        step_frac = 0.11                                                                                                                                                                             
                                                                                                                                                                                                    
    step = step_frac * span_safe                                                                                                                                                                     
    step[fixed_mask] = 0.0                                                                                                                                                                           
    min_step = 1e-10 * np.maximum(span_safe, 1.0)                                                                                                                                                    
                                                                                                                                                                                                    
    per_iter_cost = max(2 * dim + 4, 1)                                                                                                                                                              
    max_iters = max(1, min(60, local_budget // per_iter_cost))                                                                                                                                       
                                                                                                                                                                                                    
    it = 0                                                                                                                                                                                           
    no_improve_iters = 0                                                                                                                                                                             
    second_best_x = None                                                                                                                                                                             
    second_best_y = None                                                                                                                                                                             
                                                                                                                                                                                                    
    while evals_used < budget and it < max_iters and np.any(step > min_step):                                                                                                                        
        it += 1                                                                                                                                                                                      
        improved = False                                                                                                                                                                             
        current_best_x = best_x.copy()                                                                                                                                                               
                                                                                                                                                                                                    
        order = np.arange(dim)                                                                                                                                                                       
        np.random.shuffle(order)                                                                                                                                                                     
                                                                                                                                                                                                    
        for j in order:                                                                                                                                                                              
            if evals_used >= budget:                                                                                                                                                                 
                break                                                                                                                                                                                
            if fixed_mask[j]:                                                                                                                                                                        
                continue                                                                                                                                                                             
                                                                                                                                                                                                    
            for direction in (-1.0, 1.0):                                                                                                                                                            
                if evals_used >= budget:                                                                                                                                                             
                    break                                                                                                                                                                            
                cand = current_best_x.copy()                                                                                                                                                         
                cand[j] = cand[j] + direction * step[j]                                                                                                                                              
                cand = clamp(cand)                                                                                                                                                                   
                y = eval_point(cand)                                                                                                                                                                 
                if (second_best_x is None or y < second_best_y) and (best_y is None or y > best_y):                                                                                                  
                    second_best_x = cand.copy()                                                                                                                                                      
                    second_best_y = float(y)                                                                                                                                                         
                if y < best_y:                                                                                                                                                                       
                    improved = True                                                                                                                                                                  
                    current_best_x = best_x.copy()                                                                                                                                                   
                                                                                                                                                                                                    
            if evals_used >= budget:                                                                                                                                                                 
                break                                                                                                                                                                                
                                                                                                                                                                                                    
            if step[j] > 0:                                                                                                                                                                          
                for _ in range(2):                                                                                                                                                                   
                    if evals_used >= budget:                                                                                                                                                         
                        break                                                                                                                                                                        
                    rnd = (2.0 * np.random.rand() - 1.0) * step[j]                                                                                                                                   
                    cand = current_best_x.copy()                                                                                                                                                     
                    cand[j] = cand[j] + rnd                                                                                                                                                          
                    cand = clamp(cand)                                                                                                                                                               
                    y = eval_point(cand)                                                                                                                                                             
                    if (second_best_x is None or y < second_best_y) and (best_y is None or y > best_y):                                                                                              
                        second_best_x = cand.copy()                                                                                                                                                  
                        second_best_y = float(y)                                                                                                                                                     
                    if y < best_y:                                                                                                                                                                   
                        improved = True                                                                                                                                                              
                        current_best_x = best_x.copy()                                                                                                                                               
                                                                                                                                                                                                    
        if evals_used >= budget:                                                                                                                                                                     
            break                                                                                                                                                                                    
                                                                                                                                                                                                    
        # Gaussian escape step                                                                                                                                                                       
        full_noise = np.random.randn(dim) * (0.35 * step)                                                                                                                                            
        full_noise[fixed_mask] = 0.0                                                                                                                                                                 
        cand = clamp(best_x + full_noise)                                                                                                                                                            
        y = eval_point(cand)                                                                                                                                                                         
        if (second_best_x is None or y < second_best_y) and (best_y is None or y > best_y):                                                                                                          
            second_best_x = cand.copy()                                                                                                                                                              
            second_best_y = float(y)                                                                                                                                                                 
        if y < best_y:                                                                                                                                                                               
            improved = True                                                                                                                                                                          
                                                                                                                                                                                                    
        # Diversification from second-best when stuck                                                                                                                                                
        if (not improved) and (no_improve_iters >= 4) and (second_best_x is not None):                                                                                                               
            kick_scale = 0.25                                                                                                                                                                        
            noise = np.random.randn(dim) * (kick_scale * np.maximum(step, min_step))                                                                                                                 
            noise[fixed_mask] = 0.0                                                                                                                                                                  
            cand = clamp(second_best_x + noise)                                                                                                                                                      
            y = eval_point(cand)                                                                                                                                                                     
            if (second_best_x is None or y < second_best_y) and (best_y is None or y > best_y):                                                                                                      
                second_best_x = cand.copy()                                                                                                                                                          
                second_best_y = float(y)                                                                                                                                                             
            if y < best_y:                                                                                                                                                                           
                improved = True                                                                                                                                                                      
                                                                                                                                                                                                    
        # Step size adaptation                                                                                                                                                                       
        if not improved:                                                                                                                                                                             
            no_improve_iters += 1                                                                                                                                                                    
            if no_improve_iters <= 3:                                                                                                                                                                
                decay = 0.7                                                                                                                                                                          
            elif no_improve_iters <= 7:                                                                                                                                                              
                decay = 0.5                                                                                                                                                                          
            else:                                                                                                                                                                                    
                decay = 0.35                                                                                                                                                                         
            step *= decay                                                                                                                                                                            
        else:                                                                                                                                                                                        
            no_improve_iters = 0                                                                                                                                                                     
            step *= 1.05                                                                                                                                                                             
                                                                                                                                                                                                    
        step = np.maximum(step, min_step)                                                                                                                                                            
                                                                                                                                                                                                    
    return best_x
'''

### Key Takeaway

**GEPA vs. Traditional Optimization**:

| Aspect | Optuna | GEPA |
|--------|--------|------|
| Algorithm selection | Manual (TPE, CMA-ES, etc.) | Automatic (evolved) |
| Hyperparameter tuning | Required | Evolved |
| Domain knowledge needed | High | Low |
| What user provides | Search space + sampler config | Baseline code + fitness function |
| What gets optimized | Parameter values | The optimization algorithm itself |

While Optuna requires users to select algorithms and tune hyperparameters, GEPA automatically **discovers optimization strategies** by evolving code. The user just provides the problem and a baseline—GEPA evolves Halton sequences, surrogate models, local refinement, and more.

---

<a id="section-5"></a>
# 5. Example 2: Prompt Engineering — AIME 2025

**Result: GEPA improves GPT-4.1 Mini's accuracy from 46.67% to 60.00% on AIME 2025.**

This example demonstrates how `optimize_anything` can evolve **prompts**—the natural language instructions that guide LLM behavior.

## The Challenge

Prompt engineering is often done through **trial and error**:
1. Write a prompt
2. Test on a few examples
3. Manually tweak based on intuition
4. Repeat until it "feels right"

This is slow, doesn't scale, and doesn't guarantee you've found the best prompt.

## The Task

**Given**: A dataset of AIME math competition problems
**Find**: A system prompt that maximizes GPT-4.1 Mini's accuracy

**What GEPA optimizes**: The instruction prompt—what guidance to give the model.

<img src="./assets/blog/aime.png" width="80%">

## Setting Up the Problem

In [None]:
import dspy
import os

# Configure the language model
api_key = os.environ.get("OPENAI_API_KEY")
lm = dspy.LM("gpt-4.1-mini", api_key=api_key, temperature=1.0, max_tokens=32000)
dspy.configure(lm=lm)

# Load AIME dataset splits
from examples.aime_math.dataset import load_math_dataset
trainset, valset, testset = load_math_dataset()

print(f"Training: {len(trainset)} problems")
print(f"Validation: {len(valset)} problems")
print(f"Test: {len(testset)} problems")



Loaded 45 training examples
Loaded 45 validation examples
Loaded 30 test examples
Training: 45 problems
Validation: 45 problems
Test: 30 problems


## The DSPy Module

We use DSPy's `ChainOfThought` for step-by-step reasoning:

In [14]:
class MathSolverSignature(dspy.Signature):
    """Solve a math competition problem."""
    input = dspy.InputField(desc="The math problem to solve.")
    answer = dspy.OutputField(desc="The final numerical answer.")

predictor = dspy.ChainOfThought(MathSolverSignature)

def run_llm(example, prompt: str):
    """Run the LLM on a single example with the given prompt."""
    predictor.predict.signature.instructions = prompt
    return predictor(input=example.input)

## The Seed Candidate

In [15]:
seed_candidate = {
    "prompt": "Solve the math problem carefully. Break down the steps and provide the final answer as a single number."
}

## The Fitness Function

The fitness function runs the predictor and collects detailed feedback:

In [16]:
def math_metric(example, prediction):
    """Compute score and detailed feedback for math problems."""
    correct_answer, written_solution = int(example.answer), getattr(example, "solution", "")
    solution_suffix = f" Here's the full step-by-step solution:\n{written_solution}\n\nThink about what takeaways you can learn from this solution to improve your future answers and approach to similar problems" if written_solution else ""

    try:
        llm_answer = int(prediction.answer)
    except (ValueError, TypeError):
        feedback_text = f"The final answer must be a valid integer and nothing else. You responded with '{prediction.answer}', which couldn't be parsed as a python integer. Please ensure your answer is a valid integer without any additional text or formatting. The correct answer is '{correct_answer}'.{solution_suffix}{' and ensure your final answer is a valid integer.' if written_solution else ''}"
        return dspy.Prediction(score=0.0, feedback=feedback_text)

    score = float(correct_answer == llm_answer)
    status = "correct" if score == 1.0 else "incorrect"
    feedback_text = f"Your answer is {status}. The correct answer is '{correct_answer}'.{solution_suffix}"
    return dspy.Prediction(score=score, feedback=feedback_text)


def fitness_fn(candidate: dict[str, str], example) -> tuple[float, Any, SideInfo]:
    """Fitness function for GEPA optimization with single example evaluation."""
    prediction = run_llm(example, candidate["prompt"])
    metric_result = math_metric(example, prediction)
    score = metric_result.score
    feedback = metric_result.feedback

    output = {
        "prompt": candidate["prompt"],
        "answer": prediction.answer,
        "score": score,
    }

    side_info = {
        "Input": example.input,
        "Output": prediction.answer,
        "Reasoning": getattr(prediction, "reasoning", ""),
        "ExecutionFeedback": feedback,
    }

    return (score, output, side_info)

## Running GEPA Optimization

Note: We use `valset` for generalization testing—GEPA optimizes on `trainset` but tracks performance on held-out `valset`.

In [None]:
from gepa.optimize_anything import (
    optimize_anything,
    GEPAConfig,
    EngineConfig,
    ReflectionConfig,
)

result = optimize_anything(
    seed_candidate=seed_candidate,
    fitness_fn=fitness_fn,
    dataset=trainset,   # Optimize on training set
    valset=valset,      # Track generalization on validation set
    config=GEPAConfig(
        engine=EngineConfig(
            max_metric_calls=800,
            track_best_outputs=True,
            parallel=True,      
            max_workers=32,
            cache_evaluation=True,
        ),
        reflection=ReflectionConfig(
            reflection_lm="openai/gpt-5",
            reflection_minibatch_size=3,  # Show 3 problems per reflection
        ),
    ),
)

print("\nOptimized prompt:")
print(result.best_candidate["prompt"])

Iteration 0: Base program full valset score: 0.4666666666666667 over 45 / 45 examples
Iteration 1: Selected program 0 score: 0.4666666666666667


## The Optimized Prompt

GEPA discovered a detailed, structured prompt with domain-specific strategies:

In [None]:
# This prompt was EVOLVED by GEPA, not written by a human!
# Starting from a simple "Solve carefully and provide the answer" prompt,
# GEPA discovered domain-specific strategies through reflection.

optimized_prompt = """
Solve the math problem carefully and thoroughly. Your goal is to produce a correct, well‑structured solution that leads unambiguously to the requested final result.

Follow these rules:

1. Restate the problem briefly in your own words.
   - Identify what is given and what must be found.
   - Note any special conditions (e.g., ordering of variables, geometric configuration).

2. Set up notation and equations cleanly before manipulating them.
   - Define all variables explicitly (including any you introduce, like substitutions).
   - State all constraints and domain conditions (positivity, integrality, ordering, angle ranges, etc.) before using them.
   - When you make structural assumptions (e.g., “assume all first k terms are equal”), clearly justify why this does not reduce generality for maximizing/minimizing the desired expression.

3. Plan the approach before detailed algebra.
   - Briefly outline the main idea (e.g., symmetry, extremal principle, splitting into cases, using inequalities, introducing coordinates).
   - For optimization / extremal problems, explain why your configuration is optimal (e.g., by convexity, rearrangement inequality, averaging arguments), not just by constructing one example.
   - For existence/uniqueness, mention how you will show all solutions or the unique solution.

4. Show clear, logically ordered reasoning.
   - Justify each important algebraic, geometric, or inequality step.
   - When you split into cases, state:
     * why each case must be considered, and
     * what assumptions define the case.
   - If you invoke a known theorem or fact (e.g., Ptolemy, Power of a Point, Cauchy–Schwarz, AM–GM, similarity, Vieta, extremal principle), name it and show explicitly how it applies in this context.
   - For inequality / optimization problems, be explicit about:
     * where equalities can occur,
     * why boundary or extreme configurations are considered,
     * and why they indeed give a global optimum under the constraints.

5. Handle dead ends correctly.
   - If a line of reasoning leads to a contradiction or dead end, explicitly say so.
   - Roll back to the last correct point and choose a new approach; do not keep using a flawed assumption.
   - Do not discard potential solutions without checking them against all given conditions.

6. Keep the reasoning focused but rigorous.
   - Avoid unnecessary numerical approximations when an exact expression is available.
   - Do not approximate exact values unless the problem explicitly asks for a decimal or numerical estimate.
   - Prefer structural or conceptual arguments (symmetry, convexity, parity, monotonicity, invariants, etc.) over ad hoc trial‑and‑error or guess‑and‑check.
   - You may test specific candidate values only after deriving strong constraints that sharply limit the possibilities, and you must explain why those candidates are exhaustive.

7. Treat ordering and extremal indices with care.
   - For sequences with ordering constraints (e.g., \(x_1 \le \dots \le x_n\)), reason about which indices must be negative, zero, or positive to maximize or minimize a given difference.
   - When you set many consecutive terms equal (e.g., all smallest terms equal to some \(b\), all largest terms equal to some \(a\), and middle ones zero), justify that any deviation from this would decrease (or not increase) the desired quantity, using monotonicity or averaging arguments.
   - Ensure that sign and ordering constraints are respected in all constructed examples.

8. Final consistency check.
   - At the end, verify that your solution satisfies:
     * all original equations/constraints,
     * ordering or geometric conditions,
     * and any side conditions (e.g., angle ranges, positivity, distinctness).
   - For maximization/minimization problems, explicitly argue that:
     * your value is attainable by some configuration, and
     * no configuration can exceed (or go below) it.

9. At the end, clearly isolate the answer:
   - Provide the final answer as a single number or expression on its own line.
   - Do not include any extra words, symbols, or explanation on that final line.
"""

# 🎯 What GEPA discovered:
# - Domain-specific heuristics for different math areas (geometry, combinatorics, NT)
# - Structured problem-solving workflow
# - Sanity checks and validation strategies
# - Explicit handling of common failure modes (mixing counting models, etc.)
#
# A human prompt engineer might take hours to discover these strategies!

### Key Takeaway

By including the model's **reasoning trace** in `side_info`, GEPA can understand *how* the model approaches problems—not just whether it got the answer right. This enables:

1. **Targeted improvements**: Fix specific reasoning errors, not random prompt tweaks
2. **Domain-specific strategies**: The prompt evolved to include geometry workflows, combinatorics rules, etc.
3. **Sanity checks**: GEPA discovered that asking for validation prevents common errors

The evolved prompt contains strategies that a human prompt engineer might take hours to discover through manual iteration.

---

<a id="section-6"></a>
# 6. Example 3: Agent Program Evolution — ARC-AGI

**Result: GEPA improves GPT-5's performance from 56.5% to 68.0% on ARC-AGI.**

This is the most ambitious example: optimizing not just prompts, but **entire agent architectures**—the DSPy program that defines how an LLM reasons about problems.

## The Challenge

[ARC-AGI](https://arcprize.org/) (Abstraction and Reasoning Corpus) is a benchmark designed to test general intelligence:
- Each task shows input-output grid transformation examples
- The agent must infer the transformation rule and apply it to test inputs
- Tasks require **visual reasoning, pattern recognition, and abstraction**

Hand-designing agent architectures for such tasks is extremely difficult.

## The Task

**Given**: A dataset of ARC-AGI grid transformation tasks
**Find**: A DSPy program (agent architecture) that maximizes accuracy

**What GEPA optimizes**: The entire DSPy program—signatures, modules, control flow, and prompting strategies.

<img src="./assets/blog/arc_agi.png" width="60%">

In [None]:
import random
import dspy
import os

from gepa.adapters.dspy_full_program_adapter.full_program_adapter import DspyAdapter
from examples.arc_agi.main import metric_fn
from examples.arc_agi.data import load_data

seed = 0
api_key = os.environ.get("OPENAI_API_KEY")

task_lm = dspy.LM(
        model="openai/gpt-4.1-mini",
        temperature=1.0,
        max_tokens=32000,
        api_key=api_key,
        seed=seed,
        cache=False,
    )

adapter = DspyAdapter(
        task_lm=task_lm,
        metric_fn=metric_fn,
        num_threads=64,
        reflection_lm="openai/gpt-5",
        rng=random.Random(seed),
    )
    
trainset, valset, testset = load_data()

Train set: 200
Val set: 200
Test set: 400


## The Seed Candidate

We start with a minimal Chain-of-Thought agent:

In [1]:
seed_candidate = {
    "program": """
import dspy
from typing import List
import pydantic

MATRIX = List[List[int]]

class TrainingExample(pydantic.BaseModel):
    input: MATRIX
    output: MATRIX

class SolveTaskSignature(dspy.Signature):
    training_examples: List[TrainingExample] = dspy.InputField(description="Input and output examples demonstrating the task to be performed.")
    test_inputs: List[MATRIX] = dspy.InputField(description="Input matrices to be solved following the task described in the training examples.")
    test_outputs: List[MATRIX] = dspy.OutputField(description="Output matrices corresponding to the test inputs.")

program = dspy.ChainOfThought(SolveTaskSignature)
"""
}

## The Fitness Function

The fitness function compiles and executes the DSPy program, capturing detailed error information:

In [None]:


def fitness_fn(candidate, example):
    program = candidate["program"]

    try:
        evaluation_results = adapter.evaluate(
            [example], candidate, capture_traces=True
        )
    except Exception as e:
        side_info = {"input": example, "error": str(e), "program": program}
        return (0.0, side_info, side_info)

    # Program error
    if (
        not isinstance(evaluation_results.trajectories, list)
        or len(evaluation_results.trajectories) == 0
    ):
        print("Error: ")
        print(evaluation_results.trajectories)
        side_info = {
            "input": example,
            "error": f"All examples failed. Program error: {str(evaluation_results.trajectories)}",
            "program": program,
        }
        return (0.0, side_info, side_info)

    # Process evaluations with no program errors
    trajectory = evaluation_results.trajectories[0]
    metric_result = trajectory.get("score")
    score = metric_result.get("score")
    feedback = metric_result.get("feedback")
    prediction = trajectory.get("prediction")

    side_info = {
        "input": example,
        "reasoning": prediction.get("reasoning"),
        "feedback": feedback,
        "output": prediction.get("test_outputs"),
    }

    return (score, side_info, side_info)

## Running GEPA Optimization

In [None]:
from examples.arc_agi.prompt import BACKGROUND
from gepa.optimize_anything import (
    EngineConfig,
    GEPAConfig,
    ReflectionConfig,
    optimize_anything,
)

result = optimize_anything(
    seed_candidate=seed_candidate,
    fitness_fn=fitness_fn,
    dataset=trainset,
    valset=valset,
    objective="Optimize the dspy agent program to solve ARC-AGI tasks effectively.",
    background=BACKGROUND,
    config=GEPAConfig(
        engine=EngineConfig(
            max_metric_calls=4000,
            track_best_outputs=True,
            use_cloudpickle=True,
            parallel=True,
            max_workers=64,
            cache_evaluation=True,
        ),
        reflection=ReflectionConfig(
            reflection_lm="openai/gpt-5",
            reflection_minibatch_size=3,
        ),
    ),
)

## What GEPA Discovered

GEPA evolved the simple ChainOfThought into a sophisticated 5-step code synthesis pipeline:

In [None]:
# Evolved by GEPA - A code synthesis agent with self-refinement
# This program was DISCOVERED through optimization, not hand-written!

evolved_program = '''
import dspy
from typing import List, Tuple, Optional, Any, Dict
import pydantic
import re
import copy
import textwrap
from collections import Counter

MATRIX = List[List[int]]

class TrainingExample(pydantic.BaseModel):
    input: MATRIX
    output: MATRIX

def _strip_code_fences(s: str) -> str:
    if s is None:
        return ""
    s = s.strip()
    # Remove fenced blocks and language markers
    if s.startswith("```"):
        s = "\n".join(s.split("\n")[1:])
    if s.endswith("```"):
        s = "\n".join(s.split("\n")[:-1])
    s = re.sub(r"^```[a-zA-Z0-9_+-]*\s*", "", s.strip())
    s = re.sub(r"\s*```$", "", s.strip())
    return s.strip()

def _ensure_transform_code(code: str) -> str:
    """
    Ensure code contains a def transform(grid): function.
    If code appears to be just the body, wrap it. Otherwise, return full code as-is.
    """
    code = _strip_code_fences(code)
    if "def transform" in code:
        # Keep helpers that may be defined before/after transform.
        return code
    # If it's likely just a body (has return or grid indexing), wrap it
    lines = code.strip().splitlines()
    body = code.strip()
    if len(body) == 0:
        return ""
    if "return" in body or "grid" in body:
        wrapped = "def transform(grid):\n" + textwrap.indent(body, "    ")
        return wrapped
    return ""

def _height(grid: MATRIX) -> int:
    return len(grid)

def _width(grid: MATRIX) -> int:
    return len(grid[0]) if grid and isinstance(grid[0], list) else 0

def _in_bounds(grid: MATRIX, r: int, c: int) -> bool:
    h, w = _height(grid), _width(grid)
    return 0 <= r < h and 0 <= c < w

def _colors(grid: MATRIX) -> List[int]:
    s = set()
    for row in grid:
        for v in row:
            try:
                s.add(int(v))
            except Exception:
                pass
    return sorted(list(s))

def _color_counts(grid: MATRIX) -> Dict[int, int]:
    cnt = Counter()
    for row in grid:
        for v in row:
            try:
                cnt[int(v)] += 1
            except Exception:
                pass
    return dict(cnt)

def _positions_of(grid: MATRIX, color: int) -> List[Tuple[int, int]]:
    pos = []
    for r, row in enumerate(grid):
        for c, v in enumerate(row):
            if v == color:
                pos.append((r, c))
    return pos

def _bbox_of(grid: MATRIX, colors: Optional[List[int]] = None, nonzero: bool = True) -> Optional[Tuple[int, int, int, int]]:
    h, w = _height(grid), _width(grid)
    rs = []
    if colors is not None:
        color_set = set(colors)
        for r in range(h):
            for c in range(w):
                if grid[r][c] in color_set:
                    rs.append((r, c))
    elif nonzero:
        for r in range(h):
            for c in range(w):
                if grid[r][c] != 0:
                    rs.append((r, c))
    else:
        # if not using nonzero and no colors, bbox of entire grid
        for r in range(h):
            for c in range(w):
                rs.append((r, c))
    if not rs:
        return None
    rmin = min(r for r, _ in rs)
    rmax = max(r for r, _ in rs)
    cmin = min(c for _, c in rs)
    cmax = max(c for _, c in rs)
    return (rmin, cmin, rmax, cmax)

def _crop(grid: MATRIX, bbox: Tuple[int, int, int, int]) -> MATRIX:
    rmin, cmin, rmax, cmax = bbox
    out = []
    for r in range(rmin, rmax + 1):
        row = []
        for c in range(cmin, cmax + 1):
            row.append(int(grid[r][c]))
        out.append(row)
    return out

def _transpose(grid: MATRIX) -> MATRIX:
    h, w = _height(grid), _width(grid)
    if h == 0 or w == 0:
        return []
    return [[int(grid[r][c]) for r in range(h)] for c in range(w)]

def _new_grid(h: int, w: int, fill: int = 0) -> MATRIX:
    return [[int(fill) for _ in range(w)] for _ in range(h)]

def _normalize_matrix(mat: MATRIX) -> MATRIX:
    out: MATRIX = []
    for row in mat:
        new_row = []
        for v in row:
            try:
                iv = int(v)
            except Exception:
                iv = 0
            if iv < 0:
                iv = 0
            if iv > 9:
                iv = 9
            new_row.append(iv)
        out.append(new_row)
    return out

def _same_shape(a: MATRIX, b: MATRIX) -> bool:
    if len(a) != len(b):
        return False
    return all(len(ra) == len(rb) for ra, rb in zip(a, b))

def _compare_matrices(a: MATRIX, b: MATRIX) -> Tuple[bool, List[Tuple[int, int, int, int]]]:
    diffs: List[Tuple[int, int, int, int]] = []
    if not _same_shape(a, b):
        return False, diffs
    for i, (ra, rb) in enumerate(zip(a, b)):
        for j, (va, vb) in enumerate(zip(ra, rb)):
            if va != vb:
                diffs.append((i, j, va, vb))
    return (len(diffs) == 0), diffs

def _verify_on_training(transform_fn, training_examples: List[TrainingExample]) -> Tuple[bool, str]:
    reports = []
    all_ok = True
    for idx, ex in enumerate(training_examples):
        try:
            out = transform_fn(copy.deepcopy(ex.input))
        except Exception as e:
            all_ok = False
            reports.append(f"Example {idx}: Exception during transform: {repr(e)}")
            continue
        if not isinstance(out, list) or (len(out) > 0 and not isinstance(out[0], list)):
            all_ok = False
            reports.append(f"Example {idx}: Output is not a 2D list.")
            continue
        out = _normalize_matrix(out)
        # For ARC, output shape must match the example's output shape, not necessarily the input shape.
        if not _same_shape(out, ex.output):
            all_ok = False
            reports.append(f"Example {idx}: Shape mismatch. Got {len(out)}x{len(out[0]) if out else 0}, expected {len(ex.output)}x{len(ex.output[0]) if ex.output else 0}.")
            continue
        ok, diffs = _compare_matrices(out, ex.output)
        if not ok:
            all_ok = False
            sample_diffs = diffs[:20]
            reports.append(f"Example {idx}: {len(diffs)} cells differ. Sample diffs (r,c got->exp): " +
                           ", ".join([f"({r},{c} {gv}->{ev})" for r, c, gv, ev in sample_diffs]))
    return all_ok, "\n".join(reports)

def _apply_code_to_tests(code: str, test_inputs: List[MATRIX]) -> List[MATRIX]:
    fn = _safe_exec_transform(code)
    if not fn:
        return []
    outputs = []
    for idx, ti in enumerate(test_inputs):
        try:
            out = fn(copy.deepcopy(ti))
        except Exception:
            return []
        if not isinstance(out, list) or (len(out) > 0 and not isinstance(out[0], list)):
            return []
        out = _normalize_matrix(out)
        outputs.append(out)
    return outputs

def _safe_exec_transform(code: str) -> Optional[Any]:
    """
    Exec the transform function in a restricted namespace with helper utilities.
    Returns callable transform or None if failure.
    """
    code = _ensure_transform_code(code)
    if not code:
        return None

    # Safe helper library exposed to the model's code.
    sandbox_globals: Dict[str, Any] = {
        "__builtins__": {
            "range": range,
            "len": len,
            "enumerate": enumerate,
            "min": min,
            "max": max,
            "sum": sum,
            "abs": abs,
            "all": all,
            "any": any,
            "sorted": sorted,
            "zip": zip,
            "list": list,
            "set": set,
            "tuple": tuple,
            "int": int
        },
        "deepcopy": copy.deepcopy,
        # helpers
        "height": _height,
        "width": _width,
        "in_bounds": _in_bounds,
        "colors": _colors,
        "color_counts": _color_counts,
        "positions_of": _positions_of,
        "bbox_of": _bbox_of,
        "crop": _crop,
        "transpose": _transpose,
        "new_grid": _new_grid,
    }
    sandbox_locals: Dict[str, Any] = {}
    try:
        exec(code, sandbox_globals, sandbox_locals)
        fn = sandbox_locals.get("transform") or sandbox_globals.get("transform")
        if callable(fn):
            return fn
    except Exception:
        return None
    return None

def _matrix_shape(mat: MATRIX) -> Tuple[int, int]:
    if not isinstance(mat, list) or (mat and not isinstance(mat[0], list)):
        return (0, 0)
    return (len(mat), len(mat[0]) if mat else 0)

def _stats_for_grid(grid: MATRIX) -> Dict[str, Any]:
    h, w = _matrix_shape(grid)
    pal = _colors(grid)
    cnt = _color_counts(grid)
    nonzero_bbox = _bbox_of(grid, colors=None, nonzero=True)
    counts_str = ",".join(f"{k}:{cnt[k]}" for k in sorted(cnt.keys()))
    return {
        "shape": [h, w],
        "palette": pal,
        "counts": counts_str,
        "nonzero_bbox": nonzero_bbox
    }

def _build_training_summary(training_examples: List[TrainingExample]) -> str:
    # Construct a concise, structured summary string for the LM.
    lines = []
    for i, ex in enumerate(training_examples):
        in_stats = _stats_for_grid(ex.input)
        out_stats = _stats_for_grid(ex.output)
        lines.append(
            f"Example {i}: input_shape={in_stats['shape']}, output_shape={out_stats['shape']}, "
            f"input_palette={in_stats['palette']}, output_palette={out_stats['palette']}, "
            f"input_counts={in_stats['counts']}, output_counts={out_stats['counts']}, "
            f"input_nonzero_bbox={in_stats['nonzero_bbox']}, output_nonzero_bbox={out_stats['nonzero_bbox']}"
        )
    # Heuristics across examples:
    shape_deltas = []
    for ex in training_examples:
        ih, iw = _matrix_shape(ex.input)
        oh, ow = _matrix_shape(ex.output)
        shape_deltas.append((oh - ih, ow - iw))
    unique_deltas = sorted(set(shape_deltas))
    lines.append(f"Unique shape deltas (oh-ih, ow-iw) across training: {unique_deltas}")
    return "\n".join(lines)

class InduceRuleSignature(dspy.Signature):
    """
    You are given several training input/output pairs from an ARC task. Infer a single deterministic transformation rule
    that maps any input grid to its output grid. Produce:
    1) A succinct, generalized rule_summary describing the pattern.
    2) A Python function `def transform(grid): ...` implementing the rule.

    Execution constraints and helpers:
      - grid is a 2D list of ints (0..9). Deterministic logic only (loops, conditionals, list ops).
      - Do not import modules or use randomness.
      - You MAY change the output shape when the task demands it. Match the training outputs' shapes for their inputs.
      - Indices must be in-bounds; handle arbitrary rectangular shapes.
      - Mutations: copy if needed; avoid mutating the input in-place unless safe.
      - Values must remain ints 0..9.

    Common pitfalls to avoid:
      - Off-by-one when cropping or expanding; mixing up row/col order.
      - Assuming output has same shape as input when training shows otherwise.
      - Forgetting to preserve anchor colors or key markers when summarizing patterns.
      - Using unavailable libraries (numpy/pandas).

    Successful strategies:
      - Use bounding boxes of non-zero or target colors to crop or extract regions (bbox_of, crop).
      - Build frequency maps to pick dominant or anchor colors (color_counts, colors).
      - Compress by selecting rows/cols that contain target colors; preserve ordering.
      - When outputs are smaller, they often summarize a motif or mask of specific colors (e.g., 1 and 2).

    Helper functions available in the environment:
      - height(grid), width(grid), in_bounds(grid,r,c)
      - colors(grid) -> sorted list of unique colors
      - color_counts(grid) -> dict color->count
      - positions_of(grid, color) -> list of (r,c)
      - bbox_of(grid, colors=None, nonzero=True) -> (rmin,cmin,rmax,cmax) inclusive or None
      - crop(grid, bbox) -> returns subgrid bounded by bbox
      - transpose(grid), new_grid(h,w,fill)

    Return fields:
      - rule_summary: Concise paragraph of the inferred rule.
      - python_code: Only the code defining def transform(grid): ...; no extra commentary or fences.

    Important: Return only rule_summary and python_code; do not include explanations beyond those fields.
    """
    training_examples: List[TrainingExample] = dspy.InputField(desc="Examples showing input->output mappings for the task.")
    training_summary: str = dspy.InputField(desc="Structured stats about shapes, palettes, counts, and bboxes across examples.")
    rule_summary: str = dspy.OutputField(desc="One concise paragraph describing the inferred rule.")
    python_code: str = dspy.OutputField(desc="Only the code defining def transform(grid): ...; no explanations.")

class RefineRuleSignature(dspy.Signature):
    """
    The previous Python transform did not perfectly match all training examples.
    Given the training_examples, training_summary (stats), the prior_code for transform(grid), and a failure_report
    describing mismatches, produce a corrected version of transform(grid).

    Instructions:
    - Keep the same overall approach but fix the logic to satisfy all training pairs.
    - You MAY alter the output shape; match each training output's shape for its input.
    - Maintain determinism; no external imports; values are ints 0..9.
    - Be careful with off-by-one, bounds, and row/col order.
    - Return only a complete, standalone function definition: def transform(grid): ...
    """
    training_examples: List[TrainingExample] = dspy.InputField(desc="Authoritative input/output examples.")
    training_summary: str = dspy.InputField(desc="Structured stats about shapes, palettes, counts, and bboxes across examples.")
    prior_code: str = dspy.InputField(desc="Previous attempt's function code.")
    failure_report: str = dspy.InputField(desc="Concrete mismatches and diagnostics from verification.")
    python_code: str = dspy.OutputField(desc="Revised code: only def transform(grid): ...")

class DirectSolveSignature(dspy.Signature):
    """
    If code induction fails, directly produce outputs for test_inputs using a consistent rule induced from training.
    Requirements:
    - Return only a Python literal list of output grids aligned with test_inputs (e.g., [[[...],[...]], [[...], ...]]).
    - Do not include any commentary, bullets, or extra text.
    - Output ints in 0..9 only. Shapes may differ from inputs; match the pattern demonstrated in training pairs.
    - Apply the same inferred rule consistently across all test inputs.
    """
    training_examples: List[TrainingExample] = dspy.InputField(desc="Training pairs for the task.")
    test_inputs: List[MATRIX] = dspy.InputField(desc="Unseen inputs to solve.")
    test_outputs: List[MATRIX] = dspy.OutputField(desc="Predicted outputs for each test input.")

class ARCRuleProgram(dspy.Module):
    def __init__(self, max_refinements: int = 4):
        super().__init__()
        self.induce = dspy.ChainOfThought(InduceRuleSignature)
        self.refine = dspy.ChainOfThought(RefineRuleSignature)
        self.direct = dspy.ChainOfThought(DirectSolveSignature)
        self.max_refinements = max_refinements

    def forward(self, training_examples: List[TrainingExample], test_inputs: List[MATRIX]) -> dspy.Prediction:
        training_summary = _build_training_summary(training_examples)

        # Step 1: Induce rule and code
        draft = self.induce(training_examples=training_examples, training_summary=training_summary)
        code = draft.python_code or ""
        code = _strip_code_fences(code)

        # Step 2: Verify and refine iteratively
        for _ in range(self.max_refinements + 1):
            fn = _safe_exec_transform(code)
            if fn is None:
                failure_report = "Could not exec transform function. Ensure a valid def transform(grid): ... exists and no imports."
            else:
                ok, report = _verify_on_training(fn, training_examples)
                if ok:
                    # Step 3: Apply to tests
                    outputs = _apply_code_to_tests(code, test_inputs)
                    if outputs:
                        return dspy.Prediction(test_outputs=outputs)
                    failure_report = "Execution on test inputs failed or returned invalid outputs."
                else:
                    failure_report = report
            # Attempt refinement
            refined = self.refine(
                training_examples=training_examples,
                training_summary=training_summary,
                prior_code=code,
                failure_report=failure_report
            )
            new_code = _strip_code_fences(refined.python_code or "")
            if not new_code or new_code.strip() == code.strip():
                code = new_code or code
                break
            code = new_code

        # Fallback: direct solve via LM
        direct = self.direct(training_examples=training_examples, test_inputs=test_inputs)
        safe_outs: List[MATRIX] = []
        if isinstance(direct.test_outputs, list):
            for mat in direct.test_outputs:
                try:
                    safe_outs.append(_normalize_matrix(mat))
                except Exception:
                    # Skip invalid entries
                    pass
        return dspy.Prediction(test_outputs=safe_outs)

class SolveTaskSignature(dspy.Signature):
    """
    Solve ARC tasks using rule induction with verifiable code and robust fallback.
    Inputs:
      - training_examples: list of TrainingExample(input, output)
      - test_inputs: list of matrices to solve
    Output:
      - test_outputs: list of predicted output matrices
    """
    training_examples: List[TrainingExample] = dspy.InputField(desc="Input/output examples demonstrating the task to be performed.")
    test_inputs: List[MATRIX] = dspy.InputField(desc="Input matrices to be solved following the task described in the training examples.")
    test_outputs: List[MATRIX] = dspy.OutputField(desc="Output matrices corresponding to the test inputs.")

class SolveARC(dspy.Module):
    def __init__(self):
        super().__init__()
        self.core = ARCRuleProgram()

    def forward(self, training_examples: List[TrainingExample], test_inputs: List[MATRIX]) -> dspy.Prediction:
        return self.core(training_examples=training_examples, test_inputs=test_inputs)

program = SolveARC()
'''

### Key Takeaway: Emergent Self-Refinement

GEPA discovered **self-refinement**—having the LLM validate and fix its own code before producing outputs. This is remarkable because:

1. **Not programmed**: Self-refinement emerged from optimization, not from human design
2. **Sophisticated strategy**: The agent now verifies on training before applying to test
3. **Multi-attempt recovery**: Up to 5 refinement attempts with targeted feedback
4. **Code synthesis**: Instead of direct prediction, the agent writes executable code

This demonstrates GEPA's ability to discover **complex reasoning pipelines** that humans might not think to design.

---

<a id="section-7"></a>
# 7. Example 4: Algorithmic Discovery — Circle Packing

**Result: GEPA matches or exceeds AlphaEvolve, ShinkaEvolve, and OpenEvolve on circle packing.**

This example demonstrates **algorithmic discovery**—evolving code to solve a well-known NP-hard optimization problem.

## The Challenge

Circle packing is a classic problem with real-world applications (chip layout, material cutting, logistics):
- Pack N non-overlapping circles inside a unit square [0,1] × [0,1]
- Maximize the sum of all radii
- This is **NP-hard**—no known polynomial-time algorithm exists

Recent work from DeepMind (AlphaEvolve), and open-source efforts (ShinkaEvolve, OpenEvolve) have used LLMs to evolve packing algorithms.

## The Task

**Given**: The number of circles N (e.g., N=26)
**Find**: Python code that computes optimal circle placements

**What GEPA optimizes**: The packing algorithm code—placement strategies, local optimization, constraint handling.

<img src="./assets/blog/circle_packing_26.png">

<img src="./assets/blog/circle_packing_visualization_26.png>

## Setting Up the Problem

Circle packing is a single-instance optimization problem—no dataset needed.

In [14]:
num_circles = 26
objective = f"Optimize circle packing code and refiner prompt to maximize sum of circle radii within a unit square for N={num_circles} circles."

## The Seed Candidate

The same one from ShinkaEvolve. 
A simple concentric ring layout (1 center circle + 8 inner ring + remaining outer ring)


In [12]:
from examples.circle_packing.llms import SEED_REFINEMENT_PROMPT


seed_candidate = {
    "code": '''
import numpy as np

def main(timeout, current_best_solution):
    """
    Circle packing optimization.

    Args:
        timeout: Time budget in seconds
        current_best_solution: Previous best circles array (n, 3) or None

    Returns:
        dict with 'circles' (n, 3) array and 'all_scores' list
    """
    n = 26

    # Use current_best_solution if provided, otherwise start fresh
    if current_best_solution is not None:
        circles = current_best_solution.copy()
    else:
        # Simple initial placement
        centers = np.zeros((n, 2))

        # Center circle
        centers[0] = [0.5, 0.5]

        # Ring of 8 around center
        for i in range(min(8, n - 1)):
            angle = 2 * np.pi * i / 8
            centers[i + 1] = [0.5 + 0.3 * np.cos(angle), 0.5 + 0.3 * np.sin(angle)]

        # Outer ring for remaining
        if n > 9:
            remaining = n - 9
            for i in range(remaining):
                angle = 2 * np.pi * i / remaining
                centers[i + 9] = [0.5 + 0.7 * np.cos(angle), 0.5 + 0.7 * np.sin(angle)]

        centers = np.clip(centers, 0.01, 0.99)
        radii = compute_max_radii(centers)
        circles = np.hstack([centers, radii.reshape(-1, 1)])

    score = float(np.sum(circles[:, 2]))
    return {'circles': circles, 'all_scores': [score]}


def compute_max_radii(centers):
    """Compute maximum radii that don't overlap and stay in unit square."""
    n = centers.shape[0]
    radii = np.ones(n)

    # Limit by distance to borders
    for i in range(n):
        x, y = centers[i]
        radii[i] = min(x, y, 1 - x, 1 - y)

    # Limit by distance to other circles
    for i in range(n):
        for j in range(i + 1, n):
            dist = np.sqrt(np.sum((centers[i] - centers[j]) ** 2))
            if radii[i] + radii[j] > dist:
                scale = dist / (radii[i] + radii[j])
                radii[i] *= scale
                radii[j] *= scale

    return radii
''', 
    "refiner_prompt": SEED_REFINEMENT_PROMPT,
}

### Refiner

In [6]:
import dspy
import os

class RefinerSignature(dspy.Signature):
    """Refine the code based on its evaluation results by fixing the errors and improving the performance."""

    refiner_prompt = dspy.InputField(desc="Instructions for how to refine the code")
    code_to_improve = dspy.InputField(desc="Code to improve")
    code_results = dspy.InputField(
        desc="Evaluation results of the code to improve by fixing the errors and improving the performance"
    )
    refined_code = dspy.OutputField(
        desc="Next iteration of improved code based on the evaluation results"
    )

refiner_predictor = dspy.Predict(RefinerSignature)

refiner_lm = dspy.LM(
    "openai/gpt-5.1",
    temperature=1.0,
    max_tokens=32000,
    api_key=os.environ.get("OPENAI_API_KEY"),
    cache=True,
)

## The Fitness Function

The fitness function validates constraints and returns detailed violation information:

In [16]:
from examples.circle_packing.utils import execute_code
from examples.circle_packing.main import refine_code, StateTracker

state_tracker = StateTracker()
timeout = 600

def compute_multiple_metrics(
    global_best_score: float, all_scores: list[float]
) -> dict[str, float]:
    candidate_best_score = max(all_scores)
    alpha_fixed = 0.1
    ema_fixed = all_scores[0]
    for s in all_scores[1:]:
        ema_fixed = alpha_fixed * s + (1 - alpha_fixed) * ema_fixed
    alpha_adaptive = 2.0 / (len(all_scores) + 1)
    ema_adaptive = all_scores[0]
    for s in all_scores[1:]:
        ema_adaptive = alpha_adaptive * s + (1 - alpha_adaptive) * ema_adaptive

    return {
        "max_score": max(all_scores),
        "mean_score": sum(all_scores) / len(all_scores),
        "ema_score_fixed": ema_fixed,
        "ema_score_adaptive": ema_adaptive,
        "score_improvement_from_previous_best": candidate_best_score
        - global_best_score,
    }

def fitness_fn(candidate: dict[str, str], *args, **kwargs):
    """
    Evaluate code candidate on batch of problems with optional refinement.
    """
    code_candidate = candidate["code"]

    # Code candidate evaluation
    global_best_score, global_best_solution = state_tracker.get_best_solution()
    cache_key = code_candidate
    code_candidate_cache = state_tracker.get(cache_key)

    if code_candidate_cache is not None:
        code_score, code_side_info = code_candidate_cache
    else:
        code_result = execute_code(code_candidate, timeout, global_best_solution)
        circles = None

        if code_result["success"]:
            circles = code_result["result"]["circles"]
            all_scores = code_result["result"]["all_scores"]
            code_score = code_result["result"]["validation_details"]["sum_radii"]
            code_side_info = {
                "scores": compute_multiple_metrics(global_best_score, all_scores),
                "Code": code_candidate,
                "Circles": circles,
                "Global best circles at the time of evaluation": global_best_solution,
                "Stdout": code_result["stdout"],
            }
        else:
            code_score = 0.0
            code_side_info = {
                "scores": {"sum_radii": 0.0},
                "Code": code_candidate,
                "Error": code_result["error"],
                "Traceback": code_result.get("traceback", ""),
                "Stdout": code_result["stdout"],
                "Validation Details": code_result.get("validation_details"),
            }

        # Cache after computing values
        state_tracker.set(
            cache_key,
            (code_score, code_side_info),
            score=code_score,
            solution=circles,
            artifact={
                "code": code_candidate,
                "arg_current_best_solution": global_best_solution,
                "validation details": code_result.get("validation_details"),
            },
        )

    print("Code candidate side info:")
    print(code_side_info)

    # Refiner prompt evaluation
    # Now that we've got the code's results, we can set a cache key as (prompt, code, best_solution)
    # the refiner will receive the code, the
    print("Refining code...")

    refiner_prompt_candidate = candidate["refiner_prompt"]
    global_best_score, global_best_solution = state_tracker.get_best_solution()

    # Refine code for this problem
    (
        refiner_score,
        refiner_code,
        refiner_side_info,
    ) = refine_code(
        code=code_candidate,
        code_score=code_score,
        code_side_info=code_side_info,
        refiner_prompt=refiner_prompt_candidate,
        refiner_predictor=refiner_predictor,
        refiner_lm=refiner_lm,
        timeout=timeout,
        state_tracker=state_tracker,
    )

    if refiner_score > code_score:
        best_score = refiner_score
        best_code = refiner_code
        best_circles = refiner_side_info.get("Circles", None)
    else:
        best_score = code_score
        best_code = code_candidate
        best_circles = code_side_info.get("Circles", None)

    if best_circles is not None:
        best_circles = best_circles.tolist()

    output = {
        "best_score": best_score,
        "best_code": best_code,
        "best_circles": best_circles,
        "code_candidate": code_candidate,
        "code_score": code_score,
        "refiner_prompt": refiner_prompt_candidate,
        "refiner_code": refiner_code,
        "refiner_score": refiner_score,
    }

    side_info = {
        "scores": {
            "best_score_from_code_and_refiner": max(code_score, refiner_score),
            "initial_code": code_score,
            "refiner_prompt": refiner_score,
        },
        "Input": {
            "Timeout (s)": timeout,
        },
        "code_specific_info": code_side_info,
        "refiner_prompt_specific_info": refiner_side_info,
    }

    return (best_score, output, side_info)


## Running GEPA Optimization

In [None]:
from gepa.optimize_anything import (
    EngineConfig,
    GEPAConfig,
    ReflectionConfig,
    optimize_anything,
)
from examples.circle_packing.llms import CIRCLE_PACKING_BACKGROUND, SEED_REFINEMENT_PROMPT


result = optimize_anything(
    seed_candidate=seed_candidate,
    fitness_fn=fitness_fn,
    objective=objective,
    background=CIRCLE_PACKING_BACKGROUND,
    config=GEPAConfig(
        engine=EngineConfig(
            max_metric_calls=200,
            track_best_outputs=True,
            frontier_type="objective",
            cache_evaluation=True,
        ),
        reflection=ReflectionConfig(
            reflection_lm="openai/gpt-5",
            reflection_minibatch_size=1,
        ),
    ),
)

## Best Program

In [None]:
import numpy as np
import time

def main(timeout, current_best_solution):
    """
    BREAKTHROUGH approach: Large-Neighborhood Sequential Linear Programming (LN-SLP)
    with exact LP for radii. We optimize centers by repeatedly solving LPs over
    subsets of centers with linearized pairwise constraints and boundary constraints.
    Multi-start + global SLP + LNS refinement + dual-informed subset selection.
    """
    n = 26
    start_time = time.time()
    time_budget = max(1.0, float(timeout) - 0.5)
    rng = np.random.default_rng(2026)
    eps = 1e-6
    r_min = 1e-8
    all_scores = []

    def time_left():
        return time_budget - (time.time() - start_time)

    def clamp01(xy):
        return np.clip(xy, eps, 1.0 - eps)

    def boundary_limits(centers):
        x = centers[:, 0]
        y = centers[:, 1]
        return np.minimum(np.minimum(x, 1 - x), np.minimum(y, 1 - y))

    def pairwise_distances(centers):
        diff = centers[:, None, :] - centers[None, :, :]
        D = np.sqrt(np.maximum(np.sum(diff * diff, axis=2), 0.0))
        return D

    def solve_radii_lp_fallback(centers, r_min_local):
        # Projection-like repair (feasible radii)
        b = boundary_limits(centers)
        r = np.minimum(b, 0.5).copy()
        r = np.maximum(r, r_min_local)
        D = pairwise_distances(centers)
        max_iter = 1500
        tol = 1e-10
        for _ in range(max_iter):
            r = np.minimum(r, b)
            viol = 0.0
            for i in range(n):
                ri = r[i]
                for j in range(i + 1, n):
                    dij = D[i, j]
                    if dij <= 0:
                        r[j] = r_min_local
                        continue
                    s = ri + r[j]
                    if s > dij:
                        excess = s - dij
                        viol = max(viol, excess)
                        total = ri + r[j]
                        if total <= 1e-16:
                            dec_i = dec_j = 0.5 * excess
                        else:
                            dec_i = excess * (ri / total)
                            dec_j = excess * (r[j] / total)
                        ri = max(r_min_local, ri - dec_i)
                        r[j] = max(r_min_local, r[j] - dec_j)
                r[i] = ri
            if viol < tol:
                break
        r = np.minimum(r, b)
        r = np.maximum(r, r_min_local)
        return r, True, None

    def solve_radii_lp(centers, r_min_local):
        try:
            from scipy.optimize import linprog
            x = centers[:, 0]; y = centers[:, 1]
            D = pairwise_distances(centers)
            A_rows = []
            b_vals = []
            # Boundary constraints
            for i in range(n):
                row = np.zeros(n); row[i] = 1.0; A_rows.append(row); b_vals.append(x[i])
                row = np.zeros(n); row[i] = 1.0; A_rows.append(row); b_vals.append(1.0 - x[i])
                row = np.zeros(n); row[i] = 1.0; A_rows.append(row); b_vals.append(y[i])
                row = np.zeros(n); row[i] = 1.0; A_rows.append(row); b_vals.append(1.0 - y[i])
            # Pairwise constraints
            for i in range(n):
                for j in range(i + 1, n):
                    row = np.zeros(n); row[i] = 1.0; row[j] = 1.0
                    A_rows.append(row); b_vals.append(D[i, j])
            A_ub = np.array(A_rows, dtype=float) if A_rows else None
            b_ub = np.array(b_vals, dtype=float) if b_vals else None
            bounds = [(r_min_local, None)] * n
            c = -np.ones(n, dtype=float)
            res = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method='highs')
            if getattr(res, "success", False) and res.x is not None:
                r = np.array(res.x, dtype=float)
                b = boundary_limits(centers)
                r = np.minimum(r, b)
                r = np.maximum(r, r_min_local)
                # Try to extract duals for guidance
                weights = None
                try:
                    ine = getattr(res, "ineqlin", None)
                    if ine is not None and hasattr(ine, "marginals"):
                        y_dual = np.array(ine.marginals, dtype=float)
                        weights = -y_dual
                        weights = np.maximum(weights, 0.0)
                except Exception:
                    weights = None
                return r, True, weights
            else:
                return solve_radii_lp_fallback(centers, r_min_local)
        except Exception:
            return solve_radii_lp_fallback(centers, r_min_local)

    def build_slp_subset_lp(centers, subset_idx, trust):
        try:
            from scipy.optimize import linprog  # noqa: F401
        except Exception:
            return None
        m = len(subset_idx)
        var_dim = 2 * m + n
        idx_map = {subset_idx[k]: k for k in range(m)}
        A_rows = []
        b_vals = []

        x = centers[:, 0]; y = centers[:, 1]
        # Boundary constraints
        for i in range(n):
            # r_i - dx_i <= x_i
            row = np.zeros(var_dim)
            row[2 * m + i] = 1.0
            if i in idx_map:
                k = idx_map[i]
                row[2 * k + 0] = -1.0
            A_rows.append(row); b_vals.append(x[i])
            # r_i + dx_i <= 1 - x_i
            row = np.zeros(var_dim)
            row[2 * m + i] = 1.0
            if i in idx_map:
                k = idx_map[i]
                row[2 * k + 0] = 1.0
            A_rows.append(row); b_vals.append(1.0 - x[i])
            # r_i - dy_i <= y_i
            row = np.zeros(var_dim)
            row[2 * m + i] = 1.0
            if i in idx_map:
                k = idx_map[i]
                row[2 * k + 1] = -1.0
            A_rows.append(row); b_vals.append(y[i])
            # r_i + dy_i <= 1 - y_i
            row = np.zeros(var_dim)
            row[2 * m + i] = 1.0
            if i in idx_map:
                k = idx_map[i]
                row[2 * k + 1] = 1.0
            A_rows.append(row); b_vals.append(1.0 - y[i])

        # Pairwise linearization at current centers
        diff = centers[:, None, :] - centers[None, :, :]
        Dij = np.sqrt(np.maximum(np.sum(diff * diff, axis=2), 0.0))
        for i in range(n):
            for j in range(i + 1, n):
                dij = Dij[i, j]
                if dij <= 1e-12:
                    ang = rng.uniform(0, 2 * np.pi)
                    ux, uy = np.cos(ang), np.sin(ang)
                else:
                    u = diff[i, j] / dij
                    ux, uy = float(u[0]), float(u[1])
                row = np.zeros(var_dim)
                if i in idx_map:
                    ki = idx_map[i]
                    row[2 * ki + 0] += -ux
                    row[2 * ki + 1] += -uy
                if j in idx_map:
                    kj = idx_map[j]
                    row[2 * kj + 0] += ux
                    row[2 * kj + 1] += uy
                row[2 * m + i] += 1.0
                row[2 * m + j] += 1.0
                A_rows.append(row); b_vals.append(dij)

        A_ub = np.array(A_rows, dtype=float) if A_rows else None
        b_ub = np.array(b_vals, dtype=float) if b_vals else None

        bounds = []
        # Displacement bounds (trust region and box)
        for k, i in enumerate(subset_idx):
            xi, yi = centers[i]
            dx_lo = max(-trust, eps - xi)
            dx_hi = min(trust, 1.0 - eps - xi)
            dy_lo = max(-trust, eps - yi)
            dy_hi = min(trust, 1.0 - eps - yi)
            bounds.append((dx_lo, dx_hi))
            bounds.append((dy_lo, dy_hi))
        # Radii bounds
        for _ in range(n):
            bounds.append((r_min, None))

        c = np.zeros(var_dim, dtype=float)
        c[2 * m:] = -1.0
        return c, A_ub, b_ub, bounds

    def slp_subset_step(centers, subset_idx, trust):
        try:
            from scipy.optimize import linprog
        except Exception:
            return centers, None, False
        built = build_slp_subset_lp(centers, subset_idx, trust)
        if built is None:
            return centers, None, False
        c, A_ub, b_ub, bounds = built
        try:
            res = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method='highs')
            if not (getattr(res, "success", False) and res.x is not None):
                return centers, None, False
            m = len(subset_idx)
            sol = np.array(res.x, dtype=float)
            dxy = sol[:2 * m].reshape(m, 2)
            r_lin = sol[2 * m:]
            centers_new = centers.copy()
            for idx_local, i in enumerate(subset_idx):
                centers_new[i] = centers_new[i] + dxy[idx_local]
            centers_new = clamp01(centers_new)
            return centers_new, r_lin, True
        except Exception:
            return centers, None, False

    def refine_by_slp_iterative(centers_init, max_outer=8, init_trust=0.08):
        centers = centers_init.copy()
        r, ok, _ = solve_radii_lp(centers, r_min)
        if not ok:
            return centers, r, False
        best_score = float(np.sum(r))
        best_centers = centers.copy()
        trust = init_trust
        for _ in range(max_outer):
            if time_left() < 0.15:
                break
            subset_idx = list(range(n))
            centers_cand, _, ok_step = slp_subset_step(centers, subset_idx, trust)
            if not ok_step:
                trust *= 0.7
                if trust < 1e-3:
                    break
                continue
            r_rep, ok2, _ = solve_radii_lp(centers_cand, r_min)
            if ok2:
                sc = float(np.sum(r_rep))
                if sc > best_score + 1e-10:
                    centers = centers_cand
                    r = r_rep
                    best_score = sc
                    best_centers = centers.copy()
                    trust = min(trust * 1.3, 0.2)
                else:
                    trust *= 0.6
                    if trust < 1e-3:
                        break
            else:
                trust *= 0.6
                if trust < 1e-3:
                    break
        return best_centers, r, True

    def dual_inform_weights(centers, r, duals):
        w = np.zeros(n, dtype=float)
        if duals is not None:
            num_boundary = 4 * n
            if duals.shape[0] >= num_boundary:
                db = duals[:num_boundary].reshape(n, 4)
                w += np.sum(db, axis=1)
                dp = duals[num_boundary:]
                idx = 0
                for i in range(n):
                    for j in range(i + 1, n):
                        if idx >= dp.shape[0]:
                            break
                        val = dp[idx]
                        w[i] += val
                        w[j] += val
                        idx += 1
        with np.errstate(divide='ignore'):
            inv_r = 1.0 / np.maximum(r, 1e-6)
        w += 0.2 * inv_r
        s = np.sum(w)
        if s <= 0:
            w = np.ones(n) / n
        else:
            w = w / s
        return w

    def init_hex_grid():
        K = n
        cols = int(np.ceil(np.sqrt(K)))
        rows = int(np.ceil(K / cols))
        xs = np.linspace(0.1, 0.9, cols)
        ys = np.linspace(0.1, 0.9, rows)
        pts = []
        cnt = 0
        for r_idx, yy in enumerate(ys):
            off = 0.0 if (r_idx % 2 == 0) else (xs[1] - xs[0]) * 0.5 if len(xs) > 1 else 0.0
            for xx in xs:
                if cnt >= K:
                    break
                x = np.clip(xx + off, 0.06, 0.94)
                x += rng.normal(0, 0.01)
                y = yy + rng.normal(0, 0.01)
                pts.append([x, y])
                cnt += 1
            if cnt >= K:
                break
        pts = np.array(pts, dtype=float)
        while pts.shape[0] < K:
            pts = np.vstack([pts, rng.uniform(0.15, 0.85, size=(1, 2))])
        return clamp01(pts[:n])

    def init_edges():
        pts = []
        e = 1e-3
        corners = [[e, e], [1 - e, e], [e, 1 - e], [1 - e, 1 - e]]
        pts.extend(corners)
        ts = np.linspace(0.15, 0.85, 6)
        for t in ts:
            pts.append([t, e]); pts.append([t, 1 - e])
            pts.append([e, t]); pts.append([1 - e, t])
        pts = np.array(pts[:n], dtype=float)
        if pts.shape[0] < n:
            while pts.shape[0] < n:
                edge = rng.integers(0, 4)
                t = rng.uniform(0.08, 0.92)
                if edge == 0: pts = np.vstack([pts, [t, e]])
                elif edge == 1: pts = np.vstack([pts, [t, 1 - e]])
                elif edge == 2: pts = np.vstack([pts, [e, t]])
                else: pts = np.vstack([pts, [1 - e, t]])
        return clamp01(pts[:n])

    def init_center_biased():
        pts = 0.75 * rng.uniform(0.05, 0.95, size=(n, 2)) + 0.25 * 0.5
        pts += rng.normal(0, 0.03, size=(n, 2))
        return clamp01(pts)

    def init_uniform():
        return clamp01(rng.uniform(0.1, 0.9, size=(n, 2)))

    def evaluate_centers(centers):
        centers = clamp01(centers)
        r, ok, duals = solve_radii_lp(centers, r_min)
        if not ok or r is None:
            b = boundary_limits(centers)
            r = np.maximum(np.minimum(b, 0.5), r_min)
            duals = None
        return np.hstack([centers, r.reshape(-1, 1)]), float(np.sum(r)), duals

    # Seed centers
    seed_centers_list = []
    if isinstance(current_best_solution, np.ndarray):
        try:
            cb = current_best_solution
            if cb.shape[0] == n and cb.shape[1] >= 2:
                centers_cb = clamp01(cb[:, :2].astype(float))
                seed_centers_list.append(centers_cb)
                for _ in range(2):
                    seed_centers_list.append(clamp01(centers_cb + rng.normal(0, 0.01, size=centers_cb.shape)))
        except Exception:
            pass
    seed_centers_list.append(init_hex_grid())
    seed_centers_list.append(init_edges())
    seed_centers_list.append(init_center_biased())
    seed_centers_list.append(init_uniform())
    seed_centers_list.append(clamp01(init_hex_grid() + rng.normal(0, 0.02, size=(n, 2))))

    # Deduplicate seeds with slight jitter on collisions
    unique_seeds = []
    seen = set()
    for c in seed_centers_list:
        h = tuple(np.round(c.ravel(), 3))
        if h in seen:
            c = clamp01(c + rng.normal(0, 0.005, size=c.shape))
        unique_seeds.append(c)
        seen.add(h)
    seed_centers_list = unique_seeds

    best_circles = None
    best_score = -1e18
    best_duals = None

    # Initial evaluation of seeds
    for centers in seed_centers_list:
        circles, score, duals = evaluate_centers(centers)
        if score > best_score + 1e-12:
            best_score = score
            best_circles = circles
            best_duals = duals
            all_scores.append(best_score)

    # Global SLP refine on seeds
    per_seed_time = max(0.4, time_left() / max(1, len(seed_centers_list)))
    for centers0 in seed_centers_list:
        if time_left() < 0.4:
            break
        t_seed_start = time.time()

        def time_left_seed():
            return min(time_left(), per_seed_time - (time.time() - t_seed_start))

        if time_left_seed() < 0.2:
            continue
        centers_ref, r_ref, _ = refine_by_slp_iterative(centers0, max_outer=8, init_trust=0.08)
        circles, score, duals = evaluate_centers(centers_ref)
        if score > best_score + 1e-12:
            best_score = score
            best_circles = circles
            best_duals = duals
            all_scores.append(best_score)

    # LNS loop
    if best_circles is None:
        centers = init_edges()
        r, _, _ = solve_radii_lp(centers, r_min)
        best_circles = np.hstack([centers, r.reshape(-1, 1)])
        best_score = float(np.sum(r))
        all_scores.append(best_score)

    centers = best_circles[:, :2].copy()
    r, ok, duals = solve_radii_lp(centers, r_min)
    if ok:
        best_duals = duals

    stagnation = 0
    attempts = 0
    max_attempts = 1000000
    while time_left() > 0.25 and attempts < max_attempts:
        attempts += 1
        # Determine subset size
        base_m = 8
        if stagnation > 8:
            base_m = 12
        elif stagnation > 3:
            base_m = 10
        m = int(np.clip(base_m + rng.integers(-2, 3), 5, 14))
        # Select subset indices
        weights = dual_inform_weights(centers, r if r is not None else np.ones(n), best_duals)
        try:
            subset_idx = rng.choice(n, size=m, replace=False, p=weights)
        except Exception:
            subset_idx = rng.choice(n, size=m, replace=False)
        trust = 0.08 * (0.9 ** (stagnation // 3))
        trust = float(np.clip(trust, 0.01, 0.2))

        improved_local = False
        for _ in range(3):
            if time_left() < 0.15:
                break
            centers_cand, _, ok_step = slp_subset_step(centers, list(subset_idx), trust)
            if not ok_step:
                trust *= 0.7
                continue
            r_cand, ok_rep, duals_cand = solve_radii_lp(centers_cand, r_min)
            if ok_rep:
                sc = float(np.sum(r_cand))
                if sc > best_score + 1e-10:
                    centers = centers_cand
                    r = r_cand
                    best_score = sc
                    best_circles = np.hstack([centers, r.reshape(-1, 1)])
                    best_duals = duals_cand
                    all_scores.append(best_score)
                    improved_local = True
                    stagnation = 0
                    trust = min(trust * 1.3, 0.25)
                else:
                    trust *= 0.8
            else:
                trust *= 0.7
        if not improved_local:
            stagnation += 1
            # Occasional global SLP to escape
            if stagnation % 6 == 0 and time_left() > 0.25:
                centers_glob, r_glob, _ = refine_by_slp_iterative(centers, max_outer=5, init_trust=0.06)
                circles_glob, sc_glob, duals_glob = evaluate_centers(centers_glob)
                if sc_glob > best_score + 1e-10:
                    best_score = sc_glob
                    best_circles = circles_glob
                    centers = circles_glob[:, :2].copy()
                    r = circles_glob[:, 2].copy()
                    best_duals = duals_glob
                    all_scores.append(best_score)
                    stagnation = 0
            # Occasional random shake of a few smallest-r circles
            if stagnation % 4 == 0:
                if r is None or not np.all(np.isfinite(r)):
                    r = boundary_limits(centers)
                kshake = max(3, n // 6)
                idx_sorted = np.argsort(r)
                picks = idx_sorted[:kshake]
                perturb = rng.normal(0, 0.02 * (1 + 0.05 * stagnation), size=(len(picks), 2))
                centers_shake = centers.copy()
                centers_shake[picks] = clamp01(centers_shake[picks] + perturb)
                r_shake, ok_sh, duals_sh = solve_radii_lp(centers_shake, r_min)
                if ok_sh:
                    sc = float(np.sum(r_shake))
                    if sc > best_score + 1e-10:
                        centers = centers_shake
                        r = r_shake
                        best_score = sc
                        best_circles = np.hstack([centers, r.reshape(-1, 1)])
                        best_duals = duals_sh
                        all_scores.append(best_score)
                        stagnation = 0

        if stagnation > 18:
            # Re-diversify by mixing with a new seed
            seed = init_uniform() if rng.random() < 0.5 else init_hex_grid()
            centers_mix = 0.5 * centers + 0.5 * seed
            centers_mix = clamp01(centers_mix)
            r_mix, ok_mix, duals_mix = solve_radii_lp(centers_mix, r_min)
            if ok_mix:
                sc = float(np.sum(r_mix))
                if sc > best_score + 1e-10:
                    centers = centers_mix
                    r = r_mix
                    best_score = sc
                    best_circles = np.hstack([centers, r.reshape(-1, 1)])
                    best_duals = duals_mix
                    all_scores.append(best_score)
                    stagnation = 0
                else:
                    stagnation = max(stagnation - 5, 0)

    # Final feasibility and polish
    try:
        centers_final = best_circles[:, :2]
        r_final, ok_fin, _ = solve_radii_lp(centers_final, r_min)
        if ok_fin and r_final is not None:
            best_circles = np.hstack([centers_final, r_final.reshape(-1, 1)])
            best_score = float(np.sum(r_final))
            if not all_scores or best_score > all_scores[-1] + 1e-9:
                all_scores.append(best_score)
    except Exception:
        pass

    return {
        'circles': best_circles.astype(float),
        'all_scores': [float(s) for s in all_scores] if all_scores else [float(best_score)]

: 

## What GEPA Discovered

GEPA evolved the simple grid-based baseline into a **sophisticated multi-strategy optimizer**. Here's what the evolved code includes:

### Strategies Discovered by GEPA

| Strategy | Description | How It Helps |
|----------|-------------|--------------|
| **Halton sequences** | Quasi-random initialization | Better initial coverage than random |
| **Zero-vector seeding** | Start from origin | Often near polynomial optima |
| **CMA-ES-style evolution** | Covariance matrix adaptation | Adapts search direction to landscape |
| **Quadratic surrogate models** | Local function approximation | Efficient local optimization |
| **Coordinate descent** | Per-dimension refinement | Fine-tunes individual coordinates |
| **Nelder-Mead subspace** | Simplex method in active dimensions | Exploits important variables |
| **Ridge-linear probes** | Gradient estimation from archive | Uses history for direction hints |

### Results

The evolved code achieves packing densities that **match or exceed** published results from:
- **AlphaEvolve** (DeepMind)
- **ShinkaEvolve**
- **OpenEvolve**

All without any human optimization expertise—just the problem definition and a baseline!

### Key Takeaway

GEPA automatically discovered advanced optimization strategies (Halton sequences, CMA-ES, surrogate models) that typically require expert knowledge to implement. The user only needed to:
1. Define the problem (pack circles)
2. Provide a naive baseline (grid placement)
3. Return informative `side_info` (violations, scores)

---

# KernelBench

This example optimizes CUDA kernels against the KernelBench suite. GEPA evolves two prompts: a kernel generator prompt and a refiner prompt.

## The Challenge

KernelBench provides PyTorch reference models. The candidate must produce a `ModelNew` with custom CUDA kernels (via `load_inline`) that is correct and faster than the PyTorch baseline.

## The Task

**Given**: PyTorch reference models + baseline runtimes
**Find**: Prompts that generate correct, faster CUDA kernels

V100


In [None]:
import os
import time

import dspy

from examples.kernelbench.prompts import BACKGROUND, KERNEL_GEN_PROMPT, REFINER_PROMPT
from examples.kernelbench.eval import get_free_gpus, init_gpu_manager, load_dataset, load_or_measure_baselines
from examples.kernelbench.main import create_fitness_fn, PromptCache, StateTracker, LLM
from gepa.optimize_anything import EngineConfig, GEPAConfig, ReflectionConfig, optimize_anything

run_name = time.strftime("%y%m%d_%H%M%S")
log_dir = f"outputs/artifacts/kernelbench/{run_name}"
os.makedirs(log_dir, exist_ok=True)

dataset = load_dataset()
baselines = load_or_measure_baselines(dataset)

available_gpus = get_free_gpus()
init_gpu_manager(device_list=available_gpus, lock_dir=os.path.join(log_dir, "gpu_locks"))

lm = dspy.LM(LLM, temperature=1.0, max_tokens=32000)


In [None]:
objective = "Generate an LLM prompt that produces fast, correct CUDA kernels outperforming PyTorch baselines."

seed_candidate = {
    "kernel_gen_prompt": KERNEL_GEN_PROMPT,
    "refiner_prompt": REFINER_PROMPT.format(objective=objective),
}

fitness_fn = create_fitness_fn(
    lm,
    baselines=baselines,
    max_refinements=2,
)

config = GEPAConfig(
    engine=EngineConfig(
        run_dir=log_dir,
        max_metric_calls=2000,
        cache_evaluation=True,
        track_best_outputs=True,
        parallel=False,
        max_workers=1,
    ),
    reflection=ReflectionConfig(
        reflection_minibatch_size=3,
        reflection_lm=LLM,
    ),
)

result = optimize_anything(
    seed_candidate=seed_candidate,
    fitness_fn=fitness_fn,
    dataset=dataset,
    config=config,
    objective=objective,
    background=BACKGROUND,
)


---

## The Challenge

KernelBench provides PyTorch reference models. The candidate must produce a `ModelNew` with custom CUDA kernels (via `load_inline`) that is correct and faster than the PyTorch baseline.

## The Task

**Given**: PyTorch reference models + baseline runtimes
**Find**: Prompts that generate correct, faster CUDA kernels

Results are hardware-dependent; see `outputs/artifacts/kernelbench/` for latest runs.


# 8. Example 5: Systems Optimization — Cloud Infrastructure

**Result: GEPA discovers cost-saving strategies for real cloud infrastructure problems.**

We demonstrate `optimize_anything` on two challenging systems problems from the [ADRS project](https://adrs-ucb.notion.site/):

| Problem | Domain | Baseline Cost | Optimized Cost | Savings |
|---------|--------|---------------|----------------|---------|
| **Can't Be Late** | Spot Instance Scheduling | 96.48 | 89.86 | **6.9%** |
| **Cloudcast** | Multi-Cloud Broadcast | $191 | $120 | **37.3%** |

These problems involve optimizing complex algorithms with real-world constraints—exactly where traditional optimization struggles and LLM-based reflection excels.

## 8.1 Can't Be Late: Spot Instance Scheduling

The [Can't Be Late problem](https://www.usenix.org/conference/nsdi24/presentation/wu-zhanghao) from NSDI'24 challenges us to minimize cloud computing costs while meeting deadlines.

### The Challenge

A cloud job must complete before a deadline using:
- **SPOT instances**: Cheap (~$0.3/hour) but can be preempted at any time
- **ON_DEMAND instances**: Expensive (~$1/hour) but guaranteed availability  
- **NONE**: Wait without using any instances

The strategy must decide dynamically which instance type to use based on:
- Time remaining until deadline
- Current spot availability
- Restart overhead when preempted

**Goal**: Minimize cost while ensuring task completion before deadline.

### Why This is Hard

- Spot preemption is **stochastic** — you can't predict when you'll lose your instance
- The optimal strategy depends on **multiple factors**: deadline slack, restart cost, spot availability
- Simple heuristics (always use spot, always use on-demand) are suboptimal

### Setup

```bash
# Extract trace data
cd examples/adrs/can_be_late/simulator
tar -xzf real_traces.tar.gz

# Install dependencies
pip install -r examples/adrs/can_be_late/simulator/requirements.txt
```

### The Seed Candidate

We start with a simple baseline strategy that uses spot when available and switches to on-demand only when deadline is tight:

In [None]:
import math
from sky_spot.strategies.strategy import Strategy
from sky_spot.utils import ClusterType

class EvolveSingleRegionStrategy(Strategy):
    NAME = 'evolve_single_region'
    
    def __init__(self, args):
        super().__init__(args)
    
    def reset(self, env, task):
        super().reset(env, task)
    
    def _step(self, last_cluster_type: ClusterType, has_spot: bool) -> ClusterType:
        env = self.env
        
        # Task completion check
        remaining_task_time = self.task_duration - sum(self.task_done_time)
        if remaining_task_time <= 1e-3:
            return ClusterType.NONE
        
        # Calculate remaining time until deadline
        remaining_time = self.deadline - env.elapsed_seconds
        
        # Simple deadline check: if we're running out of time, use ON_DEMAND
        # Add restart overhead to account for potential restart
        if remaining_task_time + self.restart_overhead >= remaining_time:
            # We need ON_DEMAND to guarantee completion
            return ClusterType.ON_DEMAND
        
        # Simple greedy logic: use SPOT if available, wait otherwise
        if has_spot:
            return ClusterType.SPOT
        else:
            # Just wait for SPOT to become available
            return ClusterType.NONE
    
    @classmethod
    def _from_args(cls, parser):
        args, _ = parser.parse_known_args()
        return cls(args)

### The Fitness Function

The fitness function runs simulations on real AWS spot availability traces. The full evaluator is in `examples/adrs/can_be_late/evaluator.py`:

In [None]:
from examples.adrs.can_be_late.evaluator import create_fitness_function
from examples.adrs.can_be_late.trace_dataset import load_trace_dataset

# Load real AWS spot availability traces
dataset_root = "examples/adrs/can_be_late/simulator/real"
splits = load_trace_dataset(dataset_root=dataset_root)
train_set, val_set, test_set = splits["train"], splits["val"], splits["test"]

# Create fitness function that runs simulations
# Score = -cost (higher is better, lower cost is better)
fitness_fn = create_fitness_function(timeout=300)

### Running GEPA Optimization,

In [None]:
from gepa.optimize_anything import optimize_anything, GEPAConfig, EngineConfig, ReflectionConfig

OPTIMIZATION_OBJECTIVE = """Optimize a cloud scheduling strategy for the "Can't Be Late" problem.
The strategy decides when to use SPOT instances (cheap but can be preempted) vs ON_DEMAND 
instances (expensive but reliable) to complete a task before its deadline."""

OPTIMIZATION_BACKGROUND = """Key information:
- ClusterType.SPOT: Cheap (~$0.3/hour) but can be preempted
- ClusterType.ON_DEMAND: Expensive (~$1/hour) but guaranteed
- ClusterType.NONE: Wait without using instances
- restart_overhead: Time penalty when switching instances
- Lower cost is better (scores are negative cost in dollars)"""

gepa_config = GEPAConfig(
    engine=EngineConfig(
        run_dir="runs/cant_be_late",
        max_metric_calls=1000,
        track_best_outputs=True,
        use_cloudpickle=True,
    ),
    reflection=ReflectionConfig(
        reflection_minibatch_size=3,
        reflection_lm="openai/gpt-4o",  # Or use get_reflection_lm() for Gemini
    ),
)

result = optimize_anything(
    seed_candidate=seed_candidate,
    fitness_fn=fitness_fn,
    dataset=train_set,
    valset=val_set,
    objective=OPTIMIZATION_OBJECTIVE,
    background=OPTIMIZATION_BACKGROUND,
    config=gepa_config,
)

### Results

![Can't Be Late Optimization](examples/adrs/can_be_late/optimization_trajectory.png)

GEPA evolved a strategy that achieves **6.9% cost reduction** compared to the baseline:

| Metric | Value |
|--------|-------|
| Base Cost | 96.48 |
| Optimized Cost | 89.86 |
| **Cost Savings** | **6.9%** |

The evolved strategy learned nuanced decision boundaries for when to switch between spot and on-demand instances based on deadline slack and restart overhead.

### What GEPA Discovered

GEPA evolved a sophisticated strategy with several key improvements over the baseline:
1. Overhead-aware switching: Considers restart cost when deciding to switch instance types
2. Anti-thrashing logic: Prevents expensive restart loops by "latching" to ON_DEMAND
3. Dynamic safety buffer: Scales buffer with remaining task size (max(2h, 10% of remaining))

In [None]:
import math
from sky_spot.strategies.strategy import Strategy
from sky_spot.utils import ClusterType

class EvolveSingleRegionStrategy(Strategy):
    NAME = 'evolve_single_region'
    
    def __init__(self, args):
        super().__init__(args)
    
    def reset(self, env, task):
        super().reset(env, task)
    
    def _step(self, last_cluster_type: ClusterType, has_spot: bool) -> ClusterType:
        env = self.env
        
        # Calculate remaining task work
        remaining_task_time = self.task_duration - sum(self.task_done_time)
        if remaining_task_time <= 1e-3:
            return ClusterType.NONE
        
        # Calculate remaining wall clock time until deadline
        remaining_time = self.deadline - env.elapsed_seconds
        
        # Calculate overhead to switch to (or stay on) ON_DEMAND.
        # If we are already on ON_DEMAND, continuing incurs no overhead.
        # If we are on SPOT or NONE, switching to ON_DEMAND incurs restart_overhead.
        overhead_to_od = self.restart_overhead if last_cluster_type != ClusterType.ON_DEMAND else 0.0
        
        # 1. Hard Deadline Constraint (Point of No Return)
        # If we wait any longer, we won't finish even with guaranteed ON_DEMAND.
        # We must verify we have enough time for the work plus any switch overhead.
        if remaining_task_time + overhead_to_od >= remaining_time:
            return ClusterType.ON_DEMAND
        
        # 2. Spot Usage Strategy
        if has_spot:
            # Check if switching to SPOT is safe regarding the deadline.
            # Switching to SPOT incurs overhead (if not already on it).
            overhead_to_spot = self.restart_overhead if last_cluster_type != ClusterType.SPOT else 0.0
            
            # If the overhead of switching to SPOT consumes the remaining slack such that 
            # we can't finish, we must stick with (or switch to) ON_DEMAND.
            if remaining_task_time + overhead_to_spot >= remaining_time:
                return ClusterType.ON_DEMAND
                
            return ClusterType.SPOT
        
        # 3. Unavailable Spot Strategy
        # Spot is unavailable. We must decide whether to Wait (NONE) or Run (ON_DEMAND).
        
        # Calculate current slack: Time margin we can afford to waste.
        current_slack = remaining_time - remaining_task_time - overhead_to_od
        
        # Safety Buffer:
        # Maintain a buffer to handle future volatility.
        # We use max(2.0h, 10% of remaining task) to scale with task size.
        safety_buffer = max(2.0, 0.1 * remaining_task_time)
        
        # Anti-Thrashing Logic (Latching):
        # If we are currently running ON_DEMAND (due to low slack or PNR), we should
        # maintain this state until Spot returns. Switching back to NONE (waiting) 
        # when slack is marginally above the buffer causes expensive restart loops 
        # (thrashing) as the buffer size dynamically adjusts with task progress.
        if last_cluster_type == ClusterType.ON_DEMAND:
            return ClusterType.ON_DEMAND
        
        # If we are currently Waiting (NONE) or were on SPOT, check if slack is critically low.
        if current_slack < safety_buffer:
            return ClusterType.ON_DEMAND
            
        # Slack is sufficient; wait for cheaper SPOT instances.
        return ClusterType.NONE
    
    @classmethod
    def _from_args(cls, parser):
        args, _ = parser.parse_known_args()
        return cls(args)

## 8.2 Cloudcast: Multi-Cloud Broadcast Optimization

The Cloudcast problem optimizes data broadcast across multiple cloud providers (AWS, GCP, Azure).

### The Challenge

Transfer data from a single source to multiple destinations across cloud providers:
- **Heterogeneous pricing**: Egress costs vary dramatically between providers
- **Bandwidth constraints**: Different regions have different throughput limits
- **Partitioning**: Data can be split and sent via different paths

**Goal**: Minimize total cost (egress fees + instance costs) while delivering data to all destinations.

### Why This is Hard

- The search space is **combinatorial** — exponentially many possible routing trees
- Costs are **non-uniform** — inter-cloud transfers cost more than intra-cloud
- Simple shortest-path algorithms ignore cost structure

### Setup

```bash
pip install networkx pandas numpy
pip install -r examples/adrs/cloudcast/requirements.txt
```

### The Seed Candidate

We start with a simple Dijkstra shortest-path baseline:

In [None]:
seed_candidate = {"program": """import networkx as nx
from typing import Dict, List

class BroadCastTopology:
    def __init__(self, src: str, dsts: List[str], num_partitions: int = 4, paths=None):
        self.src = src
        self.dsts = dsts
        self.num_partitions = num_partitions
        self.paths = paths if paths else {dst: {str(i): None for i in range(num_partitions)} for dst in dsts}

    def set_num_partitions(self, num_partitions: int):
        self.num_partitions = num_partitions

    def append_dst_partition_path(self, dst: str, partition: int, path: List):
        partition = str(partition)
        if self.paths[dst][partition] is None:
            self.paths[dst][partition] = []
        self.paths[dst][partition].append(path)


def search_algorithm(src, dsts, G, num_partitions):
    \"\"\"
    Find broadcast paths from source to all destinations.
    Uses Dijkstra's shortest path based on cost as edge weight.
    \"\"\"
    h = G.copy()
    h.remove_edges_from(list(h.in_edges(src)) + list(nx.selfloop_edges(h)))
    bc_topology = BroadCastTopology(src, dsts, num_partitions)

    for dst in dsts:
        path = nx.dijkstra_path(h, src, dst, weight="cost")
        for i in range(0, len(path) - 1):
            s, t = path[i], path[i + 1]
            for j in range(bc_topology.num_partitions):
                bc_topology.append_dst_partition_path(dst, j, [s, t, G[s][t]])

    return bc_topology
"""}

### The Fitness Function

The fitness function simulates the broadcast and computes costs. Full evaluator is in `examples/adrs/cloudcast/evaluator.py`:

In [None]:
from examples.adrs.cloudcast.evaluator import create_fitness_function, load_config_dataset

# Load broadcast configuration scenarios
config_dir = "examples/adrs/cloudcast/cloudcast/config"
samples = load_config_dataset(config_dir=config_dir)
train_set = val_set = test_set = samples  # Use all configs for train/val/test

# Create fitness function
# Score = 1 / (1 + total_cost) - higher score means lower cost
fitness_fn = create_fitness_function(timeout=300)

### Running GEPA Optimization

In [None]:
from gepa.optimize_anything import optimize_anything, GEPAConfig, EngineConfig, ReflectionConfig

OPTIMIZATION_OBJECTIVE = """Optimize a broadcast routing algorithm for multi-cloud data transfer.
The algorithm decides how to route data from a single source to multiple destinations
across cloud providers (AWS, GCP, Azure). Minimize total cost (egress + instance costs)."""

OPTIMIZATION_BACKGROUND = """Key information:
- Nodes are cloud regions (e.g., "aws:us-east-1", "gcp:europe-west1")
- Edges have 'cost' ($/GB egress) and 'throughput' (Gbps) attributes
- Data is partitioned and can be routed independently
- Total cost = egress costs + instance costs
- Score = 1 / (1 + total_cost) - higher is better"""

gepa_config = GEPAConfig(
    engine=EngineConfig(
        run_dir="runs/cloudcast",
        max_metric_calls=1000,
        track_best_outputs=True,
        use_cloudpickle=True,
    ),
    reflection=ReflectionConfig(
        reflection_minibatch_size=3,
        reflection_lm="openai/gpt-4o",
    ),
)

result = optimize_anything(
    seed_candidate=seed_candidate,
    fitness_fn=fitness_fn,
    dataset=train_set,
    valset=val_set,
    objective=OPTIMIZATION_OBJECTIVE,
    background=OPTIMIZATION_BACKGROUND,
    config=gepa_config,
)

### Results

![Cloudcast Optimization](examples/adrs/cloudcast/optimization_trajectory.png)

GEPA evolved a routing algorithm that achieves **37.3% cost reduction**:

| Metric | Value |
|--------|-------|
| Base Cost | $191 |
| Optimized Cost | $120 |
| **Cost Savings** | **37.3%** |

The evolved algorithm discovered strategies for:
- Preferring intra-cloud paths when possible
- Intelligent partition routing to balance load
- Cost-aware path selection over naive shortest-path

### What GEPA Discovered

GEPA evolved a sophisticated routing algorithm with several key innovations:
1. Iterative Marginal-Cost Shortest Path: Replaces Steiner Tree heuristic
2. Zero marginal cost for reused edges: Encourages multicast tree building
3. Congestion-aware routing: Penalties based on partition-level usage

In [None]:
import networkx as nx
import random
from typing import Dict, List

class SingleDstPath(Dict):
    partition: int
    edges: List[List]  # [[src, dst, edge data]]

class BroadCastTopology:
    def __init__(self, src: str, dsts: List[str], num_partitions: int = 4, paths: Dict[str, 'SingleDstPath'] = None):
        self.src = src
        self.dsts = dsts
        self.num_partitions = num_partitions
        if paths is not None:
            self.paths = paths
        else:
            self.paths = {dst: {str(i): None for i in range(num_partitions)} for dst in dsts}

    def get_paths(self):
        return self.paths

    def set_num_partitions(self, num_partitions: int):
        self.num_partitions = num_partitions

    def set_dst_partition_paths(self, dst: str, partition: int, paths: List[List]):
        partition = str(partition)
        self.paths[dst][partition] = paths

    def append_dst_partition_path(self, dst: str, partition: int, path: List):
        partition = str(partition)
        if self.paths[dst][partition] is None:
            self.paths[dst][partition] = []
        self.paths[dst][partition].append(path)

def search_algorithm(src, dsts, G, num_partitions):
    """
    Optimized broadcast routing algorithm using Iterative Marginal-Cost Shortest Path.
    
    Improvements:
    1. Replaced complex Steiner Tree heuristic with a destination-iterative Shortest Path approach.
       - This prevents the creation of inefficient, deep trees which were hurting transfer times.
       - It maintains 'Multicast' cost benefits by setting the cost of edges already used 
         in the current partition to zero (Marginal Cost logic).
    2. Congestion Management:
       - Tracks usage at the partition level.
       - Penalties are applied to throughput to balance load across partitions.
    3. Tuning:
       - ALPHA increased to 0.02 to better balance Egress Cost vs Transfer Time.
       - Random shuffle of destinations prevents bias in shared path construction.
    """
    
    # PARAMETERS
    # Balances Cost ($) vs 1/Throughput (Time). 
    # Increased from 0.005 to 0.02 to penalize low-bandwidth paths more aggressively,
    # addressing high transfer times observed in evaluation.
    ALPHA = 0.02 
    
    # Seed for reproducibility
    random.seed(42)

    bc_topology = BroadCastTopology(src, dsts, num_partitions)
    
    # Global usage: (u, v) -> count of partitions utilizing this link
    # We use this to calculate the 'Time Penalty' for subsequent partitions.
    global_edge_usage = {} 

    # We iterate through each partition to assign paths
    for part_id in range(num_partitions):
        # Track edges activated in the CURRENT partition.
        # In an overlay multicast model, the source pays egress only once per link per partition,
        # regardless of how many destinations share that link.
        partition_active_edges = set()
        
        # Shuffle destinations to avoid routing bias (e.g., always optimizing for the first dst)
        current_dsts = list(dsts)
        random.shuffle(current_dsts)
        
        # Define a dynamic weight function for Dijkstra
        def get_weight(u, v, d):
            # 1. Throughput / Congestion Component
            throughput = d.get('throughput', 1.0)
            if throughput < 1e-6: throughput = 1e-6
            
            # Usage penalty applies to the 'Time' aspect.
            # We assume bandwidth is shared or congested by total active partitions.
            usage = global_edge_usage.get((u, v), 0)
            time_penalty = (1.0 + usage) / throughput
            
            # 2. Cost Component
            # If the edge is already active in this partition, marginal egress cost is 0.
            # This encourages building a shared multicast tree greedily.
            if (u, v) in partition_active_edges:
                cost = 0.0
            else:
                cost = d.get('cost', 0.0)
            
            return cost + (ALPHA * time_penalty)

        for dst in current_dsts:
            if dst == src:
                continue
            
            path_segments = []
            try:
                # Find shortest path using the custom marginal-cost weights
                # This naturally gravitates towards reusing existing branches of the tree (Steiner-like)
                # but will branch out (Unicast) if the shared path is too slow or expensive.
                path_nodes = nx.shortest_path(G, source=src, target=dst, weight=get_weight)
                
                # Reconstruct path data
                for i in range(len(path_nodes) - 1):
                    u, v = path_nodes[i], path_nodes[i+1]
                    path_segments.append([u, v, G[u][v]])
                    
                    # Mark edge as active for this partition
                    partition_active_edges.add((u, v))
                    
            except nx.NetworkXNoPath:
                # Fallback if unreachable
                path_segments = []
            
            bc_topology.set_dst_partition_paths(dst, part_id, path_segments)
        
        # After routing all destinations for this partition, update global congestion stats
        for u, v in partition_active_edges:
            global_edge_usage[(u, v)] = global_edge_usage.get((u, v), 0) + 1

    return bc_topology

### Key Takeaways from Systems Optimization

1. **Rich Side Information Matters**: Both problems provide detailed simulation output (costs, timings, paths taken). This lets GEPA understand *why* a strategy underperforms—not just that it does.

2. **Domain Complexity**: These problems involve stochastic elements (spot preemption), complex constraints (deadlines, bandwidth limits), and non-linear cost structures. Traditional optimizers struggle here.

3. **Code as the Optimized Artifact**: Unlike prompt optimization, we're evolving actual *algorithms*—decision functions and routing strategies. GEPA treats code as just another text artifact to optimize.

4. **Real-World Impact**: These aren't toy problems. The Cloudcast savings of 37% on a large-scale data transfer could mean thousands of dollars saved per transfer.

---

The complete code for both examples is available in:
- `examples/adrs/can_be_late/` — Spot instance scheduling
- `examples/adrs/cloudcast/` — Multi-cloud broadcast

---

<a id="section-8"></a>
# 9. How It Works Under the Hood

GEPA (Generative Evolutionary Prompting with ASI) operates through a loop of **evaluation**, **reflection**, and **proposal**:

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                              GEPA LOOP                                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌──────────────┐                                                          │
│   │  EVALUATE    │  Run fitness_fn on candidates                            │
│   │              │  → Collect scores AND side_info                          │
│   └──────┬───────┘                                                          │
│          │                                                                  │
│          ▼                                                                  │
│   ┌──────────────┐                                                          │
│   │   SELECT     │  Choose candidates for mutation                          │
│   │              │  → Pareto selection across objectives/instances          │
│   │              │  → Epsilon-greedy exploration                            │
│   └──────┬───────┘                                                          │
│          │                                                                  │
│          ▼                                                                  │
│   ┌──────────────┐                                                          │
│   │   REFLECT    │  LLM analyzes evaluation results                         │
│   │              │  → "Why did this candidate fail?"                        │
│   │              │  → Uses side_info to understand failure modes            │
│   └──────┬───────┘                                                          │
│          │                                                                  │
│          ▼                                                                  │
│   ┌──────────────┐                                                          │
│   │   PROPOSE    │  LLM generates improved candidates                       │
│   │              │  → Targeted mutations based on reflection                │
│   │              │  → Preserves successful behaviors                        │
│   └──────┬───────┘                                                          │
│          │                                                                  │
│          └──────────────────► REPEAT until stopping condition               │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

## Key Components

### 1. Pareto Frontier
GEPA maintains a **Pareto frontier** of candidates that are optimal on different subsets of the data:
- **Multi-objective**: Some candidates optimize for accuracy, others for speed
- **Instance-level**: Some candidates excel on certain problem types
- **Diversity**: The frontier preserves diverse strategies for exploration

### 2. Reflective Mutation
Unlike **random mutation** in traditional evolutionary algorithms, GEPA uses LLMs to make **targeted improvements**:

| Traditional EA | GEPA |
|---------------|------|
| Random bit flips | LLM analyzes failure modes |
| Blind crossover | LLM preserves working patterns |
| Requires many generations | Sample-efficient |
| No domain knowledge | Uses side_info for context |

### 3. Side Information Flow
The `side_info` returned by your fitness function powers the reflection:

```python
# What the LLM sees during reflection:
"""
Current candidate: {code: "def solve(x): ..."}

Evaluation results on 3 examples:
  Example 1: Score 0.8
    Input: "Pack 26 circles"
    Output: circles array
    Error: "Circles 3 and 7 overlap"
    
  Example 2: Score 0.0  
    Input: "Pack 26 circles"
    Error: "IndexError on line 42"
    
  Example 3: Score 1.0
    Input: "Pack 26 circles"
    Output: Valid packing with sum_radii=2.89

Propose an improved version that fixes these issues.
"""
```

In [None]:
fitness_fn = create_fitness_fn(
    lm,
    baselines=baselines,
    use_rag=True,
    max_refinements=5,
    tracker=tracker,
    kernel_gen_cache=kernel_gen_cache,
    refiner_cache=refiner_cache,
)

config = GEPAConfig(
    engine=EngineConfig(
        run_dir=log_dir,
        max_metric_calls=2000,
        cache_evaluation=True,
        track_best_outputs=True,
        parallel=False,
        max_workers=1,
    ),
    reflection=ReflectionConfig(
        reflection_minibatch_size=3,
        reflection_lm=LLM,
    ),
)

result = optimize_anything(
    seed_candidate=seed_candidate,
    fitness_fn=fitness_fn,
    dataset=dataset,
    config=config,
    objective=objective,
    background=BACKGROUND,
)


---

<a id="section-9"></a>
# 10. Conclusion: From Imperative to Declarative Optimization

We are witnessing a **paradigm shift** in optimization—from imperative implementations to declarative specifications:

| **Old Paradigm** | **New Paradigm with `optimize_anything`** |
|------------------|------------------------------------------|
| Imperative: specify *how* to optimize | Declarative: specify *what* to optimize |
| Different libraries for different problems | **One API for everything** |
| Mathematically-specific algorithms | Language-driven proposal generation |
| Scalar fitness only | **Rich diagnostic information (ASI)** |
| Random mutations | **Targeted, reflective mutations** |
| Expert knowledge required | LLM brings domain knowledge |

## The `optimize_anything` Vision

**If it can be represented as text, it can be optimized.**

| Domain | What You Optimize | Example |
|--------|-------------------|---------|
| **Code** | Algorithms, implementations | Black-box optimization code |
| **Prompts** | Instructions, examples | System prompts for math problems |
| **Agent Architectures** | Program structure, control flow | DSPy programs for ARC-AGI |
| **Configurations** | Hyperparameters, settings | JSON/YAML configs |
| **Data Structures** | Schemas, templates | API specifications |

## Why This Matters

1. **Democratization**: You don't need a PhD in optimization to solve hard problems
2. **Generalization**: One framework, infinite applications
3. **Sample Efficiency**: LLM reflection beats random search
4. **Emergent Capabilities**: GEPA discovers strategies you wouldn't think of

## Getting Started

```bash
pip install gepa
```

In [None]:
from gepa.optimize_anything import optimize_anything, GEPAConfig, EngineConfig, ReflectionConfig

# 1. Define your seed candidate (starting point)
seed_candidate = {
    "my_param": "initial value"  # Can be code, prompt, config, etc.
}

# 2. Define your fitness function (how to measure success)
def fitness_fn(candidate, example=None):
    # Run your system with the candidate
    output = run_my_system(candidate["my_param"], example)
    
    # Compute score (higher is better)
    score = compute_score(output, example)
    
    # Collect rich diagnostic information (ASI)
    side_info = {
        "Input": example,
        "Output": output,
        "Expected": example.get("answer") if example else None,
        "Error": get_error_message(output),
        "Feedback": analyze_performance(output),
    }
    
    return score, output, side_info

# 3. Run optimization
result = optimize_anything(
    seed_candidate=seed_candidate,
    fitness_fn=fitness_fn,
    dataset=my_examples,  # Optional: for multi-instance mode
    objective="Find a parameter that maximizes performance",  # Optional: guidance
    config=GEPAConfig(
        engine=EngineConfig(max_metric_calls=100),
        reflection=ReflectionConfig(reflection_lm="openai/gpt-4o"),
    ),
)

# 4. Use the optimized result
print("Best candidate:", result.best_candidate)
print("Best score:", result.best_score)

## Summary: What We Showed

| Example | What We Optimized | Key Insight |
|---------|-------------------|-------------|
| **Mathematical Optimization** | Python code for black-box optimization | GEPA discovers algorithms automatically |
| **Prompt Engineering** | System prompts for math problems | LLM reflection finds domain-specific strategies |
| **Agent Evolution** | DSPy programs for ARC-AGI | Self-refinement emerged without being programmed |
| **Algorithmic Discovery** | Circle packing algorithms | Matches state-of-the-art (AlphaEvolve, etc.) |
| **Systems: Scheduling** | Spot instance strategies | 6.9% cost reduction on real AWS traces |
| **Systems: Networking** | Multi-cloud broadcast routing | 37.3% cost reduction on cloud transfers |

## Key Takeaways

1. **Unified Interface**: One API for prompts, code, configs, and agent architectures

2. **Side Information (ASI) is Key**: The more diagnostic information you provide, the better GEPA can reason about improvements

3. **Beyond Scalar Optimization**: Traditional optimizers only see scores; GEPA sees error messages, execution traces, and domain-specific feedback

4. **Emergent Capabilities**: Sophisticated strategies (like self-refinement in ARC-AGI) emerge without explicit programming

5. **The Convex Hull**: `optimize_anything` is designed to cover all text-based optimization problems under one abstraction

---

## Try It Yourself

**If you can express your system's parameters as text and compute a score with diagnostic feedback, GEPA can optimize it.**

```python
pip install gepa
```

```python
from gepa.optimize_anything import optimize_anything

result = optimize_anything(
    seed_candidate={"your_param": "your_value"},
    fitness_fn=your_fitness_function,
)
```

---

*GEPA is open-source. Star us on [GitHub](https://github.com/stanfordnlp/gepa)!*

---

## Appendix: Full Code Examples

The complete, runnable code for all examples in this post can be found in the `examples/` directory:

- `examples/new_polynomial/` — Mathematical optimization (EvalSet)
- `examples/math/` — Prompt engineering (AIME 2025)
- `examples/arc_agi/` — Agent program evolution (ARC-AGI)
- `examples/circle_packing/` — Algorithmic discovery (Circle Packing)
- `examples/adrs/can_be_late/` — Spot instance scheduling
- `examples/adrs/cloudcast/` — Multi-cloud broadcast optimization

---

## Minimal Working Example: Optimize a Sorting Function

Here's a complete, runnable example that optimizes a Python sorting function:

In [None]:
"""
Minimal working example: Optimize a sorting function
This evolves Python code that sorts a list of numbers.
"""
import time
from gepa.optimize_anything import optimize_anything, GEPAConfig, EngineConfig, ReflectionConfig

# 1. SEED CANDIDATE: A naive bubble sort implementation
seed_candidate = {
    "code": """
def sort_list(arr):
    '''Sort a list of numbers in ascending order.'''
    n = len(arr)
    for i in range(n):
        for j in range(0, n-i-1):
            if arr[j] > arr[j+1]:
                arr[j], arr[j+1] = arr[j+1], arr[j]
    return arr
"""
}

# 2. DATASET: Test cases to optimize on
dataset = [
    {"input": [64, 34, 25, 12, 22, 11, 90], "expected": [11, 12, 22, 25, 34, 64, 90]},
    {"input": [5, 1, 4, 2, 8], "expected": [1, 2, 4, 5, 8]},
    {"input": [3, 3, 1, 2, 1], "expected": [1, 1, 2, 3, 3]},
    {"input": list(range(100, 0, -1)), "expected": list(range(1, 101))},  # Worst case
]

# 3. FITNESS FUNCTION: Measure correctness and speed
def fitness_fn(candidate, example):
    code = candidate["code"]
    
    try:
        # Execute the code
        exec(code, globals())
        
        # Time the execution
        start = time.time()
        result = sort_list(example["input"].copy())
        elapsed = time.time() - start
        
        # Check correctness
        correct = result == example["expected"]
        score = 1.0 if correct else 0.0
        
        # Bonus for speed (if correct)
        if correct and elapsed < 0.001:
            score += 0.1
        
        # Rich side_info for LLM reflection
        side_info = {
            "Input": example["input"],
            "Output": result,
            "Expected": example["expected"],
            "Correct": correct,
            "Time (ms)": elapsed * 1000,
            "Error": None,
        }
        
    except Exception as e:
        score = 0.0
        side_info = {
            "Input": example["input"],
            "Error": str(e),
            "Code": code,
        }
    
    return score, {"code": code, "result": result if 'result' in dir() else None}, side_info

# 4. RUN OPTIMIZATION
result = optimize_anything(
    seed_candidate=seed_candidate,
    fitness_fn=fitness_fn,
    dataset=dataset,
    objective="Optimize the sorting function for correctness and speed.",
    background="Consider algorithms like quicksort, mergesort, or heapsort.",
    config=GEPAConfig(
        engine=EngineConfig(max_metric_calls=50),
        reflection=ReflectionConfig(reflection_lm="openai/gpt-4o-mini"),
    ),
)

# 5. USE THE RESULT
print("=" * 60)
print("OPTIMIZED CODE:")
print("=" * 60)
print(result.best_candidate["code"])
print(f"\nBest score: {result.best_score}")

### What This Example Demonstrates

1. **Seed Candidate**: We start with a naive O(n²) bubble sort
2. **Dataset**: Four test cases including a worst-case reversed list
3. **Fitness Function**: 
   - Returns correctness score (0 or 1)
   - Returns **rich side_info** including input, output, timing, and errors
4. **Optimization**: GEPA will evolve the code to find faster algorithms
5. **Result**: Often discovers quicksort or similar O(n log n) algorithms

The key is the `side_info` dictionary—it tells GEPA exactly what went wrong so it can make targeted improvements.

---

## The Challenge

KernelBench provides PyTorch reference models. The candidate must produce a `ModelNew` with custom CUDA kernels (via `load_inline`) that is correct and faster than the PyTorch baseline.

## The Task

**Given**: PyTorch reference models + baseline runtimes
**Find**: Prompts that generate correct, faster CUDA kernels

Results are hardware-dependent; see `outputs/artifacts/kernelbench/` for latest runs.


In [None]:
levels = ["level1"]
run_name = time.strftime("%y%m%d_%H%M%S")
log_dir = f"outputs/artifacts/kernelbench/{run_name}"
os.makedirs(log_dir, exist_ok=True)

dataset = load_dataset(levels=levels)
baselines = load_or_measure_baselines(dataset)

available_gpus = get_free_gpus() or list(range(4))
init_gpu_manager(device_list=available_gpus, lock_dir=os.path.join(log_dir, "gpu_locks"))

tracker = StateTracker(log_dir=log_dir, total_problems=len(dataset))
kernel_gen_cache = PromptCache(cache_dir=os.path.join(log_dir, "kernel_gen_cache"), name="kernel_gen")
refiner_cache = PromptCache(cache_dir=os.path.join(log_dir, "refiner_cache"), name="refiner")

lm = dspy.LM(LLM, temperature=1.0, max_tokens=32000)


In [None]:
fitness_fn = create_fitness_fn(
    lm,
    baselines=baselines,
    use_rag=True,
    max_refinements=5,
    tracker=tracker,
    kernel_gen_cache=kernel_gen_cache,
    refiner_cache=refiner_cache,
)

config = GEPAConfig(
    engine=EngineConfig(
        run_dir=log_dir,
        max_metric_calls=2000,
        cache_evaluation=True,
        track_best_outputs=True,
        parallel=False,
        max_workers=1,
    ),
    reflection=ReflectionConfig(
        reflection_minibatch_size=3,
        reflection_lm=LLM,
    ),
)

result = optimize_anything(
    seed_candidate=seed_candidate,
    fitness_fn=fitness_fn,
    dataset=dataset,
    config=config,
    objective=objective,
    background=BACKGROUND,
)


---

## When to Use `optimize_anything`

### Best Use Cases

| Problem Type | Example | Why GEPA Excels |
|--------------|---------|-----------------|
| **Prompt Engineering** | System prompts, few-shot examples | LLM understands language nuances |
| **Code Evolution** | Algorithm design, bug fixes | LLM can read and write code |
| **Agent Architecture** | DSPy programs, reasoning pipelines | LLM can propose structural changes |
| **Configuration Tuning** | JSON/YAML configs | LLM understands parameter relationships |
| **Template Optimization** | Email templates, API specs | LLM understands domain context |

### When Traditional Methods May Be Better

| Problem Type | Better Alternative | Why |
|--------------|-------------------|-----|
| **Neural Network Training** | PyTorch + SGD | Gradient information is crucial |
| **Convex Optimization** | SciPy, CVXPY | Mathematical structure exploitable |
| **Combinatorial (small scale)** | OR-Tools, SAT solvers | Exact methods available |

### The Rule of Thumb

**Use `optimize_anything` when:**
1. The artifact being optimized can be meaningfully represented as text
2. You can provide informative feedback about why candidates fail
3. Domain knowledge would help but isn't easily encoded as math
4. The search space is too complex for grid/random search

---

*Questions? Issues? Contributions welcome at [github.com/stanfordnlp/gepa](https://github.com/stanfordnlp/gepa)*