In [2]:
import os
import re
import json
import logging
from typing import TypedDict, Optional, List, Dict, Any
from enum import Enum

import requests
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
os.environ["OPENAI_API_KEY"] = ""  # Set your API key here
MCP_SERVER_URL = "http://127.0.0.1:5000/mcp"
MAX_HOPS = 10  # Prevent infinite loops

In [4]:

def detect_mcp_url() -> str:
    """
    Detects MCP server URL dynamically:
    - Tries ngrok public URL first
    - Falls back to local server
    """
    # 1. Check NGROK_API running
    try:
        r = requests.get("http://127.0.0.1:4040/api/tunnels", timeout=1)
        tunnels = r.json().get("tunnels", [])
        for t in tunnels:
            if "https" in t.get("public_url", ""):
                return t["public_url"] + "/mcp"
    except:
        pass

    # 2. Local fallback
    return "http://127.0.0.1:5000/mcp"

MCP_SERVER_URL = detect_mcp_url()
print("üîó Using MCP Server:", MCP_SERVER_URL)

üîó Using MCP Server: http://127.0.0.1:5000/mcp


In [5]:
# =====================================================
# Enums and Constants
# =====================================================

class ScenarioType(str, Enum):
    """Enumeration of supported scenario types."""
    ACCOUNT_HELP = "account_help"
    CANCEL_BILLING = "cancel_billing"
    PREMIUM_TICKET_STATUS = "premium_ticket_status"
    GENERIC_SUPPORT = "generic_support"


class AgentName(str, Enum):
    """Enumeration of agent names."""
    ROUTER = "Router"
    CUSTOMER_DATA = "CustomerDataAgent"
    SUPPORT = "SupportAgent"


In [6]:
# =====================================================
# State Definition
# =====================================================

class SupportState(TypedDict, total=False):
    """Shared state passed between agents."""
    # Input
    input: str
    
    # Classification
    scenario_type: Optional[ScenarioType]
    customer_id: Optional[str]
    
    # Data retrieved by CustomerDataAgent
    customer_data: Optional[Dict[str, Any]]
    customer_tier: Optional[str]
    premium_ids: Optional[List[str]]
    billing_info: Optional[str]
    ticket_report: Optional[str]
    
    # Support agent state
    support_response: Optional[str]
    router_notes: Optional[str]
    
    # Flow control flags
    support_seen: Optional[bool]
    needs_billing_context: Optional[bool]
    billing_done: Optional[bool]
    
    # Tracing and safety
    a2a_trace: Optional[str]
    hop_count: int
    error: Optional[str]

In [7]:
# =====================================================
# MCP Client
# =====================================================

class MCPClient:
    """Client for communicating with MCP server via HTTP + SSE."""
    
    def __init__(self, server_url: str, timeout: int = 10):
        self.server_url = server_url
        self.timeout = timeout
    
    def send(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send a JSON-RPC message to the MCP server.
        
        Args:
            message: JSON-RPC message dict
            
        Returns:
            Parsed JSON-RPC response
            
        Raises:
            RuntimeError: If communication fails or no data received
        """
        try:
            resp = requests.post(
                self.server_url,
                json=message,
                stream=True,
                timeout=self.timeout,
            )
            resp.raise_for_status()
            
            # Parse SSE response
            for raw_line in resp.iter_lines():
                if not raw_line:
                    continue
                if raw_line.startswith(b"data: "):
                    payload = raw_line[len(b"data: "):]
                    return json.loads(payload.decode("utf-8"))
            
            raise RuntimeError("No SSE data received from MCP server")
            
        except requests.RequestException as e:
            logger.error(f"MCP request failed: {e}")
            raise RuntimeError(f"MCP communication error: {e}")
    
    def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
        """
        Call an MCP tool.
        
        Args:
            name: Tool name
            arguments: Tool arguments matching inputSchema
            
        Returns:
            Parsed tool result
            
        Raises:
            RuntimeError: If tool call fails
        """
        message = {
            "jsonrpc": "2.0",
            "id": f"call-{name}",
            "method": "tools/call",
            "params": {
                "name": name,
                "arguments": arguments,
            },
        }
        
        data = self.send(message)
        
        if "error" in data:
            error_msg = data["error"]
            logger.error(f"MCP tool error: {error_msg}")
            raise RuntimeError(f"MCP error: {error_msg}")
        
        result = data.get("result", {})
        content = result.get("content", [])
        
        # Extract text content and parse JSON if possible
        if isinstance(content, list) and content:
            first = content[0]
            if isinstance(first, dict) and first.get("type") == "text":
                text = first.get("text", "")
                try:
                    return json.loads(text)
                except json.JSONDecodeError:
                    return text
        
        return result


# Initialize MCP client
mcp_client = MCPClient(MCP_SERVER_URL)

In [8]:
# =====================================================
# MCP Service Layer
# =====================================================

class MCPService:
    """Service layer for MCP operations with error handling."""
    
    @staticmethod
    def get_customer(customer_id: str) -> Dict[str, Any]:
        """Retrieve customer information."""
        try:
            cid = int(customer_id)
        except (TypeError, ValueError):
            return {"success": False, "error": "Invalid customer ID"}
        
        try:
            return mcp_client.call_tool("get_customer", {"customer_id": cid})
        except Exception as e:
            logger.error(f"Failed to get customer {cid}: {e}")
            return {"success": False, "error": str(e)}
    
    @staticmethod
    def get_billing_info(customer_id: str) -> str:
        """Build billing summary from customer history."""
        try:
            cid = int(customer_id)
        except (TypeError, ValueError):
            return "Invalid customer ID for billing lookup."
        
        try:
            result = mcp_client.call_tool("get_customer_history", {"customer_id": cid})
            
            if not result.get("success"):
                return "Unable to retrieve billing history."
            
            tickets = result.get("tickets", [])
            billing_keywords = ["bill", "invoice", "payment", "charge"]
            
            billing_tickets = [
                t for t in tickets
                if any(kw in t.get("issue", "").lower() for kw in billing_keywords)
            ]
            
            if not billing_tickets:
                return "No billing-related issues found for this customer."
            
            lines = [
                f"- [{t.get('created_at')}] "
                f"({t.get('priority', '').upper()}/{t.get('status')}) "
                f"{t.get('issue')}"
                for t in billing_tickets
            ]
            
            return "Recent billing-related tickets:\n" + "\n".join(lines)
            
        except Exception as e:
            logger.error(f"Failed to get billing info for {cid}: {e}")
            return f"Error retrieving billing information: {e}"
    
    @staticmethod
    def get_premium_customers() -> List[str]:
        """Identify premium customers (those with high-priority tickets)."""
        try:
            result = mcp_client.call_tool("list_customers", {})
            
            if not result.get("success"):
                logger.warning("Failed to list customers")
                return []
            
            customers = result.get("customers", [])
            premium_ids = []
            
            for customer in customers:
                cid = customer.get("id")
                if cid is None:
                    continue
                
                try:
                    hist = mcp_client.call_tool("get_customer_history", {"customer_id": cid})
                    
                    if hist.get("success"):
                        tickets = hist.get("tickets", [])
                        if any(t.get("priority") == "high" for t in tickets):
                            premium_ids.append(str(cid))
                            
                except Exception as e:
                    logger.warning(f"Failed to check premium status for customer {cid}: {e}")
                    continue
            
            return premium_ids
            
        except Exception as e:
            logger.error(f"Failed to get premium customers: {e}")
            return []
    
    @staticmethod
    def get_high_priority_tickets(customer_ids: List[str]) -> str:
        """Aggregate high-priority tickets for given customers."""
        lines = []
        
        for cid_str in customer_ids:
            try:
                cid = int(cid_str)
            except (TypeError, ValueError):
                continue
            
            try:
                hist = mcp_client.call_tool("get_customer_history", {"customer_id": cid})
                
                if not hist.get("success"):
                    continue
                
                tickets = hist.get("tickets", [])
                high_priority = [t for t in tickets if t.get("priority") == "high"]
                
                for ticket in high_priority:
                    lines.append(
                        f"Customer #{cid} ‚Üí "
                        f"Ticket #{ticket.get('id')} "
                        f"[{ticket.get('status')}]: {ticket.get('issue')} "
                        f"({ticket.get('created_at')})"
                    )
                    
            except Exception as e:
                logger.warning(f"Failed to get tickets for customer {cid}: {e}")
                continue
        
        if not lines:
            return "No high-priority tickets found for the given premium customers."
        
        return "High-priority tickets:\n" + "\n".join(lines)

In [9]:
# =====================================================
# Utility Functions
# =====================================================

def classify_scenario(text: str) -> ScenarioType:
    """Classify user query into scenario type."""
    query_lower = text.lower()
    
    if all(kw in query_lower for kw in ["high-priority", "premium", "ticket"]):
        return ScenarioType.PREMIUM_TICKET_STATUS
    
    if "cancel" in query_lower and any(kw in query_lower for kw in ["billing", "charge", "payment"]):
        return ScenarioType.CANCEL_BILLING
    
    if any(kw in query_lower for kw in ["customer id", "account", "my account"]):
        return ScenarioType.ACCOUNT_HELP
    
    return ScenarioType.GENERIC_SUPPORT


def extract_customer_id(text: str) -> Optional[str]:
    """Extract customer ID from text (first 3+ digit number)."""
    match = re.search(r"\b(\d{3,})\b", text)
    return match.group(1) if match else None


def add_trace(state: SupportState, message: str) -> str:
    """Add a trace message to the state."""
    current_trace = state.get("a2a_trace", "")
    return current_trace + message + "\n"

In [10]:
# =====================================================
# Agent Implementations
# =====================================================

def router_agent(state: SupportState) -> SupportState:
    """
    Router Agent - Orchestrates the workflow.
    
    Responsibilities:
    - Classify incoming queries
    - Track hop count to prevent infinite loops
    - Coordinate between specialist agents
    """
    hop_count = state.get("hop_count", 0) + 1
    trace = add_trace(state, f"Router: hop_count={hop_count}")
    
    # Safety check: prevent infinite loops
    if hop_count > MAX_HOPS:
        logger.error(f"Maximum hop count ({MAX_HOPS}) exceeded")
        return {
            "hop_count": hop_count,
            "a2a_trace": trace + "Router: MAX_HOPS exceeded, terminating.\n",
            "error": "Maximum routing iterations exceeded",
            "support_response": "I apologize, but I'm having trouble processing your request. Please try rephrasing or contact support directly."
        }
    
    updated: SupportState = {
        "hop_count": hop_count,
        "a2a_trace": trace,
    }
    
    # Initial classification
    if not state.get("scenario_type"):
        scenario = classify_scenario(state["input"])
        customer_id = state.get("customer_id") or extract_customer_id(state["input"])
        
        trace = add_trace(
            {"a2a_trace": trace},
            f"Router: classified scenario={scenario.value}, customer_id={customer_id}"
        )
        
        updated.update({
            "scenario_type": scenario,
            "customer_id": customer_id,
            "router_notes": f"Scenario detected: {scenario.value}",
            "a2a_trace": trace,
        })
    else:
        # Revisiting - just add trace
        trace = add_trace(
            {"a2a_trace": trace},
            "Router: revisiting to decide next hop"
        )
        updated["a2a_trace"] = trace
    
    return updated


def customer_data_agent(state: SupportState) -> SupportState:
    """
    Customer Data Agent - Handles data retrieval via MCP.
    
    Responsibilities:
    - Fetch customer information
    - Retrieve billing history
    - Identify premium customers
    """
    scenario = state.get("scenario_type")
    customer_id = state.get("customer_id") or extract_customer_id(state["input"])
    trace = state.get("a2a_trace", "")
    
    try:
        if scenario == ScenarioType.ACCOUNT_HELP:
            # Scenario 1: Retrieve customer account details
            customer = MCPService.get_customer(customer_id) if customer_id else {}
            
            if customer.get("success"):
                customer_data = customer.get("customer", {})
                tier = customer_data.get("tier", "standard")
                trace = add_trace(
                    {"a2a_trace": trace},
                    f"CustomerDataAgent: fetched customer={customer_id}, tier={tier}"
                )
                
                return {
                    "customer_id": customer_id,
                    "customer_data": customer_data,
                    "customer_tier": tier,
                    "a2a_trace": trace,
                }
            else:
                trace = add_trace(
                    {"a2a_trace": trace},
                    f"CustomerDataAgent: failed to fetch customer - {customer.get('error')}"
                )
                return {"a2a_trace": trace, "error": customer.get("error")}
        
        elif scenario == ScenarioType.CANCEL_BILLING:
            # Scenario 2: Retrieve billing information
            billing_info = MCPService.get_billing_info(customer_id or "unknown")
            trace = add_trace(
                {"a2a_trace": trace},
                f"CustomerDataAgent: retrieved billing info for {customer_id}"
            )
            
            return {
                "customer_id": customer_id,
                "billing_info": billing_info,
                "billing_done": True,
                "a2a_trace": trace,
            }
        
        elif scenario == ScenarioType.PREMIUM_TICKET_STATUS:
            # Scenario 3: Identify premium customers
            premium_ids = MCPService.get_premium_customers()
            trace = add_trace(
                {"a2a_trace": trace},
                f"CustomerDataAgent: found {len(premium_ids)} premium customers"
            )
            
            return {
                "premium_ids": premium_ids,
                "a2a_trace": trace,
            }
        
        # No action needed for this scenario
        trace = add_trace(
            {"a2a_trace": trace},
            "CustomerDataAgent: no action required for this scenario"
        )
        return {"a2a_trace": trace}
        
    except Exception as e:
        logger.error(f"CustomerDataAgent error: {e}")
        trace = add_trace(
            {"a2a_trace": trace},
            f"CustomerDataAgent: error occurred - {e}"
        )
        return {"a2a_trace": trace, "error": str(e)}


def support_agent(state: SupportState) -> SupportState:
    """
    Support Agent - Generates customer-facing responses.
    
    Responsibilities:
    - Craft appropriate responses based on scenario
    - Coordinate with CustomerDataAgent when needed
    - Handle escalations and edge cases
    """
    scenario = state.get("scenario_type")
    trace = state.get("a2a_trace", "")
    
    try:
        # Scenario 1: Account Help
        if scenario == ScenarioType.ACCOUNT_HELP:
            customer = state.get("customer_data", {})
            tier = state.get("customer_tier", "standard")
            name = customer.get("name", "valued customer")
            
            if tier == "premium":
                response = (
                    f"Hi {name}, I can help with your account.\n\n"
                    f"I see you're a premium customer with priority support access.\n"
                    f"You have 24/7 support availability and faster response times.\n\n"
                    "Please let me know the specific issue with your account "
                    "(login, billing, settings, etc.)."
                )
            else:
                response = (
                    f"Hi {name}, I'm here to help with your account.\n\n"
                    "Please tell me more about the issue you're experiencing, "
                    "and I'll do my best to resolve it for you."
                )
            
            trace = add_trace(
                {"a2a_trace": trace},
                f"SupportAgent: generated account help response for tier={tier}"
            )
            
            return {
                "support_response": response,
                "support_seen": True,
                "a2a_trace": trace,
            }
        
        # Scenario 2: Cancellation with Billing Issues
        elif scenario == ScenarioType.CANCEL_BILLING:
            billing_info = state.get("billing_info")
            support_seen = state.get("support_seen", False)
            
            # First time: request billing context
            if not support_seen and billing_info is None:
                trace = add_trace(
                    {"a2a_trace": trace},
                    "SupportAgent: requesting billing context from CustomerDataAgent"
                )
                return {
                    "support_seen": True,
                    "needs_billing_context": True,
                    "a2a_trace": trace,
                }
            
            # Generate response with billing context
            if billing_info:
                response = (
                    "I understand you'd like to cancel your subscription and "
                    "you're experiencing billing issues.\n\n"
                    f"{billing_info}\n\n"
                    "Let me help you resolve the billing problem first. "
                    "Once that's cleared, I can:\n"
                    "1. Process your cancellation immediately, or\n"
                    "2. Help you switch to a lower-cost plan if you'd prefer\n\n"
                    "Which would you prefer?"
                )
                
                trace = add_trace(
                    {"a2a_trace": trace},
                    "SupportAgent: generated coordinated cancellation response"
                )
                
                return {
                    "support_response": response,
                    "needs_billing_context": False,
                    "a2a_trace": trace,
                }
            
            # Waiting for billing info
            trace = add_trace(
                {"a2a_trace": trace},
                "SupportAgent: waiting for billing_info"
            )
            return {"a2a_trace": trace}
        
        # Scenario 3: Premium Ticket Status
        elif scenario == ScenarioType.PREMIUM_TICKET_STATUS:
            premium_ids = state.get("premium_ids")
            ticket_report = state.get("ticket_report")
            
            # Generate ticket report if we have premium IDs
            if premium_ids and not ticket_report:
                ticket_report = MCPService.get_high_priority_tickets(premium_ids)
                trace = add_trace(
                    {"a2a_trace": trace},
                    f"SupportAgent: generated report for {len(premium_ids)} premium customers"
                )
                
                response = (
                    "Here's the status of all high-priority tickets for premium customers:\n\n"
                    f"{ticket_report}\n\n"
                    "Would you like me to:\n"
                    "- Generate a detailed CSV report\n"
                    "- Group tickets by customer or status\n"
                    "- Escalate any specific tickets"
                )
                
                return {
                    "ticket_report": ticket_report,
                    "support_response": response,
                    "a2a_trace": trace,
                }
            
            # Still waiting for premium IDs
            if not premium_ids:
                trace = add_trace(
                    {"a2a_trace": trace},
                    "SupportAgent: waiting for premium customer list"
                )
                return {"a2a_trace": trace}
        
        # Generic Support
        else:
            response = (
                "Thank you for contacting support. "
                "I'm here to help with any questions or issues you have.\n\n"
                "Could you please provide more details about what you need assistance with?"
            )
            
            trace = add_trace(
                {"a2a_trace": trace},
                "SupportAgent: generated generic support response"
            )
            
            return {
                "support_response": response,
                "a2a_trace": trace,
            }
    
    except Exception as e:
        logger.error(f"SupportAgent error: {e}")
        trace = add_trace(
            {"a2a_trace": trace},
            f"SupportAgent: error occurred - {e}"
        )
        
        return {
            "a2a_trace": trace,
            "error": str(e),
            "support_response": "I apologize, but I encountered an error. Please try again or contact support directly."
        }

In [11]:
# =====================================================
# Graph Construction
# =====================================================

def build_support_graph() -> StateGraph:
    """Construct the LangGraph workflow."""
    graph = StateGraph(SupportState)
    
    # Add nodes
    graph.add_node(AgentName.ROUTER.value, router_agent)
    graph.add_node(AgentName.CUSTOMER_DATA.value, customer_data_agent)
    graph.add_node(AgentName.SUPPORT.value, support_agent)
    
    # Start with router
    graph.add_edge(START, AgentName.ROUTER.value)
    
    # Agents return to router for next decision
    graph.add_edge(AgentName.CUSTOMER_DATA.value, AgentName.ROUTER.value)
    graph.add_edge(AgentName.SUPPORT.value, AgentName.ROUTER.value)
    
    # Router decision logic
    def route_decision(state: SupportState) -> str:
        """Determine next agent based on current state."""
        # Check for errors or max hops
        if state.get("error") or state.get("hop_count", 0) >= MAX_HOPS:
            return "END"
        
        scenario = state.get("scenario_type")
        
        # Scenario 1: Account Help
        if scenario == ScenarioType.ACCOUNT_HELP:
            if not state.get("customer_data"):
                return AgentName.CUSTOMER_DATA.value
            if not state.get("support_response"):
                return AgentName.SUPPORT.value
            return "END"
        
        # Scenario 2: Cancellation + Billing
        elif scenario == ScenarioType.CANCEL_BILLING:
            if not state.get("support_seen"):
                return AgentName.SUPPORT.value
            if state.get("needs_billing_context") and not state.get("billing_done"):
                return AgentName.CUSTOMER_DATA.value
            if state.get("billing_done") and not state.get("support_response"):
                return AgentName.SUPPORT.value
            return "END"
        
        # Scenario 3: Premium Ticket Status
        elif scenario == ScenarioType.PREMIUM_TICKET_STATUS:
            if not state.get("premium_ids"):
                return AgentName.CUSTOMER_DATA.value
            if not state.get("ticket_report"):
                return AgentName.SUPPORT.value
            return "END"
        
        # Generic: Direct to support
        if not state.get("support_response"):
            return AgentName.SUPPORT.value
        return "END"
    
    graph.add_conditional_edges(
        AgentName.ROUTER.value,
        route_decision,
        {
            AgentName.CUSTOMER_DATA.value: AgentName.CUSTOMER_DATA.value,
            AgentName.SUPPORT.value: AgentName.SUPPORT.value,
            "END": END,
        },
    )
    
    return graph.compile()

In [12]:
# =====================================================
# Main Execution
# =====================================================

# Initialize the graph
router_system = build_support_graph()

    
def ask_query( query: str):
    print(f"\n{'='*70}")
    print(f"Query: {query}")
    print(f"{'='*70}")
    try:
        result = router_system.invoke({"input": query, "hop_count": 0})
        
        # Display response
        response = result.get("support_response")
        if response:
            print(f"\nüìã RESPONSE:\n{response}")
        
        # Display error if any
        error = result.get("error")
        if error:
            print(f"\n‚ùå ERROR: {error}")
        
        # Display trace (optional - comment out for cleaner output)
        trace = result.get("a2a_trace", "")
        if trace:
            print(f"\nüîç TRACE:\n{trace}")
        
        print(f"\n{'='*70}\n")
        
    except Exception as e:
        logger.error(f"Failed to process query: {e}")
        print(f"\n‚ùå SYSTEM ERROR: {e}\n")
        print(f"{'='*70}\n")

In [39]:
ask_query("I want to cancel my subscription but I'm having billing issues")


Query: I want to cancel my subscription but I'm having billing issues

üìã RESPONSE:
I understand you'd like to cancel your subscription and you're experiencing billing issues.

Invalid customer ID for billing lookup.

Let me help you resolve the billing problem first. Once that's cleared, I can:
1. Process your cancellation immediately, or
2. Help you switch to a lower-cost plan if you'd prefer

Which would you prefer?

üîç TRACE:
Router: hop_count=1
Router: classified scenario=cancel_billing, customer_id=None
SupportAgent: requesting billing context from CustomerDataAgent
Router: hop_count=2
Router: revisiting to decide next hop
CustomerDataAgent: retrieved billing info for None
Router: hop_count=3
Router: revisiting to decide next hop
SupportAgent: generated coordinated cancellation response
Router: hop_count=4
Router: revisiting to decide next hop





In [40]:
ask_query("I need help with my account, customer ID 12345")


Query: I need help with my account, customer ID 12345

‚ùå ERROR: Customer with ID 12345 not found

üîç TRACE:
Router: hop_count=1
Router: classified scenario=account_help, customer_id=12345
CustomerDataAgent: failed to fetch customer - Customer with ID 12345 not found
Router: hop_count=2
Router: revisiting to decide next hop





In [35]:
ask_query("What's the status of all high-priority tickets for premium customers?")


Query: What's the status of all high-priority tickets for premium customers?

üìã RESPONSE:
Here's the status of all high-priority tickets for premium customers:

High-priority tickets:
Customer #4 ‚Üí Ticket #2 [in_progress]: Database connection timeout errors (2025-11-27 23:12:44)
Customer #7 ‚Üí Ticket #3 [open]: Payment processing failing for all transactions (2025-11-27 23:12:44)
Customer #10 ‚Üí Ticket #4 [in_progress]: Critical security vulnerability found (2025-11-27 23:12:44)
Customer #1 ‚Üí Ticket #1 [open]: Cannot login to account (2025-11-27 23:12:44)
Customer #14 ‚Üí Ticket #5 [resolved]: Website completely down (2025-11-27 23:12:44)

Would you like me to:
- Generate a detailed CSV report
- Group tickets by customer or status
- Escalate any specific tickets

üîç TRACE:
Router: hop_count=1
Router: classified scenario=premium_ticket_status, customer_id=None
CustomerDataAgent: found 5 premium customers
Router: hop_count=2
Router: revisiting to decide next hop
SupportAgent: