# Multi-Agent Customer Support System - End-to-End Demonstration

This notebook demonstrates the agent-to-agent (A2A) coordination capabilities of a multi-agent customer support system built with Google ADK and the Model Context Protocol (MCP).

## System Architecture Overview

- **Router Agent (Supervisor)**: Makes routing decisions using an LLM
- **Customer Data Agent**: Handles customer information operations
- **Support Agent**: Manages support tickets
- **SQL Generator Agent**: Handles complex SQL queries
- **A2A Orchestrator**: Coordinates multi-agent execution
- **FastMCP Server**: Exposes tools via MCP


## Setup and Configuration


In [None]:
import asyncio
import json
import sys
from pathlib import Path
from datetime import datetime

# Add project root to path
project_root = Path.cwd()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Import orchestrator
from a2a.orchestrator import A2AOrchestrator, process
from a2a.utils import MCP_HTTP_BASE_URL

print("Imports successful")
print(f"MCP Server URL: {MCP_HTTP_BASE_URL}")


## Helper Functions for Display


In [None]:
def print_section(title, char="="):
    """Print a formatted section header."""
    print(f"\n{char * 80}")
    print(f"{title}")
    print(f"{char * 80}\n")

def print_query(query, scenario_num=None):
    """Print user query in a formatted way."""
    if scenario_num:
        print(f"\n{'=' * 80}")
        print(f"SCENARIO {scenario_num}")
        print(f"{'=' * 80}")
    print(f"\n👤 User Query: {query}")
    print(f"⏰ Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("\n" + "-" * 80)

def print_response(response, execution_time=None):
    """Print agent response in a formatted way."""
    print("\n🤖 Agent Response:")
    print("-" * 80)
    print(response)
    if execution_time is not None:  # also report a measured time of 0.0
        print(f"\n⏱️  Execution Time: {execution_time:.2f} seconds")
    print("\n" + "=" * 80)


## Handoff Callback for A2A Visualization


In [None]:
# Track handoffs for display
handoff_log = []

def handoff_callback(event_type: str, data: dict):
    """Callback to track agent handoffs and routing decisions."""
    timestamp = datetime.now().strftime('%H:%M:%S')
    
    if event_type == 'routing':
        decision = data.get('decision', {})
        next_agent = decision.get('next_agent')
        if next_agent:
            handoff_log.append({
                'time': timestamp,
                'event': 'routing',
                'agent': next_agent,
                'reason': decision.get('reason', 'N/A')
            })
            print(f"\n  🔀 [{timestamp}] Routing to: {next_agent.replace('_', ' ').title()} Agent")
            print(f"     Reason: {decision.get('reason', 'N/A')}")
    
    elif event_type == 'handoff':
        from_agent = data.get('from_agent')
        to_agent = data.get('to_agent')
        reason = data.get('reason', '')
        
        if from_agent and to_agent:
            handoff_log.append({
                'time': timestamp,
                'event': 'handoff',
                'from': from_agent,
                'to': to_agent,
                'reason': reason
            })
            print(f"\n  🔄 [{timestamp}] Handoff: {from_agent.replace('_', ' ').title()} → {to_agent.replace('_', ' ').title()} Agent")
            if reason:
                print(f"     Reason: {reason}")
        else:
            to_display = to_agent.replace('_', ' ').title() if to_agent else 'Unknown'
            print(f"\n  ➡️  [{timestamp}] Executing: {to_display} Agent")
    
    elif event_type == 'agent_complete':
        agent = data.get('agent', '')
        handoff_log.append({
            'time': timestamp,
            'event': 'complete',
            'agent': agent
        })
        agent_display = agent.replace('_', ' ').title()
        print(f"\n  ✅ [{timestamp}] {agent_display} Agent completed")
    
    elif event_type == 'completion':
        results = data.get('results', [])
        if len(results) > 1:
            agent_list = [r['agent'].replace('_', ' ').title() for r in results]
            print(f"\n  ✨ [{timestamp}] Completed with {len(results)} agents: {', '.join(agent_list)}")

def print_handoff_summary():
    """Print summary of agent handoffs."""
    if handoff_log:
        print("\n📊 A2A Coordination Summary:")
        print("-" * 80)
        for i, log in enumerate(handoff_log, 1):
            if log['event'] == 'routing':
                print(f"{i}. [{log['time']}] 🔀 Routed to: {log['agent'].replace('_', ' ').title()} - {log['reason']}")
            elif log['event'] == 'handoff':
                print(f"{i}. [{log['time']}] 🔄 Handoff: {log['from'].replace('_', ' ').title()} → {log['to'].replace('_', ' ').title()}")
            elif log['event'] == 'complete':
                print(f"{i}. [{log['time']}] ✅ {log['agent'].replace('_', ' ').title()} completed")
        handoff_log.clear()  # Clear for next scenario


In [None]:
# Create orchestrator with handoff callback
orchestrator = A2AOrchestrator(
    user_id="demo_user",
    session_id="demo_session",
    handoff_callback=handoff_callback
)

print("✅ Orchestrator initialized")
print(f"   User ID: {orchestrator.user_id}")
print(f"   Session ID: {orchestrator.session_id}")


In [None]:
print_query("Get customer information for ID 5", scenario_num=1)

# Execute query
start_time = datetime.now()

try:
    response = asyncio.run(
        orchestrator.process_query(
            "Get customer information for ID 5",
            show_usage=False,
            silent=True
        )
    )
    
    execution_time = (datetime.now() - start_time).total_seconds()
    print_response(response, execution_time)
    print_handoff_summary()
    
except Exception as e:
    print(f"\n❌ Error: {str(e)}")
    print("💡 Make sure MCP server is running: python customer_mcp/server/mcp_server.py")


---

## Scenario 2: Task Allocation - Customer Support Request

**Query**: "I need help with my account, customer ID 1"

**Expected A2A Flow**:
1. Router Agent receives query
2. Router Agent → Customer Data Agent: "Get customer info for ID 1"
3. Customer Data Agent fetches via MCP
4. Customer Data Agent → Router Agent: Returns customer data
5. Router Agent analyzes customer tier/status
6. Router Agent → Support Agent: "Handle support for this customer"
7. Support Agent generates response
8. Router Agent returns final response


In [None]:
print_query("I need help with my account, customer ID 1", scenario_num=2)

# Execute query
start_time = datetime.now()

try:
    response = asyncio.run(
        orchestrator.process_query(
            "I need help with my account, customer ID 1",
            show_usage=False,
            silent=True
        )
    )
    
    execution_time = (datetime.now() - start_time).total_seconds()
    print_response(response, execution_time)
    print_handoff_summary()
    
except Exception as e:
    print(f"\n❌ Error: {str(e)}")
    print("💡 Make sure MCP server is running: python customer_mcp/server/mcp_server.py")


---

## Scenario 3: Multi-Step Coordination

**Query**: "Get customer 1 and create a ticket for them with issue 'Cannot login'"

**Expected A2A Flow**:
1. Router Agent receives query
2. Router Agent → Customer Data Agent: "Get customer 1 info first"
3. Customer Data Agent → Router Agent: Returns customer data
4. Router Agent → Support Agent: "Now create ticket for this customer"
5. Support Agent creates ticket via MCP
6. Support Agent → Router Agent: Returns ticket creation result
7. Router Agent evaluates: Query complete
8. Router Agent returns combined response


In [None]:
print_query("Get customer 1 and create a ticket for them with issue 'Cannot login'", scenario_num=3)

# Execute query
start_time = datetime.now()

try:
    response = asyncio.run(
        orchestrator.process_query(
            "Get customer 1 and create a ticket for them with issue 'Cannot login'",
            show_usage=False,
            silent=True
        )
    )
    
    execution_time = (datetime.now() - start_time).total_seconds()
    print_response(response, execution_time)
    print_handoff_summary()
    
except Exception as e:
    print(f"\n❌ Error: {str(e)}")
    print("💡 Make sure MCP server is running: python customer_mcp/server/mcp_server.py")


---

## Scenario 4: Negotiation/Escalation - Billing Issues

**Query**: "I'm customer 2, I've been charged twice, please help me"

**Expected A2A Flow**:
1. Router Agent detects urgency (billing issue)
2. Router Agent → Customer Data Agent: "Get customer 2 info"
3. Customer Data Agent → Router Agent: Returns customer data
4. Router Agent → Support Agent: "Handle urgent billing issue"
5. Support Agent creates high-priority ticket
6. Support Agent → Router Agent: Returns ticket with high priority
7. Router Agent returns escalated response


In [None]:
print_query("I'm customer 2, I've been charged twice, please help me", scenario_num=4)

# Execute query
start_time = datetime.now()

try:
    response = asyncio.run(
        orchestrator.process_query(
            "I'm customer 2, I've been charged twice, please help me",
            show_usage=False,
            silent=True
        )
    )
    
    execution_time = (datetime.now() - start_time).total_seconds()
    print_response(response, execution_time)
    print_handoff_summary()
    
except Exception as e:
    print(f"\n❌ Error: {str(e)}")
    print("💡 Make sure MCP server is running: python customer_mcp/server/mcp_server.py")


---

## Scenario 5: Complex Query - Multiple Agents Required

**Query**: "Show me all active customers who have open tickets"

**Expected A2A Flow**:
1. Router Agent decomposes query
2. Router Agent → Customer Data Agent: "Get all active customers"
3. Customer Data Agent → Router Agent: Returns customer list
4. Router Agent → Support Agent: "Get open tickets for these customers"
5. Support Agent queries tickets via MCP
6. Agents coordinate to format report
7. Router Agent synthesizes final answer


In [None]:
print_query("Show me all active customers who have open tickets", scenario_num=5)

# Execute query
start_time = datetime.now()

try:
    response = asyncio.run(
        orchestrator.process_query(
            "Show me all active customers who have open tickets",
            show_usage=False,
            silent=True
        )
    )
    
    execution_time = (datetime.now() - start_time).total_seconds()
    print_response(response, execution_time)
    print_handoff_summary()
    
except Exception as e:
    print(f"\n❌ Error: {str(e)}")
    print("💡 Make sure MCP server is running: python customer_mcp/server/mcp_server.py")


---

## Scenario 6: Multi-Intent Query - Parallel Operations

**Query**: "Get customer 3 and show me their ticket history"

**Expected A2A Flow**:
1. Router Agent receives multi-intent query
2. Router Agent → Customer Data Agent: "Get customer 3"
3. Customer Data Agent → Router Agent: Returns customer data
4. Router Agent → Support Agent: "Get ticket history for customer 3"
5. Support Agent retrieves tickets via MCP
6. Support Agent → Router Agent: Returns ticket history
7. Router Agent combines results
8. Router Agent returns comprehensive response


In [None]:
print_query("Get customer 3 and show me their ticket history", scenario_num=6)

# Execute query
start_time = datetime.now()

try:
    response = asyncio.run(
        orchestrator.process_query(
            "Get customer 3 and show me their ticket history",
            show_usage=False,
            silent=True
        )
    )
    
    execution_time = (datetime.now() - start_time).total_seconds()
    print_response(response, execution_time)
    print_handoff_summary()
    
except Exception as e:
    print(f"\n❌ Error: {str(e)}")
    print("💡 Make sure MCP server is running: python customer_mcp/server/mcp_server.py")


---

## Scenario 7: SQL Generator Agent - Complex Analytics

**Query**: "Show me customers who created accounts last month"

**Expected A2A Flow**:
1. Router Agent identifies complex query requiring SQL
2. Router Agent → SQL Generator Agent: "Generate SQL for date filtering"
3. SQL Generator Agent generates SQL query
4. SQL Generator Agent → MCP: Executes SQL via fallback_sql tool
5. SQL Generator Agent → Router Agent: Returns query results
6. Router Agent evaluates: Query complete
7. Router Agent returns formatted results


In [None]:
print_query("Show me customers who created accounts last month", scenario_num=7)

# Execute query
start_time = datetime.now()

try:
    response = asyncio.run(
        orchestrator.process_query(
            "Show me customers who created accounts last month",
            show_usage=False,
            silent=True
        )
    )
    
    execution_time = (datetime.now() - start_time).total_seconds()
    print_response(response, execution_time)
    print_handoff_summary()
    
except Exception as e:
    print(f"\n❌ Error: {str(e)}")
    print("💡 Make sure MCP server is running: python customer_mcp/server/mcp_server.py")


---

## Scenario 8: Coordinated Query - Account Upgrade Request

**Query**: "I'm customer 1 and need help upgrading my account"

**Expected A2A Flow**:
1. Router Agent receives query with customer ID
2. Router Agent → Customer Data Agent: "Get customer 1 info"
3. Customer Data Agent → Router Agent: Returns customer data
4. Router Agent → Support Agent: "Handle upgrade request for this customer"
5. Support Agent creates ticket for upgrade request
6. Support Agent → Router Agent: Returns ticket confirmation
7. Router Agent synthesizes response with customer context


In [None]:
print_query("I'm customer 1 and need help upgrading my account", scenario_num=8)

# Execute query
start_time = datetime.now()

try:
    response = asyncio.run(
        orchestrator.process_query(
            "I'm customer 1 and need help upgrading my account",
            show_usage=False,
            silent=True
        )
    )
    
    execution_time = (datetime.now() - start_time).total_seconds()
    print_response(response, execution_time)
    print_handoff_summary()
    
except Exception as e:
    print(f"\n❌ Error: {str(e)}")
    print("💡 Make sure MCP server is running: python customer_mcp/server/mcp_server.py")


---

## Summary: A2A Coordination Patterns Demonstrated

### Pattern 1: Sequential Task Execution
- **Scenario 3**: Get customer → Create ticket
- Agents execute in sequence based on dependencies
- Previous agent results passed as context to next agent

### Pattern 2: Multi-Agent Coordination
- **Scenario 5**: Multiple agents collaborate on complex query
- **Scenario 6**: Customer data + Ticket history coordination
- Agents share context and combine results

### Pattern 3: Specialized Agent Routing
- **Scenario 7**: SQL Generator Agent for complex queries
- Router intelligently selects specialized agent
- SQL agent handles queries beyond standard tool capabilities

### Pattern 4: Escalation and Priority Handling
- **Scenario 4**: Urgent billing issue automatically escalated
- Support Agent assigns high priority based on issue type
- Router coordinates urgency detection and routing

### Key Observations:

1. **Router Agent (Supervisor)** makes all routing decisions
2. **Agents communicate via Router**, not directly with each other
3. **Context Preservation**: Previous agent results passed to next agent
4. **Dynamic Routing**: Router adapts based on query complexity
5. **Tool Abstraction**: All agents use MCP tools, not direct function calls
6. **Session Management**: Each agent maintains a separate session via Google ADK
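
Observations 1 and 4 depend on the router's decision being machine-readable. The sketch below shows one way such a decision might be parsed and validated before an orchestrator acts on it. The field names `next_agent`, `done`, and `reason` are the ones used in this notebook; the `RouterDecision` dataclass, the `parse_router_decision` helper, and the agent identifiers other than `customer_data` are hypothetical and are not taken from `a2a/orchestrator.py`.

```python
import json
from dataclasses import dataclass
from typing import Optional

# Assumed agent identifiers; only "customer_data" appears in this notebook.
KNOWN_AGENTS = {"customer_data", "support", "sql_generator"}


@dataclass
class RouterDecision:
    next_agent: Optional[str]
    done: bool
    reason: str = "N/A"


def parse_router_decision(raw: str) -> RouterDecision:
    """Parse the router's JSON reply; stop the loop on anything malformed."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return RouterDecision(None, True, "unparseable router output")
    if not isinstance(data, dict):
        return RouterDecision(None, True, "router output is not a JSON object")

    next_agent = data.get("next_agent")
    done = bool(data.get("done", False))
    if next_agent is not None and next_agent not in KNOWN_AGENTS:
        return RouterDecision(None, True, f"unknown agent: {next_agent}")
    if next_agent is None:
        done = True  # no agent left to run, so treat the query as finished
    return RouterDecision(next_agent, done, str(data.get("reason", "N/A")))


decision = parse_router_decision(
    '{"next_agent": "customer_data", "done": false, "reason": "lookup"}'
)
print(decision)
```

Stopping on malformed output is a design choice in this sketch: it keeps a confused router from sending the loop to an agent that does not exist.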


## Technical Implementation Notes

### How A2A Coordination Works

1. **Orchestrator** (`a2a/orchestrator.py`):
   - Calls Router Agent (Supervisor) for routing decisions
   - Executes selected agents using Google ADK's `Runner`
   - Collects results and passes to Router for evaluation
   - Continues until Router says `done: true`

2. **Router Agent** (`a2a/agent/router_agent.py`):
   - Uses an LLM (GPT-4o-mini) to analyze queries
   - Returns JSON decisions: `{"next_agent": "...", "done": false, "reason": "..."}`
   - Evaluates query completeness after each agent execution

3. **Specialized Agents**:
   - Each agent connects to the MCP server via `McpToolset`
   - Tools are discovered dynamically via MCP
   - Tools are executed via HTTP calls to the FastMCP server

4. **MCP Server** (`customer_mcp/server/mcp_server.py`):
   - Exposes tools via FastMCP framework
   - Handles JSON-RPC 2.0 requests
   - Routes tool calls to Python functions
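
Item 4 mentions JSON-RPC 2.0. In MCP, a tool invocation is a JSON-RPC request with the method `tools/call`, whose `params` carry the tool `name` and its `arguments`. The sketch below only builds and prints such a payload and does not contact the server. The tool name `get_customer` appears elsewhere in this notebook; the `customer_id` argument name and the `build_tool_call` helper are assumptions made for illustration.

```python
import itertools
import json

# JSON-RPC requests carry an id so responses can be matched to them.
_request_ids = itertools.count(1)


def build_tool_call(tool_name: str, arguments: dict) -> dict:
    """Build a JSON-RPC 2.0 request body for an MCP tool invocation."""
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
    }


# "customer_id" is an assumed parameter name, not confirmed by the server code.
request = build_tool_call("get_customer", {"customer_id": 5})
print(json.dumps(request, indent=2))
```

In the system described here this payload would be produced by the MCP client inside `McpToolset`, so agents never need to assemble it by hand.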

### Communication Flow

```
User Query
    ↓
Orchestrator.process_query()
    ↓
Router Agent (Supervisor) - LLM analyzes query
    ↓
Router → Orchestrator: {"next_agent": "customer_data", "done": false}
    ↓
Orchestrator → Customer Data Agent
    ↓
Customer Data Agent → Google ADK Runner
    ↓
Runner → MCP Client → HTTP POST → FastMCP Server
    ↓
FastMCP → Tool Function (get_customer)
    ↓
Tool → SQLite Database
    ↓
Response flows back: Database → Tool → FastMCP → MCP Client → Agent → Orchestrator
    ↓
Orchestrator → Router Agent: "Evaluate results"
    ↓
Router → Orchestrator: {"next_agent": null, "done": true}
    ↓
Orchestrator → User: Final response
```
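
The flow above can be sketched as a small supervisor loop. This is a minimal illustration with stubbed components, not the code in `a2a/orchestrator.py`: `stub_router` and `stub_customer_data` stand in for the LLM-backed router and agent, and `run_supervisor_loop` and its `max_steps` guard are hypothetical names introduced here.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

AgentFn = Callable[[str, List[dict]], Awaitable[str]]


async def stub_router(query: str, results: List[dict]) -> dict:
    """Stand-in for the LLM router: run customer_data once, then stop."""
    if not results:
        return {"next_agent": "customer_data", "done": False,
                "reason": "need customer record"}
    return {"next_agent": None, "done": True, "reason": "query answered"}


async def stub_customer_data(query: str, context: List[dict]) -> str:
    """Stand-in for an agent that would call MCP tools."""
    return "customer record (stub)"


async def run_supervisor_loop(
    query: str,
    router: Callable[[str, List[dict]], Awaitable[dict]],
    agents: Dict[str, AgentFn],
    max_steps: int = 5,  # assumed safety cap so a confused router cannot loop forever
) -> List[dict]:
    results: List[dict] = []
    for _ in range(max_steps):
        # Ask the router what to do next, given everything collected so far.
        decision = await router(query, results)
        if decision.get("done") or not decision.get("next_agent"):
            break
        name = decision["next_agent"]
        # Earlier results are passed along as context for the next agent.
        output = await agents[name](query, results)
        results.append({"agent": name, "output": output})
    return results


results = asyncio.run(
    run_supervisor_loop(
        "Get customer information for ID 5",
        stub_router,
        {"customer_data": stub_customer_data},
    )
)
print(results)
```

Because every agent call goes through the loop, the router sees each result before choosing the next step, which matches the hub-and-spoke pattern shown in the diagram.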


## Prerequisites for Running This Notebook

1. **Start MCP Server** (in a separate terminal):
   ```bash
   python customer_mcp/server/mcp_server.py
   ```

2. **Environment Variables**:
   - `OPENAI_API_KEY` must be set in the `.env` file
   - The MCP server must be running on `localhost:8001`

3. **Database Setup**:
   ```bash
   python database/database_setup.py
   ```

4. **Install Dependencies**:
   ```bash
   pip install -r requirements.txt
   ```
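
Before running the scenarios, it can help to confirm these prerequisites from Python. The sketch below is a hypothetical preflight check: the `port_is_open` and `preflight` helpers are not part of the project, and the key check assumes the `.env` file has already been loaded into the process environment.

```python
import os
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def preflight(host: str = "localhost", port: int = 8001) -> dict:
    """Report whether the API key is visible and the MCP server port accepts connections."""
    return {
        "openai_api_key_set": bool(os.environ.get("OPENAI_API_KEY")),
        "mcp_server_reachable": port_is_open(host, port),
    }


print(preflight())
```

A reachable port only shows that something is listening on `localhost:8001`; it does not prove that the listener is the FastMCP server.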

---

**Note**: All agents are LLM-backed and reach their tools through the MCP server rather than by calling Python functions directly. A2A support is built into Google ADK, so no custom classes are needed.
