In [None]:
# Install the Google Generative AI SDK (google-adk itself is installed in a later cell)
!pip install google-generativeai

[31mERROR: Operation cancelled by user[0m[31m
[0m^C


Github: https://github.com/bastetlu299/MCP_multiagent/tree/main

In [None]:
"""
Database interface layer used by the MCP server and A2A agents.

This module centralizes all data reads/writes to the SQLite database,
providing a clean API for the MCP tool handlers to interact with customers,
tickets, and interaction history.
"""

from __future__ import annotations

import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional

from database_setup import DatabaseSetup


# -----------------------------------------------------------------------------
#  Database initialization & configuration
# -----------------------------------------------------------------------------

# Executed at import time: importing this module creates the setup helper and
# initialises the database before any query function below can run.
_setup = DatabaseSetup()
_setup.initialize()  # ensures the DB file + schema exist
# Location passed to sqlite3.connect() by _open_db(); also exported via __all__.
DB_PATH: Path = _setup.db_path


def _open_db() -> sqlite3.Connection:
    """
    Open a new connection to the database at ``DB_PATH``.

    The connection yields ``sqlite3.Row`` objects (columns addressable by
    name) and has foreign-key enforcement switched on. The caller owns the
    returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.execute("PRAGMA foreign_keys = ON")
    connection.row_factory = sqlite3.Row
    return connection


# -----------------------------------------------------------------------------
#  Query Functions
# -----------------------------------------------------------------------------

def get_customer(customer_id: int) -> Optional[Dict[str, Any]]:
    """
    Fetch a single customer record by ID.

    Args:
        customer_id: Value matched against ``customers.id``.

    Returns:
        A dict with ``id``, ``name``, ``email``, ``status`` and ``created_at``,
        or ``None`` when no row matches.
    """
    db = _open_db()
    try:
        row = db.execute(
            """
            SELECT id, name, email, status, created_at
            FROM customers
            WHERE id = ?
            """,
            (customer_id,),
        ).fetchone()
    finally:
        # ``with connection:`` in sqlite3 only scopes a transaction and never
        # closes the connection, so close explicitly to avoid leaking one
        # handle per call.
        db.close()
    return dict(row) if row else None


def list_customers(status: Optional[str] = None, limit: int = 20) -> List[Dict[str, Any]]:
    """
    Retrieve multiple customers, optionally filtered by status.

    Args:
        status: When truthy, only customers with this exact status are
            returned; ``None`` or ``""`` disables the filter.
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts with ``id``, ``name``, ``email``, ``status`` and
        ``created_at``, ordered by ``id``.
    """
    sql = "SELECT id, name, email, status, created_at FROM customers"
    params: List[Any] = []
    if status:
        sql += " WHERE status = ?"
        params.append(status)
    # Without ORDER BY, which rows survive LIMIT is unspecified; ordering by
    # id makes repeated calls return the same rows.
    sql += " ORDER BY id LIMIT ?"
    params.append(limit)

    db = _open_db()
    try:
        rows = db.execute(sql, params).fetchall()
    finally:
        # sqlite3's connection context manager does not close the connection,
        # so release it explicitly.
        db.close()
    return [dict(r) for r in rows]


def modify_customer(customer_id: int, changes: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Update permitted customer fields.

    Only ``name``, ``email`` and ``status`` may be changed; any other keys in
    ``changes`` are ignored.

    Args:
        customer_id: ID of the customer to update.
        changes: Mapping of column name to new value.

    Returns:
        The customer record after the update (or the current record when
        ``changes`` holds no permitted field), or ``None`` if the customer
        does not exist.
    """
    allowed = {"name", "email", "status"}
    updates = {k: v for k, v in changes.items() if k in allowed}

    # nothing to update -> return the current record unchanged
    if not updates:
        return get_customer(customer_id)

    db = _open_db()
    try:
        # ``with db`` scopes the transaction: commit on success, rollback on error.
        with db:
            exists = db.execute(
                "SELECT 1 FROM customers WHERE id = ?", (customer_id,)
            ).fetchone()
            if not exists:
                return None

            # Column names come only from the ``allowed`` whitelist above, so
            # interpolating them is safe; values stay parameterised.
            assignments = ", ".join(f"{col} = ?" for col in updates)
            db.execute(
                f"UPDATE customers SET {assignments} WHERE id = ?",
                [*updates.values(), customer_id],
            )
    finally:
        # The context manager above does not close the connection.
        db.close()

    return get_customer(customer_id)


def new_ticket(customer_id: int, issue: str, priority: str) -> Dict[str, Any]:
    """
    Insert a new support ticket and return the full ticket entry.

    The ticket is created with status ``'open'``.

    Args:
        customer_id: Owner of the ticket.
        issue: Issue text stored in ``tickets.issue``.
        priority: Value stored in ``tickets.priority``.

    Returns:
        A dict with ``id``, ``customer_id``, ``issue``, ``priority``,
        ``status`` and ``created_at`` of the inserted row.
    """
    db = _open_db()
    try:
        # Transaction scope: commit the insert on success, roll back on error.
        with db:
            cur = db.execute(
                """
                INSERT INTO tickets (customer_id, issue, priority, status)
                VALUES (?, ?, ?, 'open')
                """,
                (customer_id, issue, priority),
            )

        # Re-read the row so defaults filled in by the database are included.
        row = db.execute(
            """
            SELECT id, customer_id, issue, priority, status, created_at
            FROM tickets
            WHERE id = ?
            """,
            (cur.lastrowid,),
        ).fetchone()
    finally:
        # sqlite3's connection context manager does not close the connection.
        db.close()

    return dict(row)


def customer_history(customer_id: int) -> List[Dict[str, Any]]:
    """
    Retrieve interaction records for a customer, newest first.

    Args:
        customer_id: Value matched against ``interactions.customer_id``.

    Returns:
        A list of dicts with ``id``, ``channel``, ``notes`` and
        ``created_at``, ordered by ``created_at`` descending. Empty when the
        customer has no interactions.
    """
    db = _open_db()
    try:
        rows = db.execute(
            """
            SELECT id, channel, notes, created_at
            FROM interactions
            WHERE customer_id = ?
            ORDER BY created_at DESC
            """,
            (customer_id,),
        ).fetchall()
    finally:
        # ``with connection:`` would not close the handle; do it explicitly.
        db.close()

    return [dict(r) for r in rows]


# -----------------------------------------------------------------------------
#  Backwards-compatible aliases for legacy imports
# -----------------------------------------------------------------------------

def fetch_customer(customer_id: int) -> Optional[Dict[str, Any]]:
    """Legacy name for ``get_customer``; forwards the call unchanged."""
    return get_customer(customer_id=customer_id)


def fetch_customers(status: Optional[str] = None, limit: int = 20) -> List[Dict[str, Any]]:
    """Legacy name for ``list_customers``; forwards both arguments unchanged."""
    return list_customers(status, limit)


def update_customer_record(customer_id: int, changes: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Legacy name for ``modify_customer``; forwards the call unchanged."""
    return modify_customer(customer_id=customer_id, changes=changes)


def create_ticket_record(customer_id: int, issue: str, priority: str) -> Dict[str, Any]:
    """Legacy name for ``new_ticket``; forwards the call unchanged."""
    return new_ticket(customer_id=customer_id, issue=issue, priority=priority)


def fetch_history(customer_id: int) -> List[Dict[str, Any]]:
    """Legacy name for ``customer_history``; forwards the call unchanged."""
    return customer_history(customer_id=customer_id)


# -----------------------------------------------------------------------------
#  Public API for import
# -----------------------------------------------------------------------------

# Names exported by ``from <module> import *``: the database path, the
# current query functions, and the legacy aliases that forward to them.
__all__ = [
    "DB_PATH",
    # current API
    "get_customer",
    "list_customers",
    "modify_customer",
    "new_ticket",
    "customer_history",
    # backwards-compatible aliases
    "fetch_customer",
    "fetch_customers",
    "update_customer_record",
    "create_ticket_record",
    "fetch_history",
]


In [4]:
pip install --upgrade google-adk

Collecting google-adk
  Downloading google_adk-1.20.0-py3-none-any.whl.metadata (14 kB)
Collecting fastapi<0.119.0,>=0.115.0 (from google-adk)
  Using cached fastapi-0.118.3-py3-none-any.whl.metadata (28 kB)
Collecting opentelemetry-api<=1.37.0,>=1.37.0 (from google-adk)
  Downloading opentelemetry_api-1.37.0-py3-none-any.whl.metadata (1.5 kB)
Collecting opentelemetry-sdk<=1.37.0,>=1.37.0 (from google-adk)
  Downloading opentelemetry_sdk-1.37.0-py3-none-any.whl.metadata (1.5 kB)
Collecting starlette<1.0.0,>=0.46.2 (from google-adk)
  Downloading starlette-0.48.0-py3-none-any.whl.metadata (6.3 kB)
Collecting opentelemetry-semantic-conventions==0.58b0 (from opentelemetry-sdk<=1.37.0,>=1.37.0->google-adk)
  Downloading opentelemetry_semantic_conventions-0.58b0-py3-none-any.whl.metadata (2.4 kB)
Downloading google_adk-1.20.0-py3-none-any.whl (2.3 MB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32

In [1]:
from google.adk.agents import LlmAgent
from google.adk.tools.mcp_tool import McpToolset, StreamableHTTPConnectionParams


In [None]:
!python app.py


In [2]:
from google.colab import userdata
import os
from termcolor import colored

# Load the Google API key and the MCP server URL from Colab secrets.
# Each lookup prints guidance and re-raises on failure so the cell stops
# instead of continuing with missing configuration.
try:
    GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
    # Exported to the environment for libraries that read the key from there.
    os.environ['GOOGLE_API_KEY'] = GOOGLE_API_KEY
    print(colored("‚úÖ Google API Key loaded", "green"))
except Exception:
    print(colored("‚ùå GOOGLE_API_KEY not found in secrets", "red"))
    print(colored("   Please add your Google API key to Colab secrets", "yellow"))
    print(colored("   Get one at: https://aistudio.google.com/app/apikey", "yellow"))
    raise

try:
    try:
        # The name the messages below tell the user to create.
        MCP_SERVER_URL = userdata.get('MCP_SERVER_URL')
    except Exception:
        # This cell previously read the misspelled name 'MCP_SERVER_UR';
        # keep it as a fallback so existing secret stores still work.
        MCP_SERVER_URL = userdata.get('MCP_SERVER_UR')
    print(colored(f"‚úÖ MCP Server URL loaded: {MCP_SERVER_URL}", "green"))
except Exception:
    print(colored("‚ùå MCP_SERVER_URL not found in secrets", "red"))
    print(colored("   Please add your MCP server URL to Colab secrets", "yellow"))
    print(colored("   Format: https://xxxx.ngrok.io/mcp", "yellow"))
    print(colored("   Run the mcp_customer_demo.ipynb notebook first to get this URL", "yellow"))
    raise

print()


‚úÖ Google API Key loaded
‚úÖ MCP Server URL loaded: http://0.0.0.0:8000



In [3]:
# common_mcp.py
from google.adk.tools.mcp_tool import McpToolset, StreamableHTTPConnectionParams

def build_mcp_toolset(mcp_server_url: str) -> McpToolset:
    """Create an ``McpToolset`` that connects to ``mcp_server_url`` via streamable-HTTP parameters."""
    http_params = StreamableHTTPConnectionParams(url=mcp_server_url)
    return McpToolset(connection_params=http_params)


In [4]:
from google.adk.agents import LlmAgent
from google.adk.tools.mcp_tool import McpToolset, StreamableHTTPConnectionParams
from termcolor import colored

print(colored("üîß Creating ADK Agent (new API)...", "cyan"))

# Single LLM agent whose only tools are those served by the MCP server at
# MCP_SERVER_URL (loaded from Colab secrets in an earlier cell).
customer_agent = LlmAgent(
    name="customer_management_agent",
    # NOTE(review): "gemini-pro" is an old model alias; confirm it is still
    # served by the API for this key before relying on it.
    model="gemini-pro",
    # ``description`` is the short capability summary; the behavioural prompt
    # belongs in ``instruction`` (it was previously passed as the description,
    # so it was not being used as the agent's instructions).
    description="Customer management assistant that uses MCP tools for customer and ticket operations.",
    instruction=(
        "You are a helpful customer management assistant.\n"
        "You can:\n"
        "- Get information about specific customers\n"
        "- List customers\n"
        "- Update customer info\n"
        "- Create support tickets\n"
        "- Retrieve ticket history\n\n"
        "Use MCP tools to perform customer operations.\n"
        "Be friendly and precise in your answers."
    ),
    tools=[
        McpToolset(
            connection_params=StreamableHTTPConnectionParams(
                url=MCP_SERVER_URL
            )
        )
    ]
)

print(colored("‚úÖ Agent created successfully!", "green", attrs=["bold"]))




üîß Creating ADK Agent (new API)...
‚úÖ Agent created successfully!


In [35]:
!unzip langgraph_sdk.zip -d /usr/local/lib/python3.12/dist-packages/


Archive:  langgraph_sdk.zip
replace /usr/local/lib/python3.12/dist-packages/langgraph_sdk/auth/__init__.py? [y]es, [n]o, [A]ll, [N]one, [r]ename: ye s
  inflating: /usr/local/lib/python3.12/dist-packages/langgraph_sdk/auth/__init__.py  
replace /usr/local/lib/python3.12/dist-packages/langgraph_sdk/auth/types.py? [y]es, [n]o, [A]ll, [N]one, [r]ename: yes
  inflating: /usr/local/lib/python3.12/dist-packages/langgraph_sdk/auth/types.py  
  inflating: /usr/local/lib/python3.12/dist-packages/langgraph_sdk/auth/__pycache__/exceptions.cpython-311.pyc  
  inflating: /usr/local/lib/python3.12/dist-packages/langgraph_sdk/auth/__pycache__/types.cpython-311.pyc  
  inflating: /usr/local/lib/python3.12/dist-packages/langgraph_sdk/auth/__pycache__/__init__.cpython-311.pyc  
replace /usr/local/lib/python3.12/dist-packages/langgraph_sdk/auth/exceptions.py? [y]es, [n]o, [A]ll, [N]one, [r]ename: yes
  inflating: /usr/local/lib/python3.12/dist-packages/langgraph_sdk/auth/exceptions.py  
replace /usr/loca

In [46]:
!pip install -q a2a-sdk[http-server] fastapi uvicorn httpx

"""
Data Agent (A2A-SDK + MCP)
--------------------------
This agent acts as a thin client over the MCP server. It exposes a single skill
that allows other agents (e.g., router or support) to request customer records,
lists, and interaction history via A2A.
"""

from multiprocessing import Process
from typing import Dict, Any
import httpx, uuid

from fastapi import FastAPI
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCard,
    AgentProvider,
    AgentCapabilities,
    AgentSkill,
)

# -------------------------------------------------------
# MCP endpoint (adjust if needed)
# -------------------------------------------------------
MCP_ENDPOINT = "http://localhost:8000"


async def mcpcall(name: str, args: Dict[str, Any]):
    """
    POST a tool invocation to ``{MCP_ENDPOINT}/tools/call``.

    The request body is ``{"name": name, "arguments": args}``. Raises the
    httpx status error for non-success responses; otherwise returns the
    ``result`` member of the JSON reply (``None`` when absent).
    """
    endpoint = f"{MCP_ENDPOINT}/tools/call"
    body = {"name": name, "arguments": args}

    async with httpx.AsyncClient() as client:
        response = await client.post(endpoint, json=body)
        response.raise_for_status()
        reply = response.json()

    return reply.get("result")


# -------------------------------------------------------
# A2A Data Agent Logic (central skill handler)
# -------------------------------------------------------

async def data_handler(request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Keyword-based dispatcher from a free-text query to MCP tool calls.

    request: {"query": "<user text>"}
    return:  {"text": "..."} (formatted for A2A)

    Supported queries, checked in this order:
    - "Get customer information for ID 5"  -> get_customer for that ID
    - "List active customers"              -> list_customers(status="active")
    - "Update my email to XXX"             -> update_customer for customer 1
    - "Show me my ticket history"          -> get_customer_history for customer 1
    - anything else                        -> get_customer for customer 1

    The email check runs before the history check so that a combined request
    ("update my email to X and show my ticket history") performs the update
    instead of being swallowed by the history branch. The address is taken
    from the original (not lower-cased) text and trailing punctuation is
    dropped.
    """
    import re  # local import: the surrounding cell does not import ``re``

    raw = str(request.get("query") or "")
    q = raw.lower()

    # --- Extract explicit customer ID ---
    if "customer" in q and "id" in q:
        # First whitespace-delimited all-digit token, tolerating trailing
        # punctuation such as "5?" or "5." that a plain isdigit() test rejects.
        id_match = re.search(r"(?<!\S)(\d+)(?=[.,;:!?)]*(?:\s|$))", q)
        if id_match:
            cid = int(id_match.group(1))
            result = await mcpcall("get_customer", {"customer_id": cid})
            return {"text": f"[DataAgent] Customer {cid}: {result}"}

    # --- List active customers ---
    if "active customers" in q or "list active" in q:
        result = await mcpcall("list_customers", {"status": "active", "limit": 50})
        return {"text": f"[DataAgent] Active customers: {result}"}

    # --- Update email (must precede the broad "history" match) ---
    if "update my email" in q:
        email_match = re.search(r"update my email to\s+(\S+)", raw, flags=re.IGNORECASE)
        new_email = email_match.group(1).rstrip(".,;:!?") if email_match else ""
        if "@" not in new_email:
            return {"text": "[DataAgent] Could not parse new email."}
        try:
            await mcpcall("update_customer", {
                "customer_id": 1,
                "data": {"email": new_email}
            })
        except httpx.HTTPError as exc:
            # Report transport/status failures as such rather than as a
            # parsing problem.
            return {"text": f"[DataAgent] Email update failed: {exc}"}
        return {"text": f"[DataAgent] Email updated to {new_email}"}

    # --- Ticket history ---
    if "history" in q:
        hist = await mcpcall("get_customer_history", {"customer_id": 1})
        return {"text": f"[DataAgent] Ticket history: {hist}"}

    # --- Default fallback ---
    default_data = await mcpcall("get_customer", {"customer_id": 1})
    return {"text": f"[DataAgent] Default customer: {default_data}"}


# -------------------------------------------------------
# A2A AgentCard (metadata)
# -------------------------------------------------------

# Skill advertised by the data agent. The router addresses it by the id
# "data_agent_skill".
# NOTE(review): modes are given as "json"; confirm whether the A2A schema
# expects MIME types (e.g. "application/json") here.
data_skill = AgentSkill(
    id="data_agent_skill",
    name="Customer Data Tools",
    description="Uses MCP to retrieve, list, update customer information.",
    tags=["data", "mcp"],
    inputModes=["json"],
    outputModes=["json"],
    examples=[
        "Get customer information for ID 5",
        "Show my ticket history",
        "List active customers",
        "Update my email to new@example.com",
    ],
)

# Agent card for the data agent. The URL uses port 8011, the same port the
# uvicorn launcher for this agent binds to.
data_card = AgentCard(
    name="Data Agent",
    description="Exposes MCP-backed customer data operations.",
    version="1.0.0",
    url="http://localhost:8011",
    documentationUrl="https://example.com/docs/customer-data",
    capabilities=AgentCapabilities(streaming=False),
    defaultInputModes=["json"],
    defaultOutputModes=["json"],
    provider=AgentProvider(
        organization="Assignment 5",
        url="http://localhost:8011",
    ),
    skills=[data_skill],
)


# -------------------------------------------------------
# Build FastAPI App + A2A Runtime
# -------------------------------------------------------

# Request handler backed by an in-memory task store.
# NOTE(review): agent_executor receives the plain coroutine function
# data_handler; a2a-sdk's DefaultRequestHandler appears to expect an
# AgentExecutor instance (execute/cancel methods) — confirm this works at
# request time.
data_handler_wrapper = DefaultRequestHandler(
    agent_executor=data_handler,
    task_store=InMemoryTaskStore()
)

# ASGI application serving the agent card and the handler above.
data_app = A2AStarletteApplication(
    agent_card=data_card,
    http_handler=data_handler_wrapper
).build()


# -------------------------------------------------------
# Launch data agent (NO main, safe for Colab)
# -------------------------------------------------------

def launch_data_agent():
    """Serve ``data_app`` with uvicorn on 0.0.0.0:8011 (blocks until the server stops)."""
    import uvicorn
    uvicorn.run(data_app, host="0.0.0.0", port=8011)

# Run the server in a daemon child process so the notebook cell returns.
# Re-running this cell starts another process targeting the same port.
data_proc = Process(target=launch_data_agent, daemon=True)
data_proc.start()

print("üöÄ Data Agent running at http://localhost:8011")



üöÄ Data Agent running at http://localhost:8011


In [43]:
!pip install -q a2a-sdk[http-server] fastapi uvicorn httpx

"""
Payment / Billing Agent (A2A-SDK version)
-----------------------------------------
Handles queries related to invoices, refunds, and general payment issues.
This agent does not call MCP tools directly; instead, it provides domain-
specific responses to upstream agents such as the router.
"""

from multiprocessing import Process
from typing import Dict, Any

from fastapi import FastAPI
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCard, AgentCapabilities, AgentProvider, AgentSkill




async def payment_handler(payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Build the payment agent's fixed-format text reply.

    Reads ``payload["query"]`` (missing key counts as empty), strips
    surrounding whitespace and echoes it back under a short description of
    what the agent handles. An empty query is shown as ``(empty)``.
    Returns ``{"text": <reply>}``.
    """
    query = str(payload.get("query", "")).strip()
    shown_request = query if query else "(empty)"

    lines = [
        "Payment Agent Response:",
        "I handle refunds, invoice issues, failed payments, and account charges.",
        f"Your request: {shown_request}",
    ]
    return {"text": "\n".join(lines)}



# Skill advertised by the payment agent. The router addresses it by the id
# "payment".
# NOTE(review): modes are given as "json"; confirm whether the A2A schema
# expects MIME types (e.g. "application/json") here.
payment_skill = AgentSkill(
    id="payment",
    name="Payment Services",
    description="Supports billing problems and refund workflows.",
    tags=["billing", "payments"],
    inputModes=["json"],
    outputModes=["json"],
    examples=["Issue refund", "Send invoice", "Payment failed"],
)

# Agent card for the payment agent. The URL uses port 8013, the same port
# the uvicorn launcher for this agent binds to.
payment_card = AgentCard(
    name="Payment Agent",
    description="Provides assistance for payment, invoices, and refund inquiries.",
    version="1.0.0",
    url="http://localhost:8013",
    documentationUrl="https://example.com/docs/payments",
    defaultInputModes=["json"],
    defaultOutputModes=["json"],
    capabilities=AgentCapabilities(streaming=False),
    provider=AgentProvider(
        organization="Assignment 5",
        url="http://localhost:8013",
    ),
    skills=[payment_skill],
)


# Request handler backed by an in-memory task store.
# NOTE(review): agent_executor receives the plain coroutine function
# payment_handler; a2a-sdk's DefaultRequestHandler appears to expect an
# AgentExecutor instance (execute/cancel methods) — confirm this works at
# request time.
payment_handler_wrapper = DefaultRequestHandler(
    agent_executor=payment_handler,
    task_store=InMemoryTaskStore(),
)

# ASGI application serving the agent card and the handler above.
payment_app = A2AStarletteApplication(
    agent_card=payment_card,
    http_handler=payment_handler_wrapper,
).build()

def launch_payment():
    """Serve ``payment_app`` with uvicorn on 0.0.0.0:8013 (blocks until the server stops)."""
    import uvicorn
    uvicorn.run(payment_app, host="0.0.0.0", port=8013)

# Run the server in a daemon child process so the notebook cell returns.
# Re-running this cell starts another process targeting the same port.
payment_proc = Process(target=launch_payment, daemon=True)
payment_proc.start()

print("üöÄ Payment Agent running at http://localhost:8013")


üöÄ Payment Agent running at http://localhost:8013


In [44]:
!pip install -q a2a-sdk[http-server] fastapi uvicorn

"""
Support Agent (A2A-SDK version)
-------------------------------
Provides empathetic, user-facing support responses. If the message includes
data context forwarded by the router (e.g., "Data context: ..."), the agent
incorporates it into a more informed guidance message.
"""

from __future__ import annotations

from multiprocessing import Process
from typing import Dict, Any

from fastapi import FastAPI
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCard,
    AgentCapabilities,
    AgentProvider,
    AgentSkill,
)


# ---------------------------------------------------------------------------
# Internal helper functions
# ---------------------------------------------------------------------------

def parse_support_prompt(text: str) -> tuple[str, str]:
    """
    Split a support prompt into upstream data context and the customer request.

    Recognised shapes:
        "<request> Data context: <context>"
        "Data context: <context>\\n\\nUser request: <request>"   (router format)
        "<request>"                                              (no context)

    When both a leading request and a trailing "User request:" section are
    present, the explicit "User request:" section wins.

    Returns:
        (context_text, user_request). ``context_text`` is "" when no
        "Data context:" marker is present; ``user_request`` falls back to
        "your request" when nothing usable is found.
    """
    if "Data context:" not in text:
        return "", text.strip() or "your request"

    lead, _, remainder = text.partition("Data context:")

    # The router appends the original query after the context as
    # "User request: ..."; split on the last marker so it is reported as the
    # request instead of being left inside the context.
    context, marker, trailing_request = remainder.rpartition("User request:")
    if not marker:
        # rpartition puts everything in the last slot when the marker is absent.
        context, trailing_request = remainder, ""

    request = trailing_request.strip() or lead.strip() or "your request"
    return context.strip(), request


def generate_suggestions(user_prompt: str) -> list[str]:
    """
    Return three next-step suggestions chosen by keyword.

    The first keyword group found in the lower-cased prompt supplies two
    suggestions (login/password, then ticket/issue/problem, then
    history/follow/activity; otherwise a generic pair). A fixed "urgent"
    line is always appended last.
    """
    text = user_prompt.lower()

    playbook = (
        (
            ("login", "password"),
            [
                "Try resetting your password and confirm you can sign in from a trusted browser.",
                "If it still fails, share the exact error message so we can diagnose quickly.",
            ],
        ),
        (
            ("ticket", "issue", "problem"),
            [
                "I can open a support ticket and notify you as soon as there's progress.",
                "Screenshots or timestamps would help us troubleshoot faster.",
            ],
        ),
        (
            ("history", "follow", "activity"),
            [
                "I've reviewed your recent activity and will keep an eye on any new updates.",
                "If something changes on your side, let me know and we can adjust next steps.",
            ],
        ),
    )
    generic = [
        "Tell me any specific details you'd like us to verify or double-check.",
        "We can also set up a short follow-up if you need more help.",
    ]

    # First matching group wins, mirroring an if/elif chain.
    chosen = next(
        (tips for keywords, tips in playbook if any(k in text for k in keywords)),
        generic,
    )
    return [*chosen, "If this is urgent, reply here and I‚Äôll jump on it immediately."]


# ---------------------------------------------------------------------------
# Skill implementation (A2A handler)
# ---------------------------------------------------------------------------

async def support_handler(payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Compose the support agent's reply for ``payload["query"]``.

    The reply is a newline-joined message: a greeting (worded differently
    when upstream data context is present), an optional line acknowledging
    the kind of problem, a recommendation header naming the request, up to
    three bulleted suggestions, and a closing offer to act.
    Returns ``{"text": <reply>}``.
    """
    text = str(payload.get("query", "")).strip()
    context_text, request_text = parse_support_prompt(text)
    has_context = bool(context_text)
    lowered = text.lower()

    # Greeting
    reply = [
        "Hi there ‚Äî I reviewed the latest notes on your account."
        if has_context
        else "Hi there, thanks for reaching out."
    ]

    # Optional acknowledgement line; nothing is added when no rule applies.
    if "login" in lowered:
        reply.append("It looks like you're having trouble signing in.")
    elif any(word in lowered for word in ("ticket", "issue", "problem")):
        reply.append("I can see you‚Äôre dealing with an issue that needs attention.")
    elif has_context:
        reply.append("I‚Äôve read through the account history you mentioned.")

    # Recommendations (keywords are looked up in the full incoming text).
    reply.append(f"Here‚Äôs what I recommend based on {request_text}:")
    reply.extend(f"- {tip}" for tip in generate_suggestions(text)[:3])

    reply.append(
        "If you'd like me to take action now, just reply to this message and I‚Äôll handle it."
    )

    return {"text": "\n".join(reply)}


# ---------------------------------------------------------------------------
# Agent metadata
# ---------------------------------------------------------------------------

# Skill advertised by the support agent. The router addresses it by the id
# "support-general".
# NOTE(review): modes are given as "json"; confirm whether the A2A schema
# expects MIME types (e.g. "application/json") here.
support_skill = AgentSkill(
    id="support-general",
    name="General Support",
    description="Handles everyday support inquiries and troubleshooting questions.",
    tags=["support", "triage", "helpdesk"],
    inputModes=["json"],
    outputModes=["json"],
    examples=[
        "Help me reset my password",
        "I need to troubleshoot an issue",
        "Review my recent activity",
    ],
)

# Agent card for the support agent. The URL uses port 8012, the same port
# the uvicorn launcher for this agent binds to.
support_card = AgentCard(
    name="Support Agent",
    description="Provides troubleshooting help and customer-friendly guidance.",
    url="http://localhost:8012",
    version="1.0.0",
    documentationUrl="https://example.com/docs/support",
    defaultInputModes=["json"],
    defaultOutputModes=["json"],
    capabilities=AgentCapabilities(streaming=False),
    provider=AgentProvider(
        organization="Assignment 5",
        url="http://localhost:8012",
    ),
    skills=[support_skill],
)

# Request handler backed by an in-memory task store.
# NOTE(review): agent_executor receives the plain coroutine function
# support_handler; a2a-sdk's DefaultRequestHandler appears to expect an
# AgentExecutor instance (execute/cancel methods) — confirm this works at
# request time.
support_handler_wrapper = DefaultRequestHandler(
    agent_executor=support_handler,
    task_store=InMemoryTaskStore(),
)

# ASGI application serving the agent card and the handler above.
support_app = A2AStarletteApplication(
    agent_card=support_card,
    http_handler=support_handler_wrapper,
).build()

def launch_support():
    """Serve ``support_app`` with uvicorn on 0.0.0.0:8012 (blocks until the server stops)."""
    import uvicorn
    uvicorn.run(support_app, host="0.0.0.0", port=8012)

# Run the server in a daemon child process so the notebook cell returns.
# Re-running this cell starts another process targeting the same port.
support_proc = Process(target=launch_support, daemon=True)
support_proc.start()

print("üöÄ Support Agent running at http://localhost:8012")


üöÄ Support Agent running at http://localhost:8012


In [None]:


import uuid
from multiprocessing import Process
from typing import Dict, Any, List

import httpx
from fastapi import FastAPI
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCard,
    AgentCapabilities,
    AgentProvider,
    AgentSkill,
)

DATA_URL = "http://localhost:8011"
SUPPORT_URL = "http://localhost:8012"
PAYMENT_URL = "http://localhost:8013"



async def a2a_call(url: str, skill_id: str, payload: Dict[str, Any]) -> Any:
    """
    Send one JSON-RPC 2.0 request to another agent and return its ``result``.

    Args:
        url: Endpoint of the target agent.
        skill_id: Skill identifier placed in ``params["skill"]``.
        payload: Value placed in ``params["input"]``.

    Returns:
        The ``result`` member of the JSON-RPC response (``None`` if absent).

    Raises:
        httpx.HTTPStatusError: For non-success HTTP status codes.
        RuntimeError: When the response carries a JSON-RPC ``error`` member.
    """
    # NOTE(review): the method name "a2a.agent.invoke" differs from the
    # "message/send" method used by the scenario helper in this notebook;
    # confirm which method the target servers actually accept.
    rpc_request = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "a2a.agent.invoke",
        "params": {
            "skill": skill_id,
            "input": payload,
        },
    }
    async with httpx.AsyncClient() as client:
        resp = await client.post(url, json=rpc_request)
        resp.raise_for_status()
        body = resp.json()

    # JSON-RPC failures arrive as an "error" member, often with HTTP 200.
    # Surface them here instead of returning None and letting callers fail
    # later on ``None.get``.
    error = body.get("error")
    if error is not None:
        raise RuntimeError(f"A2A call to {url} failed: {error}")
    return body.get("result")




def _classify_route(lowered: str) -> tuple[str, str]:
    """Map a lower-cased query to ``(route_name, classification_log_line)``; first match wins."""
    # Simple Query
    if "get customer information for id" in lowered:
        return "simple_data_query", "Classified as SIMPLE DATA QUERY (single data agent)."
    # Coordinated Query
    if "need help upgrading my account" in lowered or "upgrade my account" in lowered:
        return "coordinated_upgrade", "Classified as COORDINATED QUERY (data + support)."
    # Complex Query
    if "active customers" in lowered and "open tickets" in lowered:
        return "complex_open_tickets", "Classified as COMPLEX QUERY (data + support, open tickets)."
    # Escalation
    if "charged twice" in lowered or ("refund" in lowered and "immediately" in lowered):
        return "escalation_billing", "Classified as ESCALATION (urgent billing + refund)."
    # Multi-Intent
    if "update my email" in lowered and "ticket history" in lowered:
        return "multi_intent_email_and_history", "Classified as MULTI-INTENT (update + history)."
    # Fallback
    return "fallback_support", "Classified as FALLBACK SUPPORT (support only)."


def _agent_text(result: Any) -> str:
    """Extract the ``text`` field from an agent reply, tolerating non-dict replies."""
    if isinstance(result, dict):
        return str(result.get("text", result))
    return str(result)


# Log lines for the two routes that share the "data agent, then support
# agent with the data as context" flow: route -> (data log, support log).
_DATA_THEN_SUPPORT_LOGS = {
    "coordinated_upgrade": (
        "Calling Data Agent for customer context (upgrade).",
        "Calling Support Agent to craft upgrade guidance using data context.",
    ),
    "complex_open_tickets": (
        "Calling Data Agent to list active customers.",
        "Calling Support Agent to analyze open tickets for those customers.",
    ),
}


async def router_handler(payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Classify the query by keyword, call the matching agents, and aggregate.

    payload: {"query": "<user text>"}
    returns:
      {
        "text": "...Router summary...",
        "route": "<scenario_name>",
        "agents_called": ["data", "support", ...],
        "logs": ["...", "..."]
      }

    Agents are called sequentially through ``a2a_call``; exceptions raised
    there propagate to the caller.
    """
    user_text = str(payload.get("query", "")).strip()
    route, classification = _classify_route(user_text.lower())

    logs: List[str] = [f"Router received query: {user_text}", classification]
    agents_called: List[str] = []
    results: List[str] = []

    async def ask(url: str, skill_id: str, query: str) -> str:
        # One round trip to an agent, reduced to its text reply.
        return _agent_text(await a2a_call(url, skill_id, {"query": query}))

    # 1) SIMPLE DATA QUERY: only data agent
    if route == "simple_data_query":
        agents_called.append("data")
        logs.append("Calling Data Agent for customer details (simple query).")
        data_text = await ask(DATA_URL, "data_agent_skill", user_text)
        results.append(f"[DATA]\n{data_text}")

    # 2) + 3) COORDINATED / COMPLEX QUERY: data -> support
    elif route in _DATA_THEN_SUPPORT_LOGS:
        data_log, support_log = _DATA_THEN_SUPPORT_LOGS[route]
        agents_called.extend(["data", "support"])

        logs.append(data_log)
        data_text = await ask(DATA_URL, "data_agent_skill", user_text)

        logs.append(support_log)
        combined_prompt = f"Data context: {data_text}\n\nUser request: {user_text}"
        support_text = await ask(SUPPORT_URL, "support-general", combined_prompt)

        results.append(f"[DATA]\n{data_text}")
        results.append(f"[SUPPORT]\n{support_text}")

    # 4) ESCALATION: payment + support
    elif route == "escalation_billing":
        agents_called.extend(["payment", "support"])

        logs.append("Calling Payment Agent for refund / double-charge resolution.")
        pay_text = await ask(PAYMENT_URL, "payment", user_text)

        logs.append("Calling Support Agent to send empathetic explanation to user.")
        support_text = await ask(SUPPORT_URL, "support-general", user_text)

        results.append(f"[PAYMENT]\n{pay_text}")
        results.append(f"[SUPPORT]\n{support_text}")

    # 5) MULTI-INTENT: data (update) -> data (history) -> support
    elif route == "multi_intent_email_and_history":
        agents_called.extend(["data", "data", "support"])

        logs.append("Calling Data Agent to update email.")
        upd_text = await ask(DATA_URL, "data_agent_skill", user_text)

        logs.append("Calling Data Agent again to fetch ticket history.")
        hist_text = await ask(DATA_URL, "data_agent_skill", "ticket history")

        logs.append("Calling Support Agent to summarize both actions for the user.")
        combined_prompt = (
            f"Data context: {upd_text}\n\nTicket history: {hist_text}\n\nUser request: {user_text}"
        )
        support_text = await ask(SUPPORT_URL, "support-general", combined_prompt)

        results.append(f"[UPDATE]\n{upd_text}")
        results.append(f"[HISTORY]\n{hist_text}")
        results.append(f"[SUPPORT]\n{support_text}")

    # 6) FALLBACK: support only
    else:
        agents_called.append("support")
        logs.append("Calling Support Agent as fallback.")
        sup_text = await ask(SUPPORT_URL, "support-general", user_text)
        results.append(f"[SUPPORT]\n{sup_text}")

    # Aggregate the per-agent sections into one summary.
    summary = "Router summary:\n" + "\n\n".join(results)

    logs.append("Router finished aggregation and returned final summary.")

    return {
        "text": summary,
        "route": route,
        "agents_called": agents_called,
        "logs": logs,
    }


# ---------------------------------------------------------------------------
# Agent metadata & A2A app
# ---------------------------------------------------------------------------

# Skill advertised by the router agent.
# NOTE(review): modes are given as "json"; confirm whether the A2A schema
# expects MIME types (e.g. "application/json") here.
router_skill = AgentSkill(
    id="router",
    name="Request Routing",
    description="Dispatches tasks to data, support, and payment agents.",
    tags=["router", "workflow", "a2a"],
    inputModes=["json"],
    outputModes=["json"],
    examples=[
        "Get history and provide a final response",
        "Handle a billing question",
        "General support request",
    ],
)

# Agent card for the router agent. The URL uses port 8010, the same port the
# uvicorn launcher for this agent binds to.
router_card = AgentCard(
    name="Router Agent",
    description="Routes user intents across A2A agents using a workflow.",
    url="http://localhost:8010",
    version="1.0.0",
    documentationUrl="https://example.com/docs/router",
    defaultInputModes=["json"],
    defaultOutputModes=["json"],
    capabilities=AgentCapabilities(streaming=False),
    provider=AgentProvider(
        organization="Assignment 5",
        url="http://localhost:8010",
    ),
    skills=[router_skill],
)

router_handler_wrapper = DefaultRequestHandler(
    agent_executor=router_handler,
    task_store=InMemoryTaskStore(),
)

router_app = A2AStarletteApplication(
    agent_card=router_card,
    http_handler=router_handler_wrapper,
).build()

def launch_router(host: str = "0.0.0.0", port: int = 8010) -> None:
    """
    Serve ``router_app`` with uvicorn.

    Intended to be used as the target of a background process; the call
    does not return while the server is running.

    Args:
        host: Interface to bind. The default "0.0.0.0" listens on all
            interfaces; pass "127.0.0.1" to accept local connections only.
        port: TCP port to listen on. The default (8010) matches the URL
            advertised in ``router_card``; if a different port is used,
            the card's URL will no longer point at this server.
    """
    # Imported inside the function so the dependency is only loaded in the
    # process that actually runs the server.
    import uvicorn

    uvicorn.run(router_app, host=host, port=port)

# Run the router server in a separate process so this cell returns
# immediately instead of blocking on the server loop.
# NOTE(review): assuming `Process` is multiprocessing.Process, daemon=True
# means the child is terminated when the parent process exits.
router_proc = Process(target=launch_router, daemon=True)
router_proc.start()

# This message is printed right after start(); it does not verify that the
# server is already accepting connections.
# The rocket emoji is written as a named escape so the literal cannot be
# corrupted if this file is re-saved or decoded with the wrong encoding.
print("\N{ROCKET} Router Agent with logging running at http://localhost:8010")



🚀 Router Agent with logging running at http://localhost:8010


In [None]:
# ---------------------------------------------------------------------------
# Helper: Build a JSON-RPC message/send payload
# ---------------------------------------------------------------------------
def build_rpc_payload(prompt: str) -> dict:
    """
    Wrap *prompt* in a JSON-RPC 2.0 ``message/send`` request envelope.

    The prompt is turned into a user-role text message, and that message's
    own ID is reused as the JSON-RPC request ID.
    """
    user_message = create_text_message(prompt, role=Role.user)
    request_id = user_message.messageId
    params = {"message": user_message.model_dump()}
    return dict(
        jsonrpc="2.0",
        id=request_id,
        method="message/send",
        params=params,
    )


# ---------------------------------------------------------------------------
# Helper: Extract final agent response from the Router Task object
# ---------------------------------------------------------------------------
def extract_final_answer(task_result: dict) -> str:
    """
    Return the text of the last agent message in a task result.

    Args:
        task_result: Dict with an optional "history" list of message dicts,
            each carrying a "role" and a list of "parts".

    Returns:
        The first non-empty "text" found among the parts of the most recent
        message whose role is "agent", or one of the placeholders
        "<no agent messages>", "<no text parts>", "<empty>" when there is
        no agent message, no parts, or no part with text respectively.
    """
    # `or []` covers both a missing key and an explicit null history.
    history = task_result.get("history") or []

    # Walk backwards so we stop at the most recent agent message.
    final_msg = next(
        (m for m in reversed(history) if m.get("role") == "agent"),
        None,
    )
    if final_msg is None:
        return "<no agent messages>"

    parts = final_msg.get("parts") or []
    if not parts:
        return "<no text parts>"

    # The first part is not guaranteed to be the text part, so scan all of
    # them instead of only looking at parts[0].
    return next((p["text"] for p in parts if p.get("text")), "<empty>")


# ---------------------------------------------------------------------------
# One scenario execution
# ---------------------------------------------------------------------------
async def run_single_scenario(name: str, query: str) -> None:
    """
    Send one query to the Router over JSON-RPC and print the outcome.

    Args:
        name: Human-readable scenario label, used only in the printed report.
        query: User text sent to the Router as a ``message/send`` request.

    Raises:
        httpx.HTTPStatusError: If the Router answers with an HTTP error
            status (raised by ``raise_for_status``).
    """
    payload = build_rpc_payload(query)

    async with httpx.AsyncClient() as client:
        response = await client.post(ROUTER_RPC, json=payload, timeout=30.0)
        response.raise_for_status()
        body = response.json()

    # A JSON-RPC 2.0 response carries either `result` or `error`; keep both
    # so a failed call is reported with its cause instead of being hidden.
    result = body.get("result")
    error = body.get("error")

    print("=" * 100)
    print(f"SCENARIO : {name}")
    print(f"QUERY    : {query}")
    print("-" * 100)

    if result is not None:
        final_text = extract_final_answer(result)
        print("FINAL RESPONSE:")
        print(final_text)
    elif error is not None:
        print(f"Router returned a JSON-RPC error: {error}")
    else:
        print("No result returned from Router.")

    print("=" * 100)
    print()


# ---------------------------------------------------------------------------
# Full scenario suite
# ---------------------------------------------------------------------------
# Each entry is a (scenario label, user query) pair. run_all() walks this
# list in order, passing the label and query to run_single_scenario().
scenarios = [
    ("Simple: basic customer lookup", "Get customer information for ID 5"),
    ("Coordinated: data + support", "I'm customer 12345 and need help upgrading my account"),
    ("Complex: multi-constraint search", "Show me all active customers who have open tickets"),
    ("Escalation: urgent refund", "I've been charged twice, please refund immediately!"),
    ("Multi-intent: data update + history", "Update my email and show ticket history"),
]


async def run_all():
    """
    Execute every entry of ``scenarios`` one after another, in list order.
    """
    for scenario in scenarios:
        label, prompt = scenario
        await run_single_scenario(label, prompt)


# Top-level await: this works in a notebook/IPython cell, where an event
# loop is already running. In a plain script, use asyncio.run(run_all()).
await run_all()


SCENARIO : Simple: basic customer lookup
QUERY    : Get customer information for ID 5
----------------------------------------------------------------------------------------------------
FINAL RESPONSE:
Router summary:
Customer record: {'id': 1, 'name': 'Ana Customer', 'email': 'ana@example.com', 'status': 'active', 'created_at': '2025-12-02 23:43:39'}
Hi there — I reviewed the latest notes on your account.
I’ve read through the account history you mentioned.
Here’s what I recommend based on your request:
- Tell me any specific details you'd like us to verify or double-check.
- We can also set up a short follow-up if you need more help.
- If this is urgent, reply here and I’ll jump on it immediately.
If you'd like me to take action now, just reply to this message and I’ll handle it.

SCENARIO : Coordinated: data + support
QUERY    : I'm customer 12345 and need help upgrading my account
-------------------------------------------------------------------------------------------