# Multi-Agent Customer Service System (CSS)

## System Architecture
- **Router Agent** (port 10020): Routes requests to the specialist agents and orchestrates their responses
- **Customer Data Agent** (port 10021): Handles customer information and data queries
- **Support Agent**: Manages support tickets and resolution tracking
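
As a rough illustration of how the agents above can be discovered, the sketch below builds an AgentCard URL for each one. The router's port and the well-known path come from this notebook's configuration; assigning port 10022 to the Support Agent, and assuming the specialists serve their cards at the same path, are assumptions. `agent_card_url` is a hypothetical helper, not part of the project.

```python
# Router and Customer Data ports come from this notebook;
# mapping 10022 to the Support Agent is an assumption.
AGENT_PORTS_BY_NAME = {
    "router": 10020,
    "customer_data": 10021,
    "support": 10022,  # assumed
}

# Path taken from ROUTER_AGENT_CARD_URL; assumed to be the same for every agent.
WELL_KNOWN_CARD_PATH = "/.well-known/agent-card.json"


def agent_card_url(port: int, host: str = "localhost") -> str:
    """Build the AgentCard discovery URL for an agent listening on `port`."""
    return f"http://{host}:{port}{WELL_KNOWN_CARD_PATH}"


card_urls = {name: agent_card_url(port) for name, port in AGENT_PORTS_BY_NAME.items()}
```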

GitHub link: https://github.com/heyhassaan/Multi-Agent-Customer-Service-System-with-A2A-and-MCP

**Import Libraries**

In [None]:
import asyncio
import pathlib
import signal
import subprocess
import sys
import time
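from IPython.display import Markdown, display

# Install third-party dependencies on first use; %pip targets the running kernel's environment
try:
    import psutil
except ImportError:
    %pip install psutil
    import psutil

try:
    import httpx
except ImportError:
    %pip install httpx
    import httpx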

PROJECT_ROOT = pathlib.Path.cwd()
ROUTER_AGENT_CARD_URL = "http://localhost:10020/.well-known/agent-card.json"
AGENT_PORTS = [10020, 10021, 10022]
DATABASE_NAME = "multi_agent_service.db"

**Functions**

In [None]:
def terminate_processes_on_ports(ports: list[int]) -> None:
    """Terminate any processes using the specified ports."""
    terminated_ports = []
    for port in ports:
        for proc in psutil.process_iter(['pid', 'name']):
            try:
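                # psutil 6.0 renamed connections() to net_connections(); support both
                list_connections = getattr(proc, "net_connections", None) or proc.connections
                for conn in list_connections():
                    # laddr can be empty for sockets that are not bound
                    if conn.laddr and conn.laddr.port == port:
                        print(f"🔪 Terminating process {proc.pid} ({proc.name()}) on port {port}")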
                        proc.kill()
                        terminated_ports.append(port)
                        break
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass
    
    if terminated_ports:
        time.sleep(2)  # Wait for ports to be released
        print(f"Cleaned up ports: {terminated_ports}")
    else:
        print("No processes to clean up")


async def wait_for_router_ready(timeout: float = 30.0) -> bool:
    """Poll the router AgentCard endpoint until it responds or times out."""
    print("Waiting for router to be ready...")
    start = time.perf_counter()
    async with httpx.AsyncClient() as client:
        while True:
            try:
                resp = await client.get(ROUTER_AGENT_CARD_URL, timeout=2)
                if resp.status_code == 200:
                    elapsed = time.perf_counter() - start
                    print(f"Router ready after {elapsed:.1f}s")
                    return True
            except httpx.HTTPError:
                pass
            
            if time.perf_counter() - start > timeout:
                print(f"Router not reachable after {timeout}s")
                return False
            
            await asyncio.sleep(0.5)


def start_agent_servers() -> subprocess.Popen:
    """Launch all A2A agent servers."""
    print("Starting A2A agent servers...")
    return subprocess.Popen(
        [sys.executable, "agents_server.py"],
        cwd=PROJECT_ROOT,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1
    )


def stop_agent_servers(proc: subprocess.Popen) -> str:
    """Terminate the background server process and return logs."""
    print("Stopping agent servers...")
    logs = ""
    
    if proc.poll() is None:
        proc.send_signal(signal.SIGINT)
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            print("Servers didn't stop gracefully, forcing termination...")
            proc.kill()
            proc.wait()  # reap the killed process so its output pipe closes
    
    if proc.stdout:
        logs = proc.stdout.read()
    
    print("Servers stopped")
    return logs

In [None]:
def execute_test_scenarios() -> subprocess.CompletedProcess:
    """Execute the test scenario runner and capture output."""
    print("\n" + "="*60)
    print("Executing multi-agent test scenarios...")
    print("="*60 + "\n")
    
    return subprocess.run(
        [sys.executable, "demo_scenarios.py"],
        cwd=PROJECT_ROOT,
        capture_output=True,
        text=True,
    )

**Unblock Ports**

This ensures no previous agent server instances are blocking the required ports.
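
One way to confirm that the cleanup worked is to probe each port before launching the servers. The sketch below is not part of the original notebook: `port_is_free` is a hypothetical helper that uses only the standard-library `socket` module and treats a successful TCP connection as a sign that something is still listening.

```python
import socket


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if nothing accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        # connect_ex returns 0 when the connection succeeds, meaning the port is taken
        return sock.connect_ex((host, port)) != 0
```

After the cleanup cell, `[p for p in AGENT_PORTS if not port_is_free(p)]` would list any ports that are still occupied.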

In [None]:
# psutil is already imported in the first cell; free the agent ports before starting

terminate_processes_on_ports(AGENT_PORTS)

✅ No processes to clean up



**Agent Servers**

This will start all three agent servers (Router, Customer Data, and Support) in the background.
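
Because the servers run in the background with their output sent to a pipe, a long run can stall if nobody reads that pipe and the OS buffer fills up. The sketch below shows one possible way to avoid that by draining the output in a background thread. `start_with_log_buffer` is a hypothetical helper, not the notebook's `start_agent_servers`, and the 2000-line cap is an arbitrary choice.

```python
import subprocess
import threading
from collections import deque


def start_with_log_buffer(cmd, max_lines=2000):
    """Start `cmd` and keep its most recent output lines in memory."""
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    )
    recent_lines = deque(maxlen=max_lines)

    def drain():
        # Reading continuously keeps the OS pipe buffer from filling up
        for line in proc.stdout:
            recent_lines.append(line)

    reader = threading.Thread(target=drain, daemon=True)
    reader.start()
    return proc, recent_lines, reader
```

With this approach the logs are available at any time as `"".join(recent_lines)`, without waiting for the process to exit.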

In [None]:
agent_server_process = start_agent_servers()

# Give servers a moment to initialize
await asyncio.sleep(3)

# Check if process is still running
if agent_server_process.poll() is not None:
    print("Server process exited unexpectedly!")
    if agent_server_process.stdout:
        print("\nServer output:")
        print(agent_server_process.stdout.read())
else:
    print("Agent server process is running")

🚀 Starting A2A servers...
✅ Server process is running


**Wait for Router to Be Ready**

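Poll the router's AgentCard endpoint (`/.well-known/agent-card.json` on port 10020) until it returns HTTP 200, so the router is known to accept requests before any scenario runs.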
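
The same polling idea can be extended to wait for several agents at once. The sketch below is a generic version under stated assumptions: `wait_until` and `wait_for_all` are hypothetical helpers, and each probe is any async callable that returns `True` once its agent responds (for example, a wrapper around an `httpx` GET on that agent's AgentCard URL).

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict

Probe = Callable[[], Awaitable[bool]]


async def wait_until(probe: Probe, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Call `probe` repeatedly until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        if await probe():
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(interval)


async def wait_for_all(probes: Dict[str, Probe], timeout: float = 30.0,
                       interval: float = 0.5) -> Dict[str, bool]:
    """Poll every probe concurrently and report which ones became ready."""
    results = await asyncio.gather(
        *(wait_until(probe, timeout, interval) for probe in probes.values())
    )
    return dict(zip(probes.keys(), results))
```

In a notebook cell this would be awaited directly, for example `status = await wait_for_all(probes)`, and any name mapped to `False` identifies an agent that never came up.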

In [None]:
router_is_ready = await wait_for_router_ready(timeout=30.0)

if not router_is_ready:
    print("\nRouter failed to start. Checking server logs...\n")
    logs = stop_agent_servers(agent_server_process)
    print(logs[:1000])  # Print first 1000 chars of logs
    raise RuntimeError("Router did not become ready in time")

⏳ Waiting for router to be ready...
✅ Router ready after 0.0s


**Run Test Scenarios**

Execute all the test scenarios defined in `demo_scenarios.py`.

In [None]:
scenario_result = execute_test_scenarios()

# Display the output
print(scenario_result.stdout)

if scenario_result.stderr:
    print("\n" + "="*60)
    print("STDERR:")
    print("="*60)
    print(scenario_result.stderr)

if scenario_result.returncode != 0:
    print(f"\nTest scenarios exited with code {scenario_result.returncode}")
else:
    print("\nAll test scenarios completed successfully!")


Running test scenarios...

Multi-Agent Customer Service System (A2A)
Test Scenarios - Multi-Turn Conversations

Starting test scenarios...


SCENARIO 1: Multi-turn customer support with ID follow-up

[User]: I need help upgrading my account
[Agent]: I can certainly help with that. To start, could you please provide your customer ID?


[User]: My customer ID is 12345
[Agent]: Great, I have created a ticket (ID 19) for your account upgrade. The concerned team will reach out to you shortly to assist with the process. Is there anything else I can help you with?


[User]: What options do I have?
[Agent]: I understand you're asking about options related to your account upgrade. To provide the best assistance, could you please clarify what options you're interested in? Are you looking for information about the upgrade process, different upgrade plans, or something else related to the ticket I created for you?



SCENARIO 2: Billing issue with multiple follow-ups

[User]: I have a billing pro

**Stop Servers**

In [None]:
server_log_content = stop_agent_servers(agent_server_process)

# Display relevant server log excerpts
print("\n" + "="*60)
print("Agent Server Logs (last 2000 chars):")
print("="*60)
print(server_log_content[-2000:])

🛑 Stopping servers...
✅ Servers stopped

Server Logs (last 2000 chars):
k not found or task_id not set. Creating new task for event (task_id: cfb1c9df-b9fa-4d22-9bb4-7bf9b7aa74e5, context_id: 4679817f-f3be-439a-9ce7-9c806a8d3d40).
HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-lite:generateContent "HTTP/1.1 200 OK"
Response received from the model.
Sending out request, model: gemini-2.0-flash-lite, backend: GoogleLLMVariant.GEMINI_API, stream: False
HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-lite:generateContent "HTTP/1.1 200 OK"
Response received from the model.
INFO:     127.0.0.1:56855 - "POST / HTTP/1.1" 200 OK
HTTP Request: POST http://localhost:10021 "HTTP/1.1 200 OK"
Sending out request, model: gemini-2.0-flash-lite, backend: GoogleLLMVariant.GEMINI_API, stream: False
Task not found or task_id not set. Creating new task for event (task_id: bdb69b65-08e8-42fe-a300-a45b0270aa50, cont

**Save Output**

In [None]:
output_file_content = f"""# Multi-Agent System Demo Run Output

## Command
```
python demo_scenarios.py
```

## Standard Output
```
{scenario_result.stdout.strip()}
```

## Error Output
```
{scenario_result.stderr.strip() or '<<empty>>'}
```

## Agent Server Logs (last 2000 chars)
```
{server_log_content[-2000:].strip()}
```
"""

output_file_path = PROJECT_ROOT / "demo_output.md"
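# Write as UTF-8 explicitly so emoji in the captured output survive on every platform
output_file_path.write_text(output_file_content, encoding="utf-8")

print(f"\n📝 Output saved to: {output_file_path}")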
print("\n" + "="*60)
print("Multi-Agent Demo Complete!")
print("="*60)


📝 Output saved to: /Users/karim/Desktop/uchicago/Term 4/gen ai/ass5/demo_output.md

Demo Complete!


In [None]:
# Emergency cleanup - use if needed to force stop all agent servers
try:
    stop_agent_servers(agent_server_process)
except Exception:  # ignore cleanup errors, but let KeyboardInterrupt through
    pass

terminate_processes_on_ports(AGENT_PORTS)
print("✅ Emergency cleanup complete")

🛑 Stopping servers...
✅ Servers stopped
✅ No processes to clean up
✅ Cleanup complete



# Conclusion

Rebuilding the customer-support workflow with Google's A2A protocol reinforced how valuable clear agent boundaries are. Once each assistant exposed an AgentCard and a JSON-RPC endpoint, orchestration became a matter of passing tasks rather than wiring bespoke function calls. Pairing that with MCP meant the Customer Data Agent never needed to know *how* the database was implemented; it simply invoked standardized tools. The Router/Specialist pattern worked well in this setting: once the router knew which intents required data first (premium customers, billing escalations, etc.), the rest of the pipeline felt like composing LEGO bricks.

The hardest problem was getting the negotiation scenarios to feel natural. Support needs customer IDs and billing context, but the Router must stay in control, so the prompts had to encourage the specialists to explicitly request missing context instead of hallucinating.

Automating the demo helped shake out timing issues: spinning up three uvicorn servers, waiting for AgentCards, and then streaming queries uncovered race conditions I wouldn't have spotted manually. Finally, running everything through a single script (and capturing logs) exposed how sensitive the Gemini API is to throttling, so I added conservative delays between scenarios to keep the run stable.
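
The idea of spacing scenarios out to stay under API rate limits can be sketched as follows. This is not the code in `demo_scenarios.py`: `run_with_pacing` is a hypothetical helper, the 2-second default is an arbitrary placeholder, and each scenario is modeled as a plain callable that returns its transcript.

```python
import time
from typing import Callable, Iterable, List


def run_with_pacing(
    scenarios: Iterable[Callable[[], str]],
    delay_seconds: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[str]:
    """Run each scenario in order, pausing between consecutive scenarios."""
    results = []
    for index, scenario in enumerate(scenarios):
        if index > 0:
            sleep(delay_seconds)  # pause before every scenario except the first
        results.append(scenario())
    return results
```

Passing `sleep` as a parameter keeps the pacing testable without real waiting; in a real run the default `time.sleep` applies.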
