# ChronoMiner Extraction Evaluation

This notebook evaluates structured data extraction quality across multiple LLM providers and models.

## Evaluation Method

Metrics are computed **chunk-by-chunk** using the temporary JSONL files produced by the extractor.
This approach:
- **Eliminates formatting penalties** from whitespace differences in final JSON output
- **Enables accurate per-chunk error attribution** for debugging
- **Isolates extraction quality** from post-processing effects

## Setup

Ensure you have:
1. Corrected transcription files in `test_data/input/{category}/`
2. Ground truth JSONL files in `test_data/ground_truth/{category}/`
3. Model output JSONL files in `test_data/output/{category}/{model_name}/`

In [None]:
import json
import sys
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import warnings
warnings.filterwarnings('ignore')

import yaml

# Make both this directory and its parent importable.
# The working directory is taken as the evaluation directory, so the notebook
# is expected to be launched from there.
EVAL_DIR = Path.cwd()
PROJECT_ROOT = EVAL_DIR.parent
# One slice assignment prepends both entries; EVAL_DIR ends up ahead of
# PROJECT_ROOT on the search path.
sys.path[:0] = [str(EVAL_DIR), str(PROJECT_ROOT)]

# Import extraction metrics
from metrics import (
    ExtractionMetrics,
    aggregate_metrics,
    compute_extraction_metrics,
    format_field_metrics_table,
    format_metrics_table,
)

# Import JSONL chunk-level utilities
from jsonl_eval import (
    ChunkExtraction,
    DocumentExtractions,
    parse_extraction_jsonl,
    find_jsonl_file,
    load_chunk_extractions,
    load_ground_truth_chunks,
    align_chunks,
)

print(f"Evaluation directory: {EVAL_DIR}")
print(f"Project root: {PROJECT_ROOT}")
print("Imports successful!")

## Load Configuration

In [None]:
# Read the YAML settings file that sits in the evaluation directory.
config_path = EVAL_DIR / "eval_config.yaml"

with config_path.open("r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# Dataset locations: each configured value is joined onto EVAL_DIR.
dataset_cfg = config["dataset"]
input_path = EVAL_DIR / dataset_cfg["input_path"]
output_path = EVAL_DIR / dataset_cfg["output_path"]
ground_truth_path = EVAL_DIR / dataset_cfg["ground_truth_path"]

evaluation_cfg = config["evaluation"]
reports_path = EVAL_DIR / evaluation_cfg["reports_path"]

# Comparison settings; the second argument is the fallback used when the key
# is absent from the config.
threshold = evaluation_cfg.get("string_similarity_threshold", 0.85)
case_sensitive = evaluation_cfg.get("case_sensitive", False)
normalize_ws = evaluation_cfg.get("normalize_whitespace", True)
schema_fields = evaluation_cfg.get("schema_fields", {})

categories = dataset_cfg["categories"]
models = config["models"]

# Reports are written later on; make sure the target directory exists now.
reports_path.mkdir(parents=True, exist_ok=True)

print(f"Categories: {[c['name'] for c in categories]}")
print(f"Models: {[m['name'] for m in models]}")
print(f"Similarity threshold: {threshold}")
print("\nPaths:")
for path_label, path_value in (
    ("Input", input_path),
    ("Output", output_path),
    ("Ground Truth", ground_truth_path),
    ("Reports", reports_path),
):
    print(f"  {path_label}: {path_value}")

## Discover Available Data

In [None]:
def discover_sources(category_name: str) -> List[str]:
    """
    List the source documents available for a category.

    Looks at the ``*.txt`` files directly inside ``input_path / category_name``
    and skips any whose stem ends in ``_line_ranges`` or ``_context``.

    Args:
        category_name: Name of the category sub-directory under the input path

    Returns:
        File stems (names without extension) in sorted path order; an empty
        list when the category directory does not exist
    """
    category_dir = input_path / category_name
    if not category_dir.exists():
        return []

    # Stems with these endings are not treated as sources.
    skipped_suffixes = ("_line_ranges", "_context")
    return [
        txt_file.stem
        for txt_file in sorted(category_dir.glob("*.txt"))
        if not txt_file.stem.endswith(skipped_suffixes)
    ]


def discover_available_models(category_name: str) -> List[str]:
    """
    List the models that have produced JSONL output for a category.

    Every sub-directory of ``output_path / category_name`` is treated as one
    model. A model counts as available when a ``*.jsonl`` file exists anywhere
    below its directory whose file name does not contain ``_batch_``.

    Args:
        category_name: Name of the category sub-directory under the output path

    Returns:
        Sorted model directory names; an empty list when the category
        directory does not exist
    """
    category_dir = output_path / category_name
    if not category_dir.exists():
        return []

    def has_result_file(model_dir) -> bool:
        # Files with "_batch_" in the name are batch tracking files, not results.
        return any(
            "_batch_" not in candidate.name
            for candidate in model_dir.rglob("*.jsonl")
        )

    return sorted(
        entry.name
        for entry in category_dir.iterdir()
        if entry.is_dir() and has_result_file(entry)
    )


def check_ground_truth_available(category_name: str) -> Tuple[bool, int, str]:
    """
    Report whether ground truth files are present for a category.

    Only files directly inside ``ground_truth_path / category_name`` are
    considered. JSONL files take precedence; JSON files (legacy format) are
    counted only when no JSONL file is present.

    Args:
        category_name: Name of the category sub-directory under the ground truth path

    Returns:
        Tuple of (has_ground_truth, count_of_files, format), where format is
        "jsonl", "json" or "none"
    """
    missing = (False, 0, "none")

    gt_dir = ground_truth_path / category_name
    if not gt_dir.exists():
        return missing

    # Formats are tried in order of preference; the first one with files wins.
    for pattern, format_label in (("*.jsonl", "jsonl"), ("*.json", "json")):
        matches = list(gt_dir.glob(pattern))
        if matches:
            return True, len(matches), format_label

    return missing


# Print an overview of what is on disk for every configured category and keep
# the same information in `data_summary`.
banner = "=" * 60
print(banner)
print("AVAILABLE DATA SUMMARY")
print(banner)

data_summary = {}

for cat in categories:
    cat_name = cat["name"]
    sources = discover_sources(cat_name)
    available_models = discover_available_models(cat_name)
    gt_available, gt_count, gt_format = check_ground_truth_available(cat_name)

    data_summary[cat_name] = dict(
        sources=sources,
        models=available_models,
        ground_truth_available=gt_available,
        ground_truth_count=gt_count,
        ground_truth_format=gt_format,
    )

    print(f"\n{cat_name.upper()}")
    print("-" * 40)
    print(f"  Input sources: {len(sources)}")
    # Only the first five sources are listed; the rest are summarised.
    for s in sources[:5]:
        print(f"    - {s}")
    hidden_sources = len(sources) - 5
    if hidden_sources > 0:
        print(f"    ... and {hidden_sources} more")
    print(f"  Models with JSONL output: {len(available_models)}")
    for m in available_models:
        print(f"    - {m}")
    gt_flag = "Yes" if gt_available else "No"
    print(f"  Ground truth: {gt_flag} ({gt_count} files, format: {gt_format})")

## Chunk-Level Evaluation

Metrics are computed by comparing each chunk of the model output against the corresponding
ground truth chunk. This ensures:
- No formatting penalties from whitespace differences in final JSON output
- Accurate per-chunk error attribution
- Better isolation of extraction quality from post-processing effects

In [None]:
@dataclass
class ChunkEvaluationResult:
    """Container for per-chunk evaluation results.

    One instance is created by ``evaluate_source_chunks`` for every aligned
    chunk in which at least one side (ground truth or model output) has
    entries.
    """
    # Index of the chunk; taken from the ground truth chunk when one exists,
    # otherwise from the model output chunk.
    chunk_index: int
    # Identifier of the chunk, read from the ground truth chunk or, failing
    # that, from the model output chunk.
    custom_id: str
    # Metrics for this chunk; None when one side had no entries or when
    # computing the metrics raised an exception.
    metrics: Optional[ExtractionMetrics]
    # True when the ground truth chunk exists and reports having entries.
    ground_truth_found: bool
    # True when the model output chunk exists and reports having entries.
    output_found: bool
    # Number of ground truth entries in the chunk.
    gt_entry_count: int = 0
    # Number of model output (hypothesis) entries in the chunk.
    hyp_entry_count: int = 0
    # Reason the chunk has no metrics, or None when it was scored.
    error: Optional[str] = None


@dataclass
class SourceEvaluationResult:
    """Evaluation outcome for one source document of one model and category.

    Holds the per-chunk results together with the metrics aggregated over the
    chunks that could be scored.
    """
    category: str
    model_name: str
    source_name: str
    # One entry per recorded chunk, scored or not.
    chunk_results: List[ChunkEvaluationResult]
    # None when no chunk of this source produced metrics.
    aggregated_metrics: Optional[ExtractionMetrics]
    ground_truth_found: bool
    output_found: bool
    # Set when the source could not be evaluated at all.
    error: Optional[str] = None

    @property
    def total_chunks(self) -> int:
        """Number of chunk results recorded for this source."""
        recorded = self.chunk_results
        return len(recorded)

    @property
    def evaluated_chunks(self) -> int:
        """Number of recorded chunks that carry metrics."""
        scored = [chunk for chunk in self.chunk_results if chunk.metrics is not None]
        return len(scored)


def evaluate_source_chunks(
    category_name: str,
    model_name: str,
    source_name: str,
    schema_name: str,
) -> SourceEvaluationResult:
    """
    Score one source document chunk-by-chunk against its ground truth.

    Ground truth and model output are loaded, aligned with ``align_chunks``
    and compared one chunk pair at a time. Chunk pairs in which neither side
    has entries are ignored. A chunk in which only one side has entries is
    recorded with ``metrics=None`` and an explanatory error; such chunks do
    not contribute to the aggregated metrics. The same applies to a chunk
    whose metric computation raises.

    Args:
        category_name: Dataset category
        model_name: Model identifier
        source_name: Source file name
        schema_name: Key into ``schema_fields`` selecting the fields to
            evaluate; when the schema has no configured fields, ``None`` is
            passed to ``compute_extraction_metrics`` as the field selection

    Returns:
        SourceEvaluationResult with per-chunk and aggregated metrics. When the
        ground truth or the model output is missing or has no chunks, the
        result has no chunk results and its ``error`` names the missing side.
    """
    # Identifying fields shared by every result object returned below.
    identity = {
        "category": category_name,
        "model_name": model_name,
        "source_name": source_name,
    }

    # Reference side first: without it there is nothing to compare against.
    reference_doc = load_ground_truth_chunks(ground_truth_path, category_name, source_name)
    if reference_doc is None or not reference_doc.chunks:
        return SourceEvaluationResult(
            chunk_results=[],
            aggregated_metrics=None,
            ground_truth_found=False,
            output_found=False,
            error="Ground truth not found",
            **identity,
        )

    # Model side.
    predicted_doc = load_chunk_extractions(output_path, category_name, model_name, source_name)
    if predicted_doc is None or not predicted_doc.chunks:
        return SourceEvaluationResult(
            chunk_results=[],
            aggregated_metrics=None,
            ground_truth_found=True,
            output_found=False,
            error="Model output not found",
            **identity,
        )

    # Fields configured for this schema (may be empty).
    selected_fields = schema_fields.get(schema_name, [])

    per_chunk: List[ChunkEvaluationResult] = []
    scored: List[ExtractionMetrics] = []

    for predicted, reference in align_chunks(predicted_doc, reference_doc):
        # Identify the chunk, preferring the ground truth side.
        if reference:
            position = reference.chunk_index
            identifier = reference.custom_id or (predicted.custom_id if predicted else "")
        elif predicted:
            position = predicted.chunk_index
            identifier = predicted.custom_id
        else:
            continue

        reference_has_entries = reference is not None and reference.has_entries()
        prediction_has_entries = predicted is not None and predicted.has_entries()

        reference_entries = reference.get_entries() if reference else []
        predicted_entries = predicted.get_entries() if predicted else []

        # Nothing on either side: not recorded at all.
        if not (reference_has_entries or prediction_has_entries):
            continue

        if not reference_has_entries:
            outcome = ChunkEvaluationResult(
                chunk_index=position,
                custom_id=identifier,
                metrics=None,
                ground_truth_found=False,
                output_found=prediction_has_entries,
                gt_entry_count=0,
                hyp_entry_count=len(predicted_entries),
                error="No ground truth for chunk",
            )
        elif not prediction_has_entries:
            outcome = ChunkEvaluationResult(
                chunk_index=position,
                custom_id=identifier,
                metrics=None,
                ground_truth_found=True,
                output_found=False,
                gt_entry_count=len(reference_entries),
                hyp_entry_count=0,
                error="No model output for chunk",
            )
        else:
            # Both sides have entries: compute the metrics. A failure is
            # recorded on the chunk instead of aborting the whole source.
            try:
                chunk_metrics = compute_extraction_metrics(
                    ground_truth={"entries": reference_entries},
                    hypothesis={"entries": predicted_entries},
                    fields_to_evaluate=selected_fields or None,
                    threshold=threshold,
                    case_sensitive=case_sensitive,
                    normalize_ws=normalize_ws,
                )
                outcome = ChunkEvaluationResult(
                    chunk_index=position,
                    custom_id=identifier,
                    metrics=chunk_metrics,
                    ground_truth_found=True,
                    output_found=True,
                    gt_entry_count=len(reference_entries),
                    hyp_entry_count=len(predicted_entries),
                )
            except Exception as exc:
                outcome = ChunkEvaluationResult(
                    chunk_index=position,
                    custom_id=identifier,
                    metrics=None,
                    ground_truth_found=True,
                    output_found=True,
                    gt_entry_count=len(reference_entries),
                    hyp_entry_count=len(predicted_entries),
                    error=str(exc),
                )
            else:
                scored.append(chunk_metrics)

        per_chunk.append(outcome)

    # Only chunks that produced metrics take part in the aggregate.
    return SourceEvaluationResult(
        chunk_results=per_chunk,
        aggregated_metrics=aggregate_metrics(scored) if scored else None,
        ground_truth_found=True,
        output_found=True,
        **identity,
    )


print("Chunk-level evaluation functions defined.")

## Run Evaluation

In [None]:
def evaluate_category(
    category: dict,
    model: dict,
) -> Tuple[List[SourceEvaluationResult], Optional[ExtractionMetrics]]:
    """
    Run the chunk-level evaluation for every source of one category and model.

    The aggregate is computed over the individual chunk metrics of all
    sources, not over the per-source aggregates.

    Args:
        category: Category config dict; its "name" and "schema" keys are read
        model: Model config dict; its "name" key is read

    Returns:
        Tuple of (list of per-source results, aggregated metrics). The
        aggregated metrics are None when no chunk of any source was scored.
    """
    cat_name = category["name"]
    schema_name = category["schema"]
    model_name = model["name"]

    per_source = [
        evaluate_source_chunks(cat_name, model_name, source, schema_name)
        for source in discover_sources(cat_name)
    ]

    # Flatten the scored chunks of all sources, keeping source and chunk order.
    chunk_metrics = [
        chunk.metrics
        for source_result in per_source
        for chunk in source_result.chunk_results
        if chunk.metrics is not None
    ]

    if not chunk_metrics:
        return per_source, None
    return per_source, aggregate_metrics(chunk_metrics)


# Evaluate every configured model on every configured category.
all_metrics = {}  # model name -> category name -> aggregated metrics
all_results = {}  # model name -> category name -> per-source results (for reporting)

for model in models:
    model_name = model["name"]
    # Every model gets an entry in both dicts, even if nothing gets scored.
    model_metrics = all_metrics[model_name] = {}
    model_results = all_results[model_name] = {}

    for category in categories:
        cat_name = category["name"]

        results, aggregated = evaluate_category(category, model)
        model_results[cat_name] = results

        # Metrics are kept and reported only when ground truth entries were
        # actually compared.
        if not (aggregated and aggregated.total_gt_entries > 0):
            continue

        model_metrics[cat_name] = aggregated
        evaluated_sources = len([r for r in results if r.aggregated_metrics is not None])
        total_chunks = sum(r.total_chunks for r in results)
        progress_line = (
            f"{model_name} / {cat_name}: "
            f"Entry F1={aggregated.entry_f1:.2%}, "
            f"Micro F1={aggregated.micro_f1:.2%} "
            f"({evaluated_sources} sources, {total_chunks} chunks)"
        )
        print(progress_line)

print("\nEvaluation complete!")

## Results Summary Table

In [None]:
from IPython.display import Markdown, display

# Display summary table.
# `all_metrics` holds one dict per configured model, created empty before any
# category is evaluated, so the outer dict is truthy even when nothing was
# scored. Test the per-model dicts so the diagnostic below can be reached.
if any(all_metrics.values()):
    summary_table = format_metrics_table(all_metrics)
    display(Markdown(summary_table))
else:
    print("No metrics computed. Check that ground truth and model outputs exist.")

## Field-Level Breakdown

In [None]:
# Render one field-level table per model/category pair that has metrics.
for model_name, cat_metrics in all_metrics.items():
    for cat_name, metrics in cat_metrics.items():
        section_heading = Markdown(f"### {model_name} / {cat_name}")
        display(section_heading)
        field_table = format_field_metrics_table(metrics)
        display(Markdown(field_table))
        print()  # blank line between sections

## Per-Source Details (Optional)

In [None]:
# Show per-source breakdown for a specific model/category
# Uncomment and modify as needed:

# SHOW_MODEL = "gpt_5.1_medium"
# SHOW_CATEGORY = "bibliography"

# if SHOW_MODEL in all_results and SHOW_CATEGORY in all_results[SHOW_MODEL]:
#     results = all_results[SHOW_MODEL][SHOW_CATEGORY]
#     print(f"\n{SHOW_MODEL} / {SHOW_CATEGORY}:")
#     print("-" * 60)
#     for r in results:
#         if r.aggregated_metrics:
#             m = r.aggregated_metrics
#             print(f"  {r.source_name}: F1={m.entry_f1:.2%}, "
#                   f"{r.evaluated_chunks}/{r.total_chunks} chunks")
#         else:
#             print(f"  {r.source_name}: {r.error or 'No data'}")

## Save Reports

In [None]:
import csv

# One timestamp is shared by all report files written from this cell.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
reports_path.mkdir(parents=True, exist_ok=True)

# Save JSON: settings used plus the full metrics of every model/category.
json_path = reports_path / f"eval_results_{timestamp}.json"
json_data = {
    "timestamp": timestamp,
    "evaluation_method": "chunk-level",
    "config": {
        "threshold": threshold,
        "case_sensitive": case_sensitive,
        "normalize_whitespace": normalize_ws,
    },
    "results": {
        model: {cat: m.to_dict() for cat, m in cats.items()}
        for model, cats in all_metrics.items()
    },
}
with open(json_path, "w", encoding="utf-8") as f:
    json.dump(json_data, f, indent=2, ensure_ascii=False)
print(f"Saved: {json_path}")

# Save CSV: one row per model/category, scores as percentages with two decimals.
# The file is written only when there is at least one row.
csv_path = reports_path / f"eval_results_{timestamp}.csv"
rows = []
for model_name, cat_metrics in all_metrics.items():
    for cat_name, m in cat_metrics.items():
        rows.append({
            "model": model_name,
            "category": cat_name,
            "entry_precision": round(m.entry_precision * 100, 2),
            "entry_recall": round(m.entry_recall * 100, 2),
            "entry_f1": round(m.entry_f1 * 100, 2),
            "micro_precision": round(m.micro_precision * 100, 2),
            "micro_recall": round(m.micro_recall * 100, 2),
            "micro_f1": round(m.micro_f1 * 100, 2),
        })
if rows:
    with open(csv_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
    print(f"Saved: {csv_path}")

# Save Markdown.
# `all_metrics` contains an (initially empty) dict for every configured model,
# so the outer dict is truthy even when nothing was scored. Test the per-model
# dicts so that 'No results' is written in that case.
md_summary = format_metrics_table(all_metrics) if any(all_metrics.values()) else 'No results'
md_path = reports_path / f"eval_results_{timestamp}.md"
md_content = f"""# ChronoMiner Extraction Evaluation Results

**Generated:** {timestamp}

**Evaluation Method:** Chunk-level (using temporary JSONL files)

## Summary

{md_summary}
"""
with open(md_path, "w", encoding="utf-8") as f:
    f.write(md_content)
print(f"Saved: {md_path}")

## Visualization (Optional)

In [None]:
# Grouped bar chart of micro F1 per model, one bar per category.
# The chart is skipped with a message when matplotlib/numpy cannot be imported.
try:
    import matplotlib.pyplot as plt
    import numpy as np

    if not all_metrics:
        print("No metrics to visualize")
    else:
        # Prepare data for plotting
        model_names = list(all_metrics.keys())
        # Sorted so that bar order, colours and legend entries are the same on
        # every run; iteration order of a set of strings is not stable across
        # interpreter sessions.
        cat_names = sorted({
            cat for cats in all_metrics.values() for cat in cats
        })

        if model_names and cat_names:
            # Create grouped bar chart for F1 scores
            fig, ax = plt.subplots(figsize=(12, 6))

            x = np.arange(len(model_names))
            # All category bars of one model share 80% of the slot width.
            width = 0.8 / len(cat_names)

            for i, cat in enumerate(cat_names):
                # A model without metrics for this category falls back to a
                # default-constructed ExtractionMetrics.
                f1_scores = [
                    all_metrics[model].get(cat, ExtractionMetrics()).micro_f1 * 100
                    for model in model_names
                ]
                # Centre the group of bars on the model's tick position.
                offset = (i - len(cat_names) / 2 + 0.5) * width
                ax.bar(x + offset, f1_scores, width, label=cat)

            ax.set_ylabel("Micro F1 Score (%)")
            ax.set_xlabel("Model")
            ax.set_title("Extraction Quality by Model and Category (Chunk-Level Evaluation)")
            ax.set_xticks(x)
            ax.set_xticklabels(model_names, rotation=45, ha="right")
            ax.legend(title="Category")
            ax.set_ylim(0, 100)
            ax.grid(axis="y", alpha=0.3)

            plt.tight_layout()

            # Save chart. Reuse the report timestamp when the reports cell has
            # been run; otherwise create one so this optional cell also works
            # on its own instead of failing with a NameError.
            try:
                chart_stamp = timestamp
            except NameError:
                chart_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            chart_path = reports_path / f"eval_chart_{chart_stamp}.png"
            plt.savefig(chart_path, dpi=150)
            print(f"Saved: {chart_path}")

            plt.show()
        else:
            print("No data to visualize")

except ImportError:
    print("matplotlib not available - skipping visualization")