## Dev MCP Tool Suite v1

This notebook is an **instructive reference** for using our development MCP servers:

- **Direct tool calls (no LLM)**: sanity-check servers and tool schemas
- **LLM + DeepAgents examples (Gemini 2.5 Flash)**: realistic agent workflows that use these MCP tools

### Local servers (FastMCP, streamable HTTP)
- dev_repo: `http://127.0.0.1:8100/mcp`
  - start with `fastmcp run dev_repo_server.py --transport streamable-http --port 8100`

- dev_search: `http://127.0.0.1:8101/mcp`
  - start with `fastmcp run dev_search_server.py --transport streamable-http --port 8101`

- dev_docs: `http://127.0.0.1:8104/mcp`
  - start with `fastmcp run dev_docs_server.py --transport streamable-http --port 8104`
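
Before connecting an MCP client, it can help to confirm that something is listening on each port. The following is a minimal sketch using only the standard library; `is_port_open` and `check_local_servers` are hypothetical helper names, and an open TCP port only shows that a process is listening, not that it speaks MCP.

```python
import socket

# Ports taken from the server list above.
LOCAL_PORTS = {"dev_repo": 8100, "dev_search": 8101, "dev_docs": 8104}


def is_port_open(host: str, port: int, timeout_sec: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout_sec):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


def check_local_servers(ports: dict, host: str = "127.0.0.1") -> dict:
    """Map each server name to whether its port currently accepts connections."""
    return {name: is_port_open(host, port) for name, port in ports.items()}
```

Calling `check_local_servers(LOCAL_PORTS)` returns a dict of booleans keyed by server name, which makes it easy to see which servers still need to be started.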

### Remote server (optional)
- github: `https://api.githubcopilot.com/mcp/` (set `GITHUB_TOKEN`)

### Gemini (for LLM examples)
Set one of:
- `GEMINI_API_KEY`
- `GOOGLE_API_KEY`
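
The lookup order used later in this notebook (`GEMINI_API_KEY` first, then `GOOGLE_API_KEY`) can be sketched as a small function. `resolve_gemini_api_key` is a hypothetical helper, not part of the project; it accepts an optional mapping so the logic can be exercised without touching the real environment.

```python
import os
from typing import Mapping, Optional


def resolve_gemini_api_key(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the first non-empty key among GEMINI_API_KEY and GOOGLE_API_KEY."""
    source = os.environ if env is None else env
    for name in ("GEMINI_API_KEY", "GOOGLE_API_KEY"):
        value = source.get(name, "").strip()
        if value:
            return value
    raise RuntimeError(
        "Set GEMINI_API_KEY or GOOGLE_API_KEY before running the LLM examples."
    )
```

Treating whitespace-only values as missing is a design choice of this sketch; it surfaces a clear error early instead of an authentication failure at the first LLM call.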


In [1]:
from __future__ import annotations

import os
import sys
import time
import asyncio
from pathlib import Path

from langchain_mcp_adapters.client import MultiServerMCPClient


def find_project_root(start: Path | None = None) -> Path:
    """Walk upward from `start` (default: cwd) until utils/mcp/fastmcp is found."""
    cursor = (start or Path.cwd()).resolve()
    for candidate in [cursor, *cursor.parents]:
        if (candidate / "utils" / "mcp" / "fastmcp").is_dir():
            return candidate
    raise RuntimeError(f"Could not detect project root from {cursor}")


PROJECT_ROOT = find_project_root()
print(f"[INFO] PROJECT_ROOT={PROJECT_ROOT}")
print(f"[INFO] sys.executable={sys.executable}")

# Optional remote GitHub MCP server (official)
GITHUB_SERVER_CONFIG = None
token = os.getenv("GITHUB_TOKEN")
if token:
    GITHUB_SERVER_CONFIG = {
        "transport": "streamable_http",
        "url": "https://api.githubcopilot.com/mcp/",
        "headers": {"Authorization": f"Bearer {token}"},
    }
    print("[INFO] GITHUB_TOKEN is set (GitHub MCP enabled)")
else:
    print("[INFO] GITHUB_TOKEN not set (GitHub MCP skipped)")


# =============================================================================
# HELPER: Load tools from MCP server with timeout
# =============================================================================
async def get_tools_for_server(server_name: str, server_config: dict, timeout_sec: float = 10.0):
    """Load tools from a single MCP server.
    
    Args:
        server_name: Name of the server (for logging)
        server_config: Config dict with transport, command/url, etc.
        timeout_sec: Timeout for loading tools
        
    Returns:
        List of LangChain tools from the server
    """
    client = MultiServerMCPClient({server_name: server_config})
    return await asyncio.wait_for(client.get_tools(), timeout=timeout_sec)


# =============================================================================
# HELPER: Call a tool directly (for testing)
# =============================================================================
async def call_tool(tools: list, tool_name: str, args: dict, timeout_sec: float = 10.0):
    """Call a specific tool by name with timeout.
    
    Args:
        tools: List of tools from get_tools_for_server
        tool_name: Name of tool to call (e.g., 'file.exists')
        args: Arguments to pass to the tool
        timeout_sec: Timeout for tool execution
        
    Returns:
        Tool result
    """
    tool = next((t for t in tools if t.name == tool_name), None)
    if not tool:
        raise ValueError(f"Tool '{tool_name}' not found. Available: {[t.name for t in tools]}")
    return await asyncio.wait_for(tool.ainvoke(args), timeout=timeout_sec)


[INFO] PROJECT_ROOT=C:\Users\pogawal\WorkFolder\Documents\Python\ai-dev-agent
[INFO] sys.executable=c:\App\Anaconda\envs\langgraph\python.exe
[INFO] GITHUB_TOKEN is set (GitHub MCP enabled)


In [2]:
# =============================================================================
# MCP SERVER CONFIGURATIONS - HTTP Transport
# =============================================================================
# NOTE: STDIO transport doesn't work in Jupyter (no fileno on notebook streams),
# so this notebook uses HTTP transport with servers running as separate processes.
#
# Either start the servers in a terminal before running this notebook:
#   python utils/mcp/fastmcp/start_notebook_mcp_servers.py
# or let the next cell start any server that is not already reachable.
# =============================================================================

LOCAL_SERVER_CONFIGS = {
    "dev_repo": {"transport": "streamable_http", "url": "http://127.0.0.1:8100/mcp"},
    "dev_search": {"transport": "streamable_http", "url": "http://127.0.0.1:8101/mcp"},
    "dev_docs": {"transport": "streamable_http", "url": "http://127.0.0.1:8104/mcp"},
}

print('[INFO] HTTP transport configured (servers must be running separately)')
for name, config in LOCAL_SERVER_CONFIGS.items():
    print(f"  {name}: {config['url']}")


[INFO] HTTP transport configured (servers must be running separately)
  dev_repo: http://127.0.0.1:8100/mcp
  dev_search: http://127.0.0.1:8101/mcp
  dev_docs: http://127.0.0.1:8104/mcp


In [3]:
# --- Start local FastMCP servers required by this notebook (idempotent) ---
# This starts ONLY what is not already reachable, and returns process handles
# only for servers started by this notebook.
from utils.mcp.fastmcp.start_notebook_mcp_servers import start_required_servers

# Keep a single global handle so re-running the cell doesn't spawn duplicates.
# Note: if you manually stop servers outside the notebook, set this to None and re-run.
if "_MCP_NOTEBOOK_SERVER_PROCS" not in globals():
    _MCP_NOTEBOOK_SERVER_PROCS = None

if not _MCP_NOTEBOOK_SERVER_PROCS:
    _MCP_NOTEBOOK_SERVER_PROCS = start_required_servers(project_root=PROJECT_ROOT)
else:
    print("[INFO] MCP servers already started by this notebook; skipping start")

if _MCP_NOTEBOOK_SERVER_PROCS:
    started = {name: p.pid for name, p in _MCP_NOTEBOOK_SERVER_PROCS.items()}
    print("[INFO] MCP servers started by this notebook:", started)
else:
    print("[INFO] No local servers were started (they were already running)")

# --- Connection configs (used below) ---
for name, cfg in LOCAL_SERVER_CONFIGS.items():
    print(name)
    print("  transport:", cfg.get("transport"))
    print("  url:", cfg.get("url"))


[INFO] Starting dev_repo (pid=15480) on http://127.0.0.1:8100/mcp
[INFO] Starting dev_search (pid=12932) on http://127.0.0.1:8101/mcp
[INFO] Starting dev_docs (pid=19388) on http://127.0.0.1:8104/mcp
[INFO] MCP servers started by this notebook: {'dev_repo': 15480, 'dev_search': 12932, 'dev_docs': 19388}
dev_repo
  transport: streamable_http
  url: http://127.0.0.1:8100/mcp
dev_search
  transport: streamable_http
  url: http://127.0.0.1:8101/mcp
dev_docs
  transport: streamable_http
  url: http://127.0.0.1:8104/mcp


In [11]:
# --- dev_repo: fast smoke calls ---
dev_repo_tools = await get_tools_for_server("dev_repo", LOCAL_SERVER_CONFIGS["dev_repo"])
print("[INFO] dev_repo tools:", [t.name for t in dev_repo_tools])

current_sprint_path = str(PROJECT_ROOT / "docs/agile/sprints/current_sprint.md")
print(await call_tool(dev_repo_tools, "file.exists", {"file_path": current_sprint_path}, timeout_sec=8))
print(
    await call_tool(
        dev_repo_tools,
        "file.read",
        {"file_path": current_sprint_path, "start_line": 1, "end_line": 25},
        timeout_sec=8,
    )
)


INFO:mcp.client.streamable_http:Received session ID: c94c684f39ea4e3e9de50bff3f38b722
INFO:mcp.client.streamable_http:Negotiated protocol version: 2025-11-25
INFO:mcp.client.streamable_http:Received session ID: 66cc39e774224445b126b062b86e5967
INFO:mcp.client.streamable_http:Negotiated protocol version: 2025-11-25


[INFO] dev_repo tools: ['file.list_directory', 'file.read', 'file.search_content', 'file.get_info', 'file.exists']


INFO:mcp.client.streamable_http:Received session ID: e9bca59a1f5a48ff9a688821cb2c83e6
INFO:mcp.client.streamable_http:Negotiated protocol version: 2025-11-25


{"success":true,"file_path":"C:\\Users\\pogawal\\WorkFolder\\Documents\\Python\\ai-dev-agent\\docs\\agile\\sprints\\current_sprint.md","exists":true,"is_file":true,"is_dir":false,"is_symlink":false,"timestamp":"2026-01-12T16:32:39.202578"}
{"success":true,"file_path":"C:\\Users\\pogawal\\WorkFolder\\Documents\\Python\\ai-dev-agent\\docs\\agile\\sprints\\current_sprint.md","content":"# Current Sprint: Sprint 8 - DeepAgents + MCP Tooling Foundation\n\n**Sprint Number**: 8  \n**Sprint Name**: DeepAgents + MCP Agent-Building Sprint  \n**Duration**: 2 weeks (14 days)  \n**Start Date**: 2025-12-23  \n**End Date**: 2026-01-06  \n**Current Date**: 2025-12-23 (Day 1)  \n**Status**: **ACTIVE - WEEK 1**\n\n---\n\n## Sprint Goal\n\nBuild a reliable foundation for **DeepAgents-based agents** that use **MCP tools** (local FastMCP servers + remote MCP servers), aligned with the proven patterns in `tests/deep_agents/deep_agents_mcp.ipynb`.\n\n---\n\n## Quick Status\n\n**Current Phase**: Week 1 - MCP T

In [8]:
# --- dev_search: fast search call ---
dev_search_tools = await get_tools_for_server("dev_search", LOCAL_SERVER_CONFIGS["dev_search"])
print("[INFO] dev_search tools:", [t.name for t in dev_search_tools])

sprint8_dir = str(PROJECT_ROOT / "docs/agile/sprints/sprint_8")
print(
    await call_tool(
        dev_search_tools,
        "file.search_content",
        {"query": "US-MCP-002", "directory": sprint8_dir},
        timeout_sec=12,
    )
)


INFO:mcp.client.streamable_http:Received session ID: c33e11816cea494dbf4fd1fe0c28fc3d
INFO:mcp.client.streamable_http:Negotiated protocol version: 2025-11-25
INFO:mcp.client.streamable_http:Received session ID: b25f2691000c4938878a16769b5fcb6e
INFO:mcp.client.streamable_http:Negotiated protocol version: 2025-11-25


[INFO] dev_search tools: ['file.search_content']
{"success":true,"search_text":"US-MCP-002","directory":"C:\\Users\\pogawal\\WorkFolder\\Documents\\Python\\ai-dev-agent\\docs\\agile\\sprints\\sprint_8","total_matches":0,"results":[],"truncated":false,"timestamp":"2026-01-12T16:25:46.727110"}


In [12]:
# --- dev_docs: keep this scoped (full docs/agile can be slow) ---
dev_docs_tools = await get_tools_for_server("dev_docs", LOCAL_SERVER_CONFIGS["dev_docs"])
print("[INFO] dev_docs tools:", [t.name for t in dev_docs_tools])

sprint8_dir = str(PROJECT_ROOT / "docs/agile/sprints/sprint_8")
print(await call_tool(dev_docs_tools, "link.validate", {"target_directory": sprint8_dir}, timeout_sec=20))


INFO:mcp.client.streamable_http:Received session ID: adec8982d3cf499d9538d8daf8b8be02
INFO:mcp.client.streamable_http:Negotiated protocol version: 2025-11-25
INFO:mcp.client.streamable_http:Received session ID: 2c8a127268de4bccb5f2e8e21a2275c3
INFO:mcp.client.streamable_http:Negotiated protocol version: 2025-11-25


[INFO] dev_docs tools: ['link.scan_all', 'link.validate', 'link.generate_report', 'link.heal']
{"success":true,"data":{"valid_links":[],"broken_links":[],"external_links":[],"suspicious_links":[],"summary":{"total_links":0,"valid_count":0,"broken_count":0,"external_count":0,"validation_rate":0.0}},"message":"Validated 0 links: 0 valid, 0 broken"}


In [13]:
# --- github (remote): list tools only (fast) ---
if GITHUB_SERVER_CONFIG is None:
    print("[SKIP] GitHub MCP (set GITHUB_TOKEN to enable)")
else:
    github_tools = await get_tools_for_server("github", GITHUB_SERVER_CONFIG)
    print(f"[INFO] github tool count: {len(github_tools)}")
    for t in github_tools[:30]:
        print("-", t.name)


INFO:mcp.client.streamable_http:Received session ID: 9921607f-67d5-4e84-bbf2-0204db3c2041
INFO:mcp.client.streamable_http:Negotiated protocol version: 2025-11-25
INFO:mcp.client.streamable_http:GET stream disconnected, reconnecting in 1000ms...


[INFO] github tool count: 37
- add_comment_to_pending_review
- add_issue_comment
- assign_copilot_to_issue
- create_branch
- create_or_update_file
- create_pull_request
- create_repository
- delete_file
- fork_repository
- get_commit
- get_file_contents
- get_label
- get_latest_release
- get_me
- get_release_by_tag
- get_tag
- issue_read
- issue_write
- list_branches
- list_commits
- list_issues
- list_pull_requests
- list_releases
- list_tags
- merge_pull_request
- pull_request_read
- pull_request_review_write
- push_files
- request_copilot_review
- search_code


### GitHub MCP (remote) smoke test

This is a minimal end-to-end test of the remote GitHub MCP server.

- Requires `GITHUB_TOKEN` (otherwise the test is skipped).
- Verifies a few expected tool names exist.
- Calls `get_me` (read-only) and asserts the response is non-empty.


In [None]:
import json


def _parse_maybe_json(value):
    """Parse JSON if the MCP tool returned a JSON string; otherwise return as-is."""
    if isinstance(value, str):
        try:
            return json.loads(value)
        except Exception:
            return value
    return value


if GITHUB_SERVER_CONFIG is None:
    print("[SKIP] GitHub MCP smoke test (set GITHUB_TOKEN to enable)")
else:
    github_tools = await get_tools_for_server("github", GITHUB_SERVER_CONFIG)
    tool_names = {t.name for t in github_tools}

    # Keep this small and stable: only verify a few read-only primitives exist
    required = {"get_me", "search_code", "get_file_contents"}
    missing = sorted(required - tool_names)
    assert not missing, (
        f"Missing expected GitHub MCP tools: {missing}. "
        f"Tool count={len(tool_names)}"
    )

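    res = await call_tool(github_tools, "get_me", {}, timeout_sec=20)

    # call_tool returns the raw tool output. The local smoke calls above printed JSON
    # strings, so parse first and only unwrap "error"/"result" if a dict comes back.
    payload = _parse_maybe_json(res)
    if isinstance(payload, dict) and "error" in payload:
        raise RuntimeError(payload["error"])
    if isinstance(payload, dict) and "result" in payload:
        payload = _parse_maybe_json(payload["result"])
    assert payload, f"Empty get_me response: {payload!r}"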

    print("[INFO] get_me response type:", type(payload).__name__)
    if isinstance(payload, dict):
        print("[INFO] get_me keys (sample):", sorted(payload.keys())[:30])
    else:
        print("[INFO] get_me (truncated):", str(payload)[:200])


## LangGraph MCP Agent - Our LangChain 1.x Implementation

This section uses our custom `utils/mcp/langgraph_mcp_agent.py` module which:

1. **Discovers tools** from FastMCP servers via HTTP/SSE
2. **Converts JSON Schema** to Pydantic models for LangChain compatibility
3. **Creates fresh connections** for each tool call (avoids stale connection issues)
4. **Builds a ReAct agent** using LangGraph 1.x API

This is inspired by DeepMCPAgent but rewritten to be compatible with LangChain 1.x.
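
Step 2 in the list above turns each tool's JSON Schema into a typed model so arguments can be checked before a call. The source of `utils/mcp/langgraph_mcp_agent.py` is not shown here, so the following is only a stdlib stand-in that illustrates the type-mapping idea; it does not build Pydantic models, it handles flat schemas only, and `validate_args` and the example schema are hypothetical.

```python
from typing import Any

# JSON Schema primitive types mapped to the Python types used for checking.
JSON_TO_PYTHON = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "array": list,
    "object": dict,
}


def validate_args(schema: dict, args: dict) -> dict:
    """Check args against a flat JSON Schema object and return them unchanged."""
    properties = schema.get("properties", {})
    missing = [name for name in schema.get("required", []) if name not in args]
    if missing:
        raise ValueError(f"Missing required arguments: {missing}")
    for name, value in args.items():
        if name not in properties:
            raise ValueError(f"Unknown argument: {name}")
        expected: Any = JSON_TO_PYTHON.get(properties[name].get("type"))
        if expected is None:
            continue  # no (known) type declared, so nothing to check
        # bool is a subclass of int in Python; reject it for non-boolean fields.
        bool_mismatch = isinstance(value, bool) and expected is not bool
        if bool_mismatch or not isinstance(value, expected):
            raise TypeError(f"Argument '{name}' has the wrong type: {value!r}")
    return args


# Hypothetical schema shaped like the file.read arguments used earlier.
FILE_READ_SCHEMA = {
    "type": "object",
    "properties": {
        "file_path": {"type": "string"},
        "start_line": {"type": "integer"},
        "end_line": {"type": "integer"},
    },
    "required": ["file_path"],
}
```

A typed model gives the LLM-facing tool a precise argument signature and lets bad calls fail locally with a readable error instead of a server-side validation failure.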


In [15]:
# =============================================================================
# LangGraph MCP Agent - Using Our LangChain 1.x Compatible Implementation
# =============================================================================
# This uses utils/mcp/langgraph_mcp_agent.py which is compatible with
# LangChain 1.x and creates fresh connections for each tool call.

import os
import sys
sys.path.insert(0, str(PROJECT_ROOT))  # Ensure utils is importable

from langchain_google_genai import ChatGoogleGenerativeAI
from utils.mcp.langgraph_mcp_agent import build_mcp_agent, MCPServerConfig

# Set up the LLM (Gemini 2.5 Flash-Lite, temperature=0 to reduce run-to-run variation)
api_key = os.environ.get('GEMINI_API_KEY') or os.environ.get('GOOGLE_API_KEY')
if not api_key:
    raise ValueError('GEMINI_API_KEY or GOOGLE_API_KEY not set')

llm = ChatGoogleGenerativeAI(
    model="gemini-2.5-flash-lite",
    temperature=0,
    convert_system_message_to_human=True,
    google_api_key=api_key,
)
print(f'[INFO] LLM ready: {llm.model}')

# Configure MCP servers using our MCPServerConfig
mcp_servers = {
    "dev_repo": MCPServerConfig(
        url="http://127.0.0.1:8100/mcp",
        transport="streamable-http"
    ),
}

print('[1] Building MCP Agent with tools from dev_repo server...')
graph, loader = await build_mcp_agent(
    servers=mcp_servers,
    model=llm,
    instructions="You are a file inspector. Use tools to check if files exist.",
    trace_tools=True,  # Show tool invocations
)

# Show discovered tools
print('[2] Discovered MCP tools:')
tool_infos = await loader.list_tool_info()
for info in tool_infos:
    desc = info.description[:60] + '...' if len(info.description) > 60 else info.description
    print(f'    - {info.name}: {desc}')

# Test with file.exists on README.md
test_file = str(PROJECT_ROOT / "README.md")
print(f'[3] Testing: Does {test_file} exist?')

result = await graph.ainvoke({
    "messages": [{"role": "user", "content": f"Check if this file exists: {test_file}"}]
})

print('[4] Agent response:')
for msg in result.get('messages', []):
    if hasattr(msg, 'content') and msg.content:
        print(f"    {msg.type}: {msg.content[:500]}")


INFO:mcp.client.streamable_http:Received session ID: 1de8231fccec4fca832854f713e46f7a
INFO:mcp.client.streamable_http:Negotiated protocol version: 2025-11-25
INFO:mcp.client.streamable_http:Received session ID: ad336c33f14d4126b246b7a50e12e7b4
INFO:mcp.client.streamable_http:Negotiated protocol version: 2025-11-25


[INFO] LLM ready: gemini-2.5-flash-lite
[1] Building MCP Agent with tools from dev_repo server...
[2] Discovered MCP tools:
    - file.list_directory: 
    - file.read: 
    - file.search_content: 
    - file.get_info: 
    - file.exists: 
[3] Testing: Does C:\Users\pogawal\WorkFolder\Documents\Python\ai-dev-agent\README.md exist?


INFO:mcp.client.streamable_http:Received session ID: 6598f60abb5f4fcdae81e5ea2eac1e28
INFO:mcp.client.streamable_http:Negotiated protocol version: 2025-11-25


[TOOL] Invoking: file.exists with {'file_path': 'C:\\Users\\pogawal\\WorkFolder\\Documents\\Python\\ai-dev-agent\\README.md'}
[TOOL] Result from file.exists: {'success': True, 'file_path': 'C:\\Users\\pogawal\\WorkFolder\\Documents\\Python\\ai-dev-agent\\README.md', 'exists': True, 'is_file': True, 'is_dir': False, 'is_symlink': False, 'timestamp': '2026-01-12T16:33:33.545720'}




[4] Agent response:
    human: Check if this file exists: C:\Users\pogawal\WorkFolder\Documents\Python\ai-dev-agent\README.md
    tool: CallToolResult(content=[TextContent(type='text', text='{"success":true,"file_path":"C:\\\\Users\\\\pogawal\\\\WorkFolder\\\\Documents\\\\Python\\\\ai-dev-agent\\\\README.md","exists":true,"is_file":true,"is_dir":false,"is_symlink":false,"timestamp":"2026-01-12T16:33:33.545720"}', annotations=None, meta=None)], structured_content={'success': True, 'file_path': 'C:\\Users\\pogawal\\WorkFolder\\Documents\\Python\\ai-dev-agent\\README.md', 'exists': True, 'is_file': True, 'is_dir': False, 'is_symlin
    ai: The file C:\Users\pogawal\WorkFolder\Documents\Python\ai-dev-agent\README.md exists.


### GitHub MCP repo showcase: `gitwalter/ai-dev-agent`

This section illustrates **read-only** GitHub MCP operations against the repository at `https://github.com/gitwalter/ai-dev-agent`:

- Fetch and print `README.md` (first characters)
- List branches
- List recent commits (small sample)

If `GITHUB_TOKEN` is not set, this section is skipped.



In [None]:
import base64
import json
from typing import Any


REPO_OWNER = "gitwalter"
REPO_NAME = "ai-dev-agent"
REPO_FULL_NAME = f"{REPO_OWNER}/{REPO_NAME}"
DEFAULT_REF = "main"


def _parse_maybe_json(value: Any) -> Any:
    """Parse JSON if the MCP tool returned a JSON string; otherwise return as-is."""
    if isinstance(value, str):
        try:
            return json.loads(value)
        except Exception:
            return value
    return value


def _get_tool(tools: list, tool_name: str):
    """Find a tool object by name or raise with a clear error."""
    tool = next((t for t in tools if t.name == tool_name), None)
    if tool is None:
        raise KeyError(f"Tool not found: {tool_name}. Available: {[t.name for t in tools]}")
    return tool


def _schema_field_names(tool: Any) -> set[str] | None:
    """Best-effort extraction of input field names from a LangChain tool args schema."""
    schema = getattr(tool, "args_schema", None)
    if schema is None:
        return None

    # Pydantic v2
    model_fields = getattr(schema, "model_fields", None)
    if isinstance(model_fields, dict):
        return set(model_fields.keys())

    # Pydantic v1
    fields = getattr(schema, "__fields__", None)
    if isinstance(fields, dict):
        return set(fields.keys())

    return None


def _repo_args_for(tool_name: str) -> dict[str, Any]:
    """Build repo-identifying args for a given tool based on its schema."""
    tool = _get_tool(github_tools, tool_name)
    field_names = _schema_field_names(tool)

    if field_names is None:
        # Fall back to the most common GitHub tool signature.
        # If this is wrong, the MCP server should return a clear validation error.
        return {"owner": REPO_OWNER, "repo": REPO_NAME}

    if "owner" in field_names and "repo" in field_names:
        return {"owner": REPO_OWNER, "repo": REPO_NAME}

    # Some tools accept a single full-name field.
    for full_name_field in ("repository", "repo_full_name", "full_name"):
        if full_name_field in field_names:
            return {full_name_field: REPO_FULL_NAME}

    raise RuntimeError(
        f"Cannot infer repo-identifying args for tool '{tool_name}'. "
        f"Fields={sorted(field_names)}"
    )


def _file_args_for(tool_name: str, *, path: str, ref: str | None) -> dict[str, Any]:
    """Build args for tools that read a file from a repo."""
    tool = _get_tool(github_tools, tool_name)
    field_names = _schema_field_names(tool)

    args = _repo_args_for(tool_name)

    # File path
    if field_names is None:
        args["path"] = path
    else:
        if "path" in field_names:
            args["path"] = path
        elif "file_path" in field_names:
            args["file_path"] = path
        else:
            raise RuntimeError(
                f"Cannot infer file path arg for tool '{tool_name}'. Fields={sorted(field_names)}"
            )

    # Optional ref/branch
    if ref is not None and field_names is not None:
        for ref_field in ("ref", "branch"):
            if ref_field in field_names:
                args[ref_field] = ref
                break

    return args


def _limit_args_for(tool_name: str, *, limit: int) -> dict[str, Any]:
    """Add an optional limit/per_page argument if the tool schema supports it."""
    tool = _get_tool(github_tools, tool_name)
    field_names = _schema_field_names(tool)

    args = _repo_args_for(tool_name)

    if field_names is None:
        # If we can't see schema, don't guess; just let the server default.
        return args

    for limit_field in ("per_page", "limit", "max_results"):
        if limit_field in field_names:
            args[limit_field] = limit
            break

    return args


def _decode_readme_payload(payload: Any) -> str:
    """Decode common GitHub content payload shapes deterministically."""
    if isinstance(payload, str):
        return payload

    if isinstance(payload, dict):
        # Common REST-style content response shape.
        content = payload.get("content")
        encoding = payload.get("encoding")
        if isinstance(content, str) and encoding == "base64":
            raw = base64.b64decode(content)
            return raw.decode("utf-8", errors="replace")

        # Some servers inline plaintext
        if isinstance(content, str) and encoding in (None, "utf-8", "text"):
            return content

        # Fallback: pretty print structured payload
        return json.dumps(payload, indent=2)[:2000]

    return str(payload)


if GITHUB_SERVER_CONFIG is None:
    print("[SKIP] GitHub MCP repo showcase (set GITHUB_TOKEN to enable)")
else:
    github_tools = await get_tools_for_server("github", GITHUB_SERVER_CONFIG)
    tool_names = {t.name for t in github_tools}

    required_tools = {"get_file_contents", "list_branches", "list_commits"}
    missing_tools = sorted(required_tools - tool_names)
    assert not missing_tools, f"Missing required GitHub MCP tools: {missing_tools}"

    # 1) README.md
    readme_call = await call_tool(
        github_tools,
        "get_file_contents",
        _file_args_for("get_file_contents", path="README.md", ref=DEFAULT_REF),
        timeout_sec=30,
    )
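    # call_tool returns the raw tool output (a JSON string in the local smoke calls),
    # so parse first and only unwrap "error"/"result" if a dict comes back.
    readme_payload = _parse_maybe_json(readme_call)
    if isinstance(readme_payload, dict) and "error" in readme_payload:
        raise RuntimeError(readme_payload["error"])
    if isinstance(readme_payload, dict) and "result" in readme_payload:
        readme_payload = _parse_maybe_json(readme_payload["result"])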
    readme_text = _decode_readme_payload(readme_payload)
    print(f"[INFO] {REPO_FULL_NAME} README.md (first 400 chars):")
    print(readme_text[:400])

    # 2) Branches
    branches_call = await call_tool(
        github_tools,
        "list_branches",
        _repo_args_for("list_branches"),
        timeout_sec=30,
    )
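    # Parse first; the raw tool output may be a JSON string rather than a dict.
    branches_payload = _parse_maybe_json(branches_call)
    if isinstance(branches_payload, dict) and "error" in branches_payload:
        raise RuntimeError(branches_payload["error"])
    if isinstance(branches_payload, dict) and "result" in branches_payload:
        branches_payload = _parse_maybe_json(branches_payload["result"])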
    if isinstance(branches_payload, list):
        branch_names = []
        for b in branches_payload:
            if isinstance(b, dict) and isinstance(b.get("name"), str):
                branch_names.append(b["name"])
            elif isinstance(b, str):
                branch_names.append(b)
        print("[INFO] Branches (sample):", branch_names[:10])
    else:
        print("[INFO] Branches payload (truncated):", str(branches_payload)[:400])

    # 3) Recent commits (small sample)
    commits_call = await call_tool(
        github_tools,
        "list_commits",
        _limit_args_for("list_commits", limit=5),
        timeout_sec=30,
    )
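    # Parse first; the raw tool output may be a JSON string rather than a dict.
    commits_payload = _parse_maybe_json(commits_call)
    if isinstance(commits_payload, dict) and "error" in commits_payload:
        raise RuntimeError(commits_payload["error"])
    if isinstance(commits_payload, dict) and "result" in commits_payload:
        commits_payload = _parse_maybe_json(commits_payload["result"])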
    if isinstance(commits_payload, list) and commits_payload:
        first = commits_payload[0]
        if isinstance(first, dict):
            sha = first.get("sha") or first.get("id")
            msg = None
            commit_obj = first.get("commit")
            if isinstance(commit_obj, dict):
                message_obj = commit_obj.get("message")
                if isinstance(message_obj, str):
                    msg = message_obj
            if msg is None and isinstance(first.get("message"), str):
                msg = first.get("message")

            print("[INFO] Latest commit (best-effort):")
            print("  sha:", sha)
            if msg is not None:
                print("  message:", msg.splitlines()[0][:200])
        else:
            print("[INFO] Commits payload (first item):", str(first)[:400])
    else:
        print("[INFO] Commits payload (truncated):", str(commits_payload)[:400])



### GitHub MCP: `search_code` example

This cell demonstrates `search_code` against `gitwalter/ai-dev-agent`.

Important: GitHub code search is scoped to a single repo via the query qualifier `repo:OWNER/REPO`.
This example always includes `repo:gitwalter/ai-dev-agent` in the search query.

- Uses **read-only** GitHub MCP tooling
- Uses schema introspection to choose the right argument names
- Prints a small, stable sample (first few matches)



In [None]:
import json
from typing import Any


# Keep the query small and stable. This is intended as a reproducible example.
# IMPORTANT: GitHub code search is scoped via the query string qualifier:
#   repo:OWNER/REPO
# Without that qualifier, GitHub searches across all repos you have access to.
REPO_FULL_NAME = "gitwalter/ai-dev-agent"
SEARCH_TEXT = "MultiServerMCPClient"
SEARCH_QUERY = f"repo:{REPO_FULL_NAME} {SEARCH_TEXT}"
MAX_MATCHES_TO_PRINT = 10


def _parse_maybe_json(value: Any) -> Any:
    """Parse JSON if the MCP tool returned a JSON string; otherwise return as-is."""
    if isinstance(value, str):
        try:
            return json.loads(value)
        except Exception:
            return value
    return value


def _get_tool(tools: list, tool_name: str):
    """Find a tool object by name or raise with a clear error."""
    tool = next((t for t in tools if t.name == tool_name), None)
    if tool is None:
        raise KeyError(f"Tool not found: {tool_name}. Available: {[t.name for t in tools]}")
    return tool


def _schema_field_names(tool: Any) -> set[str] | None:
    """Best-effort extraction of input field names from a LangChain tool args schema."""
    schema = getattr(tool, "args_schema", None)
    if schema is None:
        return None

    model_fields = getattr(schema, "model_fields", None)
    if isinstance(model_fields, dict):
        return set(model_fields.keys())

    fields = getattr(schema, "__fields__", None)
    if isinstance(fields, dict):
        return set(fields.keys())

    return None


def _build_search_code_args(tool: Any) -> dict[str, Any]:
    """Build args for `search_code` based on schema fields."""
    fields = _schema_field_names(tool)

    # Fallback to common naming; MCP server should raise a clear validation error if wrong.
    if fields is None:
        return {
            "query": SEARCH_QUERY,
            "owner": "gitwalter",
            "repo": "ai-dev-agent",
        }

    args: dict[str, Any] = {}

    # Query field
    if "query" in fields:
        args["query"] = SEARCH_QUERY
    elif "search_query" in fields:
        args["search_query"] = SEARCH_QUERY
    else:
        raise RuntimeError(f"Cannot infer query field for search_code. Fields={sorted(fields)}")

    # Repo identification
    if "owner" in fields and "repo" in fields:
        args["owner"] = "gitwalter"
        args["repo"] = "ai-dev-agent"
    else:
        for full_name_field in ("repository", "repo_full_name", "full_name"):
            if full_name_field in fields:
                args[full_name_field] = "gitwalter/ai-dev-agent"
                break

    # Optional limits
    for limit_field in ("per_page", "limit", "max_results"):
        if limit_field in fields:
            args[limit_field] = 20
            break

    return args


if GITHUB_SERVER_CONFIG is None:
    print("[SKIP] GitHub MCP search_code example (set GITHUB_TOKEN to enable)")
else:
    github_tools = await get_tools_for_server("github", GITHUB_SERVER_CONFIG)

    search_tool = _get_tool(github_tools, "search_code")
    args = _build_search_code_args(search_tool)

    res = await call_tool(github_tools, "search_code", args, timeout_sec=30)
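    # Parse first; the raw tool output may be a JSON string rather than a dict.
    payload = _parse_maybe_json(res)
    if isinstance(payload, dict) and "error" in payload:
        raise RuntimeError(payload["error"])
    if isinstance(payload, dict) and "result" in payload:
        payload = _parse_maybe_json(payload["result"])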

    # Print a small best-effort view of the results.
    if isinstance(payload, dict):
        items = payload.get("items")
        if isinstance(items, list):
            print(f"[INFO] search_code matches (showing up to {MAX_MATCHES_TO_PRINT}):")
            for item in items[:MAX_MATCHES_TO_PRINT]:
                if isinstance(item, dict):
                    path = item.get("path") or item.get("file") or item.get("name")
                    repo = item.get("repository")
                    score = item.get("score")
                    print("-", {"path": path, "repository": repo, "score": score})
                else:
                    print("-", str(item)[:200])
        else:
            print("[INFO] search_code result keys:", sorted(payload.keys())[:50])
            print("[INFO] search_code payload (truncated):", json.dumps(payload, indent=2)[:1000])
    elif isinstance(payload, list):
        print(f"[INFO] search_code returned a list (showing up to {MAX_MATCHES_TO_PRINT}):")
        for item in payload[:MAX_MATCHES_TO_PRINT]:
            print("-", str(item)[:200])
    else:
        print("[INFO] search_code payload (truncated):", str(payload)[:1000])



## LLM + DeepAgents examples (Gemini 2.5 Flash)

These examples show how to use the MCP tools through a **tool-calling LLM** and **DeepAgents**.

Notes:
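- These examples use **temperature=0** to keep output as reproducible as possible; this reduces run-to-run variation but does not guarantee identical responses.
- All examples are **read-only by default**. Among the local dev servers, the only write-capable tool is `link.heal`, which is demonstrated as a template and is not executed automatically. The GitHub MCP server also exposes write tools (for example `create_branch` and `delete_file`); the examples here do not call them.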



In [None]:
import os

from langchain_google_genai import ChatGoogleGenerativeAI


# Gemini configuration (must match project standards)
api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
if not api_key:
    raise RuntimeError(
        "Missing Gemini API key. Set GEMINI_API_KEY or GOOGLE_API_KEY in the environment."
    )

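# IMPORTANT: temperature=0 and convert_system_message_to_human=True for Gemini compatibility.
# Pass the key explicitly so GEMINI_API_KEY is honored even when GOOGLE_API_KEY is unset.
llm = ChatGoogleGenerativeAI(
    model="gemini-2.5-flash",
    temperature=0,
    convert_system_message_to_human=True,
    google_api_key=api_key,
)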

print("[INFO] LLM ready:", getattr(llm, "model", "gemini-2.5-flash"))



### Load MCP tools (for agents)

This loads tools from our dev servers and groups them per server, so we can build focused subagents.



### Build DeepAgents subagents (focused toolsets)

We create small, focused subagents instead of one giant agent:

- **repo_agent**: local repo inspection (`dev_repo` tools)
- **search_agent**: fast local search (`dev_search` tools)
- **docs_agent**: link validation/reporting (`dev_docs` tools)
- **github_agent**: GitHub repo inspection and code search (GitHub MCP tools)

Then we create a **coordinator** that can delegate to the right specialist.
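
The sketch below shows one possible way to describe the specialists as plain data before building any agent. It assumes a dict that maps each server name to its loaded tools; `AGENT_DESCRIPTIONS`, `build_subagent_specs`, the description strings, and the spec keys are all hypothetical and chosen for illustration.

```python
from typing import Dict, List

# Agent names come from the list above; the server keys and descriptions are assumptions.
AGENT_DESCRIPTIONS = {
    "repo_agent": ("dev_repo", "Inspect files in the local repository."),
    "search_agent": ("dev_search", "Search file contents locally."),
    "docs_agent": ("dev_docs", "Validate links and generate link reports."),
    "github_agent": ("github", "Inspect the GitHub repository and search its code."),
}


def build_subagent_specs(tools_by_server: Dict[str, List[object]]) -> List[dict]:
    """Create one spec per specialist, skipping servers that provided no tools."""
    specs = []
    for agent_name, (server, description) in AGENT_DESCRIPTIONS.items():
        tools = tools_by_server.get(server) or []
        if not tools:
            continue  # e.g. the GitHub MCP server when GITHUB_TOKEN is not set
        specs.append(
            {
                "name": agent_name,
                "description": description,
                "system_prompt": f"You are {agent_name}. {description} Use only your tools.",
                "tools": tools,
            }
        )
    return specs
```

Each spec could then be turned into an agent with `create_deep_agent(model=llm, tools=spec["tools"], system_prompt=spec["system_prompt"])`, mirroring the call used later in this notebook. How the coordinator receives its specialists depends on the installed `deepagents` version, so that step is left out of the sketch.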



In [None]:
from langchain_core.messages import HumanMessage

try:
    from deepagents import create_deep_agent
except Exception as e:
    # Chain the original exception so the full import traceback stays visible.
    raise RuntimeError(
        "deepagents is required for this section. Install it in your environment. "
        f"Import error: {type(e).__name__}: {e}"
    ) from e

print("[INFO] DeepAgents imported successfully")
print("[INFO] LLM and server configs already set up in previous cells")

### Simple LLM Tool Test with ACTUAL MCP Tools

This test demonstrates using the **actual MCP tools** (`file.exists`) directly with an LLM agent.

**Key insight**: MCP tools must be loaded fresh just before agent invocation to avoid stale HTTP connection state.

The pattern is:
1. Load tools fresh from MCP server
2. Create agent with those tools immediately
3. Invoke agent immediately while connections are active

In [None]:
# =============================================================================
# SIMPLE LLM TOOL TEST - Using ACTUAL MCP tools
# =============================================================================
# The transport (STDIO or streamable HTTP) is whatever LOCAL_SERVER_CONFIGS["dev_repo"]
# specifies. Tools are loaded right before the agent is built to avoid stale connections.

from langchain_mcp_adapters.client import MultiServerMCPClient

# Create a client for the dev_repo server only
print("[1] Creating MCP client for dev_repo...")
client = MultiServerMCPClient({
    "dev_repo": LOCAL_SERVER_CONFIGS["dev_repo"]
})

# Get the actual MCP tools
print("[2] Loading MCP tools...")
repo_tools = await client.get_tools()
print(f"    Loaded {len(repo_tools)} tools: {[t.name for t in repo_tools]}")

# Create agent with ACTUAL MCP tools (no wrappers!)
print("[3] Creating agent with actual MCP tools...")
simple_agent = create_deep_agent(
    model=llm,
    tools=repo_tools,  # These are the ACTUAL MCP tools!
    system_prompt="You are a file inspector. Use the file.exists tool when asked about files."
)

# Test with a known file
test_file = str(PROJECT_ROOT / "README.md")
print(f"[4] Testing with file: {test_file}")

# Invoke the agent - tools should work!
print("[5] Invoking agent...")
response = await simple_agent.ainvoke({
    "messages": [HumanMessage(content=f"Check if this file exists: {test_file}")]
})

# Display result
print("[6] Agent response:")
if isinstance(response, dict) and "messages" in response:
    for msg in response["messages"]:
        if hasattr(msg, "content"):
            # Content may be a string or a list of parts; stringify before truncating.
            print(f"  {msg.type}: {str(msg.content)[:500]}")
else:
    print(response)


### LLM examples: `dev_repo` tools (one example per tool)

These prompts are written to encourage the agent to call a specific tool.

Tools covered:
- `file.exists`
- `file.get_info`
- `file.list_directory`
- `file.read`
- `file.search_content`
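
The prompts in the next cell all follow the same shape: name the tool, spell out the arguments, and ask for only the tool result. A small builder can produce that shape consistently. `tool_call_prompt` is a hypothetical helper, not something the notebook defines.

```python
def tool_call_prompt(tool_name: str, **arguments: object) -> str:
    """Build a prompt that names one tool and spells out its arguments."""
    if not arguments:
        return f"Call tool {tool_name} with no args. Return only the tool result."
    # repr() quotes strings and leaves numbers, booleans, and None bare.
    rendered = ", ".join(f"{key}={value!r}" for key, value in arguments.items())
    return f"Call tool {tool_name} with {rendered}. Return only the tool result."


print(tool_call_prompt("file.read", file_path="README.md", start_line=1, end_line=20))
```

Because `repr()` escapes backslashes, Windows-style paths would appear with doubled backslashes in the prompt; converting such paths with `Path.as_posix()` first avoids that.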



In [None]:
from typing import Any, Iterable


def _as_text(value: object) -> str:
    """Best-effort conversion of a model message content to text."""
    if isinstance(value, str):
        return value
    return str(value)


async def ask(agent_obj: Any, prompt: str) -> str:
    """Ask a DeepAgent and return the last message content as text."""
    res = await agent_obj.ainvoke({"messages": [{"role": "user", "content": prompt}]})
    return _as_text(res["messages"][-1].content)


async def run_examples(title: str, prompts: Iterable[str], agent_obj: Any) -> None:
    """Run a list of prompts against an agent and print compact results."""
    print(f"\n=== {title} ===\n")
    for i, p in enumerate(prompts, 1):
        print(f"[{i}] Prompt:\n{p}\n")
        out = await ask(agent_obj, p)
        # Keep output readable in notebooks
        print(f"[{i}] Response (truncated):\n{out[:1200]}\n")


current_sprint_path = str(PROJECT_ROOT / "docs/agile/sprints/current_sprint.md")

repo_prompts = [
    # file.exists
    f"Call tool file.exists with file_path='{current_sprint_path}'. Return only the tool result.",
    # file.get_info
    "Call tool file.get_info with file_path='README.md'. Return only the tool result.",
    # file.list_directory
    "Call tool file.list_directory with directory='docs/agile/sprints', pattern='*.md', recursive=False. Return only the tool result.",
    # file.read
    "Call tool file.read with file_path='docs/agile/sprints/current_sprint.md', start_line=1, end_line=20. Return only the tool result.",
    # file.search_content
    "Call tool file.search_content with query='US-MCP-002', directory='docs/agile/sprints', file_pattern='*.md', max_results=10. Return only the tool result.",
]

await run_examples("dev_repo examples", repo_prompts, repo_agent)



### LLM examples: `dev_search` tool

Tool covered:
- `file.search_content`



### LLM examples: `dev_docs` link tools (one example per tool)

Tools covered:
- `link.validate`
- `link.scan_all` (expensive; disabled by default in the example)
- `link.generate_report`
- `link.heal` (write operation; shown as a template only)



In [None]:
# dev_search: file.search_content via LLM
search_prompts = [
    "Call tool file.search_content with query='search_content_mcp', directory='utils/mcp', file_pattern='*.py', max_results=10. Return only the tool result.",
]
await run_examples("dev_search examples", search_prompts, search_agent)


# dev_docs: link tools via LLM
sprint8_dir = str(PROJECT_ROOT / "docs/agile/sprints/sprint_8")

docs_prompts = [
    f"Call tool link.validate with target_directory='{sprint8_dir}'. Return only the tool result.",
    f"Call tool link.generate_report with target_directory='{sprint8_dir}', output_path=None. Return only the tool result.",
]
await run_examples("dev_docs examples", docs_prompts, docs_agent)

# link.scan_all can be expensive on large dirs. Keep it opt-in.
RUN_EXPENSIVE_LINK_SCAN = False
if RUN_EXPENSIVE_LINK_SCAN:
    scan_prompt = (
        "Call tool link.scan_all with target_directory='" + sprint8_dir + "'. "
        "Return only the tool result."
    )
    await run_examples("dev_docs scan_all example", [scan_prompt], docs_agent)

# link.heal is a write operation. This notebook does not execute it automatically.
# Template only:
#   rename_mapping = {"docs/old.md": "docs/new.md"}
#   call link.heal(rename_mapping=rename_mapping, target_directory="docs")
print("[INFO] link.heal example: template only (write operation)")



### LLM examples: GitHub MCP tools (including repo-scoped code search)

Tools covered (read-only):
- `get_me`
- `list_branches`
- `list_commits`
- `get_file_contents`
- `search_code` (always include `repo:gitwalter/ai-dev-agent` in the query)
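
To make the repo scope hard to forget, the qualifier can be added by a helper instead of by hand. The sketch below is a hypothetical helper (`scoped_query` and `DEFAULT_REPO` are not defined elsewhere in this notebook) that prepends `repo:<owner>/<name>` unless the query already contains that exact qualifier.

```python
DEFAULT_REPO = "gitwalter/ai-dev-agent"


def scoped_query(query: str, repo: str = DEFAULT_REPO) -> str:
    """Return the query with a repo:<owner>/<name> qualifier in front."""
    terms = query.split()
    qualifier = f"repo:{repo}"
    if qualifier in terms:
        # Already scoped to this repository; only normalize whitespace.
        return " ".join(terms)
    return " ".join([qualifier, *terms])


print(scoped_query("MultiServerMCPClient"))
```

The result can be dropped straight into a `search_code` prompt, so every example query stays limited to the one repository.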



In [None]:
if github_agent is None:
    print("[SKIP] github_agent not available (set GITHUB_TOKEN to enable GitHub MCP)")
else:
    github_prompts = [
        "Call tool get_me with no args. Return only the tool result.",
        "Call tool list_branches for repository gitwalter/ai-dev-agent. Return only the tool result.",
        "Call tool list_commits for repository gitwalter/ai-dev-agent with a small limit (e.g., 5). Return only the tool result.",
        "Call tool get_file_contents for repository gitwalter/ai-dev-agent path='README.md' ref='main'. Return only the tool result.",
        "Call tool search_code with query='repo:gitwalter/ai-dev-agent MultiServerMCPClient'. Return only the tool result.",
    ]

    await run_examples("github examples", github_prompts, github_agent)



### DeepAgents coordinator demo: realistic dev workflow

This demonstrates a typical "dev agent" workflow:

- Local repo inspection (dev_repo)
- Local search (dev_search)
- Remote code search (GitHub MCP)
- Docs link validation (dev_docs)

The prompt asks the coordinator to delegate to specialists.



In [None]:
coordinator_prompt = """We previously hit a server-side error where a tool wrapper passed the wrong argument name to an underlying function.

Tasks:
1) Using local tools, find the definition of search_content_mcp and summarize its parameter names.
2) Using GitHub code search (repo-scoped), find occurrences of 'search_content_mcp(' in gitwalter/ai-dev-agent.
3) Using dev_docs, validate links under docs/agile/sprints/sprint_8 and report whether there are broken links.

Keep the answer structured as:
- Local signature
- GitHub occurrences (paths)
- Link validation summary
"""

result = await coordinator.ainvoke({"messages": [{"role": "user", "content": coordinator_prompt}]})
print(_as_text(result["messages"][-1].content)[:2000])



In [None]:
# --- Stop local FastMCP servers started by this notebook ---
# This will NOT stop servers that were already running before the start cell.
from utils.mcp.fastmcp.start_notebook_mcp_servers import stop_servers

if "_MCP_NOTEBOOK_SERVER_PROCS" not in globals() or not _MCP_NOTEBOOK_SERVER_PROCS:
    print("[INFO] No notebook-started MCP servers to stop")
else:
    stop_servers(_MCP_NOTEBOOK_SERVER_PROCS)
    _MCP_NOTEBOOK_SERVER_PROCS = None
    print("[OK] Notebook-started MCP servers stopped")
