In [1]:
from datasets import load_dataset

# Load the ARC-AGI dataset by its Hugging Face dataset id. Later cells read
# the "training" and "evaluation" splits from this object.
ds = load_dataset("dataartist/arc-agi")

In [2]:
import dspy

def _to_examples(split):
    """Convert one dataset split into a list of ``dspy.Example`` objects.

    Each task record in ``split`` is read through its "train" key (the
    demonstration pairs) and its "test" key (records with "input" and
    "output"). The demonstrations and the test inputs are marked as the
    example's inputs; the test outputs remain available as labels.

    Args:
        split: Iterable of task records, e.g. ``ds["training"]``.

    Returns:
        A list with one ``dspy.Example`` per task record.
    """
    return [
        dspy.Example(
            training_examples=task["train"],
            test_inputs=[case["input"] for case in task["test"]],
            test_outputs=[case["output"] for case in task["test"]],
        ).with_inputs("training_examples", "test_inputs")
        for task in split
    ]


# Both splits go through the same conversion, so it lives in one helper
# instead of two copy-pasted comprehensions.
trainset = _to_examples(ds["training"])
testset = _to_examples(ds["evaluation"])

In [3]:
# Sanity check: number of tasks converted from each split.
len(trainset), len(testset)

(400, 400)

In [4]:
# Source code of the seed DSPy program, kept as a string. It is later passed
# as the "program" entry of the candidate dict given to the adapter and to
# the optimizer. The seed is a single ChainOfThought module over a typed signature.
program_src = """import dspy
from typing import List
import pydantic

MATRIX = List[List[int]]

class TrainingExample(pydantic.BaseModel):
    input: MATRIX
    output: MATRIX

class SolveTaskSignature(dspy.Signature):
    training_examples: List[TrainingExample] = dspy.InputField(description="Input and output examples demonstrating the task to be performed.")
    test_inputs: List[MATRIX] = dspy.InputField(description="Input matrices to be solved following the task described in the training examples.")
    test_outputs: List[MATRIX] = dspy.OutputField(description="Output matrices corresponding to the test inputs.")

program = dspy.ChainOfThought(SolveTaskSignature)"""

In [5]:
from gepa.adapters.dspy_full_program_adapter.full_program_adapter import DspyAdapter

In [13]:
def is_valid_matrix(matrix, gold_matrix):
    """Validate a predicted matrix and compare it cell by cell with the gold matrix.

    The prediction must be a non-empty, rectangular list of lists of ints with
    the same dimensions and the same values as ``gold_matrix``.

    Args:
        matrix: The predicted matrix (arbitrary, possibly malformed, object).
        gold_matrix: The reference matrix; it is indexed as a non-empty list of
            rows without further validation.

    Returns:
        A ``(is_correct, feedback)`` tuple. ``is_correct`` is True only when the
        prediction is well-formed and equals the gold matrix. ``feedback`` is a
        human-readable explanation that always includes the correct matrix.
    """
    if not isinstance(matrix, list):
        return False, f"The matrix must be a List[List[int]]. The correct matrix is {gold_matrix}."
    n = len(matrix)
    if n == 0:
        return False, f"The matrix must have at least one row. The correct matrix is {gold_matrix}."
    # Check the first row's type before measuring it: calling len() on a
    # non-sized value (e.g. the int in a flat prediction like [1, 2]) would
    # raise TypeError instead of producing feedback.
    if not isinstance(matrix[0], list):
        return False, f"The 0-th row must be a List[int]. The correct matrix is {gold_matrix}."
    m = len(matrix[0])
    if m == 0:
        return False, f"The matrix must have at least one column. The correct matrix is {gold_matrix}."
    for i, row in enumerate(matrix):
        if not isinstance(row, list):
            return False, f"The {i}-th row must be a List[int]. The correct matrix is {gold_matrix}."
        if len(row) != m:
            return (
                False,
                f"The matrix is staggered. Row 0 has {m} columns, but row {i} has {len(row)} columns. The correct matrix is {gold_matrix}.",
            )
        for j, cell in enumerate(row):
            if not isinstance(cell, int):
                return (
                    False,
                    f"The {i}-th row, {j}-th column must be an int, found {type(cell)}. The correct matrix is {gold_matrix}.",
                )

    # Check consistency with gold matrix
    gold_n = len(gold_matrix)
    gold_m = len(gold_matrix[0])
    if (n, m) != (gold_n, gold_m):
        return (
            False,
            f"The matrix has dimensions {n}x{m}, but the gold matrix has dimensions {gold_n}x{gold_m}. The correct matrix is {gold_matrix}.",
        )

    wrong_indices = [
        (i, j)
        for i in range(n)
        for j in range(m)
        if matrix[i][j] != gold_matrix[i][j]
    ]
    if not wrong_indices:
        return True, f"Your response is correct. The correct matrix is {gold_matrix}."
    # Only enumerate the wrong cells when the list is short enough to be useful.
    if len(wrong_indices) < 10:
        return (
            False,
            f"The matrix is incorrect. The following indices are incorrect: {wrong_indices}. The correct matrix is {gold_matrix}.",
        )
    return False, f"The matrix is incorrect. The correct matrix is {gold_matrix}."


def metric_fn(example, pred, trace=None):
    """Score a prediction for one task and produce textual feedback.

    Args:
        example: Object exposing ``test_inputs`` and ``test_outputs`` (the gold
            outputs) for the task.
        pred: Object exposing the predicted ``test_outputs``.
        trace: Unused; accepted so the function matches the metric call signature.

    Returns:
        A ``dspy.Prediction`` with ``score`` (fraction of test outputs that
        exactly match the gold outputs, or 0 for a malformed response) and
        ``feedback`` (one line per test input, or a single error message).
    """
    task_inputs = example.test_inputs
    gold_task_outputs = example.test_outputs
    pred_task_outputs = pred.test_outputs

    if not isinstance(pred_task_outputs, list):
        return dspy.Prediction(
            score=0,
            feedback=f"The response must be a List[List[List[int]]]. The correct response is {gold_task_outputs}.",
        )

    if len(task_inputs) != len(pred_task_outputs):
        feedback = f"The number of output matrices ({len(pred_task_outputs)}) must match the number of input matrices ({len(task_inputs)}). The correct response is {gold_task_outputs}."
        return dspy.Prediction(score=0, feedback=feedback)

    valids = []
    feedbacks = []
    # The inputs themselves are not needed for scoring; their count was already
    # checked against the predictions above, so pairing gold with predicted
    # outputs visits the same items.
    for i, (gold_output, pred_output) in enumerate(
        zip(gold_task_outputs, pred_task_outputs, strict=False)
    ):
        is_valid, feedback = is_valid_matrix(pred_output, gold_output)
        valids.append(is_valid)
        feedbacks.append(f"Feedback on test input {i}: {feedback}")

    if not valids:
        # A task without test cases has nothing to score; avoid dividing by zero.
        return dspy.Prediction(score=0, feedback="The task has no test inputs, so there is nothing to score.")

    score = sum(valids) / len(valids)
    feedback_text = "\n".join(feedbacks)
    return dspy.Prediction(score=score, feedback=feedback_text)

In [7]:
import os
from getpass import getpass

# Read the key from the environment when available so the notebook can run
# unattended; otherwise prompt with getpass, which does not echo the secret
# into the cell output the way input() does.
gemini_api_key = os.environ.get("GEMINI_API_KEY") or getpass("GEMINI_API_KEY: ")

In [33]:
# Alternative reflection model, kept for reference:
# reflection_lm = dspy.LM(model="openai/gpt-4.1", max_tokens=32000, temperature=1)  # temperature=1

# The same Gemini model is configured twice: once as the reflection LM and
# once as the task LM handed to the adapter.
reflection_lm = dspy.LM(model="gemini/gemini-2.5-pro", max_tokens=32000, api_key=gemini_api_key)
adapter = DspyAdapter(
    task_lm=dspy.LM(model="gemini/gemini-2.5-pro", max_tokens=32000, api_key=gemini_api_key),
    metric_fn=metric_fn,
    num_threads=80,
    # The adapter receives a plain callable that returns the first element of
    # what the LM returns for prompt `x` (presumably the first completion; TODO confirm).
    reflection_lm=lambda x: reflection_lm(x)[0],
)

Let's evaluate the base program

In [9]:
import random

# Number of shuffled training tasks held out for validation.
VAL_SIZE = 200

# Shuffle a copy rather than `trainset` itself: an in-place shuffle would
# reshuffle the already-shuffled list every time this cell is re-run, silently
# changing the split. With a fresh copy and a fixed seed the split is stable.
shuffled_trainset = list(trainset)
random.Random(0).shuffle(shuffled_trainset)

test_set = testset
val_set = shuffled_trainset[-VAL_SIZE:]
train_set = shuffled_trainset[:-VAL_SIZE]

In [10]:
# Sanity check: sizes of the train / validation / test sets.
len(train_set), len(val_set), len(test_set)

(200, 200, 400)

In [28]:
# Evaluate the unoptimized seed program on the test set. The candidate is a
# dict mapping the component name "program" to its source code.
o_base = adapter.evaluate(test_set, {"program": program_src})

2025/08/30 04:55:07 INFO dspy.evaluate.evaluate: Average Metric: 176.0 / 400 (44.0%)


The base program obtains a score of 44.0% (176 / 400) on the test set.

Let's launch the GEPA optimization.

In [35]:
from gepa import optimize

# Run GEPA starting from the seed program source. The adapter and the
# reflection callable are the same objects configured earlier in the notebook.
o = optimize(
    seed_candidate={"program": program_src},
    trainset=train_set,
    valset=val_set,
    adapter=adapter,
    reflection_lm=lambda x: reflection_lm(x)[0],
    # Presumably the total budget of metric evaluations — confirm against the GEPA docs.
    max_metric_calls=4000,
    display_progress_bar=True,
)



KeyboardInterrupt: 

Let's see the DSPy program found by GEPA

In [30]:
# Show the source code of the best program found by the optimizer.
print(o.best_candidate["program"])

import dspy
from typing import List, Tuple, Optional
import pydantic
import copy
import traceback

# Define type aliases and pydantic models for clarity and structure.
MATRIX = List[List[int]]

class TrainingExample(pydantic.BaseModel):
    input: MATRIX
    output: MATRIX

# --- Signatures ---

class HypothesizeRule(dspy.Signature):
    """
    Analyze the provided input/output matrix pairs from the Abstraction and Reasoning Corpus (ARC).
    Deduce the single, underlying transformation rule that converts each input matrix to its corresponding output matrix.
    Describe this rule in clear, step-by-step, unambiguous English. Focus on the logic, not Python code.

    **Successful Strategies to Consider:**
    - **Start Simple:** First, check for simple rules. Is there a global transformation (e.g., rotation, reflection)? Is the output a subgrid of the input? Is a single color being replaced?
    - **Look for Separators:** Check if the grid is partitioned by separator lines (e.g., rows 

Evaluating the optimized program

In [29]:
# Evaluate the best candidate found by the optimizer on the same test set
# that was used for the base program.
o_opt = adapter.evaluate(test_set, o.best_candidate)

GEPA Optimization:   6%|██████▎                                                                                                     | 236/4000 [31:29:39<502:18:26, 480.42s/rollouts]
2025/08/30 04:55:24 INFO dspy.evaluate.evaluate: Average Metric: 198.0 / 400 (49.5%)


We see it going from **44.0% to 49.5%** on the test set in just a few rounds of optimization!

In [24]:
import pickle

# Persist the optimization result object to disk.
# NOTE: pickle files can execute arbitrary code when loaded; only unpickle
# this file if it comes from a trusted source.
with open("arc_agi_gepa_result_gemini.bin", "wb") as f:
    pickle.dump(o, f)