In [1]:
!pip install --upgrade --quiet pydantic

In [None]:
from typing import List, Dict, Optional, Union
from pydantic import BaseModel, Field
from enum import Enum
import asyncio
from datetime import datetime
import json
from dataclasses import dataclass
from collections import deque

class MessageType(Enum):
    TASK_ASSIGNMENT = "task_assignment"
    STATUS_UPDATE = "status_update"
    TECHNICAL_QUERY = "technical_query"
    TECHNICAL_RESPONSE = "technical_response"
    FEEDBACK = "feedback"
    IMPROVEMENT_SUGGESTION = "improvement_suggestion"
    TEST_RESULT = "test_result"

class Expertise(Enum):
    FRONTEND = "frontend"
    BACKEND = "backend"
    DEVOPS = "devops"
    DATABASE = "database"
    TESTING = "testing"

@dataclass
class TechnologyStack:
    name: str
    version: str
    expertise_area: Expertise
    required_skills: List[str]

class Message(BaseModel):
    sender: str
    receiver: str
    msg_type: MessageType
    content: Dict
    timestamp: datetime = Field(default_factory=datetime.now)
    priority: int = 0
    context: Optional[Dict] = None

class Task(BaseModel):
    id: str
    title: str
    description: str
    required_expertise: List[Expertise]
    assigned_to: Optional[str] = None
    status: str = "pending"
    dependencies: List[str] = Field(default_factory=list)
    estimated_time: float
    actual_time: Optional[float] = None
    learning_opportunities: List[str] = Field(default_factory=list)

class AgentMetrics(BaseModel):
    tasks_completed: int = 0
    avg_completion_time: float = 0.0
    skill_improvements: Dict[str, float] = Field(default_factory=dict)
    learning_activities: List[str] = Field(default_factory=list)
    feedback_rating: float = 0.0

class BaseAgent:
    def __init__(self, name: str, model_path: str):
        self.name = name
        self.model_path = model_path
        self.message_queue = asyncio.Queue()
        self.metrics = AgentMetrics()
        self.knowledge_base = {}

    async def send_message(self, receiver: str, msg_type: MessageType, content: Dict):
        message = Message(
            sender=self.name,
            receiver=receiver,
            msg_type=msg_type,
            content=content
        )
        await self.message_queue.put(message)

    async def receive_message(self) -> Message:
        return await self.message_queue.get()

class ArchitectAgent(BaseAgent):
    def __init__(self, name: str):
        super().__init__(name, model_path="large_model_path")  # Using large model
        self.technology_knowledge = {}
        self.system_design_patterns = {}
        self.architecture_decisions = []

    async def evaluate_technical_query(self, query: Dict) -> Dict:
        """Evaluate complex technical queries using the large model"""
        # Simplified for example
        return {
            "solution": "Detailed technical solution",
            "rationale": "Architecture considerations",
            "implications": ["Impact 1", "Impact 2"]
        }

    async def review_system_design(self, design: Dict) -> Dict:
        """Review and improve system design"""
        # Implementation using large model for complex reasoning
        pass

    async def maintain_technical_standards(self) -> None:
        """Update and maintain technical standards"""
        pass

class ProjectManagerAgent(BaseAgent):
    def __init__(self, name: str):
        super().__init__(name, model_path="small_model_path")  # Using small model
        self.task_queue = deque()
        self.team_metrics = {}
        self.learning_programs = {}

    async def assign_task(self, task: Task, team: Dict[str, 'ProgrammerAgent']) -> None:
        """Assign tasks based on expertise and current workload"""
        best_programmer = None
        min_workload = float('inf')

        for programmer in team.values():
            if (any(exp in programmer.expertise for exp in task.required_expertise) and 
                len(programmer.current_tasks) < min_workload):
                best_programmer = programmer
                min_workload = len(programmer.current_tasks)

        if best_programmer:
            task.assigned_to = best_programmer.name
            await self.send_message(
                best_programmer.name,
                MessageType.TASK_ASSIGNMENT,
                {"task": task.dict()}
            )

    async def monitor_progress(self, team: Dict[str, 'ProgrammerAgent']) -> None:
        """Monitor team progress and provide feedback"""
        for programmer in team.values():
            metrics = programmer.metrics
            if metrics.tasks_completed > 0:
                await self.send_message(
                    programmer.name,
                    MessageType.FEEDBACK,
                    {
                        "performance": metrics.dict(),
                        "suggestions": self.generate_improvement_suggestions(metrics)
                    }
                )

    def generate_improvement_suggestions(self, metrics: AgentMetrics) -> List[str]:
        """Generate personalized improvement suggestions using small model"""
        # Implementation using small model for basic analysis
        pass

class ProgrammerAgent(BaseAgent):
    def __init__(self, name: str, expertise: List[Expertise]):
        super().__init__(name, model_path="small_model_path")  # Using small model
        self.expertise = expertise
        self.current_tasks: List[Task] = []
        self.learning_goals = {}
        self.test_results = []

    async def work_on_task(self, task: Task) -> None:
        """Work on assigned task"""
        # Implementation using small model for specific technology tasks
        self.current_tasks.append(task)
        
        # Simulate task work
        await asyncio.sleep(task.estimated_time)
        
        # Update metrics
        self.metrics.tasks_completed += 1
        self.metrics.avg_completion_time = (
            (self.metrics.avg_completion_time * (self.metrics.tasks_completed - 1) + 
             task.estimated_time) / self.metrics.tasks_completed
        )

    async def run_tests(self) -> None:
        """Run tests in spare time"""
        if not self.current_tasks:  # Only run tests when not working on tasks
            test_result = {
                "coverage": 85.5,
                "passed": 42,
                "failed": 3,
                "learning_points": ["Point 1", "Point 2"]
            }
            self.test_results.append(test_result)
            
            # Update learning metrics
            self.metrics.learning_activities.append("Unit Testing")
            self.metrics.skill_improvements["testing"] = \
                self.metrics.skill_improvements.get("testing", 0) + 0.1

    async def request_technical_help(self, problem: Dict) -> None:
        """Request help from architect for complex issues"""
        await self.send_message(
            "architect",
            MessageType.TECHNICAL_QUERY,
            {
                "problem": problem,
                "context": {
                    "expertise": [e.value for e in self.expertise],
                    "current_task": self.current_tasks[-1].dict() if self.current_tasks else None
                }
            }
        )

class MultiAgentSystem:
    def __init__(self):
        self.architect = ArchitectAgent("architect")
        self.project_manager = ProjectManagerAgent("project_manager")
        self.programmers: Dict[str, ProgrammerAgent] = {}
        self.message_broker = asyncio.Queue()

    def add_programmer(self, name: str, expertise: List[Expertise]) -> None:
        """Add a new programmer to the system"""
        self.programmers[name] = ProgrammerAgent(name, expertise)

    async def route_message(self, message: Message) -> None:
        """Route messages between agents"""
        if message.receiver == "architect":
            await self.architect.message_queue.put(message)
        elif message.receiver == "project_manager":
            await self.project_manager.message_queue.put(message)
        elif message.receiver in self.programmers:
            await self.programmers[message.receiver].message_queue.put(message)

    async def run(self) -> None:
        """Run the multi-agent system"""
        # Start all agent tasks
        tasks = [
            self.architect.maintain_technical_standards(),
            self.project_manager.monitor_progress(self.programmers)
        ]
        
        # Add programmer tasks
        for programmer in self.programmers.values():
            tasks.extend([
                programmer.run_tests(),  # Run tests in spare time
                self._process_agent_messages(programmer)
            ])

        await asyncio.gather(*tasks)

    async def _process_agent_messages(self, agent: BaseAgent) -> None:
        """Process messages for an agent"""
        while True:
            message = await agent.receive_message()
            # Route message to appropriate handler based on type
            if message.msg_type == MessageType.TASK_ASSIGNMENT:
                await self._handle_task_assignment(agent, message)
            elif message.msg_type == MessageType.TECHNICAL_QUERY:
                await self._handle_technical_query(message)
            # Add other message type handlers as needed

    async def _handle_task_assignment(self, agent: ProgrammerAgent, message: Message) -> None:
        """Handle task assignment messages"""
        task = Task(**message.content["task"])
        await agent.work_on_task(task)

    async def _handle_technical_query(self, message: Message) -> None:
        """Handle technical query messages"""
        response = await self.architect.evaluate_technical_query(message.content)
        await self.route_message(
            Message(
                sender="architect",
                receiver=message.sender,
                msg_type=MessageType.TECHNICAL_RESPONSE,
                content=response
            )
        )

# Example usage
async def main():
    system = MultiAgentSystem()
    
    # Add programmers with different expertise
    system.add_programmer("frontend_dev", [Expertise.FRONTEND])
    system.add_programmer("backend_dev", [Expertise.BACKEND, Expertise.DATABASE])
    system.add_programmer("devops_dev", [Expertise.DEVOPS])
    
    # Create some tasks
    tasks = [
        Task(
            id="task1",
            title="Implement user authentication",
            description="Implement OAuth2 authentication flow",
            required_expertise=[Expertise.BACKEND],
            estimated_time=4.0
        ),
        Task(
            id="task2",
            title="Design responsive UI",
            description="Create responsive dashboard layout",
            required_expertise=[Expertise.FRONTEND],
            estimated_time=3.0
        )
    ]
    
    # Assign tasks
    for task in tasks:
        await system.project_manager.assign_task(task, system.programmers)
    
    # Run the system
    await system.run()

if __name__ == "__main__":
    await main()

<ipython-input-2-c74cc90946c2>:129: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  {"task": task.dict()}


[<coroutine object ArchitectAgent.maintain_technical_standards at 0x7924a281bd10>, <coroutine object ProjectManagerAgent.monitor_progress at 0x7924a281be60>, <coroutine object ProgrammerAgent.run_tests at 0x7924a281bdf0>, <coroutine object MultiAgentSystem._process_agent_messages at 0x7924a281b990>, <coroutine object ProgrammerAgent.run_tests at 0x7924a281ba70>, <coroutine object MultiAgentSystem._process_agent_messages at 0x7924a281bed0>, <coroutine object ProgrammerAgent.run_tests at 0x7924a281b920>, <coroutine object MultiAgentSystem._process_agent_messages at 0x7924a281bae0>]
