# Online Monitoring for Agents

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/JudgmentLabs/judgment-cookbook/blob/main/Report_Agent_Online_Monitoring.ipynb)
[![Docs](https://img.shields.io/badge/Documentation-blue)](https://docs.judgmentlabs.ai/documentation)


In this notebook, you will learn how to create a research agent that scours Wikimedia's [Wikipedia](https://huggingface.co/datasets/wikimedia/wikipedia) dataset with [ChromaDB](https://www.trychroma.com/), then monitor and evaluate its performance using **custom scorers** with the [`judgeval`](https://github.com/JudgmentLabs/judgeval) library for real-time AI agent monitoring.

This notebook covers three components:

1. **Example Scorer (Output-Level)**: Uses LLM-as-a-judge to evaluate whether the research report is relevant to the query
2. **Trajectory Scorer (Process-Level)**: Uses LLM-as-a-judge to validate that citations and document references actually exist in the database
3. **Alerts**: Get notified via email, Slack, or PagerDuty when scorers dip below performance thresholds


By the end, you will have a research agent built on the Wikipedia dataset and ChromaDB, plus custom scorers that evaluate both example-level and trajectory-level agent behavior and feed real-time alerts for monitoring.

In [None]:
# Installations
!pip install chromadb datasets openai judgeval

To run this notebook, select **Runtime -> Run All**

## Setup

You can get your Judgment API key and Org ID for free on the [Judgment Labs Platform](https://app.judgmentlabs.ai/register).

![Get Started](./assets/get_started.png)

On the Judgment Platform and within your organization, create a project called `research-agent-project`.
- Within Judgment's web app, navigate to `Projects` -> `New Project` in the top right corner.

In [None]:
# set api keys
import os

os.environ['OPENAI_API_KEY'] = ...  # Fill your API keys here
os.environ["JUDGMENT_API_KEY"] = ...
os.environ["JUDGMENT_ORG_ID"] = ...

# Set tokenizers parallelism to avoid warnings
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [None]:
# Initialize tracer and client
from judgeval.tracer import Tracer, wrap
from openai import OpenAI

judgment = Tracer(project_name="research-agent-project")
client = wrap(OpenAI())  # automatically tracks all LLM API calls

## Understanding the Wikipedia Dataset

[Wikipedia](https://huggingface.co/datasets/wikimedia/wikipedia) is a dataset created by Wikimedia containing cleaned articles across 300+ languages. We'll be using the Simple English subset (`20231101.simple`), a snapshot from November 1st, 2023 that contains 242k articles written in easier-to-understand language than standard Wikipedia articles.

### What Wikipedia Contains
Each article contains:
- **id**: Article identifier
- **url**: URL of the article
- **title**: Title of the article
- **text**: Text content of the article

Let's take a look at an example article from the dataset to understand the data we're working with!



In [None]:
from datasets import load_dataset

dataset = load_dataset("wikimedia/wikipedia", "20231101.simple", split="train")
print("Title: ", dataset[0]["title"])
print("URL: ", dataset[0]["url"])
print("Text: ", dataset[0]["text"])

## Building the Research Agent

Now that we've explored the Wikipedia dataset, we'll use this data to build an agent that searches through the database and generates reports based on user queries!

Here are the steps we will follow:

1. **Database Setup**: Initialize ChromaDB and populate with articles from Wikipedia

2. **Build Custom Scorers**: Create both output-level and trajectory-level evaluation scorers to track report quality and agent behavior

3. **Online Monitoring**: Set up online monitoring and integrate into the agent and tools (`search_documents`, `get_document`, `synthesize_report`)

4. **Alerts**: Configure notifications when scorers cross thresholds via email, Slack, or PagerDuty


### Database Setup

ChromaDB is an open-source search and retrieval database for AI applications. By default, it uses the [`sentence-transformers/all-MiniLM-L6-v2`](https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2) model to embed documents, then ranks them by similarity to the embedded user query so that you retrieve the most relevant documents.

Now, let's create our database and populate it with Wikipedia articles!

In [None]:
import chromadb

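# Use a separate name so the wrapped OpenAI `client` defined earlier is not overwritten
chroma_client = chromadb.Client()
collection = chroma_client.create_collection("knowledge_base")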

batch_size = 100
print(f"Adding {min(batch_size, len(dataset))} articles to ChromaDB...")

documents = []
metadatas = []
ids = []

for i in range(min(batch_size, len(dataset))):
    article = dataset[i]
    documents.append(article["text"])
    metadatas.append({"url": article["url"]})
    ids.append(article["title"])

collection.add(
    documents=documents,
    metadatas=metadatas,
    ids=ids
)

print(f"ChromaDB initialized with {min(batch_size, len(dataset))} Wikipedia articles")

Great! We have now added 100 articles to our database. Note that since the dataset is sorted in alphabetical order, the first 100 articles typically cover topics at the beginning of the alphabet. Hence, you won't find any articles relating to xylophones (unless you add more articles)!

Let's see how we can query the database for articles relating to science!

In [None]:
result = collection.query(query_texts=["What are the different types of science?"], n_results=5)
result
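The query result is a dictionary of parallel lists, with one inner list per query text. Here is a minimal sketch of unpacking it. To keep the sketch runnable without a live database, it uses a hand-written dictionary shaped like the result above; the IDs, distances, and URLs are made up for illustration, and `top_hits` is a hypothetical helper, not part of ChromaDB.

```python
# Hand-written stand-in for a query result (illustrative values, not real output).
mock_result = {
    "ids": [["Science", "Biology", "Chemistry"]],
    "distances": [[0.81, 0.97, 1.02]],
    "metadatas": [[
        {"url": "https://simple.wikipedia.org/wiki/Science"},
        {"url": "https://simple.wikipedia.org/wiki/Biology"},
        {"url": "https://simple.wikipedia.org/wiki/Chemistry"},
    ]],
}

def top_hits(result: dict, query_index: int = 0) -> list[tuple[str, float, str]]:
    """Return (id, distance, url) triples for one query, in the order given."""
    ids = result["ids"][query_index]
    distances = result["distances"][query_index]
    metadatas = result["metadatas"][query_index]
    return [
        (doc_id, dist, meta["url"])
        for doc_id, dist, meta in zip(ids, distances, metadatas)
    ]

hits = top_hits(mock_result)
for doc_id, dist, url in hits:
    # A smaller distance means the document is closer to the query.
    print(f"{dist:.2f}  {doc_id}  {url}")
```

Since we passed a single query text, everything we need lives at index `0` of each list, which is why the tools later in this notebook read `results['ids'][0]`.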

#### Define Your Custom Example Class

Now that we understand how the database works, we need a single object that holds a user query together with the report generated for it. We will pass this object into our custom scorers later on.

In `judgeval`, all data passed into scorers is represented as an `Example`. The base `Example` object is an abstraction that standardizes how data is stored and accessed. By inheriting from it, you can define your own fields that describe the task you want to monitor.

For our research agent, we'll create a `Report` that captures the fields needed to represent a query and its generated report:

In [None]:
from judgeval.data import Example

class Report(Example):
    query: str
    report: str

In [None]:
one_example = Report(query="What is the capital of France?", report="The capital of France is Paris.")
print("The query is: ", one_example.query)
print("The report is: ", one_example.report)

### Custom Scorers

We'll create custom scorers using `judgeval` that utilize LLM-as-a-judge in two different ways:

1. **Example-level scorer**: A custom `ReportRelevanceScorer` that evaluates only the final output, checking whether the final report is relevant to the user query
2. **Trajectory-level scorer**: A `TracePromptScorer` that evaluates the entire agent behavior and decision-making process throughout execution

#### Example Level Scorer

Let's first implement the `ReportRelevanceScorer` to check if the final report is relevant to the user query.

In `judgeval`, the user must implement:

`async def a_score_example(self, example: Example)`

This method asynchronously scores each example. The scorer should set three key fields (in the example below, `name` is a class attribute and the score is the value returned by `a_score_example`):

- **`self.name`**: scorer name shown in the Judgment Labs platform dashboard
- **`self.score`**: numeric metric value (e.g., `RELEVANT == 1`, `NOT RELEVANT == 0`)
- **`self.reason`**: human-readable explanation or context behind the score

In addition, we should set the **`server_hosted`** variable to `True` to enable server hosting of the custom scorer.

To see more examples of Custom Scorers, take a look at our [HumanEval Custom Scorer cookbook](https://colab.research.google.com/github/JudgmentLabs/judgment-cookbook/blob/refactor/HumanEval_Custom_Scorer.ipynb).

In [None]:
from judgeval.scorers.example_scorer import ExampleScorer
from judgeval.data import Example
from openai import OpenAI

class Report(Example):
    query: str
    report: str

class ReportRelevanceScorer(ExampleScorer):
    name: str = "Report Relevance Scorer"
    server_hosted: bool = True # Enable server hosting

    async def a_score_example(self, example: Report):
        client = OpenAI()
        # Use LLM to evaluate if research report is relevant to the query
        evaluation_prompt = f"""
        Evaluate if this research report is relevant to the query.
        
        Query: {example.query}
        Report: {example.report}
        
        Is the report relevant and does it answer the query? Answer only "YES" or "NO".
        """
        completion = client.chat.completions.create(
            model="gpt-4.1",
            messages=[{"role": "user", "content": evaluation_prompt}]
        )
        # Guard against an empty reply and tolerate answers such as "YES."
        evaluation = (completion.choices[0].message.content or "").strip().upper()

        if evaluation.startswith("YES"):
            self.reason = "LLM evaluation: Report is relevant to the query"
            return 1.0
        else:
            self.reason = "LLM evaluation: Report is not relevant to the query"
            return 0.0

#### Upload Your Scorer

Running these custom scorers locally may add latency to your application, so we have built secure infrastructure that runs them on Firecracker microVMs with zero impact on your application. To use our servers, first copy the above cell into a Python file called `report_relevance_scorer.py` (we do this for you), then deploy your scorer to our infrastructure with a single command:


```bash
echo -e "pydantic\nopenai" > requirements.txt
uv run judgeval upload_scorer report_relevance_scorer.py requirements.txt
```

Your scorer runs in its own secure sandbox. Re-upload anytime your scoring logic changes.

#### Trajectory Level Scorer

If you want to check not only your agent's final output but also its behavior and tool calls along the way, you can use the `TracePromptScorer`, which applies a prompt to an LLM-as-a-judge that reviews the full trace of your agent. This is helpful when you want to analyze tool-call ordering, agent reasoning, and general behavior for certain user queries.

In our example, we will use the `TracePromptScorer` to ensure that the generated report's citations actually reference documents that exist in the database. We will check this by verifying if the agent used the `get_document` tool to retrieve each cited document or if it fabricated the citations.
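To make the target metric concrete, here is a deterministic sketch of the quantity we will ask the LLM judge to estimate: the fraction of cited documents that the agent actually retrieved. The function name `citation_coverage` and the choice to score a report with no citations as `0.0` are assumptions for illustration. The real scorer works from the trace with an LLM, not from these lists.

```python
def citation_coverage(cited_ids: list[str], retrieved_ids: list[str]) -> float:
    """Fraction of cited documents that appear among the retrieved documents."""
    if not cited_ids:
        # Assumption: a report without citations gets the lowest score.
        return 0.0
    retrieved = set(retrieved_ids)
    supported = sum(1 for doc_id in cited_ids if doc_id in retrieved)
    return supported / len(cited_ids)

# Hypothetical run: the report cites three articles, but only two were fetched.
cited = ["Art", "April", "Atlantis"]
retrieved = ["Art", "April", "August"]
print(f"coverage = {citation_coverage(cited, retrieved):.2f}")
```

A score below 1 signals that at least one citation has no matching retrieval in the trace, which is exactly the fabrication case we want to catch.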

The `TracePromptScorer` has two main methods:

- **`.create(name, prompt, options)`**: Creates a scorer with that name and prompt on the server. The prompt instructs the LLM to evaluate the agent's trajectory and decision-making process. You can also provide an `options` parameter to map the LLM's text responses to numeric scores. For example, if the LLM responds with "Accurate" or "Inaccurate", you can set `options={"Accurate": 1.0, "Inaccurate": 0.0}` to convert these text responses into numeric scores.
- **`.get(name)`**: Retrieves an existing scorer by name
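
The idea behind `options` can be sketched in plain Python: a text label from the judge is looked up in a dictionary to get a number. This is only an illustration of the concept, not how `judgeval` implements it; `label_to_score` and its normalization rules (case-insensitive, surrounding punctuation ignored) are assumptions.

```python
from typing import Optional

def label_to_score(response: str, options: dict[str, float]) -> Optional[float]:
    """Map a judge's text label to its numeric score, or None if unrecognized."""
    # Normalize whitespace, surrounding punctuation, and case before matching.
    cleaned = response.strip().strip(".!\"'").lower()
    for label, score in options.items():
        # Exact match only, so "Accurate" never matches inside "Inaccurate".
        if cleaned == label.lower():
            return score
    return None

options = {"Accurate": 1.0, "Inaccurate": 0.0}
print(label_to_score("Accurate", options))
print(label_to_score(" inaccurate. ", options))
print(label_to_score("Maybe", options))
```

Returning `None` for an unknown label keeps an unparseable judge reply from silently counting as a failing score.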

The `TracePromptScorer` pairs with a `TraceScorerConfig`, which specifies the model used for scoring. We'll pass the config to a `@judgment.observe(span_type="function", scorer_config=TraceScorerConfig(scorer=trace_scorer, model="gpt-4.1"))` decorator so that every trajectory is monitored in real time (more on this decorator later).

NOTE: You only need to run the `.create` method once.


In [None]:
from judgeval.tracer import Tracer, TraceScorerConfig
from judgeval.scorers import TracePromptScorer

# UNCOMMENT IF YOU HAVE NOT CREATED THE SCORER YET
# trace_scorer = TracePromptScorer.create(
#     name="Citation Count Scorer",
#     prompt="Count how many citations in the report are present in the document searches throughout the database? Then divide by the total number of citations in the report and return the score as a number between 0 and 1."
# )

trace_scorer = TracePromptScorer.get(
    name="Citation Count Scorer"
)

### Building the Agent with Online Monitoring!

Now that we've built the custom scorers, we'll build the agent and attach the scorers so that each query is evaluated in real time as it passes through the agent. This lets your scorers run directly on live agents in production, alerting you the instant agents begin to misbehave so you can push hotfixes before customers are affected.

Now let's create some tools for our agent:

- `search_documents(collection, query, n_results)` - Search `collection` for texts relevant to `query` and return the IDs of the `n_results` most relevant documents

- `get_document(collection, doc_id)` - Get the document from collection at `doc_id`

- `synthesize_report(query, documents)` - Based on the query and the list of documents, create a comprehensive report with citations based on documents


The `@judgment.observe(span_type="tool")` decorator on top of each function will capture all agent interactions using the tool.

In [None]:
@judgment.observe(span_type="tool")
def search_documents(collection, query: str, n_results: int = 10):
    """Search for relevant documents"""
    results = collection.query(query_texts=[query], n_results=n_results)
    if results['ids'][0]:
        return results['ids'][0]
    return []

@judgment.observe(span_type="tool")
def get_document(collection, doc_id: str):
    """Get specific document by ID"""
    result = collection.get(ids=[doc_id])

    if result['ids']:
        url = result['metadatas'][0]['url']
        content = result['documents'][0]
        return f"Title: {doc_id}\nURL: {url}\nContent: {content}"
    return None

@judgment.observe(span_type="tool")
def synthesize_report(query: str, documents: list) -> str:
    """Create a structured report with proper citations using LLM"""
    
    citation_prompt = f"""
    Create a comprehensive research report for: "{query}"
    
    Use the following sources and provide proper citations. Each source contains the title, URL, and content:
    
    {chr(10).join([f"Source {i}: {doc}..." for i, doc in enumerate(documents, 1)])}
    
    Requirements:
    1. Synthesize information from all sources
    2. Create a structured report with introduction, main points, and conclusion
    3. Ensure all information is properly attributed to its source
    4. Write in a professional, academic style
    5. Use numbered in-text citations throughout the report (e.g., "This concept was first introduced [1] and later expanded [2]")
    
    Format the report as:
    # Research Report: [Query]
    
    ## Introduction
    [Brief introduction to the topic]
    
    ## Key Findings
    [Main points with numbered in-text citations like [1], [2], [3], etc.]
    
    ## Conclusion
    [Summary and conclusions]
    
    ## Works Cited
    [Numbered list of all sources used and their corresponding URL, formatted as: "1. Title of Article - Wikipedia (URL)"]
    """
    
    response = client.chat.completions.create(
        model="gpt-4.1",
        messages=[{"role": "user", "content": citation_prompt}],
        temperature=0.1
    )
    
    return response.choices[0].message.content
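
The numbered source list inside the prompt is worth checking on its own, because the in-text citations `[1]`, `[2]`, and so on only make sense if the sources are numbered from 1 in the same order. A small sketch, where `format_sources` is a hypothetical helper and the documents are made-up placeholders:

```python
def format_sources(documents: list[str]) -> str:
    """Number sources from 1 so they line up with citations like [1], [2]."""
    return "\n".join(
        f"Source {i}: {doc}" for i, doc in enumerate(documents, start=1)
    )

# Placeholder documents standing in for get_document() results.
docs = [
    "Title: April\nContent: April is the fourth month of the year.",
    "Title: August\nContent: August is the eighth month of the year.",
]
print(format_sources(docs))
```

Note that `enumerate(..., start=1)` already yields 1-based indices, so the index should be used as-is; adding 1 again would label the first source as `Source 2`.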

Now let's build the agent core that orchestrates these tools:

- `ResearchAgent` class - The main agent that coordinates tool calls and decision-making
- `run()` method - LLM-driven orchestration loop that parses XML tool calls and executes them
- `call_tool()` method - Routes tool calls to the appropriate function handlers
- `handle_request()` method - Entry point that integrates monitoring with `judgment.async_evaluate()`

The `@judgment.observe(span_type="function")` decorator on top of a function captures all agent-level interactions and decision-making.

**Key Monitoring Components:**
- `judgment.async_evaluate(scorer, example, sampling_rate)` runs hosted scorers with zero latency impact. The `scorer` is the custom scorer we defined earlier, the `example` is the data we want to score, and the `sampling_rate` controls the frequency of scoring (0.95 = 95% of requests)
- `TraceScorerConfig` is used to configure trajectory-level scorers and specify the model for scoring
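
To build intuition for `sampling_rate`, here is a conceptual sketch of per-request sampling: each request is scored with probability equal to the rate. This illustrates the idea only and is not `judgeval`'s implementation; `should_score` is a hypothetical helper.

```python
import random

def should_score(sampling_rate: float, rng: random.Random) -> bool:
    """Decide whether to score one request, given a rate between 0 and 1."""
    # rng.random() is uniform on [0, 1), so a rate of 1.0 always scores
    # and a rate of 0.0 never does.
    return rng.random() < sampling_rate

rng = random.Random(0)  # seeded so the sketch is reproducible
total = 10_000
scored = sum(should_score(0.95, rng) for _ in range(total))
print(f"scored {scored} of {total} requests")
```

Lowering the rate trades evaluation coverage for lower scoring cost, which matters once the agent handles a large volume of traffic.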

Scorers can take time to execute, so they may appear slightly delayed on the UI.

**Note**: `span_type="function"` is used for functions that instrument agent logic, distinct from `span_type="tool"` which is used for individual tool functions.

In [None]:
from report_relevance_scorer import ReportRelevanceScorer, Report
import re

class ResearchAgent:
    def __init__(self, collection):
        self.collection = collection
        self.system_prompt = """You are a research agent. You will use the following tools to complete a report based on the user's request.

AVAILABLE TOOLS:
1. search_documents(query, n_results) - Search for relevant documents, returns list of document IDs
2. get_document(doc_id) - Get specific document content by ID (returns "Title: [title]\nURL: [url]\nContent: [content]")
3. synthesize_report(query, documents) - Create structured report from documents (documents should be actual text content, not IDs)

EXECUTION PROCESS:
1. **Search**: Find relevant documents with search_documents()
2. **Retrieve**: Get actual content of each document with get_document()
3. **Synthesize**: Create report with synthesize_report() using actual document content
4. **Assess**: Have you COMPLETELY fulfilled the user's request?
   - If NO: Continue searching until you have enough information to answer the question
   - If YES: Complete the task

WHAT "DONE" MEANS:
- You have gathered ALL information the user requested
- You have performed ALL actions the user asked for
- You have delivered a COMPLETE answer to their question
- Nothing from their original request is missing or incomplete

KEY RULE:
- **EVERY RESPONSE MUST END WITH A TOOL CALL UNLESS YOU HAVE COMPLETELY FULFILLED THE USER'S REQUEST**

Format responses as:
<plan>Your analysis and planning when needed</plan>
<tool>
{"name": "tool_name", "args": {"parameter": "value"}}
</tool>"""
    
    @judgment.observe(span_type="function")
    def handle_request(self, query: str) -> str:
        """Handle a research request and generate a report"""
        report = self.run(query)
        
        # Online evaluation with server-hosted scorer
        judgment.async_evaluate(
            scorer=ReportRelevanceScorer(),
            example=Report(query=query, report=report),
            sampling_rate=1.0  # Scores 100% of agent runs
        )
        
        return report
    
    def call_tool(self, tool_name: str, params: dict):
        """Call a tool and return result"""
        tool_handlers = {
            "search_documents": lambda: search_documents(self.collection, **params),
            "get_document": lambda: get_document(self.collection, **params),
            "synthesize_report": lambda: synthesize_report(**params)
        }
        return tool_handlers.get(tool_name, lambda: None)()
    
    @judgment.observe(span_type="function", scorer_config=TraceScorerConfig(scorer=trace_scorer, model="gpt-4.1"))
    def run(self, user_query: str, max_iterations: int = 50):
        """Run the agent with LLM-driven tool calling"""
        messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": f"Query: {user_query}"}
        ]
        
        for iteration in range(max_iterations):
            response = client.chat.completions.create(
                model="gpt-4.1",
                messages=messages,
            )
            
            llm_output = response.choices[0].message.content
            messages.append({"role": "assistant", "content": llm_output})
            
            # Parse tool calls
            if "<tool>" in llm_output:
                # Extract JSON from <tool> tags
                tool_match = re.search(r'<tool>\s*(\{.*?\})\s*</tool>', llm_output, re.DOTALL)
                if tool_match:
                    import json
                    tool_data = json.loads(tool_match.group(1))
                    tool_name = tool_data["name"]
                    params = tool_data["args"]
                    
                    tool_result = self.call_tool(tool_name, params)
                    
                    # If synthesize_report, return the report directly
                    if tool_name == "synthesize_report":
                        return tool_result
                    
                    # Add result to conversation
                    messages.append({
                        "role": "user", 
                        "content": f"<result>{tool_result}</result>"
                    })
            
            # No tool call: treat any non-empty reply as the final answer
            elif llm_output.strip():
                return llm_output.strip()
        
        return "Agent reached maximum iterations"
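
The tool-call parsing inside `run()` can be exercised on its own, which is handy for testing how the agent reacts to malformed model output. The sketch below reuses the same `<tool>` regular expression. `parse_tool_call` is a hypothetical helper, and returning `None` on bad JSON (instead of raising) is a design assumption, not the behavior of the loop above.

```python
import json
import re
from typing import Optional

# Same pattern as in run(): a JSON object wrapped in <tool>...</tool> tags.
TOOL_PATTERN = re.compile(r"<tool>\s*(\{.*?\})\s*</tool>", re.DOTALL)

def parse_tool_call(llm_output: str) -> Optional[tuple[str, dict]]:
    """Extract (tool_name, args) from an LLM reply, or None if absent or invalid."""
    match = TOOL_PATTERN.search(llm_output)
    if match is None:
        return None
    try:
        data = json.loads(match.group(1))
    except json.JSONDecodeError:
        return None  # malformed JSON inside the tags
    if not isinstance(data, dict) or "name" not in data:
        return None
    return data["name"], data.get("args", {})

reply = """<plan>Fetch the article first.</plan>
<tool>
{"name": "get_document", "args": {"doc_id": "April"}}
</tool>"""
print(parse_tool_call(reply))
print(parse_tool_call("The report is complete."))
```

With a helper like this, the loop could append an error message to the conversation when parsing fails, giving the model a chance to correct its formatting.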

### Complete Agent in Action

Now let's test our complete monitoring system! Run the agent with the following code and check out your project page for `research-agent-project` to see the real-time monitoring in action.

In [None]:
query = "What are the different types of sciences?"
agent = ResearchAgent(collection)  # pass the ChromaDB collection created earlier
print(agent.handle_request(query))