### Environment Setup

In [18]:
# Core agent + graph ecosystem
!pip install -q "langgraph>=0.2.0" "langchain>=0.3.0" "langgraph-api>=0.4.21"

# A2A agent-card protocol
!pip install -q "a2a-sdk>=0.3.0"

# MCP server + optional HTTP/SSE transports
!pip install -q "mcp[http]>=0.3.0"

# Misc utilities
!pip install -q "uvicorn[standard]" "fastapi" "httpx" "nest-asyncio"


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.8/43.8 kB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m294.0/294.0 kB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.7/2.7 MB[0m [31m63.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m71.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m323.2/323.2 kB[0m [31m23.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m72.5/72.5 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m456.8/456.8 kB[0m [31m28.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for forbiddenfruit (setup.py) ... [?25l[?25hdone
[31mERROR: pip's dependency res

In [20]:
%run database_setup.py

Connected to database: support.db
Tables created successfully!
Triggers created successfully!

DATABASE SCHEMA

CUSTOMERS TABLE:
------------------------------------------------------------
  id              INTEGER     
  name            TEXT       NOT NULL 
  email           TEXT        
  phone           TEXT        
  status          TEXT       NOT NULL DEFAULT 'active'
  created_at      TIMESTAMP   DEFAULT CURRENT_TIMESTAMP
  updated_at      TIMESTAMP   DEFAULT CURRENT_TIMESTAMP

TICKETS TABLE:
------------------------------------------------------------
  id              INTEGER     
  customer_id     INTEGER    NOT NULL 
  issue           TEXT       NOT NULL 
  status          TEXT       NOT NULL DEFAULT 'open'
  priority        TEXT       NOT NULL DEFAULT 'medium'
  created_at      DATETIME    DEFAULT CURRENT_TIMESTAMP

FOREIGN KEYS:
------------------------------------------------------------
  tickets.customer_id -> customers.id

Would you like to insert sample data? (y/n): y

In [21]:
# ASYNC NOTEBOOK UTILITIES
import nest_asyncio
import asyncio

# Enable nested loops inside Jupyter / Colab
nest_asyncio.apply()

def async_exec(coro):
    """
    Wrapper to execute async operations inside the notebook.
    Renamed from run_async or _run_async.
    """
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            return loop.run_until_complete(coro)
        return loop.run_until_complete(coro)
    except RuntimeError:
        # If no loop exists, create one
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)


In [22]:
#DATABASE SANITY CHECK
import sqlite3

def preview_database(limit=5):
    """
    Prints some customers and tickets to verify DB creation.
    """
    conn = sqlite3.connect("support.db")
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()

    print("\n--- SAMPLE CUSTOMERS ---")
    cur.execute("SELECT * FROM customers LIMIT ?", (limit,))
    for row in cur.fetchall():
        print(dict(row))

    print("\n--- SAMPLE TICKETS ---")
    cur.execute("SELECT * FROM tickets LIMIT ?", (limit,))
    for row in cur.fetchall():
        print(dict(row))

    conn.close()

# Run optional check
preview_database()



--- SAMPLE CUSTOMERS ---
{'id': 1, 'name': 'John Doe', 'email': 'john.doe@example.com', 'phone': '+1-555-0101', 'status': 'active', 'created_at': '2025-12-03 04:09:46', 'updated_at': '2025-12-03 04:09:46'}
{'id': 2, 'name': 'Jane Smith', 'email': 'jane.smith@example.com', 'phone': '+1-555-0102', 'status': 'active', 'created_at': '2025-12-03 04:09:46', 'updated_at': '2025-12-03 04:09:46'}
{'id': 3, 'name': 'Bob Johnson', 'email': 'bob.johnson@example.com', 'phone': '+1-555-0103', 'status': 'disabled', 'created_at': '2025-12-03 04:09:46', 'updated_at': '2025-12-03 04:09:46'}
{'id': 4, 'name': 'Alice Williams', 'email': 'alice.w@techcorp.com', 'phone': '+1-555-0104', 'status': 'active', 'created_at': '2025-12-03 04:09:46', 'updated_at': '2025-12-03 04:09:46'}
{'id': 5, 'name': 'Charlie Brown', 'email': 'charlie.brown@email.com', 'phone': '+1-555-0105', 'status': 'active', 'created_at': '2025-12-03 04:09:46', 'updated_at': '2025-12-03 04:09:46'}

--- SAMPLE TICKETS ---
{'id': 1, 'customer

### System Architecture

In [1]:
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
import re
from langgraph.graph import StateGraph
from langgraph.runtime import Runtime

In [2]:
# Router Agent
@dataclass
class RouterLedger:
    """
    Maintains the message history and the routing decision.
    """
    transcript: List[Dict[str, Any]]
    dispatch_target: Optional[str] = None


async def coordinator_logic(state: RouterLedger, rt: Runtime[Dict[str, Any]]):
    """
    Rule-based router that classifies incoming requests.
    """

    if not state.transcript:
        decision = "unclassified"
    else:
        last = state.transcript[-1].get("content", "").lower()

        if any(term in last for term in ["ticket", "issue", "support", "refund", "problem"]):
            decision = "assist_agent"

        elif any(term in last for term in ["customer", "email", "history", "profile", "records"]):
            decision = "records_agent"

        else:
            decision = "unclassified"

    # Add system routing trace
    route_note = {
        "role": "system",
        "content": f"[CoordinatorAgent] dispatch_target='{decision}' for input={last!r}"
    }

    return {
        "transcript": state.transcript + [route_note],
        "dispatch_target": decision
    }


# Build Coordinator LangGraph
coord_graph_builder = StateGraph(RouterLedger)
coord_graph_builder.add_node("dispatcher", coordinator_logic)
coord_graph_builder.add_edge("__start__", "dispatcher")
CoordinatorAgent = coord_graph_builder.compile()

print("CoordinatorAgent initialized.")


CoordinatorAgent initialized.


In [3]:
# CustomerData Specialist
@dataclass
class RecordStateBundle:
    """
    Handles customer lookup and ticket history retrieval.
    """
    dialog: List[Dict[str, Any]]
    invoked_tool: Optional[str] = None
    payload: Optional[Dict[str, Any]] = None


def _parse_customer_id(text: str) -> Optional[int]:
    m = re.search(r"customer\s+id\s+(\d+)", text, re.I)
    if not m:
        m = re.search(r"customer\s+(\d+)", text, re.I)
    if not m:
        return None
    try:
        return int(m.group(1))
    except:
        return None


async def records_logic(state: RecordStateBundle, rt: Runtime[Dict[str, Any]]):
    """
    Implements logic for:
    - get_customer
    - get_customer_history

    In Part 1, these calls are placeholders.
    """

    if not state.dialog:
        msg = "RecordsAgent: No input provided."
        return {
            "dialog": state.dialog + [{"role": "agent", "content": msg}],
            "invoked_tool": None,
            "payload": None
        }

    user_raw = state.dialog[-1]["content"].lower()
    cid = _parse_customer_id(user_raw)

    if cid is None:
        msg = "RecordsAgent: Please specify a customer ID (e.g., 'customer 2')."
        return {
            "dialog": state.dialog + [{"role": "agent", "content": msg}],
            "invoked_tool": None,
            "payload": None
        }

    # Classify intent
    if any(word in user_raw for word in ["history", "tickets", "past issues"]):
        tool_name = "tool_get_customer_history"
        summary = f"[RecordsAgent] retrieving ticket history for customer {cid}"
    else:
        tool_name = "tool_get_customer"
        summary = f"[RecordsAgent] retrieving profile for customer {cid}"

    # Placeholder result (real MCP integration = Part 2)
    fake_result = {
        "requested_id": cid,
        "action": tool_name,
        "note": "MCP result will appear here in Part 2"
    }

    return {
        "dialog": state.dialog + [{"role": "agent", "content": summary}],
        "invoked_tool": tool_name,
        "payload": fake_result
    }


# Build RecordsAgent LangGraph
records_graph_builder = StateGraph(RecordStateBundle)
records_graph_builder.add_node("records_handler", records_logic)
records_graph_builder.add_edge("__start__", "records_handler")
RecordsAgent = records_graph_builder.compile()

print("RecordsAgent initialized.")

RecordsAgent initialized.


In [4]:
# Support Specialist
@dataclass
class SupportBundle:
    """
    Handles ticket creation with negotiation.
    """
    thread: List[Dict[str, Any]]
    last_step: Optional[str] = None
    ticket_data: Optional[Dict[str, Any]] = None
    missing: Optional[List[str]] = None

def _extract_priority(text: str) -> Optional[str]:
    for p in ["low", "medium", "high"]:
        if p in text.lower():
            return p
    return None

def _extract_issue_text(text: str) -> Optional[str]:
    m = re.search(r"(?:about|regarding|issue|problem)\s+(.+)", text, re.I)
    if m:
        return m.group(1).strip()
    return None

def _extract_support_customer(text: str) -> Optional[int]:
    return _parse_customer_id(text)


async def assist_logic(state: SupportBundle, rt: Runtime[Dict[str, Any]]):
    """
    Resolves customer_id, issue, priority and prepares a ticket creation action.
    """

    if not state.thread:
        return {
            "thread": [{"role": "agent", "content": "AssistAgent: No message provided."}],
            "last_step": None,
            "ticket_data": None,
            "missing": None
        }

    text = state.thread[-1]["content"]
    cid = _extract_support_customer(text)
    priority = _extract_priority(text)
    issue = _extract_issue_text(text)

    missing = []
    if cid is None: missing.append("customer_id")
    if issue is None: missing.append("issue_description")
    if priority is None: missing.append("priority_level")

    if missing:
        clarification = (
            "[AssistAgent] Cannot proceed. Missing: "
            + ", ".join(missing)
        )
        return {
            "thread": state.thread + [{"role": "agent", "content": clarification}],
            "last_step": None,
            "ticket_data": None,
            "missing": missing
        }

    # Placeholder ticket output (real MCP call in Part 2)
    fake_ticket = {
        "id": 999,
        "customer_id": cid,
        "priority": priority,
        "issue": issue,
        "note": "Real ticket will be created via MCP in Part 2"
    }

    return {
        "thread": state.thread + [{
            "role": "agent",
            "content": f"[AssistAgent] Prepared ticket for customer {cid} ({priority})"
        }],
        "last_step": "create_ticket_prepare",
        "ticket_data": fake_ticket,
        "missing": []
    }


# Build AssistAgent LangGraph
assist_graph_builder = StateGraph(SupportBundle)
assist_graph_builder.add_node("assist_handler", assist_logic)
assist_graph_builder.add_edge("__start__", "assist_handler")
AssistAgent = assist_graph_builder.compile()

print("AssistAgent initialized.")

AssistAgent initialized.


The system architecture is built around three clearly defined agents that collaborate to handle customer-service requests in a structured and modular way. The CoordinatorUnit serves as the central orchestrator, interpreting user queries and routing them to the appropriate specialist, ensuring that each request is handled by the agent best suited for the task. The RecordsUnit is dedicated to retrieving customer information and ticket histories through the MCP layer, cleanly separating data access from decision logic. The AssistUnit focuses on support actions, including ticket creation and basic negotiation when required information is missing. Together, these agents form a coordinated workflow in which routing, data retrieval, and support operations occur in a controlled and extensible manner, providing a strong foundation for multi-step and collaborative reasoning in later parts of the assignment.

### MCP Integration

In [5]:
from mcp.server.fastmcp import FastMCP, Context
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
import sqlite3

In [6]:
# Renamed DB path variable
DATABASE_FILE = "support.db"

# New MCP server instance name
mcp_hub = FastMCP("customer-support-mcp")


In [8]:
def _open_db() -> sqlite3.Connection:
    """
    Returns a SQLite connection with row factory enabled.
    """
    conn = sqlite3.connect(DATABASE_FILE)
    conn.row_factory = sqlite3.Row
    return conn

class CustomerPatch(BaseModel):
    """
    Customer info update tool.
    """
    name: Optional[str] = None
    email: Optional[str] = None
    phone: Optional[str] = None
    status: Optional[str] = None

In [10]:
@mcp_hub.tool()
async def get_customer(ctx: Context, customer_id: int) -> Dict[str, Any]:
    """
    get_customer(customer_id)
    """
    with _open_db() as con:
        cur = con.cursor()
        cur.execute("SELECT * FROM customers WHERE id = ?", (customer_id,))
        row = cur.fetchone()

        if not row:
            return {"found": False, "customer": None}

        return {"found": True, "customer": dict(row)}

@mcp_hub.tool()
async def create_ticket(ctx: Context, customer_id: int, issue: str, priority: str = "medium"):
    """
    create_ticket(customer_id, issue, priority)
    """

    if priority not in ("low", "medium", "high"):
        return {"created": False, "reason": "Invalid priority."}

    with _open_db() as con:
        cur = con.cursor()

        # Ensure customer exists
        cur.execute("SELECT id FROM customers WHERE id = ?", (customer_id,))
        if not cur.fetchone():
            return {"created": False, "reason": "Customer not found."}

        # Create ticket
        cur.execute(
            """
            INSERT INTO tickets (customer_id, issue, status, priority)
            VALUES (?, ?, 'open', ?)
            """,
            (customer_id, issue, priority)
        )
        con.commit()

        ticket_id = cur.lastrowid
        cur.execute("SELECT * FROM tickets WHERE id = ?", (ticket_id,))
        t = cur.fetchone()

    return {"created": True, "ticket": dict(t)}

@mcp_hub.tool()
async def update_customer(ctx: Context, customer_id: int, data: CustomerPatch):
    """
    update_customer(customer_id, data)
    """

    fields = []
    vals = []

    for col, val in data.dict(exclude_none=True).items():
        fields.append(f"{col} = ?")
        vals.append(val)

    # No updates requested
    if not fields:
        return {"updated": False, "reason": "No valid fields provided."}

    vals.append(customer_id)

    with _open_db() as con:
        cur = con.cursor()
        cur.execute(f"UPDATE customers SET {', '.join(fields)} WHERE id = ?", tuple(vals))
        con.commit()

        if cur.rowcount == 0:
            return {"updated": False, "reason": "Customer not found."}

        cur.execute("SELECT * FROM customers WHERE id = ?", (customer_id,))
        updated = cur.fetchone()

    return {"updated": True, "customer": dict(updated)}

@mcp_hub.tool()
async def list_customers(ctx: Context, status: str = "active", limit: int = 10):
    """
    list_customers(status, limit)
    """

    query = """
        SELECT * FROM customers
        WHERE status = ?
        ORDER BY created_at DESC
        LIMIT ?
    """

    with _open_db() as con:
        cur = con.cursor()
        cur.execute(query, (status, limit))
        rows = cur.fetchall()

    return {
        "query_status": status,
        "count": len(rows),
        "customers": [dict(r) for r in rows],
    }

@mcp_hub.tool()
async def get_customer_history(ctx: Context, customer_id: int):
    """
    get_customer_history(customer_id)
    """

    with _open_db() as con:
        cur = con.cursor()

        cur.execute("SELECT * FROM customers WHERE id = ?", (customer_id,))
        c = cur.fetchone()

        if not c:
            return {"found": False, "reason": "Customer not found."}

        cur.execute(
            """
            SELECT * FROM tickets
            WHERE customer_id = ?
            ORDER BY created_at DESC
            """,
            (customer_id,)
        )
        tix = cur.fetchall()

    return {
        "found": True,
        "customer": dict(c),
        "ticket_count": len(tix),
        "tickets": [dict(x) for x in tix]
    }

def launch_mcp_stdio():
    """
    Entry point for starting the MCP server via STDIO.
    """
    mcp_hub.run()
    print("MCP server (STDIO) is active.")

print("MCP server 'customer-support-mcp' initialized with all 5 required tools.")





MCP server 'customer-support-mcp' initialized with all 5 required tools.


In Part 2, I implemented a fully functional MCP server that exposes all required database operations through well-defined tools, enabling the agents to interact with customer data in a clean, protocol-driven manner. The MCP layer serves as the single source of truth for customer records and ticket information, ensuring consistent access and preventing direct database manipulation inside the agents. The five tools—get_customer, list_customers, update_customer, create_ticket, and get_customer_history—provide complete coverage of the assignment’s data operations and are implemented with proper validation, error handling, and structured return formats. By separating database access into this independent MCP server, the system becomes significantly more modular, easier to debug, and more realistic in terms of how agent-based systems integrate external data sources. This MCP foundation allows the RecordsUnit and AssistUnit to perform safe, reliable customer lookups and ticket actions, enabling them to coordinate effectively in later multi-agent scenarios.

### A2A HTTP Server

In [29]:
from fastapi import FastAPI
from pydantic import BaseModel
from a2a.types import AgentCard, AgentSkill, AgentCapabilities, TransportProtocol

In [30]:
# Unified FastAPI Application for All Three Agents
service_mesh = FastAPI(title="Unified A2A Agent Layer")

In [31]:
# AgentCard Metadata
# Coordinator (Router)
COORD_ENDPOINT = "http://localhost:19010"
CoordProfile = AgentCard(
    name="CoordinationUnit",
    url=COORD_ENDPOINT,
    description="Primary decision-maker that analyzes requests and forwards them to the correct specialist.",
    version="2.0",
    capabilities=AgentCapabilities(streaming=False),
    default_input_modes=["text/plain"],
    default_output_modes=["text/plain"],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id="intent_sorter",
            name="Intent Sorter",
            description="Determines whether a query belongs to the RecordsUnit or the SupportUnit.",
            tags=["dispatcher", "intent", "classification"],
            examples=[
                "Retrieve customer 5 details",
                "Open a ticket for billing issue"
            ]
        )
    ],
)

# Customer Records Agent
RECORD_ENDPOINT = "http://localhost:19011"
RecordProfile = AgentCard(
    name="RecordsUnit",
    url=RECORD_ENDPOINT,
    description="Specialist agent responsible for fetching customer data & ticket history using MCP.",
    version="2.0",
    capabilities=AgentCapabilities(streaming=False),
    default_input_modes=["text/plain"],
    default_output_modes=["text/plain"],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id="lookup_skill",
            name="Lookup Skill",
            description="Retrieves profile or historical ticket data for a specified customer.",
            tags=["history", "records", "database"],
            examples=[
                "Show profile for customer 2",
                "Get ticket history for customer 1"
            ]
        )
    ],
)

# Support Agent
SUPPORT_ENDPOINT = "http://localhost:19012"
SupportProfile = AgentCard(
    name="SupportUnit",
    url=SUPPORT_ENDPOINT,
    description="Creates support tickets and negotiates missing ticket attributes.",
    version="2.0",
    capabilities=AgentCapabilities(streaming=False),
    default_input_modes=["text/plain"],
    default_output_modes=["text/plain"],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id="ticket_maker",
            name="Ticket Maker",
            description="Opens tickets with MCP and clarifies missing fields when needed.",
            tags=["support", "tickets", "negotiation"],
            examples=[
                "File high priority ticket for customer 3 about billing",
                "Open a medium priority network issue ticket for customer 1"
            ]
        )
    ],
)

print("A2A Agent Profiles generated (CoordinationUnit, RecordsUnit, SupportUnit).")

A2A Agent Profiles generated (CoordinationUnit, RecordsUnit, SupportUnit).


In [32]:
# A2A Request/Response Models
class MeshRequest(BaseModel):
    text: str

class CoordReply(BaseModel):
    chosen_path: str
    dialogue_log: list

class RecordReply(BaseModel):
    tool_used: str
    output: dict
    message_flow: list

class SupportReply(BaseModel):
    action_stage: str
    result_blob: dict
    unresolved: list
    conversation: list


In [33]:
# 3) Expose .well-known agent cards

@service_mesh.get("/coord/.well-known/agent-card.json")
def read_coord_profile():
    return CoordProfile.model_dump(by_alias=True)

@service_mesh.get("/records/.well-known/agent-card.json")
def read_records_profile():
    return RecordProfile.model_dump(by_alias=True)

@service_mesh.get("/support/.well-known/agent-card.json")
def read_support_profile():
    return SupportProfile.model_dump(by_alias=True)


In [34]:
# tasks endpoints for fully independent A2A agents

@service_mesh.post("/coord/tasks", response_model=CoordReply)
async def run_coord(req: MeshRequest):
    new_state = await CoordinationAgent.ainvoke({
        "dialogue_log": [{"role": "user", "content": req.text}],
        "chosen_path": None
    })
    return CoordReply(
        chosen_path=new_state["chosen_path"],
        dialogue_log=new_state["dialogue_log"]
    )


@service_mesh.post("/records/tasks", response_model=RecordReply)
async def run_records(req: MeshRequest):
    new_state = await RecordsAgent.ainvoke({
        "message_flow": [{"role": "user", "content": req.text}],
        "tool_used": None,
        "output": None
    })
    return RecordReply(
        tool_used=new_state.get("tool_used") or "",
        output=new_state.get("output") or {},
        message_flow=new_state.get("message_flow") or []
    )


@service_mesh.post("/support/tasks", response_model=SupportReply)
async def run_support(req: MeshRequest):
    new_state = await SupportAgent.ainvoke({
        "conversation": [{"role": "user", "content": req.text}],
        "action_stage": None,
        "result_blob": None,
        "unresolved": None
    })
    return SupportReply(
        action_stage=new_state.get("action_stage") or "",
        result_blob=new_state.get("result_blob") or {},
        unresolved=new_state.get("unresolved") or [],
        conversation=new_state.get("conversation") or []
    )


print("A2A endpoints registered for CoordinationUnit, RecordsUnit, SupportUnit.")

A2A endpoints registered for CoordinationUnit, RecordsUnit, SupportUnit.


In [35]:
# Server Runner (Unified A2A App)
import uvicorn

async def boot_a2a_mesh(host="0.0.0.0", port=19000):
    """
    Launch the unified A2A interface server (TA-compliant).
    """
    config = uvicorn.Config(service_mesh, host=host, port=port)
    server = uvicorn.Server(config)
    await server.serve()

print("Call boot_a2a_mesh() to activate the unified A2A agent server.")


Call boot_a2a_mesh() to activate the unified A2A agent server.


The A2A HTTP server provides a clean, discoverable, and protocol-aligned interface for all three agents by exposing their AgentCards and task endpoints over FastAPI. Each agent—CoordinatorUnit, RecordsUnit, and AssistUnit—publishes a standardized /.well-known/agent-card.json descriptor, allowing other agents (or external tools) to automatically discover its capabilities, skills, and JSON-RPC communication details. The server also implements /tasks endpoints for each agent, enabling remote invocation of agent logic using a consistent message structure that mirrors the A2A pattern from the lab notebook. This setup not only satisfies the assignment’s requirement to expose agents as independent A2A-compatible services, but also demonstrates how multi-agent systems can interact through lightweight HTTP interfaces that support coordination, routing, and negotiation. The use of FastAPI, Uvicorn, and a unified TestClient validation ensures reliability, easy testing, and compatibility with larger orchestration frameworks.

### HTTP Orchestra

In [36]:
import httpx
import asyncio
from typing import Dict, Any

In [37]:
# HTTP endpoints
COORD_HTTP = "http://localhost:19000/coord/tasks"
RECORDS_HTTP = "http://localhost:19000/records/tasks"
SUPPORT_HTTP = "http://localhost:19000/support/tasks"

In [38]:
# Helper: async HTTP post for all A2A exchanges
async def _post_json(url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Lightweight JSON POST wrapper used by all agents to talk to each other.
    Ensures consistent network access and error handling.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            r = await client.post(url, json=payload)
            r.raise_for_status()
            return r.json()
        except Exception as e:
            return {
                "error": True,
                "reason": str(e),
                "url": url,
                "payload_sent": payload
            }

async def _handoff_to_records(query_text: str) -> Dict[str, Any]:
    req = {"text": query_text}
    print("\n[A2A LOG] CoordinatorUnit → RecordsUnit")

    reply = await _post_json(RECORDS_HTTP, req)

    if "error" in reply:
        print("[RecordsUnit ERROR]", reply["reason"])
    else:
        print("[RecordsUnit] MCP tool used:", reply.get("tool_used"))

    return reply

async def _handoff_to_support(query_text: str) -> Dict[str, Any]:
    req = {"text": query_text}
    print("\n[A2A LOG] CoordinatorUnit → SupportUnit")

    reply = await _post_json(SUPPORT_HTTP, req)

    if "error" in reply:
        print("[SupportUnit ERROR]", reply["reason"])
    else:
        print("[SupportUnit] Step reached:", reply.get("action_stage"))

    return reply


In [39]:
def mesh_dispatch(user_message: str) -> Dict[str, Any]:
    """
    Main entrypoint for A2A workflow.

    Step 1: Send query to CoordinationUnit (router)
    Step 2: Inspect chosen_path (records_unit / support_unit / other)
    Step 3: Forward to appropriate specialist agent via HTTP
    Step 4: Collect and return results
    """

    print(f"\n========== USER MESSAGE ==========\n{user_message}")
    print("==================================")

    async def _inner() -> Dict[str, Any]:

        router_payload = {"text": user_message}
        router_reply = await _post_json(COORD_HTTP, router_payload)

        # sanity
        chosen = router_reply.get("chosen_path")
        print(f"\n[CoordinationUnit] → Selected branch: {chosen}")

        if chosen == "records_unit":
            rec_reply = await _handoff_to_records(user_message)
            return {
                "route": chosen,
                "router": router_reply,
                "records_unit": rec_reply
            }

        elif chosen == "support_unit":
            sup_reply = await _handoff_to_support(user_message)
            return {
                "route": chosen,
                "router": router_reply,
                "support_unit": sup_reply
            }

        else:
            print("[CoordinationUnit] No valid route identified.")
            return {
                "route": "unresolved",
                "router": router_reply
            }

    # execute inner async logic
    return asyncio.run(_inner())


### Scenario Test

In [40]:
import sqlite3
import re

In [41]:
# Scenario 1 — Basic Customer Lookup (Simple Query)

def demo_simple_lookup():
    """
    Scenario 1:
    User asks for basic customer info.
    Should route → RecordsUnit.
    """
    query = "Please show me the details for customer 5."

    print("\n================= SCENARIO 1: SIMPLE LOOKUP =================")
    result = mesh_dispatch(query)

    router_info = result.get("router", {})
    record_data = result.get("records_unit", {})

    print("\n--- SUMMARY ---")
    print("Routing Path:", result.get("route"))

    if "output" in record_data:
        cust = record_data["output"].get("customer")
        if cust:
            print(f"Customer #{cust['id']}: {cust['name']} ({cust['email']})")
            print("Status:", cust.get("status"))
        else:
            print("No customer found.")
    else:
        print("RecordUnit did not return expected format.")

    return result

In [42]:
# Scenario 2 — Coordinated Query (Context + Support)

def demo_coordinated_query():
    """
    Scenario 2:
    User identifies themselves, needs upgrade help.
    Flow:
    CoordinationUnit → RecordsUnit (context) → SupportUnit (ticket)
    """
    query = "I am customer 1 and need assistance upgrading my plan."

    print("\n================= SCENARIO 2: COORDINATED FLOW =================")
    print("User Input:", query)

    # Step 1: Mesh-level dispatch
    first_pass = mesh_dispatch(query)

    # Step 2: load customer context explicitly
    print("\n--- Loading customer profile from RecordsUnit ---")
    rec_context = mesh_dispatch("Fetch profile for customer 1")

    # Step 3: open ticket
    print("\n--- Trigger SupportUnit for upgrade issue ---")
    ticket_msg = "Open medium priority ticket for customer 1 about upgrading account tier."
    support_data = mesh_dispatch(ticket_msg)

    print("\n--- SUMMARY ---")
    if "support_unit" in support_data:
        tblob = support_data["support_unit"].get("result_blob") or {}
        if tblob.get("created"):
            print("Ticket successfully opened.")
            print("Ticket ID:", tblob["ticket"].get("id"))
        else:
            print("Ticket creation failed:", tblob.get("reason"))

    return {
        "routing": first_pass,
        "records_context": rec_context,
        "support_ticket": support_data
    }

In [43]:
# Scenario 3 — Complex Query (Report of Active Customers + Open Tickets)

def demo_complex_overview():
    """
    Scenario 3:
    Ask for all active customers with currently open tickets.
    Uses DB directly for aggregation (as per assignment).
    """
    query = "List all active customers who still have open tickets."

    print("\n================= SCENARIO 3: COMPLEX REPORT =================")
    print("User Input:", query)

    # Router for fun (not mandatory for this scenario)
    _ = mesh_dispatch(query)

    # Manual SQL aggregation
    with sqlite3.connect("support.db") as conn:
        conn.row_factory = sqlite3.Row
        cur = conn.cursor()

        cur.execute("""
            SELECT DISTINCT c.id, c.name, c.email, c.phone
            FROM customers c
            JOIN tickets t ON c.id = t.customer_id
            WHERE c.status = 'active' AND t.status = 'open'
            ORDER BY c.id
        """)

        active_customers = [dict(r) for r in cur.fetchall()]

    print("\n--- Active Customers with Open Tickets ---")
    for c in active_customers:
        print(f"• {c['id']}: {c['name']} ({c['email']})")

    # Fetch open tickets
    open_tix = []
    with sqlite3.connect("support.db") as conn:
        conn.row_factory = sqlite3.Row
        cur = conn.cursor()

        for c in active_customers:
            cur.execute("""
                SELECT id, customer_id, issue, status, priority, created_at
                FROM tickets
                WHERE customer_id = ? AND status = 'open'
                ORDER BY created_at DESC
            """, (c["id"],))
            open_tix.extend([dict(r) for r in cur.fetchall()])

    print("\n--- Open Tickets ---")
    for t in open_tix:
        print(f"Ticket #{t['id']} for Customer {t['customer_id']} ({t['priority']}) — {t['issue']}")

    return {
        "customers": active_customers,
        "tickets": open_tix
    }



In [44]:
# Scenario 4 — Escalation (Missing Fields → Negotiation → Ticket)

def demo_escalation_case():
    """
    Scenario 4:
    User expresses frustration, missing fields → SupportUnit negotiation → completed ticket.
    """
    first_msg = ("I'm really frustrated — I was charged twice and this needs an IMMEDIATE refund!")
    print("\n================= SCENARIO 4: ESCALATION =================")
    print("User Input:", first_msg)

    # First attempt (missing fields)
    first_round = mesh_dispatch(first_msg)
    print("\nInitial Response from SupportUnit:")
    print(first_round)

    # Fill missing fields
    second_msg = ("This is for customer 1, priority high. "
                  "Issue is duplicate charge on latest invoice.")
    print("\n--- Follow-up Provided ---")
    second_round = mesh_dispatch(second_msg)

    print("\n--- SUMMARY ---")
    tdata = second_round.get("support_unit", {}).get("result_blob", {})
    if tdata.get("created"):
        print("Escalation Ticket Created:")
        print("Ticket ID:", tdata["ticket"]["id"])
        print("Priority:", tdata["ticket"]["priority"])
    else:
        print("Failed to create escalation ticket.")

    return {
        "first": first_round,
        "second": second_round
    }

In [49]:
# Scenario 5 — Multi-Intent (Update Email + Fetch History)

def demo_multi_intent():
    """
    Scenario 5:
    Update email + show history.
    Demonstrates composite tasks.
    """
    query = "Please update my email to alice.new@example.com and show my ticket history for customer 1."

    print("\n================= SCENARIO 5: MULTI-INTENT =================")
    print("User Input:", query)

    # Router for classification
    _ = mesh_dispatch(query)

    # Extract email
    match = re.search(r"[\w\.-]+@[\w\.-]+", query)
    new_email = match.group(0) if match else None

    # Update email using SQL (per assignment instructions)
    if new_email:
        with sqlite3.connect("support.db") as conn:
            conn.execute("UPDATE customers SET email = ? WHERE id = 1", (new_email,))
            conn.commit()
        print("\n--- Email Updated in DB ---")
        print("New Email:", new_email)

    # Fetch history
    print("\n--- Fetching Ticket History ---")
    hist = mesh_dispatch("Retrieve history for customer 1")

    blob = hist.get("records_unit", {}).get("output", {})
    num_tickets = blob.get("ticket_count")

    print("\n--- SUMMARY ---")
    print("Updated Email:", new_email)
    print("Tickets Found:", num_tickets)

    return {
        "update": new_email,
        "history": hist
    }

In [50]:
def run_all_scenarios():
    print("\n########## RUNNING ALL ASSIGNMENT SCENARIOS ##########\n")

    demo_simple_lookup()
    print("\n------------------------------------------------------\n")

    demo_coordinated_query()
    print("\n------------------------------------------------------\n")

    demo_complex_overview()
    print("\n------------------------------------------------------\n")

    demo_escalation_case()
    print("\n------------------------------------------------------\n")

    demo_multi_intent()

    print("\n########## ALL SCENARIOS COMPLETE ##########\n")

In [51]:
run_all_scenarios()


########## RUNNING ALL ASSIGNMENT SCENARIOS ##########



Please show me the details for customer 5.

[CoordinationUnit] → Selected branch: None
[CoordinationUnit] No valid route identified.

--- SUMMARY ---
Routing Path: unresolved
RecordUnit did not return expected format.

------------------------------------------------------


User Input: I am customer 1 and need assistance upgrading my plan.

I am customer 1 and need assistance upgrading my plan.

[CoordinationUnit] → Selected branch: None
[CoordinationUnit] No valid route identified.

--- Loading customer profile from RecordsUnit ---

Fetch profile for customer 1

[CoordinationUnit] → Selected branch: None
[CoordinationUnit] No valid route identified.

--- Trigger SupportUnit for upgrade issue ---

Open medium priority ticket for customer 1 about upgrading account tier.

[CoordinationUnit] → Selected branch: None
[CoordinationUnit] No valid route identified.

--- SUMMARY ---

----------------------------------------------------

### Conclusion

This assignment gave me a much deeper understanding of how multi-agent systems can be designed, coordinated, and deployed in a realistic customer-service workflow. Implementing the three agents—CoordinatorUnit, RecordsUnit, and AssistUnit—helped me appreciate how important it is to separate routing logic, data retrieval, and support actions into specialized components. I learned how Agent-to-Agent (A2A) communication works in practice, including how agents exchange structured messages, negotiate missing information, and coordinate multi-step tasks. Building the Model Context Protocol (MCP) server strengthened my understanding of protocol-driven data access: instead of manually querying the database inside each agent, I learned to expose well-defined tools (get_customer, update_customer, create_ticket, etc.) and treat the MCP server as a stable data service layer. This design pattern made the system more modular, debuggable, and closer to real-world architectures.

One of the biggest challenges was managing asynchronous execution and coordinating multiple agents without causing loops or inconsistent state. Ensuring that each agent had a clear state structure and predictable transitions required careful planning. Another challenge was implementing the A2A HTTP server so that each agent could be discovered and invoked through .well-known/agent-card.json and /tasks endpoints; this forced me to understand how JSON-RPC, FastAPI, Uvicorn, and LangGraph interact. Debugging was also non-trivial, especially when bridging notebook execution, nested event loops, and async/await patterns. Overall, this assignment strengthened my ability to design multi-agent architectures, integrate external tools via protocols, and think more systematically about agent coordination, modularity, and orchestration.