# Pramana Evaluation Notebook

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/SharathSPhD/pramana/blob/main/notebooks/02_pramana_evaluation.ipynb)

Comprehensive evaluation of Pramana models, covering two evaluation tiers plus supporting analyses:
- Tier 1: Structural evaluation (format adherence)
- Tier 2: Content quality evaluation
- Answer correctness (exact/normalized/semantic matching)
- Cross-stage comparison
- Failure analysis

## 1. Setup & Configuration

In [None]:
#@title Setup: Clone repo, install deps, detect GPU (double-click to show code)
import os, sys, subprocess
from pathlib import Path
from IPython.display import display, HTML

# ---- Clone repo (skip if already cloned or running locally) ----
REPO_URL = "https://github.com/SharathSPhD/pramana.git"
REPO_DIR = "pramana"

# google.colab can only be imported inside a Colab runtime, so the import
# doubles as the environment probe.
try:
    import google.colab  # noqa: F401
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    # After the first run the kernel's working directory is already the
    # checkout. Looking for REPO_DIR relative to it would miss, and the cell
    # would clone (and chdir into) a second, nested copy on every re-run.
    # The requirements file this cell installs from is used as the marker.
    already_in_checkout = Path("notebooks/requirements.txt").exists()
    if not already_in_checkout:
        if not Path(REPO_DIR).exists():
            print("Cloning repository...")
            subprocess.run(["git", "clone", REPO_URL], check=True, capture_output=True)
            print("  Done.")
        os.chdir(REPO_DIR)
    subprocess.run([sys.executable, "-m", "pip", "install", "-q", "-r", "notebooks/requirements.txt"],
                   check=True, capture_output=True)
    # Keep sys.path free of duplicates when the cell is executed repeatedly.
    if "notebooks" not in sys.path:
        sys.path.insert(0, "notebooks")
else:
    # Local: pramana_backend.py should be in the same dir or parent.
    # Probe the likely locations in order and put the first hit on sys.path.
    if Path("pramana_backend.py").exists():
        sys.path.insert(0, ".")
    elif Path("notebooks/pramana_backend.py").exists():
        sys.path.insert(0, "notebooks")
    else:
        for p in [Path(".."), Path("../notebooks")]:
            if (p / "pramana_backend.py").exists():
                sys.path.insert(0, str(p.resolve()))
                break

# ---- GPU Detection ----
def check_backend():
    """Detect GPU/CPU and display status banner.

    Queries ``nvidia-smi`` for the GPU name and total memory. When the tool
    is missing, times out, fails, or prints nothing, the runtime is reported
    as CPU. On hosts with several GPUs only the first one is shown.

    Returns:
        bool: True when a GPU was detected, False otherwise.
    """
    gpu_name, gpu_mem = None, None
    try:
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader,nounits"],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # nvidia-smi not installed, not executable, hung past the timeout, or
        # produced undecodable output: best-effort detection, fall back to CPU.
        result = None

    if result is not None and result.returncode == 0:
        # nvidia-smi prints one CSV row per GPU. Parse only the first row so
        # a multi-GPU host does not leak the next row into the memory field.
        rows = [row.strip() for row in result.stdout.splitlines() if row.strip()]
        if rows:
            name, _, mem = rows[0].partition(",")
            gpu_name = name.strip() or None
            gpu_mem = mem.strip() or "?"

    if gpu_name:
        hw = f"<b>GPU: {gpu_name} ({gpu_mem} MB)</b>"
        color = "green"
    else:
        hw = "<b>CPU</b>"
        color = "orange"

    env = "Google Colab" if IN_COLAB else "Local"
    banner = (
        f"<div style='padding:10px;border-radius:8px;border:2px solid {color};margin:8px 0'>"
        f"<span style='font-size:1.2em'>Runtime: <span style='color:{color}'>{hw}</span></span>"
        f"<br>Environment: {env}"
    )
    if not gpu_name:
        banner += "<br><i>For better performance: Runtime -> Change runtime type -> GPU</i>"
    banner += "</div>"
    display(HTML(banner))
    return gpu_name is not None

# Run the detection once when the cell executes; check_backend() also renders
# the status banner as a side effect and returns whether a GPU was found.
GPU_AVAILABLE = check_backend()
print(f"Working directory: {os.getcwd()}")

In [None]:
#@title Imports (double-click to show code)
import os
import sys
import re
from pathlib import Path
import json
from datetime import datetime
from typing import Any, Dict, List

from pramana_backend import (
    STAGE_CONFIGS,
    OLLAMA_MODEL_MAP,
    build_user_prompt,
    create_backend,
    EXAMPLE_PROBLEMS,
    load_test_problems,
    parse_nyaya_phases,
    validate_structure,
    score_content_quality,
    extract_final_answer,
    normalize_text,
    token_overlap_ratio,
    score_answers,
    wilson_interval,
    setup_ollama,
    setup_ollama_stage,
    download_gguf,
)

print("\u2713 All modules imported")

In [None]:
#@title Evaluation configuration (double-click to show code)
# Evaluation configuration
# Data source: "embedded" (built-in examples), "huggingface" (download from HF), or a local directory path
# The value is passed as-is to load_test_problems().
DATA_SOURCE = "embedded"
# NOTE(review): RESULTS_DIR is not referenced again in the visible part of the
# notebook; generated artifacts are written under OUTPUT_DIR.
RESULTS_DIR = Path("./results")
OUTPUT_DIR = Path("./results/evaluation")
# Create the output directory (and parents) up front; a no-op if it exists.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Evaluation tiers to run (1=structural, 2=content quality, 3=Z3 verification)
# NOTE(review): in the visible cells EVAL_TIERS is only printed; Tier 1 and
# Tier 2 run unconditionally. Confirm whether later cells consult it.
EVAL_TIERS = [1, 2]  # Skip Tier 3 (Z3) unless needed

# ---- Generation hyperparameters ----
# These match the HF Space app controls. On local hardware (not ZeroGPU),
# you can use much larger token lengths to avoid truncating Nyaya output.
# NOTE(review): BATCH_SIZE is not used by the visible generation loop, which
# processes one example at a time.
BATCH_SIZE = 4        # Process examples in batches
MAX_TOKENS = 2048     # Maximum generation length (raised from 1024 for local hardware)
TEMPERATURE = 0.5     # Sampling temperature (0 = deterministic)
TOP_P = 0.75          # Nucleus sampling parameter
TOP_K = 5             # Top-k sampling parameter (0 = disabled)
TIMEOUT_SECONDS = 120 # Timeout per generation (raised for longer outputs)

# Stages to evaluate; each name is used as a key into STAGE_CONFIGS.
STAGES_TO_EVALUATE = ["Stage 0", "Stage 1"]

# Echo the effective configuration so it is recorded in the cell output.
print(f"✓ Data source: {DATA_SOURCE}")
print(f"✓ Results directory: {OUTPUT_DIR}")
print(f"✓ Evaluation tiers: {EVAL_TIERS}")
print(f"✓ Generation hyperparameters: max_tokens={MAX_TOKENS}, temperature={TEMPERATURE}, top_p={TOP_P}, top_k={TOP_K}")
print(f"✓ Stages to evaluate: {STAGES_TO_EVALUATE}")

In [None]:
#@title Initialize backend (double-click to show code)
# Change BACKEND_TYPE to switch inference backends.
#   "ollama"       - Ollama server (recommended, auto-setup available)
#   "llamacpp"     - llama-cpp-python (direct GGUF loading)
#   "transformers" - HuggingFace Transformers (requires GPU + pip install transformers torch)

BACKEND_TYPE = "ollama"  # <-- Change this to your preferred backend

# Backend-specific configuration
OLLAMA_URL = "http://localhost:11434"
GGUF_PATH = "/tmp/nyaya-llama-3b-stage0.gguf"

# Auto-setup: install and configure the selected backend
# Every branch is best-effort: failures are printed and the cell continues, so
# a broken setup surfaces later as a missing backend rather than here.
if BACKEND_TYPE == "ollama":
    # One setup call per evaluated stage.
    for _stage in STAGES_TO_EVALUATE:
        print(f"\n--- Setting up Ollama for {_stage} ---")
        try:
            # NOTE(review): the return value is stored but never read in the visible code.
            _result = setup_ollama_stage(_stage, base_url=OLLAMA_URL)
        except Exception as e:
            print(f"  Setup error for {_stage}: {e}")

# llama.cpp: make sure a GGUF file exists, downloading one if necessary.
elif BACKEND_TYPE == "llamacpp":
    if not Path(GGUF_PATH).exists():
        print("GGUF not found. Auto-downloading...")
        try:
            # On success GGUF_PATH is rebound to whatever download_gguf() returns;
            # on failure it keeps pointing at the missing file.
            GGUF_PATH = download_gguf()
            print(f"GGUF ready at: {GGUF_PATH}")
        except Exception as e:
            print(f"Auto-download failed: {e}")
    else:
        print(f"GGUF found at: {GGUF_PATH}")

# Transformers: install the Python packages into the running interpreter.
elif BACKEND_TYPE == "transformers":
    import subprocess as _sp
    # check_call raises CalledProcessError if pip exits non-zero.
    _sp.check_call([sys.executable, "-m", "pip", "install", "-q", "transformers", "torch", "accelerate", "peft"])

# Memo of created backends keyed by "<stage>:<role>". A value of None records
# that creation failed or that no model is mapped for that combination.
backend_cache = {}


def get_backend(stage_name: str, role: str = "tuned"):
    """Return the inference backend for a stage/role, creating it on first use.

    Args:
        stage_name: Key into STAGE_CONFIGS (and OLLAMA_MODEL_MAP for Ollama).
        role: "base" selects the base model; anything else the tuned model.

    Returns:
        The backend object produced by create_backend(), or None when no
        Ollama model is mapped or creation raised. The outcome, including
        None, is cached, so a failed creation is not retried.
    """
    key = f"{stage_name}:{role}"
    if key in backend_cache:
        return backend_cache[key]

    created = None
    try:
        config = STAGE_CONFIGS[stage_name]
        if BACKEND_TYPE == "transformers":
            if role == "base":
                model_id = config.base_model_id
            else:
                model_id = config.tuned_model_id
            created = create_backend("transformers", model_id=model_id)
        elif BACKEND_TYPE == "ollama":
            ollama_name = OLLAMA_MODEL_MAP.get(stage_name, {}).get(role)
            if ollama_name is not None:
                created = create_backend("ollama", model_name=ollama_name, base_url=OLLAMA_URL)
            else:
                print(f"  Warning: No Ollama model for {stage_name} role={role}")
        elif BACKEND_TYPE == "llamacpp":
            created = create_backend("llamacpp", model_path=GGUF_PATH)
        else:
            raise ValueError(f"Unknown backend: {BACKEND_TYPE}")
    except Exception as err:
        # Any failure (unknown stage, unknown backend type, creation error)
        # degrades to "no backend" instead of aborting the notebook.
        print(f"Warning: Could not create backend for {stage_name} ({role}): {err}")
        created = None

    backend_cache[key] = created
    return created

def get_model_label(stage_name: str, role: str = "tuned") -> str:
    """Return a display label for the model behind a stage/role.

    For the Ollama backend the label is the mapped model name, or "unknown"
    when the stage or role is missing or explicitly mapped to None (the same
    situation get_backend() reports as "No Ollama model"). For other backends
    it is the base or tuned model id from STAGE_CONFIGS.

    Raises:
        KeyError: for a non-Ollama backend when stage_name is not in STAGE_CONFIGS.
    """
    if BACKEND_TYPE == "ollama":
        # `or` also covers a role mapped to None, which .get(role, "unknown")
        # would pass through and break the declared str return type.
        return OLLAMA_MODEL_MAP.get(stage_name, {}).get(role) or "unknown"
    stage_config = STAGE_CONFIGS[stage_name]
    return stage_config.base_model_id if role == "base" else stage_config.tuned_model_id

# Report which backend was selected and where the notebook is running.
# No backend object is created here; get_backend() creates them lazily.
_env = "Colab VM" if IN_COLAB else "Local"
print(f"\nBackend: {BACKEND_TYPE} (running on {_env})")
print("Backend initialization ready")

## 2. Load Test Suite

In [None]:
#@title Load validation examples (self-contained, no external files needed) (double-click to show code)
# Load validation examples (self-contained, no external files needed)
validation_examples = load_test_problems(DATA_SOURCE)
print(f"✓ Loaded {len(validation_examples)} validation examples from '{DATA_SOURCE}'")

# Display example summary
# "difficulty" is treated as optional by the summary table further down, so
# read it the same way here instead of raising KeyError on examples without it.
for i, ex in enumerate(validation_examples):
    print(f"  {i+1}. [{ex['id']}] {ex['problem_type']} ({ex.get('difficulty', 'unknown')})")

In [None]:
#@title Optionally load from HuggingFace datasets (double-click to show code)
# Optionally load from HuggingFace datasets
# The token is optional: use a notebook-level HF_TOKEN variable if one was
# defined in an earlier cell, otherwise the HF_TOKEN environment variable,
# otherwise None (anonymous access). Referencing a bare HF_TOKEN name that was
# never defined would raise NameError and make this cell always report failure.
_hf_token = globals().get("HF_TOKEN") or os.environ.get("HF_TOKEN")

try:
    from datasets import load_dataset

    # Load Stage 0 dataset
    stage0_dataset = load_dataset(
        STAGE_CONFIGS["Stage 0"].dataset_repo_id,
        split="test",
        token=_hf_token
    )
    print(f"✓ Loaded Stage 0 dataset: {len(stage0_dataset)} examples")
except Exception as e:
    # Best-effort: a missing `datasets` package, no network, or a private
    # repo simply leaves stage0_dataset unset.
    print(f"Note: Could not load HF dataset: {e}")
    stage0_dataset = None

In [None]:
#@title Display problem summary table (double-click to show code)
# Display problem summary table
import pandas as pd

# Build one table row per validation example; long problem statements are cut
# to a 60-character preview followed by an ellipsis.
_summary_rows = []
for ex in validation_examples:
    _row = {
        "ID": ex["id"],
        "Problem Type": ex["problem_type"],
        "Difficulty": ex.get("difficulty", "unknown"),
        "Has Ground Truth": bool(ex.get("ground_truth")),
    }
    if len(ex["problem"]) > 60:
        _row["Problem Preview"] = ex["problem"][:60] + "..."
    else:
        _row["Problem Preview"] = ex["problem"]
    _summary_rows.append(_row)

problem_summary = pd.DataFrame(_summary_rows)

display(problem_summary)
print(f"\nTotal problems: {len(validation_examples)}")
# Frequency of each problem type as a plain dict.
_type_counts = problem_summary['Problem Type'].value_counts().to_dict()
print(f"Problem types: {_type_counts}")

## 3. Batch Generation

In [None]:
#@title Run inference over test suite (double-click to show code)
# Run inference over test suite
import time
from tqdm import tqdm
import signal

def generate_with_timeout(backend, prompt: str, system_prompt: str, timeout: int) -> str:
    """Generate text with the backend, bounded by a wall-clock timeout.

    The timeout is enforced with SIGALRM, which exists only on Unix and can
    only be armed from the main thread. Where it cannot be armed, generation
    runs without a timeout instead of failing.

    Args:
        backend: Object exposing ``generate(prompt, system_prompt=..., ...)``;
            a falsy value means no backend is available.
        prompt: User prompt passed to the backend.
        system_prompt: System prompt passed to the backend.
        timeout: Maximum number of seconds to wait for the generation.

    Returns:
        The generated text, or a sentinel string that never raises:
        "[Backend not available]", "[TIMEOUT]", or "[ERROR: <first 50 chars>]".
    """
    if not backend:
        # Nothing to call; return the placeholder without touching signals.
        return "[Backend not available]"

    def timeout_handler(signum, frame):
        raise TimeoutError(f"Generation exceeded {timeout}s")

    alarm_armed = False
    previous_handler = None
    if hasattr(signal, "SIGALRM"):
        try:
            previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(timeout)
            alarm_armed = True
        except ValueError:
            # signal.signal() refuses to run outside the main thread.
            alarm_armed = False

    try:
        return backend.generate(
            prompt,
            system_prompt=system_prompt,
            max_new_tokens=MAX_TOKENS,
            temperature=TEMPERATURE,
            top_p=TOP_P,
            top_k=TOP_K,
        )
    except TimeoutError:
        return "[TIMEOUT]"
    except Exception as e:
        return f"[ERROR: {str(e)[:50]}]"
    finally:
        if alarm_armed:
            # Always cancel the pending alarm and put the caller's handler
            # back, so a stale alarm cannot fire in later, unrelated code.
            signal.alarm(0)
            if previous_handler is not None:
                signal.signal(signal.SIGALRM, previous_handler)

def run_batch_generation(stage_name: str, examples: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Generate one output per example with the stage's tuned backend.

    Examples are processed sequentially under a progress bar. Each record
    carries the example id, stage, problem text, generated output, wall-clock
    generation time in seconds, ground truth ("" when absent) and problem type.
    """
    config = STAGE_CONFIGS[stage_name]
    model = get_backend(stage_name)

    def _generate_record(example: Dict[str, Any]) -> Dict[str, Any]:
        # Time only the generation call itself.
        problem_text = example["problem"]
        prompt = build_user_prompt(problem_text)

        started = time.time()
        output = generate_with_timeout(model, prompt, config.system_prompt, TIMEOUT_SECONDS)
        elapsed = time.time() - started

        return {
            "example_id": example["id"],
            "stage": stage_name,
            "problem": problem_text,
            "generated_output": output,
            "generation_time": elapsed,
            "ground_truth": example.get("ground_truth", ""),
            "problem_type": example["problem_type"],
        }

    progress = tqdm(examples, desc=f"Generating ({stage_name})")
    return [_generate_record(example) for example in progress]

print("✓ Batch generation functions ready")

In [None]:
#@title Generate outputs for all stages (double-click to show code)
# Generate outputs for all stages
# Maps stage name -> list of generation records from run_batch_generation().
all_generation_results = {}

for stage_name in STAGES_TO_EVALUATE:
    print(f"\n{'='*60}")
    print(f"Generating outputs for {stage_name}")
    print(f"{'='*60}")
    
    stage_results = run_batch_generation(stage_name, validation_examples)
    all_generation_results[stage_name] = stage_results
    
    # Cache to JSON
    # File name is derived from the stage name, e.g. "Stage 0" -> generations_stage_0.json;
    # load_cached_generations() rebuilds the same name to read it back.
    cache_file = OUTPUT_DIR / f"generations_{stage_name.lower().replace(' ', '_')}.json"
    with open(cache_file, "w") as f:
        # default=str stringifies any value json cannot serialize natively.
        json.dump(stage_results, f, indent=2, default=str)
    print(f"✓ Cached to {cache_file}")

print(f"\n✓ Completed generation for {len(STAGES_TO_EVALUATE)} stages")

In [None]:
#@title Load cached results if available (for re-running evaluation without regeneration) (double-click to show code)
# Load cached results if available (for re-running evaluation without regeneration)
def load_cached_generations() -> Dict[str, List[Dict[str, Any]]]:
    """Read previously cached generation records from OUTPUT_DIR.

    Returns:
        Mapping of stage name to its cached records. Stages without a cache
        file are left out of the mapping.
    """
    loaded = {}
    for stage_name in STAGES_TO_EVALUATE:
        cache_path = OUTPUT_DIR / f"generations_{stage_name.lower().replace(' ', '_')}.json"
        if not cache_path.exists():
            continue
        with open(cache_path) as handle:
            loaded[stage_name] = json.load(handle)
        print(f"✓ Loaded cached results for {stage_name}")
    return loaded

# Uncomment to use cached results:
# all_generation_results = load_cached_generations()

## 4. Tier 1: Structural Evaluation

In [None]:
#@title Structural evaluation using self-contained validate_structure() (double-click to show code)
# Structural evaluation using self-contained validate_structure()

def evaluate_structural(stage_name: str, generation_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Run Tier 1 structural evaluation (self-contained regex-based).

    Args:
        stage_name: Stage label copied into every result.
        generation_results: Records with at least "example_id" and
            "generated_output".

    Returns:
        One dict per input record with keys "example_id", "stage",
        "parse_success", "format_metrics" and "tier1_result". "tier1_result"
        is always a dict: when validation raises, it is a failed result and
        "parse_error" holds the message.

    Raises:
        KeyError: if a record lacks "example_id" or "generated_output".
    """
    evaluated = []

    for result in generation_results:
        example_id = result["example_id"]
        generated_output = result["generated_output"]

        eval_result = {
            "example_id": example_id,
            "stage": stage_name,
            "parse_success": False,
            "format_metrics": {},
            "tier1_result": None,
        }

        try:
            # Run self-contained structural validation
            validation = validate_structure(generated_output)

            # Scores below are expressed out of 6 phases; 3 is at least half.
            eval_result["parse_success"] = validation["phases_present"] >= 3
            eval_result["format_metrics"] = {
                "phase_completeness": validation["phase_details"],
                "num_phases_present": validation["phases_present"],
                "pramana_sources": validation["pramana_sources"],
                "num_syllogisms": validation["syllogism_count"],
            }
            eval_result["tier1_result"] = {
                "passed": validation["passed"],
                "score": validation["phases_present"] / 6.0,
                "errors": validation["errors"],
                "details": validation["phase_details"],
            }

        except Exception as e:
            message = f"Unexpected error: {e}"
            eval_result["parse_error"] = message
            # Discard anything filled in before the failure so the record is
            # consistently "not parsed".
            eval_result["parse_success"] = False
            eval_result["format_metrics"] = {}
            # Consumers read this via r.get("tier1_result", {}).get(...); a
            # None value would make that chain raise AttributeError, so record
            # an explicit failed result instead.
            eval_result["tier1_result"] = {
                "passed": False,
                "score": 0.0,
                "errors": [message],
                "details": {},
            }

        evaluated.append(eval_result)

    return evaluated

print("✓ Structural evaluation functions ready")

In [None]:
#@title Run Tier 1 evaluation for all stages (double-click to show code)
# Run Tier 1 evaluation for all stages
# Maps stage name -> list of per-example Tier 1 results.
tier1_results = {}

# Only stages that actually have generation results are evaluated here.
for stage_name, generation_results in all_generation_results.items():
    print(f"\nEvaluating {stage_name}...")
    tier1_results[stage_name] = evaluate_structural(stage_name, generation_results)

print(f"\n✓ Completed Tier 1 evaluation for {len(tier1_results)} stages")

In [None]:
#@title Format adherence summary table (double-click to show code)
# Format adherence summary table
format_summary_rows = []

for stage_name, results in tier1_results.items():
    total = len(results)
    parsed_results = [r for r in results if r.get("parse_success")]
    parse_success = len(parsed_results)
    # "tier1_result" may be present with a None value; `or {}` keeps the
    # lookup from raising AttributeError in that case.
    tier1_passed = sum(1 for r in results if (r.get("tier1_result") or {}).get("passed"))

    phase_counts = []
    pramana_counts = []
    for r in parsed_results:
        metrics = r.get("format_metrics") or {}
        phase_counts.append(metrics.get("num_phases_present", 0))

        # The structural evaluation stores this metric under "pramana_sources";
        # reading only "num_pramana_sources" would always average to 0.
        # NOTE(review): assumes the value is a count or a sized collection of
        # sources - confirm against validate_structure().
        sources = metrics.get("num_pramana_sources", metrics.get("pramana_sources", 0))
        if isinstance(sources, (int, float)):
            pramana_counts.append(sources)
        elif hasattr(sources, "__len__"):
            pramana_counts.append(len(sources))
        else:
            pramana_counts.append(0)

    # Averages are taken over successfully parsed examples only.
    if parse_success > 0:
        avg_phases = sum(phase_counts) / parse_success
        avg_pramana = sum(pramana_counts) / parse_success
    else:
        avg_phases = 0
        avg_pramana = 0

    format_summary_rows.append({
        "Stage": stage_name,
        "Total Examples": total,
        "Parse Success": parse_success,
        "Parse Rate": f"{parse_success/total*100:.1f}%" if total > 0 else "0%",
        "Tier 1 Passed": tier1_passed,
        "Tier 1 Pass Rate": f"{tier1_passed/total*100:.1f}%" if total > 0 else "0%",
        "Avg Phases Present": f"{avg_phases:.1f}/6",
        "Avg Pramana Sources": f"{avg_pramana:.1f}",
    })

format_summary_df = pd.DataFrame(format_summary_rows)
display(format_summary_df)

In [None]:
#@title Format adherence bar chart (double-click to show code)
# Format adherence bar chart
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# The summary table stores rates as "NN.N%" strings; strip the sign to plot them.
stages = format_summary_df["Stage"].tolist()
parse_rates = [float(cell.rstrip("%")) for cell in format_summary_df["Parse Rate"]]
tier1_rates = [float(cell.rstrip("%")) for cell in format_summary_df["Tier 1 Pass Rate"]]

# Both panels share the same layout; only data, colour and title differ.
_panels = (
    (axes[0], parse_rates, "skyblue", "Parse Success Rate"),
    (axes[1], tier1_rates, "lightgreen", "Tier 1 Pass Rate"),
)
for _ax, _rates, _color, _title in _panels:
    _ax.bar(stages, _rates, color=_color, alpha=0.7)
    _ax.set_ylabel("Rate (%)")
    _ax.set_title(_title)
    _ax.set_ylim(0, 100)
    _ax.grid(axis="y", alpha=0.3)

plt.tight_layout()
plt.show()

## 5. Tier 2: Content Quality Evaluation

In [None]:
#@title Content quality evaluation using self-contained score_content_quality() (double-click to show code)
# Content quality evaluation using self-contained score_content_quality()

def evaluate_content_quality(stage_name: str, generation_results: List[Dict[str, Any]], tier1_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Run Tier 2 content quality evaluation (self-contained regex-based).

    Only examples whose Tier 1 result has a truthy "parse_success" are scored.
    All others keep "content_quality" as None. A scoring failure is recorded
    under "content_quality_error" instead of being raised.

    Returns:
        One dict per generation record, in input order.
    """
    # Tier 1 outcomes indexed by example id for constant-time lookup.
    parsed_lookup = {entry["example_id"]: entry for entry in tier1_results}
    evaluated = []

    for generation in generation_results:
        ex_id = generation["example_id"]
        record = {
            "example_id": ex_id,
            "stage": stage_name,
            "content_quality": None,
        }
        evaluated.append(record)

        # Skip scoring for outputs that did not parse.
        if not parsed_lookup.get(ex_id, {}).get("parse_success"):
            continue

        try:
            scores = score_content_quality(
                generation["generated_output"],
                generation.get("problem", ""),
            )
            record["content_quality"] = {
                "pratyaksha_score": scores["pratyaksha_grounding"],
                "udaharana_valid": scores["udaharana_valid"] > 0,
                "tarka_meaningful": scores["tarka_meaningful"] > 0,
                "hetvabhasa_completeness": scores["hetvabhasa_completeness"],
                "overall_score": scores["overall"],
            }
        except Exception as err:
            record["content_quality_error"] = str(err)

    return evaluated

print("✓ Content quality evaluation functions ready")

In [None]:
#@title Run Tier 2 evaluation (double-click to show code)
# Run Tier 2 evaluation
# Maps stage name -> list of per-example Tier 2 results.
tier2_results = {}

for stage_name in STAGES_TO_EVALUATE:
    # A configured stage may have no generations (for example when only some
    # stages were restored from cache). Evaluate it as empty rather than
    # raising KeyError, so every configured stage still gets an entry.
    if stage_name not in all_generation_results:
        print(f"Note: no generation results for {stage_name}; Tier 2 result is empty")
    generation_results = all_generation_results.get(stage_name, [])
    stage_tier1_results = tier1_results.get(stage_name, [])
    tier2_results[stage_name] = evaluate_content_quality(
        stage_name, generation_results, stage_tier1_results
    )

print(f"✓ Completed Tier 2 evaluation for {len(tier2_results)} stages")

In [None]:
#@title Content quality radar chart (double-click to show code)
# Content quality radar chart
import numpy as np

def plot_content_quality_radar(stage_results: Dict[str, List[Dict[str, Any]]]):
    """Draw a radar chart of average content-quality metrics, one trace per stage.

    Stages without any scored example are left off the chart. The two boolean
    metrics are averaged as 0/1 values, i.e. plotted as the fraction of
    examples where they hold.
    """
    axis_labels = [
        "Pratyaksha Score",
        "Udaharana Valid",
        "Tarka Meaningful",
        "Hetvabhasa Completeness",
        "Overall Score"
    ]

    # One spoke per metric; repeat the first angle so each polygon closes.
    spoke_angles = np.linspace(0, 2 * np.pi, len(axis_labels), endpoint=False).tolist()
    closed_angles = spoke_angles + spoke_angles[:1]

    fig, ax = plt.subplots(figsize=(10, 10), subplot_kw=dict(projection="polar"))

    for stage_name, results in stage_results.items():
        scored = [r["content_quality"] for r in results if r.get("content_quality")]
        if not scored:
            continue

        stage_values = [
            np.mean([q["pratyaksha_score"] for q in scored]),
            np.mean([float(bool(q["udaharana_valid"])) for q in scored]),
            np.mean([float(bool(q["tarka_meaningful"])) for q in scored]),
            np.mean([q["hetvabhasa_completeness"] for q in scored]),
            np.mean([q["overall_score"] for q in scored]),
        ]
        closed_values = stage_values + stage_values[:1]

        ax.plot(closed_angles, closed_values, "o-", linewidth=2, label=stage_name)
        ax.fill(closed_angles, closed_values, alpha=0.25)

    ax.set_xticks(spoke_angles)
    ax.set_xticklabels(axis_labels)
    ax.set_ylim(0, 1)
    ax.set_title("Content Quality Comparison", size=16, fontweight="bold", pad=20)
    ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.1))
    ax.grid(True)

    plt.tight_layout()
    plt.show()

plot_content_quality_radar(tier2_results)

## 6. Answer Correctness

In [None]:
#@title Answer correctness evaluation using self-contained extract_final_answer() + score_answers() (double-click to show code)
# Answer correctness evaluation using self-contained extract_final_answer() + score_answers()

def evaluate_answer_correctness(
    stage_name: str,
    generation_results: List[Dict[str, Any]],
    tier1_results: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Score each generated answer against its ground truth.

    An example is scored only when it has a non-empty ground truth and its
    Tier 1 result has a truthy "parse_success". Otherwise "answer_scores"
    stays None. A scoring failure is stored under "answer_error".

    Returns:
        One dict per generation record, in input order.
    """
    # Tier 1 outcomes indexed by example id.
    parsed_lookup = {entry["example_id"]: entry for entry in tier1_results}
    evaluated = []

    for generation in generation_results:
        ex_id = generation["example_id"]
        expected = generation.get("ground_truth", "")
        parsed_ok = parsed_lookup.get(ex_id, {}).get("parse_success")

        record = {
            "example_id": ex_id,
            "stage": stage_name,
            "answer_scores": None,
        }
        evaluated.append(record)

        # Nothing to compare against, or the output could not be parsed.
        if not (expected and parsed_ok):
            continue

        try:
            # A missing final answer is scored as the empty string.
            predicted = extract_final_answer(generation["generated_output"]) or ""
            record["answer_scores"] = score_answers(predicted, expected)
        except Exception as err:
            record["answer_error"] = str(err)

    return evaluated

print("✓ Answer correctness evaluation functions ready")

In [None]:
#@title Run answer correctness evaluation (double-click to show code)
# Run answer correctness evaluation
# Maps stage name -> list of per-example answer-correctness results.
answer_results = {}

for stage_name in STAGES_TO_EVALUATE:
    # A configured stage may have no generations (for example when only some
    # stages were restored from cache). Evaluate it as empty rather than
    # raising KeyError, so every configured stage still gets an entry.
    if stage_name not in all_generation_results:
        print(f"Note: no generation results for {stage_name}; answer result is empty")
    generation_results = all_generation_results.get(stage_name, [])
    stage_tier1_results = tier1_results.get(stage_name, [])
    answer_results[stage_name] = evaluate_answer_correctness(
        stage_name, generation_results, stage_tier1_results
    )

print(f"✓ Completed answer correctness evaluation for {len(answer_results)} stages")

In [None]:
#@title Calculate Wilson confidence intervals (double-click to show code)
# Calculate Wilson confidence intervals
def calculate_accuracy_metrics(answer_results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Calculate accuracy metrics with Wilson confidence intervals.

    Args:
        answer_results: Per-example results whose "answer_scores" is either a
            dict of scores or None (example not scored).

    Returns:
        Dict with "total", and for each of exact/normalized/semantic the match
        count, rate and confidence interval, plus "avg_semantic_similarity".
        Rates and intervals use all examples as the denominator, so unscored
        examples count as misses. The average similarity covers scored
        examples only and is 0.0 when there are none.
    """
    total = len(answer_results)

    # "answer_scores" is present with a None value for unscored examples, so
    # r.get("answer_scores", {}) would return None and the chained .get would
    # raise AttributeError. Keep only real score dicts.
    scored = [r["answer_scores"] for r in answer_results if r.get("answer_scores")]

    exact_matches = sum(1 for scores in scored if scores.get("exact_match"))
    normalized_matches = sum(1 for scores in scored if scores.get("normalized_match"))
    semantic_matches = sum(1 for scores in scored if scores.get("semantic_match"))

    exact_ci = wilson_interval(successes=exact_matches, total=total)
    normalized_ci = wilson_interval(successes=normalized_matches, total=total)
    semantic_ci = wilson_interval(successes=semantic_matches, total=total)

    if scored:
        # Plain float so the value stays JSON-serializable.
        avg_similarity = float(np.mean([
            scores.get("semantic_similarity", 0.0) for scores in scored
        ]))
    else:
        avg_similarity = 0.0

    return {
        "total": total,
        "exact_matches": exact_matches,
        "exact_rate": exact_matches / total if total > 0 else 0.0,
        "exact_ci": exact_ci,
        "normalized_matches": normalized_matches,
        "normalized_rate": normalized_matches / total if total > 0 else 0.0,
        "normalized_ci": normalized_ci,
        "semantic_matches": semantic_matches,
        "semantic_rate": semantic_matches / total if total > 0 else 0.0,
        "semantic_ci": semantic_ci,
        "avg_semantic_similarity": avg_similarity,
    }

# Calculate metrics for each stage
# Maps stage name -> metrics dict returned by calculate_accuracy_metrics().
accuracy_metrics = {}
for stage_name, results in answer_results.items():
    accuracy_metrics[stage_name] = calculate_accuracy_metrics(results)

# Display summary table
# Each match column shows "count (rate%)"; each CI column shows the interval
# bounds as fractions formatted to three decimals.
accuracy_summary_rows = []
for stage_name, metrics in accuracy_metrics.items():
    accuracy_summary_rows.append({
        "Stage": stage_name,
        "Total": metrics["total"],
        "Exact Match": f"{metrics['exact_matches']} ({metrics['exact_rate']*100:.1f}%)",
        "Exact CI (95%)": f"[{metrics['exact_ci'][0]:.3f}, {metrics['exact_ci'][1]:.3f}]",
        "Normalized Match": f"{metrics['normalized_matches']} ({metrics['normalized_rate']*100:.1f}%)",
        "Normalized CI (95%)": f"[{metrics['normalized_ci'][0]:.3f}, {metrics['normalized_ci'][1]:.3f}]",
        "Semantic Match": f"{metrics['semantic_matches']} ({metrics['semantic_rate']*100:.1f}%)",
        "Semantic CI (95%)": f"[{metrics['semantic_ci'][0]:.3f}, {metrics['semantic_ci'][1]:.3f}]",
        "Avg Similarity": f"{metrics['avg_semantic_similarity']:.3f}",
    })

accuracy_summary_df = pd.DataFrame(accuracy_summary_rows)
display(accuracy_summary_df)

## 7. Cross-Stage Comparison

In [None]:
#@title Side-by-side metrics comparison (double-click to show code)
# Side-by-side metrics comparison
comparison_data = []

for stage_name in STAGES_TO_EVALUATE:
    # Missing stages are treated as empty instead of raising KeyError.
    stage_tier1 = tier1_results.get(stage_name, [])
    stage_tier2 = tier2_results.get(stage_name, [])
    stage_answers = answer_results.get(stage_name, [])

    # Tier 1 metrics
    # Guard the denominators: a stage with no examples reports 0 rather than
    # raising ZeroDivisionError. "tier1_result" may be None, hence `or {}`.
    num_examples = len(stage_tier1)
    if num_examples > 0:
        parse_rate = sum(1 for r in stage_tier1 if r.get("parse_success")) / num_examples
        tier1_pass_rate = sum(
            1 for r in stage_tier1 if (r.get("tier1_result") or {}).get("passed")
        ) / num_examples
    else:
        parse_rate = 0.0
        tier1_pass_rate = 0.0

    # Tier 2 metrics (averaged over examples that received a quality score)
    quality_scores = [r.get("content_quality") for r in stage_tier2 if r.get("content_quality")]
    avg_quality = np.mean([q["overall_score"] for q in quality_scores]) if quality_scores else 0.0

    # Answer correctness
    stage_accuracy = accuracy_metrics.get(stage_name, {})
    exact_rate = stage_accuracy.get("exact_rate", 0.0)
    semantic_rate = stage_accuracy.get("semantic_rate", 0.0)

    comparison_data.append({
        "Stage": stage_name,
        "Parse Rate": f"{parse_rate*100:.1f}%",
        "Tier 1 Pass Rate": f"{tier1_pass_rate*100:.1f}%",
        "Avg Content Quality": f"{avg_quality:.3f}",
        "Exact Match Rate": f"{exact_rate*100:.1f}%",
        "Semantic Match Rate": f"{semantic_rate*100:.1f}%",
    })

comparison_df = pd.DataFrame(comparison_data)
display(comparison_df)

In [None]:
#@title Visual comparison chart (double-click to show code)
# Visual comparison chart
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

stages = comparison_df["Stage"].tolist()


def _percent_column(column_name):
    """Turn a column of 'NN.N%' strings from comparison_df into floats."""
    return [float(cell.rstrip("%")) for cell in comparison_df[column_name]]


# Recover numeric series from the formatted comparison table.
parse_rates = _percent_column("Parse Rate")
tier1_rates = _percent_column("Tier 1 Pass Rate")
quality_scores = [float(cell) for cell in comparison_df["Avg Content Quality"]]
exact_rates = _percent_column("Exact Match Rate")
semantic_rates = _percent_column("Semantic Match Rate")

# Three single-series panels: (axis, data, colour, y label, title, y max).
_single_panels = (
    (axes[0, 0], parse_rates, "skyblue", "Rate (%)", "Parse Success Rate", 100),
    (axes[0, 1], tier1_rates, "lightgreen", "Rate (%)", "Tier 1 Pass Rate", 100),
    (axes[1, 0], quality_scores, "orange", "Score", "Average Content Quality", 1),
)
for _ax, _series, _color, _ylabel, _title, _ymax in _single_panels:
    _ax.bar(stages, _series, color=_color, alpha=0.7)
    _ax.set_ylabel(_ylabel)
    _ax.set_title(_title)
    _ax.set_ylim(0, _ymax)
    _ax.grid(axis="y", alpha=0.3)

# Answer correctness: exact and semantic rates as grouped bars per stage.
x = np.arange(len(stages))
width = 0.35
_answers_ax = axes[1, 1]
_answers_ax.bar(x - width/2, exact_rates, width, label="Exact Match", color="coral", alpha=0.7)
_answers_ax.bar(x + width/2, semantic_rates, width, label="Semantic Match", color="lightblue", alpha=0.7)
_answers_ax.set_ylabel("Rate (%)")
_answers_ax.set_title("Answer Correctness")
_answers_ax.set_xticks(x)
_answers_ax.set_xticklabels(stages)
_answers_ax.legend()
_answers_ax.set_ylim(0, 100)
_answers_ax.grid(axis="y", alpha=0.3)

plt.tight_layout()
plt.show()

## 8. Failure Analysis

In [None]:
#@title Identify worst examples (double-click to show code)
# Identify worst examples
def classify_failure_mode(result: Dict[str, Any]) -> str:
    """Label a merged evaluation record with its most severe failure.

    Checks run from most to least severe and the first hit wins: parse
    failure, Tier 1 structural failure, incorrect answer (neither exact nor
    semantic match), content quality below 0.5, and otherwise "Success".
    """
    if not result.get("parse_success"):
        return "Parse Failure"

    if not result.get("tier1_result", {}).get("passed", False):
        return "Tier 1 Failure (Structure)"

    # Answers are judged only when scores exist for this example.
    scores = result.get("answer_scores", {})
    answer_wrong = bool(scores) and not (
        scores.get("exact_match") or scores.get("semantic_match")
    )
    if answer_wrong:
        return "Answer Incorrect"

    # A missing overall score counts as perfect (1.0), i.e. not low quality.
    content = result.get("content_quality")
    low_quality = bool(content) and content.get("overall_score", 1.0) < 0.5
    return "Low Content Quality" if low_quality else "Success"

# Classify failures for all stages
# Maps stage name -> list of merged per-example records with a "failure_mode".
failure_analysis = {}

for stage_name in STAGES_TO_EVALUATE:
    # Missing stages yield an empty analysis instead of raising KeyError.
    stage_tier1 = tier1_results.get(stage_name, [])
    stage_tier2 = tier2_results.get(stage_name, [])
    stage_answers = answer_results.get(stage_name, [])

    # Merge results
    merged_results = []
    tier1_map = {r["example_id"]: r for r in stage_tier1}
    tier2_map = {r["example_id"]: r for r in stage_tier2}
    answer_map = {r["example_id"]: r for r in stage_answers}

    # Tier 1 defines the set of examples. Tier 2 and answer fields are layered
    # on top where available, with later sources winning on shared keys.
    for example_id in tier1_map:
        merged = {
            "example_id": example_id,
            **tier1_map.get(example_id, {}),
            **tier2_map.get(example_id, {}),
            **answer_map.get(example_id, {}),
        }
        merged["failure_mode"] = classify_failure_mode(merged)
        merged_results.append(merged)

    failure_analysis[stage_name] = merged_results

print("✓ Failure analysis complete")

In [None]:
#@title Display worst examples with annotations (double-click to show code)
# Display worst examples with annotations
def display_worst_examples(
    stage_name: str,
    failure_results: List[Dict[str, Any]],
    n: int = 5,
    examples: "Optional[List[Dict[str, Any]]]" = None,
):
    """Print the ``n`` worst-performing examples of a stage.

    Records are ranked by failure severity (parse failure first, success
    last). Within one failure mode, lower content-quality scores come first,
    then lower semantic similarity, so the weakest examples lead the list.
    A record with no ``content_quality`` / ``answer_scores`` section counts
    as 0.0 for that criterion; a section that lacks the score key counts
    as 1.0.

    Args:
        stage_name: Stage label shown in the printed header.
        failure_results: Merged per-example records; each needs an
            ``example_id`` and normally carries a ``failure_mode``.
        n: Maximum number of examples to print.
        examples: Dataset records (dicts with ``id`` and ``problem``) used
            to print a problem preview. Defaults to the notebook-level
            ``validation_examples``.
    """
    failure_order = {
        "Parse Failure": 0,
        "Tier 1 Failure (Structure)": 1,
        "Answer Incorrect": 2,
        "Low Content Quality": 3,
        "Success": 4,
    }

    def severity_key(record):
        quality = record.get("content_quality")
        answers = record.get("answer_scores")
        # sorted() is ascending, so raw (non-negated) scores put the lowest,
        # i.e. worst, values first within a failure mode.
        return (
            failure_order.get(record.get("failure_mode", "Success"), 4),
            quality.get("overall_score", 1.0) if quality else 0.0,
            answers.get("semantic_similarity", 1.0) if answers else 0.0,
        )

    worst = sorted(failure_results, key=severity_key)[:n]

    if examples is None:
        # Fall back to the dataset loaded earlier in the notebook.
        examples = validation_examples

    print(f"\n{'='*80}")
    print(f"Worst {n} Examples for {stage_name}")
    print(f"{'='*80}")

    for i, result in enumerate(worst, 1):
        print(f"\n{i}. Example: {result['example_id']}")
        print(f"   Failure Mode: {result.get('failure_mode', 'Unknown')}")

        if not result.get("parse_success"):
            print(f"   Parse Error: {result.get('parse_error', 'Unknown')}")

        tier1_result = result.get("tier1_result") or {}
        if tier1_result:
            print(f"   Tier 1 Passed: {tier1_result.get('passed', False)}")
            errors = tier1_result.get("errors")
            if errors:
                # str() keeps the join safe if errors are not plain strings.
                print(f"   Tier 1 Errors: {', '.join(str(e) for e in errors[:3])}")

        quality = result.get("content_quality")
        if quality:
            print(f"   Content Quality: {quality.get('overall_score', 0.0):.3f}")

        answer_scores = result.get("answer_scores")
        if answer_scores:
            print(f"   Exact Match: {answer_scores.get('exact_match', False)}")
            print(f"   Semantic Similarity: {answer_scores.get('semantic_similarity', 0.0):.3f}")

        # Show the start of the original problem; the first id match wins.
        match = next((ex for ex in examples if ex["id"] == result["example_id"]), None)
        if match is not None:
            print(f"   Problem Preview: {match['problem'][:100]}...")

# Print the five worst examples for every analysed stage.
for stage_name in failure_analysis:
    results = failure_analysis[stage_name]
    display_worst_examples(stage_name, results, n=5)

In [None]:
#@title Failure mode distribution (double-click to show code)
# Failure mode distribution
# One row per (stage, failure mode) pair with its count and share of the stage.
failure_summary_rows = []

for stage_name, results in failure_analysis.items():
    total = len(results)
    failure_modes = [r.get("failure_mode", "Unknown") for r in results]
    mode_counts = pd.Series(failure_modes).value_counts()

    failure_summary_rows.extend(
        {
            "Stage": stage_name,
            "Failure Mode": mode,
            "Count": count,
            "Percentage": f"{count/total*100:.1f}%",
        }
        for mode, count in mode_counts.items()
    )

failure_summary_df = pd.DataFrame(failure_summary_rows)

# Failure modes as rows, stages as columns; combinations that never occurred show 0.
failure_pivot = failure_summary_df.pivot_table(
    values="Count",
    index="Failure Mode",
    columns="Stage",
    fill_value=0,
)
display(failure_pivot)

## 9. Results Export

In [None]:
#@title Save comprehensive results as JSON (compatible with results/ format) (double-click to show code)
# Save comprehensive results as JSON (compatible with results/ format)
def compile_final_results() -> Dict[str, Any]:
    """Compile all evaluation results into one export dictionary.

    Reads the notebook-level ``tier1_results``, ``tier2_results``,
    ``answer_results`` and ``accuracy_metrics`` (each keyed by stage name)
    together with the run configuration.

    Returns:
        A dict with five sections: ``evaluation_metadata``,
        ``tier1_results``, ``tier2_results``, ``answer_results`` (per-stage
        lists of trimmed per-example records) and ``summary_metrics``
        (per-stage aggregate rates and scores). Rates for a stage without
        any Tier 1 records are reported as 0.0.
    """

    def tier1_of(record):
        # The key may be present but None, which .get(key, {}) would not cover.
        return record.get("tier1_result") or {}

    def rate(records, predicate):
        # Guard the division: a stage with no records yields 0.0.
        if not records:
            return 0.0
        return sum(1 for r in records if predicate(r)) / len(records)

    def mean_quality(records):
        scores = [
            r["content_quality"].get("overall_score", 0.0)
            for r in records
            if r.get("content_quality")
        ]
        # float() keeps the exported value a plain Python float.
        return float(np.mean(scores)) if scores else 0.0

    return {
        "evaluation_metadata": {
            "timestamp": datetime.now().isoformat(),
            "data_source": DATA_SOURCE,
            "stages_evaluated": STAGES_TO_EVALUATE,
            "eval_tiers": EVAL_TIERS,
            "num_examples": len(validation_examples),
        },
        "tier1_results": {
            stage: [
                {
                    "example_id": r["example_id"],
                    "parse_success": r.get("parse_success", False),
                    "format_metrics": r.get("format_metrics", {}),
                    "tier1_passed": tier1_of(r).get("passed", False),
                    "tier1_score": tier1_of(r).get("score", 0.0),
                    "tier1_errors": tier1_of(r).get("errors", []),
                }
                for r in results
            ]
            for stage, results in tier1_results.items()
        },
        "tier2_results": {
            stage: [
                {
                    "example_id": r["example_id"],
                    "content_quality": r.get("content_quality"),
                }
                for r in results
            ]
            for stage, results in tier2_results.items()
        },
        "answer_results": {
            stage: [
                {
                    "example_id": r["example_id"],
                    "answer_scores": r.get("answer_scores"),
                }
                for r in results
            ]
            for stage, results in answer_results.items()
        },
        "summary_metrics": {
            stage: {
                "format_adherence": {
                    "parse_rate": rate(
                        tier1_results[stage], lambda r: r.get("parse_success")
                    ),
                    "tier1_pass_rate": rate(
                        tier1_results[stage], lambda r: tier1_of(r).get("passed")
                    ),
                },
                "content_quality": {
                    "avg_overall_score": mean_quality(tier2_results[stage]),
                },
                "answer_correctness": accuracy_metrics[stage],
            }
            for stage in STAGES_TO_EVALUATE
        },
    }

final_results = compile_final_results()

# Write the results to a timestamped JSON file in the output directory.
# default=str stringifies any value the json module cannot encode natively.
results_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
results_file = OUTPUT_DIR / f"evaluation_results_{results_stamp}.json"
with results_file.open("w") as f:
    json.dump(final_results, f, indent=2, default=str)

print(f"✓ Results saved to {results_file}")

In [None]:
#@title Generate Markdown report (double-click to show code)
# Generate Markdown report
def generate_markdown_report(results: Dict[str, Any]) -> str:
    """Render compiled evaluation results as a Markdown report.

    Args:
        results: Dict with an ``evaluation_metadata`` section (``timestamp``,
            ``stages_evaluated``, ``num_examples`` and either ``data_source``
            or ``validation_dir``) and a ``summary_metrics`` section keyed by
            stage. Each stage entry needs ``format_adherence``
            (``parse_rate``, ``tier1_pass_rate``), ``content_quality``
            (``avg_overall_score``) and ``answer_correctness``
            (``exact_rate``, ``semantic_rate``, ``avg_semantic_similarity``).

    Returns:
        The report as one newline-joined string: a header, a summary table
        with one row per stage, then a detail section per stage.

    Raises:
        KeyError: If a required key listed above is missing.
    """
    metadata = results["evaluation_metadata"]
    stages = metadata["stages_evaluated"]
    summary_metrics = results["summary_metrics"]

    # Prefer ``data_source``; fall back to ``validation_dir`` when that is
    # the only source key present, so either metadata shape renders.
    if "validation_dir" in metadata and "data_source" not in metadata:
        source_line = f"**Validation Directory:** {metadata['validation_dir']}"
    else:
        source_line = f"**Data Source:** {metadata.get('data_source', 'N/A')}"

    md_lines = [
        "# Pramana Evaluation Report",
        "",
        f"**Generated:** {metadata['timestamp']}",
        source_line,
        f"**Stages Evaluated:** {', '.join(stages)}",
        f"**Number of Examples:** {metadata['num_examples']}",
        "",
        "## Summary Metrics",
        "",
        "| Stage | Parse Rate | Tier 1 Pass Rate | Avg Content Quality | Exact Match Rate | Semantic Match Rate |",
        "|-------|------------|------------------|---------------------|------------------|---------------------|",
    ]

    # Summary table: one row per stage.
    for stage in stages:
        summary = summary_metrics[stage]
        format_metrics = summary["format_adherence"]
        quality_metrics = summary["content_quality"]
        answer_metrics = summary["answer_correctness"]

        md_lines.append(
            f"| {stage} | "
            f"{format_metrics['parse_rate']*100:.1f}% | "
            f"{format_metrics['tier1_pass_rate']*100:.1f}% | "
            f"{quality_metrics['avg_overall_score']:.3f} | "
            f"{answer_metrics['exact_rate']*100:.1f}% | "
            f"{answer_metrics['semantic_rate']*100:.1f}% |"
        )

    md_lines.extend(["", "## Detailed Results", ""])

    # Detail section: the same metrics as bullet lists, plus average similarity.
    for stage in stages:
        summary = summary_metrics[stage]
        format_metrics = summary["format_adherence"]
        quality_metrics = summary["content_quality"]
        answer_metrics = summary["answer_correctness"]

        md_lines.extend([
            f"### {stage}",
            "",
            "**Format Adherence:**",
            f"- Parse Success Rate: {format_metrics['parse_rate']*100:.1f}%",
            f"- Tier 1 Pass Rate: {format_metrics['tier1_pass_rate']*100:.1f}%",
            "",
            "**Content Quality:**",
            f"- Average Overall Score: {quality_metrics['avg_overall_score']:.3f}",
            "",
            "**Answer Correctness:**",
            f"- Exact Match Rate: {answer_metrics['exact_rate']*100:.1f}%",
            f"- Semantic Match Rate: {answer_metrics['semantic_rate']*100:.1f}%",
            f"- Average Semantic Similarity: {answer_metrics['avg_semantic_similarity']:.3f}",
            "",
        ])

    return "\n".join(md_lines)

markdown_report = generate_markdown_report(final_results)

# Write the report next to the JSON results under a timestamped name.
report_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
report_file = OUTPUT_DIR / f"evaluation_report_{report_stamp}.md"
report_file.write_text(markdown_report, encoding="utf-8")

print(f"✓ Markdown report saved to {report_file}")

# Preview: at most the first 1000 characters, with an ellipsis when truncated.
separator = "=" * 80
print("\n" + separator)
print("Report Preview:")
print(separator)
if len(markdown_report) > 1000:
    print(markdown_report[:1000] + "...")
else:
    print(markdown_report)