# Falcon-Eye Agent Test Notebook

Test agents locally by connecting to a running Falcon-Eye API backend.

**What this does:**
- Connects to your backend API to fetch agent configs, tools, and chat history
- Runs the LangGraph ReAct agent **locally** (same code as the agent pod)
- Shows every tool call, tool response, and LLM reasoning step
- Displays media items returned by tools

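**Prerequisites:**

Run this notebook from the agent source directory: the setup cell adds the current working directory to `sys.path` and imports the agent's `tool_executor` and `main` modules from it.

```bash
pip install langgraph langchain-anthropic langchain-openai httpx ipywidgets
```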

In [None]:
# ── Configuration ─────────────────────────────────────────────
# Base URL of the running Falcon-Eye API. Every backend request in this
# notebook is built as API_URL + path (paths start with "/"), so do not add
# a trailing slash here.
API_URL = "http://localhost:8000"   # <-- CHANGE THIS to your backend URL
INTERNAL_API_KEY = ""               # <-- Sent as the X-Internal-Key header when non-empty

# LLM config. A non-empty LLM_PROVIDER / LLM_MODEL / LLM_API_KEY takes
# precedence over the value in the agent's chat-config; set one to "" to
# fall back to the agent's own setting.
LLM_PROVIDER = "openai"            # openai | anthropic | ollama
LLM_MODEL = "gpt-4.1"              # model name
LLM_API_KEY = ""                    # <-- Set your API key here
LLM_BASE_URL = ""                  # Passed to get_llm() as base_url; only needed for ollama / custom endpoints

# Agent to test. When left empty, the "Load Agent Config" cell auto-selects
# the first agent returned by the API.
AGENT_ID = ""                       # <-- Paste agent UUID here

In [None]:
# ── Setup: add agent code to path & imports ──────────────────
import sys, os

# Put the notebook's working directory on sys.path so the agent modules
# (tool_executor, main) imported further down can be found.
AGENT_DIR = os.getcwd()
if AGENT_DIR not in sys.path:
    sys.path.insert(0, AGENT_DIR)

# Export the connection settings before the agent modules are imported.
os.environ.update(
    {
        "API_URL": API_URL,
        "INTERNAL_API_KEY": INTERNAL_API_KEY,
        "AGENT_ID": AGENT_ID or "",
    }
)

import json
import asyncio
import httpx
from datetime import datetime
from IPython.display import display, Markdown, HTML, Image

from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langgraph.prebuilt import create_react_agent

from tool_executor import build_tools
from main import get_llm

def _headers():
    """Return the request headers for backend calls.

    Contains ``X-Internal-Key`` when ``INTERNAL_API_KEY`` is non-empty,
    otherwise the dict is empty.
    """
    return {"X-Internal-Key": INTERNAL_API_KEY} if INTERNAL_API_KEY else {}

async def api_get(path: str):
    """GET ``API_URL + path`` and return the decoded JSON body.

    A new client (30 s timeout, headers from ``_headers()``) is opened for
    each call. ``raise_for_status()`` is invoked on the response before the
    body is decoded.
    """
    url = f"{API_URL}{path}"
    async with httpx.AsyncClient(timeout=30, headers=_headers()) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

async def api_post(path: str, data: dict):
    """POST ``data`` as JSON to ``API_URL + path`` and return the decoded JSON body.

    A new client (30 s timeout, headers from ``_headers()``) is opened for
    each call. ``raise_for_status()`` is invoked on the response before the
    body is decoded.
    """
    url = f"{API_URL}{path}"
    async with httpx.AsyncClient(timeout=30, headers=_headers()) as client:
        response = await client.post(url, json=data)
        response.raise_for_status()
        return response.json()

# Reached only when every import above succeeded; shows which backend the
# helpers will target. No request is sent to the API by this line.
print("Imports OK. Connecting to:", API_URL)

In [None]:
# ── List Available Agents ────────────────────────────────────
agents_resp = await api_get("/api/agents/")
agents = agents_resp.get("agents", [])

if not agents:
    print("No agents found. Create one in the dashboard first.")
else:
    print(f"Found {len(agents)} agent(s):\n")
    for a in agents:
        tools_str = ", ".join(a.get("tools", [])[:5])
        if len(a.get("tools", [])) > 5:
            tools_str += f" (+{len(a['tools'])-5} more)"
        print(f"  {a['name']}")
        print(f"    ID:       {a['id']}")
        print(f"    Provider: {a['provider']} / {a['model']}")
        print(f"    Status:   {a['status']}")
        print(f"    Channel:  {a.get('channel_type') or 'none'}")
        print(f"    Tools:    {tools_str or 'none'}")
        print()

In [None]:
# ── Load Agent Config ────────────────────────────────────────
# Set AGENT_ID above, or pick one from the list:
if not AGENT_ID:
    if agents:
        AGENT_ID = agents[0]["id"]
        print(f"Auto-selected first agent: {agents[0]['name']} ({AGENT_ID})")
    else:
        raise ValueError("No agents available. Set AGENT_ID or create an agent first.")

config = await api_get(f"/api/agents/{AGENT_ID}/chat-config")

# Allow local overrides
provider = LLM_PROVIDER or config["provider"]
model = LLM_MODEL or config["model"]
api_key = LLM_API_KEY or config.get("api_key", "")
system_prompt = config.get("system_prompt", "You are a helpful AI assistant.")
tools_schema = config.get("tools_schema", [])
max_tokens = config.get("max_tokens", 4096)
temperature = config.get("temperature", 0.7)

print(f"Agent:    {AGENT_ID}")
print(f"Provider: {provider}")
print(f"Model:    {model}")
print(f"Tools:    {len(tools_schema)}")
if tools_schema:
    for t in tools_schema:
        fn = t["function"]
        print(f"  - {fn['name']}: {fn['description'][:80]}")

In [None]:
# ── Build the LangGraph Agent ────────────────────────────────
import uuid as _uuid

# A fresh session id is generated every time this cell runs.
SESSION_ID = str(_uuid.uuid4())

# Context handed to build_tools() and reused by call_tool() for direct
# tool execution.
agent_ctx = {
    "provider": provider,
    "model": model,
    "api_key": api_key,
    "agent_id": AGENT_ID,
    "session_id": SESSION_ID,
}

# Shared list passed to build_tools(); chat() clears it at the start of each
# call and returns a copy of its contents as the call's media items.
media_collector: list[dict] = []
tools = build_tools(tools_schema, media_collector, API_URL, agent_ctx)

llm = get_llm(provider, model, api_key, temperature, max_tokens, base_url=LLM_BASE_URL)
agent = create_react_agent(model=llm, tools=tools)

# Inject tools list into the system prompt (same as production)
if tools_schema:
    tool_lines = [f"- **{t['function']['name']}**: {t['function']['description']}" for t in tools_schema]
    tools_section = (
        "\n\n## Available Tools\n"
        "You MUST use the appropriate tool when the user's request matches one. "
        "Do not describe what you would do — actually call the tool.\n\n"
        + "\n".join(tool_lines)
    )
    # `system_prompt` is a notebook global, so re-running this cell would
    # otherwise stack a second copy of the section onto the already
    # augmented prompt. Append only when it is not there yet.
    if not system_prompt.endswith(tools_section):
        system_prompt += tools_section

# (Re)start the shared history with just the system prompt.
conversation = [SystemMessage(content=system_prompt)]

print(f"Agent ready. Session: {SESSION_ID}")
print(f"Tools: {[t.name for t in tools]}")

In [None]:
# ── Helper: Run a single message and display full trace ─────

def _fmt_tool_args(args):
    """Pretty-format tool arguments."""
    if not args:
        return "(no arguments)"
    try:
        return json.dumps(args, indent=2)
    except Exception:
        return str(args)

def _truncate(text, limit=2000):
    if len(text) <= limit:
        return text
    return text[:limit] + f"\n... ({len(text) - limit} chars truncated)"


async def chat(user_message: str, max_steps: int = 25, verbose: bool = True) -> dict:
    """
    Send a message to the agent and display the full execution trace.

    The user message is appended to the module-level ``conversation`` and the
    agent is streamed in "updates" mode. With ``verbose`` enabled, every tool
    call and tool response is rendered as it arrives.

    Args:
        user_message: Text sent to the agent as a ``HumanMessage``.
        max_steps: Passed to the graph as ``recursion_limit``.
        verbose: When true, display the trace, the final response, media
            items and the token summary. The recursion-limit and error
            notices are displayed regardless of this flag.

    Returns:
        A dict with the keys ``response`` (content of the last AI message
        that had content and no tool calls, ``""`` if there was none),
        ``media`` (a copy of ``media_collector``), ``tool_calls`` (one entry
        per requested tool call with ``step``, ``tool``, ``args``, ``id``
        and, once matched, ``response``), ``tokens``
        (``{"input": ..., "output": ...}``) and ``steps`` (number of
        streamed messages processed).

    Raises:
        Exception: Any error raised while streaming is re-raised, unless its
            message contains "recursion"; in that case whatever was
            collected so far is returned.
    """
    # Start from an empty media list so the returned media only reflects
    # this call.
    media_collector.clear()
    # The user message is stored in the shared history immediately; it is not
    # removed again if the run below fails.
    conversation.append(HumanMessage(content=user_message))

    if verbose:
        display(Markdown(f"---\n**You:** {user_message}"))

    response_text = ""
    total_input = 0
    total_output = 0
    step_num = 0
    tool_calls_log = []

    try:
        async for step in agent.astream(
            {"messages": conversation},
            config={"recursion_limit": max_steps},
            stream_mode="updates",
        ):
            for node_name, node_output in step.items():
                for msg in node_output.get("messages", []):
                    # Counts every streamed message (AI and tool alike).
                    step_num += 1

                    if isinstance(msg, AIMessage):
                        # Track token usage
                        usage = getattr(msg, "usage_metadata", None)
                        if usage and isinstance(usage, dict):
                            total_input += usage.get("input_tokens", 0)
                            total_output += usage.get("output_tokens", 0)

                        # Tool calls
                        if msg.tool_calls:
                            for tc in msg.tool_calls:
                                # Logged so the later ToolMessage can be
                                # matched back to it by id.
                                tool_entry = {
                                    "step": step_num,
                                    "tool": tc["name"],
                                    "args": tc["args"],
                                    "id": tc["id"],
                                }
                                tool_calls_log.append(tool_entry)

                                if verbose:
                                    display(Markdown(
                                        f"**Step {step_num} — Tool Call:** `{tc['name']}`\n"
                                        f"```json\n{_fmt_tool_args(tc['args'])}\n```"
                                    ))

                        # Final text response
                        # Only AI messages with content and no tool calls are
                        # added to the shared history; tool-call messages and
                        # tool responses are not stored there.
                        if msg.content and not msg.tool_calls:
                            response_text = msg.content
                            conversation.append(msg)

                    elif isinstance(msg, ToolMessage):
                        # Match back to the tool call
                        tool_name = "unknown"
                        for entry in tool_calls_log:
                            if entry["id"] == msg.tool_call_id:
                                tool_name = entry["tool"]
                                entry["response"] = msg.content
                                break

                        if verbose:
                            # Non-string content is shown as indented JSON.
                            content_str = msg.content if isinstance(msg.content, str) else json.dumps(msg.content, indent=2)
                            display(Markdown(
                                f"**Step {step_num} — Tool Response** (`{tool_name}`)**:**\n"
                                f"```\n{_truncate(content_str)}\n```"
                            ))

    except Exception as e:
        err_type = type(e).__name__
        # The recursion limit is recognised by a substring match on the error
        # message; that case is reported and execution continues with the
        # partial results. Everything else is reported and re-raised.
        if "recursion" in str(e).lower():
            display(Markdown(f"**Hit recursion limit ({max_steps} steps).** Partial response below."))
        else:
            display(Markdown(f"**Error ({err_type}):** {e}"))
            raise

    # Display final response
    if verbose:
        display(Markdown(f"---\n**Agent Response:**\n\n{response_text}"))

    # Display media
    if media_collector:
        if verbose:
            display(Markdown(f"**Media Items ({len(media_collector)}):**"))
            for i, m in enumerate(media_collector):
                path = m.get('path', 'N/A')
                mtype = m.get('media_type', 'unknown')
                caption = m.get('caption', '')
                url = m.get('url', '')
                display(Markdown(
                    f"{i+1}. **{mtype}** — `{path}`\n"
                    f"   - URL: `{url}`\n"
                    f"   - Caption: {caption or '(none)'}\n"
                    f"   - Size: {m.get('size', '?')} bytes"
                ))

    # Token summary
    if verbose:
        display(Markdown(
            f"---\n*Tokens: {total_input} in / {total_output} out | "
            f"Steps: {step_num} | Tool calls: {len(tool_calls_log)} | "
            f"Media: {len(media_collector)}*"
        ))

    # `media` is copied because media_collector is cleared on the next call.
    return {
        "response": response_text,
        "media": list(media_collector),
        "tool_calls": tool_calls_log,
        "tokens": {"input": total_input, "output": total_output},
        "steps": step_num,
    }

print("chat() helper ready. Use: result = await chat('your message')")

## Send Messages

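Run the cell below to test. Change the message and re-run as needed.
The conversation history is maintained across calls: each user message and the agent's final text reply are kept, while intermediate tool calls and tool responses are not stored.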

In [None]:
# ── Test: Send a message ─────────────────────────────────────
# `result` is the dict returned by chat() (response, media, tool_calls,
# tokens, steps); the "Inspect: raw tool calls" cell reads it.
result = await chat("List all cameras and their status")

In [None]:
# ── Test: Follow-up (conversation continues) ─────────────────
# Sent on top of the existing `conversation`, so earlier user messages and
# final agent replies are part of the context. Overwrites `result`.
result = await chat("How many nodes are in the cluster?")

In [None]:
# ── Test: Media tool (snapshot/recording) ────────────────────
# Uncomment and set camera_id to test:
# result = await chat("Take a snapshot of camera <camera_id> and send it to me")

In [None]:
# ── Inspect: raw tool calls and responses ────────────────────
# Reads `result` from the most recent cell that assigned it.
logged_calls = result["tool_calls"]
if not logged_calls:
    print("No tool calls in last interaction.")
else:
    print(f"Tool calls from last interaction ({len(logged_calls)}):\n")
    for call in logged_calls:
        print(f"  [{call['step']}] {call['tool']}")
        print(f"      Args: {json.dumps(call['args'], indent=8)}")
        # A call with no recorded tool response is shown as "(pending)";
        # long string responses are cut to 300 characters.
        outcome = call.get("response", "(pending)")
        needs_cut = isinstance(outcome, str) and len(outcome) > 300
        shown = outcome[:300] + "..." if needs_cut else outcome
        print(f"      Response: {shown}")
        print()

In [None]:
# ── Inspect: full conversation history ───────────────────────
print(f"Conversation ({len(conversation)} messages):\n")
for index, message in enumerate(conversation):
    # "SystemMessage" -> "System", "HumanMessage" -> "Human", ...
    speaker = type(message).__name__.replace("Message", "")
    body = message.content
    if not isinstance(body, str):
        body = str(body)
    # Keep each entry short: 200 characters plus an ellipsis.
    preview = body if len(body) <= 200 else body[:200] + "..."
    print(f"  [{index}] {speaker}: {preview}")

In [None]:
# ── Reset conversation ───────────────────────────────────────
# Uncomment to start fresh:
# conversation.clear()
# conversation.append(SystemMessage(content=system_prompt))
# media_collector.clear()
# print("Conversation reset.")

## Direct Tool Testing

Call tools directly without going through the LLM. Useful for debugging tool behavior.

In [None]:
# ── Direct tool execution (bypasses LLM) ─────────────────────
async def call_tool(tool_name: str, **kwargs):
    """
    Execute a tool directly via the API, bypassing the LLM.
    Shows the raw result and any media items.

    Args:
        tool_name: Name of the tool, sent as ``tool_name`` in the payload.
        **kwargs: Tool arguments, sent as ``arguments``. They are rendered
            with ``json.dumps``, so they must be JSON-serialisable.

    Returns:
        The decoded JSON response of ``POST /api/tools/execute``, unchanged.
    """
    payload = {
        "tool_name": tool_name,
        "arguments": kwargs,
        "agent_context": agent_ctx,
    }

    display(Markdown(f"**Calling tool:** `{tool_name}`\n```json\n{json.dumps(kwargs, indent=2)}\n```"))

    result = await api_post("/api/tools/execute", payload)

    tool_result = result.get("result", "")
    tool_media = result.get("media", [])

    # The result comes straight from decoded JSON and may not be a string.
    # Render structured values as indented JSON, the same way chat() renders
    # non-string tool responses, before truncating for display.
    if not isinstance(tool_result, str):
        tool_result = json.dumps(tool_result, indent=2)

    display(Markdown(f"**Result:**\n```\n{_truncate(tool_result)}\n```"))

    if tool_media:
        display(Markdown(f"**Media ({len(tool_media)} items):**"))
        for m in tool_media:
            display(Markdown(f"- `{m.get('path', 'N/A')}` ({m.get('media_type', '?')}) — {m.get('caption', '')}"))

    return result

print("call_tool() ready. Usage: result = await call_tool('list_cameras')")
print("\nAvailable tools:")
# One pseudo-signature per tool: name(param1, param2, ...)
for schema in tools_schema:
    spec = schema["function"]
    param_names = ", ".join(spec.get("parameters", {}).get("properties", {}))
    print(f"  {spec['name']}({param_names})")

In [None]:
# ── Example: call a tool directly ────────────────────────────
# Note: this rebinds `result` to the raw tool-execution response, which
# replaces the chat() result read by the "Inspect: raw tool calls" cell.
result = await call_tool("list_cameras")

In [None]:
# ── Example: call tool with arguments ─────────────────────────
# result = await call_tool("camera_status", camera_id="YOUR-CAMERA-UUID")
# result = await call_tool("list_recordings")
# result = await call_tool("system_info")
# result = await call_tool("list_files", prefix="snapshots")
# result = await call_tool("send_media", path="snapshots/my_file.jpg", caption="test")

## Batch / Stress Testing

Run multiple prompts in sequence and collect results.

In [None]:
# ── Batch test: run multiple prompts ─────────────────────────
test_prompts = [
    "What cameras are online right now?",
    "How many nodes does the cluster have?",
    "Give me a system overview",
]

results = []
for prompt in test_prompts:
    # Reset conversation for each test
    conversation.clear()
    conversation.append(SystemMessage(content=system_prompt))
    media_collector.clear()
    
    r = await chat(prompt, verbose=False)
    results.append({"prompt": prompt, **r})
    
    status = "OK" if r["response"] else "EMPTY"
    print(f"  [{status}] {prompt}")
    print(f"       -> {len(r['response'])} chars, {r['tokens']['input']}+{r['tokens']['output']} tokens, {len(r['tool_calls'])} tools")

print(f"\nDone: {len(results)} tests")