In [2]:
import asyncio
import random
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum

class AgentType(Enum):
    ANALYZER = "Анализатор"
    PLANNER = "Планировщик"
    EXECUTOR = "Исполнитель"
    VALIDATOR = "Валидатор"

class CommunicationMode(Enum):
    RANDOM = "случайный"
    SEQUENTIAL = "последовательный"
    HIERARCHICAL = "иерархический"

@dataclass
class Message:
    sender: AgentType
    recipient: Optional[AgentType]
    content: str
    timestamp: float

class AsyncAgent:
    def __init__(self, agent_type: AgentType, communication_delay: float = 0.3):
        self.agent_type = agent_type
        self.communication_delay = communication_delay
        self.message_queue = asyncio.Queue()
        self.received_messages: List[Message] = []
        self.sent_messages: List[Message] = []
        self.is_active = True

    async def process_message(self, message: Message) -> Optional[Message]:
        await asyncio.sleep(self.communication_delay * random.uniform(0.7, 1.3))

        processing_map = {
            AgentType.ANALYZER: self._analyze_data,
            AgentType.PLANNER: self._plan_actions,
            AgentType.EXECUTOR: self._execute_task,
            AgentType.VALIDATOR: self._validate_result
        }

        handler = processing_map.get(self.agent_type)
        if handler:
            response_content = handler(message.content)
        else:
            response_content = "Неизвестная операция"

        if response_content:
            return Message(
                sender=self.agent_type,
                recipient=message.sender,
                content=response_content,
                timestamp=time.time()
            )
        return None

    def _analyze_data(self, data: str) -> str:
        analysis_results = [
            "Обнаружены аномалии в данных",
            "Данные соответствуют ожидаемым параметрам",
            "Требуется дополнительная информация",
            "Выявлены закономерности во временных рядах",
            "Анализ не выявил значимых отклонений"
        ]
        return f"Анализ завершен: {random.choice(analysis_results)}"

    def _plan_actions(self, data: str) -> str:
        plans = [
            "Разработан поэтапный план действий",
            "Создана стратегия с учетом ограничений",
            "Определены критические точки контроля",
            "Сформирован график выполнения задач",
            "План адаптирован под текущие условия"
        ]
        return f"Планирование: {random.choice(plans)}"

    def _execute_task(self, data: str) -> str:
        executions = [
            "Задача выполнена успешно",
            "Выполнение завершено с частичными отклонениями",
            "Требуется вмешательство для завершения",
            "Операция выполнена в установленные сроки",
            "Исполнение приостановлено для уточнения"
        ]
        return f"Исполнение: {random.choice(executions)}"

    def _validate_result(self, data: str) -> str:
        validations = [
            "Результаты соответствуют критериям качества",
            "Обнаружены несоответствия требованиям",
            "Валидация пройдена с замечаниями",
            "Все проверки завершены успешно",
            "Требуется повторная проверка после доработок"
        ]
        return f"Валидация: {random.choice(validations)}"

    async def listen_and_process(self, timeout: float = 0.5):
        while self.is_active:
            try:
                message = await asyncio.wait_for(self.message_queue.get(), timeout=timeout)
                self.received_messages.append(message)

                response = await self.process_message(message)
                if response:
                    self.sent_messages.append(response)
                    yield response

                self.message_queue.task_done()
            except asyncio.TimeoutError:
                break

    async def send_message(self, message: Message):
        await self.message_queue.put(message)

    def stop(self):
        self.is_active = False

class MultiAgentSystem:
    def __init__(self, mode: CommunicationMode = CommunicationMode.RANDOM):
        self.agents: Dict[AgentType, AsyncAgent] = {}
        self.communication_mode = mode
        self.communication_log: List[Message] = []

        for agent_type in AgentType:
            self.agents[agent_type] = AsyncAgent(agent_type, communication_delay=0.2)

    def _select_next_recipient(self, sender: AgentType) -> AgentType:
        if self.communication_mode == CommunicationMode.RANDOM:
            available_agents = [a for a in AgentType if a != sender]
            return random.choice(available_agents)

        elif self.communication_mode == CommunicationMode.SEQUENTIAL:
            agent_order = list(AgentType)
            current_idx = agent_order.index(sender)
            next_idx = (current_idx + 1) % len(agent_order)
            return agent_order[next_idx]

        elif self.communication_mode == CommunicationMode.HIERARCHICAL:
            hierarchy = {
                AgentType.ANALYZER: AgentType.PLANNER,
                AgentType.PLANNER: AgentType.EXECUTOR,
                AgentType.EXECUTOR: AgentType.VALIDATOR,
                AgentType.VALIDATOR: AgentType.ANALYZER
            }
            return hierarchy[sender]

    async def initiate_communication(self, rounds: int = 12):
        print(f"\nИнициализация коммуникации в режиме: {self.communication_mode.value}")

        initial_message = Message(
            sender=AgentType.ANALYZER,
            recipient=AgentType.PLANNER,
            content="Инициализация системы взаимодействия агентов",
            timestamp=time.time()
        )

        self.communication_log.append(initial_message)
        await self.agents[AgentType.PLANNER].send_message(initial_message)

        current_sender = AgentType.PLANNER

        tasks = []
        for agent in self.agents.values():
            task = asyncio.create_task(self._agent_listener(agent))
            tasks.append(task)

        try:
            for round_num in range(rounds):
                print(f"\n--- Раунд {round_num + 1}/{rounds} ---")

                recipient = self._select_next_recipient(current_sender)

                message_content = f"Запрос от {current_sender.value} для {recipient.value}"
                message = Message(
                    sender=current_sender,
                    recipient=recipient,
                    content=message_content,
                    timestamp=time.time()
                )

                self.communication_log.append(message)
                await self.agents[recipient].send_message(message)

                await asyncio.sleep(0.2)

                async for response in self.agents[recipient].listen_and_process():
                    self.communication_log.append(response)
                    print(f"[{time.strftime('%H:%M:%S', time.localtime(response.timestamp))}] "
                          f"{response.sender.value} → {response.recipient.value if response.recipient else 'ВСЕ'}: "
                          f"{response.content}")

                    if response.recipient:
                        await self.agents[response.recipient].send_message(response)

                if self.communication_mode == CommunicationMode.RANDOM:
                    current_sender = random.choice(list(AgentType))
                else:
                    current_sender = recipient

                await asyncio.sleep(0.1)

        finally:
            for agent in self.agents.values():
                agent.stop()

            for task in tasks:
                task.cancel()

            try:
                await asyncio.gather(*tasks, return_exceptions=True)
            except:
                pass

    async def _agent_listener(self, agent: AsyncAgent):
        try:
            async for response in agent.listen_and_process():
                pass
        except asyncio.CancelledError:
            pass

    def generate_report(self):
        print("\n" + "="*70)
        print("ОТЧЕТ О ВЗАИМОДЕЙСТВИИ АГЕНТОВ")
        print("="*70)

        print(f"\nРежим коммуникации: {self.communication_mode.value}")
        print(f"Всего сообщений в системе: {len(self.communication_log)}")

        print("\nСтатистика по агентам:")
        print("-" * 50)

        total_received = 0
        total_sent = 0

        for agent_type in AgentType:
            agent = self.agents[agent_type]
            received = len(agent.received_messages)
            sent = len(agent.sent_messages)
            total_received += received
            total_sent += sent

            print(f"{agent_type.value}:")
            print(f"  Получено сообщений: {received}")
            print(f"  Отправлено сообщений: {sent}")
            print(f"  Активность: {'Активен' if agent.is_active else 'Остановлен'}")

        print("\n" + "="*70)
        print(f"ИТОГО: Получено: {total_received}, Отправлено: {total_sent}")
        print("="*70)

        if self.communication_log:
            print("\nПоследние взаимодействия:")
            print("-" * 50)
            for msg in self.communication_log[-8:]:
                recipient_name = msg.recipient.value if msg.recipient else "ВСЕМ"
                time_str = time.strftime('%H:%M:%S', time.localtime(msg.timestamp))
                print(f"[{time_str}] {msg.sender.value} → {recipient_name}: {msg.content[:50]}...")

async def run_agent_simulation():
    """Основная функция для запуска в Google Colab"""
    print("="*70)
    print("СИСТЕМА АСИНХРОННЫХ АГЕНТОВ")
    print("="*70)

    modes = [
        CommunicationMode.RANDOM,
        CommunicationMode.SEQUENTIAL,
        CommunicationMode.HIERARCHICAL
    ]

    for i, mode in enumerate(modes, 1):
        print(f"\n{'='*70}")
        print(f"ТЕСТ {i}: РЕЖИМ - {mode.value.upper()}")
        print('='*70)

        system = MultiAgentSystem(mode=mode)

        start_time = time.time()
        await system.initiate_communication(rounds=10)
        elapsed_time = time.time() - start_time

        system.generate_report()
        print(f"\nВремя выполнения режима: {elapsed_time:.2f} секунд")

        if i < len(modes):
            print("\n" + "="*70)
            print("Переход к следующему режиму...")
            await asyncio.sleep(1)

# Функция для запуска в Google Colab
def main():
    """
    Запуск системы асинхронных агентов в Google Colab.
    Эта функция корректно работает в среде с уже запущенным event loop.
    """
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            # В Google Colab event loop уже запущен
            print("Обнаружен запущенный event loop (Google Colab)")
            print("Использую asyncio.create_task() для запуска")

            # Создаем задачу в существующем loop
            task = loop.create_task(run_agent_simulation())

            # Ожидаем завершения задачи
            loop.run_until_complete(task)
        else:
            # Если loop не запущен (локальное выполнение)
            asyncio.run(run_agent_simulation())
    except RuntimeError as e:
        # Альтернативный способ для Google Colab
        print(f"RuntimeError: {e}")
        print("Использую альтернативный метод запуска...")

        async def colab_main():
            await run_agent_simulation()

        # Пробуем запустить через nest_asyncio если установлен
        try:
            import nest_asyncio
            nest_asyncio.apply()
            asyncio.run(colab_main())
        except ImportError:
            # Если nest_asyncio не установлен, используем простой метод
            print("Установка nest_asyncio для работы в Colab...")
            import subprocess
            import sys

            subprocess.check_call([sys.executable, "-m", "pip", "install", "nest_asyncio", "-q"])

            import nest_asyncio
            nest_asyncio.apply()
            asyncio.run(colab_main())

if __name__ == "__main__":
    print("Запуск системы асинхронных агентов...")
    main()
    print("\n" + "="*70)
    print("СИМУЛЯЦИЯ ЗАВЕРШЕНА")
    print("="*70)

Запуск системы асинхронных агентов...
Обнаружен запущенный event loop (Google Colab)
Использую asyncio.create_task() для запуска
RuntimeError: This event loop is already running
Использую альтернативный метод запуска...
СИСТЕМА АСИНХРОННЫХ АГЕНТОВ

ТЕСТ 1: РЕЖИМ - СЛУЧАЙНЫЙ

Инициализация коммуникации в режиме: случайный

--- Раунд 1/10 ---
СИСТЕМА АСИНХРОННЫХ АГЕНТОВ

ТЕСТ 1: РЕЖИМ - СЛУЧАЙНЫЙ

Инициализация коммуникации в режиме: случайный

--- Раунд 1/10 ---

--- Раунд 2/10 ---

--- Раунд 2/10 ---
[21:22:04] Планировщик → Анализатор: Планирование: Сформирован график выполнения задач
[21:22:04] Валидатор → Планировщик: Валидация: Все проверки завершены успешно

--- Раунд 3/10 ---

--- Раунд 3/10 ---
[21:22:05] Валидатор → Анализатор: Валидация: Результаты соответствуют критериям качества
[21:22:05] Исполнитель → Планировщик: Исполнение: Требуется вмешательство для завершения

--- Раунд 4/10 ---

--- Раунд 4/10 ---
[21:22:06] Планировщик → Валидатор: Планирование: Разработан поэтапный