In [3]:
# Populate os.environ from a local .env file; later cells read
# MLFLOW_TRACKING_URI and GEMINI_API_KEY from the environment.
from dotenv import load_dotenv

load_dotenv()

True

In [4]:
import datetime
import os
import random
from pathlib import Path

import dspy
import mlflow

In [7]:
def setup_mlflow(experiment_name="dspy-gepa-musique", tracking_uri=None):
    """Configure MLflow tracking for this notebook and enable DSPy autologging.

    Args:
        experiment_name: Name of the MLflow experiment to create/activate.
            Defaults to the experiment this notebook has always used.
        tracking_uri: Tracking server URI. When ``None`` (the default) the
            ``MLFLOW_TRACKING_URI`` environment variable is used; if that is
            unset too, MLflow's own default tracking location is kept rather
            than calling ``set_tracking_uri(None)``.

    Returns:
        The experiment object returned by ``mlflow.set_experiment``.
    """
    import mlflow
    import mlflow.dspy  # provides mlflow.dspy.autolog

    # Resolve the URI once so the value we set and the value we report agree.
    tracking_uri = tracking_uri or os.getenv("MLFLOW_TRACKING_URI")
    if tracking_uri:
        mlflow.set_tracking_uri(tracking_uri)
    experiment = mlflow.set_experiment(experiment_name)
    mlflow.dspy.autolog(
        log_compiles=True,  # log optimizer compile runs (e.g. GEPA)
        log_evals=True,  # log dspy.Evaluate runs
        log_traces_from_compile=True,  # keep traces produced while compiling
    )
    # Report the URI MLflow will actually use (may be its default location).
    print(f"✅ MLflow tracking enabled at {mlflow.get_tracking_uri()}")
    return experiment


mlflow_exp = setup_mlflow()

✅ MLflow tracking enabled at http://localhost:5005


In [6]:
# Timestamped output directory for this run's artifacts (e.g. the saved program).
EXP_ID = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
EXP_DIR = Path("../outputs") / "dspy-gepa-musique" / EXP_ID
EXP_DIR.mkdir(exist_ok=True, parents=True)
EXP_DIR

PosixPath('../outputs/dspy-gepa-musique/20251003_131415')

In [13]:
# Student LM: the program being optimized. It is served locally behind an
# OpenAI-compatible endpoint (see api_base), so api_key is only a placeholder.
lm = dspy.LM(
    "openai/Qwen/Qwen2.5-7B-Instruct",
    temperature=0.6,
    max_tokens=8192,
    api_key="local",
    api_base="http://0.0.0.0:8000/v1",
    cache=False,
)
dspy.configure(lm=lm)

# Reflection LM: handed to GEPA to reflect on feedback and rewrite instructions.
reflection_lm = dspy.LM(
    "gemini/gemini-2.5-pro",
    api_key=os.getenv("GEMINI_API_KEY"),
    max_tokens=16384,
    thinking={"type": "enabled"},
    cache=False,
)
# Alternative: self-hosted reflection LM.
# reflection_lm = dspy.LM(
#     "openai/Qwen/Qwen3-32B",
#     temperature=0.6,
#     max_tokens=16384,
#     api_key="local",
#     api_base="http://0.0.0.0:8001/v1",
#     cache=False,
# )

# Record the models on the active MLflow experiment (activated by setup_mlflow).
# Use the public tagging API: writing into the private `Experiment._tags` dict
# only changes the local object and is never sent to the tracking server.
mlflow.set_experiment_tag("student_lm.model", lm.model)
mlflow.set_experiment_tag("reflection_lm.model", reflection_lm.model)

In [14]:
# Smoke test: the local student LM endpoint answers a trivial chat message.
lm(messages=[dict(role="user", content="Hello")])

['Hello! How can I assist you today?']

In [15]:
# Smoke test: the reflection LM is reachable with the configured API key.
reflection_lm(messages=[dict(role="user", content="What is largest prime number below 10?")])

['The largest prime number below 10 is **7**.\n\nThe prime numbers below 10 are 2, 3, 5, and 7.']

In [16]:
from rlvr.dspy.mhqa.data import prepare_musique_dataset

# Split / sampling configuration (kept small, presumably to bound GEPA's cost).
SPLIT_SEED = 89  # seeds the train/val shuffle so the split is reproducible
TRAIN_FRACTION = 0.80  # share of the shuffled train split used for training
N_TRAIN = 30  # training examples passed to GEPA
N_VAL = 30  # validation examples (GEPA tracks Pareto scores on these)
N_TEST = 50  # held-out examples from the official validation split

ds = prepare_musique_dataset(datasets_str="bdsaglam/musique-mini,answerable,train")
random.Random(SPLIT_SEED).shuffle(ds)
train_size = int(len(ds) * TRAIN_FRACTION)
train_ds, val_ds = ds[:train_size], ds[train_size:]
train_ds = train_ds[:N_TRAIN]
val_ds = val_ds[:N_VAL]
assert train_ds and val_ds, "train/val split produced an empty set"

test_ds = prepare_musique_dataset(
    datasets_str=f"bdsaglam/musique-mini,answerable,validation[:{N_TEST}]"
)

Map: 100%|##########| 300/300 [00:00<?, ? examples/s]

Map:   0%|          | 0/300 [00:00<?, ? examples/s]

Map: 100%|##########| 50/50 [00:00<?, ? examples/s]

Map:   0%|          | 0/50 [00:00<?, ? examples/s]

In [27]:
from rlvr.dspy.mhqa.baleen import MultiHopQA

# Baseline (unoptimized) multi-hop QA program. The constructor appears to also
# accept prompt_technique (e.g. "cot"); not used here — TODO confirm options.
program = MultiHopQA()
program

generate_query.predict = Predict(StringSignature(question, collected_info -> reasoning, search_query, top_n
    instructions='Given a multi-hop question and information collected so far, generate a search query\nto find the next piece of information needed to answer the question.\nFocus on entities, dates, or facts that need to be resolved step by step.'
    question = Field(annotation=str required=True json_schema_extra={'desc': 'The multi-hop question to answer', '__dspy_field_type': 'input', 'prefix': 'Question:'})
    collected_info = Field(annotation=str required=True json_schema_extra={'desc': 'Information collected from previous retrieval steps.', '__dspy_field_type': 'input', 'prefix': 'Collected Info:'})
    reasoning = Field(annotation=str required=True json_schema_extra={'prefix': "Reasoning: Let's think step by step in order to", 'desc': '${reasoning}', '__dspy_field_type': 'output'})
    search_query = Field(annotation=str required=True json_schema_extra={'desc': 'Search q

In [28]:
# First training example, reused by the smoke-test cells below.
example = train_ds[0]
example

Example({'question': "What county contains the work location of the president making father's day a national holiday?", 'answer': 'Washington County', 'answers': ['Washington County', 'washington county'], 'docs': [{'body': 'Thanksgiving, or Thanksgiving Day, is a public holiday celebrated on the fourth Thursday of November in the United States. It originated as a harvest festival. Thanksgiving has been celebrated nationally on and off since 1789, after Congress requested a proclamation by George Washington. It has been celebrated as a federal holiday every year since 1863, when, during the American Civil War, President Abraham Lincoln proclaimed a national day of "Thanksgiving and Praise to our beneficent Father who dwelleth in the Heavens,"to be celebrated on the last Thursday in November. Together with Christmas and the New Year, Thanksgiving is a part of the broader fall / winter holiday season in the U.S.', 'id': '0', 'is_supporting': False, 'text': '# Thanksgiving (United States)

In [None]:
# Run the baseline program once on the smoke-test example.
pred = program(example.question, example.docs)
pred

In [None]:
from rlvr.dspy.mhqa.metrics import metric

In [None]:
# Scalar metric for the smoke-test prediction.
metric(example, pred)

0.35024453024453023

In [None]:
# Baseline: score the unoptimized program on the held-out test split.
print("📊 Evaluating ORIGINAL program...")
original_evaluate = dspy.Evaluate(
    devset=test_ds,
    metric=metric,
    num_threads=16,
    display_progress=True,
    display_table=False,
)
original_eval_result = original_evaluate(program)

📊 Evaluating ORIGINAL program...
Average Metric: 26.66 / 50 (53.3%): 100%|██████████| 50/50 [01:20<00:00,  1.61s/it]

2025/10/03 13:20:40 INFO dspy.evaluate.evaluate: Average Metric: 26.656956275163214 / 50 (53.3%)



🏃 View run eval at: http://localhost:5005/#/experiments/1/runs/651a0fde52bb4e2f890d9ce665ae541a
🧪 View experiment at: http://localhost:5005/#/experiments/1


In [None]:
# raise ValueError()

## GEPA Optimization

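GEPA is a reflective prompt optimizer: a reflection LM reads textual feedback on the program's outputs and proposes improved instructions. We use `metric_with_feedback`, which scores each evaluation aspect (answer F1, retrieval recall and precision, citation F1, hop efficiency) and explains each score in text, to optimize our multi-hop QA program.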


In [20]:
from rlvr.dspy.mhqa.feedback_metrics import metric_with_feedback

In [21]:
# Sanity-check the feedback metric on the smoke-test prediction: it returns both
# a numeric score and the textual feedback GEPA's reflection LM will read.
feedback_result = metric_with_feedback(example, pred)

print(f"Score: {feedback_result.score:.3f}")
print(f"Feedback: {feedback_result.feedback}")


Score: 0.581
Feedback: Overall performance breakdown:
- Answer F1 Score: Low overlap (F1: 0.40) with expected answer. Your answer 'Los Angeles County.' differs significantly from 'Washington County'. Focus on extracting the specific information requested in the question.
- Retrieval Recall: Poor retrieval (recall: 0.33). Only found 1 out of 3 supporting documents. Missing critical documents: ['5', '7']. Your search queries need to be more comprehensive and targeted.
- Retrieval Precision: Perfect precision! All 1 retrieved documents are supporting documents: ['14']
- Citations F1 Score: Good citations (F1: 0.50). Correct: ['14']. Missing: ['5', '7']. Be more precise about which documents actually support your answer.
- Hop Efficiency: Perfect efficiency! You completed the task in 3 turns, same as or fewer than the reference (3 hops). This shows excellent retrieval strategy and reasoning efficiency.


In [22]:
from dspy import GEPA

# GEPA optimizer: the reflection LM turns metric_with_feedback's textual
# feedback into new candidate instructions.
optimizer = GEPA(
    metric=metric_with_feedback,
    reflection_lm=reflection_lm,
    auto="light",  # Use light budget for faster experimentation. Use "heavy" for best performance
    num_threads=16,
    track_stats=True,  # needed for optimized_program.detailed_results
    use_merge=False,
)

print("✅ GEPA optimizer configured")

✅ GEPA optimizer configured


In [None]:
# Run GEPA on the baseline program; valset is where it tracks Pareto scores.
print("🚀 Starting GEPA optimization...")

optimized_program = optimizer.compile(program, trainset=train_ds, valset=val_ds)

print("✅ GEPA optimization completed!")

2025/10/03 11:49:17 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '861812e4d4a744c780b95fbf4d01e866', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current dspy workflow
2025/10/03 11:49:18 INFO dspy.teleprompt.gepa.gepa: Running GEPA for approx 1765 metric calls of the program. This amounts to 29.42 full evals on the train+val set.
2025/10/03 11:49:18 INFO dspy.teleprompt.gepa.gepa: Using 30 examples for tracking Pareto scores. You can consider using a smaller sample of the valset to allow GEPA to explore more diverse solutions within the same budget.


🚀 Starting GEPA optimization...


GEPA Optimization:   0%|                                                                       | 0/1765 [00:00<?, ?rollouts/s]

In [None]:
# Persist the optimized program under this run's output directory.
program_dir = EXP_DIR / "optimized-program"
optimized_program.save(str(program_dir), save_program=True)

### Examine Optimized Prompts

Let's look at how GEPA improved the prompts for each predictor:


In [None]:
# Print the GEPA-optimized instructions of every predictor in the program.
# The loop variable is deliberately NOT `pred`: that global holds the smoke-test
# Prediction used by earlier cells, and overwriting it with a Predictor breaks
# re-running them (e.g. metric_with_feedback(example, pred)).
for name, predictor in optimized_program.named_predictors():
    print("=" * 60)
    print(f"Predictor: {name}")
    print("=" * 60)
    print("Optimized Instructions:")
    print(predictor.signature.instructions)
    print("*" * 60)


### Evaluate Optimized Program

Compare the performance before and after GEPA optimization:


In [None]:
# Evaluate the GEPA-optimized program on the same held-out test split, using
# the same evaluator settings as the baseline evaluation.
# "\n" (not "\\n") so a real blank line is printed instead of a literal backslash-n.
print("\n📊 Evaluating OPTIMIZED program...")
optimized_evaluate = dspy.Evaluate(
    devset=test_ds,
    metric=metric,
    num_threads=16,
    display_table=False,
    display_progress=True,
)
optimized_eval_result = optimized_evaluate(optimized_program)

In [None]:
# Side-by-side comparison of baseline vs. GEPA-optimized test scores.
original_score = original_eval_result.score
optimized_score = optimized_eval_result.score

print("\n" + "=" * 50)  # real newline, not a literal "\n"
print("🏆 PERFORMANCE COMPARISON")
print("=" * 50)
print(f"Original Program Score:  {original_score:.3f}")
print(f"Optimized Program Score: {optimized_score:.3f}")
print(f"Improvement:            {optimized_score - original_score:+.3f}")
if original_score:
    print(f"Relative Improvement:   {(optimized_score / original_score - 1) * 100:+.1f}%")
else:
    # Relative change is undefined for a zero baseline (would divide by zero).
    print("Relative Improvement:   n/a (original score is 0)")

### GEPA Optimization Analysis

Analyze the detailed optimization results:


In [None]:
# Analyze GEPA's optimization trajectory. `detailed_results` is only present on
# the returned program when GEPA was constructed with track_stats=True.
MAX_CANDIDATES_SHOWN = 10  # how many per-candidate scores to list

if hasattr(optimized_program, "detailed_results"):
    results = optimized_program.detailed_results
    val_scores = results.val_aggregate_scores

    print("🔍 GEPA Optimization Details:")
    print(f"- Total candidates explored: {len(results.candidates)}")
    print(f"- Best candidate index: {results.best_idx}")
    print(f"- Best validation score: {val_scores[results.best_idx]:.3f}")
    print(f"- Discovery evaluations used: {sum(results.discovery_eval_counts)}")

    # Real newline, not a literal "\n".
    print("\n📈 Score progression:")
    for i, score in enumerate(val_scores[:MAX_CANDIDATES_SHOWN]):
        print(f"Candidate {i}: {score:.3f}")

    if len(val_scores) > MAX_CANDIDATES_SHOWN:
        print(f"... and {len(val_scores) - MAX_CANDIDATES_SHOWN} more candidates")
else:
    print("Detailed results not available (set track_stats=True in GEPA constructor)")


In [None]:
# Compare the baseline and GEPA-optimized programs side by side on one
# held-out test example (not the training example used by the smoke tests).
EXAMPLE_IDX = 11  # index into test_ds of the example to inspect
SHOW_FEEDBACK = False  # set True to also print metric_with_feedback's text feedback


def _print_prediction_report(label, prediction, metric_result, show_feedback=False):
    """Print one program's answer, retrieved doc ids, citations and feedback-metric score.

    ``label`` is an upper-case heading such as "ORIGINAL"; its capitalized form
    prefixes the score/feedback lines (e.g. "Original score: 0.581").
    """
    print(f"{label}:")
    print(f"  Answer: {prediction.answer}")
    print(f"  Retrieved docs: {prediction.retrieved_doc_ids}")
    print(f"  Citations: {prediction.citations}")
    print(f"{label.capitalize()} score: {metric_result.score:.3f}")
    if show_feedback:
        print(f"{label.capitalize()} feedback: {metric_result.feedback}")


test_example = test_ds[EXAMPLE_IDX]

print("🧪 Testing optimized program on example:")
print(f"Question: {test_example.question}")
print(f"Expected Answer: {test_example.answer}")
print(f"Supporting Docs: {test_example.supporting_ids}")
print()

# Distinct names so this cell does not overwrite the globals `example` / `pred`
# that the smoke-test cells above rely on.
original_pred = program(test_example.question, test_example.docs)
original_metric_result = metric_with_feedback(test_example, original_pred)

print("📋 ORIGINAL vs OPTIMIZED Results:")
print("-" * 50)
_print_prediction_report("ORIGINAL", original_pred, original_metric_result, SHOW_FEEDBACK)

optimized_pred = optimized_program(test_example.question, test_example.docs)
optimized_metric_result = metric_with_feedback(test_example, optimized_pred)
_print_prediction_report("OPTIMIZED", optimized_pred, optimized_metric_result, SHOW_FEEDBACK)

**Open question:** can we measure instruction quality by running the optimized instructions with a larger model and checking whether it answers more questions correctly?