In [1]:
import os


from vllm import LLM, SamplingParams
from math_verify import parse, verify

import pandas as pd
from tqdm import tqdm
from transformers import AutoTokenizer

  from .autonotebook import tqdm as notebook_tqdm


INFO 12-08 03:18:12 [__init__.py:239] Automatically detected platform cuda.


2025-12-08 03:18:14,662	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [2]:
os.environ["CUDA_VISIBLE_DEVICES"] = "3"

In [None]:
def verify_answer(ground_truth, response):
    """
    Score a single response against its reference answer.

    Args:
        ground_truth: Reference answer; it is wrapped in $...$ before parsing
        response: Model output to check

    Returns:
        1 when the verifier accepts the response, otherwise 0. Any exception
        raised while parsing the reference or comparing also yields 0.
    """
    # If the response cannot be parsed, compare against the raw value instead.
    try:
        candidate = parse(response)
    except Exception:
        candidate = response

    try:
        gold = parse(f"${ground_truth}$")
        matched = bool(verify(gold, candidate))
    except Exception:
        return 0
    return 1 if matched else 0

def evaluate_responses(responses, ground_truths):
    """
    Evaluate a list of responses against ground truths.

    Args:
        responses: List of model responses
        ground_truths: List of correct answers, aligned index-by-index with
            ``responses``

    Returns:
        Tuple of (is_correct_list, accuracy_percentage, num_correct, total).
        Accuracy is 0.0 when there is nothing to evaluate.

    Raises:
        ValueError: If the two inputs differ in length. ``zip`` would
            otherwise drop the unmatched tail silently and report an accuracy
            over fewer questions than intended.
    """
    responses = list(responses)
    ground_truths = list(ground_truths)
    if len(responses) != len(ground_truths):
        raise ValueError(
            f"responses ({len(responses)}) and ground_truths ({len(ground_truths)}) "
            "must have the same length"
        )

    is_correct = [verify_answer(gt, resp) for gt, resp in zip(ground_truths, responses)]
    total = len(is_correct)
    num_correct = sum(is_correct)
    accuracy = num_correct / total * 100 if total else 0.0
    return is_correct, accuracy, num_correct, total

In [3]:
MODEL_NAME = "Qwen/Qwen2.5-Math-1.5B-Instruct"
MODEL_ALIAS = MODEL_NAME.split("/")[-1]

In [4]:
sample_test_set_path = "/VData/linna4335/llms_know_difficult/runs/Qwen2.5-Math-1.5B/datasplits/AIME_2025_predicted_by_predicting_learnability.json"

In [5]:
df = pd.read_json(sample_test_set_path)

In [6]:
MATH_PROMPT_FORMATTING = " Please put your final answer inside \\boxed{}."

In [7]:
df["formatted_prompt"] = df["question"].apply(lambda x: x + MATH_PROMPT_FORMATTING)

In [8]:
llm = LLM(model=MODEL_NAME, gpu_memory_utilization=0.4)

INFO 12-08 03:18:23 [config.py:717] This model supports multiple tasks: {'reward', 'generate', 'classify', 'score', 'embed'}. Defaulting to 'generate'.
INFO 12-08 03:18:24 [config.py:2003] Chunked prefill is enabled with max_num_batched_tokens=16384.
INFO 12-08 03:18:24 [config.py:2003] Chunked prefill is enabled with max_num_batched_tokens=16384.
INFO 12-08 03:18:25 [core.py:58] Initializing a V1 LLM engine (v0.8.5.post1) with config: model='Qwen/Qwen2.5-Math-1.5B-Instruct', speculative_config=None, tokenizer='Qwen/Qwen2.5-Math-1.5B-Instruct', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, override_neuron_config=None, tokenizer_revision=None, trust_remote_code=False, dtype=torch.bfloat16, max_seq_len=4096, download_dir=None, load_format=auto, tensor_parallel_size=1, pipeline_parallel_size=1, disable_custom_all_reduce=False, quantization=None, enforce_eager=False, kv_cache_dtype=auto,  device_config=cuda, decoding_config=DecodingConfig(guided_decoding_backend='auto', re

Loading safetensors checkpoint shards:   0% Completed | 0/1 [00:00<?, ?it/s]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:00<00:00,  2.66it/s]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:00<00:00,  2.65it/s]

Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:00<00:00,  2.66it/s]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:00<00:00,  2.65it/s]



INFO 12-08 03:18:28 [loader.py:458] Loading weights took 0.41 seconds
INFO 12-08 03:18:28 [gpu_model_runner.py:1347] Model loading took 2.8798 GiB and 1.000572 seconds
INFO 12-08 03:18:28 [gpu_model_runner.py:1347] Model loading took 2.8798 GiB and 1.000572 seconds
INFO 12-08 03:18:34 [backends.py:420] Using cache directory: /home/lina4335/.cache/vllm/torch_compile_cache/0426d30a27/rank_0_0 for vLLM's torch.compile
INFO 12-08 03:18:34 [backends.py:430] Dynamo bytecode transform time: 5.46 s
INFO 12-08 03:18:34 [backends.py:420] Using cache directory: /home/lina4335/.cache/vllm/torch_compile_cache/0426d30a27/rank_0_0 for vLLM's torch.compile
INFO 12-08 03:18:34 [backends.py:430] Dynamo bytecode transform time: 5.46 s
INFO 12-08 03:18:37 [backends.py:118] Directly load the compiled graph(s) for shape None from the cache, took 3.077 s
INFO 12-08 03:18:37 [backends.py:118] Directly load the compiled graph(s) for shape None from the cache, took 3.077 s
INFO 12-08 03:18:38 [monitor.py:33] to

In [9]:
# default prompting

In [10]:
PROMPTS = df["formatted_prompt"].to_list()
GTS = df["answer"].to_list()

In [11]:
PARAMS = SamplingParams(temperature=0.0, max_tokens=3000)

In [12]:
outputs = llm.generate(PROMPTS, PARAMS)

Processed prompts:   0%|          | 0/15 [00:00<?, ?it/s, est. speed input: 0.00 toks/s, output: 0.00 toks/s]

Processed prompts: 100%|██████████| 15/15 [00:04<00:00,  3.06it/s, est. speed input: 358.86 toks/s, output: 2682.63 toks/s]
Processed prompts: 100%|██████████| 15/15 [00:04<00:00,  3.06it/s, est. speed input: 358.86 toks/s, output: 2682.63 toks/s]


In [13]:
# Keep the text of the first completion of every request output.
# (The prompt attached to each output was read but never used, so it is no
# longer extracted.)
RESP = [output.outputs[0].text for output in outputs]

In [None]:
# Score the single-pass responses with the shared evaluation helper
baseline_eval = evaluate_responses(RESP, GTS)
is_correct, accuracy, num_correct, total = baseline_eval
print(f"Accuracy: {accuracy:.2f}% ({num_correct}/{total})")


In [46]:
GTS

[70, 588, 16, 117, 279, 504, 821, 77, 62, 81, 259, 510, 204, 60, 735]

In [47]:
is_correct

[1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]

## 2-Round Inference on All Questions

Now let's perform 2-round inference on all questions. In round 1, we generate an initial answer. In round 2, we ask the model to review and potentially correct its work.

In [None]:
# Round 1: Generate initial answers
GTS_EASY = df["answer"].to_list()
PROMPTS_ROUND1 = df["formatted_prompt"].to_list()

print("Generating Round 1 responses...")
outputs_round1 = llm.generate(PROMPTS_ROUND1, PARAMS)

# One entry per request output: the text of its first completion.
RESP_ROUND1 = [completion.outputs[0].text for completion in outputs_round1]

print(f"Completed Round 1 for {len(RESP_ROUND1)} questions")

Generating Round 1 responses...


Processed prompts: 100%|██████████| 15/15 [00:04<00:00,  3.60it/s, est. speed input: 421.80 toks/s, output: 2929.05 toks/s]

Completed Round 1 for 15 questions





In [2]:
import pandas as pd

In [3]:
# Bound to its own name so it does not replace `df` (the AIME split loaded
# above). Later cells read df["formatted_prompt"] and
# df["predicted_difficulty_sigmoid"] and zip df["question"] with RESP_ROUND1.
math_train_df = pd.read_parquet("/VData/linna4335/llms_know_difficult/hard_rl/data/MATH/train.parquet")

In [None]:
# Round 2: Review and correct the work
REVIEW_PROMPT_TEMPLATE = """Original Question: {question}

Your Initial Answer: {initial_answer}

Please carefully review your answer above. Check for any calculation errors, logical mistakes, or incorrect reasoning. If you find any errors, provide a corrected answer. If your answer is correct, restate it.

Please put your final answer inside \\boxed{{}}."""

# Each question must be paired with its own Round 1 response. zip() would
# silently truncate or mis-pair if `df` no longer matches RESP_ROUND1.
round2_questions = df["question"].to_list()
assert len(round2_questions) == len(RESP_ROUND1), (
    f"df has {len(round2_questions)} questions but Round 1 produced "
    f"{len(RESP_ROUND1)} responses"
)

PROMPTS_ROUND2 = [
    REVIEW_PROMPT_TEMPLATE.format(question=question, initial_answer=response)
    for question, response in zip(round2_questions, RESP_ROUND1)
]

print("Generating Round 2 responses (review)...")
outputs_round2 = llm.generate(PROMPTS_ROUND2, PARAMS)

# One entry per request output: the text of its first completion.
RESP_ROUND2 = [output.outputs[0].text for output in outputs_round2]

print(f"Completed Round 2 for {len(RESP_ROUND2)} questions")

Generating Round 2 responses (review)...


Processed prompts:   0%|          | 0/15 [00:00<?, ?it/s, est. speed input: 0.00 toks/s, output: 0.00 toks/s]

Processed prompts: 100%|██████████| 15/15 [00:11<00:00,  1.34it/s, est. speed input: 1308.23 toks/s, output: 3399.34 toks/s]

Completed Round 2 for 15 questions





In [None]:
# Score the Round 1 answers
round1_eval = evaluate_responses(RESP_ROUND1, GTS_EASY)
is_correct_round1, accuracy_round1, num_correct_r1, total_r1 = round1_eval
print(f"Round 1 Accuracy: {accuracy_round1:.2f}% ({num_correct_r1}/{total_r1})")

Round 1 Accuracy: 13.33% (2/15)


In [None]:
# Score the Round 2 (reviewed) answers
round2_eval = evaluate_responses(RESP_ROUND2, GTS_EASY)
is_correct_round2, accuracy_round2, num_correct_r2, total_r2 = round2_eval
print(f"Round 2 Accuracy: {accuracy_round2:.2f}% ({num_correct_r2}/{total_r2})")

Round 2 Accuracy: 13.33% (2/15)


In [None]:
# Build one record per question describing how review changed the outcome
results_comparison = [
    {
        'question_idx': idx,
        'predicted_difficulty': df.iloc[idx]['predicted_difficulty_sigmoid'],
        'round1_correct': is_correct_round1[idx],
        'round2_correct': is_correct_round2[idx],
        # The flags are 0/1, so ">" means wrong -> right and "<" means right -> wrong.
        'improved': is_correct_round2[idx] > is_correct_round1[idx],
        'degraded': is_correct_round2[idx] < is_correct_round1[idx],
    }
    for idx in range(len(is_correct_round1))
]

comparison_df = pd.DataFrame(results_comparison)

# Summary statistics
num_improved = comparison_df['improved'].sum()
num_degraded = comparison_df['degraded'].sum()
num_unchanged = len(comparison_df) - num_improved - num_degraded

print("\n=== 2-Round Inference Results ===")
print(f"Round 1 Accuracy: {accuracy_round1:.2f}%")
print(f"Round 2 Accuracy: {accuracy_round2:.2f}%")
print(f"Improvement: {accuracy_round2 - accuracy_round1:.2f} percentage points")
print(f"\nQuestions improved by review: {num_improved}")
print(f"Questions degraded by review: {num_degraded}")
print(f"Questions unchanged: {num_unchanged}")


=== 2-Round Inference Results ===
Round 1 Accuracy: 13.33%
Round 2 Accuracy: 13.33%
Improvement: 0.00 percentage points

Questions improved by review: 0
Questions degraded by review: 0
Questions unchanged: 15


In [57]:
# Display comparison dataframe
comparison_df

Unnamed: 0,question_idx,predicted_difficulty,round1_correct,round2_correct,improved,degraded
0,0,0.085126,0,0,False,False
1,1,0.185826,0,0,False,False
2,2,0.199134,0,0,False,False
3,3,0.224215,0,0,False,False
4,4,0.230331,0,0,False,False
5,5,0.230922,0,0,False,False
6,6,0.244214,0,0,False,False
7,7,0.257724,0,0,False,False
8,8,0.303385,0,0,False,False
9,9,0.331925,0,0,False,False


In [None]:
# Show the first question that review turned from incorrect to correct
improved_mask = comparison_df['improved']
improved_indices = comparison_df.loc[improved_mask, 'question_idx'].tolist()

if not improved_indices:
    print("No improved answers found.")
else:
    print("=== Example of Improved Answer ===")
    idx = improved_indices[0]
    # Long texts are truncated to keep the cell output short.
    print(f"\nQuestion: {df.iloc[idx]['question'][:200]}...")
    print(f"\nGround Truth: {GTS_EASY[idx]}")
    print(f"\nRound 1 Answer (Incorrect):\n{RESP_ROUND1[idx][:300]}...")
    print(f"\nRound 2 Answer (Correct):\n{RESP_ROUND2[idx][:300]}...")

No improved answers found.


In [None]:
# Show the first question that review turned from correct to incorrect
degraded_mask = comparison_df['degraded']
degraded_indices = comparison_df.loc[degraded_mask, 'question_idx'].tolist()

if not degraded_indices:
    print("No degraded answers found.")
else:
    print("=== Example of Degraded Answer ===")
    idx = degraded_indices[0]
    # Long texts are truncated to keep the cell output short.
    print(f"\nQuestion: {df.iloc[idx]['question'][:200]}...")
    print(f"\nGround Truth: {GTS_EASY[idx]}")
    print(f"\nRound 1 Answer (Correct):\n{RESP_ROUND1[idx][:300]}...")
    print(f"\nRound 2 Answer (Incorrect):\n{RESP_ROUND2[idx][:300]}...")

No degraded answers found.


## Gating to solve harder questions

In [None]:
# Rank every question by the predictor's score, highest first.
# NOTE(review): DifficultyRouter (defined later in this notebook) documents
# this score as a predicted SUCCESS RATE, i.e. lower = harder. If that is the
# right reading, descending order puts the EASIEST questions first - confirm.
df_sorted_hard = df.sort_values(by='predicted_difficulty_sigmoid', ascending=False)
# Currently keeps every row; shrink the slice to work on a subset.
df_hard = df_sorted_hard.iloc[:len(df)]

hard_scores = df_hard['predicted_difficulty_sigmoid']
print(f"Total questions: {len(df)}")
print(f"Questions selected for hard question strategies: {len(df_hard)}")
print(f"Predicted difficulty range: {hard_scores.min():.4f} - {hard_scores.max():.4f}")

### Strategy 1: Multi-Sample with Majority Vote
Generate multiple solutions and take the majority answer

In [None]:
# Strategy 1: sample several solutions per question for majority voting
NUM_SAMPLES = 5
PARAMS_SAMPLING = SamplingParams(temperature=0.7, max_tokens=3000, top_p=0.95)

PROMPTS_HARD = df_hard["formatted_prompt"].to_list()
GTS_HARD = df_hard["answer"].to_list()

# Every question index appears NUM_SAMPLES times in a row; the prompt list
# mirrors that layout so outputs can be mapped back to their question.
question_indices = [q_idx for q_idx in range(len(PROMPTS_HARD)) for _ in range(NUM_SAMPLES)]
prompts_repeated = [PROMPTS_HARD[q_idx] for q_idx in question_indices]

print(f"Generating {len(prompts_repeated)} responses ({NUM_SAMPLES} per question)...")
outputs_majority = llm.generate(prompts_repeated, PARAMS_SAMPLING)

# Group the generated texts by the question they belong to
responses_by_question = [[] for _ in PROMPTS_HARD]
for q_idx, output in zip(question_indices, outputs_majority):
    responses_by_question[q_idx].append(output.outputs[0].text)

print(f"Generated {NUM_SAMPLES} samples for {len(PROMPTS_HARD)} questions")

In [None]:
# Parse all responses and find majority answer
from collections import Counter

majority_answers = []
is_correct_majority = []

for q_idx, responses in enumerate(responses_by_question):
    # Votes are counted on the string form of each parsed answer, because the
    # parsed objects themselves may not be hashable. One raw response is kept
    # per vote key so the winner can be verified from real model text.
    vote_keys = []
    representative_response = {}
    for resp in responses:
        try:
            key = str(parse(resp))
        except Exception:
            key = str(resp)
        vote_keys.append(key)
        representative_response.setdefault(key, resp)

    if vote_keys:
        majority_answer = Counter(vote_keys).most_common(1)[0][0]
        majority_answers.append(majority_answer)
        # Verify a raw response that voted for the winner. Passing the
        # stringified parse result would make verify_answer re-parse a Python
        # repr instead of the model's answer.
        is_correct_majority.append(
            verify_answer(GTS_HARD[q_idx], representative_response[majority_answer])
        )
    else:
        majority_answers.append("")
        is_correct_majority.append(0)

# Guard the division so an empty question list reports 0% instead of raising.
accuracy_majority = sum(is_correct_majority) / len(is_correct_majority) * 100 if is_correct_majority else 0.0
print(f"Strategy 1 - Majority Vote Accuracy: {accuracy_majority:.2f}% ({sum(is_correct_majority)}/{len(is_correct_majority)})")

### Strategy 2: Chain-of-Thought with Self-Verification
Ask model to solve, then verify its own work step-by-step

In [None]:
# Strategy 2: prompt that asks for step-by-step work and a self-check
COT_PROMPT_TEMPLATE = """{question}

Let's approach this step-by-step:
1) First, identify what the problem is asking
2) Work through the solution carefully
3) Check your work by verifying each step
4) Provide your final answer

Please put your final answer inside \\boxed{{}}."""

# Fill the template once per hard question
PROMPTS_COT = []
for question_text in df_hard["question"].to_list():
    PROMPTS_COT.append(COT_PROMPT_TEMPLATE.format(question=question_text))

print("Generating Chain-of-Thought responses...")
outputs_cot = llm.generate(PROMPTS_COT, PARAMS)

# Text of the first completion of each request output
RESP_COT = []
for cot_output in outputs_cot:
    RESP_COT.append(cot_output.outputs[0].text)
print(f"Completed CoT generation for {len(RESP_COT)} questions")

In [None]:
# Score the chain-of-thought answers
cot_eval = evaluate_responses(RESP_COT, GTS_HARD)
is_correct_cot, accuracy_cot, num_correct_cot, total_cot = cot_eval
print(f"Strategy 2 - Chain-of-Thought Accuracy: {accuracy_cot:.2f}% ({num_correct_cot}/{total_cot})")

### Strategy 3: Multi-Step Verification with Reflection
Generate answer, then ask model to verify and correct if needed (like 2-round but with explicit verification prompt)

In [None]:
# Strategy 3: solve first, then have the same model review its own solution
# Step 1: Generate initial solution
print("Step 1: Generating initial solutions...")
outputs_step1 = llm.generate(PROMPTS_HARD, PARAMS)
RESP_STEP1 = []
for step1_output in outputs_step1:
    RESP_STEP1.append(step1_output.outputs[0].text)

# Step 2: Ask model to verify and critique its own work
VERIFY_PROMPT_TEMPLATE = """Original Question: {question}

Proposed Solution: {solution}

Now, please act as a mathematical reviewer. Carefully verify the solution above:
1) Check if all steps are logically sound
2) Verify all calculations are correct
3) Confirm the final answer makes sense given the problem
4) If you find any errors, provide the corrected solution

Provide your final verified answer inside \\boxed{{}}."""

# Pair each question with the solution generated for it in step 1
PROMPTS_VERIFY = [
    VERIFY_PROMPT_TEMPLATE.format(question=q, solution=sol)
    for q, sol in zip(df_hard["question"].to_list(), RESP_STEP1)
]

print("Step 2: Verifying and correcting solutions...")
outputs_verify = llm.generate(PROMPTS_VERIFY, PARAMS)
RESP_VERIFIED = []
for verify_output in outputs_verify:
    RESP_VERIFIED.append(verify_output.outputs[0].text)

print(f"Completed multi-step verification for {len(RESP_VERIFIED)} questions")

In [None]:
# Score the reviewed (step 2) answers
verified_eval = evaluate_responses(RESP_VERIFIED, GTS_HARD)
is_correct_verified, accuracy_verified, num_correct_verified, total_verified = verified_eval
print(f"Strategy 3 - Multi-Step Verification Accuracy: {accuracy_verified:.2f}% ({num_correct_verified}/{total_verified})")

### Strategy 4: Ensemble - Best of All Strategies
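For each question, count it as solved if any strategy answered it correctly. Because this selection uses the ground truth, it is an oracle upper bound on what combining the strategies could achieve, not an accuracy obtainable at inference time.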

In [None]:
# Strategy 4: a question counts as solved when at least one strategy solved it
per_strategy_flags = (is_correct_majority, is_correct_cot, is_correct_verified)
is_correct_ensemble = [
    # The inner list is built in full so every strategy is indexed for every question.
    int(any([flags[idx] for flags in per_strategy_flags]))
    for idx in range(len(GTS_HARD))
]

accuracy_ensemble = sum(is_correct_ensemble) / len(is_correct_ensemble) * 100
print(f"Strategy 4 - Ensemble (Any Correct) Accuracy: {accuracy_ensemble:.2f}% ({sum(is_correct_ensemble)}/{len(is_correct_ensemble)})")

### Results Summary

In [None]:
# One row per strategy: (label, accuracy, per-question 0/1 flags)
strategy_rows = [
    ('Majority Vote (5 samples)', accuracy_majority, is_correct_majority),
    ('Chain-of-Thought', accuracy_cot, is_correct_cot),
    ('Multi-Step Verification', accuracy_verified, is_correct_verified),
    ('Ensemble (Any Correct)', accuracy_ensemble, is_correct_ensemble),
]
results_summary = pd.DataFrame({
    'Strategy': [label for label, _, _ in strategy_rows],
    'Accuracy': [acc for _, acc, _ in strategy_rows],
    'Correct': [sum(flags) for _, _, flags in strategy_rows],
    'Total': [len(flags) for _, _, flags in strategy_rows],
})

banner = "=" * 60
print("\n" + banner)
print("HARD QUESTIONS STRATEGY COMPARISON")
print(banner)
print(results_summary.to_string(index=False))
print(banner)

In [None]:
# Per-question table of which strategy solved what
strategy_comparison = pd.DataFrame({
    'question_idx': range(len(GTS_HARD)),
    'predicted_difficulty': df_hard['predicted_difficulty_sigmoid'].values,
    'majority_vote': is_correct_majority,
    'chain_of_thought': is_correct_cot,
    'verification': is_correct_verified,
    'any_correct': is_correct_ensemble
})

# Number of individual strategies (the ensemble excluded) that solved each question
individual_strategies = ['majority_vote', 'chain_of_thought', 'verification']
strategy_comparison['num_strategies_correct'] = strategy_comparison[individual_strategies].sum(axis=1)

print("\nQuestions solved by number of strategies:")
print(strategy_comparison['num_strategies_correct'].value_counts().sort_index())

# Questions solved by exactly one strategy
solved_once = strategy_comparison['num_strategies_correct'] == 1
unique_solutions = strategy_comparison[solved_once]
if not unique_solutions.empty:
    print(f"\n{len(unique_solutions)} questions solved by only ONE strategy (showing strategy complementarity)")

In [None]:
strategy_comparison

### Optional: Using a Stronger Model as Verifier

If you have access to a stronger model (e.g., Qwen2.5-Math-7B-Instruct), you can load it to verify answers from the weaker model.

In [None]:
STRONG_MODEL_NAME = "Qwen/Qwen2.5-Math-7B-Instruct"
# NOTE(review): the weak model was loaded earlier with
# gpu_memory_utilization=0.4 and the notebook pins itself to one GPU via
# CUDA_VISIBLE_DEVICES; a second engine requesting 0.9 may not fit on that
# device - confirm on the target hardware.
llm_strong = LLM(model=STRONG_MODEL_NAME, gpu_memory_utilization=0.9)

VERIFIER_PROMPT_TEMPLATE = """Question: {question}

A student provided this solution:
{weak_solution}

As an expert mathematician, please:
1) Verify if the solution is correct
2) If incorrect, provide the correct solution
3) If correct but can be improved, provide a clearer version

Please put your final answer inside \\boxed{{}}."""

# Use strong model to verify weak model's answers
PROMPTS_STRONG_VERIFY = [
    VERIFIER_PROMPT_TEMPLATE.format(question=q, weak_solution=weak_sol)
    for q, weak_sol in zip(df_hard["question"].to_list(), RESP_STEP1)
]

print("Using stronger model to verify solutions...")
outputs_strong = llm_strong.generate(PROMPTS_STRONG_VERIFY, PARAMS)
RESP_STRONG_VERIFIED = [output.outputs[0].text for output in outputs_strong]

# Evaluate strong model verification with the shared helper rather than a
# hand-copied parse/verify loop, so every strategy is scored the same way.
is_correct_strong, accuracy_strong, num_correct_strong, total_strong = evaluate_responses(
    RESP_STRONG_VERIFIED, GTS_HARD
)
print(f"Strong Model Verification Accuracy: {accuracy_strong:.2f}% ({num_correct_strong}/{total_strong})")


In [None]:
## Agent-Based Model Routing Framework

Use a router agent to decide which model/strategy to use based on question difficulty

In [None]:
from dataclasses import dataclass
from typing import Literal, Optional, Callable
from enum import Enum

class DifficultyLevel(Enum):
    """Difficulty buckets returned by DifficultyRouter.classify_difficulty."""
    EASY = "easy"
    MEDIUM = "medium"
    HARD = "hard"
    VERY_HARD = "very_hard"

class SolverStrategy(Enum):
    """Solving strategies that a SolverConfig can request."""
    DIRECT = "direct"
    COT = "chain_of_thought"
    MULTI_SAMPLE = "multi_sample"
    # NOTE(review): no branch of DifficultyRouter.route in this notebook selects
    # SELF_VERIFY; SolverAgent.solve handles it through its direct-solve fallback.
    SELF_VERIFY = "self_verify"
    STRONG_MODEL = "strong_model"

@dataclass
class SolverConfig:
    """Configuration for a solver strategy, as produced by DifficultyRouter.route."""
    # Which solving approach SolverAgent.solve should dispatch to.
    strategy: SolverStrategy
    # SolverAgent uses the strong model when this equals "strong", otherwise the weak one.
    model_name: str
    # Sampling temperature passed to SamplingParams.
    temperature: float
    # How many copies of the prompt the multi-sample strategy generates.
    num_samples: int = 1
    # When True, SolverAgent.solve runs a verification pass over the first response.
    use_verification: bool = False
    
@dataclass
class Question:
    """Question with metadata"""
    # Problem statement inserted into the solver prompts.
    text: str
    # Reference answer handed to verify_answer.
    # NOTE(review): annotated as str, but the answers listed earlier in this
    # notebook are integers - confirm what callers pass.
    ground_truth: str
    # DifficultyRouter treats this as a predicted success rate (lower = harder).
    difficulty_score: float
    # Copied into SolverResult.question_idx.
    index: int
    
@dataclass
class SolverResult:
    """Result from solving a question"""
    # Question.index of the question that was solved.
    question_idx: int
    # Strategy taken from the SolverConfig that was used.
    strategy_used: SolverStrategy
    # Final text returned by the solver agent.
    response: str
    # 1 or 0, as returned by verify_answer.
    is_correct: int
    # Bucket assigned by DifficultyRouter.classify_difficulty.
    difficulty_level: DifficultyLevel
    # AgenticSolver.solve_question fills this with the config's num_samples.
    num_attempts: int = 1

In [None]:
class DifficultyRouter:
    """Maps a question's predicted success rate to a solver configuration.

    difficulty_score is read as a predicted SUCCESS RATE in the 0-1 range:
    - values near 1 (e.g., 0.8) mean an EASIER question
    - values near 0 (e.g., 0.2) mean a HARDER question
    """

    def __init__(self, very_hard_threshold=0.15, hard_threshold=0.4, medium_threshold=0.7):
        """
        Args:
            very_hard_threshold: Success rates below this are very hard (strong model)
            hard_threshold: Success rates below this are hard (multi-sample)
            medium_threshold: Success rates below this are medium (CoT)
            Anything at or above medium_threshold is easy (direct solve)
        """
        self.very_hard_threshold = very_hard_threshold
        self.hard_threshold = hard_threshold
        self.medium_threshold = medium_threshold

    def classify_difficulty(self, difficulty_score: float) -> DifficultyLevel:
        """Return the bucket for a predicted success rate (lower = harder)."""
        # Checked from hardest to easiest; the first cutoff the score falls below wins.
        cutoffs = (
            (self.very_hard_threshold, DifficultyLevel.VERY_HARD),
            (self.hard_threshold, DifficultyLevel.HARD),
            (self.medium_threshold, DifficultyLevel.MEDIUM),
        )
        for cutoff, level in cutoffs:
            if difficulty_score < cutoff:
                return level
        return DifficultyLevel.EASY

    def route(self, question: Question) -> SolverConfig:
        """Build the solver configuration that matches the question's difficulty."""
        difficulty = self.classify_difficulty(question.difficulty_score)

        settings_by_level = {
            # Easy (high success rate): one greedy pass of the weak model
            DifficultyLevel.EASY: dict(
                strategy=SolverStrategy.DIRECT,
                model_name="weak",
                temperature=0.0,
                num_samples=1,
                use_verification=False,
            ),
            # Medium (moderate success rate): chain-of-thought with the weak model
            DifficultyLevel.MEDIUM: dict(
                strategy=SolverStrategy.COT,
                model_name="weak",
                temperature=0.0,
                num_samples=1,
                use_verification=False,
            ),
            # Hard (low success rate): sample several solutions, then verify
            DifficultyLevel.HARD: dict(
                strategy=SolverStrategy.MULTI_SAMPLE,
                model_name="weak",
                temperature=0.7,
                num_samples=5,
                use_verification=True,
            ),
        }
        # Very hard (very low success rate): strong model plus verification.
        # This is also what any level missing from the table receives.
        very_hard_settings = dict(
            strategy=SolverStrategy.STRONG_MODEL,
            model_name="strong",
            temperature=0.0,
            num_samples=1,
            use_verification=True,
        )
        return SolverConfig(**settings_by_level.get(difficulty, very_hard_settings))

# Initialize router with corrected thresholds (lower = harder)
router = DifficultyRouter(very_hard_threshold=0.15, hard_threshold=0.4, medium_threshold=0.7)
# Report the thresholds the router actually holds, so this summary cannot
# drift from the configuration when the arguments above are changed.
print("Difficulty Router initialized with SUCCESS RATE thresholds:")
print(f"  Very Hard: success rate < {router.very_hard_threshold}")
print(f"  Hard: success rate < {router.hard_threshold}")
print(f"  Medium: success rate < {router.medium_threshold}")
print(f"  Easy: success rate >= {router.medium_threshold}")

In [None]:
class SolverAgent:
    """Agent that executes different solving strategies.

    The agent holds a weak model and, optionally, a strong model. Each
    ``solve_*`` method builds a prompt for one strategy and returns the text
    of the selected completion. ``solve`` dispatches on ``config.strategy``
    and optionally adds a verification pass.
    """

    # Instruction appended to prompts that do not use a template.
    ANSWER_SUFFIX = " Please put your final answer inside \\boxed{}."
    # Generation budget shared by every strategy.
    MAX_TOKENS = 3000

    def __init__(self, weak_model: LLM, strong_model: Optional[LLM] = None):
        """
        Args:
            weak_model: Model used whenever the config does not ask for "strong"
            strong_model: Optional stronger model; when it is None, requests
                for the strong model fall back to ``weak_model``
        """
        self.weak_model = weak_model
        self.strong_model = strong_model
        self.cot_template = """{question}

Let's approach this step-by-step:
1) First, identify what the problem is asking
2) Work through the solution carefully
3) Check your work by verifying each step
4) Provide your final answer

Please put your final answer inside \\boxed{{}}."""

        self.verify_template = """Original Question: {question}

Proposed Solution: {solution}

Now, please act as a mathematical reviewer. Carefully verify the solution above:
1) Check if all steps are logically sound
2) Verify all calculations are correct
3) Confirm the final answer makes sense given the problem
4) If you find any errors, provide the corrected solution

Provide your final verified answer inside \\boxed{{}}."""

    def _select_model(self, config: SolverConfig) -> LLM:
        """Return the model named by ``config``, degrading to the weak model.

        The constructor allows ``strong_model`` to be None, so a request for
        the strong model must not end in ``None.generate(...)``.
        """
        if config.model_name == "strong":
            if self.strong_model is not None:
                return self.strong_model
            import warnings
            warnings.warn(
                "Strong model requested but not loaded; falling back to the weak model.",
                RuntimeWarning,
                stacklevel=3,
            )
        return self.weak_model

    def _generate_text(self, prompt: str, config: SolverConfig, temperature: float) -> str:
        """Generate one completion for ``prompt`` and return its text."""
        params = SamplingParams(temperature=temperature, max_tokens=self.MAX_TOKENS)
        output = self._select_model(config).generate([prompt], params)[0]
        return output.outputs[0].text

    def solve_direct(self, question: Question, config: SolverConfig) -> str:
        """Direct solve without special prompting"""
        prompt = question.text + self.ANSWER_SUFFIX
        return self._generate_text(prompt, config, config.temperature)

    def solve_cot(self, question: Question, config: SolverConfig) -> str:
        """Solve with chain-of-thought prompting"""
        prompt = self.cot_template.format(question=question.text)
        return self._generate_text(prompt, config, config.temperature)

    def solve_multi_sample(self, question: Question, config: SolverConfig) -> str:
        """Solve with multiple samples and majority vote.

        Returns the full text of the first sample whose parsed answer matches
        the most common parsed answer.
        """
        from collections import Counter

        prompt = question.text + self.ANSWER_SUFFIX
        params = SamplingParams(temperature=config.temperature, max_tokens=self.MAX_TOKENS, top_p=0.95)
        model = self._select_model(config)

        # Always draw at least one sample so there is something to return.
        num_samples = max(1, config.num_samples)
        outputs = model.generate([prompt] * num_samples, params)
        texts = [output.outputs[0].text for output in outputs]

        # Vote on the string form of each parsed answer; a sample that cannot
        # be parsed votes with its raw text.
        vote_keys = []
        for text in texts:
            try:
                vote_keys.append(str(parse(text)))
            except Exception:
                vote_keys.append(str(text))

        majority = Counter(vote_keys).most_common(1)[0][0]
        return texts[vote_keys.index(majority)]

    def solve_with_verification(self, question: Question, config: SolverConfig, initial_response: str) -> str:
        """Verify and potentially correct an initial solution"""
        prompt = self.verify_template.format(question=question.text, solution=initial_response)
        # Verification is always greedy, whatever temperature produced the draft.
        return self._generate_text(prompt, config, 0.0)

    def solve(self, question: Question, config: SolverConfig) -> str:
        """Main solve method that routes to appropriate strategy"""
        if config.strategy == SolverStrategy.COT:
            response = self.solve_cot(question, config)
        elif config.strategy == SolverStrategy.MULTI_SAMPLE:
            response = self.solve_multi_sample(question, config)
        else:
            # DIRECT, STRONG_MODEL (the model itself is chosen through
            # config.model_name) and any strategy without a dedicated solver
            # all use the plain prompt.
            response = self.solve_direct(question, config)

        # Apply verification if configured
        if config.use_verification:
            response = self.solve_with_verification(question, config, response)

        return response

# Build the agent; the strong model is passed only if an earlier cell defined
# `llm_strong` in the notebook namespace, otherwise None is used.
agent = SolverAgent(weak_model=llm, strong_model=globals().get('llm_strong'))
print("Solver Agent initialized")

In [None]:
class AgenticSolver:
    """Orchestrator that ties a difficulty router to a solver agent.

    Attributes:
        router: Supplies a solver configuration and a difficulty level per question.
        agent: Runs the configuration chosen by the router.
        results: Results of the most recent ``solve_batch`` call (empty until then).
    """

    def __init__(self, router: DifficultyRouter, agent: SolverAgent):
        self.router = router
        self.agent = agent
        self.results = []

    def solve_question(self, question: Question) -> SolverResult:
        """Route, solve and grade one question, returning a ``SolverResult``."""
        config = self.router.route(question)
        level = self.router.classify_difficulty(question.difficulty_score)

        answer_text = self.agent.solve(question, config)
        graded = verify_answer(question.ground_truth, answer_text)

        return SolverResult(
            question_idx=question.index,
            strategy_used=config.strategy,
            response=answer_text,
            is_correct=graded,
            difficulty_level=level,
            num_attempts=config.num_samples,
        )

    def solve_batch(self, questions: list[Question], verbose: bool = True) -> list[SolverResult]:
        """Solve every question in order and remember the results.

        Args:
            questions: Questions to solve.
            verbose: When False the progress bar is disabled.

        Returns:
            One ``SolverResult`` per question; the same list is stored on
            ``self.results``.
        """
        progress = tqdm(questions, desc="Solving with agent", disable=not verbose)
        # Assign only after the whole batch finished so a failure mid-way
        # leaves the previously stored results untouched.
        batch_results = [self.solve_question(q) for q in progress]
        self.results = batch_results
        return batch_results

    def get_summary(self) -> pd.DataFrame:
        """Aggregate stored results per difficulty level.

        Returns:
            A DataFrame with one row per difficulty level that has at least one
            result (columns: Difficulty, Count, Accuracy, Correct,
            Primary_Strategy), or an empty DataFrame when nothing has been
            solved yet.
        """
        if not self.results:
            return pd.DataFrame()

        rows = []
        for level in DifficultyLevel:
            bucket = [r for r in self.results if r.difficulty_level == level]
            if not bucket:
                continue

            n_correct = sum(r.is_correct for r in bucket)

            # Count how often each strategy was used within this level.
            usage = {}
            for r in bucket:
                strategy_name = r.strategy_used.value
                usage[strategy_name] = usage.get(strategy_name, 0) + 1

            rows.append({
                'Difficulty': level.value,
                'Count': len(bucket),
                'Accuracy': n_correct / len(bucket) * 100,
                'Correct': n_correct,
                'Primary_Strategy': max(usage, key=usage.get),
            })

        return pd.DataFrame(rows)

# Wire the existing router and solver agent into a single orchestrator.
agentic_solver = AgenticSolver(
    agent=agent,
    router=router,
)
print("Agentic Solver initialized and ready")

### Run Agentic Solver on Questions

In [None]:
# Wrap every dataframe row in a Question, keeping the row's index label as its id
questions = [
    Question(
        text=row['question'],
        ground_truth=row['answer'],
        difficulty_score=row['predicted_difficulty_sigmoid'],
        index=idx,
    )
    for idx, row in df.iterrows()
]

print(f"Prepared {len(questions)} questions for agentic solving")

# Tally how many questions land on each "difficulty -> strategy" route
difficulty_counts = {}
for q in questions:
    config = router.route(q)
    difficulty = router.classify_difficulty(q.difficulty_score)
    route_label = f"{difficulty.value} -> {config.strategy.value}"
    difficulty_counts[route_label] = difficulty_counts.get(route_label, 0) + 1

print("\nRouting distribution:")
for route_label, n_questions in sorted(difficulty_counts.items()):
    print(f"  {route_label}: {n_questions} questions")

In [None]:
# Run every prepared question through the router + solver pipeline,
# showing a progress bar while the batch is processed.
print("Starting agentic solving...")
agentic_results = agentic_solver.solve_batch(questions=questions, verbose=True)
print(f"\nCompleted solving {len(agentic_results)} questions")

In [None]:
# Get summary by difficulty level
summary_df = agentic_solver.get_summary()

print("\n" + "="*70)
print("AGENTIC SOLVER RESULTS BY DIFFICULTY")
print("="*70)
print(summary_df.to_string(index=False))
print("="*70)

# Overall accuracy
total_correct = sum(r.is_correct for r in agentic_results)
total_questions = len(agentic_results)
# Report 0.0 instead of raising ZeroDivisionError when no question was solved
# (same convention as evaluate_responses).
overall_accuracy = total_correct / total_questions * 100 if total_questions else 0.0
print(f"\nOverall Accuracy: {overall_accuracy:.2f}% ({total_correct}/{total_questions})")

In [None]:
# Detailed results dataframe
# `question_idx` holds the dataframe index label the Question was built from,
# which is not necessarily its position in `questions`. Look the score up by
# label so a non-contiguous or reordered index cannot pick the wrong question.
difficulty_score_by_idx = {q.index: q.difficulty_score for q in questions}

agentic_results_df = pd.DataFrame([
    {
        'question_idx': r.question_idx,
        'difficulty_score': difficulty_score_by_idx[r.question_idx],
        'difficulty_level': r.difficulty_level.value,
        'strategy': r.strategy_used.value,
        'is_correct': r.is_correct,
        'num_attempts': r.num_attempts
    }
    for r in agentic_results
])

agentic_results_df

### Visualize Agent Performance

In [None]:
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Plot 1: Accuracy by difficulty level
summary_df_sorted = summary_df.sort_values('Difficulty')
axes[0].bar(summary_df_sorted['Difficulty'], summary_df_sorted['Accuracy'])
axes[0].set_xlabel('Difficulty Level')
axes[0].set_ylabel('Accuracy (%)')
axes[0].set_title('Agentic Solver Accuracy by Difficulty')
axes[0].set_ylim([0, 100])
# Categorical bars are drawn at x = 0, 1, 2, ... in the order passed to bar().
# Use the position in the sorted frame, not the pre-sort index label that
# iterrows() would yield, so each label sits above its own bar.
for bar_pos, accuracy in enumerate(summary_df_sorted['Accuracy']):
    axes[0].text(bar_pos, accuracy + 2, f"{accuracy:.1f}%", ha='center')

# Plot 2: Strategy distribution
strategy_counts = agentic_results_df['strategy'].value_counts()
axes[1].bar(strategy_counts.index, strategy_counts.values)
axes[1].set_xlabel('Strategy')
axes[1].set_ylabel('Number of Questions')
axes[1].set_title('Strategy Usage Distribution')
axes[1].tick_params(axis='x', rotation=45)
# Annotate each bar with its question count.
for i, (strategy, count) in enumerate(strategy_counts.items()):
    axes[1].text(i, count + 0.5, str(count), ha='center')

plt.tight_layout()
plt.show()

print("Agent performance visualized")

### Compare Agentic vs Fixed Strategy

In [None]:
# Compare agentic solver to baseline (direct solving)
print("Comparison of Agentic vs Baseline Approach\n")

# Baseline: original direct solve (from earlier in notebook).
# `is_correct` only exists if the earlier direct-solve cells were run; an empty
# list is treated the same as a missing baseline to avoid dividing by zero.
has_baseline = 'is_correct' in locals() and len(is_correct) > 0
if has_baseline:
    baseline_correct = sum(is_correct)
    baseline_total = len(is_correct)
    baseline_accuracy = baseline_correct / baseline_total * 100
else:
    baseline_accuracy = 0
    baseline_correct = 0
    baseline_total = 0

# Agentic
agentic_accuracy = overall_accuracy
agentic_correct = total_correct
agentic_total = total_questions

# Without a baseline there is nothing to compare against, so the delta is NaN
# rather than a number that would look like a real improvement over 0%.
accuracy_delta = agentic_accuracy - baseline_accuracy if has_baseline else float('nan')

comparison_data = {
    'Approach': ['Baseline (Direct)', 'Agentic (Adaptive)'],
    'Accuracy (%)': [baseline_accuracy, agentic_accuracy],
    'Correct': [baseline_correct, agentic_correct],
    'Total': [baseline_total, agentic_total],
    'Improvement': [0, accuracy_delta]
}

comparison_table = pd.DataFrame(comparison_data)
print(comparison_table.to_string(index=False))

# Only claim an improvement when the numbers actually show one.
if not has_baseline:
    print("\n! No baseline results available (run the direct-solve cells first); improvement not computed")
elif accuracy_delta > 0:
    print(f"\n✓ Agentic approach improves accuracy by {accuracy_delta:.2f} percentage points")
elif accuracy_delta < 0:
    print(f"\n✗ Agentic approach lowers accuracy by {abs(accuracy_delta):.2f} percentage points")
else:
    print("\n= Agentic approach matches the baseline accuracy")
print("✓ Routes questions intelligently based on difficulty")
print("✓ Uses stronger strategies only when needed (cost-efficient)")