# L05: Multi-Agent Message Passing

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Digital-AI-Finance/agentic-artificial-intelligence/blob/main/L05_Multi_Agent_Architectures/notebooks/L05_message_passing.ipynb)

**Week 5 - Multi-Agent Architectures**

## Learning Objectives
- Implement message passing between agents
- Build different communication patterns
- Understand serialization and routing
- Create a basic multi-agent conversation

In [None]:
# Colab setup
# Runs only when the google.colab module is already loaded; in any other
# environment the whole block is skipped.
import sys
if 'google.colab' in sys.modules:
    # IPython shell escape (not plain Python): quietly installs python-dotenv.
    # NOTE(review): dotenv is not imported in the cells shown here - confirm
    # the install is still needed.
    !pip install -q python-dotenv
    from google.colab import userdata
    import os
    # Copy whatever userdata.get returns for 'OPENAI_API_KEY' into the process
    # environment.
    # NOTE(review): presumably this reads a Colab secret; os.environ only
    # accepts string values, so confirm the behaviour when the secret is unset.
    os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
import json
from datetime import datetime

print("Dependencies loaded")

## 1. Message Structure

In [None]:
class MessageType(Enum):
    """Category tag carried by every Message.

    The string values are what ``Message.to_dict()`` emits under the
    ``"type"`` key and what ``Message.from_dict()`` accepts back.
    """

    REQUEST = "request"
    RESPONSE = "response"
    BROADCAST = "broadcast"
    ERROR = "error"


@dataclass
class Message:
    """Standard message format for agent communication.

    Attributes:
        sender: Name of the agent (or external party) that created the message.
        receiver: Name of the destination agent, or "*" for broadcast.
        content: Free-form text payload.
        msg_type: Category of the message; defaults to ``MessageType.REQUEST``.
        metadata: Arbitrary extra key/value data; a fresh dict per instance.
        timestamp: ISO-8601 string taken from ``datetime.now()`` (naive local
            time) when the message is created.
    """

    sender: str
    receiver: str  # "*" for broadcast
    content: str
    msg_type: MessageType = MessageType.REQUEST
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> dict:
        """Return a JSON-friendly dict representation of the message.

        The enum is flattened to its string value under the ``"type"`` key.
        ``metadata`` is shallow-copied so that editing the returned dict does
        not silently mutate this message.
        """
        return {
            "sender": self.sender,
            "receiver": self.receiver,
            "content": self.content,
            "type": self.msg_type.value,
            "metadata": dict(self.metadata),
            "timestamp": self.timestamp,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Message":
        """Rebuild a Message from the structure produced by ``to_dict()``.

        Args:
            data: Mapping with at least ``"sender"``, ``"receiver"`` and
                ``"content"``. ``"type"``, ``"metadata"`` and ``"timestamp"``
                are optional and fall back to the dataclass defaults.

        Returns:
            A new Message; its metadata is a copy of ``data["metadata"]``.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If ``data["type"]`` is not a valid MessageType value.
        """
        kwargs: Dict[str, Any] = {
            "sender": data["sender"],
            "receiver": data["receiver"],
            "content": data["content"],
            "msg_type": MessageType(data.get("type", MessageType.REQUEST.value)),
            "metadata": dict(data.get("metadata") or {}),
        }
        # Only override the timestamp when one was serialized; otherwise let
        # the dataclass default stamp the message with the current time.
        if "timestamp" in data:
            kwargs["timestamp"] = data["timestamp"]
        return cls(**kwargs)

# Sample request from the orchestrator to the coder, tagged with a task id
msg = Message(
    "orchestrator",
    "coder",
    "Please write a function to sort a list",
    metadata={"task_id": "001"},
)
# Show the serialized form as pretty-printed JSON
print(json.dumps(msg.to_dict(), indent=2))

## 2. Agent Base Class

In [None]:
class Agent:
    """Minimal agent: a name, a role, and two message queues.

    ``inbox`` holds messages delivered to this agent; ``outbox`` holds
    messages this agent created via ``send()``. Subclasses customise behaviour
    by overriding ``_handle_message``.
    """

    def __init__(self, name: str, role: str):
        self.name, self.role = name, role
        self.inbox: List[Message] = []
        self.outbox: List[Message] = []

    def receive(self, message: Message):
        """Append an incoming message to the end of the inbox."""
        self.inbox.append(message)

    def send(self, receiver: str, content: str, msg_type: MessageType = MessageType.REQUEST) -> Message:
        """Build a message from this agent, queue it in the outbox, return it.

        The returned object is the same one that was appended to ``outbox``.
        """
        outgoing = Message(self.name, receiver, content, msg_type)
        self.outbox.append(outgoing)
        return outgoing

    def process(self) -> Optional[Message]:
        """Handle the oldest inbox message and reply to its sender if needed.

        Returns the RESPONSE message created through ``send()`` when the
        handler produced a non-empty reply; returns None when the inbox is
        empty or the handler produced nothing.
        """
        try:
            incoming = self.inbox.pop(0)
        except IndexError:
            # Nothing waiting in the inbox
            return None
        reply_text = self._handle_message(incoming)
        return self.send(incoming.sender, reply_text, MessageType.RESPONSE) if reply_text else None

    def _handle_message(self, msg: Message) -> Optional[str]:
        """Produce the reply text for one message. Override in subclasses.

        The default acknowledges receipt, echoing at most the first 50
        characters of the content.
        """
        preview = msg.content[:50]
        return "[{}] Received: {}...".format(self.name, preview)

print("Agent base class defined")

## 3. Message Router (Orchestrator)

In [None]:
class MessageRouter:
    """Routes messages between registered agents and logs every routed message.

    Attributes:
        agents: Registered agents keyed by their ``name``.
        message_log: Every message passed to ``route()``, in routing order
            (including messages whose receiver turned out to be unknown).
    """

    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.message_log: List[Message] = []

    def register(self, agent: Agent):
        """Register an agent under its name, replacing any agent with that name."""
        self.agents[agent.name] = agent
        print(f"Registered agent: {agent.name} ({agent.role})")

    def route(self, message: Message):
        """Log a message and deliver it to its destination.

        A receiver of "*" delivers to every registered agent except the
        sender. An unregistered receiver only produces a printed warning.
        """
        self.message_log.append(message)

        if message.receiver == "*":
            # Broadcast to all except sender
            for name, agent in self.agents.items():
                if name != message.sender:
                    agent.receive(message)
        elif message.receiver in self.agents:
            self.agents[message.receiver].receive(message)
        else:
            print(f"Warning: Unknown receiver '{message.receiver}'")

    def step(self):
        """Process one round: each agent handles one message, then its output is routed.

        The message returned by ``agent.process()`` may already sit in the
        agent's outbox (``Agent.process`` replies through ``send()``, which
        queues it). Such a message is routed exactly once; routing both the
        return value and the outbox entry would deliver and log it twice.
        """
        for agent in self.agents.values():
            response = agent.process()

            # Drain the outbox in place so the agent keeps the same list object.
            pending = list(agent.outbox)
            agent.outbox.clear()

            # Identity check, not equality: two distinct but equal messages
            # are both legitimate and must both be delivered.
            already_queued = any(queued is response for queued in pending)
            if response is not None and not already_queued:
                # A process() override that bypasses the outbox still gets
                # its reply routed, ahead of the queued messages.
                pending.insert(0, response)

            for message in pending:
                self.route(message)

    def run(self, max_steps: int = 10):
        """Call ``step()`` up to ``max_steps`` times, stopping once all inboxes are empty."""
        for _ in range(max_steps):
            self.step()
            if not any(agent.inbox for agent in self.agents.values()):
                break

print("MessageRouter defined")

## 4. Specialized Agents

In [None]:
class CoderAgent(Agent):
    """Specialist registered as "coder" that answers with a stub function."""

    def __init__(self):
        super().__init__(name="coder", role="code_generation")

    def _handle_message(self, msg: Message) -> str:
        """Return a placeholder function whose comment quotes the request text."""
        # Simulated code generation: no real implementation is produced
        stub_lines = (
            "def solution():",
            f"    # Implementation for: {msg.content}",
            "    pass",
        )
        return "\n".join(stub_lines)

class ReviewerAgent(Agent):
    """Specialist registered as "reviewer" that gives canned review feedback."""

    def __init__(self):
        super().__init__(name="reviewer", role="code_review")

    def _handle_message(self, msg: Message) -> str:
        """Return review feedback if the content contains "def ", else ask for code."""
        # Simulated code review: the only check is for a function definition
        contains_function = "def " in msg.content
        return (
            "Code review: Looks good, but consider adding error handling."
            if contains_function
            else "Please provide code to review."
        )

class OrchestratorAgent(Agent):
    """Coordinator registered as "orchestrator"; prints what it receives."""

    def __init__(self):
        super().__init__(name="orchestrator", role="coordination")
        # Set once here; nothing in this class reads or updates it afterwards
        self.workflow_state = "start"

    def _handle_message(self, msg: Message) -> Optional[str]:
        """Print the sender and the first 50 characters of the content.

        Always returns None, so no reply is generated for the message.
        """
        preview = msg.content[:50]
        print(f"[Orchestrator] Received from {msg.sender}: {preview}...")
        # Orchestrator doesn't auto-reply
        return None

print("Specialized agents defined")

## 5. Running a Multi-Agent Conversation

In [None]:
# Create router and agents
router = MessageRouter()
orchestrator = OrchestratorAgent()
coder = CoderAgent()
reviewer = ReviewerAgent()

# Register every agent so the router can deliver to it by name
router.register(orchestrator)
router.register(coder)
router.register(reviewer)

# Start workflow: Orchestrator -> Coder
# The sender must be a registered agent: the coder replies to msg.sender, and
# a reply addressed to an unregistered name only triggers the router's
# "Unknown receiver" warning instead of reaching anyone.
initial_msg = Message(
    sender=orchestrator.name,
    receiver=coder.name,
    content="Write a function to calculate fibonacci numbers"
)
router.route(initial_msg)

# Process messages until every inbox is empty (at most 5 rounds)
print("\n--- Message Processing ---")
router.run(max_steps=5)

# Show message log: one line per routed message, content cut at 60 characters
print("\n--- Message Log ---")
for msg in router.message_log:
    print(f"{msg.sender} -> {msg.receiver}: {msg.content[:60]}...")

## 6. Key Takeaways

1. **Standardized Messages**: Consistent format enables interoperability
2. **Message Router**: Central point for routing and logging
3. **Agent Specialization**: Each agent handles messages according to its own role (code generation, code review, coordination)
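4. **Async-Ready Processing**: The inbox/outbox pattern decouples sending from receiving; the router in this notebook still processes the queues synchronously, one step at a time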

## Next Steps
- Add LLM integration for message generation
- Implement priority queues for urgent messages
- Add persistence for message history