<a href="https://colab.research.google.com/github/MudMonster341/Agentic_AI/blob/main/Agent_Routing_Anthropic_13-11-2025.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install --upgrade anthropic
!pip install nest_asyncio

Collecting anthropic
  Downloading anthropic-0.72.1-py3-none-any.whl.metadata (28 kB)
Downloading anthropic-0.72.1-py3-none-any.whl (357 kB)
[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/357.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m357.4/357.4 kB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: anthropic
Successfully installed anthropic-0.72.1


In [None]:
import anthropic
import asyncio
import nest_asyncio
import time
import random
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Dict
from threading import Lock
from collections import defaultdict
import os
from getpass import getpass

# Enable nested asyncio for Colab
nest_asyncio.apply()

print("‚úì All imports successful!")

‚úì All imports successful!


In [None]:
# Secure API key input for Colab
#
# Option 1: Use Colab Secrets (Recommended)
# Go to the key icon in the left sidebar > Add Secret > Name: ANTHROPIC_API_KEY
try:
    # Imported inside the try so that a missing google.colab package (i.e.
    # running outside Colab) falls through to the manual prompt below instead
    # of aborting the cell.
    from google.colab import userdata

    api_key = userdata.get('ANTHROPIC_API_KEY')
    print("‚úì API key loaded from Colab Secrets")
except Exception:
    # Option 2: Manual input (will be hidden)
    # `except Exception` (not a bare `except:`) so that an interrupt
    # (KeyboardInterrupt / SystemExit) is not swallowed and turned into a prompt.
    api_key = getpass("Enter your Anthropic API Key: ")
    print("‚úì API key entered manually")

# Validate API key format
if not api_key or not api_key.startswith('sk-ant-'):
    raise ValueError("Invalid API key format. Should start with 'sk-ant-'")

# Initialize Anthropic clients (sync client for ad-hoc use, async client for the agents)
client = anthropic.Anthropic(api_key=api_key)
async_client = anthropic.AsyncAnthropic(api_key=api_key)

print("‚úì Anthropic client initialized successfully")

‚úì API key loaded from Colab Secrets
‚úì Anthropic client initialized successfully


In [None]:
class AgentType(Enum):
    """Specialisation of an agent in the pool (stored on AgentCapability.agent_type)."""

    ORDER_STATUS = "order_status"
    REFUND = "refund"
    TECHNICAL_SUPPORT = "technical_support"
    # Routers look this member up as the fallback when no other agent is chosen.
    ESCALATION = "escalation"

class QueryType(Enum):
    """Category of a customer query; agents list the categories they can handle."""

    TRACKING = "tracking"
    REFUND = "refund"
    TECHNICAL = "technical"
    ESCALATION = "escalation"

@dataclass
class Query:
    """A single customer query to be routed to an agent."""

    id: int  # caller-assigned identifier, echoed in simulation output
    content: str  # the customer's message text
    query_type: QueryType  # category matched against agent capabilities
    priority: int  # 1-5, 5 being highest
    timestamp: float  # creation time; the query generator fills this with time.time()
    customer_context: Optional[str] = None  # optional background added to the prompt when set

@dataclass
class AgentCapability:
    """Static configuration of one agent: which model it calls and what it may handle."""

    agent_type: AgentType  # specialisation label
    model_name: str  # Claude model to use
    max_tokens: int  # passed as max_tokens to the Messages API call
    temperature: float  # passed as temperature to the Messages API call
    system_prompt: str  # passed as the system prompt of the Messages API call
    max_concurrent_tasks: int  # the agent reports unavailable at or above this many in-flight tasks
    capabilities: List[QueryType]  # query categories this agent accepts

print("‚úì Data structures defined")


‚úì Data structures defined


In [None]:
class ClaudeAgent:
    """Customer-service agent that answers queries through the Claude Messages API.

    Each agent carries an AgentCapability (model, prompt, limits, accepted
    query types) and keeps simple counters: in-flight tasks, completed tasks,
    failed tasks and the summed processing time of completed tasks. Counter
    access is guarded by ``self.lock``.
    """

    def __init__(self, agent_id: int, capability: "AgentCapability", async_client):
        """Store the configuration and zero all counters.

        Args:
            agent_id: Identifier reported in stats and log output.
            capability: Model/prompt/limit configuration for this agent.
            async_client: Client exposing ``messages.create(...)`` as a coroutine.
        """
        self.agent_id = agent_id
        self.capability = capability
        self.async_client = async_client
        self.current_tasks = 0
        self.completed_tasks = 0
        self.failed_tasks = 0
        self.total_processing_time = 0.0  # seconds, completed tasks only
        self.lock = Lock()

    def is_available(self) -> bool:
        """Return True while the in-flight task count is below the configured maximum."""
        with self.lock:
            return self.current_tasks < self.capability.max_concurrent_tasks

    def can_handle(self, query: "Query") -> bool:
        """Return True if the query's type is listed in this agent's capabilities."""
        return query.query_type in self.capability.capabilities

    async def process_query(self, query: "Query") -> dict:
        """Send the query to the model and return a result dict.

        Returns:
            On success: ``success=True`` plus ``response``, ``processing_time``,
            ``model``, ``tokens_used`` and the separate ``input_tokens`` /
            ``output_tokens`` counts.
            On failure: ``success=False`` plus ``error``, ``message`` and
            ``processing_time``. Rate-limit errors use ``error='rate_limit'``;
            any other ``Exception`` uses the exception class name.

        API errors are reported through the result dict rather than raised.
        Exceptions that are not ``Exception`` subclasses (e.g. task
        cancellation) propagate, but the in-flight slot is still released.
        """
        with self.lock:
            self.current_tasks += 1

        start_time = time.time()

        try:
            # Construct messages for Claude
            user_message = f"""Customer Query: {query.content}

Query Type: {query.query_type.value}
Priority: {query.priority}/5
{f'Context: {query.customer_context}' if query.customer_context else ''}

Please provide a helpful, professional response to this customer query."""

            # Make async API call to Claude
            message = await self.async_client.messages.create(
                model=self.capability.model_name,
                max_tokens=self.capability.max_tokens,
                temperature=self.capability.temperature,
                system=self.capability.system_prompt,
                messages=[
                    {"role": "user", "content": user_message}
                ]
            )

            # Read everything that can raise BEFORE touching the counters, so a
            # malformed response is counted once (as a failure), never as both
            # a completion and a failure.
            response_text = message.content[0].text
            input_tokens = message.usage.input_tokens
            output_tokens = message.usage.output_tokens
            processing_time = time.time() - start_time

            with self.lock:
                self.completed_tasks += 1
                self.total_processing_time += processing_time

            return {
                'success': True,
                'response': response_text,
                'processing_time': processing_time,
                'model': self.capability.model_name,
                'tokens_used': input_tokens + output_tokens,
                'input_tokens': input_tokens,
                'output_tokens': output_tokens
            }

        except anthropic.RateLimitError:
            # Handle rate limiting
            with self.lock:
                self.failed_tasks += 1

            return {
                'success': False,
                'error': 'rate_limit',
                'message': 'API rate limit exceeded',
                'processing_time': time.time() - start_time
            }

        except Exception as e:
            with self.lock:
                self.failed_tasks += 1

            return {
                'success': False,
                'error': str(type(e).__name__),
                'message': str(e),
                'processing_time': time.time() - start_time
            }

        finally:
            # Single release point: runs on success, on handled failure and on
            # cancellation, so the agent can never be left looking busy.
            with self.lock:
                self.current_tasks -= 1

    def get_stats(self) -> dict:
        """Return a snapshot of this agent's counters and derived rates.

        ``success_rate`` is completed / (completed + failed), 0 with no history.
        ``avg_processing_time`` averages over completed tasks only, because
        only completed tasks contribute to ``total_processing_time``.
        """
        with self.lock:
            total = self.completed_tasks + self.failed_tasks
            completed = self.completed_tasks
            return {
                'agent_id': self.agent_id,
                'type': self.capability.agent_type.value,
                'model': self.capability.model_name,
                'completed': completed,
                'failed': self.failed_tasks,
                'success_rate': completed / total if total > 0 else 0,
                'avg_processing_time': self.total_processing_time / completed if completed > 0 else 0,
                'current_load': self.current_tasks
            }

print("‚úì ClaudeAgent class defined")


‚úì ClaudeAgent class defined


In [None]:
def create_claude_agent_pool(
    async_client,
    fast_model: str = "claude-3-5-haiku-20241022",
    standard_model: str = "claude-3-5-sonnet-20241022",
) -> List[ClaudeAgent]:
    """Create pool of Claude-powered agents with different specializations.

    The pool holds six agents with ids 0-5, in this order: two order-status
    agents, two refund agents, one technical-support agent and one escalation
    agent. The escalation agent accepts every query type.

    Args:
        async_client: Async client handed to every agent.
        fast_model: Model id for the order-status agents.
        standard_model: Model id for the refund, technical-support and
            escalation agents.

    Returns:
        The agents, ordered by agent id.

    NOTE(review): the saved output of this notebook shows a 404
    ``not_found_error`` for the default ``standard_model`` id. Pass a model id
    your account can access if that still happens.
    """
    # One entry per specialization; `count` agents are built from each entry.
    roster = [
        {
            # Order Status Agents - fast & cost-effective model
            'count': 2,
            'agent_type': AgentType.ORDER_STATUS,
            'model_name': fast_model,
            'max_tokens': 300,
            'temperature': 0.3,
            'system_prompt': """You are an Order Status Agent for an e-commerce platform.
Your role is to provide quick, accurate information about order tracking and delivery status.
Be concise, friendly, and reassuring. Always include estimated delivery dates when applicable.""",
            'max_concurrent_tasks': 3,
            'capabilities': [QueryType.TRACKING],
        },
        {
            # Refund Agents - moderate complexity
            'count': 2,
            'agent_type': AgentType.REFUND,
            'model_name': standard_model,
            'max_tokens': 500,
            'temperature': 0.2,
            'system_prompt': """You are a Refund Processing Agent for an e-commerce platform.
Handle refund requests professionally, explain the refund process clearly, and mention approval timelines.
Be empathetic but follow policy guidelines. Mention: processing takes 3-5 business days.""",
            'max_concurrent_tasks': 2,
            'capabilities': [QueryType.REFUND],
        },
        {
            # Technical Support Agent - complex issues
            'count': 1,
            'agent_type': AgentType.TECHNICAL_SUPPORT,
            'model_name': standard_model,
            'max_tokens': 800,
            'temperature': 0.1,
            'system_prompt': """You are a Technical Support Agent for an e-commerce platform.
Troubleshoot complex technical issues including app crashes, payment failures, and website errors.
Provide step-by-step solutions. Be patient and thorough. Escalate if issue requires developer intervention.""",
            'max_concurrent_tasks': 2,
            'capabilities': [QueryType.TECHNICAL],
        },
        {
            # Escalation Agent - handles complaints and any other query type
            'count': 1,
            'agent_type': AgentType.ESCALATION,
            'model_name': standard_model,
            'max_tokens': 700,
            'temperature': 0.4,
            'system_prompt': """You are an Escalation Agent handling sensitive customer complaints.
Be extremely empathetic, acknowledge frustrations, and offer concrete solutions or compensation.
You can handle any query type in escalation mode. Prioritize customer satisfaction and retention.""",
            'max_concurrent_tasks': 2,
            'capabilities': [QueryType.ESCALATION, QueryType.REFUND, QueryType.TECHNICAL, QueryType.TRACKING],
        },
    ]

    agents = []
    for spec in roster:
        for _ in range(spec['count']):
            agents.append(ClaudeAgent(
                agent_id=len(agents),  # ids are assigned sequentially: 0..5
                capability=AgentCapability(
                    agent_type=spec['agent_type'],
                    model_name=spec['model_name'],
                    max_tokens=spec['max_tokens'],
                    temperature=spec['temperature'],
                    system_prompt=spec['system_prompt'],
                    max_concurrent_tasks=spec['max_concurrent_tasks'],
                    # Fresh list per agent so agents never share a mutable list.
                    capabilities=list(spec['capabilities'])
                ),
                async_client=async_client
            ))

    return agents

print("‚úì Agent factory defined")


‚úì Agent factory defined


In [None]:
from abc import ABC, abstractmethod

class Router(ABC):
    """Interface that every routing strategy implements."""

    @abstractmethod
    def route(self, query: Query, agents: List[ClaudeAgent]) -> Optional[ClaudeAgent]:
        """Pick the agent that should handle ``query``, or ``None`` if none is chosen."""

    @abstractmethod
    def get_name(self) -> str:
        """Return the display name of the strategy."""

class CapabilityMatchingRouter(Router):
    """Routes based on agent capabilities and performance.

    Every agent that can handle the query gets a weighted score:
    availability (0.4), historical success rate (0.3), model speed (0.2)
    and free capacity (0.1). The highest-scoring agent wins. If no agent can
    handle the query, the first escalation agent is used as a fallback.
    """

    def route(self, query: Query, agents: List[ClaudeAgent]) -> Optional[ClaudeAgent]:
        """Return the best-scoring capable agent, the escalation fallback, or None."""
        scored_agents = []

        for agent in agents:
            if not agent.can_handle(query):
                continue

            # Busy agents stay candidates, but are heavily penalised.
            availability_score = 1.0 if agent.is_available() else 0.3

            stats = agent.get_stats()

            # The neutral 0.7 prior applies only to agents with NO history.
            # An agent whose every task failed keeps its real rate of 0, so
            # it no longer outranks agents with partial success.
            attempts = stats['completed'] + stats['failed']
            success_score = stats['success_rate'] if attempts > 0 else 0.7

            # Model speed approximation (Haiku faster than Sonnet)
            speed_score = 1.0 if 'haiku' in agent.capability.model_name.lower() else 0.7

            # Free-capacity fraction, from the same stats snapshot as above.
            # A zero-capacity agent has no free capacity (avoids division by zero).
            capacity = agent.capability.max_concurrent_tasks
            load_score = 1.0 - (stats['current_load'] / capacity) if capacity > 0 else 0.0

            # Weighted combination
            total_score = (
                availability_score * 0.4 +
                success_score * 0.3 +
                speed_score * 0.2 +
                load_score * 0.1
            )

            scored_agents.append((agent, total_score))

        if not scored_agents:
            # Fallback: try escalation agent
            escalation = [a for a in agents if a.capability.agent_type == AgentType.ESCALATION]
            return escalation[0] if escalation else None

        # max() keeps the first agent on ties, i.e. pool order breaks ties.
        return max(scored_agents, key=lambda x: x[1])[0]

    def get_name(self) -> str:
        """Return the display name of this strategy."""
        return "Capability-Matching (Claude-Powered)"

class PriorityRouter(Router):
    """Routing strategy that treats high-priority queries preferentially.

    Priority 4 and above: the least-loaded capable agent, preferring agents
    that still have free capacity but accepting a busy one if necessary.
    Lower priority: a random capable agent with free capacity.
    In either case, when no agent was selected, the first escalation agent is
    returned (or ``None`` if the pool has none).
    """

    def route(self, query: Query, agents: List[ClaudeAgent]) -> Optional[ClaudeAgent]:
        """Select an agent for ``query`` according to its priority."""
        handlers = [agent for agent in agents if agent.can_handle(query)]
        idle_handlers = [agent for agent in handlers if agent.is_available()]

        # Urgent queries may queue on a busy agent rather than be diverted.
        if query.priority >= 4 and handlers:
            candidates = idle_handlers or handlers
            return min(candidates, key=lambda agent: agent.current_tasks)

        # Everything else is spread randomly across agents with spare capacity.
        if idle_handlers:
            return random.choice(idle_handlers)

        # Nothing suitable is free: hand over to the first escalation agent.
        for agent in agents:
            if agent.capability.agent_type == AgentType.ESCALATION:
                return agent
        return None

    def get_name(self) -> str:
        """Return the display name of this strategy."""
        return "Priority-Based (Claude-Powered)"

print("‚úì Routing mechanisms defined")


‚úì Routing mechanisms defined


In [None]:
def generate_realistic_queries(
    num_queries: int = 10,
    load_spike: bool = False,
    seed: Optional[int] = None,
) -> List[Query]:
    """Generate realistic customer service queries.

    Args:
        num_queries: Number of queries to create (ids run from 1 to num_queries).
        load_spike: Use the tracking-heavy type distribution instead of the
            normal one.
        seed: When given, queries are drawn from a private ``random.Random(seed)``
            so the same seed always yields the same types, texts, priorities
            and contexts. When ``None`` the module-level ``random`` generator
            is used, as before.

    Returns:
        A list of Query objects. Escalation queries always get priority 5;
        all other types get a random priority from 1 to 4.
    """
    rng = random.Random(seed) if seed is not None else random

    # Distribution based on typical e-commerce patterns. Weights are keyed by
    # query type so they cannot drift out of step with the enum's member order.
    if load_spike:
        type_weights = {  # Black Friday - lots of tracking
            QueryType.TRACKING: 0.70,
            QueryType.REFUND: 0.20,
            QueryType.TECHNICAL: 0.05,
            QueryType.ESCALATION: 0.05,
        }
    else:
        type_weights = {  # Normal distribution
            QueryType.TRACKING: 0.50,
            QueryType.REFUND: 0.30,
            QueryType.TECHNICAL: 0.15,
            QueryType.ESCALATION: 0.05,
        }

    query_templates = {
        QueryType.TRACKING: [
            "Where is my package? Order #12345",
            "My order hasn't arrived yet. Can you check the status?",
            "When will my order be delivered? It's been 5 days.",
            "I need to track my shipment urgently"
        ],
        QueryType.REFUND: [
            "I want a refund for my order. The product is defective.",
            "Can I get my money back? This isn't what I ordered.",
            "How do I request a refund? Product arrived damaged.",
            "I'd like to return this item and get a refund"
        ],
        QueryType.TECHNICAL: [
            "Your app crashed when I tried to checkout!",
            "I can't login to my account. Getting error 500.",
            "The website keeps freezing when I add items to cart.",
            "Payment not processing - says invalid card but it works elsewhere"
        ],
        QueryType.ESCALATION: [
            "This is unacceptable! I've been waiting 3 weeks!",
            "I want to speak to a manager NOW. This is ridiculous.",
            "Worst customer service ever. I'm filing a complaint.",
            "I demand compensation for this terrible experience!"
        ]
    }

    contexts = [
        "Customer has ordered 5 times before",
        "First-time customer",
        "Premium member since 2023",
        "Had previous issue resolved successfully",
        None
    ]

    query_types = list(type_weights)
    weights = list(type_weights.values())

    queries = []
    for i in range(num_queries):
        # Draw order (type, template, priority, context) is kept fixed so a
        # given generator state always produces the same query.
        qtype = rng.choices(query_types, weights=weights)[0]
        template = rng.choice(query_templates[qtype])
        priority = 5 if qtype == QueryType.ESCALATION else rng.randint(1, 4)

        queries.append(Query(
            id=i + 1,
            content=template,
            query_type=qtype,
            priority=priority,
            timestamp=time.time(),
            customer_context=rng.choice(contexts)
        ))

    return queries

print("‚úì Query generator defined")


‚úì Query generator defined


In [None]:
async def run_claude_simulation(
    router: Router,
    num_queries: int = 10,
    show_responses: bool = True,
    rate_limit_delay: float = 0.5,  # Delay between requests to respect rate limits
    load_spike: bool = False
):
    """Run simulation with actual Claude API calls.

    Builds a fresh agent pool and a fresh batch of queries, then routes and
    processes the queries one after another, pausing ``rate_limit_delay``
    seconds between processed queries. A per-query log, a summary and a
    per-agent table are printed.

    Args:
        router: Strategy used to pick an agent for each query.
        num_queries: Number of queries to generate and process.
        show_responses: Print the first 200 characters of each response.
        rate_limit_delay: Seconds to sleep after each processed query except the last.
        load_spike: Passed to the query generator to select the
            tracking-heavy distribution.

    Returns:
        ``(results, agents)``: one result dict per query (queries that could
        not be routed appear as ``{'success': False, 'reason': 'no_agent'}``)
        and the agent pool with its final counters.
    """

    print(f"\n{'='*70}")
    print(f"üöÄ RUNNING SIMULATION: {router.get_name()}")
    print(f"üìä Processing {num_queries} queries with real Claude models")
    print(f"{'='*70}\n")

    # Initialize (uses the notebook-level `async_client`)
    agents = create_claude_agent_pool(async_client)
    queries = generate_realistic_queries(num_queries, load_spike)

    # Track metrics
    results = []
    total_tokens = 0
    start_time = time.time()

    # Process queries with rate limit consideration
    for idx, query in enumerate(queries):
        print(f"\n[Query {query.id}/{num_queries}] Type: {query.query_type.value} | Priority: {query.priority}")
        print(f"  üìù Content: {query.content}")

        # Route query
        agent = router.route(query, agents)

        if agent is None:
            print(f"  ‚ùå No agent available")
            results.append({'success': False, 'reason': 'no_agent'})
            continue

        print(f"  ‚û°Ô∏è  Routed to: Agent {agent.agent_id} ({agent.capability.agent_type.value}) using {agent.capability.model_name}")

        # Process with Claude
        result = await agent.process_query(query)
        results.append(result)

        if result['success']:
            total_tokens += result.get('tokens_used', 0)
            print(f"  ‚úÖ Success | Time: {result['processing_time']:.2f}s | Tokens: {result.get('tokens_used', 0)}")

            if show_responses:
                response_preview = result['response'][:200] + "..." if len(result['response']) > 200 else result['response']
                print(f"  üí¨ Response: {response_preview}\n")
        else:
            print(f"  ‚ùå Failed: {result.get('error', 'unknown')} - {result.get('message', '')}")

        # Rate limiting: Add delay between requests
        if idx < len(queries) - 1:
            await asyncio.sleep(rate_limit_delay)

    total_time = time.time() - start_time

    # Print summary
    print(f"\n{'='*70}")
    print(f"üìà SIMULATION SUMMARY")
    print(f"{'='*70}")

    successful = sum(1 for r in results if r.get('success', False))
    failed = len(results) - successful

    # Guard the two ratios: an empty run (num_queries=0) or a zero elapsed
    # time would otherwise raise ZeroDivisionError in the summary.
    success_pct = successful / len(results) * 100 if results else 0.0
    throughput = len(results) / total_time if total_time > 0 else 0.0

    print(f"Total Queries: {len(results)}")
    print(f"‚úÖ Successful: {successful} ({success_pct:.1f}%)")
    print(f"‚ùå Failed: {failed}")
    print(f"‚è±Ô∏è  Total Time: {total_time:.2f}s")
    print(f"üéØ Throughput: {throughput:.2f} queries/second")
    print(f"ü™ô Total Tokens Used: {total_tokens:,}")

    # Agent performance
    print(f"\n{'='*70}")
    print(f"ü§ñ AGENT PERFORMANCE")
    print(f"{'='*70}")
    print(f"{'ID':<4} {'Type':<20} {'Model':<30} {'Done':<6} {'Failed':<6} {'Rate':<8}")
    print("-" * 70)

    for agent in agents:
        stats = agent.get_stats()
        print(f"{stats['agent_id']:<4} {stats['type']:<20} {stats['model']:<30} "
              f"{stats['completed']:<6} {stats['failed']:<6} {stats['success_rate']:<8.1%}")

    print(f"{'='*70}\n")

    return results, agents

print("‚úì Simulation function defined")

‚úì Simulation function defined


In [None]:
# Run a small simulation with real Claude API calls
# Start with just 5 queries to test
# (top-level `await` is only valid inside a notebook/IPython cell)

router = CapabilityMatchingRouter()
# `results` and `agents` stay in the notebook namespace; the cost-estimation
# cell further down reads `results`.
results, agents = await run_claude_simulation(
    router=router,
    num_queries=5,
    show_responses=True,  # Set to False to hide detailed responses
    rate_limit_delay=0.5  # Adjust based on your rate limits
)


üöÄ RUNNING SIMULATION: Capability-Matching (Claude-Powered)
üìä Processing 5 queries with real Claude models


[Query 1/5] Type: refund | Priority: 4
  üìù Content: Can I get my money back? This isn't what I ordered.
  ‚û°Ô∏è  Routed to: Agent 2 (refund) using claude-3-5-sonnet-20241022
  ‚ùå Failed: NotFoundError - Error code: 404 - {'type': 'error', 'error': {'type': 'not_found_error', 'message': 'model: claude-3-5-sonnet-20241022'}, 'request_id': 'req_011CV6KnHWxxJwjv3J8rz8ew'}

[Query 2/5] Type: tracking | Priority: 1
  üìù Content: Where is my package? Order #12345
  ‚û°Ô∏è  Routed to: Agent 0 (order_status) using claude-3-5-haiku-20241022
  ‚úÖ Success | Time: 2.58s | Tokens: 191
  üí¨ Response: Hi there! I'll help you track Order #12345 right away.

‚úÖ Order Status: Currently in transit
üìç Current Location: Shipped from local distribution center
üöö Estimated Delivery: Within 2-3 business days

...


[Query 3/5] Type: escalation | Priority: 5
  üìù Content: Worst cus

In [None]:
async def test_edge_case_all_busy():
    """Test what happens when all refund agents are at capacity.

    Builds a fresh pool, marks every refund agent as fully loaded, routes one
    priority-4 refund query with CapabilityMatchingRouter and processes it with
    the agent that was chosen. The routing decision and the outcome of the
    call (a response preview, or the error) are printed. Returns nothing.
    """

    print("\n" + "="*70)
    print("üß™ EDGE CASE TEST: All Specialized Agents Busy")
    print("="*70 + "\n")

    # Uses the notebook-level `async_client`.
    agents = create_claude_agent_pool(async_client)
    router = CapabilityMatchingRouter()

    # Simulate all refund agents being at capacity
    for agent in agents:
        if agent.capability.agent_type == AgentType.REFUND:
            agent.current_tasks = agent.capability.max_concurrent_tasks
            print(f"‚ö†Ô∏è  Agent {agent.agent_id} (REFUND) set to maximum capacity")

    # Try to route a refund query
    test_query = Query(
        id=999,
        content="I need a refund urgently!",
        query_type=QueryType.REFUND,
        priority=4,
        timestamp=time.time()
    )

    routed_agent = router.route(test_query, agents)

    if routed_agent:
        print(f"\n‚úÖ Fallback Successful!")
        print(f"   Query routed to: Agent {routed_agent.agent_id}")
        print(f"   Agent Type: {routed_agent.capability.agent_type.value}")
        print(f"   Strategy: {'Escalation Agent' if routed_agent.capability.agent_type == AgentType.ESCALATION else 'Alternative Agent'}")

        # Actually process the query
        result = await routed_agent.process_query(test_query)

        if result['success']:
            response = result['response']
            # Ellipsis only when the preview really is truncated.
            preview = response[:300] + "..." if len(response) > 300 else response
            print(f"\nüí¨ Response Preview:")
            print(f"   {preview}")
        else:
            # Report the failure instead of ending silently after the routing
            # message, which made a failed API call look like a passed test.
            print(f"\nProcessing failed: {result.get('error', 'unknown')} - {result.get('message', '')}")
    else:
        print("\n‚ùå Routing failed - no fallback available")

# Run edge case test (top-level `await` is only valid inside a notebook/IPython cell)
await test_edge_case_all_busy()



üß™ EDGE CASE TEST: All Specialized Agents Busy

‚ö†Ô∏è  Agent 2 (REFUND) set to maximum capacity
‚ö†Ô∏è  Agent 3 (REFUND) set to maximum capacity

‚úÖ Fallback Successful!
   Query routed to: Agent 5
   Agent Type: escalation
   Strategy: Escalation Agent


In [None]:
def estimate_costs(results: List[dict]):
    """Estimate API costs based on token usage.

    Only results with a truthy ``success`` and a ``tokens_used`` entry are
    counted. When a result also carries ``input_tokens`` and ``output_tokens``
    those exact counts are priced. Otherwise ``tokens_used`` is split 60%
    input / 40% output as an approximation. Results for models missing from
    the price table add nothing to the total and are listed separately in the
    printed report.

    Args:
        results: Result dicts as produced by the agents' query processing.

    Returns:
        The total estimated cost in US dollars (float). A report is printed
        as a side effect.
    """

    # Pricing as of Nov 2025 (check current pricing at https://www.anthropic.com/pricing)
    pricing = {
        'claude-3-5-haiku-20241022': {
            'input': 0.80 / 1_000_000,   # $0.80 per MTok
            'output': 4.00 / 1_000_000    # $4.00 per MTok
        },
        'claude-3-5-sonnet-20241022': {
            'input': 3.00 / 1_000_000,    # $3.00 per MTok
            'output': 15.00 / 1_000_000   # $15.00 per MTok
        }
    }

    total_cost = 0.0
    token_breakdown = defaultdict(int)
    unpriced_tokens = defaultdict(int)  # models with usage but no price entry

    for result in results:
        if not (result.get('success') and 'tokens_used' in result):
            continue

        model = result.get('model', 'claude-3-5-sonnet-20241022')
        tokens = result['tokens_used']

        if 'input_tokens' in result and 'output_tokens' in result:
            # Exact counts reported by the API.
            input_tokens = result['input_tokens']
            output_tokens = result['output_tokens']
        else:
            # Approximate 60% input, 40% output split. The output share is the
            # remainder, so the two parts always add up to `tokens`.
            input_tokens = int(tokens * 0.6)
            output_tokens = tokens - input_tokens

        if model not in pricing:
            unpriced_tokens[model] += tokens
            continue

        cost = (input_tokens * pricing[model]['input'] +
                output_tokens * pricing[model]['output'])
        total_cost += cost
        token_breakdown[model] += tokens

    print("\n" + "="*70)
    print("üí∞ COST ESTIMATION")
    print("="*70)
    print(f"Total Estimated Cost: ${total_cost:.4f}")
    print(f"\nToken Usage by Model:")
    for model, tokens in token_breakdown.items():
        print(f"  {model}: {tokens:,} tokens")
    if unpriced_tokens:
        print(f"\nNot included in total (no pricing entry):")
        for model, tokens in unpriced_tokens.items():
            print(f"  {model}: {tokens:,} tokens")
    print("="*70 + "\n")

    return total_cost

# Run cost estimation on previous results.
# The guard skips the call when no earlier cell in this session has defined `results`.
if 'results' in locals():
    estimate_costs(results)



üí∞ COST ESTIMATION
Total Estimated Cost: $0.0012

Token Usage by Model:
  claude-3-5-haiku-20241022: 574 tokens

