In [None]:
import pdb
from multiprocessing import Queue
monitor_events_queue = Queue()

from dataclasses import dataclass


@dataclass
class Event:
    source: str       # отправитель
    destination: str  # получатель
    operation: str    # чего хочет (запрашиваемое действие)
    parameters: str   # с какими параметрами

In [None]:

from multiprocessing import Queue, Process
from multiprocessing.queues import Empty
import json

# формат управляющих команд для монитора
@dataclass
class ControlEvent:
    operation: str

# список разрешенных сочетаний сигналов светофора
# любые сочетания, отсутствующие в этом списке, запрещены
traffic_lights_allowed_configurations = [
    {"direction_1": "red", "direction_2": "green"},
    {"direction_1": "red", "direction_2": "red"},
    {"direction_1": "red", "direction_2": "yellow"},
    {"direction_1": "yellow", "direction_2": "yellow"},
    {"direction_1": "off", "direction_2": "off"},
    {"direction_1": "green", "direction_2": "red"},
    {"direction_1": "green", "direction_2": "yellow"},

    {"direction_1": "red", "direction_2": "yellow_blinking"},
    {"direction_1": "yellow_blinking", "direction_2": "yellow_blinking"},
    {"direction_1": "green", "direction_2": "yellow_blinking"},
    {"direction_1": "yellow_blinking", "direction_2": "green"},
    {"direction_1": "yellow_blinking", "direction_2": "red"},
    {"direction_1": "yellow", "direction_2": "yellow_blinking"},
]


pov_configuration = [
    {"direction_1": "red", "direction_2": "green"},
    {"direction_1": "green", "direction_2": "red"},
]

status_configuration = [
    {"status_before": "correct", "status_after": "correct"},
]

In [None]:
from multiprocessing import Queue, Process
from queue import Empty
from time import sleep
from dataclasses import dataclass

@dataclass
class Event:
    source: str
    destination: str
    operation: str
    parameters: list

@dataclass
class ControlEvent:
    operation: str

class Monitor(Process):
    """
    Процесс, осуществляющий мониторинг и контроль взаимодействия между компонентами системы.
    """

    def __init__(self, events_q: Queue):
        """
        Инициализирует процесс мониторинга.

        Args:
            events_q (Queue): Очередь для получения событий от других компонентов системы.
        """
        super().__init__()
        self._events_q = events_q
        self._control_q = Queue()
        self._entity_queues = {}  # Инициализация _entity_queues
        self._force_quit = False

    def add_entity_queue(self, entity_id: str, queue: Queue):
        """
        Регистрирует очередь сущности для мониторинга.

        Args:
            entity_id (str): Идентификатор сущности.
            queue (Queue): Очередь сущности.
        """
        print(f"[монитор] регистрируем сущность {entity_id}")
        self._entity_queues[entity_id] = queue

    def _check_policies(self, event: Event) -> bool:
        """
        Проверяет, соответствует ли событие политикам безопасности.

        Args:
            event (Event): Событие для проверки.

        Returns:
            bool: True, если событие соответствует политикам безопасности, False в противном случае.
        """
        print(f'[монитор] обрабатываем событие {event}')

        authorized = False

        if not isinstance(event, Event):
            print("[монитор] Событие не является экземпляром класса Event")
            return False

        # Упрощенные проверки. Здесь должна быть реальная логика проверки политик.
        if event.source == "CitySystemConnector" and event.destination == "ControlSystem" and event.operation == "get_instructions" and isinstance(event.parameters,list) and len(event.parameters) == 3:
            authorized = True
        elif event.source == "ControlSystem" and event.destination == "CitySystemConnector" and event.operation == "check_status" and isinstance(event.parameters,list) and len(event.parameters) == 3:
            authorized = True
        elif event.source == "ControlSystem" and event.destination == "CitySystemConnector" and event.operation == "diagnostics_output" and isinstance(event.parameters,list) and len(event.parameters) == 3:
            authorized = True
        elif event.source == "ControlSystem" and event.destination == "LightsGPIO" and event.operation == "set_mode" and isinstance(event.parameters,list) and len(event.parameters) == 3:
            authorized = True
        elif event.source == "LightsGPIO" and event.destination == "SelfDiagnostics" and event.operation == "diagnostics_input" and isinstance(event.parameters,list) and len(event.parameters) == 3:
            authorized = True
        elif event.source == "SelfDiagnostics" and event.destination == "ControlSystem" and event.operation == "diagnostics_output" and isinstance(event.parameters,list) and len(event.parameters) == 3:
            authorized = True
        else:
            print("[монитор] Событие не соответствует политикам безопасности.")
            authorized = False

        return authorized

    # выполнение разрешённого запроса
    # метод должен вызываться только после проверки политик безопасности
    def _proceed(self, event: Event):
        """
        Выполняет разрешенный запрос, отправляя его в очередь назначения.

        Args:
            event (Event): Событие, которое необходимо выполнить.
        """
        print(f'[монитор] отправляем запрос {event}')
        try:
            # найдём очередь получателя события
            dst_q: Queue = self._entity_queues[event.destination]
            # и положим запрос в эту очередь
            dst_q.put(event)
        except KeyError as e:
            # например, запрос пришёл от или для неизвестной сущности
            print(f"[монитор] Ошибка: нет очереди для {event.destination} - {e}")
        except Exception as e:
            print(f"[монитор] Общая ошибка при выполнении запроса - {e}")


    # основной код работы монитора безопасности
    def run(self):
        """
        Основной цикл работы монитора безопасности.
        """
        print('[монитор] старт')
        try:
            while not self._force_quit:  # Simplified condition
                event = None
                try:
                    # ожидание сделано неблокирующим, чтобы можно было
                    # обработать запрос на завершение работы
                    event: Event = self._events_q.get(timeout=0.5)  # Wait for 0.5 seconds
                    # сюда попадаем только в случае получение события,
                    # теперь нужно проверить политики безопасности
                    if self._check_policies(event):
                        # если политиками запрос авторизован - выполняем
                        self._proceed(event)
                except Empty:
                    # сюда попадаем, если новых сообщений ещё нет,
                    # в таком случае ничего не делаем
                    pass # No need to sleep, get(timeout) already does that
                except Exception as e:
                    # что-то пошло не так, выведем сообщение об ошибке
                    print(f"[монитор] Ошибка обработки события {e}, {event}")

                self._check_control_q()  # Check for control commands
        finally:
            print('[монитор] завершение работы')
            # Cleanup resources (queues, etc.) here if needed
            self._events_q.close()
            self._control_q.close()

    # запрос на остановку работы монитора безопасности для завершения работы
    # может вызываться вне процесса монитора
    def stop(self):
        """
        Запрашивает остановку работы монитора безопасности.
        """
        # поскольку монитор работает в отдельном процессе,
        # запрос помещается в очередь, которая проверяется из процесса монитора
        request = ControlEvent(operation='stop')
        self._control_q.put(request)

    # проверка наличия новых управляющих команд
    def _check_control_q(self):
        """
        Проверяет наличие управляющих команд в очереди управления.
        """
        try:
            request: ControlEvent = self._control_q.get_nowait()
            print(f"[монитор] проверяем запрос {request}")
            if isinstance(request, ControlEvent):
                if request.operation == 'stop':
                    # поступил запрос на остановку монитора, поднимаем "красный флаг"
                    self._force_quit = True
                    print("[монитор] получен запрос на остановку, завершаем работу...")

                # Add handling for other control events here if needed

        except Empty:
            # никаких команд не поступило, ну и ладно
            pass


In [None]:
from multiprocessing import Queue, Process
import json
import queue

class ControlSystem(Process):

    def __init__(self, monitor_queue: Queue):
        # вызываем конструктор базового класса
        super().__init__()
        # мы знаем только очередь монитора безопасности для взаимодействия с другими сущностями
        # прямая отправка сообщений в другую сущность запрещена в концепции FLASK
        self.monitor_queue = monitor_queue
        # создаём собственную очередь, в которую монитор сможет положить сообщения для этой сущности
        self._own_queue = Queue()

    # выдаёт собственную очередь для взаимодействия
    def entity_queue(self):
        return self._own_queue

    # основной код сущности
    def run(self):
        print(f'[{self.__class__.__name__}] старт')
        print(f'[{self.__class__.__name__}] отправляем тестовый запрос')
        while True:
            print(f'[{self.__class__.__name__}] старт')
            sleep(5)
            try:
                event: Event = self._own_queue.get_nowait()
            except queue.Empty:
                print("Пусто")
                break

            mode = json.loads(event.parameters[0])
            turn_left_mode = json.loads(event.parameters[1])
            turn_right_mode = json.loads(event.parameters[2])
            if event.operation == "get_instructions":
                event = Event(source=self.__class__.__name__,
                                      destination='LightsGPIO',
                                      operation='set_mode',
                                      parameters=[json.dumps(mode),
                                                  json.dumps(turn_left_mode),
                                                  json.dumps(turn_right_mode),
                                                 ])
                self.monitor_queue.put(event)

            if event.operation == "diagnostics_output":
                event = Event(source=self.__class__.__name__,
                                      destination='CitySystemConnector',
                                      operation='diagnostics_output',
                                      parameters=[json.dumps(mode),
                                                  json.dumps(turn_left_mode),
                                                  json.dumps(turn_right_mode),
                                                 ]
                                      )
                self.monitor_queue.put(event)
            else:
                print(f'[{self.__class__.__name__}] завершение работы')
                sleep(5)
        event = Event(source=self.__class__.__name__,
                                      destination='Monitor',
                                      operation='stop',
                                      parameters=[json.dumps(mode),
                                                  json.dumps(turn_left_mode),
                                                  json.dumps(turn_right_mode),
                                                 ]
                                      )
        self.monitor_queue.put(event)

In [None]:
from multiprocessing import Queue, Process
import json
import queue
from time import sleep
from dataclasses import dataclass

class LightsGPIO(Process):
    """
    Процесс, эмулирующий управление светодиодами на GPIO.
    """

    def __init__(self, monitor_queue: Queue):
        """
        Инициализирует процесс LightsGPIO.

        Args:
            monitor_queue (Queue): Очередь для связи с монитором безопасности.
        """
        super().__init__()
        self.monitor_queue = monitor_queue
        self._own_queue = Queue()
        self._current_mode = None  # Инициализация текущего режима

    def entity_queue(self):
        """
        Возвращает очередь для получения событий.

        Returns:
            Queue: Очередь для получения событий.
        """
        return self._own_queue

    def _send_diagnostics_input_event(self):
        """
        Отправляет событие diagnostics_input в очередь монитора.
        """
        status = {"status_before": "correct", "status_after": "correct"}
        event = Event(
            source=self.__class__.__name__,
            destination='SelfDiagnostics',
            operation='diagnostics_input',
            parameters=[
                json.dumps(status),
                json.dumps(status),
                json.dumps(status)
            ]
        )
        self.monitor_queue.put(event)

    def run(self):
        """
        Основной цикл процесса LightsGPIO.
        """
        print(f'[{self.__class__.__name__}] старт')

        while True:  # Бесконечный цикл ожидания сообщений
            try:
                event: Event = self._own_queue.get(timeout=5)  # Wait for 5 seconds
                if event.operation == "set_mode":
                    try:
                        print(f"[{self.__class__.__name__}] {event.source} запрашивает изменение режима светофора {event.parameters[0]}")
                        print(f"[{self.__class__.__name__}] {event.source} запрашивает изменение режима левой стрелки {event.parameters[1]}")
                        print(f"[{self.__class__.__name__}] {event.source} запрашивает изменение режима правой стрелки {event.parameters[2]}")

                        mode = json.loads(event.parameters[0])
                        turn_left_mode = json.loads(event.parameters[1])
                        turn_right_mode = json.loads(event.parameters[2])

                        if mode in traffic_lights_allowed_configurations:
                            print(f"[{self.__class__.__name__}] новый режим светофора: {event.parameters[0]} нерегулируемый")
                        else:
                            print(f"[{self.__class__.__name__}] новый режим светофора: {event.parameters[0]} регулируемый")
                        print(f"[{self.__class__.__name__}] новый режим левой стрелки: {turn_left_mode}!")
                        print(f"[{self.__class__.__name__}] новый режим правой стрелки: {turn_right_mode}!")

                        self._current_mode = mode #Устанавливаем значение для _current_mode

                    except (json.JSONDecodeError, TypeError, IndexError) as e:
                        print(f"[{self.__class__.__name__}] Ошибка при обработке параметров: {e}")
                else:
                    print(f"[{self.__class__.__name__}] Неизвестная операция: {event.operation}")

                self._send_diagnostics_input_event()  # Отправляем событие diagnostics_input после обработки события
                sleep(2)

            except queue.Empty:
                print(f"[{self.__class__.__name__}] очередь пуста, ждем...")
            except Exception as e:
                print(f"[{self.__class__.__name__}] произошла ошибка: {e}")
                break

        print(f'[{self.__class__.__name__}] завершение работы')

In [None]:
class SelfDiagnostics(Process):
    def __init__(self, monitor_queue):
        super().__init__()
        self.monitor_queue = monitor_queue
        self._own_queue = Queue()

    def entity_queue(self):
        return self._own_queue

    def run(self):
        print(f'[{self.__class__.__name__}] старт')
        attempts = 3
        while attempts > 0:
            try:
                event: Event = self._own_queue.get_nowait()
                print(f"[{self.__class__.__name__}] статус светофора: {event.parameters[0]}")
                print(f"[{self.__class__.__name__}] статус левой стрелки: {event.parameters[1]}")
                print(f"[{self.__class__.__name__}] статус правой стрелки: {event.parameters[2]}")

            except Empty:
                    sleep(0.5)
                    attempts -= 1

        event = Event(source=self.__class__.__name__,
                          destination='ControlSystem',
                          operation='diagnostics_output',
                          parameters=[{"status_before": "correct", "status_after": "correct"},
                                      {"status_before": "correct", "status_after": "correct"},
                                      {"status_before": "correct", "status_after": "correct"},
                                     ]
                          )

        self.monitor_queue.put(event)
        print(f'[{self.__class__.__name__}] завершение работы')

In [None]:
from multiprocessing import Queue, Process
import json
import queue
from time import sleep
from dataclasses import dataclass

@dataclass
class Event:
    source: str
    destination: str
    operation: str
    parameters: list

class CitySystemConnector(Process):
    """
    Процесс, представляющий коннектор к городской системе для получения инструкций и отправки статуса светофора.
    """

    def __init__(self, monitor_queue: Queue):
        """
        Инициализирует коннектор к городской системе.

        Args:
            monitor_queue (Queue): Очередь для связи с монитором безопасности.
        """
        super().__init__()
        self.monitor_queue = monitor_queue
        self._own_queue = Queue()

    def entity_queue(self):
        """
        Возвращает очередь для получения событий.

        Returns:
            Queue: Очередь для получения событий.
        """
        return self._own_queue

    def _send_get_instructions_event(self, mode, turn_left_mode, turn_right_mode):
        """
        Создает и отправляет событие 'get_instructions' в очередь монитора.

        Args:
            mode (dict): Режим светофора.
            turn_left_mode (dict): Режим левой стрелки.
            turn_right_mode (dict): Режим правой стрелки.
        """
        event = Event(
            source=self.__class__.__name__,
            destination='ControlSystem',
            operation='get_instructions',
            parameters=[
                json.dumps(mode),
                json.dumps(turn_left_mode),
                json.dumps(turn_right_mode),
            ]
        )
        self.monitor_queue.put(event)

    def run(self):
        """
        Основной цикл коннектора к городской системе.
        """
        print(f'[{self.__class__.__name__}] старт')

        try:
            while True:
                print(f'[{self.__class__.__name__}] отправляем запрос get_instructions')
                mode = {"direction_1": "red", "direction_2": "yellow_blinking"}
                turn_left_mode = {"direction_1": "red", "direction_2": "green"}
                turn_right_mode = {"direction_1": "green", "direction_2": "red"}

                self._send_get_instructions_event(mode, turn_left_mode, turn_right_mode)
                sleep(10)

                try:
                    event: Event = self._own_queue.get(timeout=5)  # Wait 5 seconds for diagnostic event

                    if event.operation == "diagnostics_output":
                        print(f"[{self.__class__.__name__}] статус светофора: {event.parameters[0]}")
                        print(f"[{self.__class__.__name__}] статус левой стрелки: {event.parameters[1]}")
                        print(f"[{self.__class__.__name__}] статус правой стрелки: {event.parameters[2]}")
                        sleep(2)

                        mode = {"direction_1": "green", "direction_2": "red"}
                        turn_left_mode = {"direction_1": "green", "direction_2": "red"}
                        turn_right_mode = {"direction_1": "red", "direction_2": "yellow_blinking"}
                        self._send_get_instructions_event(mode, turn_left_mode, turn_right_mode)
                    else:
                        print(f"[{self.__class__.__name__}] Неизвестная операция: {event.operation}")

                except queue.Empty:
                    print(f"[{self.__class__.__name__}] очередь пуста, продолжаем работу.")

        except Exception as e:
            print(f"[{self.__class__.__name__}] Произошла ошибка: {e}")

        finally:
            print(f'[{self.__class__.__name__}] завершение работы')

In [None]:
# Создаем очередь событий монитора
monitor_events_queue = Queue()

# Создаем процессы
monitor = Monitor(monitor_events_queue)
city_system_connector = CitySystemConnector(monitor_events_queue)
control_system = ControlSystem(monitor_events_queue)
lights_gpio = LightsGPIO(monitor_events_queue)
self_diagnostics = SelfDiagnostics(monitor_events_queue)

# Запускаем процессы
city_system_connector.start()
control_system.start()
lights_gpio.start()
self_diagnostics.start()
#Добавил небольшие задержки, чтобы все процессы успели создать свои очереди.
sleep(0.1)

# Добавляем очереди сущностей в монитор
monitor.add_entity_queue(city_system_connector.__class__.__name__, city_system_connector.entity_queue())
monitor.add_entity_queue(control_system.__class__.__name__, control_system.entity_queue())
monitor.add_entity_queue(lights_gpio.__class__.__name__, lights_gpio.entity_queue())
monitor.add_entity_queue(self_diagnostics.__class__.__name__, self_diagnostics.entity_queue())

# Запускаем монитор
monitor.start()

[ControlSystem] старт[CitySystemConnector] старт
[ControlSystem] отправляем тестовый запрос

[LightsGPIO] старт[CitySystemConnector] отправляем запрос get_instructions

[ControlSystem] старт
[SelfDiagnostics] старт
[монитор] регистрируем сущность CitySystemConnector
[монитор] регистрируем сущность ControlSystem
[монитор] регистрируем сущность LightsGPIO
[монитор] регистрируем сущность SelfDiagnostics


In [None]:
from multiprocessing import Queue, Process
from time import sleep
from dataclasses import dataclass

# Определения классов Event, ControlEvent, Monitor, CitySystemConnector, ...

def main():
    # Создаем очередь событий монитора
    monitor_events_queue = Queue()

    # Создаем процессы
    monitor = Monitor(monitor_events_queue)
    city_system_connector = CitySystemConnector(monitor_events_queue)
    control_system = ControlSystem(monitor_events_queue)
    lights_gpio = LightsGPIO(monitor_events_queue)
    self_diagnostics = SelfDiagnostics(monitor_events_queue)

    # Добавляем очереди сущностей в монитор
    monitor.add_entity_queue(city_system_connector.__class__.__name__, city_system_connector.entity_queue())
    monitor.add_entity_queue(control_system.__class__.__name__, control_system.entity_queue())
    monitor.add_entity_queue(lights_gpio.__class__.__name__, lights_gpio.entity_queue())
    monitor.add_entity_queue(self_diagnostics.__class__.__name__, self_diagnostics.entity_queue())

    # Запускаем процессы
    print("Запускаем city_system_connector")
    city_system_connector.start()
    print("Запускаем control_system")
    control_system.start()
    print("Запускаем lights_gpio")
    lights_gpio.start()
    print("Запускаем self_diagnostics")
    self_diagnostics.start()
    print("Запускаем monitor")
    monitor.start()

    sleep(5)  # Даем процессам поработать

    monitor.stop() #Остановка монитора
    #Теперь нужно дождаться, когда все процессы остановятся
    control_system.join() #Завершение работы ControlSystem
    lights_gpio.join() #Завершение работы LightsGPIO
    self_diagnostics.join() #Завершение работы SelfDiagnostics
    monitor.join() #Завершение работы монитора

if __name__ == "__main__":
    main()