# ChronoTranscriber Evaluation: CER & WER Analysis

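This notebook evaluates transcription quality across several LLMs by comparing each model's output with a manually corrected ground truth. Two metrics are computed:
- **Character Error Rate (CER)**: character-level edit distance, conventionally divided by the number of characters in the reference
- **Word Error Rate (WER)**: word-level edit distance, conventionally divided by the number of words in the reference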
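
To make the two definitions concrete, here is a minimal sketch of both metrics built on a plain Levenshtein distance. The function names `levenshtein`, `cer`, and `wer` are illustrative and are not taken from the project's `metrics` module, which may normalize text or handle edge cases differently.

```python
from typing import Sequence


def levenshtein(ref: Sequence, hyp: Sequence) -> int:
    """Minimum number of insertions, deletions, and substitutions turning ref into hyp."""
    prev = list(range(len(hyp) + 1))  # distances from the empty prefix of ref
    for i, r in enumerate(ref, 1):
        curr = [i]
        for j, h in enumerate(hyp, 1):
            cost = 0 if r == h else 1
            curr.append(min(prev[j] + 1,          # deletion
                            curr[j - 1] + 1,      # insertion
                            prev[j - 1] + cost))  # match or substitution
        prev = curr
    return prev[-1]


def cer(reference: str, hypothesis: str) -> float:
    """Character-level edit distance divided by the reference length."""
    return levenshtein(reference, hypothesis) / max(len(reference), 1)


def wer(reference: str, hypothesis: str) -> float:
    """Word-level edit distance divided by the number of reference words."""
    ref_words = reference.split()
    return levenshtein(ref_words, hypothesis.split()) / max(len(ref_words), 1)


reference = "Basel 1900"
hypothesis = "Basel 1960"
print(f"CER: {cer(reference, hypothesis):.2f}")  # one substitution over 10 characters
print(f"WER: {wer(reference, hypothesis):.2f}")  # one wrong word out of 2
```

A single misread digit yields a CER of 0.10 but a WER of 0.50, which is why the two metrics are reported side by side.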

## Models Evaluated
| Provider | Model | Reasoning |
|----------|-------|----------|
| OpenAI | GPT-5.1 | Medium |
| OpenAI | GPT-5 Mini | Medium |
| Google | Gemini 3.0 Pro | Medium |
| Google | Gemini 2.5 Flash | None |
| Anthropic | Claude Sonnet 4.5 | Medium |
| Anthropic | Claude Haiku 4.5 | Medium |

## Dataset Categories
1. **Address Books** - Swiss address book pages (Basel 1900)
2. **Bibliography** - European culinary bibliographies  
3. **Military Records** - Brazilian military enlistment cards

In [None]:
# Standard library imports
import json
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import warnings
warnings.filterwarnings('ignore')

# Add parent directory to path for imports
EVAL_DIR = Path.cwd()
PROJECT_ROOT = EVAL_DIR.parent
sys.path.insert(0, str(PROJECT_ROOT))
sys.path.insert(0, str(EVAL_DIR))

# Import evaluation metrics
from metrics import (
    compute_metrics,
    aggregate_metrics,
    TranscriptionMetrics,
    format_metrics_table,
)

# Data handling
import yaml

print(f"Evaluation directory: {EVAL_DIR}")
print(f"Project root: {PROJECT_ROOT}")

## 1. Configuration

In [None]:
# Load evaluation configuration
CONFIG_PATH = EVAL_DIR / "eval_config.yaml"

with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

# Extract paths
INPUT_PATH = EVAL_DIR / config['dataset']['input_path']
OUTPUT_PATH = EVAL_DIR / config['dataset']['output_path']
GROUND_TRUTH_PATH = EVAL_DIR / config['dataset']['ground_truth_path']
REPORTS_PATH = EVAL_DIR / config['evaluation']['reports_path']

# Create reports directory
REPORTS_PATH.mkdir(parents=True, exist_ok=True)

# Extract categories and models
CATEGORIES = [cat['name'] for cat in config['dataset']['categories']]
MODELS = {m['name']: m for m in config['models']}

print("Configuration loaded successfully.")
print(f"\nCategories: {CATEGORIES}")
print(f"\nModels: {list(MODELS.keys())}")
print(f"\nPaths:")
print(f"  Input: {INPUT_PATH}")
print(f"  Output: {OUTPUT_PATH}")
print(f"  Ground Truth: {GROUND_TRUTH_PATH}")
print(f"  Reports: {REPORTS_PATH}")

## 2. Data Loading Functions

In [None]:
def load_transcription_from_txt(file_path: Path) -> Optional[str]:
    """
    Load transcription text from a .txt file.
    
    Args:
        file_path: Path to the .txt file
        
    Returns:
        Text content or None if file doesn't exist
    """
    if not file_path.exists():
        return None
    
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()


def load_transcription_from_jsonl(file_path: Path) -> Optional[str]:
    """
    Load and concatenate transcriptions from a JSONL file.
    
    The JSONL file may contain multiple page transcriptions.
    We extract the 'text_chunk' or 'transcription' field from each line.
    
    Args:
        file_path: Path to the .jsonl file
        
    Returns:
        Concatenated text content or None if file doesn't exist
    """
    if not file_path.exists():
        return None
    
    transcriptions = []
    
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
                # Try different field names
                text = record.get('text_chunk') or record.get('transcription')
                if text:
                    transcriptions.append(text)
            except json.JSONDecodeError:
                continue
    
    if not transcriptions:
        return None
    
    return "\n\n".join(transcriptions)


def find_output_file(category: str, model_name: str, source_name: str) -> Optional[Path]:
    """
    Find the output transcription file for a given source.
    
    Searches for .txt or .jsonl files in the model's output directory.
    
    Args:
        category: Dataset category (e.g., 'address_books')
        model_name: Model identifier (e.g., 'gpt_5_mini_medium')
        source_name: Source file/folder name
        
    Returns:
        Path to output file or None if not found
    """
    model_output_dir = OUTPUT_PATH / category / model_name
    
    if not model_output_dir.exists():
        return None
    
    # Try different file patterns
    base_name = Path(source_name).stem
    
    # Check for direct .txt file
    txt_file = model_output_dir / f"{base_name}.txt"
    if txt_file.exists():
        return txt_file
    
    # Check for folder with transcription.txt
    folder_txt = model_output_dir / base_name / "transcription.txt"
    if folder_txt.exists():
        return folder_txt
    
    # Check for JSONL file
    jsonl_file = model_output_dir / f"{base_name}.jsonl"
    if jsonl_file.exists():
        return jsonl_file
    
    # Check in subfolder
    folder_jsonl = model_output_dir / base_name / f"{base_name}_transcription.jsonl"
    if folder_jsonl.exists():
        return folder_jsonl
    
    return None


def load_output_transcription(category: str, model_name: str, source_name: str) -> Optional[str]:
    """
    Load a model's output transcription for a given source.
    
    Args:
        category: Dataset category
        model_name: Model identifier
        source_name: Source file/folder name
        
    Returns:
        Transcription text or None if not found
    """
    file_path = find_output_file(category, model_name, source_name)
    
    if file_path is None:
        return None
    
    if file_path.suffix == '.jsonl':
        return load_transcription_from_jsonl(file_path)
    else:
        return load_transcription_from_txt(file_path)


def find_ground_truth_file(category: str, source_name: str) -> Optional[Path]:
    """
    Find the ground truth file for a given source.
    
    Args:
        category: Dataset category
        source_name: Source file/folder name
        
    Returns:
        Path to ground truth file or None if not found
    """
    gt_dir = GROUND_TRUTH_PATH / category
    
    if not gt_dir.exists():
        return None
    
    base_name = Path(source_name).stem
    
    # Check for .txt file
    txt_file = gt_dir / f"{base_name}.txt"
    if txt_file.exists():
        return txt_file
    
    # Check for folder with ground_truth.txt
    folder_txt = gt_dir / base_name / "ground_truth.txt"
    if folder_txt.exists():
        return folder_txt
    
    return None


def load_ground_truth(category: str, source_name: str) -> Optional[str]:
    """
    Load ground truth transcription for a given source.
    
    Args:
        category: Dataset category
        source_name: Source file/folder name
        
    Returns:
        Ground truth text or None if not found
    """
    file_path = find_ground_truth_file(category, source_name)
    
    if file_path is None:
        return None
    
    return load_transcription_from_txt(file_path)


print("Data loading functions defined.")

## 3. Discover Available Data

In [None]:
def discover_sources(category: str) -> List[str]:
    """
    Discover source files/folders in the input directory for a category.
    
    Args:
        category: Dataset category
        
    Returns:
        List of source names
    """
    input_dir = INPUT_PATH / category
    
    if not input_dir.exists():
        return []
    
    sources = []
    
    image_exts = {'.jpg', '.jpeg', '.png', '.tiff'}
    
    for item in input_dir.iterdir():
        if item.is_file() and item.suffix.lower() in (image_exts | {'.pdf'}):
            sources.append(item.name)
        elif item.is_dir():
            # Accept a folder if it holds at least one image; extensions are matched
            # case-insensitively, consistent with the single-file check above
            if any(p.suffix.lower() in image_exts for p in item.iterdir()):
                sources.append(item.name)
    
    return sorted(sources)


def discover_available_models(category: str) -> List[str]:
    """
    Discover which models have output for a given category.
    
    Args:
        category: Dataset category
        
    Returns:
        List of model names with available output
    """
    output_dir = OUTPUT_PATH / category
    
    if not output_dir.exists():
        return []
    
    return sorted([d.name for d in output_dir.iterdir() if d.is_dir()])


# Discover and display available data
print("=" * 60)
print("AVAILABLE DATA SUMMARY")
print("=" * 60)

data_summary = {}

for category in CATEGORIES:
    sources = discover_sources(category)
    available_models = discover_available_models(category)
    gt_available = (GROUND_TRUTH_PATH / category).exists() and any((GROUND_TRUTH_PATH / category).iterdir())
    
    data_summary[category] = {
        'sources': sources,
        'models': available_models,
        'ground_truth_available': gt_available,
    }
    
    print(f"\n{category.upper()}")
    print("-" * 40)
    print(f"  Input sources: {len(sources)}")
    if sources:
        for s in sources[:5]:
            print(f"    - {s}")
        if len(sources) > 5:
            print(f"    ... and {len(sources) - 5} more")
    print(f"  Models with output: {len(available_models)}")
    for m in available_models:
        print(f"    - {m}")
    print(f"  Ground truth available: {'Yes' if gt_available else 'No (pending)'}")

## 4. Compute Metrics

In [None]:
@dataclass
class EvaluationResult:
    """Container for evaluation results."""
    category: str
    model_name: str
    source_name: str
    metrics: Optional[TranscriptionMetrics]
    ground_truth_found: bool
    output_found: bool
    error: Optional[str] = None


def evaluate_source(
    category: str,
    model_name: str,
    source_name: str,
) -> EvaluationResult:
    """
    Evaluate a single source against ground truth.
    
    Args:
        category: Dataset category
        model_name: Model identifier
        source_name: Source file/folder name
        
    Returns:
        EvaluationResult with metrics or error info
    """
    # Load ground truth
    ground_truth = load_ground_truth(category, source_name)
    if ground_truth is None:
        return EvaluationResult(
            category=category,
            model_name=model_name,
            source_name=source_name,
            metrics=None,
            ground_truth_found=False,
            output_found=False,
            error="Ground truth not found",
        )
    
    # Load model output
    output = load_output_transcription(category, model_name, source_name)
    if output is None:
        return EvaluationResult(
            category=category,
            model_name=model_name,
            source_name=source_name,
            metrics=None,
            ground_truth_found=True,
            output_found=False,
            error="Model output not found",
        )
    
    # Compute metrics
    try:
        metrics = compute_metrics(ground_truth, output, normalize=True)
        return EvaluationResult(
            category=category,
            model_name=model_name,
            source_name=source_name,
            metrics=metrics,
            ground_truth_found=True,
            output_found=True,
        )
    except Exception as e:
        return EvaluationResult(
            category=category,
            model_name=model_name,
            source_name=source_name,
            metrics=None,
            ground_truth_found=True,
            output_found=True,
            error=str(e),
        )


def evaluate_model_category(
    category: str,
    model_name: str,
) -> Tuple[List[EvaluationResult], Optional[TranscriptionMetrics]]:
    """
    Evaluate all sources in a category for a given model.
    
    Args:
        category: Dataset category
        model_name: Model identifier
        
    Returns:
        Tuple of (list of per-source results, aggregated metrics)
    """
    sources = discover_sources(category)
    results = []
    valid_metrics = []
    
    for source in sources:
        result = evaluate_source(category, model_name, source)
        results.append(result)
        if result.metrics is not None:
            valid_metrics.append(result.metrics)
    
    aggregated = aggregate_metrics(valid_metrics) if valid_metrics else None
    
    return results, aggregated


print("Evaluation functions defined.")

In [None]:
# Run full evaluation
print("=" * 60)
print("RUNNING EVALUATION")
print("=" * 60)

all_results: Dict[str, Dict[str, List[EvaluationResult]]] = {}
aggregated_metrics: Dict[str, Dict[str, TranscriptionMetrics]] = {}

for category in CATEGORIES:
    all_results[category] = {}
    aggregated_metrics[category] = {}
    
    available_models = discover_available_models(category)
    
    if not available_models:
        print(f"\n{category}: No model outputs found (skipping)")
        continue
    
    print(f"\n{category.upper()}")
    print("-" * 40)
    
    for model_name in available_models:
        results, agg_metrics = evaluate_model_category(category, model_name)
        all_results[category][model_name] = results
        
        if agg_metrics is not None:
            aggregated_metrics[category][model_name] = agg_metrics
            print(f"  {model_name}:")
            print(f"    CER: {agg_metrics.cer*100:.2f}%  |  WER: {agg_metrics.wer*100:.2f}%")
            print(f"    Sources evaluated: {sum(1 for r in results if r.metrics is not None)}")
        else:
            errors = [r.error for r in results if r.error]
            print(f"  {model_name}: No valid evaluations")
            if errors:
                print(f"    Errors: {errors[0]}")

print("\n" + "=" * 60)
print("EVALUATION COMPLETE")
print("=" * 60)

## 5. Results Summary

In [None]:
# Restructure for display: model -> category -> metrics
model_category_metrics: Dict[str, Dict[str, TranscriptionMetrics]] = {}

for category, models in aggregated_metrics.items():
    for model_name, metrics in models.items():
        if model_name not in model_category_metrics:
            model_category_metrics[model_name] = {}
        model_category_metrics[model_name][category] = metrics

# Display as formatted table
if model_category_metrics:
    print("\n" + "=" * 80)
    print("RESULTS SUMMARY TABLE")
    print("=" * 80 + "\n")
    print(format_metrics_table(model_category_metrics, CATEGORIES))
else:
    print("\nNo evaluation results available.")
    print("\nTo generate results:")
    print("1. Run transcriptions for each model (save outputs to test_data/output/{category}/{model_name}/)")
    print("2. Create ground truth files in test_data/ground_truth/{category}/")
    print("3. Re-run this notebook")

In [None]:
# Compute overall metrics per model (across all categories)
print("\n" + "=" * 80)
print("OVERALL MODEL PERFORMANCE (All Categories Combined)")
print("=" * 80 + "\n")

overall_model_metrics = {}

for model_name, cat_metrics in model_category_metrics.items():
    all_metrics = list(cat_metrics.values())
    if all_metrics:
        overall = aggregate_metrics(all_metrics)
        overall_model_metrics[model_name] = overall

# Sort by CER for ranking
if overall_model_metrics:
    ranked = sorted(overall_model_metrics.items(), key=lambda x: x[1].cer)
    
    print(f"{'Rank':<6} {'Model':<30} {'CER (%)':<12} {'WER (%)':<12} {'Chars':<12} {'Words':<10}")
    print("-" * 80)
    
    for rank, (model_name, metrics) in enumerate(ranked, 1):
        print(f"{rank:<6} {model_name:<30} {metrics.cer*100:<12.2f} {metrics.wer*100:<12.2f} "
              f"{metrics.ref_char_count:<12,} {metrics.ref_word_count:<10,}")
else:
    print("No overall metrics available yet.")

## 6. Detailed Per-Source Results

In [None]:
# Show detailed results for a specific category/model (configurable)
SHOW_CATEGORY = "address_books"  # Change as needed
SHOW_MODEL = None  # Set to specific model name or None for all

if SHOW_CATEGORY in all_results:
    print(f"\n{'='*80}")
    print(f"DETAILED RESULTS: {SHOW_CATEGORY.upper()}")
    print(f"{'='*80}\n")
    
    models_to_show = [SHOW_MODEL] if SHOW_MODEL else list(all_results[SHOW_CATEGORY].keys())
    
    for model_name in models_to_show:
        if model_name not in all_results[SHOW_CATEGORY]:
            continue
            
        results = all_results[SHOW_CATEGORY][model_name]
        
        print(f"\n{model_name}")
        print("-" * 60)
        print(f"{'Source':<40} {'CER (%)':<12} {'WER (%)':<12} {'Status'}")
        print("-" * 60)
        
        for result in results:
            # Truncate long source names so the table columns stay aligned
            source_display = (result.source_name[:38] + "..") if len(result.source_name) > 40 else result.source_name
            
            if result.metrics:
                print(f"{source_display:<40} {result.metrics.cer*100:<12.2f} {result.metrics.wer*100:<12.2f} OK")
            else:
                status = result.error or "Unknown error"
                print(f"{source_display:<40} {'--':<12} {'--':<12} {status}")
else:
    print(f"Category '{SHOW_CATEGORY}' not found in results.")

## 7. Export Results

In [None]:
import csv
from datetime import datetime

# Generate timestamp for reports
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# Export aggregated metrics to JSON
json_report = {
    "timestamp": timestamp,
    "categories": CATEGORIES,
    "models": list(MODELS.keys()),
    "results": {},
}

for model_name, cat_metrics in model_category_metrics.items():
    json_report["results"][model_name] = {
        "per_category": {cat: m.to_dict() for cat, m in cat_metrics.items()},
    }
    if model_name in overall_model_metrics:
        json_report["results"][model_name]["overall"] = overall_model_metrics[model_name].to_dict()

json_path = REPORTS_PATH / f"eval_results_{timestamp}.json"
with open(json_path, 'w', encoding='utf-8') as f:
    json.dump(json_report, f, indent=2)
print(f"JSON report saved: {json_path}")

# Export to CSV
csv_path = REPORTS_PATH / f"eval_results_{timestamp}.csv"
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow(["Model", "Category", "CER (%)", "WER (%)", "Char Distance", "Word Distance", 
                     "Ref Chars", "Ref Words", "Hyp Chars", "Hyp Words"])
    
    for model_name, cat_metrics in model_category_metrics.items():
        for category, metrics in cat_metrics.items():
            writer.writerow([
                model_name,
                category,
                round(metrics.cer * 100, 2),
                round(metrics.wer * 100, 2),
                metrics.char_distance,
                metrics.word_distance,
                metrics.ref_char_count,
                metrics.ref_word_count,
                metrics.hyp_char_count,
                metrics.hyp_word_count,
            ])

print(f"CSV report saved: {csv_path}")

# Export Markdown summary
md_path = REPORTS_PATH / f"eval_results_{timestamp}.md"
with open(md_path, 'w', encoding='utf-8') as f:
    f.write("# ChronoTranscriber Evaluation Results\n\n")
    f.write(f"**Generated:** {timestamp}\n\n")
    f.write("## Models Evaluated\n\n")
    for name, info in MODELS.items():
        f.write(f"- **{name}**: {info.get('description', '')}\n")
    f.write("\n## Results by Category\n\n")
    f.write(format_metrics_table(model_category_metrics, CATEGORIES))
    f.write("\n\n## Overall Rankings\n\n")
    if overall_model_metrics:
        ranked = sorted(overall_model_metrics.items(), key=lambda x: x[1].cer)
        f.write("| Rank | Model | CER (%) | WER (%) |\n")
        f.write("|------|-------|---------|---------|\n")
        for rank, (model_name, metrics) in enumerate(ranked, 1):
            f.write(f"| {rank} | {model_name} | {metrics.cer*100:.2f} | {metrics.wer*100:.2f} |\n")

print(f"Markdown report saved: {md_path}")

## 8. Visualization (Optional)

In [None]:
# Optional: Create visualizations if matplotlib is available
try:
    import matplotlib.pyplot as plt
    
    PLOT_AVAILABLE = True
except ImportError:
    PLOT_AVAILABLE = False
    print("matplotlib not available - skipping visualizations")
    print("Install with: pip install matplotlib")

if PLOT_AVAILABLE and overall_model_metrics:
    # Prepare data
    models = list(overall_model_metrics.keys())
    cer_values = [overall_model_metrics[m].cer * 100 for m in models]
    wer_values = [overall_model_metrics[m].wer * 100 for m in models]
    
    # Create figure
    fig, axes = plt.subplots(1, 2, figsize=(14, 6))
    
    # CER bar chart
    ax1 = axes[0]
    bars1 = ax1.barh(models, cer_values, color='steelblue')
    ax1.set_xlabel('Character Error Rate (%)')
    ax1.set_title('CER by Model')
    ax1.bar_label(bars1, fmt='%.2f%%', padding=3)
    ax1.set_xlim(0, max(cer_values) * 1.2 if cer_values else 10)
    
    # WER bar chart
    ax2 = axes[1]
    bars2 = ax2.barh(models, wer_values, color='darkorange')
    ax2.set_xlabel('Word Error Rate (%)')
    ax2.set_title('WER by Model')
    ax2.bar_label(bars2, fmt='%.2f%%', padding=3)
    ax2.set_xlim(0, max(wer_values) * 1.2 if wer_values else 10)
    
    plt.tight_layout()
    
    # Save figure
    fig_path = REPORTS_PATH / f"eval_chart_{timestamp}.png"
    plt.savefig(fig_path, dpi=150, bbox_inches='tight')
    print(f"Chart saved: {fig_path}")
    
    plt.show()

---

## Next Steps

### To complete the evaluation:

1. **Create Ground Truth**
   - Run a transcription pass on each category using a high-quality model
   - Manually review and correct the outputs
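   - Save corrected files to `test_data/ground_truth/{category}/{source_stem}.txt`, where `{source_stem}` is the source file name without its extension (a `{source_stem}/ground_truth.txt` file inside the category folder is also accepted)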

2. **Run Model Transcriptions**
   - For each model, transcribe all sources in each category
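   - Save outputs to `test_data/output/{category}/{model_name}/` as `{source_stem}.txt` or `{source_stem}.jsonl` (JSONL records are read from the `text_chunk` or `transcription` field)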
   - Use the ChronoTranscriber CLI with appropriate model configuration

3. **Re-run This Notebook**
   - The notebook will automatically discover outputs and compute metrics
   - Results will be exported to the `reports/` directory
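
Before re-running the notebook, it can help to check which sources still lack a ground truth file. The sketch below is one possible way to do that, assuming the two lookup locations used by this notebook (`{stem}.txt` and `{stem}/ground_truth.txt`). The helper name `missing_ground_truth` and the demo file names are hypothetical.

```python
import tempfile
from pathlib import Path
from typing import List


def missing_ground_truth(input_dir: Path, gt_dir: Path) -> List[str]:
    """Return names of sources in input_dir that have no ground truth in gt_dir."""
    missing = []
    for item in sorted(input_dir.iterdir()):
        stem = Path(item.name).stem  # same stem rule as the notebook's lookup
        candidates = [gt_dir / f"{stem}.txt", gt_dir / stem / "ground_truth.txt"]
        if not any(c.exists() for c in candidates):
            missing.append(item.name)
    return missing


# Demo on a throwaway directory tree (file names are made up for illustration)
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    input_dir = root / "input" / "address_books"
    gt_dir = root / "ground_truth" / "address_books"
    input_dir.mkdir(parents=True)
    gt_dir.mkdir(parents=True)
    (input_dir / "page_001.jpg").touch()
    (input_dir / "page_002.jpg").touch()
    (gt_dir / "page_001.txt").write_text("Basel 1900", encoding="utf-8")
    demo_missing = missing_ground_truth(input_dir, gt_dir)
    print(demo_missing)  # ['page_002.jpg']
```

In the notebook itself, the same helper could be called with `INPUT_PATH / category` and `GROUND_TRUTH_PATH / category` for each entry in `CATEGORIES`.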

### Expected output structure:
```
eval/
├── test_data/
│   ├── input/
│   │   ├── address_books/      # Source images
│   │   ├── bibliography/       # Source PDFs
│   │   └── military_records/   # Source PDFs
│   ├── output/
│   │   ├── address_books/
│   │   │   ├── gpt_5.1_medium/
│   │   │   ├── gpt_5_mini_medium/
│   │   │   ├── gemini_3.0_pro_medium/
│   │   │   └── ...
│   │   └── ...
│   └── ground_truth/
│       ├── address_books/      # Manually corrected transcriptions
│       ├── bibliography/
│       └── military_records/
└── reports/
    ├── eval_results_YYYYMMDD_HHMMSS.json
    ├── eval_results_YYYYMMDD_HHMMSS.csv
    ├── eval_results_YYYYMMDD_HHMMSS.md
    └── eval_chart_YYYYMMDD_HHMMSS.png
```
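
The layout above can be created up front with a few lines of Python. This is a minimal sketch, assuming the three category names listed in this notebook. The helper `create_skeleton` is hypothetical and not part of ChronoTranscriber, and the model folder name in the demo is only an example.

```python
import tempfile
from pathlib import Path
from typing import List

CATEGORY_NAMES = ["address_books", "bibliography", "military_records"]


def create_skeleton(eval_dir: Path, model_names: List[str]) -> List[Path]:
    """Create input, output, ground truth, and report folders under eval_dir."""
    created = []
    for category in CATEGORY_NAMES:
        dirs = [
            eval_dir / "test_data" / "input" / category,
            eval_dir / "test_data" / "ground_truth" / category,
        ]
        # One output folder per model within each category
        dirs += [eval_dir / "test_data" / "output" / category / m for m in model_names]
        for d in dirs:
            d.mkdir(parents=True, exist_ok=True)
            created.append(d)
    reports = eval_dir / "reports"
    reports.mkdir(parents=True, exist_ok=True)
    created.append(reports)
    return created


# Demo in a temporary directory so nothing is written to the real project
with tempfile.TemporaryDirectory() as tmp:
    created = create_skeleton(Path(tmp) / "eval", ["gpt_5_mini_medium"])
    skeleton_ok = all(d.is_dir() for d in created)
    print(len(created), skeleton_ok)
```

Because `mkdir` is called with `exist_ok=True`, running the helper again on an existing tree leaves current files untouched.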