# Fleet Safety Agent Evaluation

This notebook demonstrates how to evaluate the Fleet Safety Agent using:
1. **Local Testing** - Direct agent invocation
2. **Vertex AI Gen AI Evaluation** - Trajectory and response metrics

> **Note**: Use the same `.venv` created by `make install` to ensure dependency compatibility.


## 1. Setup Environment


In [46]:
import sys
import os

# Add parent directory to path for imports
sys.path.insert(0, os.path.abspath(".."))

# Load environment variables using the project's env helper
from app.helpers.env import load_env_and_verify_api_key

# This loads .env from the project root and verifies GOOGLE_API_KEY is set
# Set require_maps_key=True if testing route planning features
_ = load_env_and_verify_api_key(require_maps_key=True)
print("Environment loaded successfully")


Environment loaded successfully


In [47]:
import asyncio
import json
from typing import Any

import pandas as pd
from IPython.display import Markdown, display

# ADK imports
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types


## 2. Load the Fleet Safety Agent


In [48]:
# Import the Fleet Safety Orchestrator
from app.agent import orchestrator, app

print(f"Agent loaded: {orchestrator.name}")
print(f"App name: {app.name}")


Agent loaded: fleet_safety_orchestrator
App name: fleet_safety_agent


## 3. Define Helper Functions


In [49]:
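def parse_events_to_dict(events: list, *, as_json: bool = False) -> dict:
    """
    Parse ADK events into a dict with the final response text and the tool-call trajectory.

    Identical tool calls (same name and arguments) are recorded once. With
    as_json=True, the trajectory is returned as a JSON string instead of a list.
    """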
    final_response = ""
    trajectory = []

    for event in events:
        if not getattr(event, "content", None) or not getattr(event.content, "parts", None):
            continue
        for part in event.content.parts:
            if getattr(part, "function_call", None):
                info = {
                    "tool_name": part.function_call.name,
                    # args can be None for calls without arguments; dict(None) would raise
                    "tool_input": dict(part.function_call.args or {}),
                }
                if info not in trajectory:
                    trajectory.append(info)
            if event.content.role == "model" and getattr(part, "text", None):
                final_response = part.text.strip()

    return {
        "response": final_response,
        "predicted_trajectory": json.dumps(trajectory) if as_json else trajectory
    }


def format_as_markdown(output: dict) -> str:
    """Convert output to formatted markdown."""
    md = f"### Response\n{output['response']}\n\n"
    if output["predicted_trajectory"]:
        md += "### Tool Calls\n"
        traj = output["predicted_trajectory"]
        if isinstance(traj, str):
            traj = json.loads(traj)
        for call in traj:
            md += f"- **{call['tool_name']}**\n"
            for k, v in call.get("tool_input", {}).items():
                md += f"  - `{k}`: `{v}`\n"
    return md
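
The trajectory extraction can be exercised without calling the model by feeding it stand-in events. The sketch below is a condensed variant of the loop in `parse_events_to_dict`, included only so the example runs on its own; `make_event` and `collect_tool_calls` are hypothetical names, and the stand-ins expose only the attributes the parser reads (`content.role`, `content.parts`, `text`, `function_call`).

```python
from types import SimpleNamespace


def make_event(role, *, text=None, call=None):
    """Build a stand-in event exposing only content.role and content.parts."""
    function_call = SimpleNamespace(name=call[0], args=call[1]) if call else None
    part = SimpleNamespace(text=text, function_call=function_call)
    return SimpleNamespace(content=SimpleNamespace(role=role, parts=[part]))


def collect_tool_calls(events):
    """Return unique tool calls in first-seen order (condensed parser loop)."""
    trajectory = []
    for event in events:
        for part in event.content.parts:
            if part.function_call is None:
                continue
            info = {
                "tool_name": part.function_call.name,
                "tool_input": dict(part.function_call.args or {}),
            }
            if info not in trajectory:
                trajectory.append(info)
    return trajectory


events = [
    make_event("model", call=("check_vehicle_safety", {"vehicle_id": "v001"})),
    make_event("model", call=("check_vehicle_safety", {"vehicle_id": "v001"})),  # repeat
    make_event("model", call=("get_fleet_status", None)),  # call without arguments
    make_event("model", text="Vehicle v001 is low risk."),
]

print(collect_tool_calls(events))
```

The repeated `check_vehicle_safety` call appears once in the result, which is worth keeping in mind when a reference trajectory expects the same call twice.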


## 4. Create Agent Runner


In [50]:
async def run_fleet_safety_agent(query: str) -> dict:
    """
    Run the Fleet Safety Agent with a query and return parsed results.
    """
    app_name = "fleet_safety_eval"
    user_id = "eval_user"
    session_id = f"eval_session_{hash(query) % 10000}"

    # Create session service
    session_service = InMemorySessionService()
    await session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )

    # Create runner
    runner = Runner(
        agent=orchestrator,
        app_name=app_name,
        session_service=session_service
    )

    # Run agent
    content = types.Content(role="user", parts=[types.Part(text=query)])
    events = [
        event async for event in runner.run_async(
            user_id=user_id,
            session_id=session_id,
            new_message=content
        )
    ]

    return parse_events_to_dict(events)


def run_fleet_safety_agent_sync(prompt: str) -> dict:
    """Sync wrapper for Vertex AI evaluation.

    Uses asyncio.run, which raises RuntimeError if the calling thread already
    has a running event loop (as a notebook cell does).
    """
    result = asyncio.run(run_fleet_safety_agent(prompt))
    # Serialize the trajectory to a JSON string unless it already is one
    if not isinstance(result["predicted_trajectory"], str):
        result["predicted_trajectory"] = json.dumps(result["predicted_trajectory"])
    return result
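
`asyncio.run` raises `RuntimeError` when the calling thread already has a running event loop, which is the case inside a notebook cell. If a blocking call is needed from such a context, one option is to run the coroutine on a fresh loop in a worker thread. This is a minimal sketch under that assumption; `run_coro_blocking` and `double` are hypothetical names, not part of the project or ADK.

```python
import asyncio
import threading


def run_coro_blocking(coro):
    """Run a coroutine to completion from synchronous code.

    Uses asyncio.run directly when no loop is running in this thread;
    otherwise runs the coroutine on a fresh loop in a worker thread.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)

    outcome = {}

    def worker():
        try:
            outcome["value"] = asyncio.run(coro)
        except BaseException as exc:  # re-raised in the calling thread
            outcome["error"] = exc

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()  # blocks the caller, and its loop, until the work is done
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]


async def double(x):
    # Stand-in for a coroutine such as run_fleet_safety_agent(query)
    await asyncio.sleep(0)
    return 2 * x


print(run_coro_blocking(double(21)))
```

Because the worker thread uses its own loop, objects bound to the caller's loop should not be shared with the coroutine. In a notebook cell, plain `await` remains the simpler choice.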


## 5. Test the Agent Locally


In [51]:
# Test Query 1: Route Planning
query1 = "Plan a safe route for vehicle v001 from London to Manchester"
result1 = await run_fleet_safety_agent(query1)
display(Markdown(f"**Query:** {query1}\n\n" + format_as_markdown(result1)))


**Query:** Plan a safe route for vehicle v001 from London to Manchester

### Response
I apologize, but I was unable to plan the route. The `request_route_plan` tool requires a `driver_id` which was not provided. Please provide the driver's ID so I can proceed with planning a safe route for vehicle `v001` from London to Manchester.

### Tool Calls
- **request_route_plan**
  - `destination`: `Manchester`
  - `origin`: `London`
  - `priority`: `HIGH`
  - `vehicle_id`: `v001`


In [52]:
# Test Query 2: Fleet Status
query2 = "What is the current status of the fleet?"
result2 = await run_fleet_safety_agent(query2)
display(Markdown(f"**Query:** {query2}\n\n" + format_as_markdown(result2)))


**Query:** What is the current status of the fleet?

### Response
The current status of the fleet is as follows:

*   **Fleet Size:** 1 vehicle
*   **Active Vehicles:** 1
*   **Active Trips:** 0
*   **Total Alerts:** 0
*   **Critical Alerts:** 0
*   **System Health:** Good
*   **Timestamp:** 2025-11-30T15:24:53.259501

### Tool Calls
- **get_fleet_status**


In [53]:
# Test Query 3: Safety Check
query3 = "Check the safety status of vehicle v001"
result3 = await run_fleet_safety_agent(query3)
display(Markdown(f"**Query:** {query3}\n\n" + format_as_markdown(result3)))


**Query:** Check the safety status of vehicle v001

### Response
The safety status of vehicle `v001` is currently good. The Risk Monitor reports a **low risk level** with a risk score of 0. There are no active alerts and no historical incidents recorded for this vehicle. The safety rating is N/A.

### Tool Calls
- **check_vehicle_safety**
  - `vehicle_id`: `v001`


---

## 6. Vertex AI Gen AI Evaluation (Optional)

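This section scores the agent with Vertex AI's Gen AI evaluation service. Trajectory metrics compare the agent's tool calls against a reference trajectory; response metrics rate the final answer.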

> **Prerequisites**: 
> - GCP Project with Vertex AI enabled
> - GCS bucket for evaluation outputs


In [54]:
# Skip this section if you don't have Vertex AI set up
ENABLE_VERTEX_EVAL = False  # Set to True to run Vertex AI evaluation

if ENABLE_VERTEX_EVAL:
    import vertexai
    from vertexai.preview.evaluation import EvalTask
    
    # Configure
    PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "your-project-id")
    LOCATION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")
    BUCKET_NAME = f"{PROJECT_ID}-eval-bucket"
    BUCKET_URI = f"gs://{BUCKET_NAME}"
    EXPERIMENT_NAME = "fleet-safety-eval"
    
    vertexai.init(project=PROJECT_ID, location=LOCATION, experiment=EXPERIMENT_NAME)
    print(f"Vertex AI initialized: {PROJECT_ID} / {LOCATION}")


### 6.1 Define Evaluation Dataset


In [55]:
# Fleet Safety specific evaluation dataset
eval_data = {
    "prompt": [
        "What is the current status of the fleet?",
        "Plan a safe route from London to Manchester for vehicle v001",
        "Check the safety status of vehicle v001",
        "Generate an executive dashboard for today",
        "What are the active alerts in the system?",
    ],
    "reference_trajectory": [
        # Fleet status - should call get_fleet_status
        [{"tool_name": "get_fleet_status", "tool_input": {"include_details": False}}],
        # Route planning - should call request_route_plan
        [{"tool_name": "request_route_plan", "tool_input": {
            "origin": "London", 
            "destination": "Manchester",
            "vehicle_id": "v001"
        }}],
        # Safety check - should call check_vehicle_safety
        [{"tool_name": "check_vehicle_safety", "tool_input": {"vehicle_id": "v001"}}],
        # Dashboard - should call generate_executive_dashboard
        [{"tool_name": "generate_executive_dashboard", "tool_input": {}}],
        # Alerts - should call get_fleet_status with details
        [{"tool_name": "get_fleet_status", "tool_input": {"include_details": True}}],
    ],
}

eval_dataset = pd.DataFrame(eval_data)
print(f"Evaluation dataset: {len(eval_dataset)} test cases")
eval_dataset


Evaluation dataset: 5 test cases


|   | prompt | reference_trajectory |
|---|--------|----------------------|
| 0 | What is the current status of the fleet? | [{'tool_name': 'get_fleet_status', 'tool_input... |
| 1 | Plan a safe route from London to Manchester fo... | [{'tool_name': 'request_route_plan', 'tool_inp... |
| 2 | Check the safety status of vehicle v001 | [{'tool_name': 'check_vehicle_safety', 'tool_i... |
| 3 | Generate an executive dashboard for today | [{'tool_name': 'generate_executive_dashboard',... |
| 4 | What are the active alerts in the system? | [{'tool_name': 'get_fleet_status', 'tool_input... |
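
A malformed reference step (for example, a missing `tool_input`) may only surface once an evaluation run is under way. The sketch below checks the dictionary before it is turned into a DataFrame. `validate_eval_data` is a hypothetical helper, and the key names it checks are the ones used in `eval_data` above.

```python
def validate_eval_data(data):
    """Return a list of problems found in a prompt/reference_trajectory mapping."""
    problems = []
    prompts = data.get("prompt", [])
    references = data.get("reference_trajectory", [])
    if len(prompts) != len(references):
        problems.append(
            f"{len(prompts)} prompts but {len(references)} reference trajectories"
        )
    for row, trajectory in enumerate(references):
        for pos, step in enumerate(trajectory):
            if not isinstance(step, dict):
                problems.append(f"row {row}, step {pos}: expected a dict")
                continue
            if not isinstance(step.get("tool_name"), str):
                problems.append(f"row {row}, step {pos}: missing 'tool_name'")
            if not isinstance(step.get("tool_input"), dict):
                problems.append(f"row {row}, step {pos}: 'tool_input' must be a dict")
    return problems


sample = {
    "prompt": [
        "Check the safety status of vehicle v001",
        "What is the current status of the fleet?",
    ],
    "reference_trajectory": [
        [{"tool_name": "check_vehicle_safety", "tool_input": {"vehicle_id": "v001"}}],
        [{"tool_name": "get_fleet_status"}],  # deliberately missing tool_input
    ],
}

print(validate_eval_data(sample))
```

An empty list means the structure is consistent; it says nothing about whether the reference trajectories are the right ones.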


### 6.2 Run Trajectory Evaluation


In [56]:
if ENABLE_VERTEX_EVAL:
    import random
    import string
    
    def get_id(length=8):
        return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
    
    # Trajectory metrics
    trajectory_metrics = [
        "trajectory_exact_match",
        "trajectory_in_order_match",
        "trajectory_any_order_match",
        "trajectory_precision",
        "trajectory_recall",
    ]
    
    EXPERIMENT_RUN = f"fleet-safety-trajectory-{get_id()}"
    
    trajectory_eval_task = EvalTask(
        dataset=eval_dataset,
        metrics=trajectory_metrics,
        experiment=EXPERIMENT_NAME,
        output_uri_prefix=BUCKET_URI + "/trajectory-eval",
    )
    
    print("Running trajectory evaluation...")
    trajectory_result = trajectory_eval_task.evaluate(
        runnable=run_fleet_safety_agent_sync,
        experiment_run_name=EXPERIMENT_RUN
    )
    
    print("\n### Summary Metrics")
    display(pd.DataFrame(trajectory_result.summary_metrics.items(), columns=["Metric", "Value"]))
else:
    print("Vertex AI evaluation disabled. Set ENABLE_VERTEX_EVAL = True to run.")


Vertex AI evaluation disabled. Set ENABLE_VERTEX_EVAL = True to run.
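
To get a feel for the five trajectory metrics without a GCP project, they can be approximated locally. The sketch below follows the commonly documented meaning of each metric (exact match, reference as an in-order subsequence, reference contained in any order, precision, recall). It is not the service's implementation: `trajectory_scores`, `call_key`, the `compare_inputs` switch, and the handling of empty trajectories are assumptions, and whether the hosted metrics compare `tool_input` as well as `tool_name` should be confirmed against the Vertex AI documentation.

```python
def call_key(step, compare_inputs=True):
    """Comparable identity for a tool call: its name, optionally plus its arguments."""
    if not compare_inputs:
        return step["tool_name"]
    args = step.get("tool_input") or {}
    return (step["tool_name"], tuple(sorted((k, repr(v)) for k, v in args.items())))


def trajectory_scores(predicted, reference, compare_inputs=True):
    """Approximate the five trajectory metrics for one predicted/reference pair."""
    pred = [call_key(s, compare_inputs) for s in predicted]
    ref = [call_key(s, compare_inputs) for s in reference]

    # In-order: every reference call appears in the prediction, in the same order
    remaining = iter(pred)
    in_order = all(step in remaining for step in ref)
    # Any-order: every reference call appears somewhere in the prediction
    any_order = all(step in pred for step in ref)

    return {
        "exact_match": float(pred == ref),
        "in_order_match": float(in_order),
        "any_order_match": float(any_order),
        # Empty trajectories score 0.0 here; that choice is an assumption
        "precision": sum(s in ref for s in pred) / len(pred) if pred else 0.0,
        "recall": sum(s in pred for s in ref) / len(ref) if ref else 0.0,
    }


reference = [{"tool_name": "request_route_plan", "tool_input": {
    "origin": "London", "destination": "Manchester", "vehicle_id": "v001"}}]
# Tool call observed in Test Query 1: same tool, plus an extra `priority` argument
predicted = [{"tool_name": "request_route_plan", "tool_input": {
    "origin": "London", "destination": "Manchester",
    "vehicle_id": "v001", "priority": "HIGH"}}]

print(trajectory_scores(predicted, reference))
print(trajectory_scores(predicted, reference, compare_inputs=False))
```

With arguments compared, the extra `priority` argument makes every score 0.0; comparing names only gives 1.0 across the board. The gap shows how sensitive the scores are to the exact `tool_input` values in `reference_trajectory`.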


### 6.3 Run Response Quality Evaluation


In [57]:
if ENABLE_VERTEX_EVAL:
    # Response quality metrics
    response_metrics = ["safety", "coherence"]
    
    EXPERIMENT_RUN = f"fleet-safety-response-{get_id()}"
    
    response_eval_task = EvalTask(
        dataset=eval_dataset,
        metrics=response_metrics,
        experiment=EXPERIMENT_NAME,
        output_uri_prefix=BUCKET_URI + "/response-eval",
    )
    
    print("Running response quality evaluation...")
    response_result = response_eval_task.evaluate(
        runnable=run_fleet_safety_agent_sync,
        experiment_run_name=EXPERIMENT_RUN
    )
    
    print("\n### Summary Metrics")
    display(pd.DataFrame(response_result.summary_metrics.items(), columns=["Metric", "Value"]))
else:
    print("Vertex AI evaluation disabled. Set ENABLE_VERTEX_EVAL = True to run.")


Vertex AI evaluation disabled. Set ENABLE_VERTEX_EVAL = True to run.


---

## 7. Summary

This notebook provides:

1. **Local Testing** (Sections 2-5): Direct agent invocation using the API keys loaded in Section 1, with no GCP project or GCS bucket required
2. **Vertex AI Evaluation** (Section 6): Trajectory and response metrics computed by the Vertex AI evaluation service and logged to an experiment

### Next Steps

- Add more test cases to `eval_data` for comprehensive coverage
- Create custom metrics for fleet-safety-specific evaluation criteria
- Integrate with CI/CD pipeline using `make eval`
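
As a starting point for a fleet-safety-specific metric, the sketch below scores whether each tool call carries its required arguments. `required_args_present` and the `REQUIRED_ARGS` mapping are hypothetical: the `driver_id` requirement comes from the agent's reply to Test Query 1, while the rest of the mapping is illustrative and should be taken from the real tool definitions. The Vertex AI evaluation SDK provides a `CustomMetric` class for wrapping metric functions; whether this input and return shape matches what it expects is an assumption to verify against the SDK documentation.

```python
import json

# Illustrative mapping; the authoritative list lives in the tool definitions
REQUIRED_ARGS = {
    "request_route_plan": ["origin", "destination", "vehicle_id", "driver_id"],
    "check_vehicle_safety": ["vehicle_id"],
}


def required_args_present(instance):
    """Return the share of known tool calls that include all required arguments."""
    trajectory = instance["predicted_trajectory"]
    if isinstance(trajectory, str):  # accept the JSON-string form as well
        trajectory = json.loads(trajectory)

    checked = complete = 0
    for step in trajectory:
        required = REQUIRED_ARGS.get(step["tool_name"])
        if required is None:
            continue  # no rule for this tool
        checked += 1
        if all(arg in step.get("tool_input", {}) for arg in required):
            complete += 1

    # With no applicable calls the score defaults to 1.0 (an assumption)
    score = complete / checked if checked else 1.0
    return {"required_args_present": score}


instance = {
    "predicted_trajectory": json.dumps([
        {"tool_name": "request_route_plan",
         "tool_input": {"origin": "London", "destination": "Manchester",
                        "vehicle_id": "v001"}},
        {"tool_name": "check_vehicle_safety", "tool_input": {"vehicle_id": "v001"}},
    ])
}

print(required_args_present(instance))
```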
