## Overview
The goal for this repo is to build a multi-agent customer service system where specialized agents coordinate using Agent-to-Agent (A2A) communication and access customer data through the Model Context Protocol (MCP).  
The GitHub repository for this project:

https://github.com/LatifaTam/Multi-Agent-Customer-Service-System-with-A2A-and-MCP-Overview

### 0. Conclusion
Throughout the implementation of this multi-agent system, several interesting challenges came up. One of the biggest observations was how strict ADK is when handling tools. Any mismatch in required parameters or slight formatting issues immediately causes tool failures, producing noisy errors that can be hard to diagnose. It reinforced how important it is to give the LLM extremely explicit tool instructions so it does not make up parameters or misname anything.

The other recurring issue was the NoneType error when reading task artifacts. Sometimes ADK produces incomplete artifact structures during the first execution, which caused the "NoneType is not subscriptable" exception. Re-running often resolved it, so the underlying cause appears to be inconsistent artifact generation rather than an application bug. This project also revealed how easily uvicorn processes remain alive in the background, requiring manual cleanup to avoid "address already in use" errors. Despite these hurdles, the final system works reliably once ports and tool instructions are carefully managed.
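The "address already in use" problem described above can be caught before launching a server by probing the port first. The following is a minimal sketch, not part of the original notebook: `is_port_free` is a hypothetical helper, and port 5000 is used only because it is the MCP server port that appears later in this document.

```python
import socket


def is_port_free(host: str, port: int) -> bool:
    """Return True if nothing is accepting TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.settimeout(0.5)
        # connect_ex returns 0 when the connection succeeds,
        # which means some process is already listening on the port.
        return probe.connect_ex((host, port)) != 0


if __name__ == "__main__":
    # Hypothetical usage: check the MCP server port before starting it.
    if is_port_free("127.0.0.1", 5000):
        print("Port 5000 is free")
    else:
        print("Port 5000 is in use; stop the old process first")
```

If the port is reported as in use, a leftover server process from an earlier run is the likely cause and needs to be stopped before re-running the cell.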

### 1. Database Setup
We start by running database_setup.py, included in the GitHub repo, to create support.db, which is the source database for this project.

In [None]:
!python database_setup.py

Connected to database: support.db
Tables created successfully!
Triggers created successfully!

DATABASE SCHEMA

CUSTOMERS TABLE:
------------------------------------------------------------
  id              INTEGER     
  name            TEXT       NOT NULL 
  email           TEXT        
  phone           TEXT        
  status          TEXT       NOT NULL DEFAULT 'active'
  created_at      TIMESTAMP   DEFAULT CURRENT_TIMESTAMP
  updated_at      TIMESTAMP   DEFAULT CURRENT_TIMESTAMP

TICKETS TABLE:
------------------------------------------------------------
  id              INTEGER     
  customer_id     INTEGER    NOT NULL 
  issue           TEXT       NOT NULL 
  status          TEXT       NOT NULL DEFAULT 'open'
  priority        TEXT       NOT NULL DEFAULT 'medium'
  created_at      DATETIME    DEFAULT CURRENT_TIMESTAMP

FOREIGN KEYS:
------------------------------------------------------------
  tickets.customer_id -> customers.id

Would you like to insert sample data? (y/n): y
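To make the printed schema concrete, here is a rough reconstruction of it as SQLite DDL in an in-memory database. This is a sketch based only on the listing above, not the contents of database_setup.py: the `PRIMARY KEY AUTOINCREMENT` clauses are assumptions, the triggers are omitted because their definitions are not shown, and `build_demo_db` is a hypothetical helper name.

```python
import sqlite3

# Reconstructed from the printed schema; primary keys are an assumption.
SCHEMA = """
CREATE TABLE customers (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    name       TEXT NOT NULL,
    email      TEXT,
    phone      TEXT,
    status     TEXT NOT NULL DEFAULT 'active',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE tickets (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    customer_id INTEGER NOT NULL,
    issue       TEXT NOT NULL,
    status      TEXT NOT NULL DEFAULT 'open',
    priority    TEXT NOT NULL DEFAULT 'medium',
    created_at  DATETIME DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (customer_id) REFERENCES customers(id)
);
"""


def build_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with the two tables."""
    conn = sqlite3.connect(":memory:")
    # SQLite only enforces foreign keys when this pragma is switched on.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(SCHEMA)
    return conn


if __name__ == "__main__":
    demo = build_demo_db()
    demo.execute("INSERT INTO customers (name) VALUES ('Test User')")
    demo.execute("INSERT INTO tickets (customer_id, issue) VALUES (1, 'Cannot log in')")
    print(demo.execute("SELECT status, priority FROM tickets").fetchone())
```

With foreign keys enabled, inserting a ticket for a customer ID that does not exist raises `sqlite3.IntegrityError`, which is the relationship the `tickets.customer_id -> customers.id` line describes.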

#### 1.1 Inspect dataset

In [None]:
!ls

database_setup.py  __pycache__	sample_data  support.db


In [None]:
import sqlite3

conn = sqlite3.connect('support.db')
cursor = conn.cursor()

cursor.execute("SELECT * FROM CUSTOMERS LIMIT 1;")
rows = cursor.fetchall()
rows

[(1,
  'John Doe',
  'john.doe@example.com',
  '+1-555-0101',
  'active',
  '2025-12-09 04:18:09',
  '2025-12-09 04:18:09')]

### 2. Build MCP Server
We will use Flask with a SQLite connection and expose the following tools:
- get_customer(customer_id) - uses customers.id
- list_customers(status) - uses customers.status (the implementation below does not take a limit argument)
- update_customer(customer_id, name, email, phone) - uses customers fields
- create_ticket(customer_id, issue, priority) - uses tickets fields
- get_customer_history(customer_id) - uses tickets.customer_id

In [None]:
# Install required packages
!pip install flask flask-cors requests termcolor pyngrok google-adk google-genai a2a-sdk python-dotenv aiohttp uvicorn nest-asyncio

Collecting flask-cors
  Downloading flask_cors-6.0.1-py3-none-any.whl.metadata (5.3 kB)
Collecting pyngrok
  Downloading pyngrok-7.5.0-py3-none-any.whl.metadata (8.1 kB)
Collecting a2a-sdk
  Downloading a2a_sdk-0.3.20-py3-none-any.whl.metadata (7.9 kB)
Downloading flask_cors-6.0.1-py3-none-any.whl (13 kB)
Downloading pyngrok-7.5.0-py3-none-any.whl (24 kB)
Downloading a2a_sdk-0.3.20-py3-none-any.whl (141 kB)
Installing collected packages: pyngrok, flask-cors, a2a-sdk
Successfully installed a2a-sdk-0.3.20 flask-cors-6.0.1 pyngrok-7.5.0


In [None]:
import sqlite3
import json
from datetime import datetime
from typing import Optional, Dict, List, Any

def get_db_connection():
    """Create a database connection with row factory for dict-like access."""
    conn = sqlite3.connect('support.db')
    conn.row_factory = sqlite3.Row  # This allows us to access columns by name
    return conn

def row_to_dict(row: sqlite3.Row) -> Dict[str, Any]:
    """Convert a SQLite row to a dictionary."""
    return {key: row[key] for key in row.keys()}

# ==================== READ OPERATIONS ====================
# -----------------------------
# 1. get_customer(customer_id)
# -----------------------------

def get_customer(customer_id: int) -> Dict[str, Any]:
    """
    Retrieve a specific customer by ID.

    Args:
        customer_id: The unique ID of the customer

    Returns:
        Dict containing customer data or error message
    """
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        row = cursor.fetchone()
        conn.close()

        if row:
            return {
                'success': True,
                'customer': row_to_dict(row)
            }
        else:
            return {
                'success': False,
                'error': f'Customer with ID {customer_id} not found'
            }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }

# -----------------------------
# 2. list_customers(status)
# -----------------------------
def list_customers(status: Optional[str] = None) -> Dict[str, Any]:
    """
    List all customers, optionally filtered by status.

    Args:
        status: Optional filter - 'active', 'disabled', or None for all

    Returns:
        Dict containing list of customers or error message
    """
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        if status:
            if status not in ['active', 'disabled']:
                return {
                    'success': False,
                    'error': 'Status must be "active" or "disabled"'
                }
            cursor.execute('SELECT * FROM customers WHERE status = ? ORDER BY name', (status,))
        else:
            cursor.execute('SELECT * FROM customers ORDER BY name')

        rows = cursor.fetchall()
        conn.close()

        customers = [row_to_dict(row) for row in rows]

        return {
            'success': True,
            'count': len(customers),
            'customers': customers
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }


# -----------------------------
# 5. get_customer_history(customer_id)
# -----------------------------
def get_customer_history(customer_id: int) -> Dict[str, Any]:
    """
    Retrieve ticket history for a customer.

    Args:
        customer_id: The unique ID of the customer

    Returns:
        Dict containing ticket history or error message
    """
    try:
        # Check if customer exists
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute('SELECT 1 FROM customers WHERE id = ?', (customer_id,))
        if not cursor.fetchone():
            conn.close()
            return {
                'success': False,
                'error': f'Customer with ID {customer_id} not found'
            }

        cursor.execute(
            'SELECT * FROM tickets WHERE customer_id = ? ORDER BY id DESC',
            (customer_id,)
        )
        rows = cursor.fetchall()
        col_names = [desc[0] for desc in cursor.description]
        conn.close()

        return {
            'success': True,
            'history': [dict(zip(col_names, r)) for r in rows]
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }


# ==================== UPDATE OPERATIONS ====================
# -----------------------------
# 3. update_customer(customer_id, name, email, phone)
# -----------------------------
def update_customer(customer_id: int, name: Optional[str] = None,
                   email: Optional[str] = None, phone: Optional[str] = None) -> Dict[str, Any]:
    """
    Update customer information.

    Args:
        customer_id: The unique ID of the customer to update
        name: New name (optional)
        email: New email (optional)
        phone: New phone (optional)

    Returns:
        Dict containing updated customer data or error message
    """
    try:
        # Check if customer exists
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        if not cursor.fetchone():
            conn.close()
            return {
                'success': False,
                'error': f'Customer with ID {customer_id} not found'
            }

        # Build update query dynamically based on provided fields
        updates = []
        params = []

        if name is not None:
            updates.append('name = ?')
            params.append(name.strip())
        if email is not None:
            updates.append('email = ?')
            params.append(email)
        if phone is not None:
            updates.append('phone = ?')
            params.append(phone)

        if not updates:
            conn.close()
            return {
                'success': False,
                'error': 'No fields to update'
            }

        # Always update the updated_at timestamp
        updates.append('updated_at = CURRENT_TIMESTAMP')
        params.append(customer_id)

        update_clause = ', '.join(updates)
        query = f'UPDATE customers SET {update_clause} WHERE id = ?'
        cursor.execute(query, params)
        conn.commit()

        # Fetch updated customer
        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        row = cursor.fetchone()
        conn.close()

        return {
            'success': True,
            'message': f'Customer {customer_id} updated successfully',
            'customer': row_to_dict(row)
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }

# -----------------------------
# 4. create_ticket(customer_id, issue, priority)
# -----------------------------
def create_ticket(customer_id: int, issue: str, priority: str = "medium") -> Dict[str, Any]:
    """
    Create a new ticket for a customer.

    Args:
        customer_id: The unique ID of the customer
        issue: Description of the issue
        priority: Ticket priority (default: "medium")

    Returns:
        Dict containing ticket data or error message
    """
    try:
        # Validate inputs and customer existence
        if not issue or not issue.strip():
            return {
                'success': False,
                'error': 'Issue description is required'
            }

        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute('SELECT 1 FROM customers WHERE id = ?', (customer_id,))
        if not cursor.fetchone():
            conn.close()
            return {
                'success': False,
                'error': f'Customer with ID {customer_id} not found'
            }

        cursor.execute(
            "INSERT INTO tickets (customer_id, issue, status, priority) VALUES (?, ?, 'open', ?)",
            (customer_id, issue.strip(), priority)
        )
        ticket_id = cursor.lastrowid
        conn.commit()

        cursor.execute('SELECT * FROM tickets WHERE id = ?', (ticket_id,))
        ticket_row = cursor.fetchone()
        conn.close()

        return {
            'success': True,
            'message': f'Ticket {ticket_id} created successfully',
            'ticket': row_to_dict(ticket_row)
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }

# Quick test
print("\n🧪 Quick test - Fetching customer ID 1:")
result = get_customer(1)
if result['success']:
    customer = result['customer']
    print(f"   Name: {customer['name']}")
    print(f"   Email: {customer['email']}")
    print(f"   Status: {customer['status']}")


🧪 Quick test - Fetching customer ID 1:
   Name: John Doe
   Email: john.doe@example.com
   Status: active


#### 2.1 MCP HTTP Streaming Server Implementation
We'll build an MCP server that:
1. Implements the MCP protocol specification
2. Uses Server-Sent Events (SSE) for streaming responses
3. Exposes our customer management functions as MCP tools
4. Runs in a background thread so Colab remains responsive
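The SSE framing mentioned in point 2 can be illustrated with a small round trip: each JSON payload is written as a `data: ...` line followed by a blank line, and the client strips the prefix again. This is a sketch under that assumption only; `encode_sse` and `decode_sse` are hypothetical names, and a real SSE stream may also carry `event:` or `id:` fields that are ignored here.

```python
import json
from typing import Any, Dict, Iterator


def encode_sse(payload: Dict[str, Any]) -> str:
    """Frame one JSON payload as a single SSE event."""
    return f"data: {json.dumps(payload)}\n\n"


def decode_sse(stream: str) -> Iterator[Dict[str, Any]]:
    """Yield the JSON payload of every 'data: ' line in an SSE stream."""
    prefix = "data: "
    for event in stream.split("\n\n"):
        for line in event.splitlines():
            if line.startswith(prefix):
                yield json.loads(line[len(prefix):])


if __name__ == "__main__":
    wire = encode_sse({"jsonrpc": "2.0", "id": 1, "result": {}})
    print(repr(wire))
    print(list(decode_sse(wire)))
```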

##### MCP Protocol Overview

The MCP protocol uses JSON-RPC 2.0 messages over HTTP with SSE. Key message types:
- **initialize**: Handshake to establish connection and capabilities
- **tools/list**: Request list of available tools
- **tools/call**: Execute a specific tool
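As an illustration of the three message types listed above, the sketch below builds each one as a JSON-RPC 2.0 request. `make_request` is a hypothetical helper and the `clientInfo` values are placeholders; the `protocolVersion` string and the `get_customer` tool name are the ones used elsewhere in this notebook.

```python
import json
from typing import Any, Dict, Optional


def make_request(method: str, message_id: int,
                 params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request; 'params' is omitted when not given."""
    message: Dict[str, Any] = {"jsonrpc": "2.0", "id": message_id, "method": method}
    if params is not None:
        message["params"] = params
    return message


# Handshake: announce the protocol version and client identity.
initialize = make_request("initialize", 1, {
    "protocolVersion": "2024-11-05",
    "capabilities": {},
    "clientInfo": {"name": "example-client", "version": "0.1.0"},  # placeholder
})

# Discovery: ask which tools the server exposes.
list_tools = make_request("tools/list", 2)

# Execution: call one tool by name with its arguments.
call_tool = make_request("tools/call", 3, {
    "name": "get_customer",
    "arguments": {"customer_id": 1},
})

if __name__ == "__main__":
    for message in (initialize, list_tools, call_tool):
        print(json.dumps(message, indent=2))
```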

In [None]:
from flask import Flask, request, Response, jsonify
from flask_cors import CORS
import json
import threading
import time
from typing import Dict, Any, Generator

# Create Flask app
app = Flask(__name__)
CORS(app)  # Enable CORS for cross-origin requests

# Server state
server_thread = None
server_running = False

# MCP Protocol Implementation

# Define the tools that will be exposed via MCP
MCP_TOOLS = [
    {
        "name": "get_customer",
        "description": "Retrieve a specific customer by their ID. Returns customer details including name, email, phone, and status.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "The unique ID of the customer to retrieve"
                }
            },
            "required": ["customer_id"]
        }
    },
    {
        "name": "list_customers",
        "description": "List all customers in the database. Can optionally filter by status (active or disabled).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "status": {
                    "type": "string",
                    "enum": ["active", "disabled"],
                    "description": "Optional filter by customer status"
                }
            }
        }
    },
    {
        "name": "update_customer",
        "description": "Update an existing customer's information. Provide the customer ID and the fields to update.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "The unique ID of the customer to update"
                },
                "name": {
                    "type": "string",
                    "description": "New name (optional)"
                },
                "email": {
                    "type": "string",
                    "description": "New email (optional)"
                },
                "phone": {
                    "type": "string",
                    "description": "New phone (optional)"
                }
            },
            "required": ["customer_id"]
        }
    },
    {
        "name": "create_ticket",
        "description": "Create a support ticket for a customer. Provide the customer ID, a description of the issue, and optionally a priority.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "The unique ID of the customer the ticket is for"
                },
                "issue": {
                    "type": "string",
                    "description": "Description of the customer's issue"
                },
                "priority": {
                    "type": "string",
                    "description": "Priority level of the issue (optional, defaults to medium)"
                }
            },
            "required": ["customer_id","issue"]
        }
    },
    {
        "name": "get_customer_history",
        "description": "Get the full ticket history for the customer with the given ID.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "integer",
                    "description": "The unique ID of the customer whose ticket history to retrieve"
                }
            },
            "required": ["customer_id"]
        }
    }
]

def create_sse_message(data: Dict[str, Any]) -> str:
    """
    Format a message for Server-Sent Events (SSE).
    SSE format: 'data: {json}\n\n'
    """
    return f"data: {json.dumps(data)}\n\n"

def handle_initialize(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle MCP initialize request.
    This is the first message in the MCP protocol handshake.
    """
    return {
        "jsonrpc": "2.0",
        "id": message.get("id"),
        "result": {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {},  # We support tools
            },
            "serverInfo": {
                "name": "customer-management-server",
                "version": "1.0.0"
            }
        }
    }

def handle_tools_list(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle tools/list request.
    Returns the list of available tools.
    """
    return {
        "jsonrpc": "2.0",
        "id": message.get("id"),
        "result": {
            "tools": MCP_TOOLS
        }
    }

def handle_tools_call(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle tools/call request.
    Executes the requested tool and returns the result.
    """
    params = message.get("params", {})
    tool_name = params.get("name")
    arguments = params.get("arguments", {})

    # Map tool names to functions
    tool_functions = {
        "get_customer": get_customer,
        "list_customers": list_customers,
        "update_customer": update_customer,
        "create_ticket": create_ticket,
        "get_customer_history": get_customer_history,
    }

    if tool_name not in tool_functions:
        return {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32601,
                "message": f"Tool not found: {tool_name}"
            }
        }

    try:
        # Call the tool function with the provided arguments
        result = tool_functions[tool_name](**arguments)

        return {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps(result, indent=2)
                    }
                ]
            }
        }
    except Exception as e:
        return {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32603,
                "message": f"Tool execution error: {str(e)}"
            }
        }

def process_mcp_message(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process an MCP message and route it to the appropriate handler.
    """
    method = message.get("method")

    if method == "initialize":
        return handle_initialize(message)
    elif method == "tools/list":
        return handle_tools_list(message)
    elif method == "tools/call":
        return handle_tools_call(message)
    else:
        return {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {
                "code": -32601,
                "message": f"Method not found: {method}"
            }
        }

# Flask Routes

@app.route('/mcp', methods=['POST'])
def mcp_endpoint():
    """
    Main MCP endpoint for MCP communication.
    Receives MCP messages and streams responses using Server-Sent Events.
    """
    # Get the MCP message from the request BEFORE entering the generator
    # This must be done in the request context
    message = request.get_json()

    def generate():
        try:
            print(f"📥 Received MCP message: {message.get('method')}")

            # Process the message
            response = process_mcp_message(message)

            print("📤 Sending MCP response")

            # Send the response as SSE
            yield create_sse_message(response)

        except Exception as e:
            error_response = {
                "jsonrpc": "2.0",
                "id": None,
                "error": {
                    "code": -32700,
                    "message": f"Parse error: {str(e)}"
                }
            }
            yield create_sse_message(error_response)

    return Response(generate(), mimetype='text/event-stream')

@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint to verify server is running."""
    return jsonify({
        "status": "healthy",
        "server": "customer-management-mcp-server",
        "version": "1.0.0"
    })

print("✅ MCP Server implementation complete!")
print("\n🔧 Server features:")
print("   - MCP protocol support (2024-11-05)")
print("   - Server-Sent Events (SSE) streaming")
print(f"   - {len(MCP_TOOLS)} tools exposed")
print("   - Health check endpoint")
print("   - CORS enabled for cross-origin requests")

✅ MCP Server implementation complete!

🔧 Server features:
   - MCP protocol support (2024-11-05)
   - Server-Sent Events (SSE) streaming
   - 5 tools exposed
   - Health check endpoint
   - CORS enabled for cross-origin requests
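Because `handle_tools_call` passes the incoming arguments straight into the tool function, a missing or misnamed parameter only surfaces as a generic execution error. One possible way to report such mismatches more clearly is to check the arguments against the tool's `inputSchema` first. The sketch below is an illustration, not part of the server above: `validate_arguments` is a hypothetical helper that covers only `required`, unknown names, `integer`/`string` types, and `enum`, which is far less than full JSON Schema validation.

```python
from typing import Any, Dict, List

# Only the JSON Schema types used by the tools in this notebook.
_PYTHON_TYPES = {"integer": int, "string": str}

# Copied from the get_customer entry in MCP_TOOLS for a standalone demo.
GET_CUSTOMER_SCHEMA = {
    "type": "object",
    "properties": {"customer_id": {"type": "integer"}},
    "required": ["customer_id"],
}


def validate_arguments(schema: Dict[str, Any], arguments: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; empty means the call looks valid."""
    problems: List[str] = []
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in arguments:
            problems.append(f"missing required argument: {name}")
    for name, value in arguments.items():
        spec = properties.get(name)
        if spec is None:
            problems.append(f"unexpected argument: {name}")
            continue
        expected = _PYTHON_TYPES.get(spec.get("type"))
        if expected is None:
            continue  # type not covered by this sketch
        # bool is a subclass of int, so exclude it when an integer is expected.
        wrong_type = not isinstance(value, expected) or (
            expected is int and isinstance(value, bool)
        )
        if wrong_type:
            problems.append(f"argument {name} should be of type {spec['type']}")
        elif "enum" in spec and value not in spec["enum"]:
            problems.append(f"argument {name} must be one of {spec['enum']}")
    return problems


if __name__ == "__main__":
    print(validate_arguments(GET_CUSTOMER_SCHEMA, {"customer_id": 1}))
    print(validate_arguments(GET_CUSTOMER_SCHEMA, {"id": "1"}))
```

Returning the problem list in the JSON-RPC error message would give the calling agent something specific to correct, rather than a bare failure.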


#### 2.2 Localhost -> Public Link
We use ngrok to expose our local server through a public URL, so that we can test it from outside the Jupyter Notebook environment.

In [None]:
!ngrok config add-authtoken YOUR_NGROK_AUTHTOKEN

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [None]:
import threading
import time
import requests
from termcolor import colored
from pyngrok import ngrok
from google.colab import userdata

# Server configuration
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 5000
SERVER_URL = f'http://{SERVER_HOST}:{SERVER_PORT}'

def run_server():
    """Run the Flask server in a separate thread."""
    global server_running
    server_running = True
    app.run(host=SERVER_HOST, port=SERVER_PORT, debug=False, use_reloader=False)

def start_server(use_ngrok=True):
    """Start the MCP server in a background thread."""
    global server_thread, server_running

    if server_thread and server_thread.is_alive():
        print(colored("⚠️  Server is already running!", "yellow"))
        return

    print(colored("🚀 Starting MCP server...", "cyan"))

    # Start server in background thread
    server_thread = threading.Thread(target=run_server, daemon=True)
    server_thread.start()

    # Wait for server to start
    time.sleep(2)

    # Check if server is healthy
    try:
        response = requests.get(f'{SERVER_URL}/health', timeout=5)
        if response.status_code == 200:
            print(colored("✅ MCP Server is running!", "green"))
            print(colored(f"📍 Local URL: {SERVER_URL}", "cyan"))

            # Set up ngrok tunnel if requested
            if use_ngrok:
                print(colored("\n🌐 Setting up public tunnel with ngrok...", "cyan"))
                try:
                    # Get ngrok authtoken from Colab secrets
                    try:
                        authtoken = userdata.get('NGROK_AUTHTOKEN')
                        ngrok.set_auth_token(authtoken)
                        print(colored("✅ Ngrok authenticated", "green"))
                    except Exception as e:
                        print(colored("⚠️  NGROK_AUTHTOKEN not found in Colab secrets", "yellow"))
                        print(colored("   To use ngrok:", "yellow"))
                        print(colored("   1. Get free authtoken from https://ngrok.com", "yellow"))
                        print(colored("   2. In Colab: Click 🔑 (Secrets) in left sidebar", "yellow"))
                        print(colored("   3. Add secret: Name='NGROK_AUTHTOKEN', Value=<your-token>", "yellow"))
                        print(colored("   4. Enable 'Notebook access' for the secret", "yellow"))
                        print(colored("   5. Re-run this cell", "yellow"))
                        print(colored("\n   Server is still accessible locally at " + SERVER_URL, "cyan"))
                        return

                    # Create ngrok tunnel. Use the tunnel's public_url string: formatting the
                    # NgrokTunnel object itself prints its repr, as in the output recorded below.
                    public_url = ngrok.connect(SERVER_PORT).public_url
                    print(colored(f"✅ Public URL: {public_url}", "green", attrs=["bold"]))
                    print(colored(f"📍 MCP Endpoint: {public_url}/mcp", "green", attrs=["bold"]))
                    print(colored(f"📍 Health Check: {public_url}/health", "cyan"))
                    print()
                    print(colored("🔍 MCP Inspector Instructions:", "yellow", attrs=["bold"]))
                    print(colored("1. Run in terminal: npx @modelcontextprotocol/inspector", "yellow"))
                    print(colored("2. This will open MCP Inspector in your browser", "yellow"))
                    print(colored(f"3. Enter MCP URL: {public_url}/mcp", "yellow"))
                    print(colored("4. Click 'Connect' and test the customer management tools!", "yellow"))
                except Exception as e:
                    if "NGROK_AUTHTOKEN" not in str(e):
                        print(colored(f"⚠️  Could not set up ngrok tunnel: {e}", "yellow"))
                        print(colored("   Server is still accessible locally", "yellow"))
        else:
            print(colored("❌ Server started but health check failed", "red"))
    except Exception as e:
        print(colored(f"❌ Failed to connect to server: {e}", "red"))

def stop_server():
    """Stop the MCP server."""
    global server_running
    server_running = False
    print(colored("🛑 Server stopped", "yellow"))
    print(colored("   Note: In Colab, the thread will continue until the runtime is reset", "yellow"))

def check_server_status():
    """Check if the server is running."""
    try:
        response = requests.get(f'{SERVER_URL}/health', timeout=2)
        if response.status_code == 200:
            print(colored("✅ Server is running and healthy", "green"))
            health_data = response.json()
            print(f"   Status: {health_data['status']}")
            print(f"   Server: {health_data['server']}")
            print(f"   Version: {health_data['version']}")
            return True
        else:
            print(colored("❌ Server is not responding correctly", "red"))
            return False
    except Exception as e:
        print(colored("❌ Server is not running", "red"))
        print(f"   Error: {e}")
        return False

# Start the server
start_server()

🚀 Starting MCP server...
 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
INFO:werkzeug:Press CTRL+C to quit
INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 06:44:35] "GET /health HTTP/1.1" 200 -


✅ MCP Server is running!
📍 Local URL: http://127.0.0.1:5000

🌐 Setting up public tunnel with ngrok...
✅ Ngrok authenticated
✅ Public URL: NgrokTunnel: "https://unpathetic-antwan-malvasian.ngrok-free.dev" -> "http://localhost:5000"
📍 MCP Endpoint: NgrokTunnel: "https://unpathetic-antwan-malvasian.ngrok-free.dev" -> "http://localhost:5000"/mcp
📍 Health Check: NgrokTunnel: "https://unpathetic-antwan-malvasian.ngrok-free.dev" -> "http://localhost:5000"/health

🔍 MCP Inspector Instructions:
1. Run in terminal: npx @modelcontextprotocol/inspector
2. This will open MCP Inspector in your browser
3. Enter MCP URL: NgrokTunnel: "https://unpathetic-antwan-malvasian.ngrok-free.dev" -> "http://localhost:5000"/mcp
4. Click 'Connect' and test the customer management tools!
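`start_server` waits a fixed two seconds before the health check, which can be too short on a slow runtime and needlessly long on a fast one. A possible alternative is to poll the health endpoint until it answers or a deadline passes. The sketch below uses only the standard library; `wait_until_healthy` is a hypothetical helper, and the timeout and interval values are arbitrary assumptions.

```python
import time
import urllib.error
import urllib.request

# An opener with an empty ProxyHandler ignores any proxy configured in the
# environment, which is appropriate because the server runs on localhost.
_LOCAL_OPENER = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def wait_until_healthy(url: str, timeout: float = 10.0, interval: float = 0.2) -> bool:
    """Poll url until it returns HTTP 200 or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with _LOCAL_OPENER.open(url, timeout=1) as response:
                if response.status == 200:
                    return True
        except (urllib.error.URLError, OSError):
            pass  # server not accepting connections yet; try again
        time.sleep(interval)
    return False


if __name__ == "__main__":
    # Assumes the Flask server from this section may be listening on port 5000.
    print(wait_until_healthy("http://127.0.0.1:5000/health", timeout=3))
```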


#### 2.3 Test MCP
The requests below return the expected responses, which confirms that the MCP server is up and running.

##### 2.3.1 Initialize Connection

In [None]:
import requests
import json
from termcolor import colored

def send_mcp_message(method: str, params: dict = None, message_id: int = 1):
    """
    Send an MCP message to the server and display the request/response.
    """
    # Construct MCP message
    message = {
        "jsonrpc": "2.0",
        "id": message_id,
        "method": method
    }

    if params:
        message["params"] = params

    print(colored("\n📤 Sending MCP Request:", "cyan", attrs=["bold"]))
    print(colored(json.dumps(message, indent=2), "cyan"))

    try:
        # Send request to MCP endpoint
        response = requests.post(
            f'{SERVER_URL}/mcp',
            json=message,
            headers={'Content-Type': 'application/json'},
            stream=True,
            timeout=10
        )

        # Parse SSE response
        for line in response.iter_lines():
            if line:
                line_str = line.decode('utf-8')
                if line_str.startswith('data: '):
                    data = json.loads(line_str[6:])  # Remove 'data: ' prefix

                    print(colored("\n📥 Received MCP Response:", "green", attrs=["bold"]))
                    print(colored(json.dumps(data, indent=2), "green"))

                    return data

    except Exception as e:
        print(colored(f"\n❌ Error: {e}", "red"))
        return None

# Test 1: Initialize
print(colored("="*60, "magenta"))
print(colored("TEST 1: MCP INITIALIZATION", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

init_response = send_mcp_message(
    method="initialize",
    params={
        "protocolVersion": "2024-11-05",
        "capabilities": {},
        "clientInfo": {
            "name": "colab-test-client",
            "version": "1.0.0"
        }
    },
    message_id=1
)

if init_response and 'result' in init_response:
    print(colored("\n✅ Initialization successful!", "green", attrs=["bold"]))
    print(f"   Protocol Version: {init_response['result']['protocolVersion']}")
    print(f"   Server: {init_response['result']['serverInfo']['name']}")
else:
    print(colored("\n❌ Initialization failed", "red", attrs=["bold"]))

INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 06:44:43] "POST /mcp HTTP/1.1" 200 -


TEST 1: MCP INITIALIZATION

📤 Sending MCP Request:
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2024-11-05",
    "capabilities": {},
    "clientInfo": {
      "name": "colab-test-client",
      "version": "1.0.0"
    }
  }
}
📥 Received MCP message: initialize
📤 Sending MCP response

📥 Received MCP Response:
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "protocolVersion": "2024-11-05",
    "capabilities": {
      "tools": {}
    },
    "serverInfo": {
      "name": "customer-management-server",
      "version": "1.0.0"
    }
  }
}

✅ Initialization successful!
   Protocol Version: 2024-11-05
   Server: customer-management-server


##### 2.3.2 List Available Tools

In [None]:
print(colored("="*60, "magenta"))
print(colored("TEST 2: LIST AVAILABLE TOOLS", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

tools_response = send_mcp_message(
    method="tools/list",
    message_id=2
)

if tools_response and 'result' in tools_response:
    tools = tools_response['result']['tools']
    print(colored(f"\n✅ Found {len(tools)} tools:", "green", attrs=["bold"]))
    for i, tool in enumerate(tools, 1):
        print(colored(f"\n{i}. {tool['name']}", "yellow", attrs=["bold"]))
        print(f"   {tool['description']}")
else:
    print(colored("\n❌ Failed to list tools", "red", attrs=["bold"]))

INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 06:44:48] "POST /mcp HTTP/1.1" 200 -


TEST 2: LIST AVAILABLE TOOLS

📤 Sending MCP Request:
{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "tools/list"
}
📥 Received MCP message: tools/list
📤 Sending MCP response

📥 Received MCP Response:
{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "tools": [
      {
        "name": "get_customer",
        "description": "Retrieve a specific customer by their ID. Returns customer details including name, email, phone, and status.",
        "inputSchema": {
          "type": "object",
          "properties": {
            "customer_id": {
              "type": "integer",
              "description": "The unique ID of the customer to retrieve"
            }
          },
          "required": [
            "customer_id"
          ]
        }
      },
      {
        "name": "list_customers",
        "description": "List all customers in the database. Can optionally filter by status (active or disabled).",
        "inputSchema": {
          "type": "object",
          "properties

##### 2.3.3 Call Tool - Get Specific Customer

In [None]:
print(colored("="*60, "magenta"))
print(colored("TEST 4: GET CUSTOMER BY ID", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

get_response = send_mcp_message(
    method="tools/call",
    params={
        "name": "get_customer",
        "arguments": {
            "customer_id": 1
        }
    },
    message_id=4
)

if get_response and 'result' in get_response:
    content = get_response['result']['content'][0]['text']
    data = json.loads(content)

    if data['success']:
        customer = data['customer']
        print(colored("\n✅ Customer found:", "green", attrs=["bold"]))
        print(f"   ID: {customer['id']}")
        print(f"   Name: {customer['name']}")
        print(f"   Email: {customer['email']}")
        print(f"   Phone: {customer['phone']}")
        print(f"   Status: {customer['status']}")
        print(f"   Created: {customer['created_at']}")
    else:
        print(colored(f"\n❌ Error: {data['error']}", "red"))
else:
    print(colored("\n❌ Tool call failed", "red", attrs=["bold"]))

INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 06:44:52] "POST /mcp HTTP/1.1" 200 -


TEST 4: GET CUSTOMER BY ID

📤 Sending MCP Request:
{
  "jsonrpc": "2.0",
  "id": 4,
  "method": "tools/call",
  "params": {
    "name": "get_customer",
    "arguments": {
      "customer_id": 1
    }
  }
}
📥 Received MCP message: tools/call
📤 Sending MCP response

📥 Received MCP Response:
{
  "jsonrpc": "2.0",
  "id": 4,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\n  \"success\": true,\n  \"customer\": {\n    \"id\": 1,\n    \"name\": \"John Doe\",\n    \"email\": \"john.doe@example.com\",\n    \"phone\": \"+1-555-0101\",\n    \"status\": \"active\",\n    \"created_at\": \"2025-12-09 04:18:09\",\n    \"updated_at\": \"2025-12-09 04:18:09\"\n  }\n}"
      }
    ]
  }
}

✅ Customer found:
   ID: 1
   Name: John Doe
   Email: john.doe@example.com
   Phone: +1-555-0101
   Status: active
   Created: 2025-12-09 04:18:09
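
Note that the tool result arrives doubly encoded: `result.content[0].text` is itself a JSON string, so it has to be decoded a second time. The sketch below shows one way to unwrap it defensively; `unwrap_tool_result` is a hypothetical helper name, and the sample response is a shortened copy of the one logged above.

```python
import json
from typing import Any


def unwrap_tool_result(response: dict[str, Any]) -> dict[str, Any]:
    """Decode the JSON payload nested in a tools/call response (hypothetical helper)."""
    if "error" in response:
        # JSON-RPC level failure: surface it in the same shape the tools use.
        return {"success": False, "error": response["error"]}
    content = response.get("result", {}).get("content", [])
    for item in content:
        if item.get("type") == "text":
            try:
                return json.loads(item["text"])
            except json.JSONDecodeError:
                # The text was not JSON after all; hand it back untouched.
                return {"raw": item["text"]}
    return {"success": False, "error": "no text content in tool result"}


sample_response = {
    "jsonrpc": "2.0",
    "id": 4,
    "result": {
        "content": [
            {
                "type": "text",
                "text": json.dumps(
                    {"success": True, "customer": {"id": 1, "name": "John Doe"}}
                ),
            }
        ]
    },
}

customer = unwrap_tool_result(sample_response)["customer"]
print(customer["name"])
```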


### 3. Create the A2A Coordinator Framework
#### 3.1 Installation and Setup
Beyond the pip install, we also need to set up a new project in Google AI Studio to obtain an API key, using the following link:

https://aistudio.google.com/app/api-keys

We can then set our public URL and Google API key as environment variables for easy access.

In [None]:
# Install required packages
%pip install --upgrade -q google-genai google-adk==1.9.0 a2a-sdk==0.3.0 python-dotenv aiohttp uvicorn requests mermaid-python nest-asyncio

In [None]:
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

##### 3.1.1 Environment Configuration

In [None]:
# Targeted workaround for google-adk==1.9.0 compatibility with a2a-sdk==0.3.0.
# This cell can be removed once a google-adk release newer than 1.9.0 includes the fix
# (see https://github.com/google/adk-python/pull/2297).

import sys

from a2a.client import client as real_client_module
from a2a.client.card_resolver import A2ACardResolver

class PatchedClientModule:
    def __init__(self, real_module) -> None:
        for attr in dir(real_module):
            if not attr.startswith('_'):
                setattr(self, attr, getattr(real_module, attr))
        self.A2ACardResolver = A2ACardResolver


patched_module = PatchedClientModule(real_client_module)
sys.modules['a2a.client.client'] = patched_module  # type: ignore

In [None]:
!pip install -U "google-adk" "google-genai" "a2a-sdk"

Collecting google-adk
  Downloading google_adk-1.20.0-py3-none-any.whl.metadata (14 kB)
Collecting a2a-sdk
  Using cached a2a_sdk-0.3.20-py3-none-any.whl.metadata (7.9 kB)
Downloading google_adk-1.20.0-py3-none-any.whl (2.3 MB)
Using cached a2a_sdk-0.3.20-py3-none-any.whl (141 kB)
Installing collected packages: a2a-sdk, google-adk
  Attempting uninstall: a2a-sdk
    Found existing installation: a2a-sdk 0.3.0
    Uninstalling a2a-sdk-0.3.0:
      Successfully uninstalled a2a-sdk-0.3.0
  Attempting uninstall: google-adk
    Found existing installation: google-adk 1.9.0
    Uninstalling google-adk-1.9.0:
      Successfully uninstalled google-adk-1.9.0
Successfully installed a2a-sdk-0.3.20 google-adk-1.20.0


In [None]:
import asyncio
import logging
import os
import sys
import threading
import time

from typing import Any

import httpx
import nest_asyncio
import uvicorn

from a2a.client import ClientConfig, ClientFactory, create_text_message_object
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TransportProtocol,
)
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
from dotenv import load_dotenv
from google.adk.a2a.executor.a2a_agent_executor import (
    A2aAgentExecutor,
    A2aAgentExecutorConfig,
)
from google.adk.agents import Agent, SequentialAgent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import google_search

In [None]:
# Set Google Cloud Configuration
os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = 'FALSE'
os.environ['GOOGLE_CLOUD_PROJECT'] = (
    'gen-lang-client-0178360277'  # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
)
os.environ['GOOGLE_CLOUD_LOCATION'] = (
    'us-central1'  # Replace with your location
)

load_dotenv()
from google.colab import userdata

os.environ['GOOGLE_API_KEY'] = userdata.get('GOOGLE_API_KEY')

print('Environment variables configured:')
print(f'GOOGLE_GENAI_USE_VERTEXAI: {os.environ["GOOGLE_GENAI_USE_VERTEXAI"]}')
print(f'GOOGLE_CLOUD_PROJECT: {os.environ["GOOGLE_CLOUD_PROJECT"]}')
print(f'GOOGLE_CLOUD_LOCATION: {os.environ["GOOGLE_CLOUD_LOCATION"]}')

Environment variables configured:
GOOGLE_GENAI_USE_VERTEXAI: FALSE
GOOGLE_CLOUD_PROJECT: gen-lang-client-0178360277
GOOGLE_CLOUD_LOCATION: us-central1


In [None]:
# Authenticate your notebook environment (Colab only)
if 'google.colab' in sys.modules:
    from google.colab import auth

    auth.authenticate_user(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

In [None]:
# Setup logging
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
)

#### 3.2 Create Tools
We wrap the database functionality built earlier in plain functions that serve as our agents' toolbox.

In [None]:
from typing import Any, Dict, Optional, List
import json
import requests

MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", f"{SERVER_URL}/mcp").rstrip("/")

def call_mcp_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    message = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }
    try:
        resp = requests.post(
            MCP_ENDPOINT,
            json=message,
            headers={"Content-Type": "application/json"},
            stream=True,
            timeout=30,
        )
        resp.raise_for_status()
        for line in resp.iter_lines():
            if not line:
                continue
            if line.startswith(b"data: "):
                data = json.loads(line.decode("utf-8")[6:])
                result = data.get("result", {})
                content = result.get("content", [])
                if content and "text" in content[0]:
                    text_payload = content[0]["text"]
                    try:
                        return json.loads(text_payload)
                    except Exception:
                        return {"raw": text_payload}
                return result or data
        return {"success": False, "error": "No SSE data received from MCP"}
    except Exception as e:
        return {"success": False, "error": f"MCP call failed: {e}"}

# Renamed A2A tool wrappers (avoid clobbering MCP server functions)
def mcp_get_customer(tool_context, customer_id: int):
    return call_mcp_tool("get_customer", {"customer_id": customer_id})

def mcp_list_customers(tool_context, status: Optional[str] = None, limit: int = 10):
    args = {"status": status} if status else {}
    return call_mcp_tool("list_customers", args)

def mcp_update_customer(tool_context, customer_id: int, name=None, email=None, phone=None):
    payload = {"customer_id": customer_id}
    if name: payload["name"] = name
    if email: payload["email"] = email
    if phone: payload["phone"] = phone
    return call_mcp_tool("update_customer", payload)

def mcp_create_ticket(tool_context, customer_id: int, issue: str, priority: str = "medium"):
    return call_mcp_tool(
        "create_ticket",
        {"customer_id": customer_id, "issue": issue, "priority": priority},
    )

def mcp_get_customer_history(tool_context, customer_id: int):
    return call_mcp_tool("get_customer_history", {"customer_id": customer_id})
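
`call_mcp_tool` reads the response as a server-sent event stream and takes the first `data:` line as the JSON-RPC reply. The sketch below isolates that framing step so it can be exercised without a running server; `first_sse_json` and the sample stream are hypothetical and exist only to illustrate the parsing.

```python
import json
from typing import Any, Iterable, Optional


def first_sse_json(lines: Iterable[bytes]) -> Optional[dict[str, Any]]:
    """Return the JSON payload of the first `data:` line, or None if there is none."""
    for line in lines:
        # Skip blank separators, comments, and other SSE fields such as `event:`.
        if not line.startswith(b"data:"):
            continue
        payload = line[len(b"data:"):].strip()
        if payload:
            return json.loads(payload.decode("utf-8"))
    return None


# Hypothetical stream, shaped like what `resp.iter_lines()` would yield.
sample_stream = [
    b"event: message",
    b"",
    b'data: {"jsonrpc": "2.0", "id": 1, "result": {"content": []}}',
]

message = first_sse_json(sample_stream)
print(message["id"] if message else "no data line")
```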


#### 3.3 Agent 1: Customer Data Agent


In [None]:
customer_data_agent = Agent(
    model="gemini-2.5-pro",
    name="customer_data_agent",
    instruction="""
You are the Customer Data Agent. You ONLY interact with the MCP customer database tools.
Use the provided tools for all reads/updates: mcp_get_customer, mcp_list_customers, mcp_update_customer, mcp_create_ticket, mcp_get_customer_history.
Return clear, factual results; do not fabricate data. If an ID is missing or not found, say so and stop.
""",
    tools=[mcp_get_customer, mcp_list_customers, mcp_update_customer, mcp_create_ticket, mcp_get_customer_history],
)


In [None]:
customer_data_agent_card = AgentCard(
    name="Customer Data Agent",
    url="http://localhost:10020",
    description="Specialist agent that manages customer records and ticket history via MCP.",
    version="1.0",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text/plain"],
    default_output_modes=["text/plain"],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id="customer_data_agent",
            name="Customer Data",
            description="Provides and manipulates customer records via MCP",
            tags=["data", "customer id", "customer history", "status", "issue", "ticket"],
            examples=[
                "Retrieve ticket history",
                "Create tickets for customers",
                "Update customer records",
                "List customers by their status",
                "Look up individual customers by ID",
            ],
        )
    ],
)

In [None]:
remote_customer_data_agent = RemoteA2aAgent(
    name="customer_data_agent",
    description="Provides and manipulates customer records via MCP",
    agent_card=f"http://localhost:10020{AGENT_CARD_WELL_KNOWN_PATH}",
)

#### 3.4 Agent 2: Support Agent

In [None]:
customer_support_agent = Agent(
    model="gemini-2.5-pro",
    name="customer_support_agent",
    instruction="""
You are the Customer Support Agent in a customer service system.

You:
- Help with general customer support queries like account issues, upgrades, cancellations, and billing questions.
- Escalate complex issues.
- Request customer context or data updates from the Data Agent when required.

Behavior:
- If a customer ID is not found/offered, clearly say so and ask for more info (email, name, etc.).
- For urgent billing issues (charged twice, refund, immediately, urgent), create a high priority ticket.
- Summarize what you did with tools in a friendly, clear, helpful response.
""",
    # tools=[google_search],  # enable if you need search
)


In [None]:
customer_support_agent_card = AgentCard(
    name="Customer Support Agent",
    url="http://localhost:10021",
    description="Handles general customer support, status update, billing, and escalation/priority assessment.",
    version="1.0",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text/plain"],
    default_output_modes=["text/plain"],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id="customer_support",
            name="Customer Support",
            description="Handles general customer support, status update, billing, and escalation/priority assessment.",
            tags=["support", "customer service", "solution", "recommendations", "billing"],
            examples=[
                "Get high-priority tickets for these IDs",
                "Can you handle this cancellation and billing problem?",
                "I've been charged twice, please refund immediately!",
            ],
        )
    ],
)

In [None]:
remote_customer_support_agent = RemoteA2aAgent(
    name="customer_support",
    description="Handles general customer support, status update, billing, and escalation/priority assessment.",
    agent_card=f"http://localhost:10021{AGENT_CARD_WELL_KNOWN_PATH}",
)

#### 3.5 Agent 3: Router Agent (Orchestrator)

In [None]:
host_agent = Agent(
    name="customer_host_agent",
    model="gemini-2.5-pro",
    instruction="""
You are the Router / Host Agent for a customer service system.

YOUR JOB:
- Break down complex queries into sub-tasks.
- Decide which sub-agent should handle each step.
- ALWAYS call the correct sub-agent via A2A.
- NEVER answer anything directly and don't make up data.

RULES:
- Do NOT guess data.
- Do NOT answer directly without A2A calls.
- ALWAYS call at least one A2A sub-agent per request.
- Make multiple A2A calls when needed.
- If customer ID doesn't exist, say you don't find it or re-confirm with customer.

Your output must always come from combining sub-agent tool results.
""",
    tools=[],
    sub_agents=[remote_customer_data_agent, remote_customer_support_agent],
)

In [None]:
host_agent_card = AgentCard(
    name="Customer Service Host",
    url="http://localhost:10022",
    description="Orchestrates customer data and support agents for full customer service flows. When a query asks for data, present the relevant data.",
    version="1.0",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text/plain"],
    default_output_modes=["application/json"],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id="customer_service",
            name="Customer Service Orchestrator",
            description="Routes customer queries to specialized agents and returns combined results.",
            tags=["customer support", "routing", "orchestration", "multi-agent"],
            examples=[
                "Get customer information for ID 5",
                "I'm customer 1 and need help upgrading my account",
                "Show me all active customers who have open tickets",
                "I've been charged twice, please refund immediately!",
            ],
        )
    ],
)

### 4. Running
We will start our agents and run the complete system.
#### 4.1 Starting the A2A Servers

In [None]:
def create_agent_a2a_server(agent, agent_card):
    """Create an A2A server for any ADK agent.

    Args:
        agent: The ADK agent instance
        agent_card: The A2A AgentCard describing the agent

    Returns:
        A2AStarletteApplication instance
    """
    runner = Runner(
        app_name=agent.name,
        agent=agent,
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
    )

    config = A2aAgentExecutorConfig()
    executor = A2aAgentExecutor(runner=runner, config=config)

    request_handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
    )

    # Create A2A application
    return A2AStarletteApplication(
        agent_card=agent_card, http_handler=request_handler
    )

In [None]:
nest_asyncio.apply()
server_tasks: list[asyncio.Task] = []

async def run_agent_server(agent, agent_card, port) -> None:
    app = create_agent_a2a_server(agent, agent_card)
    config = uvicorn.Config(
        app.build(),
        host="127.0.0.1",
        port=port,
        log_level="warning",
        loop="none",
    )
    server = uvicorn.Server(config)
    await server.serve()

async def start_all_servers() -> None:
    tasks = [
        asyncio.create_task(run_agent_server(customer_data_agent, customer_data_agent_card, 10020)),
        asyncio.create_task(run_agent_server(customer_support_agent, customer_support_agent_card, 10021)),
        asyncio.create_task(run_agent_server(host_agent, host_agent_card, 10022)),
    ]
    await asyncio.sleep(2)
    print("All agent servers started.")
    print(" - Customer Data Agent: http://127.0.0.1:10020")
    print(" - Support Agent:      http://127.0.0.1:10021")
    print(" - Host Agent:         http://127.0.0.1:10022")
    try:
        await asyncio.gather(*tasks)
    except KeyboardInterrupt:
        print("Shutting down servers...")

def run_servers_in_background() -> None:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(start_all_servers())

server_thread = threading.Thread(target=run_servers_in_background, daemon=True)
server_thread.start()
time.sleep(3)


All agent servers started.
 - Customer Data Agent: http://127.0.0.1:10020
 - Support Agent:      http://127.0.0.1:10021
 - Host Agent:         http://127.0.0.1:10022
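
The fixed `sleep` calls above assume the servers come up within a few seconds. A minimal sketch of a more explicit readiness check is to poll each port until it accepts a TCP connection; `wait_for_port` is a hypothetical helper, and the demo listens on a throwaway local port rather than on the agents' ports.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, interval: float = 0.1) -> bool:
    """Poll until host:port accepts a TCP connection or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Nothing is listening yet; wait briefly and try again.
            time.sleep(interval)
    return False


# Demo against a throwaway listener so the sketch runs on its own.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
listener.listen(1)
demo_port = listener.getsockname()[1]

ready = wait_for_port("127.0.0.1", demo_port, timeout=2.0)
listener.close()
print("ready:", ready)
```

In this notebook, the same check could be run for ports 10020, 10021, and 10022 before sending the first A2A request.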


### 5. Testing the System
We call each A2A agent directly: the two remote agents, and the host agent that uses those two remote agents as sub-agents.

In [None]:
class A2ASimpleClient:
    def __init__(self, default_timeout: float = 240.0):
        self._agent_info_cache: dict[str, dict[str, Any] | None] = {}
        self.default_timeout = default_timeout

    async def create_task(self, agent_url: str, message: str) -> str:
        timeout_config = httpx.Timeout(
            timeout=self.default_timeout,
            connect=10.0,
            read=self.default_timeout,
            write=10.0,
            pool=5.0,
        )
        async with httpx.AsyncClient(timeout=timeout_config) as httpx_client:
            if agent_url in self._agent_info_cache and self._agent_info_cache[agent_url] is not None:
                agent_card_data = self._agent_info_cache[agent_url]
            else:
                agent_card_response = await httpx_client.get(f"{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}")
                agent_card_data = self._agent_info_cache[agent_url] = agent_card_response.json()

            agent_card = AgentCard(**agent_card_data)
            config = ClientConfig(
                httpx_client=httpx_client,
                supported_transports=[TransportProtocol.jsonrpc, TransportProtocol.http_json],
                use_client_preference=True,
            )
            factory = ClientFactory(config)
            client = factory.create(agent_card)
            message_obj = create_text_message_object(content=message)

            responses = []
            async for response in client.send_message(message_obj):
                responses.append(response)

            if responses and isinstance(responses[0], tuple) and len(responses[0]) > 0:
                task = responses[0][0]
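                try:
                    return task.artifacts[0].parts[0].root.text
                except (AttributeError, IndexError, TypeError):
                    # task.artifacts can be None on an incomplete task, which
                    # raises TypeError ("'NoneType' object is not subscriptable").
                    return str(task)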
            return "No response received"
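
Because a task may come back with `artifacts` unset, reading `task.artifacts[0]` directly can raise the "NoneType is not subscriptable" error. The sketch below shows one defensive way to extract the first text part. `first_artifact_text` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins for the real A2A task objects, assumed to expose `artifacts`, `parts`, `root`, and `text` as the client code above does.

```python
from types import SimpleNamespace
from typing import Any, Optional


def first_artifact_text(task: Any) -> Optional[str]:
    """Return the first non-empty text part of a task, or None if there is none."""
    # `or []` covers both a missing attribute and an explicit None.
    artifacts = getattr(task, "artifacts", None) or []
    for artifact in artifacts:
        for part in getattr(artifact, "parts", None) or []:
            root = getattr(part, "root", part)
            text = getattr(root, "text", None)
            if text:
                return text
    return None


# Stand-in objects that mimic the attribute layout used by the client above.
complete_task = SimpleNamespace(
    artifacts=[SimpleNamespace(parts=[SimpleNamespace(root=SimpleNamespace(text="OK."))])]
)
incomplete_task = SimpleNamespace(artifacts=None)

print(first_artifact_text(complete_task))
print(first_artifact_text(incomplete_task))
```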


#### 5.1 MCP tools testing

In [None]:
call_mcp_tool("get_customer", {"customer_id": 1})


INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 06:46:21] "POST /mcp HTTP/1.1" 200 -


📥 Received MCP message: tools/call
📤 Sending MCP response


{'success': True,
 'customer': {'id': 1,
  'name': 'John Doe',
  'email': 'john.doe@example.com',
  'phone': '+1-555-0101',
  'status': 'active',
  'created_at': '2025-12-09 04:18:09',
  'updated_at': '2025-12-09 04:18:09'}}

#### 5.2 Host Agent testing

In [None]:
a2a_client = A2ASimpleClient()

# Host Agent end-to-end test: should route to data agent, which calls MCP
async def test_host() -> None:
    result = await a2a_client.create_task(
        "http://localhost:10022",
        "Get customer information for ID 1",
    )
    print(result)

asyncio.run(test_host())

INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 06:46:32] "POST /mcp HTTP/1.1" 200 -


📥 Received MCP message: tools/call
📤 Sending MCP response
OK.
- Name: John Doe
- Email: john.doe@example.com
- Phone: +1-555-0101
- Status: active
- Created: 2025-12-09 04:18:09



#### 5.3 Data Agent testing

In [None]:
# Customer Data Retrieval
async def test_customer_service() -> None:
    """Test the customer data agent."""
    customer_service = await a2a_client.create_task(
        'http://localhost:10020', "Get customer information for ID 1"
    )
    print(customer_service)


# Run the async function
asyncio.run(test_customer_service())

INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 06:57:26] "POST /mcp HTTP/1.1" 200 -


📥 Received MCP message: tools/call
📤 Sending MCP response
OK. Customer ID 1 is John Doe, email is john.doe@example.com, phone is +1-555-0101, status is active.



#### 5.4 Customer Support testing

In [None]:
# Customer Support
async def test_customer_service() -> None:
    """Test customer service agent."""
    customer_service = await a2a_client.create_task(
        'http://localhost:10021', "i want to refund my product"
    )
    print(customer_service)


# Run the async function
asyncio.run(test_customer_service())

Of course, I can help you with that.

Could you please provide me with your email address, full name, or order number so I can look up your purchase?


#### 5.5 In Case We Need to Update an Agent
We need to kill the old servers first to make sure the new versions of the tools and agents are loaded.

In [None]:
#!pkill -f uvicorn

In [None]:
#!pkill -f "127.0.0.1:100"

In [None]:
# reset runtime state
#import gc, asyncio
#gc.collect()
#asyncio.get_event_loop().stop()
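
Before restarting the servers, it can help to confirm that the old processes have actually released their ports, since a leftover uvicorn process leads to "address already in use". The sketch below probes each port by trying to bind to it; `port_is_free` and `busy_ports` are hypothetical helpers, and `AGENT_PORTS` simply mirrors the three ports used in this notebook.

```python
import socket
from typing import Iterable


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a fresh socket can bind to host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
        except OSError:
            # Typically "address already in use": something still holds the port.
            return False
    return True


def busy_ports(ports: Iterable[int], host: str = "127.0.0.1") -> list[int]:
    """List the ports that cannot currently be bound."""
    return [port for port in ports if not port_is_free(port, host)]


AGENT_PORTS = (10020, 10021, 10022)  # ports used by the three agents above
print("Ports still in use:", busy_ports(AGENT_PORTS))
```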

### 6. Test Scenarios


#### 6.1 Simple Query: "Get customer information for ID 5"
Single agent, straightforward MCP call

In [None]:
# Host Agent
async def test_customer_service() -> None:
    """Test customer service agent."""
    customer_service = await a2a_client.create_task(
        'http://localhost:10022', "Get customer information for ID 5"
    )
    print(customer_service)

# Run the async function
asyncio.run(test_customer_service())

INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 07:00:22] "POST /mcp HTTP/1.1" 200 -


📥 Received MCP message: tools/call
📤 Sending MCP response
OK. I have the customer information for ID 5.
Charlie Brown, charlie.brown@email.com, +1-555-0105. Status: active.



#### 6.2 Coordinated Query: "I'm customer 12345 and need help upgrading my account"
Multiple agents coordinate: data fetch + support response

In [None]:
# Host Agent
async def test_customer_service() -> None:
    """Test customer service agent."""
    customer_service = await a2a_client.create_task(
        'http://localhost:10022',
        "I'm customer 12345 and need help upgrading my account"
    )
    print(customer_service)


# Run the async function
asyncio.run(test_customer_service())

Hello! I see you're looking to upgrade your account (customer ID 12345), and I can definitely help with that.

Could you please let me know which plan you'd like to upgrade to? We currently offer a **Pro** and a **Business** plan.

Once you let me know your choice, I can process the upgrade for you.


#### 6.3 Complex Query: "Show me all active customers who have open tickets"
Requires negotiation between data and support agents

In [45]:
# Host Agent
async def test_customer_service() -> None:
    """Test customer service agent."""
    customer_service = await a2a_client.create_task(
        'http://localhost:10022',
        "Show me all active customers who have open tickets"
    )
    print(customer_service)


# Run the async function
asyncio.run(test_customer_service())

INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 07:02:58] "POST /mcp HTTP/1.1" 200 -
📥 Received MCP message: tools/call
📤 Sending MCP response
INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 07:03:03] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 07:03:03] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 07:03:03] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 07:03:03] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 07:03:03] "POST /mcp HTTP/1.1" 200 -
📥 Received MCP message: tools/call
📤 Sending MCP response
📥 Received MCP message: tools/call
📤 Sending MCP response
📥 Received MCP message: tools/call
📤 Sending MCP response
📥 Received MCP message: tools/call
📤 Sending MCP response
📥 Received MCP message: tools/call
📤 Sending MCP response
INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 07:03:13] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 07:03:13] "POST /mcp HTTP/1.1

#### 6.4 Escalation: "I've been charged twice, please refund immediately!"
Router must identify urgency and route appropriately

In [46]:
# Host Agent
async def test_customer_service() -> None:
    """Test customer service agent."""
    customer_service = await a2a_client.create_task(
        'http://localhost:10022', "I've been charged twice, please refund immediately!"
    )
    print(customer_service)


# Run the async function
asyncio.run(test_customer_service())

Oh no, I'm so sorry to hear you've been charged twice. That's definitely not right, and I understand how urgent this is. I can absolutely help you get this sorted out.

To create a high-priority ticket for our billing team to process your refund immediately, I'll need a bit more information.

Could you please provide me with your customer ID or the email address associated with your account?
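
In this system the urgency decision is left to the LLM, guided by the keywords in the Support Agent's instruction. As a rough sketch of what a deterministic pre-check on those same keywords could look like, the helper below maps a message to a ticket priority. `suggest_priority` and `URGENT_TERMS` are hypothetical and not part of the notebook; the `"medium"` fallback mirrors the default of `mcp_create_ticket`.

```python
# Keywords taken from the Support Agent instruction above.
URGENT_TERMS = ("charged twice", "refund", "immediately", "urgent")


def suggest_priority(message: str) -> str:
    """Suggest a ticket priority from simple keyword matching (hypothetical helper)."""
    text = message.lower()
    if any(term in text for term in URGENT_TERMS):
        return "high"
    return "medium"


print(suggest_priority("I've been charged twice, please refund immediately!"))
print(suggest_priority("How do I change my profile picture?"))
```

Plain substring matching is deliberately crude; it only illustrates the rule and would over-trigger on messages that merely mention refunds.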


#### 6.5 Multi-Intent: "Update my email to new@email.com and show my ticket history"
Parallel task execution and coordination

In [47]:
# Host Agent
async def test_customer_service() -> None:
    """Test customer service agent."""
    customer_service = await a2a_client.create_task(
        'http://localhost:10022', "Update my email to new@email.com and show my ticket history"
    )
    print(customer_service)


# Run the async function
asyncio.run(test_customer_service())

Hello! I can certainly help with updating your email and getting your ticket history.

First, I'll need to find your account. Could you please provide your customer ID, or the full name and email address currently on file? Once I've located your account, I'll ask our Data Agent to make that update for you.


##### 6.5.1 If we provide the ID as well...

In [51]:
# Host Agent
async def test_customer_service() -> None:
    """Test customer service agent."""
    customer_service = await a2a_client.create_task(
        'http://localhost:10022', "My ID is 1. Update my email to new@email.com and show my ticket history"
    )
    print(customer_service)


# Run the async function
asyncio.run(test_customer_service())

INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 07:28:20] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [09/Dec/2025 07:28:20] "POST /mcp HTTP/1.1" 200 -


📥 Received MCP message: tools/call
📤 Sending MCP response
📥 Received MCP message: tools/call
📤 Sending MCP response
I have updated your email to new@email.com. Here is your ticket history:

- Ticket 56: (Medium Priority) Password reset not working - In Progress
- Ticket 51: (High Priority) Cannot login to account - Open
- Ticket 31: (Medium Priority) Password reset not working - In Progress
- Ticket 26: (High Priority) Cannot login to account - Open
- Ticket 6: (Medium Priority) Password reset not working - In Progress
- Ticket 1: (High Priority) Cannot login to account - Open

