# CAT Agent Evaluator 

This notebook provides a simplified agent evaluation framework that evaluates only the specific questions from the tests_structure.json file with exactly one turn per question.

## Features
- Multi-provider support (Anthropic, Amazon, Meta, OpenAI)
- Cross-region inference endpoints
- On-demand endpoints
- Configurable AWS regions
- **Direct question evaluation from JSON file**
- **One turn per question evaluation**

## Available Models
- **Anthropic**: `us.anthropic.claude-3-sonnet-20240229-v1:0`, `us.anthropic.claude-3-7-sonnet-20250219-v1:0`
- **Amazon Nova**: `amazon.nova-premier-v1:0`, `amazon.nova-pro-v1:0`, `amazon.nova-lite-v1:0`, `amazon.nova-micro-v1:0`
- **Meta Llama**: `us.meta.llama4-maverick-17b-instruct-v1:0`, `us.meta.llama3-2-90b-instruct-v1:0`
- **OpenAI**: `openai.gpt-oss-120b-1:0`, `openai.gpt-oss-20b-1:0`

## Usage
1. Place your test JSON file (e.g., `tests_structure.json`) in the same directory as this notebook
2. Configure your model, agent, and region settings in the final cell
3. Run all cells to execute the evaluation

## Install Required Dependencies

First, let's install all the required packages:

In [1]:
# Install required packages
!pip install boto3 click jinja2 jsonpath-ng markdown-it-py pydantic pyyaml rich



## Import Required Libraries

Import all necessary Python libraries for the agent evaluation framework:

In [10]:
#!/usr/bin/env python3
import json
import uuid
import boto3
import re
import os
import csv
from datetime import datetime
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum

## Model Provider Configuration

Define the supported model providers and configuration classes:

In [11]:
class ModelProvider(Enum):
    ANTHROPIC = "anthropic"
    AMAZON = "amazon"
    META = "meta"
    OPENAI = "openai"


@dataclass
class BedrockModelConfig:
    model_id: str
    request_body: Dict
    
    @property
    def provider(self) -> ModelProvider:
        if "anthropic" in self.model_id:
            return ModelProvider.ANTHROPIC
        elif "amazon" in self.model_id:
            return ModelProvider.AMAZON
        elif "meta" in self.model_id:
            return ModelProvider.META
        elif "openai" in self.model_id:
            return ModelProvider.OPENAI
        else:
            raise ValueError(f"Unsupported model ID: {self.model_id}")

### Load Agent and LangFuse Configuration

Load the configuration from config.json file:

In [46]:
# Load configuration
with open('config.json', 'r') as config_file:
    config = json.load(config_file)

print("📋 Configuration loaded successfully!")
print(f"🏷️  Project: {config['langfuse']['project_name']}")
print(f"🌍 Environment: {config['langfuse']['environment']}")
print(f"🤖 Agent ID: {config['agent']['agentId']}")
print(f"🤖 Agent Alias ID: {config['agent']['agentAliasId']}")
print(f"🔗 Langfuse URL: {config['langfuse']['langfuse_api_url']}")

📋 Configuration loaded successfully!
🏷️  Project: XXX
🌍 Environment: XXX
🤖 Agent ID: CARG5UXPD9
🤖 Agent Alias ID: 0RV9TBGQC4
🔗 Langfuse URL: http://langfu-loadb-ukoqudmq8a8v-2110705221.us-east-1.elb.amazonaws.com


### Configure Langfuse Integration

In [48]:
# Set up Langfuse configuration
os.environ["OTEL_SERVICE_NAME"] = 'Langfuse'
os.environ["DEPLOYMENT_ENVIRONMENT"] = config["langfuse"]["environment"]

project_name = config["langfuse"]["project_name"]
environment = config["langfuse"]["environment"]
langfuse_public_key = config["langfuse"]["langfuse_public_key"]
langfuse_secret_key = config["langfuse"]["langfuse_secret_key"]
langfuse_api_url = config["langfuse"]["langfuse_api_url"]

# Create auth header
auth_token = base64.b64encode(
    f"{langfuse_public_key}:{langfuse_secret_key}".encode()
).decode()

# Set OpenTelemetry environment variables for Langfuse
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = f"{langfuse_api_url}/api/public/otel/v1/traces"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {auth_token}"

print(f"📊 Langfuse endpoint: {os.environ['OTEL_EXPORTER_OTLP_ENDPOINT']}")
print(f"🏷️  Project: {project_name}, Environment: {environment}")

📊 Langfuse endpoint: http://langfu-loadb-ukoqudmq8a8v-2110705221.us-east-1.elb.amazonaws.com/api/public/otel/v1/traces
🏷️  Project: XXX, Environment: XXX


## Conversation and Target Agent Classes

Define classes for conversation handling and Bedrock agent communication:

In [12]:
class Conversation:
    """Captures the interaction between a user and an agent."""
    
    def __init__(self):
        self.messages = []
        self.turns = 0

    def add_turn(self, user_message: str, agent_response: str):
        """Record a turn in the conversation."""
        self.messages.extend([("USER", user_message), ("AGENT", agent_response)])
        self.turns += 1

    def __iter__(self):
        return iter(self.messages)


class BedrockAgentTarget:
    """A target encapsulating an Amazon Bedrock agent."""
    
    def __init__(self, bedrock_agent_id: str, bedrock_agent_alias_id: str, aws_region: str = "us-east-1"):
        self.bedrock_agent_id = bedrock_agent_id
        self.bedrock_agent_alias_id = bedrock_agent_alias_id
        self.session_id = str(uuid.uuid4())
        self.client = boto3.client("bedrock-agent-runtime", region_name=aws_region)

    def invoke(self, prompt: str) -> str:
        """Invoke the target with a prompt."""
        response = self.client.invoke_agent(
            agentId=self.bedrock_agent_id,
            agentAliasId=self.bedrock_agent_alias_id,
            sessionId=self.session_id,
            inputText=prompt,
            enableTrace=True,
        )

        stream = response["completion"]
        completion = ""
        
        for event in stream:
            chunk = event.get("chunk")
            if chunk:
                completion += chunk.get("bytes").decode()

        return completion

## Bedrock Request Handler

This class handles communication with different Bedrock model providers:

In [13]:
class BedrockRequestHandler:
    """Static class for building requests to and receiving requests from Bedrock."""
    
    @staticmethod
    def build_request_body(request_body: Dict, model_config: BedrockModelConfig, 
                          system_prompt: str, prompt: str) -> Dict:
        """Build request body for different model providers."""
        if model_config.provider == ModelProvider.ANTHROPIC:
            request_body["system"] = system_prompt
            if "messages" in request_body:
                request_body["messages"][0]["content"][0]["text"] = prompt
        elif model_config.provider == ModelProvider.AMAZON:
            # Amazon Nova models use system array format
            request_body["system"] = [{"text": system_prompt}]
            if "messages" in request_body:
                request_body["messages"][0]["content"][0]["text"] = prompt
        elif model_config.provider == ModelProvider.META:
            # Meta Llama models use prompt format
            request_body["prompt"] = (
                f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>{system_prompt}"
                f"<|eot_id|><|start_header_id|>user<|end_header_id|>{prompt}"
                "<|eot_id|><|start_header_id|>assistant<|end_header_id|>"
            )
        elif model_config.provider == ModelProvider.OPENAI:
            # OpenAI models use messages format similar to OpenAI API
            if "messages" in request_body:
                request_body["messages"] = [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt}
                ]
        return request_body

    @staticmethod
    def parse_completion_from_response(response: Dict, model_config: BedrockModelConfig) -> str:
        """Parse completion from different model provider responses."""
        response_body = response.get("body").read()
        response_json = json.loads(response_body)
        
        if model_config.provider == ModelProvider.ANTHROPIC:
            completion = response_json["content"][0]["text"]
        elif model_config.provider == ModelProvider.AMAZON:
            # Amazon Nova models return output in message format
            completion = response_json["output"]["message"]["content"][0]["text"]
        elif model_config.provider == ModelProvider.META:
            # Meta Llama models return generation
            completion = response_json["generation"]
        elif model_config.provider == ModelProvider.OPENAI:
            # OpenAI models return choices with message content
            completion = response_json["choices"][0]["message"]["content"]
        else:
            raise ValueError(f"Unsupported provider: {model_config.provider}")
            
        return completion

##  Agent Evaluator Class 

The main evaluator class that evaluates only the specific questions from JSON file with one turn per question:

In [17]:
class CATAgentEvaluator:
    """Simplified agent evaluator - FIXED to evaluate only JSON questions with one turn each."""
    
    def __init__(self, evaluator_model: str, agent_id: str, agent_alias_id: str, 
                 aws_region: str = "us-east-1"):
        self.evaluator_model = evaluator_model
        self.agent_id = agent_id
        self.agent_alias_id = agent_alias_id
        self.aws_region = aws_region
        
        # Initialize Bedrock client for evaluator
        self.bedrock_client = boto3.client("bedrock-runtime", region_name=aws_region)
        
        # Initialize target agent
        self.target = BedrockAgentTarget(agent_id, agent_alias_id, aws_region)
        
        # Configure evaluator model based on provider
        self.model_config = self._create_model_config(evaluator_model)

    def _create_model_config(self, model_id: str) -> BedrockModelConfig:
        """Create model configuration based on the model provider."""
        if "anthropic" in model_id:
            return BedrockModelConfig(
                model_id=model_id,
                request_body={
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 4000,
                    "temperature": 0.0,
                    "messages": [{"role": "user", "content": [{"text": ""}]}]
                }
            )
        elif "amazon" in model_id:
            return BedrockModelConfig(
                model_id=model_id,
                request_body={
                    "inferenceConfig": {
                        "maxTokens": 4000,
                        "temperature": 0.0
                    },
                    "messages": [{"role": "user", "content": [{"text": ""}]}]
                }
            )
        elif "meta" in model_id:
            return BedrockModelConfig(
                model_id=model_id,
                request_body={
                    "max_gen_len": 4000,
                    "temperature": 0.0,
                    "prompt": ""
                }
            )
        elif "openai" in model_id:
            return BedrockModelConfig(
                model_id=model_id,
                request_body={
                    "max_tokens": 4000,
                    "temperature": 0.0,
                    "messages": []  # Will be populated by build_request_body
                }
            )
        else:
            raise ValueError(f"Unsupported model: {model_id}")

    def _extract_content_from_xml(self, xml_data: str, element_names: List[str]) -> Tuple:
        """Extract content from XML tags with improved error handling."""
        content = []
        for e in element_names:
            try:
                # Try exact match first
                pattern = rf"<{e}>(.*?)</{e}>"
                match = re.search(pattern, xml_data, re.DOTALL)
                if match:
                    extracted = match.group(1).strip()
                    content.append(extracted if extracted else None)
                else:
                    # Try case-insensitive match
                    pattern = rf"<{e.lower()}>(.*?)</{e.lower()}>"
                    match = re.search(pattern, xml_data.lower(), re.DOTALL)
                    if match:
                        # Find the original case version
                        start_tag = f"<{e.lower()}>"
                        end_tag = f"</{e.lower()}>"
                        start_idx = xml_data.lower().find(start_tag)
                        end_idx = xml_data.lower().find(end_tag)
                        if start_idx != -1 and end_idx != -1:
                            extracted = xml_data[start_idx + len(start_tag):end_idx].strip()
                            content.append(extracted if extracted else None)
                        else:
                            content.append(None)
                    else:
                        content.append(None)
            except Exception as ex:
                print(f"Warning: Error extracting {e} from XML: {ex}")
                content.append(None)
        return tuple(content)

    def _generate(self, system_prompt: str, prompt: str, output_xml_element: str) -> Tuple[str, str]:
        """Generate response using the evaluator model."""
        request_body = BedrockRequestHandler.build_request_body(
            request_body=self.model_config.request_body.copy(),
            model_config=self.model_config,
            system_prompt=system_prompt,
            prompt=prompt,
        )

        response = self.bedrock_client.invoke_model(
            modelId=self.model_config.model_id, 
            body=json.dumps(request_body)
        )

        completion = BedrockRequestHandler.parse_completion_from_response(
            response=response,
            model_config=self.model_config
        )

        output, reasoning = self._extract_content_from_xml(
            completion, [output_xml_element, "thinking"]
        )

        return output, reasoning

    def _generate_evaluation(self, expected_result: str, question: str, agent_response: str) -> Tuple[str, str]:
        """Generate evaluation of a single question-answer pair against expected result."""
        system_prompt = """You are a quality assurance engineer evaluating an agent's response to a user question.

Your job is to analyze the user question, agent response, and expected result to determine if the agent's response meets the expected criteria.

You will classify the response into the following categories:

- A: The agent's response meets or exceeds the expected result criteria.
- B: The agent's response does not meet the expected result criteria.

Please think hard about the response in <thinking> tags before providing only the category letter within <category> tags."""
        
        prompt = f"""Here is the evaluation scenario:

<question>
{question}
</question>

<agent_response>
{agent_response}
</agent_response>

<expected_result>
{expected_result}
</expected_result>

Evaluate whether the agent's response meets the expected result criteria."""

        evaluation, reasoning = self._generate(
            system_prompt=system_prompt,
            prompt=prompt,
            output_xml_element="category",
        )
        
        return evaluation, reasoning

    def evaluate_test(self, test_name: str, questions: List[str], expected_results: List[str]) -> Dict:
        """Evaluate a single test with questions from JSON file - one turn per question."""
        conversation = Conversation()
        all_results = []
        
        print(f"\n=== Evaluating Test: {test_name} ===")
        
        # Process each question as a separate turn
        for i, (question, expected_result) in enumerate(zip(questions, expected_results)):
            print(f"\nTurn {i + 1}")
            print(f"USER: {question}")
            
            # Get agent response
            agent_response = self.target.invoke(question)
            print(f"AGENT: {agent_response}")
            
            # Add turn to conversation
            conversation.add_turn(question, agent_response)
            
            # Evaluate this specific question-answer pair
            eval_category, reasoning = self._generate_evaluation(expected_result, question, agent_response)
            
            question_passed = eval_category == "A"
            question_result = {
                "question_number": i + 1,
                "question": question,
                "expected_result": expected_result,
                "agent_response": agent_response,
                "passed": question_passed,
                "reasoning": reasoning
            }
            all_results.append(question_result)
            
            print(f"Question {i + 1} Status: {'PASSED' if question_passed else 'FAILED'}")
        
        # Overall test passes if all questions pass
        overall_passed = all(result["passed"] for result in all_results)
        
        if overall_passed:
            overall_result = "All questions in the test passed - expected results observed."
        else:
            failed_questions = [str(r["question_number"]) for r in all_results if not r["passed"]]
            overall_result = f"Test failed - questions {', '.join(failed_questions)} did not meet expected results."
        
        # Combine all reasoning
        combined_reasoning = " | ".join([f"Q{r['question_number']}: {r['reasoning']}" for r in all_results if r['reasoning']])

        return {
            "test_name": test_name,
            "passed": overall_passed,
            "result": overall_result,
            "reasoning": combined_reasoning,
            "conversation": [(sender, message) for sender, message in conversation.messages],
            "turns": conversation.turns,
            "question_results": all_results
        }

    def run_evaluation(self, tests_file: str) -> Dict:
        """Run evaluation on all tests from the JSON file."""
        # Get current working directory and construct full path
        current_dir = os.getcwd()
        tests_file_path = os.path.join(current_dir, tests_file)
        
        print(f"Loading tests from: {tests_file_path}")
        
        with open(tests_file_path, 'r') as f:
            tests_data = json.load(f)
        
        results = []
        total_tests = 0
        passed_tests = 0
        
        for test_name, test_data in tests_data.items():
            # Extract questions and expected results from the multi-turn structure
            questions = []
            expected_results = []
            
            for question_key in sorted(test_data.keys()):
                if question_key.startswith('question_'):
                    questions.append(test_data[question_key]['question'])
                    expected_results.append(test_data[question_key]['expected_results'])
            
            # Run evaluation
            test_result = self.evaluate_test(test_name, questions, expected_results)
            results.append(test_result)
            
            total_tests += 1
            if test_result['passed']:
                passed_tests += 1
        
        # Calculate pass rate
        pass_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0
        
        return {
            "pass_rate": f"{pass_rate:.1f}%",
            "total_tests": total_tests,
            "passed_tests": passed_tests,
            "results": results
        }

## CSV Export Function

This function exports evaluation results to a CSV file with detailed information for each test question:

In [18]:
def save_evaluation_results_to_csv(evaluation_results: Dict, agent_id: str, agent_alias: str, 
                                   output_filename: str = None) -> str:
    """
    Save evaluation results to a CSV file with detailed question-by-question information.
    
    Args:
        evaluation_results: Dictionary containing evaluation results from run_evaluation()
        agent_id: The Bedrock agent ID
        agent_alias: The agent alias ID
        output_filename: Optional custom filename. If None, generates timestamp-based name.
    
    Returns:
        str: Path to the created CSV file
    """
    
    # Generate filename if not provided
    if output_filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_filename = f"agent_evaluation_results_{timestamp}.csv"
    
    # Ensure .csv extension
    if not output_filename.endswith('.csv'):
        output_filename += '.csv'
    
    # Get current working directory
    current_dir = os.getcwd()
    
    # Define evaluation results directory
    results_dir = os.path.join(current_dir, "evaluation_results")
    
    # Create the directory if it doesn't exist
    os.makedirs(results_dir, exist_ok=True)
    
    # Define complete output path
    output_path = os.path.join(results_dir, output_filename)
    
    # Define CSV headers
    headers = [
        'AGENT_ID',
        'AGENT_ALIAS', 
        'TEST_NAME',
        'QUESTION_NUMBER',
        'QUESTION',
        'EXPECTED_RESULT',
        'AGENT_RESPONSE',
        'QUESTION_PASSED',
        'TEST_PASSED',
        'REASONING'
    ]
    
    # Write CSV file
    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        
        # Write headers
        writer.writerow(headers)
        
        # Process each test result
        for result in evaluation_results['results']:
            test_name = result['test_name']
            test_passed = result['passed']
            
            # Process each question in the test
            for question_result in result.get('question_results', []):
                row = [
                    agent_id,
                    agent_alias,
                    test_name,
                    question_result['question_number'],
                    question_result['question'],
                    question_result['expected_result'],
                    question_result['agent_response'],
                    question_result['passed'],
                    test_passed,
                    question_result['reasoning'] or 'No reasoning provided'
                ]
                writer.writerow(row)
    
    print(f"\n📊 Evaluation results saved to: {output_path}")
    print(f"📈 Total rows written: {sum(len(result.get('question_results', [])) for result in evaluation_results['results'])}")
    
    return output_path


## Configuration and Main Execution 

Configure the evaluation parameters and run the evaluation with the fixed logic:

In [None]:
def main():
    """Main function to run the evaluation - one turn per question only."""
    # Configuration - Update these values
    # Available model options:
    # - Anthropic: "us.anthropic.claude-3-sonnet-20240229-v1:0", "us.anthropic.claude-3-7-sonnet-20250219-v1:0"
    # - Amazon Nova: "amazon.nova-premier-v1:0", "amazon.nova-pro-v1:0", "amazon.nova-lite-v1:0", "amazon.nova-micro-v1:0"
    # - Meta Llama: "us.meta.llama4-maverick-17b-instruct-v1:0", "us.meta.llama3-2-90b-instruct-v1:0"
    # - OpenAI: "openai.gpt-oss-120b-1:0", "openai.gpt-oss-20b-1:0"

    EVALUATOR_MODEL = "amazon.nova-pro-v1:0"  # Amazon Nova Pro model
    AGENT_ID = "CARG5UXPD9"  # Replace with your actual agent ID
    AGENT_ALIAS_ID = "0RV9TBGQC4"  # Replace with your actual alias ID
    AWS_REGION = "us-east-1"  # Replace with your region
    TESTS_FILE = "tests_structure.json"

    # Initialize evaluator (removed max_turns parameter - not needed anymore)
    evaluator = CATAgentEvaluator(
        evaluator_model=EVALUATOR_MODEL,
        agent_id=AGENT_ID,
        agent_alias_id=AGENT_ALIAS_ID,
        aws_region=AWS_REGION
    )

    # Run evaluation
    print("Starting FIXED Agent Evaluation...")
    print(f"Evaluator Model: {EVALUATOR_MODEL}")
    print(f"AWS Region: {AWS_REGION}")
    print(f"Target Agent ID: {AGENT_ID}")
    print(f"Target Agent Alias: {AGENT_ALIAS_ID}")

    evaluation_results = evaluator.run_evaluation(TESTS_FILE)

    # Print results
    print(f"\n{'='*60}")
    print("EVALUATION SUMMARY")
    print(f"{'='*60}")
    print(f"Pass Rate: {evaluation_results['pass_rate']}")
    print(f"Tests Passed: {evaluation_results['passed_tests']}/{evaluation_results['total_tests']}")

    print(f"\n{'='*60}")
    print("DETAILED RESULTS")
    print(f"{'='*60}")

    for result in evaluation_results['results']:
        print(f"\nTest: {result['test_name']}")
        print(f"Status: {'PASSED' if result['passed'] else 'FAILED'}")
        print(f"Result: {result['result']}")
        print(f"Reasoning: {result['reasoning']}")
        print(f"Questions Evaluated: {result['turns']}")
        
        # Show individual question results
        if 'question_results' in result:
            print("\nQuestion Details:")
            for q_result in result['question_results']:
                status = "✅ PASSED" if q_result['passed'] else "❌ FAILED"
                print(f"  Q{q_result['question_number']}: {status}")
                print(f"    Question: {q_result['question'][:100]}...")
                print(f"    Reasoning: {q_result['reasoning']}")
        
        print("-" * 40)
    
    # Save results to CSV
    csv_path = save_evaluation_results_to_csv(
        evaluation_results=evaluation_results,
        agent_id=AGENT_ID,
        agent_alias=AGENT_ALIAS_ID
    )
    

if __name__ == "__main__":
    main()