In [None]:
import random
from typing import List, Dict, Any
from enum import Enum
from dataclasses import dataclass
from queue import Queue
import time

class TaskType(Enum):
    COLLECT = "collect"
    PROCESS = "process"
    ANALYZE = "analyze"
    REPORT = "report"

@dataclass
class Task:
    task_id: int
    task_type: TaskType
    data: Any
    status: str = "pending"
    result: Any = None

class Message:
    def __init__(self, sender: str, receiver: str, content: Any):
        self.sender = sender
        self.receiver = receiver
        self.content = content
        self.timestamp = time.time()

In [None]:
class Agent:
    def __init__(self, agent_id: str, agent_type: str, capabilities: List[TaskType]):
        self.agent_id = agent_id
        self.agent_type = agent_type
        self.capabilities = capabilities
        self.message_queue = Queue()
        self.task_queue = Queue()
        self.knowledge_base = {}

    def receive_message(self, message: Message):
        self.message_queue.put(message)

    def send_message(self, receiver: str, content: Any) -> Message:
        return Message(self.agent_id, receiver, content)

    def process_task(self, task: Task) -> Task:
        if task.task_type not in self.capabilities:
            return Task(task.task_id, task.task_type, task.data, "failed", "Agent not capable")

        # Simulate processing based on task type
        if task.task_type == TaskType.COLLECT:
            task.result = f"Collected data: {task.data}"
        elif task.task_type == TaskType.PROCESS:
            task.result = f"Processed data: {task.data}"
        elif task.task_type == TaskType.ANALYZE:
            task.result = f"Analysis result: {task.data}"
        elif task.task_type == TaskType.REPORT:
            task.result = f"Report generated for: {task.data}"

        task.status = "completed"
        return task

    def step(self):
        # Process messages
        while not self.message_queue.empty():
            message = self.message_queue.get()
            print(f"Agent {self.agent_id} received message from {message.sender}: {message.content}")

        # Process tasks
        if not self.task_queue.empty():
            task = self.task_queue.get()
            processed_task = self.process_task(task)
            print(f"Agent {self.agent_id} processed task {task.task_id}: {processed_task.result}")
            return processed_task
        return None



In [None]:
class MultiAgentSystem:
    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.task_counter = 0

    def add_agent(self, agent: Agent):
        self.agents[agent.agent_id] = agent

    def create_task(self, task_type: TaskType, data: Any) -> Task:
        self.task_counter += 1
        return Task(self.task_counter, task_type, data)

    def assign_task(self, task: Task):
        # Find capable agent
        capable_agents = [
            agent for agent in self.agents.values()
            if task.task_type in agent.capabilities
        ]

        if capable_agents:
            chosen_agent = random.choice(capable_agents)
            chosen_agent.task_queue.put(task)
            print(f"Task {task.task_id} assigned to agent {chosen_agent.agent_id}")
        else:
            print(f"No capable agent found for task {task.task_id}")

    def broadcast_message(self, sender: str, content: Any):
        for agent_id, agent in self.agents.items():
            if agent_id != sender:
                message = Message(sender, agent_id, content)
                agent.receive_message(message)

    def step(self):
        results = []
        for agent in self.agents.values():
            result = agent.step()
            if result:
                results.append(result)
        return results



In [None]:
# Example usage
def run_example():
    # Create multi-agent system
    mas = MultiAgentSystem()

    # Create agents with different capabilities
    collector = Agent("collector_1", "collector", [TaskType.COLLECT])
    processor = Agent("processor_1", "processor", [TaskType.PROCESS])
    analyzer = Agent("analyzer_1", "analyzer", [TaskType.ANALYZE])
    reporter = Agent("reporter_1", "reporter", [TaskType.REPORT])

    # Add agents to system
    for agent in [collector, processor, analyzer, reporter]:
        mas.add_agent(agent)

    # Create and assign tasks
    tasks = [
        mas.create_task(TaskType.COLLECT, "sensor_data"),
        mas.create_task(TaskType.PROCESS, "raw_data"),
        mas.create_task(TaskType.ANALYZE, "processed_data"),
        mas.create_task(TaskType.REPORT, "analysis_results")
    ]

    for task in tasks:
        mas.assign_task(task)

    # Broadcast a message
    mas.broadcast_message("system", "Starting simulation")

    # Run simulation for a few steps
    for _ in range(5):
        results = mas.step()
        if results:
            print("\nSimulation step:")
            print("Task results:", results)

In [None]:
if __name__ == "__main__":
    run_example()

Task 1 assigned to agent collector_1
Task 2 assigned to agent processor_1
Task 3 assigned to agent analyzer_1
Task 4 assigned to agent reporter_1
Agent collector_1 received message from system: Starting simulation
Agent collector_1 processed task 1: Collected data: sensor_data
Agent processor_1 received message from system: Starting simulation
Agent processor_1 processed task 2: Processed data: raw_data
Agent analyzer_1 received message from system: Starting simulation
Agent analyzer_1 processed task 3: Analysis result: processed_data
Agent reporter_1 received message from system: Starting simulation
Agent reporter_1 processed task 4: Report generated for: analysis_results

Simulation step:
Task results: [Task(task_id=1, task_type=<TaskType.COLLECT: 'collect'>, data='sensor_data', status='completed', result='Collected data: sensor_data'), Task(task_id=2, task_type=<TaskType.PROCESS: 'process'>, data='raw_data', status='completed', result='Processed data: raw_data'), Task(task_id=3, tas