# 02: The Evaluation Harness

The shared evaluation harness (`aieng.agent_evals.evaluation`) provides three composable building blocks:

1. **Datasets** — persistent (input, expected_output) collections stored in Langfuse
2. **Task functions** — async callables that run your agent and return its output as a string
3. **Evaluator functions** — async callables that score output against expected output

These compose into `run_experiment`, which handles scheduling, Langfuse integration, and result collection.

## What You'll Learn

1. Uploading a dataset to Langfuse
2. Writing task and evaluator functions
3. Using the built-in `create_llm_as_judge_evaluator`
4. Running `run_experiment` and reading `ExperimentResult`
5. Two-pass evaluation with trace-level graders

## Prerequisites

- `GOOGLE_API_KEY` in `.env` — used for the LLM judge via Google's OpenAI-compatible API
- `LANGFUSE_PUBLIC_KEY` and `LANGFUSE_SECRET_KEY` in `.env`

> **Config note:** `aieng-eval-agents/aieng/agent_evals/configs.py` is the single source of global shared configuration (API keys, model names, Langfuse credentials). It holds the settings for an OpenAI-compatible client (`openai_api_key` + `openai_base_url`), which the harness uses **only for LLM-as-a-judge**; this lets you point the judge at any OpenAI-compatible endpoint (Gemini, OpenAI, etc.). Agent implementations, by contrast, use **google-adk** directly, which reads `GOOGLE_API_KEY` independently of the OpenAI client config. The demo task in this notebook is a plain chat completion rather than an agent, so it reuses the same OpenAI-compatible settings.

In [None]:
import contextlib
import os
from pathlib import Path
from typing import Any

from aieng.agent_evals.async_client_manager import AsyncClientManager
from aieng.agent_evals.evaluation import run_experiment, run_experiment_with_trace_evals
from aieng.agent_evals.evaluation.graders import create_llm_as_judge_evaluator
from aieng.agent_evals.evaluation.graders.config import LLMRequestConfig
from aieng.agent_evals.evaluation.trace import extract_trace_metrics
from aieng.agent_evals.evaluation.types import TraceWaitConfig
from aieng.agent_evals.langfuse import init_tracing
from dotenv import load_dotenv
from IPython.display import HTML, display  # noqa: A004
from langfuse import Langfuse
from langfuse.experiment import Evaluation
from openai import AsyncOpenAI
from rich.console import Console
from rich.panel import Panel
from rich.table import Table


# Ensure the working directory is the `eval-agents` root; otherwise assume
# the notebook sits two levels below it and move up.
if Path.cwd().name == "eval-agents":
    print(f"Working directory: {Path.cwd()}")
else:
    os.chdir(Path.cwd().parent.parent)
    print(f"Working directory set to: {Path.cwd()}")

load_dotenv(verbose=True)
console = Console(width=100)

DATASET_NAME = "capital-cities-basics"
tracing_enabled = init_tracing()

## 1. Datasets

A Langfuse dataset is a persistent collection of evaluation items. Each item has:

- **`input`** — passed to the task function (and to evaluators as context)
- **`expected_output`** — ground truth used by evaluators; never shown to the agent
- **`metadata`** — optional dict for filtering or passing context to evaluators

Setting a deterministic `id` on each item makes uploads idempotent — safe to re-run.

In [None]:
langfuse = Langfuse()

qa_items = [
    {"input": "What is the capital of France?", "expected_output": "Paris"},
    {"input": "What is the capital of Japan?", "expected_output": "Tokyo"},
    {"input": "What is the capital of Brazil?", "expected_output": "Brasília"},
    {"input": "What is the capital of Australia?", "expected_output": "Canberra"},
    {"input": "What is the capital of Canada?", "expected_output": "Ottawa"},
]

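# Best-effort creation: any error raised here (for example, because the
# dataset already exists) is ignored so the cell can be re-run.
with contextlib.suppress(Exception):
    langfuse.create_dataset(name=DATASET_NAME)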

for i, item in enumerate(qa_items):
    langfuse.create_dataset_item(
        dataset_name=DATASET_NAME,
        id=f"{DATASET_NAME}-{i}",  # deterministic id → idempotent
        input=item["input"],
        expected_output=item["expected_output"],
    )

console.print(f"[green]✓[/green] Dataset '[cyan]{DATASET_NAME}[/cyan]' ready ({len(qa_items)} items)")
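
The index-based ids above stay stable only while the list order is fixed; inserting or reordering items would shift them. One alternative, sketched here as an assumption rather than something the harness requires, is to derive the id from the item's content. `stable_item_id` is a hypothetical helper, not part of `aieng.agent_evals` or Langfuse.

```python
import hashlib


def stable_item_id(dataset_name: str, item_input: str) -> str:
    """Derive a deterministic id from the dataset name and the item's input text."""
    digest = hashlib.sha256(item_input.encode("utf-8")).hexdigest()[:16]
    return f"{dataset_name}-{digest}"


questions = ["What is the capital of France?", "What is the capital of Japan?"]
ids = [stable_item_id("capital-cities-basics", q) for q in questions]

# Reordering the list leaves each question's id unchanged.
ids_reversed = [stable_item_id("capital-cities-basics", q) for q in reversed(questions)]
```

The trade-off is that editing a question's wording produces a new id, so the edited item would be uploaded as a new entry rather than overwriting the old one.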

## 2. Task Functions

A task function receives one dataset item and returns the agent's output as a string.
The function must accept `item` as a keyword argument; extra arguments go in `**kwargs`.

```python
async def my_task(*, item: Any, **kwargs: Any) -> str:
    ...
```

`item.input` holds the question. `item.expected_output` is available but must not be used
by the task — only by evaluators.
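
Because a task only reads attributes from `item`, it can be smoke-tested locally before any dataset exists. The sketch below assumes the task touches nothing beyond `item.input`; `echo_task` and `fake_item` are hypothetical stand-ins, and a real item comes from Langfuse.

```python
import asyncio
from types import SimpleNamespace
from typing import Any


async def echo_task(*, item: Any, **kwargs: Any) -> str:
    """Hypothetical stand-in task: returns a canned answer instead of calling a model."""
    return f"You asked: {item.input}"


# Stand-in for a dataset item, exposing only the attributes a task may read.
fake_item = SimpleNamespace(input="What is the capital of France?", expected_output="Paris")

output = asyncio.run(echo_task(item=fake_item))
print(output)
```

Inside a notebook cell an event loop is already running, so use `await echo_task(item=fake_item)` there instead of `asyncio.run`.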

In [None]:
async def capital_city_task(*, item: Any, **kwargs: Any) -> str:
    """Answer a capital city question using a chat completion model."""
    # Langfuse runs each task in a new thread with its own event loop.
    # A shared (singleton) async client would be bound to a previous thread's
    # closed event loop and fail. Use async with to create and close a fresh
    # client within the current event loop.
    configs = AsyncClientManager().configs
    async with AsyncOpenAI(
        api_key=configs.openai_api_key.get_secret_value(),
        base_url=configs.openai_base_url,
    ) as client:
        response = await client.chat.completions.create(
            model=configs.default_worker_model,
            messages=[
                {"role": "system", "content": "Answer in one sentence."},
                {"role": "user", "content": item.input},
            ],
        )
    return response.choices[0].message.content or ""

## 3. Evaluator Functions

An evaluator receives `input`, `output`, `expected_output`, and `metadata` as keyword arguments and returns
one or more `Evaluation` objects, each a (name, numeric value) pair with an optional comment.

### Custom Evaluator

The simplest form: a direct assertion on the output string.

In [None]:
async def substring_match(
    *,
    output: str,
    expected_output: str,
    **kwargs: Any,
) -> list[Evaluation]:
    """Score 1.0 if expected_output appears in output (case-insensitive)."""
    correct = expected_output.strip().lower() in output.lower()
    return [Evaluation(name="substring_match", value=float(correct))]
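
Plain substring matching is sensitive to diacritics: an answer spelled "Brasilia" would not match the expected "Brasília" in the dataset above. A minimal sketch of an accent-insensitive comparison follows; `fold` and `folded_contains` are hypothetical helpers, and whether such leniency is appropriate depends on what the evaluation is meant to measure.

```python
import unicodedata


def fold(text: str) -> str:
    """Casefold and strip combining marks so 'Brasília' and 'Brasilia' compare equal."""
    decomposed = unicodedata.normalize("NFKD", text)
    stripped = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return stripped.casefold().strip()


def folded_contains(output: str, expected_output: str) -> bool:
    """Return True if expected_output occurs in output after folding both strings."""
    return fold(expected_output) in fold(output)
```

The boolean result could replace the `correct` expression inside an evaluator such as `substring_match`.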

### Built-in: LLM-as-Judge

`create_llm_as_judge_evaluator` returns an evaluator backed by a model served through an
OpenAI-compatible endpoint. Provide a rubric in Markdown; the model scores each criterion
0 or 1 and returns an explanation for each score.

In [None]:
rubric = (
    "- **correctness**: 1 if the output names the correct capital city, else 0.\n"
    "- **conciseness**: 1 if the answer is one sentence or less, else 0.\n"
)

llm_judge = create_llm_as_judge_evaluator(
    name="capital_judge",
    rubric_markdown=rubric,
    model_config=LLMRequestConfig(temperature=0.0),
)
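
When a rubric grows beyond a couple of criteria, it can be easier to keep the criteria in a dict and render the Markdown from it. The sketch below reproduces the bullet format used above; `build_rubric` is a hypothetical helper, not part of the harness.

```python
def build_rubric(criteria: dict[str, str]) -> str:
    """Render {name: description} pairs as one Markdown bullet per criterion."""
    return "".join(f"- **{name}**: {description}\n" for name, description in criteria.items())


rubric_from_dict = build_rubric(
    {
        "correctness": "1 if the output names the correct capital city, else 0.",
        "conciseness": "1 if the answer is one sentence or less, else 0.",
    }
)
```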

## 4. Running `run_experiment`

`run_experiment` fetches the dataset from Langfuse, runs the task on every item
(with configurable concurrency), applies each evaluator, and records scores back to Langfuse.
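
The composition can be pictured with a toy loop. This is a conceptual sketch only, not how `run_experiment` is implemented: it skips Langfuse entirely, runs everything on one event loop, and has evaluators return plain dicts instead of `Evaluation` objects. All names here (`toy_run`, `toy_task`, `toy_exact`) are hypothetical.

```python
import asyncio
from types import SimpleNamespace
from typing import Any


async def toy_run(items, task, evaluators, max_concurrency: int = 2) -> list[dict[str, Any]]:
    """Run the task and every evaluator per item, at most max_concurrency items at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: Any) -> dict[str, Any]:
        async with semaphore:
            output = await task(item=item)
            scores: dict[str, float] = {}
            for evaluator in evaluators:
                scores.update(
                    await evaluator(input=item.input, output=output, expected_output=item.expected_output)
                )
            return {"input": item.input, "output": output, "scores": scores}

    # gather preserves the order of the input items in its result list.
    return await asyncio.gather(*(run_one(item) for item in items))


async def toy_task(*, item: Any, **kwargs: Any) -> str:
    return "Paris" if "France" in item.input else "unknown"


async def toy_exact(*, output: str, expected_output: str, **kwargs: Any) -> dict[str, float]:
    return {"exact": float(output == expected_output)}


items = [
    SimpleNamespace(input="Capital of France?", expected_output="Paris"),
    SimpleNamespace(input="Capital of Japan?", expected_output="Tokyo"),
]
results = asyncio.run(toy_run(items, toy_task, [toy_exact]))
```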

In [None]:
result = run_experiment(
    DATASET_NAME,
    name="capital-cities-v1",
    task=capital_city_task,
    evaluators=[substring_match, llm_judge],
    description="Default worker model on capital city questions.",
    max_concurrency=5,
)

console.print("[green]✓[/green] Experiment complete")
if result.dataset_run_url:
    display(
        HTML(f'<p>View results: <a href="{result.dataset_run_url}" target="_blank">{result.dataset_run_url}</a></p>')
    )

In [None]:
table = Table(title="Item Results")
table.add_column("Question", style="white", max_width=38)
table.add_column("Output", style="dim", max_width=32)
table.add_column("substring_match", justify="center", style="cyan")
table.add_column("correctness", justify="center", style="green")
table.add_column("conciseness", justify="center", style="blue")

for item_result in result.item_results:
    scores = {e.name: e.value for e in (item_result.evaluations or [])}
    q = str(item_result.item.input)
    q = q[:36] + "..." if len(q) > 36 else q
    out = str(item_result.output or "")
    out = out[:30] + "..." if len(out) > 30 else out
    table.add_row(
        q,
        out,
        str(scores.get("substring_match", "-")),
        str(scores.get("correctness", "-")),
        str(scores.get("conciseness", "-")),
    )

console.print(table)
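
Beyond the per-item table, an aggregate per metric is often the number worth tracking across runs. The sketch below assumes only what the table code above already relies on: each item result has an `evaluations` list whose entries expose `name` and `value`. `mean_scores` is a hypothetical helper, and the stand-in objects exist only to make the example runnable on its own.

```python
from collections import defaultdict
from statistics import mean
from types import SimpleNamespace


def mean_scores(item_results) -> dict[str, float]:
    """Average each named score across item results, skipping non-numeric values."""
    values = defaultdict(list)
    for item_result in item_results:
        for evaluation in item_result.evaluations or []:
            if isinstance(evaluation.value, (int, float)):
                values[evaluation.name].append(float(evaluation.value))
    return {name: mean(vals) for name, vals in values.items()}


# Stand-in objects mimicking the attributes read by the table code.
fake_results = [
    SimpleNamespace(evaluations=[SimpleNamespace(name="substring_match", value=1.0)]),
    SimpleNamespace(evaluations=[SimpleNamespace(name="substring_match", value=0.0)]),
    SimpleNamespace(evaluations=None),
]
summary = mean_scores(fake_results)
```

With a real run, `mean_scores(result.item_results)` would be the corresponding call under the same assumption.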

## 5. Two-Pass Evaluation: Trace Graders

Some criteria can only be assessed from the execution trace — for example, latency, tool call counts,
or whether the agent's answer is grounded in evidence it actually retrieved.

`run_experiment_with_trace_evals` adds a second pass after traces have been ingested into Langfuse:

1. **Pass 1** — output-based evaluators (same as `run_experiment`)
2. **Pass 2** — trace-based evaluators; waits for ingestion, then scores each trace

A trace evaluator receives the full `trace` object and the `item_result`:

```python
def my_trace_eval(*, trace, item_result, **kwargs) -> list[Evaluation]:
    ...
```

`create_trace_groundedness_evaluator` is the built-in grader for tool-using agents —
it checks whether the output's claims are supported by tool observations in the trace.
For a simple LLM call with no tools, a latency evaluator is used here instead.

The `trace_wait` parameter controls how long to wait for Langfuse ingestion before
timing out (default: 180 s).
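
Conceptually, waiting for ingestion is a poll-until-ready loop with a deadline. The sketch below illustrates that general pattern only; it is not the harness's implementation, and `poll_until_ready`, `fake_fetch`, and the `interval_sec` parameter are hypothetical.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until_ready(
    fetch: Callable[[], Optional[T]],
    max_wait_sec: float,
    interval_sec: float = 0.01,
) -> Optional[T]:
    """Call fetch() until it returns a non-None value or the deadline passes."""
    deadline = time.monotonic() + max_wait_sec
    while True:
        value = fetch()
        if value is not None:
            return value
        if time.monotonic() >= deadline:
            return None  # timed out; the caller decides how to report it
        time.sleep(interval_sec)


# Hypothetical fetcher: the "trace" becomes available on the third attempt.
attempts = {"count": 0}


def fake_fetch() -> Optional[str]:
    attempts["count"] += 1
    return "trace-123" if attempts["count"] >= 3 else None


found = poll_until_ready(fake_fetch, max_wait_sec=1.0)
timed_out = poll_until_ready(lambda: None, max_wait_sec=0.05)
```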

In [None]:
def trace_latency_evaluator(*, trace: Any, item_result: Any, **kwargs: Any) -> list[Evaluation]:
    """Score latency from the trace; works for any traced task."""
    metrics = extract_trace_metrics(trace)
    if metrics.latency_sec is None:
        return []
    return [Evaluation(name="latency_sec", value=metrics.latency_sec)]


if tracing_enabled:
    result_with_traces = run_experiment_with_trace_evals(
        DATASET_NAME,
        name="capital-cities-v1-traced",
        task=capital_city_task,
        evaluators=[substring_match],
        trace_evaluators=[trace_latency_evaluator],
        trace_wait=TraceWaitConfig(max_wait_sec=60),
        max_concurrency=5,
    )
    console.print("[green]✓[/green] Two-pass experiment complete")
    if result_with_traces.experiment.dataset_run_url:
        display(
            HTML(
                f'<p>View results: <a href="{result_with_traces.experiment.dataset_run_url}"'
                f' target="_blank">{result_with_traces.experiment.dataset_run_url}</a></p>'
            )
        )
else:
    console.print(
        Panel(
            "[dim]Langfuse credentials not found — skipping two-pass evaluation.\n"
            "Set LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY in .env to enable.[/dim]",
            title="Note",
            border_style="dim",
        )
    )

In [None]:
if tracing_enabled:
    trace_evals = result_with_traces.trace_evaluations
    trace_scores = trace_evals.evaluations_by_trace_id  # trace_id → list[Evaluation]

    table = Table(title="Trace Evaluation Results")
    table.add_column("Question", style="white", max_width=40)
    table.add_column("latency_sec", justify="right", style="cyan")
    table.add_column("Status", justify="center", style="dim")

    for item_result in result_with_traces.experiment.item_results:
        q = str(item_result.item.input)
        q = q[:38] + "..." if len(q) > 38 else q
        trace_id = item_result.trace_id

        if trace_id and trace_id in trace_scores:
            scores = {e.name: e.value for e in trace_scores[trace_id]}
            latency = scores.get("latency_sec")
            table.add_row(q, f"{latency:.2f}s" if latency is not None else "-", "✓")
        elif trace_id in (trace_evals.skipped_trace_ids or []):
            table.add_row(q, "-", "skipped")
        elif trace_id in (trace_evals.failed_trace_ids or []):
            table.add_row(q, "-", "failed")
        else:
            table.add_row(q, "-", "no trace")

    console.print(table)

    scored = len(trace_scores)
    skipped = len(trace_evals.skipped_trace_ids or [])
    failed = len(trace_evals.failed_trace_ids or [])
    console.print(f"[dim]Scored: {scored}  Skipped: {skipped}  Failed: {failed}[/dim]")
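
A short summary of the latency scores can complement the per-item table. The sketch assumes the mapping shape noted in the code above (trace id to a list of evaluations with `name` and `value`); `latency_summary` is a hypothetical helper, and the stand-in data is made up purely for illustration.

```python
from statistics import mean, median
from types import SimpleNamespace


def latency_summary(evaluations_by_trace_id) -> dict[str, float]:
    """Summarize `latency_sec` scores across traces; empty dict if none were scored."""
    latencies = [
        float(evaluation.value)
        for evaluations in evaluations_by_trace_id.values()
        for evaluation in evaluations
        if evaluation.name == "latency_sec"
    ]
    if not latencies:
        return {}
    return {"mean": mean(latencies), "median": median(latencies), "max": max(latencies)}


# Illustrative stand-in data, not real measurements.
fake_scores = {
    "t1": [SimpleNamespace(name="latency_sec", value=1.0)],
    "t2": [SimpleNamespace(name="latency_sec", value=2.0)],
    "t3": [SimpleNamespace(name="latency_sec", value=6.0)],
}
stats = latency_summary(fake_scores)
```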

## Summary

| Component | Protocol |
|-----------|----------|
| **Task function** | `async def task(*, item, **kwargs) -> str` |
| **Item evaluator** | `async def eval(*, input, output, expected_output, metadata, **kwargs) -> list[Evaluation]` |
| **Trace evaluator** | `def eval(*, trace, item_result, **kwargs) -> list[Evaluation]` |

`run_experiment` composes them: for each dataset item it runs the task and all evaluators,
then records scores to Langfuse.

Built-in graders:
- `create_llm_as_judge_evaluator` — rubric-based output scoring
- `create_trace_groundedness_evaluator` — evidence-grounded claim verification