In [1]:
!pip install -U langgraph langchain_openai --quiet


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


## Multi-Agent Customer Service System with A2A (LangGraph) and MCP 

University of Chicago Applied Data Science

**Author:** Hyunji Amy Kim

**Date:** 12/02/2025

**Goal:**
Build a multi-agent customer service system where specialized agents coordinate using Agent-to-Agent (A2A) communication and access customer data through the Model Context Protocol (MCP). 

### Agents Overview

**1. Router Agent (Orchestrator)**
- Receives customer queries
- Analyzes query intent
- Routes to appropriate specialist agent
- Coordinates responses from multiple agents

**2. Customer Data Agent (Specialist)**
- Accesses customer database via MCP
- Retrieves customer information
- Updates customer records
- Handles data validation

**3. Support Agent (Specialist)**
- Handles general customer support queries
- Can escalate complex issues
- Requests customer context from Data Agent
- Provides solutions and recommendations
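The roles above form a hub-and-spoke pattern. Each specialist does one piece of work and hands control back to the Router, which reads the shared state and chooses the next hop until the request is resolved. The framework-free sketch below shows only that control loop under simplifying assumptions. `run_hub_and_spoke`, the stub agents, and the `max_hops` limit are hypothetical names for illustration, not the LangGraph implementation built later in this notebook.

```python
from typing import Any, Callable, Dict, Optional

State = Dict[str, Any]
Agent = Callable[[State], State]


def run_hub_and_spoke(
    state: State,
    agents: Dict[str, Agent],
    decide: Callable[[State], Optional[str]],
    max_hops: int = 10,
) -> State:
    """Let `decide` (the router) pick a specialist after every step, up to max_hops specialist calls."""
    for hop in range(1, max_hops + 1):
        next_agent = decide(state)  # router reads the shared state
        if next_agent is None:  # nothing left to do
            break
        update = agents[next_agent](state)  # specialist returns a partial update
        state = {**state, **update, "hop_count": hop}
    return state


# Hypothetical stand-in specialists, for illustration only.
def data_agent(state: State) -> State:
    return {"customer_data": {"id": state["customer_id"], "name": "Example"}}


def support_agent_stub(state: State) -> State:
    name = state["customer_data"]["name"]
    return {"support_response": f"Hi {name}, how can I help?"}


def decide(state: State) -> Optional[str]:
    if "customer_data" not in state:
        return "data"
    if "support_response" not in state:
        return "support"
    return None


final = run_hub_and_spoke(
    {"customer_id": "1"},
    {"data": data_agent, "support": support_agent_stub},
    decide,
)
print(final["support_response"])  # Hi Example, how can I help?
```

The `max_hops` bound acts as a loop guard: if the router's exit condition is never met, the run still terminates.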


In [1]:
import os
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Optional, List, Dict
import re

In [None]:
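# Supply the OpenAI key via the environment (e.g. `export OPENAI_API_KEY=...`).
# setdefault keeps an existing key instead of overwriting it with an empty string.
os.environ.setdefault("OPENAI_API_KEY", "")

# =====================================================
# 1. Define shared state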
# =====================================================

class SupportState(TypedDict, total=False):
    input: str
    scenario_type: Optional[str]
    customer_id: Optional[str]

    customer_data: Optional[Dict]
    customer_tier: Optional[str]
    premium_ids: Optional[List[str]]
    billing_info: Optional[str]
    ticket_report: Optional[str]

    support_response: Optional[str]
    router_notes: Optional[str]

    support_seen: Optional[bool]
    needs_billing_context: Optional[bool]
    billing_done: Optional[bool]

    a2a_trace: Optional[str]

    # Safety counter: the Router increments it on every visit so routing loops can be detected
    hop_count: int
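Every agent in this notebook returns a partial dictionary rather than a full `SupportState`. Because none of the keys carries a reducer annotation, LangGraph keeps the last value written to each key: returned keys overwrite the old values and all other keys are left as they were. That is why each agent re-reads `a2a_trace`, appends to it, and writes the whole string back. The pure-Python sketch below imitates this merge for intuition only (it is not LangGraph's internal mechanism) and contrasts it with an additive reducer, which in LangGraph could be declared as, for example, `Annotated[str, operator.add]` so that a node returns only its new trace lines. `merge_update` is a hypothetical helper.

```python
import operator
from typing import Any, Callable, Dict, Optional


def merge_update(
    state: Dict[str, Any],
    update: Dict[str, Any],
    reducers: Optional[Dict[str, Callable[[Any, Any], Any]]] = None,
) -> Dict[str, Any]:
    """Return a new state dict: keys with a reducer are combined, all others are overwritten."""
    reducers = reducers or {}
    merged = dict(state)  # never mutate the previous state
    for key, value in update.items():
        if key in reducers and key in merged:
            merged[key] = reducers[key](merged[key], value)
        else:
            merged[key] = value  # last-value semantics
    return merged


state = {"input": "hello", "a2a_trace": "Router: hop_count=1\n", "hop_count": 1}

# Pattern used in this notebook: the node appends to the trace and returns the whole string.
full_rewrite = merge_update(
    state, {"a2a_trace": state["a2a_trace"] + "SupportAgent: done.\n"}
)

# Alternative: with an additive reducer the node returns only its new line.
append_only = merge_update(
    state, {"a2a_trace": "SupportAgent: done.\n"}, reducers={"a2a_trace": operator.add}
)

print(full_rewrite == append_only)  # True
```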

In [8]:
# =====================================================
# 2. MCP client (HTTP + SSE) used by LangGraph agents
# =====================================================

import requests
import json
from typing import Any, Dict, List

# This must match the Flask MCP server in mcp_server.ipynb
MCP_SERVER_URL = "http://127.0.0.1:5000/mcp"


def mcp_send(message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Send a single MCP JSON-RPC message to the MCP server and return
    the JSON-RPC response object.

    The MCP server streams responses using Server-Sent Events (SSE)
    with lines formatted as: "data: {json}\\n\\n".
    """
    # Send the request as a streaming POST; the context manager closes the
    # connection even though we stop reading after the first event.
    with requests.post(
        MCP_SERVER_URL,
        json=message,
        stream=True,
        timeout=10,
    ) as resp:
        resp.raise_for_status()

        # Parse the first SSE "data: ..." line as JSON
        for raw_line in resp.iter_lines():
            if not raw_line:
                continue
            # Expect "data: {json}"
            if raw_line.startswith(b"data: "):
                payload = raw_line[len(b"data: "):]
                return json.loads(payload.decode("utf-8"))

    # If we reach here, nothing useful was returned
    raise RuntimeError("No SSE data received from MCP server")


def mcp_call_tool(name: str, arguments: Dict[str, Any]) -> Any:
    """
    High-level helper for MCP tools/call.

    - name: MCP tool name (e.g., "get_customer")
    - arguments: dict of tool arguments (must match the tool's inputSchema)

    Returns:
        Parsed Python object that the tool returned (dict/list/string).
    """
    message: Dict[str, Any] = {
        "jsonrpc": "2.0",
        "id": "call-1",  # simple static ID is fine for this assignment
        "method": "tools/call",
        "params": {
            "name": name,
            "arguments": arguments,
        },
    }

    data = mcp_send(message)

    # JSON-RPC error from MCP
    if "error" in data:
        raise RuntimeError(f"MCP error: {data['error']}")

    result = data.get("result", {})
    content = result.get("content", [])

    # According to mcp_server.ipynb, tools/call returns:
    #   "result": {
    #       "content": [
    #           {"type": "text", "text": "{...json...}"}
    #       ]
    #   }
    if isinstance(content, list) and content:
        first = content[0]
        if isinstance(first, dict) and first.get("type") == "text":
            text = first.get("text", "")
            # Tool helpers in the server already json.dumps() the dict,
            # so here we try to parse it back into Python.
            try:
                return json.loads(text)
            except json.JSONDecodeError:
                # Fallback: just return the raw text if it is not valid JSON
                return text

    # Fallback: return the raw "result" if content is missing
    return result

# ----- Domain-specific helpers that call MCP tools used by the LangGraph agents -----

def mcp_get_customer(customer_id: str) -> Dict[str, Any]:
    """
    Call the MCP 'get_customer' tool.

    Returns a dict like:
        {
            "success": True/False,
            "customer": {...}  # when success is True
            "error": "...",    # when success is False
        }
    """
    try:
        cid = int(customer_id)
    except (TypeError, ValueError):
        return {"success": False, "error": "Invalid customer ID"}

    result = mcp_call_tool("get_customer", {"customer_id": cid})

    # The tool already returns a dict with "success", "customer", etc.
    return result


def mcp_get_billing_info(customer_id: str) -> str:
    """
    Build a human-readable billing summary by calling the MCP
    'get_customer_history' tool and filtering billing-related tickets.

    This is used in Scenario 2 (cancellation + billing issues).
    """
    try:
        cid = int(customer_id)
    except (TypeError, ValueError):
        return "Invalid customer ID for billing lookup."

    result = mcp_call_tool("get_customer_history", {"customer_id": cid})
    if not result.get("success"):
        return "Unable to retrieve billing history."

    tickets = result.get("tickets", [])
    billing_related = [
        t
        for t in tickets
        if "bill" in t.get("issue", "").lower()
        or "invoice" in t.get("issue", "").lower()
        or "payment" in t.get("issue", "").lower()
    ]

    if not billing_related:
        return "No billing-related issues found for this customer."

    lines: List[str] = []
    for t in billing_related:
        lines.append(
            f"- [{t.get('created_at')}] "
            f"({str(t.get('priority', '')).upper()}/{t.get('status')}) "
            f"{t.get('issue')}"
        )

    return "Recent billing-related tickets:\n" + "\n".join(lines)


def mcp_get_premium_customers() -> List[str]:
    """
    Identify "premium" customers using MCP tools.

    For this assignment, we define a "premium" customer as:
      - a customer that has at least one high-priority ticket.

    Implementation:
      - call 'list_customers' to get all customers
      - for each, call 'get_customer_history' and check for high-priority tickets
    """
    result = mcp_call_tool("list_customers", {})
    if not result.get("success"):
        return []

    customers = result.get("customers", [])
    premium_ids: List[str] = []

    for c in customers:
        cid = c.get("id")
        if cid is None:
            continue

        hist = mcp_call_tool("get_customer_history", {"customer_id": cid})
        if not hist.get("success"):
            continue

        tickets = hist.get("tickets", [])
        has_high = any(t.get("priority") == "high" for t in tickets)
        if has_high:
            premium_ids.append(str(cid))

    return premium_ids


def mcp_get_high_priority_tickets(customer_ids: List[str]) -> str:
    """
    Aggregate high-priority tickets for a list of customer IDs by calling
    the MCP 'get_customer_history' tool for each customer.

    This is used in Scenario 3 (status of all high-priority tickets for premium customers).
    """
    lines: List[str] = []

    for cid_str in customer_ids:
        try:
            cid = int(cid_str)
        except (TypeError, ValueError):
            continue

        hist = mcp_call_tool("get_customer_history", {"customer_id": cid})
        if not hist.get("success"):
            continue

        tickets = hist.get("tickets", [])
        for t in tickets:
            if t.get("priority") != "high":
                continue

            lines.append(
                f"Customer #{cid} → "
                f"Ticket #{t.get('id')} "
                f"[{t.get('status')}] : {t.get('issue')} "
                f"({t.get('created_at')})"
            )

    if not lines:
        return "No high-priority tickets found for the given premium customers."

    return "High-priority tickets:\n" + "\n".join(lines)
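`mcp_send` and `mcp_call_tool` combine network I/O with response parsing, so the parsing logic can only be exercised against a running server. The sketch below pulls the pure steps into standalone functions that can be checked offline: building a `tools/call` request with a fresh `id` per call (JSON-RPC uses the `id` to match responses to requests, which matters once calls are issued concurrently), reading the first `data: ` line of an SSE stream, and unwrapping the first text item of `result.content`. The function names are hypothetical, and the response shape is the one assumed in the comments above (taken from mcp_server.ipynb).

```python
import itertools
import json
from typing import Any, Dict, Iterable

_request_ids = itertools.count(1)  # increasing JSON-RPC ids: call-1, call-2, ...


def build_tools_call(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 tools/call request with a fresh id."""
    return {
        "jsonrpc": "2.0",
        "id": f"call-{next(_request_ids)}",
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }


def first_sse_data(lines: Iterable[bytes]) -> Dict[str, Any]:
    """Return the JSON payload of the first 'data: ' line in an SSE byte stream."""
    for raw_line in lines:
        if raw_line.startswith(b"data: "):
            return json.loads(raw_line[len(b"data: "):].decode("utf-8"))
    raise RuntimeError("No SSE data received from MCP server")


def unwrap_tool_result(response: Dict[str, Any]) -> Any:
    """Extract the tool payload from a tools/call response, parsing JSON text when possible."""
    if "error" in response:
        raise RuntimeError(f"MCP error: {response['error']}")
    result = response.get("result", {})
    content = result.get("content", [])
    if content and isinstance(content[0], dict) and content[0].get("type") == "text":
        text = content[0].get("text", "")
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return text  # plain-text tool output
    return result


# Offline check with a hand-written SSE stream (hypothetical payload).
fake_stream = [
    b"",
    b'data: {"jsonrpc": "2.0", "id": "call-1", "result": {"content": '
    b'[{"type": "text", "text": "{\\"success\\": true}"}]}}',
]
print(unwrap_tool_result(first_sse_data(fake_stream)))  # {'success': True}
```

Inside `mcp_send`, `first_sse_data(resp.iter_lines())` could replace the manual loop, since `iter_lines()` yields the same byte lines.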

In [9]:
# =====================================================
# 3. Define agents (rule-based: each agent selects its MCP helper from scenario_type)
# =====================================================
# LLM client; the agents below are rule-based and do not call it.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# --- Router Agent ---
def _classify_scenario(text: str) -> str:
    q = text.lower()
    if "high-priority" in q and "premium" in q and "ticket" in q:
        return "premium_ticket_status"         # Scenario 3
    if "cancel" in q and ("billing" in q or "charge" in q or "payment" in q):
        return "cancel_billing"               # Scenario 2
    if "customer id" in q or "account" in q:
        return "account_help"                 # Scenario 1
    return "generic_support"

def _extract_customer_id(text: str) -> Optional[str]:
    # simple heuristic: first 3+ digit number
    m = re.search(r"\b(\d{3,})\b", text)
    return m.group(1) if m else None

def router_agent(state: SupportState) -> SupportState:
    trace = state.get("a2a_trace", "")
    hop = state.get("hop_count", 0) + 1
    trace += f"Router: hop_count={hop}\n"

    # update hop_count and then classify or revisit
    updated: SupportState = {
        "hop_count": hop,
        "a2a_trace": trace,
    }

    # First time classification
    if "scenario_type" not in state or state["scenario_type"] is None:
        scenario = _classify_scenario(state["input"])
        cid = state.get("customer_id") or _extract_customer_id(state["input"])
        trace += f"Router: classified scenario={scenario}, customer_id={cid}\n"
        updated["scenario_type"] = scenario
        updated["customer_id"] = cid
        updated["router_notes"] = f"Scenario detected: {scenario}"
        updated["a2a_trace"] = trace
        return updated

    # Revisit
    trace += "Router: revisiting to decide next hop based on updated state.\n"
    updated["a2a_trace"] = trace
    return updated


# --- Customer Data Agent ---
def customer_data_agent(state: SupportState) -> SupportState:
    scenario = state.get("scenario_type")
    cid = state.get("customer_id") or _extract_customer_id(state["input"])
    trace = state.get("a2a_trace", "")

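    if scenario == "account_help":
        # Scenario 1: Check customer account details.
        # Keep the full get_customer response ({"success": ..., "customer"/"error": ...})
        # so a failed lookup is still recorded and routing can move on.
        if cid:
            lookup = mcp_get_customer(cid)
        else:
            lookup = {"success": False, "error": "No customer ID found in query"}
        # The customer record itself is nested under "customer"
        record = (lookup.get("customer") or {}) if lookup.get("success") else {}
        tier = record.get("tier", "standard")
        trace += f"CustomerDataAgent: fetched customer={cid}, tier={tier}\n"
        return {
            "customer_id": cid,
            "customer_data": lookup,
            "customer_tier": tier,
            "a2a_trace": trace,
        }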

    if scenario == "cancel_billing":
        # Scenario 2: Check billing info before cancellation
        billing = mcp_get_billing_info(cid or "unknown")
        trace += f"CustomerDataAgent: retrieved billing info for {cid}\n"
        return {
            "customer_id": cid,
            "billing_info": billing,
            "billing_done": True,
            "a2a_trace": trace,
        }

    if scenario == "premium_ticket_status":
        # Scenario 3: Check premium customers
        premium_ids = mcp_get_premium_customers()
        trace += f"CustomerDataAgent: premium customers = {premium_ids}\n"
        return {
            "premium_ids": premium_ids,
            "a2a_trace": trace,
        }

    trace += "CustomerDataAgent: no-op for this scenario.\n"
    return {"a2a_trace": trace}

# --- Support Agent ---
def support_agent(state: SupportState) -> SupportState:
    scenario = state.get("scenario_type")
    query = state["input"]
    trace = state.get("a2a_trace", "")

    # ---------- Scenario 1: account help ----------
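    if scenario == "account_help":
        # customer_data holds the raw get_customer response; the record is nested under "customer"
        customer = (state.get("customer_data") or {}).get("customer") or {}
        tier = state.get("customer_tier", "standard")
        name = customer.get("name", "the customer")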

        response = (
            f"Hi, {name}. I can help with your account.\n"
            f"- Detected tier: {tier}\n"
            "Since you're a premium customer, you have priority support and can reach us 24/7.\n"
            "Let me know the specific issue with your account (login, billing, settings, etc.)."
            if tier == "premium" else
            f"Hi, {name}. I can help with your account. "
            "Please tell me more about the problem you're encountering."
        )
        trace += "SupportAgent: generated response for Scenario 1 (account help).\n"
        return {
            "support_response": response,
            "support_seen": True,
            "a2a_trace": trace,
        }

    # ---------- Scenario 2: cancellation + billing ----------
    if scenario == "cancel_billing":
        billing_info = state.get("billing_info")
        support_seen = state.get("support_seen", False)

        if not support_seen and billing_info is None:
            trace += "SupportAgent: needs billing context → asking CustomerDataAgent.\n"
            return {
                "support_seen": True,
                "needs_billing_context": True,
                "a2a_trace": trace,
            }

        if billing_info is not None:
            response = (
                "I understand you’d like to cancel your subscription and you’re also "
                "experiencing billing issues.\n\n"
                f"Here is your current billing status:\n{billing_info}\n\n"
                "First, we’ll resolve the outstanding billing problem. Once that’s cleared, "
                "I can process your cancellation immediately or help you switch to a lower-cost plan "
                "if you’d prefer to keep some access."
            )
            trace += "SupportAgent: produced coordinated response using billing context (Scenario 2).\n"
            return {
                "support_response": response,
                "needs_billing_context": False,
                "a2a_trace": trace,
            }

        # Fallback
        trace += "SupportAgent: waiting for billing_info...\n"
        return {"a2a_trace": trace}

    # ---------- Scenario 3: high-priority tickets for premium customers ----------
    if scenario == "premium_ticket_status":
        premium_ids = state.get("premium_ids")
        ticket_report = state.get("ticket_report")

        # An empty premium list is a valid answer ("no premium customers"), so test for None
        if premium_ids is not None and ticket_report is None:
            ticket_report = mcp_get_high_priority_tickets(premium_ids)
            trace += "SupportAgent: fetched high-priority tickets for premium customers.\n"
        elif ticket_report is None:
            trace += "SupportAgent: no premium_ids available yet, waiting...\n"
            return {"a2a_trace": trace}

        response = (
            "Here is the status of all high-priority tickets for premium customers:\n\n"
            f"{ticket_report}\n\n"
            "If you’d like, I can also generate a CSV or summary grouped by customer."
        )
        trace += "SupportAgent: generated multi-step coordinated report (Scenario 3).\n"
        return {
            "ticket_report": ticket_report,
            "support_response": response,
            "a2a_trace": trace,
        }

    # ---------- Generic fallback ----------
    trace += "SupportAgent: handled generic support scenario.\n"
    return {
        "support_response": "Thanks for reaching out. Can you tell me more about your issue?",
        "a2a_trace": trace,
    }
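In Scenario 3 each premium customer's history is requested twice over MCP: once by `mcp_get_premium_customers` (called from the Customer Data Agent) and again by `mcp_get_high_priority_tickets` (called from the Support Agent). One way to avoid the repeated round trips while keeping the two agents' responsibilities separate is a per-request cache of `get_customer_history` results. The sketch below assumes the history shape used above (`success` plus a `tickets` list whose items carry `id`, `priority`, and `status`); `make_cached_history`, `find_premium_ids`, `high_priority_lines`, and the sample data are hypothetical. In the notebook, the wrapped fetcher would call `mcp_call_tool("get_customer_history", {"customer_id": cid})`.

```python
from typing import Any, Callable, Dict, List

History = Callable[[int], Dict[str, Any]]


def make_cached_history(fetch: History) -> History:
    """Wrap a history fetcher so each customer ID is fetched at most once."""
    cache: Dict[int, Dict[str, Any]] = {}

    def cached(cid: int) -> Dict[str, Any]:
        if cid not in cache:
            cache[cid] = fetch(cid)
        return cache[cid]

    return cached


def find_premium_ids(customer_ids: List[int], history: History) -> List[str]:
    """'Premium' as defined in this notebook: at least one high-priority ticket."""
    premium: List[str] = []
    for cid in customer_ids:
        hist = history(cid)
        if hist.get("success") and any(
            t.get("priority") == "high" for t in hist.get("tickets", [])
        ):
            premium.append(str(cid))
    return premium


def high_priority_lines(ids: List[str], history: History) -> List[str]:
    """Report lines for the high-priority tickets of the given customers."""
    lines: List[str] = []
    for cid_str in ids:
        for t in history(int(cid_str)).get("tickets", []):
            if t.get("priority") == "high":
                lines.append(f"Customer #{cid_str} → Ticket #{t.get('id')} [{t.get('status')}]")
    return lines


# Hypothetical in-memory histories and a call log standing in for the MCP server.
SAMPLE_HISTORIES = {
    1: {"success": True, "tickets": [{"id": 10, "priority": "high", "status": "open"}]},
    2: {"success": True, "tickets": [{"id": 11, "priority": "low", "status": "closed"}]},
}
calls: List[int] = []


def fake_fetch(cid: int) -> Dict[str, Any]:
    calls.append(cid)
    return SAMPLE_HISTORIES[cid]


history = make_cached_history(fake_fetch)  # one cache per user request
premium = find_premium_ids([1, 2], history)
report = high_priority_lines(premium, history)
print(premium, report, f"server calls: {len(calls)}")
```

The cache is scoped to a single request on purpose: a module-level cache (for example `functools.lru_cache` on the fetcher) would keep serving stale ticket statuses across queries.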

In [None]:
# =====================================================
# 4. LangGraph wiring (A2A routing)
# =====================================================
graph = StateGraph(SupportState)

graph.add_node("Router", router_agent)
graph.add_node("CustomerDataAgent", customer_data_agent)
graph.add_node("SupportAgent", support_agent)

graph.add_edge(START, "Router")

graph.add_edge("CustomerDataAgent", "Router")
graph.add_edge("SupportAgent", "Router")

# Upper bound on Router visits. The Router increments hop_count on every visit,
# so this stops a scenario whose state never satisfies its exit condition.
MAX_HOPS = 10


def route_decision(state: SupportState) -> str:
    """
    Decide the next agent based on current state.
    """
    # Loop guard: end the run once the Router has been visited MAX_HOPS times
    if state.get("hop_count", 0) >= MAX_HOPS:
        return "END"

    scenario = state.get("scenario_type")
    # Scenario 1: "I need help with my account, customer ID 12345"
    if scenario == "account_help":
        if not state.get("customer_data"):
            return "CustomerDataAgent"
        if not state.get("support_response"):
            return "SupportAgent"
        return "END"

    # Scenario 2: "I want to cancel my subscription but I'm having billing issues"
    if scenario == "cancel_billing":
        # If SupportAgent hasn't seen the request yet, send there first
        if not state.get("support_seen"):
            return "SupportAgent"
        # If billing context is needed, go to CustomerDataAgent
        if state.get("needs_billing_context", False) and not state.get("billing_done"):
            return "CustomerDataAgent"
        # If billing is done but no support response yet, go back to SupportAgent
        if state.get("billing_done", False) and not state.get("support_response"):
            return "SupportAgent"
        return "END"

    # Scenario 3: "What's the status of all high-priority tickets for premium customers?"
    if scenario == "premium_ticket_status":
        # Step 1: fetch the premium customer list → CustomerDataAgent.
        # Compare with None: an empty list means "no premium customers", not "not fetched yet".
        if state.get("premium_ids") is None:
            return "CustomerDataAgent"
        # Step 2: build the high-priority ticket report from premium_ids → SupportAgent
        if not state.get("ticket_report"):
            return "SupportAgent"
        return "END"

    # Generic scenario → SupportAgent directly
    if not state.get("support_response"):
        return "SupportAgent"
    return "END"

# Router decides next hop
graph.add_conditional_edges(
    "Router",
    route_decision,
    {
        "CustomerDataAgent": "CustomerDataAgent",
        "SupportAgent": "SupportAgent",
        "END": END,
    },
)

router_system = graph.compile()
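The demo queries include "Get customer information for ID 5". `_extract_customer_id` only matches numbers with three or more digits (`\b(\d{3,})\b`), so that query yields no customer ID. If short IDs should be recognized, one option is to look for a number right after the word "ID" first and fall back to the original heuristic. The sketch below is a hypothetical variant (`extract_customer_id` and both patterns are illustrative names), not the notebook's implementation.

```python
import re
from typing import Optional

# Hypothetical variant of _extract_customer_id:
# 1) prefer a number directly after the word "ID" (any length, optional ":" or "#"),
# 2) otherwise fall back to the notebook's "first number with 3+ digits" heuristic.
_ID_AFTER_KEYWORD = re.compile(r"\bid\b\s*[:#]?\s*(\d+)", re.IGNORECASE)
_LONG_NUMBER = re.compile(r"\b(\d{3,})\b")


def extract_customer_id(text: str) -> Optional[str]:
    match = _ID_AFTER_KEYWORD.search(text) or _LONG_NUMBER.search(text)
    return match.group(1) if match else None


for query in [
    "I need help with my account, customer ID 12345",
    "Get customer information for ID 5",
    "Show me all active customers who have open tickets",
]:
    print(f"{query!r} -> {extract_customer_id(query)!r}")
```

Extracting the ID alone would not change routing for that query: `_classify_scenario` would still return `generic_support`, because the text contains neither "customer id" nor "account".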

In [11]:
# =====================================================
# 5. Run demo queries
# =====================================================
test_queries = [
    # Scenario 1
    "I need help with my account, customer ID 12345",
    # Scenario 2
    "I want to cancel my subscription but I'm having billing issues",
    # Scenario 3
    "What's the status of all high-priority tickets for premium customers?",
    # Scenario 4
    "Get customer information for ID 5",
    # Scenario 5
    "Show me all active customers who have open tickets",
]

for q in test_queries:
    print("======================================================")
    print("USER:", q)
    print("------------------------------------------------------")
    result = router_system.invoke({"input": q})
    print("\nFinal response:\n", result.get("support_response"))
    print("\nA2A trace:\n", result.get("a2a_trace"))
    print("======================================================\n")

USER: I need help with my account, customer ID 12345
------------------------------------------------------

Final response:
 Hi, the customer. I can help with your account. Please tell me more about the problem you're encountering.

A2A trace:
 Router: hop_count=1
Router: classified scenario=account_help, customer_id=12345
CustomerDataAgent: fetched customer=12345, tier=standard
Router: hop_count=2
Router: revisiting to decide next hop based on updated state.
SupportAgent: generated response for Scenario 1 (account help).
Router: hop_count=3
Router: revisiting to decide next hop based on updated state.


USER: I want to cancel my subscription but I'm having billing issues
------------------------------------------------------

Final response:
 I understand you’d like to cancel your subscription and you’re also experiencing billing issues.

Here is your current billing status:
Invalid customer ID for billing lookup.

First, we’ll resolve the outstanding billing problem. Once that’s cle

**Conclusion**

Implementing the MCP-based A2A multi-agent system gave me a deeper understanding of structured agent collaboration and of how explicit, tool-mediated data retrieval improves reliability. I learned how agents can share context through a common LangGraph state, and how orchestration logic in the Router agent enables dynamic delegation of tasks rather than a static pipeline. Keeping retrieved data, intermediate steps, and final outputs aligned, and recording each hand-off in the A2A trace, reinforced the importance of transparency and explainability in multi-agent architectures.

One challenge was preventing agents from entering recursive or circular routing loops, which required careful state tracking (flags such as `support_seen` and `billing_done`, plus the `hop_count` counter) and explicit termination conditions. Another was fully decoupling database access so that all data interactions went through MCP tool calls rather than direct database queries. Resolving these required a clearer separation of responsibilities across agents and consistent message passing through validated MCP tool calls. Ultimately, these challenges helped shape a more modular, scalable, and trustworthy multi-agent architecture.