Repo：https://github.com/CaptainMaxine/MultiAgent-with-A2A-MCP-server

## Basic setup

In [3]:
# --- System & module setup ----------------------------------------------------
import sys
import os
import asyncio
import logging
import threading
import time
import json
import re
import datetime
import uuid
from typing import Any, Optional

# --- A2A SDK compatibility shim -----------------------------------------------

from a2a.client import client as _base_client_impl
from a2a.client.card_resolver import A2ACardResolver

class _ClientShim:
    """Shallow wrapper to expose the expected attributes for older imports."""
    def __init__(self, real_client):
        for name in dir(real_client):
            if not name.startswith("_"):
                setattr(self, name, getattr(real_client, name))
        # explicitly surface the card resolver
        self.A2ACardResolver = A2ACardResolver

# install patched module into the import system
sys.modules["a2a.client.client"] = _ClientShim(_base_client_impl)

# --- Async & networking libs --------------------------------------------------
import httpx
import nest_asyncio
import uvicorn

# --- A2A & Google ADK components ---------------------------------------------
from a2a.client import (
    ClientConfig,
    ClientFactory,
    create_text_message_object,
)
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TransportProtocol,
)
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH

from google.adk.a2a.executor.a2a_agent_executor import (
    A2aAgentExecutor,
    A2aAgentExecutorConfig,
)
from google.adk.agents import Agent, SequentialAgent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

from google.genai.types import Part, Content
from types import SimpleNamespace

# allow nested event loops (important for notebooks)
nest_asyncio.apply()

# --- Logging configuration -----------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
)


## Database Setup

In [4]:
import sqlite3
import datetime

# Path for local SQLite store
DB_PATH = "mcp.db"

def open_db():
    """Return a SQLite connection with row access by name."""
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn

def init_storage():
    """Rebuild database schema and load predefined demo data."""
    cx = open_db()
    cur = cx.cursor()

    # --- Create tables if missing -------------------------------------------------
    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS customers (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            name        TEXT,
            email       TEXT,
            phone       TEXT,
            status      TEXT,
            created_at  TEXT,
            updated_at  TEXT
        )
        """
    )

    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS tickets (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            customer_id INTEGER,
            issue       TEXT,
            status      TEXT,
            priority    TEXT,
            created_at  TEXT
        )
        """
    )

    cx.commit()

    # --- Clear existing records ---------------------------------------------------
    cur.execute("DELETE FROM customers")
    cur.execute("DELETE FROM tickets")

    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()

    # --- Populate customers -------------------------------------------------------
    seed_customers = [
        {"name": "Alice Premium",   "email": "alice@example.com",   "phone": "111-111-1111", "status": "active"},
        {"name": "Bob Standard",    "email": "bob@example.com",     "phone": "222-222-2222", "status": "active"},
        {"name": "Charlie Disabled","email": "charlie@example.com", "phone": "333-333-3333", "status": "disabled"},
        {"name": "Diana Premium",   "email": "diana@example.com",   "phone": "444-444-4444", "status": "active"},
        {"name": "Eve Standard",    "email": "eve@example.com",     "phone": "555-555-5555", "status": "active"},
    ]

    for entry in seed_customers:
        cur.execute(
            """
            INSERT INTO customers (name, email, phone, status, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                entry["name"],
                entry["email"],
                entry["phone"],
                entry["status"],
                timestamp,
                timestamp,
            ),
        )

    # --- Populate tickets ---------------------------------------------------------
    initial_tickets = [
        (1, "Billing duplicate charge", "open",        "high"),
        (1, "Unable to login",          "in_progress", "medium"),
        (2, "Request upgrade",          "open",        "low"),
        (4, "Critical outage",          "open",        "high"),
        (5, "Password reset",           "open",        "low"),
    ]

    for cid, issue, status, priority in initial_tickets:
        cur.execute(
            """
            INSERT INTO tickets (customer_id, issue, status, priority, created_at)
            VALUES (?, ?, ?, ?, ?)
            """,
            (cid, issue, status, priority, timestamp),
        )

    cx.commit()
    cx.close()

    print(f"Database initialized and seeded → {DB_PATH}")

# Execute initialization on import
init_storage()


Database initialized and seeded → mcp.db


## MCP

In [6]:
class MCPClient:
    """Client for calling MCP server tools."""

    def __init__(self, mcp_server_url: str = "http://127.0.0.1:8000"):
        self.mcp_server_url = mcp_server_url
        self._client = None

    async def _get_client(self):
        """Get or create HTTP client."""
        if self._client is None:
            self._client = httpx.AsyncClient(timeout=30.0)
        return self._client

    async def call_tool(self, tool_name: str, **params):
        """Call an MCP tool."""
        client = await self._get_client()
        try:
            response = await client.post(
                f"{self.mcp_server_url}/call",
                json={"tool": tool_name, "params": params}
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return {"success": False, "error": f"MCP call failed: {str(e)}"}

    async def get_customer(self, customer_id: int):
        """Get customer by ID via MCP."""
        return await self.call_tool("get_customer", customer_id=customer_id)

    async def list_customers(self, status: str = None, limit: int = 100):
        """List customers via MCP."""
        params = {"limit": limit}
        if status:
            params["status"] = status
        return await self.call_tool("list_customers", **params)

    async def update_customer(self, customer_id: int, data: dict):
        """Update customer via MCP."""
        return await self.call_tool("update_customer", customer_id=customer_id, data=data)

    async def create_ticket(self, customer_id: int, issue: str, priority: str = "medium"):
        """Create ticket via MCP."""
        return await self.call_tool("create_ticket", customer_id=customer_id, issue=issue, priority=priority)

    async def get_customer_history(self, customer_id: int):
        """Get customer ticket history via MCP."""
        return await self.call_tool("get_customer_history", customer_id=customer_id)

    async def close(self):
        """Close HTTP client."""
        if self._client:
            await self._client.aclose()
            self._client = None

# Create global MCP client instance
mcp_client = MCPClient()
print("✓ MCP Client created (connecting to http://127.0.0.1:8000)")

✓ MCP Client created (connecting to http://127.0.0.1:8000)


In [7]:
# ============================================================================
# UTILITY BUILDERS FOR ADK EVENTS
# ============================================================================

def create_adk_event(text: str, partial: bool = False, invocation_id: str = None):
    """
    Construct an event object that mimics ADK's runtime streaming output.
    Returns an object carrying text, artifacts, metadata, and event identifiers.
    """

    # ---- lightweight components ------------------------------------------------
    class _TextChunk:
        """Represents a minimal content block used by ADK."""
        def __init__(self, payload: str):
            # A tiny container used by downstream ADK tools
            self.data = SimpleNamespace(text=payload)
            self.text = payload

    class _ArtifactBundle:
        """Collection wrapper for emitted text artifacts."""
        def __init__(self, payload: str):
            self.parts = [_TextChunk(payload)]

    # ---- main event object -----------------------------------------------------
    class _EventEnvelope:
        def __init__(self, body: str, is_partial: bool, invoke_id: str | None):
            now_ms = int(time.time() * 1000)

            self.invocation_id   = invoke_id or str(uuid.uuid4())
            self.event_id        = str(uuid.uuid4())
            self.timestamp       = now_ms
            self.timestamp_ms    = now_ms

            # ADK-compatible content field
            self.content = Content(parts=[Part(text=body)])

            # artifact structure
            self.artifacts = [_ArtifactBundle(body)]

            # stream control attributes
            self.partial        = is_partial
            self.turn_complete  = not is_partial
            self.author         = "agent"

            # error-related placeholders
            self.error_code        = None
            self.error_message     = None
            self.grounding_metadata = None
            self.execution_metadata = None
            self.custom_metadata    = None

            # delta and bookkeeping
            self.delta     = None
            self.type      = "agent_event"
            self.usage_metadata = None

            # relationship / routing fields
            self.branch     = None
            self.depth      = 0
            self.parent_id  = None
            self.session_id = None
            self.turn_id    = None

            # metadata block
            self.metadata = {
                "agent_name": "adk_agent",
                "timestamp_ms": now_ms,
                "invocation_id": self.invocation_id,
            }

            # ADK expects an actions namespace
            self.actions = SimpleNamespace(state_changes=None)

    return _EventEnvelope(text, partial, invocation_id)


## Agents

In [8]:
class CustomerDataAgent(Agent):
    """
    Agent that interprets customer-related requests and forwards them to the MCP
    customer database tools. This agent never performs local DB reads — all data
    comes from MCP-accessible methods.
    """

    def __init__(self):
        super().__init__(
            name="customer_data_agent",
            model="gemini-2.0-flash-exp",
            instruction=(
                "You translate natural-language user input into precise JSON actions. "
                "All customer data operations MUST be executed through MCP tools.\n\n"
                "Allowed operations:\n"
                "  • get_customer(id)\n"
                "  • list_customers(status?)\n"
                "  • get_customer_history(id)\n"
                "  • update_customer(id, fields)\n"
                "  • create_ticket(customer_id, issue, priority)\n\n"
                "Requirements:\n"
                "  • Output MUST be JSON only.\n"
                "  • No speculative assumptions.\n"
                "  • Always call the appropriate MCP tool.\n"
            ),
        )

    # =========================================================================
    # EXECUTION FLOW
    # =========================================================================
    async def run_async(self, ctx):
        """
        ADK entry point. Extracts text from the runtime context, determines the
        request type, executes the appropriate MCP call, and emits a JSON event.
        """

        raw_input = self._collect_user_text(ctx)
        normalized = raw_input.lower().strip()

        response = {
            "agent": "CustomerDataAgent",
            "status": "success",
        }

        try:
            intent = self._detect_intent(normalized)
            customer_id = self._pull_customer_id(raw_input)

            # ------------------------------------------------------------------
            # ROUTING: IDENTIFY THE REQUEST TYPE
            # ------------------------------------------------------------------
            if intent == "fetch_single" and customer_id:
                data = await self._mcp_get_customer(customer_id)
                if data:
                    response["action"] = f"fetch_customer:{customer_id}"
                    response["data"] = data
                else:
                    response.update({"status": "error", "error": "Customer not found"})

            elif intent == "list":
                status = "active" if "active" in normalized else None
                items = await mcp_client.fetch_customers(status=status)
                response["action"] = "list_customers"
                response["data"] = items.get("data", [])

            elif intent == "history" and customer_id:
                history = await self._mcp_history(customer_id)
                response["action"] = f"history:{customer_id}"
                response["data"] = history

            elif intent == "update" and customer_id:
                fields = self._extract_update_fields(raw_input)
                if not fields:
                    response.update({"status": "error", "error": "No update fields found"})
                else:
                    updated = await self._mcp_update(customer_id, fields)
                    response["action"] = f"update:{customer_id}"
                    response["data"] = updated

            elif intent == "ticket" and customer_id:
                issue_text = raw_input
                priority = (
                    "high"
                    if any(p in normalized for p in ["urgent", "immediate", "critical"])
                    else "medium"
                )
                ticket = await self._mcp_ticket(customer_id, issue_text, priority)
                response["action"] = f"create_ticket:{customer_id}"
                response["data"] = ticket

            elif intent == "support" and customer_id:
                data = await self._mcp_get_customer(customer_id)
                response["action"] = f"support_request:{customer_id}"
                response["data"] = data
                response["note"] = "Forward to support agent for handling."

            else:
                response.update(
                    {
                        "status": "error",
                        "error": "Unrecognized request",
                        "request": raw_input,
                    }
                )

        except Exception as exc:
            response.update({"status": "error", "error": str(exc)})

        yield create_adk_event(json.dumps(response, indent=2))

    # =========================================================================
    # TEXT EXTRACTION
    # =========================================================================
    def _collect_user_text(self, ctx) -> str:
        """Extract concatenated text from ADK ctx.user_content."""
        if not getattr(ctx, "user_content", None):
            return ""

        result = []
        contents = (
            ctx.user_content
            if isinstance(ctx.user_content, list)
            else [ctx.user_content]
        )

        for item in contents:
            if hasattr(item, "parts"):
                for part in item.parts:
                    if getattr(part, "text", None):
                        result.append(part.text)

        return " ".join(result)

    # =========================================================================
    # INTENT CLASSIFICATION
    # =========================================================================
    def _detect_intent(self, text: str) -> Optional[str]:
        """Determine what the user is asking for."""
        rules = {
            "fetch_single": ["get customer", "customer info", "customer information"],
            "list": ["list customers", "all customers"],
            "history": ["ticket history", "show tickets", "history"],
            "update": ["update", "change email", "change phone"],
            "ticket": ["refund", "charged", "issue", "problem", "create ticket"],
            "support": ["upgrade", "help"],
        }

        for intent, keys in rules.items():
            if any(kw in text for kw in keys):
                return intent

        return None


    def _pull_customer_id(self, text: str) -> Optional[int]:
        """Extract a customer ID from user text."""
        match = re.search(r"(?:id|customer)\s*[:\-]?\s*(\d+)", text.lower())
        return int(match.group(1)) if match else None

    def _extract_email(self, text: str) -> Optional[str]:
        match = re.search(r"[\w\.-]+@[\w\.-]+\.\w+", text)
        return match.group(0) if match else None

    def _extract_phone(self, text: str) -> Optional[str]:
        match = re.search(r"\d{3}[-.]?\d{3}[-.]?\d{4}", text)
        return match.group(0) if match else None

    def _extract_update_fields(self, text: str) -> dict:
        """Collect updateable fields from raw text."""
        fields = {}
        email = self._extract_email(text)
        phone = self._extract_phone(text)

        if email:
            fields["email"] = email
        if phone:
            fields["phone"] = phone

        return fields

    # =========================================================================
    # MCP WRAPPERS
    # =========================================================================
    async def _mcp_get_customer(self, cid: int):
        result = await mcp_client.fetch_customer(cid)
        if result.get("success"):
            return result.get("data")
        return None

    async def _mcp_history(self, cid: int):
        result = await mcp_client.ticket_history(cid)
        if result.get("success"):
            return result.get("data", [])
        return []

    async def _mcp_update(self, cid: int, fields: dict):
        result = await mcp_client.modify_customer(cid, fields)
        if result.get("success"):
            return result.get("data")
        return None

    async def _mcp_ticket(self, cid: int, issue: str, priority: str):
        result = await mcp_client.open_ticket(cid, issue, priority)
        if result.get("success"):
            return result.get("data")
        return None


In [9]:
class SupportAgent(Agent):
    """
    Agent specializing in interpreting support-related issues
    (refunds, billing, upgrades, ticket creation, ticket queries)
    and executing them through MCP tools.
    """

    def __init__(self):
        super().__init__(
            name="support_agent",
            model="gemini-2.0-flash-exp",
            instruction=(
                "You analyze support requests and output structured JSON only.\n"
                "You must use MCP tools for all ticket/database operations.\n\n"
                "Support domains:\n"
                "  • billing/refund issues\n"
                "  • account upgrades\n"
                "  • ticket lookup\n"
                "  • ticket creation\n"
            )
        )

    # ======================================================================
    # MAIN EXECUTION
    # ======================================================================
    async def run_async(self, ctx):
        text_raw = self._collect(ctx)
        text = text_raw.lower().strip()

        reply = {
            "agent": "SupportAgent",
            "status": "success",
        }

        try:
            customer_id = self._extract_id(text_raw)
            priority = self._infer_priority(text)

            intent = self._determine_intent(text)

            # ------------------------------------------------------------
            # HANDLE INTENTS
            # ------------------------------------------------------------
            if intent == "refund":
                reply.update({
                    "action": "refund_request",
                    "priority": "HIGH",
                    "message": "Billing/refund issue detected",
                })
                if customer_id:
                    reply["customer_id"] = customer_id
                else:
                    reply["warning"] = "Customer ID missing"

            elif intent == "upgrade":
                reply.update({
                    "action": "account_upgrade",
                    "priority": priority or "MEDIUM",
                    "message": "Upgrade request detected",
                })
                if customer_id:
                    reply["customer_id"] = customer_id
                else:
                    reply["warning"] = "Customer ID missing"

            elif intent == "query_tickets":
                if not customer_id:
                    reply.update({"status": "error", "error": "Customer ID required"})
                else:
                    history = await mcp_client.ticket_history(customer_id)
                    if history.get("success"):
                        tickets = history.get("data", [])
                        if priority:
                            tickets = [t for t in tickets if t.get("priority", "").lower() == priority.lower()]
                        reply.update({
                            "action": "ticket_query",
                            "data": tickets,
                        })
                    else:
                        reply.update({
                            "status": "error",
                            "error": history.get("error", "Ticket query failed"),
                        })

            elif intent == "create_ticket":
                if not customer_id:
                    reply.update({"status": "error", "error": "Customer ID required"})
                else:
                    new_ticket = await mcp_client.open_ticket(
                        customer_id,
                        text_raw,
                        priority or "MEDIUM"
                    )
                    if new_ticket.get("success"):
                        reply.update({
                            "action": "ticket_created",
                            "priority": priority or "MEDIUM",
                            "data": new_ticket.get("data"),
                        })
                    else:
                        reply.update({
                            "status": "error",
                            "error": new_ticket.get("error", "Ticket creation failed"),
                        })

            else:
                reply.update({"status": "error", "error": "Unable to classify support intent"})

        except Exception as exc:
            reply.update({"status": "error", "error": str(exc)})

        yield create_adk_event(json.dumps(reply, indent=2))


    def _collect(self, ctx) -> str:
        """Collect user text from ADK context."""
        if not getattr(ctx, "user_content", None):
            return ""

        buffer = []
        items = ctx.user_content if isinstance(ctx.user_content, list) else [ctx.user_content]

        for content in items:
            if hasattr(content, "parts"):
                for p in content.parts:
                    if getattr(p, "text", None):
                        buffer.append(p.text)
        return " ".join(buffer)

    def _determine_intent(self, text: str) -> Optional[str]:
        """Return high-level support intent."""
        keywords = {
            "refund": ["refund", "charged twice", "duplicate", "unauthorized", "billing"],
            "upgrade": ["upgrade", "plan change", "switch plan", "premium", "account upgrade"],
            "query_tickets": ["query tickets", "show tickets", "open tickets", "ticket history"],
            "create_ticket": ["issue", "problem", "complaint", "report", "ticket"],
        }
        for intent, kws in keywords.items():
            if any(k in text for k in kws):
                return intent
        return None

    def _extract_id(self, text: str) -> Optional[int]:
        match = re.search(r"(?:id|customer)\s*[:\-]?\s*(\d+)", text.lower())
        return int(match.group(1)) if match else None

    def _infer_priority(self, text: str) -> Optional[str]:
        if any(x in text for x in ["urgent", "immediate", "critical"]):
            return "HIGH"
        if "medium" in text:
            return "MEDIUM"
        if "low" in text:
            return "LOW"
        return None


In [10]:
class RouterAgent(Agent):
    """
    Router agent: decides when to involve the CustomerDataAgent and/or
    SupportAgent via A2A, aggregates their JSON responses, and returns a
    single structured JSON result.
    """

    model_config = {"extra": "allow"}

    def __init__(self, customer_agent_url: str, support_agent_url: str):
        super().__init__(
            name="router_agent",
            model="gemini-2.0-flash-exp",
            instruction=(
                "You receive free-form customer service requests and decide "
                "which downstream agents to call via A2A.\n\n"
                "Routing rules (high level):\n"
                "  • CustomerDataAgent: customer info / lists / history / updates\n"
                "  • SupportAgent: refunds / billing / upgrades / ticket actions\n"
                "You may call one or both agents and then merge their JSON replies "
                "into a final JSON object.\n"
                "Only output JSON. Do not explain your reasoning.\n"
            ),
        )
        self.customer_agent_url = customer_agent_url
        self.support_agent_url = support_agent_url
        self._http_client: httpx.AsyncClient | None = None

    # ======================================================================
    # INTERNAL HELPERS
    # ======================================================================
    async def _http(self) -> httpx.AsyncClient:
        """Return a shared async HTTP client."""
        if self._http_client is None:
            self._http_client = httpx.AsyncClient(
                timeout=httpx.Timeout(
                    timeout=120.0,
                    connect=10.0,
                    read=120.0,
                    write=10.0,
                    pool=5.0,
                )
            )
        return self._http_client

    async def _invoke_a2a_agent(self, base_url: str, message: str) -> dict:
        """
        Call a remote A2A agent:
          1) Fetch its AgentCard
          2) Construct an A2A client
          3) Send a text message
          4) Parse the first JSON payload from its artifacts
        """
        client = await self._http()

        try:
            # 1) Fetch AgentCard
            card_resp = await client.get(f"{base_url}{AGENT_CARD_WELL_KNOWN_PATH}")
            card_resp.raise_for_status()
            card_payload = card_resp.json()
            agent_card = AgentCard(**card_payload)

            # 2) Build A2A client
            cfg = ClientConfig(
                httpx_client=client,
                supported_transports=[
                    TransportProtocol.jsonrpc,
                    TransportProtocol.http_json,
                ],
                use_client_preference=True,
            )
            factory = ClientFactory(cfg)
            a2a_client = factory.create(agent_card)

            # 3) Send message
            msg = create_text_message_object(content=message)
            collected = []
            async for item in a2a_client.send_message(msg):
                collected.append(item)

            # 4) Extract first JSON from artifacts
            if not collected:
                return {"status": "error", "error": "No response from downstream agent"}

            first = collected[0]
            # Typically (task, meta) or (task,) depending on implementation
            if isinstance(first, tuple) and first:
                task = first[0]
            else:
                task = first

            try:
                # ADK patched artifacts: artifacts[0].parts[0].root.text (JSON string)
                raw_text = task.artifacts[0].parts[0].root.text
                return json.loads(raw_text)
            except Exception as parse_exc:
                return {
                    "status": "error",
                    "error": f"Failed to parse downstream JSON: {parse_exc}",
                }

        except Exception as exc:
            return {"status": "error", "error": str(exc)}

    def _gather_input_text(self, ctx) -> str:
        """Merge user_content parts into a single string."""
        if not getattr(ctx, "user_content", None):
            return ""

        out: list[str] = []
        contents = ctx.user_content if isinstance(ctx.user_content, list) else [ctx.user_content]

        for content in contents:
            if hasattr(content, "parts"):
                for part in content.parts:
                    if getattr(part, "text", None):
                        out.append(part.text)

        return " ".join(out)

    def _routing_flags(self, text: str) -> dict:
        """
        Compute simple Boolean flags that describe which domains
        the query touches.
        """
        t = text.lower()

        # Customer-ish signals
        customer_signals = [
            "customer",
            "account",
            "get",
            "show",
            "history",
            "list",
            "active customers",
            "open tickets",
        ]
        # Support-ish signals
        support_signals = [
            "support",
            "help",
            "refund",
            "charged",
            "billing",
            "ticket",
            "upgrade",
            "plan change",
        ]

        wants_upgrade = any(k in t for k in ["upgrade", "plan change", "switch plan"])
        customer_related = any(k in t for k in customer_signals)
        support_related = any(k in t for k in support_signals)

        return {
            "customer_related": customer_related,
            "support_related": support_related,
            "wants_upgrade": wants_upgrade,
        }

    # ======================================================================
    # MAIN EXECUTION
    # ======================================================================
    async def run_async(self, ctx):
        """Route + orchestrate agent calls via A2A (async generator)."""

        user_text = self._gather_input_text(ctx)
        lowered = user_text.lower().strip()

        result: dict[str, Any] = {
            "router": "RouterAgent",
            "request": user_text,
            "actions": [],
            "results": {},
            "error": None,
        }

        invocation_id = str(uuid.uuid4())

        # Emit a partial "router started" event
        start_payload = json.dumps(
            {"status": "router_started", "request": user_text}, indent=2
        )
        yield create_adk_event(
            start_payload,
            partial=True,
            invocation_id=invocation_id,
        )

        try:
            if not user_text or not lowered:
                result["error"] = "No input text provided"
            else:
                flags = self._routing_flags(lowered)
                needs_customer = flags["customer_related"]
                needs_support = flags["support_related"]
                wants_upgrade = flags["wants_upgrade"]

                print(f"[Router] request = {user_text}")
                print(
                    f"[Router] flags: customer={needs_customer}, "
                    f"support={needs_support}, upgrade={wants_upgrade}"
                )

                # ----------------------------------------------------------
                # CASE 1: Coordinated upgrade flow → both agents
                # ----------------------------------------------------------
                if wants_upgrade and (needs_customer or needs_support):
                    customer_resp = await self._invoke_a2a_agent(
                        self.customer_agent_url, user_text
                    )
                    result["results"]["customer"] = customer_resp
                    result["actions"].append("customer_context_for_upgrade")

                    support_resp = await self._invoke_a2a_agent(
                        self.support_agent_url, user_text
                    )
                    result["results"]["support"] = support_resp
                    result["actions"].append("support_upgrade_handling")

                # ----------------------------------------------------------
                # CASE 2: Both domains present → context + support
                #        (e.g., 'show my active customers with open tickets')
                # ----------------------------------------------------------
                elif needs_customer and needs_support:
                    customer_resp = await self._invoke_a2a_agent(
                        self.customer_agent_url, user_text
                    )
                    result["results"]["customer"] = customer_resp
                    result["actions"].append("customer_context_fetch")

                    support_resp = await self._invoke_a2a_agent(
                        self.support_agent_url, user_text
                    )
                    result["results"]["support"] = support_resp
                    result["actions"].append("support_request_handling")

                # ----------------------------------------------------------
                # CASE 3: Only customer operations needed
                # ----------------------------------------------------------
                elif needs_customer:
                    customer_resp = await self._invoke_a2a_agent(
                        self.customer_agent_url, user_text
                    )
                    result["results"]["customer"] = customer_resp
                    result["actions"].append("customer_agent_invoked")

                # ----------------------------------------------------------
                # CASE 4: Only support operations needed
                # ----------------------------------------------------------
                elif needs_support:
                    support_resp = await self._invoke_a2a_agent(
                        self.support_agent_url, user_text
                    )
                    result["results"]["support"] = support_resp
                    result["actions"].append("support_agent_invoked")

                # ----------------------------------------------------------
                # CASE 5: Router cannot classify
                # ----------------------------------------------------------
                else:
                    result["error"] = "Router could not determine a suitable route"

        except Exception as exc:
            result["error"] = str(exc)
            import traceback

            result["traceback"] = traceback.format_exc()

        # Emit final event
        final_payload = json.dumps(result, indent=2)
        yield create_adk_event(
            final_payload,
            partial=False,
            invocation_id=invocation_id,
        )


In [11]:
from google.adk.sessions.in_memory_session_service import InMemorySessionService

class PatchedSessionService(InMemorySessionService):
    """
    A safer session service that guards against malformed ADK events
    (common when mixing custom ADK event objects + A2A message forwarding).
    """

    async def append_event(self, session, event):
        # Reject events that are clearly malformed
        if not getattr(event, "timestamp", None):
            # Still record something minimal to avoid breaking ADK chains
            event.timestamp = None
        return await super().append_event(session, event)


def normalize_event_fields(ev):
    """
    Ensure structural consistency for ADK event objects coming from
    different agent implementations.
    """

    # Required by ADK/UI but often missing on custom events
    if not hasattr(ev, "timestamp"):
        ev.timestamp = None

    if not hasattr(ev, "error_code"):
        ev.error_code = None

    if not hasattr(ev, "error_message"):
        ev.error_message = None

    return ev


class PatchedRunner(Runner):
    """
    Wraps Runner.run_async and applies field-level normalization to each event.
    """

    async def run_async(self, *args, **kwargs):
        async for ev in super().run_async(*args, **kwargs):
            yield normalize_event_fields(ev)


In [12]:
def build_agent_server(agent: Agent, public_name: str, port: int):
    """
    Construct a Starlette-based A2A server instance for the given agent.
    This includes:
      • AgentCard registration
      • Runner + Executor wiring
      • HTTP handler setup
    """

    card = AgentCard(
        name=public_name,
        url=f"http://127.0.0.1:{port}",
        description=f"{public_name} service endpoint",
        version="1.0",
        capabilities=AgentCapabilities(streaming=False),
        default_input_modes=["text/plain"],
        default_output_modes=["application/json"],
        preferred_transport=TransportProtocol.jsonrpc,
        skills=[
            AgentSkill(
                id="handle",
                name="Process Request",
                description=f"Invoke {public_name} for a user request.",
                tags=["service"],
                examples=["Handle input request"],
            )
        ],
    )

    # ADK runner with patched safety wrappers
    runner = PatchedRunner(
        app_name=agent.name,
        agent=agent,
        artifact_service=InMemoryArtifactService(),
        session_service=PatchedSessionService(),
        memory_service=InMemoryMemoryService(),
    )

    executor = A2aAgentExecutor(
        runner=runner,
        config=A2aAgentExecutorConfig()
    )

    handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
    )

    app = A2AStarletteApplication(agent_card=card, http_handler=handler)
    return app, card


async def launch_agent_server(agent: Agent, name: str, port: int):
    """
    Run a single agent server using uvicorn.
    """

    app, card = build_agent_server(agent, name, port)

    server_cfg = uvicorn.Config(
        app.build(),
        host="127.0.0.1",
        port=port,
        log_level="warning",
        loop="none",
    )

    server = uvicorn.Server(server_cfg)

    print(f"→ Starting {name} server on port {port}")
    await server.serve()


async def launch_all_agents():
    """
    Spawn all A2A agent servers concurrently.
    This function does NOT block indefinitely; it waits for the servers
    to start then gathers the server tasks.
    """

    tasks = [
        asyncio.create_task(launch_agent_server(customer_data_agent, "CustomerDataAgent", 9300)),
        asyncio.create_task(launch_agent_server(support_agent, "SupportAgent", 9301)),
        asyncio.create_task(launch_agent_server(router_agent, "RouterAgent", 9400)),
    ]

    await asyncio.sleep(2)
    print("\n" + "=" * 65)
    print(" A2A Agent Endpoints Active:")
    print("   • CustomerDataAgent → http://127.0.0.1:9300")
    print("   • SupportAgent      → http://127.0.0.1:9301")
    print("   • RouterAgent       → http://127.0.0.1:9400")
    print("=" * 65 + "\n")

    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        # graceful shutdown
        pass


def run_in_background():
    """
    Launch the A2A servers inside their own event loop.
    Intended for notebook environments where the main loop
    is already in use.
    """

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        loop.run_until_complete(launch_all_agents())
    except KeyboardInterrupt:
        pass
    finally:
        pending = asyncio.all_tasks(loop)
        for t in pending:
            t.cancel()
        loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        loop.close()


In [13]:
print("\n" + "=" * 65)
print("Initializing agent components...")
print("=" * 65)

# Core agent objects
customer_data_agent = CustomerDataAgent()
support_agent       = SupportAgent()
router_agent        = RouterAgent(
    customer_agent_url="http://127.0.0.1:9300",
    support_agent_url="http://127.0.0.1:9301",
)

# Status logs
print("[Init] CustomerDataAgent ready")
print("[Init] SupportAgent ready")
print("[Init] RouterAgent ready")



Initializing agent components...
[Init] CustomerDataAgent ready
[Init] SupportAgent ready
[Init] RouterAgent ready


In [14]:
print("\n" + "=" * 65)
print("Launching A2A agent services...")
print("=" * 65)

svc_thread = threading.Thread(
    target=run_in_background,
    daemon=True
)
svc_thread.start()

# Allow time for servers to open their ports
time.sleep(4)

print("[Startup] All A2A services have been scheduled for launch.\n")


  config=A2aAgentExecutorConfig()
  executor = A2aAgentExecutor(



Launching A2A agent services...
→ Starting CustomerDataAgent server on port 9300
→ Starting SupportAgent server on port 9301
→ Starting RouterAgent server on port 9400

 A2A Agent Endpoints Active:
   • CustomerDataAgent → http://127.0.0.1:9300
   • SupportAgent      → http://127.0.0.1:9301
   • RouterAgent       → http://127.0.0.1:9400

[Startup] All A2A services have been scheduled for launch.



In [15]:
class A2ASimpleClient:
    """
    Minimal A2A testing client.
    Caches AgentCard metadata and sends text messages to A2A endpoints.
    """

    def __init__(self, timeout: float = 120.0):
        self._cache: dict[str, dict] = {}
        self._timeout = timeout

    async def create_task(self, agent_url: str, message: str) -> str:
        """
        Send a text message to an A2A agent and return the first JSON artifact.
        Used only for testing outside the ADK pipeline.
        """

        timeout_cfg = httpx.Timeout(
            timeout=self._timeout,
            connect=10.0,
            read=self._timeout,
            write=10.0,
            pool=5.0,
        )

        async with httpx.AsyncClient(timeout=timeout_cfg) as client:

            # --------------------------------------------------------------
            # 1) Load or reuse the agent's card metadata
            # --------------------------------------------------------------
            if agent_url not in self._cache:
                card_resp = await client.get(f"{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}")
                card_resp.raise_for_status()
                self._cache[agent_url] = card_resp.json()

            card_data = self._cache[agent_url]
            agent_card = AgentCard(**card_data)

            # --------------------------------------------------------------
            # 2) Build A2A client
            # --------------------------------------------------------------
            cfg = ClientConfig(
                httpx_client=client,
                supported_transports=[
                    TransportProtocol.jsonrpc,
                    TransportProtocol.http_json,
                ],
                use_client_preference=True,
            )
            factory = ClientFactory(cfg)
            a2a = factory.create(agent_card)

            # --------------------------------------------------------------
            # 3) Send message and collect streaming items
            # --------------------------------------------------------------
            msg = create_text_message_object(content=message)
            events = []

            async for chunk in a2a.send_message(msg):
                events.append(chunk)

            # --------------------------------------------------------------
            # 4) Extract the JSON payload from the first returned event
            # --------------------------------------------------------------
            if not events:
                return "No response received"

            first = events[0]

            # Depending on transport, we may get (task, meta) or (task,)
            task = first[0] if isinstance(first, tuple) and first else first

            try:
                raw = task.artifacts[0].parts[0].root.text
                return raw
            except Exception:
                # Fallback to plain string formatting
                return str(task)


In [16]:
async def run_all_tests():
    """
    Execute a curated set of integration scenarios against the Router agent.
    Each scenario demonstrates a different orchestration pattern across
    CustomerDataAgent and SupportAgent via A2A.
    """

    client = A2ASimpleClient()

    print("\n" + "=" * 90)
    print("INTEGRATION TEST SUITE – A2A Multi-Agent Pipeline")
    print("=" * 90)


    scenarios = [
        {
            "name": "SCENARIO 1 — Simple Customer Lookup",
            "description": "Straightforward MCP read via CustomerDataAgent.",
            "url": "http://127.0.0.1:9400",
            "message": "Get customer information for ID 5",
        },
        {
            "name": "SCENARIO 2 — Coordinated Upgrade Request",
            "description": "Both agents participate: customer info + upgrade handling.",
            "url": "http://127.0.0.1:9400",
            "message": "I'm customer 12345 and need help upgrading my account",
        },
        {
            "name": "SCENARIO 3 — Complex Query (Active + Open Tickets)",
            "description": "Router negotiates across domains to answer combined intent.",
            "url": "http://127.0.0.1:9400",
            "message": "Show me all active customers who have open tickets",
        },
        {
            "name": "SCENARIO 4 — Escalation (Urgent Refund)",
            "description": "Refund detection and high-priority routing.",
            "url": "http://127.0.0.1:9400",
            "message": "I've been charged twice, please refund immediately!",
        },
        {
            "name": "SCENARIO 5 — Multi-Intent (Update + History)",
            "description": "Sequential + parallel reasoning through Router.",
            "url": "http://127.0.0.1:9400",
            "message": "Update my email to new@email.com and show my ticket history",
        },
    ]


    summary = []

    for sc in scenarios:
        print("\n" + "-" * 90)
        print(sc["name"])
        print(f"  → {sc['description']}")
        print("-" * 90)

        try:
            resp = await client.create_task(sc["url"], sc["message"])
            print(resp)
            summary.append({"test": sc["name"], "status": "PASSED"})
        except Exception as exc:
            print(f"[ERROR] {exc}")
            summary.append({"test": sc["name"], "status": "FAILED", "error": str(exc)})


    print("\n" + "=" * 90)
    print("TEST SUMMARY")
    print("=" * 90)

    for item in summary:
        icon = "✓" if item["status"] == "PASSED" else "✗"
        print(f"{icon} {item['test']}: {item['status']}")
        if "error" in item:
            print(f"    Error: {item['error']}")

    print("=" * 90 + "\n")


In [17]:

print("\n" + "=" * 90)
print("MULTI-AGENT CUSTOMER SERVICE SYSTEM — INTEGRATION RUN")
print("A2A ORCHESTRATION  |  MCP TOOLING  |  ROUTER PIPELINE")
print("=" * 90)

# Allow agents enough time to fully bind to their ports
print("\nPreparing environment, warming up agent services...")
time.sleep(2.5)

# Launch the full test suite
asyncio.run(run_all_tests())

print("\n[Done] Test suite execution completed.")
print("Note: All agent servers continue to run in background threads.\n")



MULTI-AGENT CUSTOMER SERVICE SYSTEM — INTEGRATION RUN
A2A ORCHESTRATION  |  MCP TOOLING  |  ROUTER PIPELINE

Preparing environment, warming up agent services...

INTEGRATION TEST SUITE – A2A Multi-Agent Pipeline

------------------------------------------------------------------------------------------
SCENARIO 1 — Simple Customer Lookup
  → Straightforward MCP read via CustomerDataAgent.
------------------------------------------------------------------------------------------


  run_request = self._config.request_converter(
  genai_parts = part_converter(a2a_part)
  return AgentRunRequest(
  task_result_aggregator = TaskResultAggregator()
  for a2a_event in self._config.event_converter(
  message = convert_event_to_a2a_message(
  a2a_parts = part_converter(part)
ERROR:google_adk.google.adk.a2a.executor.a2a_agent_executor:Error handling A2A request: 'types.SimpleNamespace' object has no attribute 'state_delta'
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/google/adk/a2a/executor/a2a_agent_executor.py", line 167, in execute
    await self._handle_request(context, event_queue)
  File "/usr/local/lib/python3.12/dist-packages/google/adk/a2a/executor/a2a_agent_executor.py", line 237, in _handle_request
    async for adk_event in agen:
  File "/tmp/ipython-input-2252841120.py", line 46, in run_async
    async for ev in super().run_async(*args, **kwargs):
  File "/usr/local/lib/python3.12/dist-packages/google/adk/runners.py", lin

[Router] request = Get customer information for ID 5
[Router] flags: customer=True, support=False, upgrade=False
artifacts=None context_id='8d9e7f44-3dc9-40b0-805a-853130e987e7' history=[Message(context_id='8d9e7f44-3dc9-40b0-805a-853130e987e7', extensions=None, kind='message', message_id='08bba26e-bcfd-4620-91e9-73e8cf3b02b0', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Get customer information for ID 5'))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='453d6dcc-5a35-4928-a07c-53b81161bf50'), Message(context_id='8d9e7f44-3dc9-40b0-805a-853130e987e7', extensions=None, kind='message', message_id='08bba26e-bcfd-4620-91e9-73e8cf3b02b0', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Get customer information for ID 5'))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='453d6dcc-5a35-4928-a07c-53b81161bf50'), Message(context_id=None, extensions=None, kind='message', message_id='55749552-f456-4d0d-9174-f1295

ERROR:google_adk.google.adk.a2a.executor.a2a_agent_executor:Error handling A2A request: 'types.SimpleNamespace' object has no attribute 'state_delta'
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/google/adk/a2a/executor/a2a_agent_executor.py", line 167, in execute
    await self._handle_request(context, event_queue)
  File "/usr/local/lib/python3.12/dist-packages/google/adk/a2a/executor/a2a_agent_executor.py", line 237, in _handle_request
    async for adk_event in agen:
  File "/tmp/ipython-input-2252841120.py", line 46, in run_async
    async for ev in super().run_async(*args, **kwargs):
  File "/usr/local/lib/python3.12/dist-packages/google/adk/runners.py", line 454, in run_async
    async for event in agen:
  File "/usr/local/lib/python3.12/dist-packages/google/adk/runners.py", line 442, in _run_with_trace
    async for event in agen:
  File "/usr/local/lib/python3.12/dist-packages/google/adk/runners.py", line 657, in _exec_with_plugin
    await

[Router] request = I'm customer 12345 and need help upgrading my account
[Router] flags: customer=True, support=True, upgrade=False
artifacts=None context_id='680d4b35-b906-45ea-ad70-aeee6b0899bd' history=[Message(context_id='680d4b35-b906-45ea-ad70-aeee6b0899bd', extensions=None, kind='message', message_id='768dc9f4-ecf6-487e-b648-8837ebdac9b4', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text="I'm customer 12345 and need help upgrading my account"))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='dbb31b91-79f9-4785-8293-859967750aeb'), Message(context_id='680d4b35-b906-45ea-ad70-aeee6b0899bd', extensions=None, kind='message', message_id='768dc9f4-ecf6-487e-b648-8837ebdac9b4', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text="I'm customer 12345 and need help upgrading my account"))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='dbb31b91-79f9-4785-8293-859967750aeb'), Message(context_id=None, extensions=None

ERROR:google_adk.google.adk.a2a.executor.a2a_agent_executor:Error handling A2A request: 'types.SimpleNamespace' object has no attribute 'state_delta'
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/google/adk/a2a/executor/a2a_agent_executor.py", line 167, in execute
    await self._handle_request(context, event_queue)
  File "/usr/local/lib/python3.12/dist-packages/google/adk/a2a/executor/a2a_agent_executor.py", line 237, in _handle_request
    async for adk_event in agen:
  File "/tmp/ipython-input-2252841120.py", line 46, in run_async
    async for ev in super().run_async(*args, **kwargs):
  File "/usr/local/lib/python3.12/dist-packages/google/adk/runners.py", line 454, in run_async
    async for event in agen:
  File "/usr/local/lib/python3.12/dist-packages/google/adk/runners.py", line 442, in _run_with_trace
    async for event in agen:
  File "/usr/local/lib/python3.12/dist-packages/google/adk/runners.py", line 657, in _exec_with_plugin
    await

[Router] request = I've been charged twice, please refund immediately!
[Router] flags: customer=False, support=True, upgrade=False
artifacts=None context_id='8c22219d-1c14-485e-af6a-1736951d85a6' history=[Message(context_id='8c22219d-1c14-485e-af6a-1736951d85a6', extensions=None, kind='message', message_id='eed915d3-52ac-40d2-8aed-1a25b6fbb74c', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text="I've been charged twice, please refund immediately!"))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='66aa608e-85e7-4b8e-9688-70d620d745bb'), Message(context_id='8c22219d-1c14-485e-af6a-1736951d85a6', extensions=None, kind='message', message_id='eed915d3-52ac-40d2-8aed-1a25b6fbb74c', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text="I've been charged twice, please refund immediately!"))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='66aa608e-85e7-4b8e-9688-70d620d745bb'), Message(context_id=None, extensions=None, kin

### **Conclusion**

Implementing this multi-agent system highlighted how important clear task boundaries and well-defined protocols are when agents collaborate. Designing the Router, Customer Data Agent, and Support Agent forced me to formalize intent detection, delegation logic, and multi-step coordination rather than relying on a single model. Integrating an MCP server also emphasized the value of structured tool APIs and showed how external data sources can be safely shared across agents without leaking internal implementation details.

The biggest challenge was debugging coordination between services running on different ports. Issues such as timeout handling, inconsistent tool responses, and information loss during agent-to-agent transfers required adding explicit logging and structured message formats. Ultimately, this project demonstrated how A2A patterns can scale beyond toy examples into practical customer service automation, and it helped solidify my understanding of modular agent architectures and protocol-driven design.

---