In this example, we will see GEPA evolve the whole DSPy program (not just the instruction), including modifying the structure/dataflow of the program. We will use GEPA to tune a simple dspy.ChainOfThought module for crossword puzzles into a full DSPy program.

In [None]:
import os
from dotenv import load_dotenv

# Pull settings from a local .env file into the process environment
load_dotenv()

# Stop right away when the OpenAI key is absent or empty
if not os.environ.get("OPENAI_API_KEY"):
    raise ValueError("OPENAI_API_KEY not found in .env file")

In [None]:
import dspy

In [None]:
import json
import random


# Minimal container for the three dataset splits
class CrosswordsDataset:
    """Hold the train, dev, and test example collections as attributes."""

    def __init__(self, train, dev, test):
        self.train, self.dev, self.test = train, dev, test

def load_jsonl(file_path):
    """Read a JSONL file and convert each record into a DSPy example.

    Every non-blank line is parsed as a JSON object. Its ``input`` value
    becomes the example's ``puzzle`` field (marked as the only input) and its
    ``scoring_data.solution`` value becomes the ``solution`` field.

    Args:
        file_path: Path of the JSONL file to read.

    Returns:
        A list of ``dspy.Example`` objects in file order, or an empty list
        (after printing a warning) when the file does not exist.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks ``input`` or ``scoring_data.solution``.
    """
    examples = []
    try:
        # Explicit UTF-8 keeps decoding independent of the platform locale.
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    # Blank lines (e.g. a trailing newline at end of file)
                    # carry no record and would make json.loads fail.
                    continue
                data = json.loads(line)
                # Convert to DSPy format for crosswords
                examples.append(dspy.Example(
                    puzzle=data['input'],
                    solution=data['scoring_data']['solution']
                ).with_inputs('puzzle'))
    except FileNotFoundError:
        print(f"Warning: {file_path} not found")
        return []
    return examples

def load_crosswords_data(data_dir, difficulty=None, test_size=20):
    """Load the crosswords train/dev/test splits from JSONL files.

    Args:
        data_dir: Base directory containing crosswords data.
        difficulty: Optional difficulty level ("monday", "tuesday", etc.).
            When given, the splits are read from ``{data_dir}/{difficulty}``;
            when None, they are read from ``data_dir`` itself.
        test_size: Maximum number of test examples to keep. Crosswords can
            take a while to run, so the test split is truncated by default;
            pass None to keep the whole split.

    Returns:
        A CrosswordsDataset whose train and dev splits are shuffled with a
        fixed seed and whose test split holds at most ``test_size`` examples
        read from ``test.jsonl``.
    """
    # Only append the difficulty when one was requested, so None does not
    # turn into a literal "None" path component.
    split_dir = data_dir if difficulty is None else f"{data_dir}/{difficulty}"
    train_data = load_jsonl(f"{split_dir}/train.jsonl")
    dev_data = load_jsonl(f"{split_dir}/valid.jsonl")
    test_data = load_jsonl(f"{split_dir}/test.jsonl")

    random.Random(0).shuffle(train_data)
    random.Random(0).shuffle(dev_data)

    # Truncate the held-out split itself: slicing train_data here would
    # discard test.jsonl and put training examples into the test set.
    if test_size is not None:
        test_data = test_data[:test_size]

    return CrosswordsDataset(train_data, dev_data, test_data)

# Where the data lives and which difficulty subdirectory to read
# (e.g. "monday", "tuesday", "wednesday")
DATA_DIR = "./data/crosswords"
DIFFICULTY = "monday"
dataset = load_crosswords_data(DATA_DIR, DIFFICULTY)

# Report the size of each split in the order train, dev, test
print(*(len(split) for split in (dataset.train, dataset.dev, dataset.test)))

Let's inspect an example from the training set.

In [None]:
# Show the first training puzzle followed by its reference solution
example = dataset.train[0]
print("Puzzle:", example.puzzle, sep="\n")
print("\nSolution:", example.solution, sep="\n")

Let's define a simple DSPy program to solve crossword puzzles.

Unlike dspy.GEPA, which takes an instantiated DSPy module as input, here we want to evolve the full DSPy program. Hence, a candidate is the program's source code as a string. The seed program does not need to be sophisticated; it just needs to demonstrate the expected input/output interface and, possibly, the available tools. You can also include any additional information about the environment as a comment.

In [None]:
# Seed candidate, kept as source text rather than as an instantiated module.
# The source defines a dspy.Module subclass taking a `puzzle` string and binds
# an instance of it to the name `program`.
# NOTE(review): forward() is annotated as returning dspy.Prediction but returns
# the plain string "solution" — confirm this placeholder is intentional.
program_src = """import dspy
class CrosswordSolver(dspy.Module):
    def forward(self, puzzle: str) -> dspy.Prediction:
        return "solution"

program = CrosswordSolver()"""

GEPA interfaces with external frameworks through an adapter. In this case, we integrate GEPA with a DspyAdapter.

In [None]:
from score_strict import score

def metric_fn(example, pred, trace=None):
    """Metric function that provides detailed feedback for GEPA using strict crosswords scoring.

    Args:
        example: Reference example; its ``solution`` attribute holds the
            expected solution text.
        pred: Output of the program under evaluation; expected to expose a
            ``solution`` attribute holding a string (or an empty/None value).
        trace: Not used by this function.

    Returns:
        A ``dspy.Prediction`` with a ``score`` and a ``feedback`` string. The
        score is 0.0 when the prediction has no ``solution`` attribute, when
        that attribute is not a string, or when scoring raises.
    """
    if not hasattr(pred, 'solution'):
        return dspy.Prediction(score=0.0, feedback="No solution field found in prediction.")

    solution = pred.solution
    if solution is not None and not isinstance(solution, str):
        # Candidate programs can return any type here; report it as feedback
        # instead of letting .strip() raise and abort the evaluation.
        return dspy.Prediction(
            score=0.0,
            feedback=f"Solution field must be a string, got {type(solution).__name__}.",
        )

    raw_solution = solution.strip() if solution else ""
    expected_solution = example.solution.strip()

    # Use the strict scoring function
    try:
        score_value, correct_count, total_count = score(expected_solution, raw_solution)

        feedback = f"{correct_count}/{total_count} squares correct ({score_value:.1%})."
        if score_value != 1.0:
            # Show the reference only for imperfect answers.
            feedback += "\n\nExpected solution:\n" + expected_solution

    except Exception as e:
        # Deliberate best-effort: a scorer failure becomes zero-score feedback
        # rather than an exception that stops the run.
        score_value = 0.0
        feedback = f"Error in scoring: {str(e)}"

    return dspy.Prediction(score=score_value, feedback=feedback)

In [None]:
# Sanity-check the metric on a tiny 3x3 grid
test_solution = """A B C
D E F
G H I

Across:
  1. ABC
  4. DEF
  7. GHI

Down:
  1. ADG
  2. BEH
  3. CFI"""

# Same grid with the centre square replaced by "."
partial_solution = """A B C
D . F
G H I

Across:
  1. ABC
  4. D.F
  7. GHI

Down:
  1. ADG
  2. B.H
  3. CFI"""

example = dspy.Example(puzzle="test", solution=test_solution)

# (label, candidate solution) pairs: exact match, one square off, and blank
cases = [
    ("Perfect match", test_solution),
    ("Partial match", partial_solution),
    ("Empty solution", ""),
]

for case_idx, (label, candidate) in enumerate(cases):
    if case_idx:
        # Blank line between consecutive reports, none after the last one
        print()
    pred = dspy.Prediction(solution=candidate)
    metric_result = metric_fn(example, pred)
    print(f"{label} - Score: {metric_result.score}")
    print(f"Feedback: {metric_result.feedback}")

In [None]:
from custom_full_program_adapter.full_program_adapter import DspyAdapter
from lm import get_dspy_lm

# Model identifiers: SCAFFOLDER_NAME backs the reflection LM, EXECUTOR_NAME
# backs the task LM. Commented-out alternatives are kept for reference.
# EXECUTOR_NAME = "openai/gpt-4.1-nano"
# SCAFFOLDER_NAME = "deepinfra/Qwen/Qwen3-Next-80B-A3B-Thinking"
SCAFFOLDER_NAME = "openai/o3"
EXECUTOR_NAME = "deepinfra/Qwen/Qwen2.5-72B-Instruct"

reflection_lm = get_dspy_lm(SCAFFOLDER_NAME, max_tokens=32000, temperature=1.0)
task_lm = get_dspy_lm(EXECUTOR_NAME, max_tokens=32000, temperature=1.0)

adapter = DspyAdapter(
    task_lm=task_lm,
    metric_fn=metric_fn,
    num_threads=80,
    # Pass a callable that returns only the first element of the reflection LM's response
    reflection_lm=lambda x: reflection_lm(x)[0],
)

Let's evaluate the base program.

In [None]:
# Evaluate the seed program source on the test split
o = adapter.evaluate(dataset.test, {"program": program_src})

The base program's score is shown in the output above.

Let's launch the GEPA optimization.

In [None]:
from gepa import optimize
from datetime import datetime

# Embed a timestamp in the output directory so separate runs get separate folders
now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
run_dir = f"outputs/crosswords_{DIFFICULTY}_{now}"

# Start from the seed source, train on the train split, validate on the dev split
o = optimize(
    seed_candidate={"program": program_src},
    trainset=dataset.train,
    valset=dataset.dev,
    adapter=adapter,
    reflection_lm=lambda x: reflection_lm(x)[0],
    max_metric_calls=1000,
    reflection_minibatch_size=3,
    display_progress_bar=True,
    run_dir=run_dir,
)

Let's look at the DSPy program found by GEPA.

In [None]:
# Print the source text stored under the "program" key of the best candidate
print(o.best_candidate["program"])

In [None]:
# Bare expression: shows the optimization result object as the notebook cell's output
o

In [None]:
# Summarize the optimization result: best candidate index, how often each
# candidate is among the best on a validation instance, aggregate validation
# scores, and each candidate's parents.
print("Best index:")
print(o.best_idx)
print()
print("Number of validation instances where candidate is the best:")
for candidate_idx in range(len(o.candidates)):
    print(sum(candidate_idx in best_set for best_set in o.per_val_instance_best_candidates))
print()
print("Validation aggregate scores:")
# The loop variable is a module-level name. Calling it `score` would rebind the
# `score` function imported from score_strict, which metric_fn looks up on
# every later evaluation, so a distinct name is used here.
for aggregate_score in o.val_aggregate_scores:
    print(aggregate_score)
print()
print("Parents:")
for parent in o.parents:
    print(parent)


Evaluating the optimized program

In [None]:
# Evaluate the best candidate on the first test example only; the return value is discarded
_ = adapter.evaluate([dataset.test[0]], o.best_candidate)

In [None]:
# Hand-written comparison program, stored as source text in the same form as
# the seed candidate. It collects the clue lines listed under the puzzle's
# "Across:" and "Down:" headings, asks a ChainOfThought predictor for each clue
# on its own, and returns the answers grouped under the same two headings.
# Each section is answered and formatted separately, so clues that share a
# number (e.g. 1-Across and 1-Down) cannot overwrite each other's answers.
human_src = r'''import dspy

class ClueSolvingSignature(dspy.Signature):
    """
    Given a crossword puzzle clue, output the precise answer that fits the clue. 
    Ensure that the answer is a common crossword solution that matches the clue exactly.
    """
    clue: str = dspy.InputField(desc="The textual clue for the crossword")
    answer: str = dspy.OutputField(desc="The correct word or phrase for the clue")

class CrosswordSolver(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predictor = dspy.ChainOfThought(ClueSolvingSignature)

    def _solve_section(self, heading, clue_lines):
        # Answer every clue line of one section and list it as "  <num>. <answer>".
        section_str = f"{heading}:\n"
        for clue_line in clue_lines:
            num, dot, rest = clue_line.partition('.')
            clue_text = rest.strip() if dot else clue_line
            ans = self.predictor(clue=clue_text).answer
            num = num.strip()
            # A line without a "<num>." prefix is still sent to the predictor
            # but has no number to list its answer under.
            if dot and num:
                section_str += f"  {num}. {ans}\n"
        return section_str

    def forward(self, puzzle: str) -> dspy.Prediction:
        clues = {"across": [], "down": []}
        current_section = None

        # Lines that appear before the first heading are skipped.
        for line in puzzle.strip().split('\n'):
            line = line.strip()
            if not line:
                continue
            if "Across:" in line:
                current_section = "across"
                continue
            if "Down:" in line:
                current_section = "down"
                continue
            if current_section is not None:
                clues[current_section].append(line)

        across_str = self._solve_section("Across", clues["across"])
        down_str = self._solve_section("Down", clues["down"])
        return dspy.Prediction(solution=f"{across_str}\n{down_str}")

program = CrosswordSolver()
'''

# Evaluate the hand-written program on the first test example and print what evaluate returns
human_eval = adapter.evaluate([dataset.test[0]], {"program": human_src})
print(human_eval)