# Exercise 1

Use DSPy (or a simplified version if DSPy isn’t accessible) to optimize a multi-step QA pipeline. For example, pipeline: (1) retrieve relevant text from a small corpus, (2) ask LLM to answer question given retrieved text. Define the metric as accuracy of answer. Let the system tune the retrieval prompt and answer prompt. Observe what changes it makes (e.g. does it add “Let’s think step by step” automatically?). Report the before vs after performance.

## Solution

In [59]:
# Load environment variables from a repo-root `.env` (no extra dependencies)

import os
from pathlib import Path


def _find_dotenv(start: Path | None = None, max_up: int = 6) -> Path | None:
    p = (start or Path.cwd()).resolve()
    for _ in range(max_up + 1):
        candidate = p / ".env"
        if candidate.exists():
            return candidate
        if p.parent == p:
            break
        p = p.parent
    return None


def _load_env_file(dotenv_path: Path) -> None:
    for raw in dotenv_path.read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("export "):
            line = line[len("export ") :]
        if "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip()
        # strip simple quotes
        if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")):
            value = value[1:-1]
        os.environ.setdefault(key, value)


dotenv_path = _find_dotenv()
if dotenv_path:
    _load_env_file(dotenv_path)
    print(f"Loaded .env from: {dotenv_path}")
else:
    print("No .env found while searching upward from the current working directory.")

print("OPENAI_API_KEY set:", bool(os.getenv("OPENAI_API_KEY")))



Loaded .env from: /home/luigi/Programming/Onboarding/.env
OPENAI_API_KEY set: True


In [None]:
import os
import random
import re

import numpy as np
import dspy
from openai import OpenAI

# Cheaper embeddings are fine for synthetic corpora; switch to -large if you want max recall.
EMBED_MODEL = "text-embedding-3-small"
# DSPy v3.x uses LiteLLM-style names: "provider/model"
LM_MODEL = "openai/gpt-4.1-mini"

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise RuntimeError(
        "Missing OPENAI_API_KEY. In a notebook, set it with `%env OPENAI_API_KEY=...` "
        "or export it in your shell before starting Jupyter."
    )

client = OpenAI(api_key=OPENAI_API_KEY)

# DSPy v3.x
lm = dspy.LM(model=LM_MODEL, max_tokens=128)

# Configure DSPy to use the LLM
if hasattr(dspy, "settings"):
    dspy.settings.configure(lm=lm)

random.seed(42)


In [34]:
TOP_K = 3
NUM_DOCS = 60
QA_PER_DOC = 2
TRAIN_FRAC = 0.8

_rng = random.Random(42)

_adjs = ["Aurora","Nimbus","Orion","Kestrel","Zephyr","Raven","Maple","Helix","Osprey","Forge","Slate","Cedar","Lumen","Delta","Vesta","Atlas","Nova","Redstone","Aster","Northbridge"]
_nouns = ["Project","Protocol","Battery","Clinic","Drone","Study","Sensor","API","Route","Compiler","Festival","Satellite","Plant","Library"]
_people = ["Mara Ortiz","Jun Park","Amina Khan","Elena Petrov","Sam Rivera","Noah Chen","Ivy Patel","Luca Rossi","Fatima Ali","Owen Brooks","Hana Suzuki","Diego Silva"]
_cities = ["Portland","Austin","Berlin","Toronto","Lisbon","Oslo","Seoul","Kyoto","Nairobi","Lima","Dublin","Prague"]


def make_name(kind):
    a=_rng.choice(_adjs)
    b=_rng.choice(_adjs)
    # encourage shared tokens to make lexical matching harder
    if _rng.random() < 0.35:
        b = a
    return f"{a} {kind} {b}" if kind in {"Project","Study","Route"} else f"{a} {b} {kind}"


def generate_doc(doc_id):
    kind = _rng.choice(_nouns)

    if kind == "Project":
        name = make_name("Project")
        lead = _rng.choice(_people)
        budget = round(_rng.uniform(1.5, 9.5), 1)
        hq = _rng.choice(_cities)
        year = _rng.randint(2017, 2024)
        text = (
            f"{name}'s lead engineer is {lead}. "
            f"The project budget was {budget} million dollars. "
            f"The project started in {year} and is headquartered in {hq}."
        )
        qas = [
            (f"Who is the lead engineer for {name}?", lead),
            (f"What was the budget for {name}?", f"{budget} million dollars"),
            (f"Where is {name} headquartered?", hq),
        ]

    elif kind == "Battery":
        name = make_name("Battery")
        cap = round(_rng.uniform(2.0, 9.0), 1)
        mins = _rng.randint(12, 38)
        pct = _rng.choice([70, 75, 80, 85])
        text = (
            f"The {name} has a capacity of {cap} kWh and charges to {pct} percent in {mins} minutes."
        )
        qas = [
            (f"What is the capacity of the {name}?", f"{cap} kWh"),
            (f"How long does the {name} take to reach {pct} percent?", f"{mins} minutes"),
            (f"To what percent does the {name} charge in {mins} minutes?", f"{pct} percent"),
        ]

    elif kind == "Clinic":
        name = make_name("Clinic")
        system = _rng.choice(["Atlas","Aster","Nova","Vesta","Redstone"])
        city = _rng.choice(_cities)
        year = _rng.randint(2016, 2024)
        text = f"The {name} runs on the {system} scheduling system. The clinic opened in {city} in {year}."
        qas = [
            (f"Which scheduling system does the {name} use?", system),
            (f"In which city did the {name} open?", city),
            (f"What year did the {name} open?", str(year)),
        ]

    elif kind == "Drone":
        name = make_name("Drone")
        speed = _rng.choice([120, 130, 140, 150, 160])
        end = _rng.choice([42, 50, 55, 60, 68])
        text = f"The {name}'s top speed is {speed} kilometers per hour. Its endurance is {end} minutes."
        qas = [
            (f"What is the top speed of the {name}?", f"{speed} kilometers per hour"),
            (f"What is the endurance of the {name}?", f"{end} minutes"),
            (f"How long is the endurance of the {name}?", f"{end} minutes"),
        ]

    elif kind == "Protocol":
        name = make_name("Protocol")
        key = _rng.choice([128, 192, 256, 384])
        year = _rng.randint(2015, 2023)
        text = f"The {name} encrypts data using a {key}-bit key. It was ratified in {year}."
        qas = [
            (f"What key size does the {name} use for encryption?", f"{key}-bit"),
            (f"In what year was the {name} ratified?", str(year)),
            (f"Which year was the {name} ratified?", str(year)),
        ]

    elif kind == "Study":
        name = make_name("Study")
        sessions = _rng.choice([8, 10, 12, 14, 16])
        improve = _rng.choice([12, 15, 18, 21, 24])
        text = (
            f"In the {name}, participants completed {sessions} sessions. "
            f"The primary outcome improved by {improve} percent."
        )
        qas = [
            (f"How many sessions were completed in the {name}?", str(sessions)),
            (f"By what percent did the primary outcome improve in the {name}?", f"{improve} percent"),
            (f"What was the percent improvement in the {name}?", f"{improve} percent"),
        ]

    elif kind == "Sensor":
        name = make_name("Sensor")
        material = _rng.choice(["sapphire","quartz","ceramic","glass"])
        diam = _rng.choice([7, 8, 9, 10, 11])
        text = f"The {name} uses a {material} lens. The lens diameter is {diam} millimeters."
        qas = [
            (f"What material is the {name}'s lens made of?", material),
            (f"What is the diameter of the {name}'s lens?", f"{diam} millimeters"),
            (f"How wide is the {name}'s lens diameter?", f"{diam} millimeters"),
        ]

    elif kind == "API":
        name = make_name("API")
        limit = _rng.choice([60, 90, 120, 150, 200])
        outs = _rng.choice([
            "JSON and CSV",
            "JSON and XML",
            "CSV and Parquet",
            "JSON and YAML",
        ])
        text = f"The {name} has a default rate limit of {limit} requests per minute. It supports {outs} outputs."
        qas = [
            (f"What is the default rate limit of the {name}?", f"{limit} requests per minute"),
            (f"Which outputs does the {name} support?", outs),
            (f"What outputs does the {name} support?", outs),
        ]

    elif kind == "Route":
        name = make_name("Route")
        km = _rng.choice([420, 480, 540, 610, 690])
        day = _rng.choice(["Mondays","Tuesdays","Wednesdays","Thursdays","Fridays"])
        text = f"Cargo {name} covers {km} kilometers and departs on {day}."
        qas = [
            (f"How many kilometers does {name} cover?", f"{km} kilometers"),
            (f"On what day does {name} depart?", day),
            (f"Which day does {name} depart?", day),
        ]

    elif kind == "Compiler":
        name = make_name("Compiler")
        vm = _rng.choice(["Vesta VM","Nova VM","Atlas VM","Redstone VM"])
        ver = f"{_rng.randint(1,4)}.{_rng.randint(0,9)}"
        text = f"The {name} targets the {vm}. The latest release is version {ver}."
        qas = [
            (f"Which VM does the {name} target?", vm),
            (f"What is the latest release version of the {name}?", ver),
            (f"What version is the latest release of the {name}?", ver),
        ]

    elif kind == "Festival":
        name = make_name("Festival")
        days = _rng.choice([2, 3, 4, 5])
        month = _rng.choice(["June","July","August","September"])
        date = _rng.randint(10, 24)
        text = f"The {name} lasts {days} days and begins on {month} {date}."
        qas = [
            (f"How long does the {name} last?", f"{days} days"),
            (f"On what date does the {name} begin?", f"{month} {date}"),
            (f"When does the {name} begin?", f"{month} {date}"),
        ]

    elif kind == "Satellite":
        name = make_name("Satellite")
        alt = _rng.choice([520, 620, 710, 840])
        freq = _rng.choice(["7.6 GHz","8.2 GHz","9.1 GHz","10.4 GHz"])
        text = f"The {name} orbits at {alt} kilometers. Its downlink frequency is {freq}."
        qas = [
            (f"At what altitude does the {name} orbit?", f"{alt} kilometers"),
            (f"What is the downlink frequency of the {name}?", freq),
            (f"Which frequency is the downlink of the {name}?", freq),
        ]

    elif kind == "Plant":
        name = make_name("Plant")
        rate = _rng.choice([45, 55, 65, 75, 85])
        mold = _rng.choice(["cobalt alloy","titanium alloy","steel","ceramic"])
        text = f"The {name} produces {rate} units per hour. It uses {mold} molds."
        qas = [
            (f"How many units per hour does the {name} produce?", f"{rate} units per hour"),
            (f"What type of molds does the {name} use?", mold),
            (f"Which molds does the {name} use?", mold),
        ]

    else:  # Library
        name = make_name("Library")
        py = _rng.choice(["Python 3.9","Python 3.10","Python 3.11"])
        ser = _rng.choice(["Nova","Atlas","Aster","Redstone"]) + " serializer"
        text = f"The {name} requires {py}. It introduces the {ser}."
        qas = [
            (f"Which Python version does the {name} require?", py),
            (f"What serializer does the {name} introduce?", ser.split(' ',1)[0] if ' ' in ser else ser),
            (f"What does the {name} introduce?", ser),
        ]

    # pick QA_PER_DOC questions from this doc
    _rng.shuffle(qas)
    qas = qas[:QA_PER_DOC]

    return {"id": doc_id, "text": text}, [
        {"question": q, "answer": a, "doc_id": doc_id}
        for (q,a) in qas
    ]


# Build corpus + QAs
_docs = []
_qa = []
for i in range(NUM_DOCS):
    d, qas = generate_doc(i)
    _docs.append(d)
    _qa.extend(qas)

docs = _docs
qa_pairs = _qa

_rng.shuffle(qa_pairs)
cut = int(len(qa_pairs) * TRAIN_FRAC)
train_pairs = qa_pairs[:cut]
dev_pairs = qa_pairs[cut:]

trainset = [
    dspy.Example(question=p["question"], answer=p["answer"], doc_id=p["doc_id"]).with_inputs("question")
    for p in train_pairs
]

devset = [
    dspy.Example(question=p["question"], answer=p["answer"], doc_id=p["doc_id"]).with_inputs("question")
    for p in dev_pairs
]

print(f"docs={len(docs)} qa_pairs={len(qa_pairs)} train={len(trainset)} dev={len(devset)} TOP_K={TOP_K}")


docs=60 qa_pairs=120 train=96 dev=24 TOP_K=3


In [35]:
# Embeddings-based retrieval (top-k)

def embed_texts(texts, model=EMBED_MODEL, batch_size=64):
    # Batch to avoid provider limits when corpus grows.
    embs = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start : start + batch_size]
        resp = client.embeddings.create(model=model, input=batch)
        data = sorted(resp.data, key=lambda x: x.index)
        embs.extend([d.embedding for d in data])
    return np.array(embs)

_doc_texts = [d["text"] for d in docs]
_doc_embeddings = embed_texts(_doc_texts)
_doc_norms = np.linalg.norm(_doc_embeddings, axis=1)

_query_cache = {}

def embed_query(text):
    if text not in _query_cache:
        _query_cache[text] = embed_texts([text])[0]
    return _query_cache[text]

def retrieve(query, k=TOP_K):
    q_emb = embed_query(query)
    denom = _doc_norms * (np.linalg.norm(q_emb) + 1e-9)
    sims = (_doc_embeddings @ q_emb) / denom
    topk = np.argsort(sims)[-k:][::-1]
    context = "\n\n".join([f"[{i}] {docs[i]['text']}" for i in topk])
    return context, topk.tolist()


In [36]:
# DSPy module: query rewrite -> retrieve -> answer

def normalize_text(text):
    text = text.lower().strip()
    text = re.sub(r"[^a-z0-9 ]+", "", text)
    text = re.sub(r"\s+", " ", text)
    return text

def exact_match(pred, gold):
    return normalize_text(pred) == normalize_text(gold)

class QueryRewrite(dspy.Signature):
    # Rewrite a question into a search-friendly query.
    question = dspy.InputField()
    query = dspy.OutputField(desc="concise search query with key entities")

class AnswerQuestion(dspy.Signature):
    # Answer using the provided context only.
    context = dspy.InputField(desc="retrieved passages")
    question = dspy.InputField()
    answer = dspy.OutputField(desc="short exact answer copied from context")

class QAWithRewrite(dspy.Module):
    def __init__(self, k=TOP_K):
        super().__init__()
        self.k = k
        self.rewrite = dspy.Predict(QueryRewrite)
        self.answer = dspy.Predict(AnswerQuestion)

    def forward(self, question):
        rewritten = self.rewrite(question=question).query
        context, ids = retrieve(rewritten, k=self.k)
        pred = self.answer(context=context, question=question)
        return dspy.Prediction(
            answer=pred.answer,
            rewritten_query=rewritten,
            context_ids=ids,
        )


def evaluate(module, dataset, desc="Baseline eval"):
    """Evaluate with a progress bar (uses tqdm if installed)."""
    try:
        from tqdm.auto import tqdm  # type: ignore
        iterator = tqdm(dataset, total=len(dataset), desc=desc)
    except Exception:
        iterator = dataset

    correct = 0
    retrieval_hits = 0
    for ex in iterator:
        pred = module(question=ex.question)
        if exact_match(pred.answer, ex.answer):
            correct += 1
        if ex.doc_id in getattr(pred, "context_ids", []):
            retrieval_hits += 1
    total = len(dataset)
    return {
        "accuracy": correct / total,
        "retrieval_hit_rate": retrieval_hits / total,
    }


In [37]:
# Baseline evaluation
baseline = QAWithRewrite(k=TOP_K)
baseline_metrics = evaluate(baseline, devset)
baseline_metrics


  from .autonotebook import tqdm as notebook_tqdm
  PydanticSerializationUnexpectedValue(Expected 10 fields but got 6: Expected `Message` - serialized value may not be as expected [field_name='message', input_value=Message(content='[[ ## qu...: None}, annotations=[]), input_type=Message])
  PydanticSerializationUnexpectedValue(Expected `StreamingChoices` - serialized value may not be as expected [field_name='choices', input_value=Choices(finish_reason='st...ider_specific_fields={}), input_type=Choices])
  return self.__pydantic_serializer__.to_python(
  PydanticSerializationUnexpectedValue(Expected 10 fields but got 6: Expected `Message` - serialized value may not be as expected [field_name='message', input_value=Message(content='[[ ## an...: None}, annotations=[]), input_type=Message])
  PydanticSerializationUnexpectedValue(Expected `StreamingChoices` - serialized value may not be as expected [field_name='choices', input_value=Choices(finish_reason='st...ider_specific_fields={}), inpu

{'accuracy': 0.6666666666666666, 'retrieval_hit_rate': 1.0}

In [38]:
# DSPy optimization (query-rewrite + answer prompt)
from dspy.teleprompt import BootstrapFewShotWithRandomSearch

# DSPy passes (example, prediction, trace) to metrics.
def combined_metric(example, pred, trace=None):
    if not pred or not hasattr(pred, "answer"):
        return 0
    answer_ok = exact_match(pred.answer, example.answer)
    context_ok = example.doc_id in getattr(pred, "context_ids", [])
    return 1 if (answer_ok and context_ok) else 0

teleprompter = BootstrapFewShotWithRandomSearch(
    metric=combined_metric,
    max_bootstrapped_demos=3,
    max_labeled_demos=3,
    num_candidate_programs=3,
)

optimized = teleprompter.compile(baseline, trainset=trainset)
optimized_metrics = evaluate(optimized, devset)
optimized_metrics


Going to sample between 1 and 3 traces per predictor.
Will attempt to bootstrap 3 candidate sets.
Average Metric: 72.00 / 96 (75.0%): 100%|██████████| 96/96 [00:19<00:00,  4.87it/s]

2026/01/08 11:44:50 INFO dspy.evaluate.evaluate: Average Metric: 72 / 96 (75.0%)



New best score: 75.0 for seed -3
Scores so far: [75.0]
Best score so far: 75.0
Average Metric: 76.00 / 96 (79.2%): 100%|██████████| 96/96 [00:11<00:00,  8.51it/s]

2026/01/08 11:45:01 INFO dspy.evaluate.evaluate: Average Metric: 76 / 96 (79.2%)



New best score: 79.17 for seed -2
Scores so far: [75.0, 79.17]
Best score so far: 79.17


  4%|▍         | 4/96 [00:00<00:13,  7.02it/s]


Bootstrapped 3 full traces after 4 examples for up to 1 rounds, amounting to 4 attempts.
Average Metric: 76.00 / 96 (79.2%): 100%|██████████| 96/96 [00:21<00:00,  4.43it/s]

2026/01/08 11:45:24 INFO dspy.evaluate.evaluate: Average Metric: 76 / 96 (79.2%)



Scores so far: [75.0, 79.17, 79.17]
Best score so far: 79.17


  2%|▏         | 2/96 [00:02<01:37,  1.04s/it]


Bootstrapped 2 full traces after 2 examples for up to 1 rounds, amounting to 2 attempts.
Average Metric: 72.00 / 96 (75.0%): 100%|██████████| 96/96 [00:19<00:00,  4.97it/s]

2026/01/08 11:45:45 INFO dspy.evaluate.evaluate: Average Metric: 72 / 96 (75.0%)



Scores so far: [75.0, 79.17, 79.17, 75.0]
Best score so far: 79.17


  1%|          | 1/96 [00:00<01:17,  1.23it/s]


Bootstrapped 1 full traces after 1 examples for up to 1 rounds, amounting to 1 attempts.
Average Metric: 77.00 / 96 (80.2%): 100%|██████████| 96/96 [00:17<00:00,  5.46it/s]


2026/01/08 11:46:04 INFO dspy.evaluate.evaluate: Average Metric: 77 / 96 (80.2%)


New best score: 80.21 for seed 1
Scores so far: [75.0, 79.17, 79.17, 75.0, 80.21]
Best score so far: 80.21


  1%|          | 1/96 [00:00<01:17,  1.22it/s]


Bootstrapped 1 full traces after 1 examples for up to 1 rounds, amounting to 1 attempts.
Average Metric: 76.00 / 96 (79.2%): 100%|██████████| 96/96 [00:19<00:00,  4.85it/s]

2026/01/08 11:46:25 INFO dspy.evaluate.evaluate: Average Metric: 76 / 96 (79.2%)



Scores so far: [75.0, 79.17, 79.17, 75.0, 80.21, 79.17]
Best score so far: 80.21
6 candidate programs found.


Baseline eval: 100%|██████████| 24/24 [00:39<00:00,  1.66s/it]


{'accuracy': 0.875, 'retrieval_hit_rate': 1.0}

In [44]:
# What improved? (baseline vs optimized on dev)

print("Baseline metrics:", baseline_metrics)
print("Optimized metrics:", optimized_metrics)

def show_case(i, ex, base_pred, opt_pred):
    print("\n" + "="*80)
    print(f"[dev #{i}] Q:", ex.question)
    print("Gold:", ex.answer, "| gold doc:", ex.doc_id)
    print("- Baseline")
    print("  rewritten:", getattr(base_pred, "rewritten_query", None))
    print("  context_ids:", getattr(base_pred, "context_ids", None))
    print("  answer:", getattr(base_pred, "answer", None))
    print("  answer_ok:", exact_match(base_pred.answer, ex.answer))
    print("- Optimized")
    print("  rewritten:", getattr(opt_pred, "rewritten_query", None))
    print("  context_ids:", getattr(opt_pred, "context_ids", None))
    print("  answer:", getattr(opt_pred, "answer", None))
    print("  answer_ok:", exact_match(opt_pred.answer, ex.answer))

improved = []
regressed = []

for i, ex in enumerate(devset):
    base_pred = baseline(question=ex.question)
    opt_pred  = optimized(question=ex.question)

    base_ok = exact_match(base_pred.answer, ex.answer)
    opt_ok  = exact_match(opt_pred.answer, ex.answer)

    if (not base_ok) and opt_ok:
        improved.append((i, ex, base_pred, opt_pred))
    elif base_ok and (not opt_ok):
        regressed.append((i, ex, base_pred, opt_pred))

print(f"\nImproved cases (wrong→right): {len(improved)}")
print(f"Regressions (right→wrong): {len(regressed)}")

# Show a few examples
for tup in improved[:5]:
    show_case(*tup)

if regressed:
    print("\nShowing regressions:")
    for tup in regressed[:3]:
        show_case(*tup)

# Optional: inspect learned few-shot demos (“winning prompts”)
def show_demos(label, prog):
    print("\n" + "-"*80)
    print(label)
    try:
        preds = getattr(prog, "predictors", lambda: [])()
        for p in preds:
            print("\nPREDICTOR:", type(p).__name__)
            demos = getattr(p, "demos", None)
            if not demos:
                print("  (no demos)")
                continue
            print(f"  demos: {len(demos)}")
            for d in demos[:3]:
                # demos are dspy.Example-like objects
                print("   -", d)
    except Exception as e:
        print("Could not inspect demos:", e)

show_demos("Baseline demos", baseline)
show_demos("Optimized demos", optimized)

Baseline metrics: {'accuracy': 0.6666666666666666, 'retrieval_hit_rate': 1.0}
Optimized metrics: {'accuracy': 0.875, 'retrieval_hit_rate': 1.0}



Improved cases (wrong→right): 5
Regressions (right→wrong): 0

[dev #1] Q: What is the latest release version of the Northbridge Nimbus Compiler?
Gold: 3.4 | gold doc: 35
- Baseline
  rewritten: latest release version Northbridge Nimbus Compiler
  context_ids: [35, 46, 33]
  answer: version 3.4
  answer_ok: False
- Optimized
  rewritten: latest release version Northbridge Nimbus Compiler
  context_ids: [35, 46, 33]
  answer: 3.4
  answer_ok: True

[dev #2] Q: What version is the latest release of the Osprey Aurora Compiler?
Gold: 4.5 | gold doc: 3
- Baseline
  rewritten: latest release version Osprey Aurora Compiler
  context_ids: [3, 39, 20]
  answer: version 4.5
  answer_ok: False
- Optimized
  rewritten: latest version Osprey Aurora Compiler
  context_ids: [3, 39, 20]
  answer: 4.5
  answer_ok: True

[dev #3] Q: What outputs does the Redstone Redstone API support?
Gold: JSON and CSV | gold doc: 29
- Baseline
  rewritten: Redstone API supported output fields
  context_ids: [29, 40, 9

### Result Analysis

- **Baseline vs optimized**:
  - Baseline dev metrics were accuracy = 0.6667 and retrieval hit rate = 1.0.
  - Optimized dev metrics were accuracy = 0.875 and retrieval hit rate = 1.0.

- **Which metric improved**:
   - Accuracy improved (0.6667 → 0.875).
   - Retrieval hit rate did not change (stayed 1.0), the correct doc was already being retrieved.

- **Why accuracy improved**:
  - Most failures were answer formatting, not missing knowledge.
  - Baseline often added extra words like "version 3.4" or "JSON and CSV outputs".
  - Our metric is strict exact match, so those extra words count as wrong.
  - The optimized program learned (via DSPy’s compiled demos) to output the short exact span (e.g., "3.4", "JSON and CSV").

## Exercise 2

Coding: Implement a simple version of EvoPrompt. Represent a prompt as a list of tokens or words. Define two evolutionary operators: mutate (randomly replace or insert a word) and crossover (swap a segment between two prompts). Use an LLM (or a heuristic function) to evaluate fitness (e.g. BLEU score or any task-specific score) of prompts. Start with a few initial prompts and run a few generations of evolution. Did the prompts improve? This could be done on a trivial task (like prompt an LLM to output a specific keyword - evolve prompts to maximize the occurrence of that keyword in the response).

In [69]:
# EvoPrompt configuration (Exercise 2)
# Multi-example dev set: prompts are scored by exact-match accuracy across examples.

from dataclasses import dataclass
from typing import Dict, List, Tuple

try:
    from tqdm.auto import tqdm
except Exception:
    tqdm = None

EVO_LM_MODEL = "openai/gpt-4.1-mini"

POP_SIZE = 6
GENERATIONS = 5
ELITE_K = 2
P_MUTATE = 0.7
P_CROSSOVER = 0.6

# Dev set for prompt fitness (varied targets, so you can't hardcode one answer)
DEV_EXAMPLES = [
    {
        "question": "What is the latest release version?",
        "input": "The Northbridge Nimbus Compiler targets the Vesta VM. The latest release is version 3.4. The previous release was version 3.3.",
        "target": "3.4",
    },
    {
        "question": "What is the latest release version?",
        "input": "The Osprey Aurora Compiler targets the Redstone VM. The latest release is version 4.5. A beta build is labeled 4.6.",
        "target": "4.5",
    },
    {
        "question": "What is the latest release version?",
        "input": "The Raven Redstone Compiler targets the Nova VM. The latest release is version 4.6. The previous release was 4.5.",
        "target": "4.6",
    },
    {
        "question": "How many kilometers does the route cover?",
        "input": "Cargo Aurora Route Kestrel covers 690 kilometers and departs on Thursdays. A backup route covers 710 kilometers.",
        "target": "690 kilometers",
    },
    {
        "question": "On what day does the route depart?",
        "input": "Cargo Delta Route Orion covers 540 kilometers and departs on Mondays. A test run departed on Fridays.",
        "target": "Mondays",
    },
    {
        "question": "What key size is used for encryption?",
        "input": "The Zephyr Orion Protocol encrypts data using a 256-bit key. An older spec used 128-bit.",
        "target": "256-bit",
    },
    {
        "question": "In what year was it ratified?",
        "input": "The Northbridge Orion Protocol encrypts data using a 192-bit key. It was ratified in 2021. A draft appeared in 2019.",
        "target": "2021",
    },
    {
        "question": "What material is the lens made of?",
        "input": "The Slate Osprey Sensor uses a ceramic lens. The housing is aluminum.",
        "target": "ceramic",
    },
    {
        "question": "What is the lens diameter?",
        "input": "The Raven Nimbus Sensor uses a sapphire lens. The lens diameter is 9 millimeters, and the casing diameter is 11 millimeters.",
        "target": "9 millimeters",
    },
    {
        "question": "Which outputs does the API support?",
        "input": "The Maple Redstone API has a default rate limit of 120 requests per minute. It supports JSON and CSV outputs.",
        "target": "JSON and CSV",
    },
    {
        "question": "In which city did the clinic open?",
        "input": "The Atlas Kestrel Clinic runs on the Nova scheduling system. The clinic opened in Portland in 2022. Planning started in Austin.",
        "target": "Portland",
    },
    {
        "question": "How long does it take to reach 80 percent?",
        "input": "The Lumen Aurora Battery has a capacity of 4.2 kWh and charges to 80 percent in 22 minutes. A full charge takes 60 minutes.",
        "target": "22 minutes",
    },
    {
        "question": "How many participants were enrolled?",
        "input": "The Zephyr Atlas Study enrolled 240 participants in 2022 and reported 12 dropouts.",
        "target": "240",
    },
    {
        "question": "How many floors does the library have?",
        "input": "The Lumen Aster Library has 3 floors and 120 desks.",
        "target": "3",
    },
    {
        "question": "What is the project budget?",
        "input": "The Orion Nova Project budget is 1.2 million dollars; current spending is 0.8 million dollars.",
        "target": "1.2 million dollars",
    },
    {
        "question": "In what year was the satellite launched?",
        "input": "The Kestrel Aster Satellite launched in 2018 and was decommissioned in 2023.",
        "target": "2018",
    },
    {
        "question": "How many days does the festival run?",
        "input": "The Helix Northbridge Festival runs for 4 days and opens on June 3.",
        "target": "4 days",
    },
    {
        "question": "What is the capacity?",
        "input": "The Vesta Cedar Battery has a capacity of 4.2 kWh and a peak output of 2.1 kW.",
        "target": "4.2 kWh",
    },
]

# Clear cache/state when re-running
fitness_cache = {}

random.seed(42)
print("Exercise 2 configured:", EVO_LM_MODEL, "dev_examples=", len(DEV_EXAMPLES))


Exercise 2 configured: openai/gpt-4.1-mini dev_examples= 18


In [75]:
# Prompt representation + LLM-guided evolutionary operators

def to_text(tokens: List[str]) -> str:
    return " ".join(tokens).strip()


def from_text(text: str) -> List[str]:
    # simple tokenization; keeps exercise requirement (list of words)
    return [t for t in text.strip().split() if t]


def _call_evo_lm(user_prompt: str) -> str:
    """Call the Exercise 2 LLM without changing Exercise 1 global settings."""
    resp = client.chat.completions.create(
        model=EVO_LM_MODEL.replace("openai/", ""),
        messages=[
            {"role": "system", "content": "You are a helpful prompt engineer. Return only the prompt text."},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0,
        max_tokens=200,
    )
    return resp.choices[0].message.content.strip()


def mutate(tokens: List[str]) -> List[str]:
    prompt = to_text(tokens)
    # Provide a few examples so the operator can steer toward exact-match extraction behavior.
    demo = "\n".join([
        f"- QUESTION: {DEV_EXAMPLES[i]['question']}\n  INPUT: {DEV_EXAMPLES[i]['input']}\n  TARGET: {DEV_EXAMPLES[i]['target']}"
        for i in range(min(3, len(DEV_EXAMPLES)))
    ])

    instr = (
        "You will receive a prompt that will be evaluated on a dev set. "
        "For each dev example, the evaluation model will see:\n"
        "PROMPT\n\nQUESTION: <question>\n\nINPUT: <text>\n\nFINAL ANSWER:\n"
        "and must output exactly the TARGET string (no extra words).\n\n"
        "Improve the prompt to maximize exact-match accuracy across the dev set. "
        "Keep the prompt short and clear. Return ONLY the new prompt text.\n\n"
        "DEV EXAMPLES (format):\n"
        f"{demo}\n\n"
        f"CURRENT_PROMPT: {prompt}\n\n"
        "NEW_PROMPT:"
    )
    out = _call_evo_lm(instr)
    return from_text(out)


def crossover(a_tokens: List[str], b_tokens: List[str]) -> List[str]:
    a = to_text(a_tokens)
    b = to_text(b_tokens)

    demo = "\n".join([
        f"- QUESTION: {DEV_EXAMPLES[i]['question']}\n  INPUT: {DEV_EXAMPLES[i]['input']}\n  TARGET: {DEV_EXAMPLES[i]['target']}"
        for i in range(min(2, len(DEV_EXAMPLES)))
    ])

    instr = (
        "Combine the best parts of two prompts into one better prompt. "
        "The prompt is evaluated across multiple dev examples. "
        "For each example, the evaluation model sees QUESTION+INPUT and must output exactly TARGET (no extra words). "
        "Keep the new prompt short and unambiguous. Return ONLY the new prompt text.\n\n"
        "DEV EXAMPLES (format):\n"
        f"{demo}\n\n"
        f"PROMPT_A: {a}\n"
        f"PROMPT_B: {b}\n\n"
        "NEW_PROMPT:"
    )
    out = _call_evo_lm(instr)
    return from_text(out)


# Seed prompts (intentionally imperfect)
# These should try to extract the correct short answer from the INPUT text.
# Seed prompts (intentionally weak/vague)
# These start off bad so evolution has room to improve.
seed_prompts: List[List[str]] = [
    from_text("Answer the question."),
    from_text("Respond in a helpful way."),
    from_text("Summarize the input and answer."),
    from_text("Explain your reasoning and give the answer."),
    from_text("Give a complete response to the question."),
    from_text("Provide the best possible answer."),
]

population: List[List[str]] = seed_prompts[:POP_SIZE]
print("Initial population:")
for i,p in enumerate(population):
    print(i, to_text(p))



Initial population:
0 Answer the question.
1 Respond in a helpful way.
2 Summarize the input and answer.
3 Explain your reasoning and give the answer.
4 Give a complete response to the question.
5 Provide the best possible answer.


In [76]:

# Fitness is dev-set accuracy (exact match), not a single target phrase.
FitnessCacheVal = Tuple[float, List[str]]  # (accuracy, outputs per example)

# cache by (prompt_text, example_index)
_run_cache: Dict[Tuple[str, int], str] = {}


def run_candidate(prompt_tokens: List[str], ex_question: str, ex_input: str) -> str:
    """Run the candidate prompt on a single (question, input) pair and return the model output."""
    prompt_text = to_text(prompt_tokens)

    user_text = f"{prompt_text}\n\nQUESTION:\n{ex_question}\n\nINPUT:\n{ex_input}\n\nFINAL ANSWER:"

    resp = client.chat.completions.create(
        model=EVO_LM_MODEL.replace("openai/", ""),
        messages=[
            {
                "role": "system",
                "content": "You are a helpful assistant.",
            },
            {"role": "user", "content": user_text},
        ],
        temperature=0,
        max_tokens=64,
    )
    return resp.choices[0].message.content.strip()


def fitness(prompt_tokens: List[str]) -> FitnessCacheVal:
    prompt_text = to_text(prompt_tokens)

    outputs: List[str] = []
    correct = 0

    ex_iter = range(len(DEV_EXAMPLES))
    if tqdm is not None:
        ex_iter = tqdm(ex_iter, desc="Dev examples", total=len(DEV_EXAMPLES), leave=False)

    for j in ex_iter:
        key = (prompt_text, j)
        if key in _run_cache:
            out = _run_cache[key]
        else:
            out = run_candidate(prompt_tokens, DEV_EXAMPLES[j]["question"], DEV_EXAMPLES[j]["input"])
            _run_cache[key] = out

        outputs.append(out)
        if out.strip() == DEV_EXAMPLES[j]["target"].strip():
            correct += 1

    acc = correct / max(1, len(DEV_EXAMPLES))
    return acc, outputs


def tournament_select(pop: List[List[str]], scores: List[float], k: int = 3) -> List[str]:
    # Pick k random individuals and return the best
    idxs = [random.randrange(len(pop)) for _ in range(k)]
    best = max(idxs, key=lambda i: scores[i])
    return pop[best]


def evolve(population: List[List[str]]) -> Tuple[List[List[str]], List[dict]]:
    history = []

    gen_iter = range(GENERATIONS)
    if tqdm is not None:
        gen_iter = tqdm(gen_iter, desc="EvoPrompt generations", total=GENERATIONS)

    for gen in gen_iter:
        # Evaluate population fitness
        pop_iter = range(len(population))
        if tqdm is not None:
            pop_iter = tqdm(pop_iter, desc=f"Fitness eval (gen {gen})", total=len(population), leave=False)

        fits: List[float] = []
        outs: List[List[str]] = []
        for i in pop_iter:
            f, o_list = fitness(population[i])
            fits.append(f)
            outs.append(o_list)

        # Rank
        ranked = sorted(range(len(population)), key=lambda i: fits[i], reverse=True)
        best_i = ranked[0]
        best = population[best_i]
        history.append(
            {
                "gen": gen,
                "best_accuracy": fits[best_i],
                "best_prompt": to_text(best),
                "best_outputs": outs[best_i],
            }
        )

        # Early stop if perfect accuracy
        if fits[best_i] >= 1.0:
            break

        # Elitism
        new_pop: List[List[str]] = [population[i] for i in ranked[:ELITE_K]]

        # Refill
        while len(new_pop) < POP_SIZE:
            child: List[str]
            if random.random() < P_CROSSOVER:
                pa = tournament_select(population, fits, k=3)
                pb = tournament_select(population, fits, k=3)
                child = crossover(pa, pb)
            else:
                pa = tournament_select(population, fits, k=3)
                child = pa[:]

            if random.random() < P_MUTATE:
                child = mutate(child)

            # Basic guardrail
            if not child:
                child = from_text("Output exactly")

            new_pop.append(child)

        population = new_pop

    return population, history


final_population, evo_history = evolve(population)
print("Done. Generations run:", len(evo_history))



EvoPrompt generations:   0%|          | 0/5 [00:00<?, ?it/s]
[A


[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
EvoPrompt generations:  20%|██        | 1/5 [01:47<07:09, 107.39s/it]
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
[A
EvoPrompt generations:  20%|██        | 1/5 [02:27<09:48, 147.24s/it]

Done. Generations run: 2





In [77]:
# Reporting: did prompts improve?

if not evo_history:
    raise RuntimeError("No evolution history; did you run the evolution cell?")

print("\n=== EvoPrompt results (multi-example accuracy) ===")
print("Dev examples:", len(DEV_EXAMPLES))

# Show best per generation
print("\nBest per generation:")
for row in evo_history:
    print(f"gen={row['gen']:<2} best_accuracy={row['best_accuracy']:.3f} | prompt={row['best_prompt']}")

# Compare initial best vs final best
initial_best = evo_history[0]
final_best = evo_history[-1]

print("\nInitial best:")
print("  prompt:", initial_best["best_prompt"])
print("  accuracy:", initial_best["best_accuracy"])

print("\nFinal best:")
print("  prompt:", final_best["best_prompt"])
print("  accuracy:", final_best["best_accuracy"])

improved = final_best["best_accuracy"] > initial_best["best_accuracy"]
print("\nDid it improve?", improved)

# Show a few per-example before/after outputs
print("\nSample dev examples (before vs after):")
for i in range(min(5, len(DEV_EXAMPLES))):
    ex = DEV_EXAMPLES[i]
    base_out = initial_best["best_outputs"][i]
    final_out = final_best["best_outputs"][i]
    print("\n---")
    print("INPUT:", ex["input"])
    print("TARGET:", ex["target"])
    print("BASE_OUT:", base_out)
    print("FINAL_OUT:", final_out)

# Show the winning prompt tokens
winning_tokens = from_text(final_best["best_prompt"])
print("\nWinning prompt tokens:", winning_tokens)




=== EvoPrompt results (multi-example accuracy) ===
Dev examples: 18

Best per generation:
gen=0  best_accuracy=0.500 | prompt=Answer the question.
gen=1  best_accuracy=1.000 | prompt=Extract and return exactly the answer to the question based on the input text. Do not add any extra words or explanation.

Initial best:
  prompt: Answer the question.
  accuracy: 0.5

Final best:
  prompt: Extract and return exactly the answer to the question based on the input text. Do not add any extra words or explanation.
  accuracy: 1.0

Did it improve? True

Sample dev examples (before vs after):

---
INPUT: The Northbridge Nimbus Compiler targets the Vesta VM. The latest release is version 3.4. The previous release was version 3.3.
TARGET: 3.4
BASE_OUT: 3.4
FINAL_OUT: 3.4

---
INPUT: The Osprey Aurora Compiler targets the Redstone VM. The latest release is version 4.5. A beta build is labeled 4.6.
TARGET: 4.5
BASE_OUT: 4.5
FINAL_OUT: 4.5

---
INPUT: The Raven Redstone Compiler targets the Nova VM.

# Exercise 3

Compare reinforcement learning vs. evolutionary search for prompt optimization. If our “policy” is the prompt text and the “environment” gives a reward (quality score), RL would tweak the prompt based on gradient of reward (if possible) or black-box optimization. Evolutionary methods like GEPA/EvoPrompt treat it like a search over strings. List pros and cons of each: e.g., RL (with methods like RLPrompt or policy gradients) can directly optimize an objective but may get stuck in local optima or require many samples; evolutionary approaches are more global and can incorporate heuristic knowledge (via LLM reflections in GEPA) but might be slower if search space is huge. In practice, why might GEPA’s ability to incorporate natural language reflections be advantageous in prompt tuning?

## Solution

RL-style prompt optimization treats the prompt as a policy over tokens and improves it from a scalar reward signal. The upside is that you can directly optimize whatever reward you define and it can discover “weird but effective” prompts. The downsides are: high sample cost ( low sample efficiency and many rollouts needed), sensitivity to reward design/hyperparameters, can be unstable, the learned prompts can be brittle and uninterpretable.

Evolutionary search treats prompts (often per-module prompts in a pipeline) as candidates in a population and uses mutation/crossover + selection (often Pareto/multi-objective) to explore more globally. Upsides: better at avoiding greedy local minima, naturally supports multi-objective tradeoffs (accuracy vs cost/latency), works well for compound systems (retriever → reasoner → formatter), and can be more sample-efficient. 

GEPA’s natural-language reflection upgrades the learning signal from “reward = 0.63” to actionable, structured credit assignment. By reading execution traces (retrieval results, tool errors, reasoning steps) the reflector can say what failed and why and propose a targeted prompt patch (e.g., “don’t paraphrase the original query; retrieve missing hop entities”). That leverages 2025-era reasoning models’ strengths—turning a few rollouts into meaningful edits—whereas RL often needs many rollouts to infer the same lesson from scalar rewards.

# Exercise 4

*Coding (ACE):* Implement a simplified **ACE-style context evolution** system. Create a playbook with sections: STRATEGIES, CODE_SNIPPETS, PITFALLS. Each entry has an ID, helpful/harmful counters, and content. For a simple task (e.g., math problems), implement: (1) **Generator** produces a solution noting which playbook entries helped/hurt, (2) **Reflector** extracts a lesson from success/failure, (3) **Curator** converts the lesson to a structured bullet, checks for duplicates, and merges with counters. Run 20-30 iterations. Compare against a baseline that concatenates all lessons without structure. Does the structured approach prevent context collapse and preserve early lessons?

## Solution

In [None]:
# Exercise 4 (ACE) - setup
import json
import random
import re
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

ACE_MODEL = "openai/gpt-4.1"
random.seed(42)


In [118]:
# Playbook structures + helpers
SECTION_PREFIX = {
    "STRATEGIES": "str",
    "CODE_SNIPPETS": "code",
    "PITFALLS": "pit",
}

@dataclass
class PlaybookEntry:
    section: str
    entry_id: str
    helpful: int
    harmful: int
    content: str

@dataclass
class Playbook:
    sections: Dict[str, List[PlaybookEntry]] = field(
        default_factory=lambda: {
            "STRATEGIES": [],
            "CODE_SNIPPETS": [],
            "PITFALLS": [],
        }
    )
    counters: Dict[str, int] = field(
        default_factory=lambda: {"STRATEGIES": 0, "CODE_SNIPPETS": 0, "PITFALLS": 0}
    )


def new_entry_id(playbook: Playbook, section: str) -> str:
    playbook.counters[section] += 1
    prefix = SECTION_PREFIX[section]
    return f"{prefix}-{playbook.counters[section]:05d}"


def render_playbook(playbook: Playbook) -> str:
    lines: List[str] = []
    for section in ["STRATEGIES", "CODE_SNIPPETS", "PITFALLS"]:
        lines.append(f"## {section}")
        entries = playbook.sections[section]
        if not entries:
            lines.append("(empty)")
        else:
            for e in entries:
                lines.append(f"[{e.entry_id}] helpful={e.helpful} harmful={e.harmful} :: {e.content}")
        lines.append("")
    return "\n".join(lines).strip()


def normalize_text(text: str) -> str:
    text = text.lower().strip()
    text = re.sub(r"[^a-z0-9 ]+", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text


def jaccard_sim(a: str, b: str) -> float:
    a_set = set(normalize_text(a).split())
    b_set = set(normalize_text(b).split())
    if not a_set or not b_set:
        return 0.0
    return len(a_set & b_set) / len(a_set | b_set)


def find_duplicate(playbook: Playbook, section: str, content: str, threshold: float = 0.7):
    for entry in playbook.sections[section]:
        if jaccard_sim(entry.content, content) >= threshold:
            return entry
    return None


def build_id_map(playbook: Playbook) -> Dict[str, PlaybookEntry]:
    id_map: Dict[str, PlaybookEntry] = {}
    for section_entries in playbook.sections.values():
        for entry in section_entries:
            id_map[entry.entry_id] = entry
    return id_map


In [119]:
# Task generator: rule-heavy extraction domain (synthetic finance)

RULE_HINTS = [
    "Core Value = Revenue - COGS - OPEX + Rebates",
    "Stability Index = Core Value - 2*Penalties",
    "Unit Score = Stability Index / Units",
    "Use FY2024 values only (ignore FY2023 and FY2025 forecast).",
]


def _format_report(values_2024, values_2023, values_2025):
    return (
        "FY2024 (USD millions): "
        f"Revenue={values_2024['revenue']} COGS={values_2024['cogs']} "
        f"OPEX={values_2024['opex']} Rebates={values_2024['rebates']} "
        f"Penalties={values_2024['penalties']} Units={values_2024['units']}\n"
        "FY2023 (USD millions): "
        f"Revenue={values_2023['revenue']} COGS={values_2023['cogs']} "
        f"OPEX={values_2023['opex']} Rebates={values_2023['rebates']} "
        f"Penalties={values_2023['penalties']} Units={values_2023['units']}\n"
        "FY2025 Forecast (USD millions): "
        f"Revenue={values_2025['revenue']} COGS={values_2025['cogs']} "
        f"OPEX={values_2025['opex']} Rebates={values_2025['rebates']} "
        f"Penalties={values_2025['penalties']} Units={values_2025['units']}"
    )


def _generate_values(rng: random.Random):
    while True:
        revenue = rng.randint(900, 2200)
        cogs = rng.randint(250, 900)
        opex = rng.randint(180, 800)
        rebates = rng.randint(10, 150)
        penalties = rng.randint(5, 80)
        core = revenue - cogs - opex + rebates
        stability = core - 2 * penalties
        if stability <= 50:
            continue
        divisors = [d for d in range(2, 21) if stability % d == 0]
        if not divisors:
            continue
        units = rng.choice(divisors)
        values_2024 = {
            "revenue": revenue,
            "cogs": cogs,
            "opex": opex,
            "rebates": rebates,
            "penalties": penalties,
            "units": units,
        }
        return values_2024


def _jitter_values(base, rng: random.Random, scale: int):
    return {
        "revenue": max(100, base["revenue"] + rng.randint(-scale, scale)),
        "cogs": max(50, base["cogs"] + rng.randint(-scale // 2, scale // 2)),
        "opex": max(50, base["opex"] + rng.randint(-scale // 2, scale // 2)),
        "rebates": max(0, base["rebates"] + rng.randint(-20, 20)),
        "penalties": max(0, base["penalties"] + rng.randint(-15, 15)),
        "units": max(2, base["units"] + rng.choice([-2, -1, 0, 1, 2])),
    }


def _compute_metrics(values):
    core = values["revenue"] - values["cogs"] - values["opex"] + values["rebates"]
    stability = core - 2 * values["penalties"]
    unit_score = stability // values["units"]
    return {
        "core_value": core,
        "stability_index": stability,
        "unit_score": unit_score,
    }


def make_tasks(n: int, rng: random.Random) -> List[Dict[str, str]]:
    tasks: List[Dict[str, str]] = []
    q_types = [
        "core_value",
        "stability_index",
        "unit_score",
        "revenue",
        "penalties",
    ]

    for _ in range(n):
        values_2024 = _generate_values(rng)
        values_2023 = _jitter_values(values_2024, rng, 250)
        values_2025 = _jitter_values(values_2024, rng, 300)
        report = _format_report(values_2024, values_2023, values_2025)

        q_type = rng.choice(q_types)
        metrics_2024 = _compute_metrics(values_2024)
        if q_type == "core_value":
            answer = metrics_2024["core_value"]
            q_text = "What is the FY2024 Core Value? Answer with a number only."
        elif q_type == "stability_index":
            answer = metrics_2024["stability_index"]
            q_text = "What is the FY2024 Stability Index? Answer with a number only."
        elif q_type == "unit_score":
            answer = metrics_2024["unit_score"]
            q_text = "What is the FY2024 Unit Score? Answer with a number only."
        elif q_type == "revenue":
            answer = values_2024["revenue"]
            q_text = "What is FY2024 Revenue? Answer with a number only."
        else:
            answer = values_2024["penalties"]
            q_text = "What are FY2024 Penalties? Answer with a number only."

        tasks.append(
            {
                "report": report,
                "question": q_text,
                "q_type": q_type,
                "answer": str(answer),
                "values_2024": values_2024,
                "values_2023": values_2023,
                "values_2025": values_2025,
            }
        )

    return tasks


TASKS = make_tasks(80, random.Random(123))


In [120]:
# LLM helpers: generator + reflector

def call_ace_lm(system: str, user: str) -> str:
    resp = client.chat.completions.create(
        model=ACE_MODEL.replace("openai/", ""),
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        temperature=0,
        max_tokens=250,
    )
    return resp.choices[0].message.content.strip()


def parse_json(text: str) -> Dict:
    try:
        return json.loads(text)
    except Exception:
        match = re.search(r"\{.*\}", text, re.S)
        if match:
            try:
                return json.loads(match.group(0))
            except Exception:
                pass
    return {}


def normalize_number(text: str) -> str:
    return re.sub(r"[^0-9-]", "", text or "")


def clean_lesson(text: str) -> str:
    text = re.sub(r"\s+", " ", text).strip()
    return text[:180].rstrip()


def pick_section(raw_section: str, content: str, success: bool) -> str:
    if not success:
        return "PITFALLS"
    section = raw_section.upper().strip()
    if section not in {"STRATEGIES", "CODE_SNIPPETS", "PITFALLS"}:
        section = "STRATEGIES"
    if "=" in content or "*" in content or "/" in content:
        return "CODE_SNIPPETS"
    if "avoid" in content.lower() or "don't" in content.lower() or "do not" in content.lower():
        return "PITFALLS"
    return section


def format_question(task: Dict[str, str], include_hint: bool) -> str:
    hint = ("\nHINTS:\n" + "\n".join(RULE_HINTS)) if include_hint else ""
    return f"{task['report']}\n\n{task['question']}{hint}"


def generator(playbook_text: str, question: str, available_ids: List[str], pitfall_ids: List[str]) -> Dict:
    system = "You solve word problems. Return ONLY valid JSON."
    user = (
        "PLAYBOOK:\n"
        f"{playbook_text}\n\n"
        "QUESTION:\n"
        f"{question}\n\n"
        "Return JSON with keys:\n"
        "- answer: string (number only)\n"
        "- helped_ids: array of IDs from AVAILABLE_IDS that you used (include PITFALLS you avoided)\n"
        "- hurt_ids: array of IDs from AVAILABLE_IDS that misled you\n\n"
        "If AVAILABLE_IDS is non-empty, include at least one ID in helped_ids or hurt_ids.\n"
        "If PITFALL_IDS is non-empty, include at least one of those IDs in helped_ids (as an avoided mistake).\n"
        f"AVAILABLE_IDS: {available_ids}\n"
        f"PITFALL_IDS: {pitfall_ids}\n"
        "JSON:"
    )
    raw = call_ace_lm(system, user)
    data = parse_json(raw)
    answer = str(data.get("answer", "")).strip()
    helped = [x for x in data.get("helped_ids", []) if x in available_ids]
    hurt = [x for x in data.get("hurt_ids", []) if x in available_ids]

    if available_ids and not helped and not hurt:
        helped = [available_ids[0]]

    if pitfall_ids and not any(x in pitfall_ids for x in helped):
        helped.append(pitfall_ids[0])

    return {"answer": answer, "helped_ids": helped, "hurt_ids": hurt}


def _alt_answer(task: Dict[str, str], source: str, variant: str):
    values = task[source]
    core = values["revenue"] - values["cogs"] - values["opex"] + values["rebates"]
    if variant == "no_rebates":
        core = values["revenue"] - values["cogs"] - values["opex"]
    if variant == "no_double_penalty":
        stability = core - values["penalties"]
    else:
        stability = core - 2 * values["penalties"]
    unit_score = stability // values["units"]
    if task["q_type"] == "core_value":
        return core
    if task["q_type"] == "stability_index":
        return stability
    if task["q_type"] == "unit_score":
        return unit_score
    if task["q_type"] == "revenue":
        return values["revenue"]
    return values["penalties"]


def reflector(question: str, gold: str, pred: str, success: bool, task: Dict[str, str]) -> Dict:
    system = "You are a reflection agent. Return ONLY valid JSON."
    result = "correct" if success else "incorrect"
    pred_num = normalize_number(pred)
    error_hint = ""
    if not success and pred_num:
        pred_val = int(pred_num)
        if pred_val == _alt_answer(task, "values_2023", "default"):
            error_hint = "Used FY2023 values instead of FY2024."
        elif pred_val == _alt_answer(task, "values_2025", "default"):
            error_hint = "Used FY2025 forecast values instead of FY2024."
        elif pred_val == _alt_answer(task, "values_2024", "no_double_penalty"):
            error_hint = "Forgot to double penalties in Stability Index."
        elif pred_val == _alt_answer(task, "values_2024", "no_rebates"):
            error_hint = "Forgot to add rebates in Core Value."

    user = (
        f"QUESTION: {question}\n"
        f"GOLD: {gold}\n"
        f"MODEL_ANSWER: {pred}\n"
        f"RESULT: {result}\n"
        f"ERROR_HINT: {error_hint or 'none'}\n\n"
        "Write one short, specific lesson that improves future answers. "
        "If incorrect, write it as a PITFALL to avoid next time. "
        "If correct, write a strategy or a formula. "
        "Prefer explicit formulas, e.g., 'stability = core - 2*penalties'.\n"
        'Return JSON: {"section": ..., "content": ...}'
    )
    raw = call_ace_lm(system, user)
    data = parse_json(raw)
    content = clean_lesson(str(data.get("content", "Convert units before computing.")))
    section = pick_section(str(data.get("section", "STRATEGIES")), content, success)
    return {"section": section, "content": content}


In [125]:
# Curator + evaluation loop

def update_help_hurt(playbook: Playbook, helped_ids: List[str], hurt_ids: List[str]) -> None:
    id_map = build_id_map(playbook)
    for entry_id in helped_ids:
        if entry_id in id_map:
            id_map[entry_id].helpful += 1
    for entry_id in hurt_ids:
        if entry_id in id_map:
            id_map[entry_id].harmful += 1


def prune_playbook(playbook: Playbook, max_entries_per_section: int = 18) -> None:
    for section in playbook.sections:
        entries = playbook.sections[section]
        entries = [
            e
            for e in entries
            if not ((e.harmful >= e.helpful and e.harmful >= 2) or (e.helpful == 0 and e.harmful > 0))
        ]
        if len(entries) > max_entries_per_section:
            entries = sorted(
                entries,
                key=lambda e: (e.helpful - e.harmful, e.helpful),
                reverse=True,
            )[:max_entries_per_section]
        playbook.sections[section] = entries


def curate(playbook: Playbook, section: str, content: str, success: bool) -> None:
    content = content.strip()
    if not content:
        return
    existing = find_duplicate(playbook, section, content, threshold=0.5)
    if existing:
        if success:
            existing.helpful += 1
        else:
            existing.harmful += 1
        return

    entry_id = new_entry_id(playbook, section)
    entry = PlaybookEntry(
        section=section,
        entry_id=entry_id,
        helpful=1 if success else 0,
        harmful=0 if success else 1,
        content=content,
    )
    playbook.sections[section].append(entry)


def run_ace(iterations: int = 40, baseline_budget: int = 900, hint_steps: int = 8):
    playbook = Playbook()
    baseline_lessons: List[str] = []
    history: List[Dict] = []
    early_structured: List[str] = []
    early_baseline: List[str] = []
    rng = random.Random(999)

    for step in range(iterations):
        task = rng.choice(TASKS)
        question = format_question(task, include_hint=step < hint_steps)
        gold = task["answer"]

        # Structured playbook run
        playbook_text = render_playbook(playbook)
        id_map = build_id_map(playbook)
        available_ids = list(id_map.keys())
        pitfall_ids = [e.entry_id for e in playbook.sections["PITFALLS"]]
        gen = generator(playbook_text, question, available_ids, pitfall_ids)
        pred = gen["answer"]
        success = normalize_number(pred) == normalize_number(gold)
        update_help_hurt(playbook, gen["helped_ids"], gen["hurt_ids"])
        lesson = reflector(question, gold, pred, success, task)
        curate(playbook, lesson["section"], lesson["content"], success)
        prune_playbook(playbook)

        if step < 5:
            early_structured.append(lesson["content"])

        # Baseline: concat lessons, no structure or dedupe
        baseline_context = "\n".join(baseline_lessons)
        if len(baseline_context) > baseline_budget:
            baseline_context = baseline_context[-baseline_budget:]
        baseline_notes = f"NOTES:\n{baseline_context}" if baseline_context else "(no lessons)"
        base_gen = generator(baseline_notes, question, [], [])
        base_pred = base_gen["answer"]
        base_success = normalize_number(base_pred) == normalize_number(gold)
        base_lesson = reflector(question, gold, base_pred, base_success, task)
        baseline_lessons.append(base_lesson["content"])
        if step < 5:
            early_baseline.append(base_lesson["content"])

        history.append(
            {
                "step": step,
                "structured_ok": success,
                "baseline_ok": base_success,
            }
        )

    structured_acc = sum(1 for h in history if h["structured_ok"]) / len(history)
    baseline_acc = sum(1 for h in history if h["baseline_ok"]) / len(history)

    final_playbook_text = render_playbook(playbook)
    baseline_context = "\n".join(baseline_lessons)
    if len(baseline_context) > baseline_budget:
        baseline_context = baseline_context[-baseline_budget:]

    structured_retained = sum(1 for l in early_structured if l in final_playbook_text)
    baseline_retained = sum(1 for l in early_baseline if l in baseline_context)

    return {
        "history": history,
        "structured_acc": structured_acc,
        "baseline_acc": baseline_acc,
        "structured_retained": structured_retained,
        "baseline_retained": baseline_retained,
        "playbook": playbook,
        "baseline_context": baseline_context,
    }

results = run_ace(iterations=40, baseline_budget=900, hint_steps=8)
print("Structured accuracy:", round(results["structured_acc"], 3))
print("Baseline accuracy:", round(results["baseline_acc"], 3))
print("Early lesson retention (structured):", results["structured_retained"], "/ 5")
print("Early lesson retention (baseline):", results["baseline_retained"], "/ 5")
print("\nFinal playbook:\n")
print(render_playbook(results["playbook"]))


Structured accuracy: 0.55
Baseline accuracy: 0.375
Early lesson retention (structured): 0 / 5
Early lesson retention (baseline): 0 / 5

Final playbook:

## STRATEGIES
[str-00001] helpful=24 harmful=0 :: To find FY2024 Revenue, directly extract the value labeled 'Revenue' under FY2024 from the data provided.
[str-00002] helpful=5 harmful=0 :: To find the value of a specific financial metric for a given fiscal year, directly extract the number associated with that metric from the provided data without additional calculat

## CODE_SNIPPETS
[code-00001] helpful=17 harmful=0 :: To find FY2024 Penalties, directly use the given FY2024 Penalties value. For related calculations, use formulas: Core Value = Revenue - COGS - OPEX + Rebates; Stability Index = Cor
[code-00002] helpful=19 harmful=0 :: Core Value = Revenue - COGS - OPEX + Rebates; apply this formula directly using FY2024 values to calculate Core Value accurately.
[code-00003] helpful=9 harmful=0 :: Core Value = Revenue - COGS - OPEX -