In [1]:
import sys

from a2a.client import client as real_client_module
from a2a.client.card_resolver import A2ACardResolver


class PatchedClientModule:
    """Stand-in object that mirrors a module's public names and adds ``A2ACardResolver``.

    Every attribute of ``real_module`` whose name does not begin with an
    underscore is copied onto the instance; ``A2ACardResolver`` is then
    attached under that exact name.
    """

    def __init__(self, real_module) -> None:
        public_members = {
            member: getattr(real_module, member)
            for member in dir(real_module)
            if not member.startswith('_')
        }
        for member, value in public_members.items():
            setattr(self, member, value)
        # Assigned last so it wins over any same-named attribute copied above.
        self.A2ACardResolver = A2ACardResolver


# Register the stand-in in the import system under the real module's dotted
# name. Any import of `a2a.client.client` executed AFTER this line resolves to
# `patched_module` (imports look in `sys.modules` first), so this cell has to
# run before the cells that import the libraries relying on it.
# NOTE(review): presumably a downstream library expects `A2ACardResolver` to be
# importable from `a2a.client.client` — confirm against the installed versions.
patched_module = PatchedClientModule(real_client_module)
sys.modules['a2a.client.client'] = patched_module  # type: ignore

In [2]:
import asyncio
import logging
import os
import sys
import threading
import time

from typing import Any
import uuid

import httpx
import nest_asyncio
import uvicorn

from a2a.client import ClientConfig, ClientFactory, create_text_message_object
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TransportProtocol,
    Message, Part, TextPart, Role
)
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
from dotenv import load_dotenv
from google.adk.a2a.executor.a2a_agent_executor import (
    A2aAgentExecutor,
    A2aAgentExecutorConfig,
)
from google.adk.agents import Agent, SequentialAgent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService

from google.adk.tools import google_search
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset
from google.adk.tools.mcp_tool.mcp_session_manager import SseServerParams

  from google.cloud.aiplatform.utils import gcs_utils


In [3]:
# Model identifier handed to every agent builder in this notebook.
# NOTE(review): this is a bare OpenAI model id — confirm the agent framework can
# resolve it as-is (it may need a provider prefix or a model wrapper).
MODEL_NAME = "gpt-4o-mini"

# Everything runs on the loopback interface; only the port differs per service.
_LOOPBACK = "http://127.0.0.1"

# MCP tool server used by the specialist agents.
MCP_SERVER_URL = f"{_LOOPBACK}:8001"

# Base URLs of the three A2A agent servers.
CUSTOMER_DATA_AGENT_URL = f"{_LOOPBACK}:10020"
SUPPORT_AGENT_URL = f"{_LOOPBACK}:10021"
ROUTER_AGENT_URL = f"{_LOOPBACK}:10022"


In [4]:
import logging
# Timestamp, severity, then the message — one line per record.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

# Configure the root logger at INFO with the format above.
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Named logger used by the server start-up helpers in this notebook.
logger = logging.getLogger("customer_service_system")

In [5]:
# Import API key
from dotenv import load_dotenv
import os
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()

# Fail fast with an actionable message. Assigning None into os.environ would
# otherwise raise an opaque "str expected, not NoneType" TypeError.
_openai_api_key = os.getenv('OPENAI_API_KEY')
if not _openai_api_key:
    raise RuntimeError(
        "OPENAI_API_KEY is not set. Add it to a .env file next to this notebook "
        "or export it in the environment before starting the kernel."
    )
os.environ['OPENAI_API_KEY'] = _openai_api_key

# Defining Agents

1. Router Agent
2. Customer Data Agent
3. Support Agent

In [6]:

# Support Agent
def build_support_agent() -> Agent:
    """Build the Support Agent.

    The agent is named ``support_agent``, runs on ``MODEL_NAME`` and is given a
    single MCP toolset that connects to ``MCP_SERVER_URL``.

    Returns:
        The configured ``Agent`` instance.
    """
    mcp_toolset = MCPToolset(
        connection_params=SseServerParams(
            url=MCP_SERVER_URL,
            timeout=300.0,
            sse_read_timeout=300.0
        )
    )
    # The prompt is plain text only. A pasted ``return Agent(...)`` code
    # fragment (naming "router_agent") used to be embedded at the end of this
    # string and was being sent to the model as part of its instructions.
    instruction = """
    You are the Support Agent. You handle customer support requests.
    
    Available MCP Tools:
    - create_ticket(customer_id, issue, priority): Create a support ticket
    - get_customer_history(customer_id): Get ticket history
    
    Responsibilities:
    - Analyze customer issues and determine priority (low/medium/high)
    - Create tickets for issues that need tracking
    - Provide helpful responses to customer queries
    - Escalate urgent issues (billing disputes, account access) as high priority
    
    When you receive customer context from another agent, use it to personalize your response.
    
    Priority Guidelines:
    - HIGH: Billing issues, account access problems, security concerns
    - MEDIUM: Feature questions, general complaints, update requests
    - LOW: General inquiries, feature requests
    """
    return Agent(
        model=MODEL_NAME,
        name="support_agent",
        instruction=instruction,
        tools=[mcp_toolset],
    )

# Customer data
def build_customer_data_agent() -> Agent:
    """Build the Customer Data Agent.

    The agent is named ``customer_data_agent``, runs on ``MODEL_NAME`` and is
    given a single MCP toolset that connects to ``MCP_SERVER_URL``.

    Returns:
        The configured ``Agent`` instance.
    """
    system_prompt = """
    You are the Customer Data Agent. You have exclusive access to the customer database.
    
    Available MCP Tools:
    - get_customer(customer_id): Get a single customer by ID
    - list_customers(status, limit): List customers by status ('active' or 'disabled')
    - update_customer(customer_id, data): Update customer fields
    - create_ticket(customer_id, issue, priority): Create a support ticket
    - get_customer_history(customer_id): Get all tickets for a customer
    
    When asked for customer information:
    1. Use the appropriate MCP tool
    2. Return the data in a clear, structured format
    3. Include all relevant fields
    
    Always respond with factual data from the database. Never make up customer information.
    """

    # Connection settings for the MCP server that exposes the database tools.
    connection = SseServerParams(
        url=MCP_SERVER_URL,
        timeout=300.0,
        sse_read_timeout=300.0,
    )
    database_tools = MCPToolset(connection_params=connection)

    return Agent(
        model=MODEL_NAME,
        name="customer_data_agent",
        instruction=system_prompt,
        tools=[database_tools],
    )

# Router
# Planning prompt drafted for an LLM-driven router that emits a JSON plan.
# It is NOT passed to the agent built by build_router_agent() below: that
# function returns a SequentialAgent constructed only from a name and
# sub-agents. The text is kept here (with the malformed last example repaired)
# so it is available if an LLM-based planner is wired in.
ROUTER_PLANNING_PROMPT = """
    You are an Orchestration Agent. Your job is to break down a user query into a sequence of steps for specialist agents.

    Available Agents:
    1. Customer Data Agent: Can 'retrieve', 'list', or 'update' customer data. Access to database.
    2. Support Agent: Can handle 'support' requests, create tickets, and assess priority.

    Rules:
    - If the user asks for help/support but hasn't provided data, you might need to get data first (e.g., list customers or get ID).
    - If the user asks for a complex multi-step operation (e.g., "for all customers"), break it down if possible, or send a list query first.
    - "pass_context": Set to true if the result of previous steps should be passed to this step.
    - IMPORTANT: Always preserve customer IDs in queries to the Support Agent. If a customer ID is mentioned in the original query, include it in the Support Agent query.

    Example Plans:

    Scenario: "Get info for customer 5"
    {
    "steps": [
        {
            "action": "call_data_agent", 
            "query": "Get info for customer 5", 
            "pass_context": false
        }
    ],
    "final_synthesis": "Present the data to the user."
    }

    Scenario: "I need help with my account, customer ID 5"
    {
    "steps": 
    [
        {
            "action": "call_data_agent", 
            "query": "Get info for customer 5", 
            "pass_context": false
        },
        {
            "action": "call_support_agent", 
            "query": "I need help with my account, customer ID 5", "pass_context": true
        }
    ],
    "final_synthesis": "Summarize the support action taken."
    }

    Scenario: "I want to cancel my subscription but I'm having billing issues, customer ID 1"
    {
    "steps": [
        {
            "action": "call_data_agent", 
            "query": "Get info for customer 1", 
            "pass_context": false
        },
        {
            "action": "call_support_agent", 
            "query": "I want to cancel my subscription and I'm having billing issues, customer ID 1", 
            "pass_context": true
        }
    ],
    "final_synthesis": "Summarize the cancellation and billing support action taken."
    }
"""


def build_router_agent(remote_customer_data: RemoteA2aAgent, remote_support: RemoteA2aAgent) -> SequentialAgent:
    """Build the router as a fixed two-step pipeline over the remote agents.

    Args:
        remote_customer_data: Remote wrapper for the Customer Data Agent; listed
            first in ``sub_agents``.
        remote_support: Remote wrapper for the Support Agent; listed second.

    Returns:
        A ``SequentialAgent`` named ``router_host_agent`` whose sub-agents are
        the two arguments, in that order. No model or instruction is attached
        here; ``ROUTER_PLANNING_PROMPT`` is intentionally not consumed.
    """
    return SequentialAgent(
        name="router_host_agent",
        sub_agents=[remote_customer_data, remote_support],
    )


# A2A Orchestration Loop

In [7]:
import json
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

In [8]:
# --- A2A server helpers ---
def create_agent_a2a_server(
    agent: Agent, agent_card: AgentCard
) -> A2AStarletteApplication:
    """
    Wrap an ADK agent in an A2A Starlette application.

    A ``Runner`` is created for the agent (app name = ``agent.name``) with
    in-memory artifact, session and memory services. The runner is driven by
    an ``A2aAgentExecutor`` behind a ``DefaultRequestHandler`` that uses an
    in-memory task store.

    Args:
        agent: The agent to expose.
        agent_card: The card the application publishes for this agent.

    Returns:
        The assembled ``A2AStarletteApplication`` (not yet built or served).
    """
    agent_runner = Runner(
        app_name=agent.name,
        agent=agent,
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
    )

    handler = DefaultRequestHandler(
        agent_executor=A2aAgentExecutor(
            runner=agent_runner,
            config=A2aAgentExecutorConfig(),
        ),
        task_store=InMemoryTaskStore(),
    )

    return A2AStarletteApplication(agent_card=agent_card, http_handler=handler)

async def run_agent_server(agent: Any, agent_card: AgentCard, port: int) -> None:
    """Serve ``agent`` over A2A on 127.0.0.1:``port`` until the server exits.

    Args:
        agent: Agent to expose; passed to ``create_agent_a2a_server``.
        agent_card: Card published for the agent.
        port: TCP port uvicorn binds to on the loopback interface.
    """
    asgi_app = create_agent_a2a_server(agent, agent_card).build()

    server = uvicorn.Server(
        uvicorn.Config(
            asgi_app,
            host="127.0.0.1",
            port=port,
            log_level="warning",
            loop="none",
        )
    )
    await server.serve()


print("A2A server infrastructure defined.")


A2A server infrastructure defined.


In [9]:
# Run this cell to see the actual AgentCard fields
from a2a.types import AgentCard
import inspect

# Constructor signature of AgentCard (keyword names and defaults).
print(inspect.signature(AgentCard))

# Every declared model field, tagged by whether a value must be supplied.
print("\nAgentCard fields:")
requirement_label = {True: "REQUIRED", False: "optional"}
for field_name, field_info in AgentCard.model_fields.items():
    required = requirement_label[bool(field_info.is_required())]
    print(f"  {field_name}: {required}")

(*, additionalInterfaces: list[a2a.types.AgentInterface] | None = None, capabilities: a2a.types.AgentCapabilities, defaultInputModes: list[str], defaultOutputModes: list[str], description: str, documentationUrl: str | None = None, iconUrl: str | None = None, name: str, preferredTransport: str | None = 'JSONRPC', protocolVersion: str | None = '0.3.0', provider: a2a.types.AgentProvider | None = None, security: list[dict[str, list[str]]] | None = None, securitySchemes: dict[str, a2a.types.SecurityScheme] | None = None, signatures: list[a2a.types.AgentCardSignature] | None = None, skills: list[a2a.types.AgentSkill], supportsAuthenticatedExtendedCard: bool | None = None, url: str, version: str) -> None

AgentCard fields:
  additional_interfaces: optional
  capabilities: REQUIRED
  default_input_modes: REQUIRED
  default_output_modes: REQUIRED
  description: REQUIRED
  documentation_url: optional
  icon_url: optional
  name: REQUIRED
  preferred_transport: optional
  protocol_version: option

In [10]:
def _build_agent_card(
    *, name: str, url: str, description: str, skill: AgentSkill
) -> AgentCard:
    """Create an AgentCard with the settings shared by all agents in this notebook.

    Shared settings: version "1.0", streaming enabled, plain-text input,
    JSON output, JSON-RPC as preferred transport, and exactly one skill.
    Field names are given in snake_case throughout for consistency.

    Args:
        name: Display name of the agent.
        url: Base URL the agent is served from.
        description: Human-readable summary of the agent.
        skill: The single skill advertised on the card.

    Returns:
        The populated ``AgentCard``.
    """
    return AgentCard(
        name=name,
        url=url,
        description=description,
        version="1.0",
        capabilities=AgentCapabilities(streaming=True),
        default_input_modes=["text/plain"],
        default_output_modes=["application/json"],
        preferred_transport=TransportProtocol.jsonrpc,
        skills=[skill],
    )


# Customer Data Agent Card
customer_data_agent_card = _build_agent_card(
    name="Customer Data Agent",
    url=CUSTOMER_DATA_AGENT_URL,
    description="Accesses customer and ticket data via MCP tools. The only agent with direct database access.",
    skill=AgentSkill(
        id="customer_data_operations",
        name="Customer Data Operations",
        description="Retrieve, update, and manage customer records and ticket history",
        tags=["customer", "database", "mcp", "tickets"],
        examples=[
            "Get customer information for ID 5",
            "List all active customers",
            "Update customer email",
            "Show ticket history for customer 3",
        ],
    ),
)

# Support Agent Card
support_agent_card = _build_agent_card(
    name="Support Agent",
    url=SUPPORT_AGENT_URL,
    description="Handles customer support conversations, assesses priorities, and creates tickets",
    skill=AgentSkill(
        id="support_handling",
        name="Customer Support Handling",
        description="Handle support issues, assess priority, create tickets, provide resolutions",
        tags=["support", "billing", "escalation", "tickets"],
        examples=[
            "I need help with my account",
            "I was charged twice, please refund",
            "How do I upgrade my subscription?",
        ],
    ),
)

# Router Agent Card
router_agent_card = _build_agent_card(
    name="Router Agent",
    url=ROUTER_AGENT_URL,
    description="Orchestrates customer queries by coordinating between Customer Data and Support agents",
    skill=AgentSkill(
        id="query_orchestration",
        name="Customer Service Orchestration",
        description="Routes queries to appropriate agents and coordinates multi-agent responses",
        tags=["routing", "orchestration", "multi-agent"],
        examples=[
            "I'm customer 5 and need help upgrading my account",
            "I've been charged twice, please refund immediately!",
            "Update my email and show my ticket history",
        ],
    ),
)

print("Agent Cards defined:")
for _card in (customer_data_agent_card, support_agent_card, router_agent_card):
    print(f"  - {_card.name}")

Agent Cards defined:
  - Customer Data Agent
  - Support Agent
  - Router Agent


In [11]:
# Instantiate the two specialist agents that will be served over A2A.
customer_data_agent = build_customer_data_agent()
support_agent = build_support_agent()

print("Built specialist agents:")
for specialist in (customer_data_agent, support_agent):
    print(f"  - {specialist.name}")

Built specialist agents:
  - customer_data_agent
  - support_agent


In [12]:
# Remote wrappers: each one points at the agent-card URL of a specialist server
# (base URL followed by the well-known agent-card path).
remote_customer_data_agent = RemoteA2aAgent(
    name="customer_data_remote",
    description="Remote A2A wrapper for Customer Data Agent - accesses database via MCP",
    agent_card=CUSTOMER_DATA_AGENT_URL + AGENT_CARD_WELL_KNOWN_PATH,
)

remote_support_agent = RemoteA2aAgent(
    name="support_remote",
    description="Remote A2A wrapper for Support Agent - handles customer issues",
    agent_card=SUPPORT_AGENT_URL + AGENT_CARD_WELL_KNOWN_PATH,
)

print("Created remote agent wrappers:")
for wrapper, base_url in (
    (remote_customer_data_agent, CUSTOMER_DATA_AGENT_URL),
    (remote_support_agent, SUPPORT_AGENT_URL),
):
    print(f"  - {wrapper.name} -> {base_url}")

Created remote agent wrappers:
  - customer_data_remote -> http://127.0.0.1:10020
  - support_remote -> http://127.0.0.1:10021


  remote_customer_data_agent = RemoteA2aAgent(
  remote_support_agent = RemoteA2aAgent(


In [13]:
# The router is composed from the two remote wrappers (data first, then support).
router_agent = build_router_agent(remote_customer_data_agent, remote_support_agent)

sub_agent_names = [child.name for child in router_agent.sub_agents]
print(f"Built router agent: {router_agent.name}")
print(f"  Sub-agents: {sub_agent_names}")

Built router agent: router_host_agent
  Sub-agents: ['customer_data_remote', 'support_remote']


In [14]:
# Start A2A Server
async def start_all_servers() -> None:
    """Start the three agent servers as tasks and wait until they all exit.

    Each server's port is taken from its ``*_URL`` constant, so the address a
    server binds to and the address clients are told to use cannot drift apart.

    Raises:
        ValueError: If one of the agent URLs carries no explicit port.
    """
    # Local import keeps this cell runnable without touching the import cell.
    from urllib.parse import urlsplit

    deployments = [
        (customer_data_agent, customer_data_agent_card, CUSTOMER_DATA_AGENT_URL),
        (support_agent, support_agent_card, SUPPORT_AGENT_URL),
        (router_agent, router_agent_card, ROUTER_AGENT_URL),
    ]

    # Resolve every port before starting anything, so a bad URL fails early.
    launch_plan = []
    for agent, card, base_url in deployments:
        port = urlsplit(base_url).port
        if port is None:
            raise ValueError(f"Agent URL has no explicit port: {base_url!r}")
        launch_plan.append((agent, card, port))

    tasks = [
        asyncio.create_task(run_agent_server(agent, card, port))
        for agent, card, port in launch_plan
    ]

    # Wait a moment for servers to start (a fixed pause, not a readiness check).
    await asyncio.sleep(2.0)

    logger.info("A2A servers started:")
    logger.info(f"  Customer Data Agent: {CUSTOMER_DATA_AGENT_URL}")
    logger.info(f"  Support Agent:       {SUPPORT_AGENT_URL}")
    logger.info(f"  Router Agent:        {ROUTER_AGENT_URL}")

    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        logger.info("Server tasks cancelled")
    finally:
        # If one server fails, gather() re-raises without stopping the others;
        # cancel whatever is still running. No-op for tasks already finished.
        for task in tasks:
            task.cancel()


def run_servers_in_background() -> None:
    """
    Run servers in a background thread (for notebook compatibility).

    Creates a dedicated event loop for the calling thread, runs
    ``start_all_servers()`` on it to completion, and always closes the loop
    afterwards. Ordinary exceptions are logged with their traceback and
    swallowed, so a server failure does not propagate out of the thread.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(start_all_servers())
    except Exception as e:
        # Best-effort by design: report the failure (with traceback) and return.
        logger.exception("Server error: %s", e)
    finally:
        # Release the loop's resources whether the servers stopped or failed.
        loop.close()


print("Server startup functions defined.")

Server startup functions defined.


In [15]:
from a2a.types import Message, Part, TextPart, Role

In [None]:
# A2A CLIENT - For sending queries to agents
import uuid
from typing import Dict, List

class A2AClient:
    """
    Simple client for calling A2A servers.
    Used to send test queries to the Router Agent.

    Agent cards are fetched once per base URL and cached on the instance.
    """

    def __init__(self, default_timeout: float = 240.0):
        # Raw agent-card JSON keyed by agent base URL.
        self._agent_card_cache: Dict[str, Dict[str, Any]] = {}
        # Used as the overall and read timeout for HTTP calls.
        self.default_timeout = default_timeout

    @staticmethod
    def _extract_text(responses: List[Any]) -> str:
        """Pull the reply text out of the items yielded by ``send_message``.

        The most recent item is inspected. A tuple is treated as an event whose
        first element is the task; the text of the task's first artifact part
        is returned, falling back to ``str(task)`` when it has no such text
        (including when ``artifacts`` is ``None``). Any other item is treated
        as a direct message reply exposing ``.parts``.

        Args:
            responses: Items collected from the client, in arrival order.

        Returns:
            The extracted text, ``str(task)`` as a fallback for task events, or
            ``"No response received"`` when nothing usable was collected.
        """
        if not responses:
            return "No response received"

        # NOTE(review): assumes the latest event reflects the most complete
        # task state — confirm against the client library's streaming contract.
        latest = responses[-1]

        if isinstance(latest, tuple):
            if not latest:
                return "No response received"
            task = latest[0]
            try:
                return task.artifacts[0].parts[0].root.text
            except (AttributeError, IndexError, TypeError):
                # TypeError covers `artifacts`/`parts` being None.
                return str(task)

        try:
            return latest.parts[0].root.text
        except (AttributeError, IndexError, TypeError):
            return "No response received"

    async def send_query(self, agent_url: str, message: str) -> str:
        """
        Send a query to an A2A agent and get the response.

        Args:
            agent_url: Base URL of the agent (e.g., http://127.0.0.1:10022)
            message: The query to send

        Returns:
            The agent's response text

        Raises:
            httpx.HTTPStatusError: If fetching the agent card returns an HTTP
                error status (the error body is not cached).
        """
        timeout_config = httpx.Timeout(
            timeout=self.default_timeout,
            connect=10.0,
            read=self.default_timeout,
            write=10.0,
            pool=5.0,
        )

        async with httpx.AsyncClient(timeout=timeout_config) as httpx_client:
            # Get agent card (cached)
            if agent_url not in self._agent_card_cache:
                card_response = await httpx_client.get(
                    f"{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}"
                )
                # Never cache an error payload as if it were a card.
                card_response.raise_for_status()
                self._agent_card_cache[agent_url] = card_response.json()

            agent_card = AgentCard(**self._agent_card_cache[agent_url])

            # Create A2A client
            config = ClientConfig(
                httpx_client=httpx_client,
                supported_transports=[
                    TransportProtocol.jsonrpc,
                    TransportProtocol.http_json,
                ],
                use_client_preference=True,
            )
            client = ClientFactory(config).create(agent_card)

            # Send message with unique ID (required by A2A protocol)
            message_obj = Message(
                role=Role.user,
                parts=[Part(TextPart(text=message))],
                messageId=str(uuid.uuid4()),
            )

            responses: List[Any] = []
            async for response in client.send_message(message_obj):
                responses.append(response)

            return self._extract_text(responses)


print("A2A Client defined.")

A2A Client defined.


In [17]:
# TEST SCENARIOS

async def run_test_scenarios() -> None:
    """
    Run all 5 test scenarios required by the assignment.

    Each scenario prints a banner (heading plus summary lines), sends its query
    to the Router Agent at ``ROUTER_AGENT_URL`` and prints the response. The
    scenarios run in order on one shared ``A2AClient``; an exception from any
    query propagates and stops the remaining scenarios.
    """
    rule = "=" * 70

    # (heading, banner lines printed under it, query text actually sent).
    # The banner text is a short label; the sent query may be more detailed.
    scenarios = [
        (
            "SCENARIO 1: Simple Query",
            [
                "Query: 'Get customer information for ID 5'",
                "Expected: Single agent, straightforward MCP call",
            ],
            "Get customer information for ID 5",
        ),
        (
            "SCENARIO 2: Coordinated Query",
            ["Query: 'I'm customer 5 and need help upgrading my account'"],
            "I'm customer 5 and need help upgrading my account to premium tier.",
        ),
        (
            "SCENARIO 3: Complex Query",
            ["Query: 'Show me all active customers who have open tickets'"],
            "Show me all active customers who have open tickets. Summarize them grouped by customer with ticket priorities.",
        ),
        (
            "SCENARIO 4: Escalation",
            ["Query: 'I've been charged twice, please refund immediately!'"],
            "I'm customer 1. I've been charged twice, please refund immediately! I am very upset.",
        ),
        (
            "SCENARIO 5: Multi-Intent",
            ["Query: 'Update my email to new@email.com and show my ticket history'"],
            "I'm customer 2. Update my email to new@email.com and then show my ticket history.",
        ),
    ]

    client = A2AClient()

    for heading, banner_lines, query in scenarios:
        print("\n" + rule)
        print(heading)
        for line in banner_lines:
            print(line)
        print(rule)

        response = await client.send_query(ROUTER_AGENT_URL, query)
        print(f"\nRESPONSE:\n{response}")

    print("\n" + rule)
    print("ALL TEST SCENARIOS COMPLETED")
    print(rule)


print("Test scenarios function defined.")

Test scenarios function defined.


In [18]:
import threading
import time

print("="*70)
print("MULTI-AGENT CUSTOMER SERVICE SYSTEM")
print("="*70)
    
print("\n[1/3] Starting A2A servers in background...")
server_thread = threading.Thread(
    target=run_servers_in_background,
    daemon=True,
)
server_thread.start()
    
# Give servers time to start
print("[2/3] Waiting for servers to initialize...")
time.sleep(5.0)
    
print("[3/3] Running test scenarios...\n")
    
# Run test scenarios
await run_test_scenarios()

  config = A2aAgentExecutorConfig()
  executor = A2aAgentExecutor(runner=runner, config=config)


MULTI-AGENT CUSTOMER SERVICE SYSTEM

[1/3] Starting A2A servers in background...
[2/3] Waiting for servers to initialize...


2025-12-03 16:51:00,776 - INFO - A2A servers started:
2025-12-03 16:51:00,778 - INFO -   Customer Data Agent: http://127.0.0.1:10020
2025-12-03 16:51:00,781 - INFO -   Support Agent:       http://127.0.0.1:10021
2025-12-03 16:51:00,782 - INFO -   Router Agent:        http://127.0.0.1:10022
2025-12-03 16:51:03,850 - INFO - HTTP Request: GET http://127.0.0.1:10022/.well-known/agent-card.json "HTTP/1.1 200 OK"
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/site-packages/a2a/server/apps/jsonrpc/jsonrpc_app.py", line 242, in _handle_requests
    a2a_request = A2ARequest.model_validate(body)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.12/site-packages/pydantic/main.py", line 568, in model_validate
    return cls.__pydantic_validator__.validate_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 21 validation errors for A2ARequest
SendMessageRequest.method
  Input should

[3/3] Running test scenarios...


SCENARIO 1: Simple Query
Query: 'Get customer information for ID 5'
Expected: Single agent, straightforward MCP call


A2AClientHTTPError: HTTP Error 400: Invalid SSE response or protocol error: Expected response header Content-Type to contain 'text/event-stream', got 'application/json'