# Безопасное обновление системы
## Организация обновления через канал связи с удалённым сервером

Этот ноутбук реализует систему безопасного обновления с использованием Монитора Безопасности и разделённой системы управления.

In [1]:
# Импорт необходимых библиотек
from multiprocessing import Queue
import os
import hashlib
import importlib.util

# Модули для эмуляции событий и очередей (упрощённая версия)
class Event:
    def __init__(self, source, destination, operation, parameters):
        self.source = source
        self.destination = destination
        self.operation = operation
        self.parameters = parameters

class QueuesDirectory:
    def __init__(self):
        self._queues = {"security": Queue(), "control": Queue(), "updater": Queue()}
    
    def get_queue(self, name):
        return self._queues.get(name, Queue())

In [2]:
# Определение SecurityMonitor
class SecurityMonitor:
    def __init__(self, queues_dir, source_dir="./source", dest_dir="./destination", log_level=1):
        self._queues_dir = queues_dir
        self._log_level = log_level
        self._source_dir = source_dir
        self._dest_dir = dest_dir
        self._allowed_interactions = {
            "communication": ["control"],
            "control": ["navigation", "servos", "cargo", "safety", "security"],
            "navigation": ["control", "safety"],
            "sitl": ["navigation"],
            "updater": ["security"],
        }
        self._log_message(1, "Монитор Безопасности создан")

    def _log_message(self, level, message):
        if level <= self._log_level:
            print(f"[{['ERROR', 'INFO', 'DEBUG'][level]}][SECURITY] {message}")

    def check_interaction(self, event: Event) -> bool:
        source = event.source.split('.')[0]
        destination = event.destination

        if source in ["planner", "sitl.mqtt"] or destination in ["planner", "sitl.mqtt"]:
            return True

        if source not in self._allowed_interactions or destination not in self._allowed_interactions.get(source, []):
            self._log_message(0, f"Недопустимое взаимодействие: {source} -> {destination}")
            return False
        return True

    def process_update(self, event: Event):
        if event.operation == "fetch_update":
            self._fetch_and_verify_update()
        elif event.operation == "use_new_version":
            self._load_new_version()
        elif event.operation == "sign_and_store":
            self._sign_and_store(event.parameters)

    def _fetch_and_verify_update(self):
        source_file = os.path.join(self._source_dir, "control_logic.py")
        if not os.path.exists(source_file):
            self._log_message(0, "Файл обновления не найден")
            return

        with open(source_file, 'rb') as f:
            content = f.read()
            file_hash = hashlib.sha256(content).hexdigest()

        sig_file = source_file + ".sig"
        if os.path.exists(sig_file):
            with open(sig_file, 'r') as f:
                expected_hash = f.read().strip()
                if file_hash != expected_hash:
                    self._log_message(0, "Подпись не совпадает, обновление отклонено")
                    return
                self._log_message(1, "Подпись подтверждена, копируем файл")
                dest_file = os.path.join(self._dest_dir, "control_logic.py")
                with open(dest_file, 'wb') as f_dest:
                    f_dest.write(content)
        else:
            self._log_message(0, "Файл подписи не найден")

    def _load_new_version(self):
        module_path = os.path.join(self._dest_dir, "control_logic.py")
        if not os.path.exists(module_path):
            self._log_message(0, "Новая версия не найдена")
            return

        spec = importlib.util.spec_from_file_location("control_logic", module_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        self._log_message(1, "Новая версия загружена и активирована")

    def _sign_and_store(self, content):
        file_hash = hashlib.sha256(content.encode()).hexdigest()
        sig_file = os.path.join(self._dest_dir, "control_logic.sig")
        with open(sig_file, 'w') as f:
            f.write(file_hash)
        self._log_message(1, "Цифровая подпись создана и сохранена")

# Инициализация монитора
queues_dir = QueuesDirectory()
security_monitor = SecurityMonitor(queues_dir)

[INFO][SECURITY] Монитор Безопасности создан


In [3]:
# Определение ControlSystemExecutor
class ControlSystemExecutor:
    def __init__(self, queues_dir, log_level=1):
        self._queues_dir = queues_dir
        self._log_level = log_level
        self._load_control_logic()
        self._log_message(1, "Модуль управления создан")

    def _log_message(self, level, message):
        if level <= self._log_level:
            print(f"[{['ERROR', 'INFO', 'DEBUG'][level]}][CONTROL] {message}")

    def _load_control_logic(self):
        module_path = os.path.join("./destination", "control_logic.py")
        if not os.path.exists(module_path):
            self._log_message(0, "Файл control_logic.py не найден, используется дефолтная логика")
            self._control_logic = lambda speed, direction: self._send_to_consumers(speed, direction)
        else:
            spec = importlib.util.spec_from_file_location("control_logic", module_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            self._control_logic = module.control_logic

    def _send_to_consumers(self, speed, direction):
        security_q = self._queues_dir.get_queue("security")
        event_speed = Event(source="control", destination="security", operation="set_speed", parameters=speed)
        event_direction = Event(source="control", destination="security", operation="set_direction", parameters=direction)
        security_q.put(event_speed)
        security_q.put(event_direction)

    def update_and_execute(self, speed, direction):
        self._load_control_logic()
        self._control_logic(speed, direction)

# Инициализация системы управления
control_system = ControlSystemExecutor(queues_dir)

[INFO][CONTROL] Модуль управления создан


In [4]:
# Пример выполнения обновления и управления
# Предполагается, что в ./source/control_logic.py есть файл с функцией control_logic,
# и рядом лежит control_logic.sig с правильным хэшем

# Имитация команды обновления
update_event = Event(source="updater", destination="security", operation="fetch_update", parameters=None)
security_monitor.process_update(update_event)

# Имитация использования новой версии
use_event = Event(source="updater", destination="security", operation="use_new_version", parameters=None)
security_monitor.process_update(use_event)

# Выполнение команды управления
control_system.update_and_execute(50, 90)

[INFO][SECURITY] Подпись подтверждена, копируем файл
[INFO][CONTROL] Управление: скорость=50, направление=90


In [5]:
# Остановка системы (очистка)
del security_monitor
del control_system
del queues_dir