# Кибериммунная автономность$\\$Создание конструктивно защищённого автономного наземного транспортного средства$\\$Модуль 3

## О документе

Версия 1.03

Модуль 3 для регионального этапа соревнований по кибериммунной автономности


### Модуль 3. Внедрение блоков функциональной и информационной безопасности

#### Общая идея

Ключевая идея кибериммунной автономности заключается в том, чтобы уже на этапе проектирования внедрить защиту от кибератак как снаружи, так и изнутри - т.е. через уязвимые бортовые информационные системы.
Мы исходим из того, что абсолютно защищённых систем не бывает, чем больше и сложнее код, тем проще одним ("плохим") в нём спрятать уязвимости, а другим ("хорошим", но уставшим) эти уязвимости не найти.

Поэтому необходимо сделать так, чтобы наиболее критичный код был отделён от некритичного и протестирован особенно тщательно - таким образом можно снизить вероятность успешной атаки и затруднить задачу атакующим.


#### Цели и предположения безопасности

##### Цели безопасности

Устойчивость кибериммунных систем к атакам выражается в том, что поставленные заказчиком **цели безопасности** (ЦБ) не нарушаются даже в условиях кибератак (снаружи и изнутри).

Для наших автономных машинок заказчиком поставлены следующие ЦБ:

1. При любых обстоятельствах АНТС осуществляет перемещения в пределах заданных ограничений

2. При любых обстоятельствах АНТС выполняет только аутентичный (подлинный) маршрут

3. При любых обстоятельствах АНТС оставляет груз только в авторизованном пункте назначения

Ниже используется цифро-буквенное обозначение целей безопасности - например, ЦБ1 - первая цель безопасности из списка выше.

##### Предположения безопасности

При этом заказчик согласовал с разработчиками следующие **предположения безопасности** (ПБ) - утверждения о смежных системах, которые снимают с разработчиков часть задач для обеспечения целей безопасности:

1. При любых обстоятельствах только авторизованный персонал имеет физический доступ к критическим узлам АНТС

2. При любых обстоятельствах только аутентичные и авторизованные операторы имеют доступ к системе планирования заданий

3. Аутентичные и авторизованные операторы обладают необходимой квалификацией и являются благонадёжными (т.е. не пытаются намеренно причинить ущерб системе или третьим лицам, используя доступ к АНТС)

Ниже используется цифро-буквенное обозначение предположений безопасности - например, ПБ1 - первое предположение безопасности из списка выше.


### Киберпрепятствия

Прежде чем мы начнём внедрять механизмы безопасности, посмотрим, что произойдёт с нашей машинкой, если бортовые системы начнут вести себя не так, как мы ожидаем.

Для этого активируем специальный режим работы системы управления и снова запустим машинку по тому же маршруту, который был в конце модуля 2.

После инициализации системы управления добавьте следующую строку

```python
control_system.enable_surprises()
```


Но вначале нужно из модуля 2 скопировать сюда ваши текущие наработки, а именно ваши реализации классов коммуникационного шлюза (CommunicationGateway), систему навигации (NavigationSystem), управления (ControlSystem), также будем использовать маршрут, который вы создали.


In [12]:
from src.queues_dir import QueuesDirectory
from src.mission_type import Mission, GeoSpecificSpeedLimit
from geopy import Point as GeoPoint
from src.wpl_parser import WPLParser


queues_dir = QueuesDirectory()

[ИНФО][QUEUES] создан каталог очередей


Если у вас настроена и работает СУПА, установите в True значение переменной afcs_present


In [13]:
afcs_present = True

parser = WPLParser("module2.wpl")

points = parser.parse()

home = points[0]

speed_limits = [
    GeoSpecificSpeedLimit(0, 20),
    GeoSpecificSpeedLimit(1, 60),
    GeoSpecificSpeedLimit(2, 50),
    GeoSpecificSpeedLimit(3, 20),
    GeoSpecificSpeedLimit(5, 60),
    GeoSpecificSpeedLimit(6, 20),
]

mission = Mission(home=home, waypoints=points, speed_limits=speed_limits, armed=True)

Поменяем идентификатор машинки для этого модуля


In [14]:
car_id = "m3"
afcs_present = True

In [15]:
from src.config import *

Итак, в отличие от, надеемся, успешного завершения маршрута в модуле 2, сейчас ваша машинка ведёт себя совсем иначе. Например, она внезапно едет в совершенно другую сторону (это особенно хорошо видно при использовании СУПА).
И это ещё не всё, что может пойти не так..

В этой реализации не очень много кода, теоретически, вы можете его весь изучить и устранить "незадокументированные возможности", однако в реальных системах кода гораздо больше. Например, кодовая база проекта с открытым исходным кодом ArduPilot, реализующего в том числе логику автопилота, составляет примерно миллион строк.

Сколько времени вам понадобится, чтобы изучить и протестировать миллион строк кода? А ведь придётся проводить анализ и каждого обновления.. При этом неприятности могут быть хорошо спрятанными и на первый взгляд безобидными, вряд ли злоумышленники облегчат задачу и назовут свои изменения, например, "hacking_the_system" или "allowing_secret_remote_access". Вредоносный код может выглядеть совершенно безобидно или маскироваться под программный дефект.

Поэтому тщательный анализ потребует большого количества ресурсов (человеческих, вычислительных, временныз), что стоит дорого.

Логичным выходом является выделение критичных для целей безопасности проверок в отдельный блок, который протестировать должно быть гораздо проще.


#### Задания

В этом модуле задание будет состоять из двух частей разной сложности:

**3.1. добавить новый блок - "Ограничитель"** - этот модуль должен будет обеспечить соблюдение ограничений, указанных в маршрутном задании - это необходимо для обеспечения ЦБ1

**З.2. _(задание повышенной сложности)_ добавить монитор безопасности (МБ)** - он должен контролировать все взаимодействия _бортовых систем_ (т.е. не включая планировщик и симулятор) и не давать блокам взаимодействовать друг с другом непредусмотренным образом - например, блок №5 может принимать команды только от №3, попытка передать команду в №5 от №2 должна блокироваться нашим монитором безопасности - потенциально это необходимо для обеспечения всех ЦБ


Для выполнения первой части задачи 3.1. необходимо для начала изменить архитектуру АНТС как показано на рис. 5

![Архитектура системы с ограничителем](images/ciac-arch-m31.png)

Рис. 5. Архитектура АНТС с ограничителем (для ЦБ1)


Код базового класса Ограничителя находится в файле src/safety_block.py

По аналогии с заданием в модуле 1, необходимо создать на основе этого класса свой собственный под названием SafetyBlock, в котором реализовать передачу команд в приводы движения, только перед эти необходимо ещё и проконтролировать их безопасность.

Также необходимо реализовать передачу маршрутного задания из CommunicationGateway в SafetyBlock и координат из модуля Navigation в соответствии со стрелками на рис.5 выше.

Если всё сделано правильно, то Ограничителю будет известно следующее

- \_mission - текущее маршрутное задание
- \_position - текущие координаты

После проверки нужно установить значение внутренних переменных

- \_speed - текущая (допустимая) скорость
- \_direction - текущее (допустимое) направление

Желательно завершить следование по маршруту. В критической ситуации, если продолжение маршрута невозможно, следует установить значение скорости и направления в 0, после чего передать эти команды в блок приводов (Servos)


Базовый сценарий для новой архитектуры можно представить следующим образом (см. рис. 6)

![Базовый сценарий с блоком безопасности](images/basic-scenario-with-safety-block1.png)

Рис. 6. Базовый сценарий с блоком безопасности


Заготовка кода SafetyBlock

```python
class SafetyBlock(BaseSafetyBlock):
    """ класс ограничений безопасности """

    def _set_new_direction(self, direction: float):
        """ установка нового направления перемещения """
        self._log_message(LOG_INFO, f"текущие координаты: {self._position}")
        self._log_message(LOG_DEBUG, f"маршрутное задание: {self._mission}")
        self._log_message(LOG_DEBUG, f"состояние маршруте: {self._route}")
        # TODO реализовать контроль безопасности изменения направления
        self._direction = direction
        self._send_direction_to_consumers()

    def _set_new_speed(self, speed: float):
        """ установка новой скорости """
        # TODO реализовать контроль безопасности изменения скорости
        self._speed = speed
        self._send_speed_to_consumers()


    def _send_speed_to_consumers(self):
        self._log_message(LOG_DEBUG, "отправляем скорость получателям")
        servos_q_name = SERVOS_QUEUE_NAME
        servos_q: Queue = self._queues_dir.get_queue(servos_q_name)

        # отправка сообщения с желаемой скоростью
        event_speed = Event(source=self.event_source_name,
                            destination=servos_q_name,
                            operation="set_speed",
                            parameters=self._speed
                            )
        servos_q.put(event_speed)

    def _send_direction_to_consumers(self):
        self._log_message(LOG_DEBUG, "отправляем направление получателям")

        servos_q_name = SERVOS_QUEUE_NAME
        servos_q: Queue = self._queues_dir.get_queue(servos_q_name)

        # отправка сообщения с желаемой скоростью
        event_speed = Event(source=self.event_source_name,
                            destination=servos_q_name,
                            operation="set_direction",
                            parameters=self._direction
                            )
        servos_q.put(event_speed)
```


In [16]:
from src.safety_block import BaseSafetyBlock
import math
from geopy import Point as GeoPoint
from src.mission_type import GeoSpecificSpeedLimit
from src.event_types import Event
from multiprocessing import Queue


class SafetyBlock(BaseSafetyBlock):

    def _set_new_direction(self, direction: float):
        """
        Установка нового направления перемещения

        Args:
            direction (float): напрваление полученное от системного блока

        Returns:
            None:
        """

        self._log_message(LOG_INFO, "Установка нового направления")

        # Проверка завершен ли маршрут
        if self._route.route_finished:
            self._direction = 0
            self._send_direction_to_consumers()
            return

        next_point = self._route.next_point()
        correct_direction = self._calculate_bearing(self._position, next_point)

        direction_diff = abs(direction - correct_direction)

        # Проверяем на сколько отличается расчитанное направления от полученного
        if direction_diff > 5 and direction_diff < 355:
            self._log_message(LOG_INFO, "Принудительное изменение направления")
            self._direction = correct_direction
        else:
            self._direction = direction

        self._send_direction_to_consumers()

    def _set_new_speed(self, speed: float):
        """
        Установка новой скорости

        Args:
            speed (float): значение скорости полученное от системы контроля

        Returns:
            None
        """

        if speed > self._route.calculate_speed():
            self._log_message(LOG_INFO, "Принудительное изменение скорости")
            speed_limits = self._route.calculate_speed()
            self._speed = speed_limits
        else:
            self._speed = speed

        self._send_speed_to_consumers()

    def _lock_cargo(self, _):
        """блокировка грузового отсека"""

    def _release_cargo(self, _):
        """разблокировка грузового отсека"""

    def _send_speed_to_consumers(self):
        """Метод для отправки скорости в привод движения"""

        event_to_consumers = Event(
            self.event_source_name,
            SERVOS_QUEUE_NAME,
            "set_speed",
            parameters=self._speed,
        )

        servos_q: Queue = self._queues_dir.get_queue(SERVOS_QUEUE_NAME)
        servos_q.put(event_to_consumers)

    def _send_direction_to_consumers(self):
        """
        Метод для отправки направления в блок привода
        """

        event_to_consumers = Event(
            self.event_source_name,
            SERVOS_QUEUE_NAME,
            "set_direction",
            parameters=self._direction,
        )

        servos_q: Queue = self._queues_dir.get_queue(SERVOS_QUEUE_NAME)
        servos_q.put(event_to_consumers)

    def _send_lock_cargo_to_consumers(self):
        pass

    def _send_release_cargo_to_consumers(self):
        pass

    def _calculate_bearing(self, start: GeoPoint, end: GeoPoint) -> float:
        """_calculate_bearing возвращает направление перемещения

        Args:
            start (GeoPoint): откуда
            end (GeoPoint): куда

        Returns:
            float: направление в градусах 0..360
        """
        delta_longitude = end.longitude - start.longitude
        x = math.sin(math.radians(delta_longitude)) * math.cos(
            math.radians(end.latitude)
        )
        y = math.cos(math.radians(start.latitude)) * math.sin(
            math.radians(end.latitude)
        ) - math.sin(math.radians(start.latitude)) * math.cos(
            math.radians(end.latitude)
        ) * math.cos(
            math.radians(delta_longitude)
        )

        initial_bearing_rad = math.atan2(x, y)

        # Преобразуем радианы в градусы
        initial_bearing_deg = math.degrees(initial_bearing_rad)

        # Нормализуем значение в диапазоне [0, 360]
        compass_bearing = (initial_bearing_deg + 360) % 360

        return compass_bearing

### Блок говна

Класс CommunicationGateway был сделан для говна

1. Классная функция отправки работает на божьей силе
1. Вторая классная функция работает просто так

In [17]:
from src.config import *
from src.communication_gateway import BaseCommunicationGateway
from src.event_types import Event
from multiprocessing import Queue


class CommunicationGateway(BaseCommunicationGateway):

    def _send_mission_to_consumers(self):
        """
        Метод для отправки данных в систему управления и блока безопасности

        """

        # Создание события для блока безопасности
        event_to_safety = Event(
            self.event_source_name,
            SAFETY_BLOCK_QUEUE_NAME,
            operation="set_mission",
            parameters=self._mission,
        )

        # Создания события для системы контроля
        event_to_control = Event(
            self.event_source_name,
            CONTROL_SYSTEM_QUEUE_NAME,
            operation="set_mission",
            parameters=self._mission,
        )

        # Получение очереди блока безопасности
        safety_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)

        # Отправка события в очередь блока безопасности
        safety_q.put(event_to_safety)

        # Получение очереди системы управление
        control_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)

        # Отправка события в очередь
        control_q.put(event_to_control)


from src.control_system import BaseControlSystem


class ControlSystem(BaseControlSystem):

    def _send_speed_and_direction_to_consumers(self, speed: float, direction: float):
        """
        Метод для отправки значений скорости и направление в систему привода

        Args:
            speed (float): значение скорости
            direction (float): значение направления в градусах

        Returns:
            None: ничего не возвращает
        """

        # Создание события для отправки скорости
        event_to_servos_speed = Event(
            self.event_source_name,
            SAFETY_BLOCK_QUEUE_NAME,
            operation="set_speed",
            parameters=speed,
        )

        # Создание события для отправки напрваление
        event_to_servos_direction = Event(
            self.event_source_name,
            SAFETY_BLOCK_QUEUE_NAME,
            operation="set_direction",
            parameters=direction,
        )

        # Получение очереди привода управления
        safety_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)

        # Отправка событий в очередь
        safety_q.put(event_to_servos_direction)
        safety_q.put(event_to_servos_speed)

    def _release_cargo(self):
        """Метод для отправки команды открытия грузового отсека"""

        event_to_cargo = Event(
            self.event_source_name,
            SAFETY_BLOCK_QUEUE_NAME,
            operation="release_cargo",
            parameters=None,
        )

        safety_block: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        safety_block.put(event_to_cargo)

    def _lock_cargo(self):
        """Метод для отправки команды закрытия грузового отсека"""

        event_to_cargo = Event(
            self.event_source_name,
            SAFETY_BLOCK_QUEUE_NAME,
            operation="lock_cargo",
            parameters=None,
        )

        # Получение очереди привода управления
        safety_block: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)

        # Отправка событий в очередь
        safety_block.put(event_to_cargo)


from src.navigation_system import BaseNavigationSystem


class NavigationSystem(BaseNavigationSystem):
    def _send_position_to_consumers(self):
        """Метод для отправки позиции в систему управления"""

        # Создание события для обновления позиции
        event_to_control = Event(
            self.event_source_name,
            CONTROL_SYSTEM_QUEUE_NAME,
            operation="position_update",
            parameters=self._position,
        )

        event_to_safety = Event(
            self.event_source_name,
            SAFETY_BLOCK_QUEUE_NAME,
            operation="position_update",
            parameters=self._position,
        )

        safety_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)

        safety_q.put(event_to_safety)
        # Получение очереди привода управления
        control_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)

        # Отправка событий в очередь
        control_q.put(event_to_control)

In [18]:
navigation_system = NavigationSystem(queues_dir=queues_dir)
control_system = ControlSystem(queues_dir=queues_dir)
communication_gateway = CommunicationGateway(queues_dir=queues_dir)

[ИНФО][QUEUES] регистрируем очередь navigation
[ИНФО][NAVIGATION] создан компонент навигации
[ИНФО][QUEUES] регистрируем очередь control
[ИНФО][CONTROL] создана система управления
[ИНФО][QUEUES] регистрируем очередь communication
[ИНФО][COMMUNICATION] создан компонент связи


In [19]:
from src.mission_planner_mqtt import MissionSender
from src.mission_planner import MissionPlanner
from src.sitl_mqtt import TelemetrySender
from src.sitl import SITL
from src.cargo_bay import CargoBay
from src.servos import Servos
from src.system_wrapper import SystemComponentsContainer
from time import sleep

# mission_sender = MissionSender(queues_dir=queues_dir, client_id=car_id)
# telemetry_sender = TelemetrySender(queues_dir=queues_dir, client_id=car_id)

# sitl = SITL(
#     queues_dir=queues_dir, position=home, car_id=car_id, post_telemetry=afcs_present
# )
# mission_planer = MissionPlanner(
#     queues_dir=queues_dir, afcs_present=afcs_present, mission=mission
# )
# cargo = CargoBay(queues_dir=queues_dir)
# servos = Servos(queues_dir=queues_dir)

# safety_block = SafetyBlock(queues_dir=queues_dir)

# system_wrapper = SystemComponentsContainer(
#     [
#         mission_sender,
#         telemetry_sender,
#         sitl,
#         mission_planer,
#         navigation_system,
#         servos,
#         cargo,
#         communication_gateway,
#         control_system,
#         safety_block,
#     ]
# )

# control_system.enable_surprises()
# system_wrapper.start()

# sleep(100)

# system_wrapper.stop()

# system_wrapper.clean()

#### Обеспечение ЦБ3: контроль доставки груза

С помощью логики ограничителя выше мы реализовали обеспечение ЦБ1 (соблюдение заданных ограничений), однако есть ещё ЦБ3 (отгрузка только в заданной точке), не говоря о ЦБ2 (следование только по аутентичному маршруту).

Для обеспечения ЦБ3 необходимо сделать следующее

1. изменить последовательность передачи команд, чтобы в блок управления грузовым отсеком (CargoBay) команды приходили только после проверки в Ограничителе
2. доработайте реализацию Ограничителя, чтобы команда на выгрузку передавалась в CargoBay только в конечной точке маршрута
3. продумайте логику работы блока безопасности, если система управления полностью перестанет работать (т.е. перестанет отправлять новые команды).

Изменённая архитектура приведена на рис. 7.

![Ограничитель для обеспечения ЦБ1 и ЦБ3](images/ciac-arch-m312.png)

Рис. 7. Ограничитель для обеспечения ЦБ1 и ЦБ3


In [20]:
class SafetyBlock(BaseSafetyBlock):

    def _set_new_direction(self, direction: float):
        """
        Установка нового направления перемещения

        Args:
            direction (float): напрваление полученное от системного блока

        Returns:
            None:
        """

        self._log_message(LOG_INFO, "Установка нового направления")

        # Проверка завершен ли маршрут
        if self._route.route_finished:
            self._direction = 0
            self._send_direction_to_consumers()
            return

        next_point = self._route.next_point()
        correct_direction = self._calculate_bearing(self._position, next_point)

        direction_diff = abs(direction - correct_direction)

        # Проверяем на сколько отличается расчитанное направления от полученного
        if direction_diff > 5 and direction_diff < 355:
            self._log_message(LOG_INFO, "Принудительное изменение направления")
            self._direction = correct_direction
        else:
            self._direction = direction

        self._send_direction_to_consumers()

    def _set_new_speed(self, speed: float):
        """
        Установка новой скорости

        Args:
            speed (float): значение скорости полученное от системы контроля

        Returns:
            None
        """

        if speed > self._route.calculate_speed():
            self._log_message(LOG_INFO, "Принудительное изменение скорости")
            speed_limits = self._route.calculate_speed()
            self._speed = speed_limits
        else:
            self._speed = speed

        self._send_speed_to_consumers()

    def _lock_cargo(self, _):
        """
        Блокировка грузового отсека

        Разрешаем блокировку всегда, потому что вы не дали условия
        """
        self._send_lock_cargo_to_consumers()

    def _release_cargo(self, _):
        """разблокировка грузового отсека"""

        if self._route.route_finished:
            self._log_message(LOG_DEBUG, "По кайфу, можно разгружать")
            self._send_release_cargo_to_consumers()
        else:
            self._log_message(LOG_INFO, "ИДИ НАХУЙ")

    def _send_speed_to_consumers(self):
        """Метод для отправки скорости в привод движения"""

        event_to_consumers = Event(
            self.event_source_name,
            SERVOS_QUEUE_NAME,
            "set_speed",
            parameters=self._speed,
        )

        servos_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        servos_q.put(event_to_consumers)

    def _send_direction_to_consumers(self):
        """
        Метод для отправки направления в блок привода
        """

        event_to_consumers = Event(
            self.event_source_name,
            SERVOS_QUEUE_NAME,
            "set_direction",
            parameters=self._direction,
        )

        servos_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        servos_q.put(event_to_consumers)

    def _send_lock_cargo_to_consumers(self):
        """Метод для разблокировку грузового отсека"""

        event_to_cargo = Event(
            self.event_source_name,
            CARGO_BAY_QUEUE_NAME,
            operation="lock_cargo",
            parameters=None,
        )

        cargo_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        cargo_q.put(event_to_cargo)

    def _send_release_cargo_to_consumers(self):
        """Метод для отправки события открытия грузового отсека"""

        event_to_cargo = Event(
            self.event_source_name,
            CARGO_BAY_QUEUE_NAME,
            operation="release_cargo",
            parameters=None,
        )
        cargo_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        cargo_q.put(event_to_cargo)

    def _calculate_bearing(self, start: GeoPoint, end: GeoPoint) -> float:
        """_calculate_bearing возвращает направление перемещения

        Args:
            start (GeoPoint): откуда
            end (GeoPoint): куда

        Returns:
            float: направление в градусах 0..360
        """
        delta_longitude = end.longitude - start.longitude
        x = math.sin(math.radians(delta_longitude)) * math.cos(
            math.radians(end.latitude)
        )
        y = math.cos(math.radians(start.latitude)) * math.sin(
            math.radians(end.latitude)
        ) - math.sin(math.radians(start.latitude)) * math.cos(
            math.radians(end.latitude)
        ) * math.cos(
            math.radians(delta_longitude)
        )

        initial_bearing_rad = math.atan2(x, y)

        # Преобразуем радианы в градусы
        initial_bearing_deg = math.degrees(initial_bearing_rad)

        # Нормализуем значение в диапазоне [0, 360]
        compass_bearing = (initial_bearing_deg + 360) % 360

        return compass_bearing

#### _Опциональное задание повышенной сложности_ Обеспечение ЦБ2

**Перед началом работы над этой задачей рекомендуем сохранить все текущие изменения в git репозитории (git commit, git push).**

Напомним цель безопасности №2 (ЦБ2): "При любых обстоятельствах АНТС выполняет только аутентичный (подлинный) маршрут"

Обеспечение ЦБ2 потребует

- внедрения механизма **контроля аутентичности маршрутного задания** (mission), а значит
- помимо изменения **блока безопасности** (SafetyBlock), модификации
  - системы **планирования задания** (MissionPlanner)
  - **коммуникационного шлюза** (CommunicationGateway)
  - и, возможно, **системы управления** (ControlSystem).

Для решения этой задачи допустимо менять код соответствующих модулей в папке src.

Возможным решением является внедрение механизма цифровой подписи, верификацией которой будет заниматься блок безопасности. В этой ситуации если коммуникационный шлюз (блок "1. Связь" на рис. 5 в модуле 1) оказался скомпрометированным и изменил маршрутное задание, оно не должно пройти проверку подлинности в блоке безопасности.

**Рекомендуем эту задачу делать в последнюю очередь**, _после успешного выполнения модуля 4_, поскольку успешное выполнение модуля 4 принесёт больше баллов, чем внедрение цифровой подписи и обеспечение ЦБ2.
К сведению, реализованные в этой версии киберпрепятствия не предполагают компрометацию маршрутного задания.

Конечно, если будут решены все основные задачи и к тому же обеспечен контроль аутентичности маршрутного задания, такая работа с высокой вероятностью окажется лучшей.


In [21]:
from src.mission_type import Mission, GeoSpecificSpeedLimit
from geopy import Point
import json
import hashlib
from copy import deepcopy


class MissionSignature:

    def __serilaze_mission(mission) -> dict:

        mission_dict = {}
        points_list = []
        speed_limits = []

        for key, value in mission.__dict__.items():

            if key == "signature":
                continue

            if key == "waypoints":

                for value_point in value:
                    value_point: Point
                    points_list.append(value_point.format())
                    
                mission_dict[key] = points_list

            elif key == "home":
                value: Point
                mission_dict[key] = value.format()

            elif key == "speed_limits":
                for value_speed_limit in value:
                    value_speed_limit: GeoSpecificSpeedLimit
                    speed_limits.append(
                        {
                            "speed_limit": value_speed_limit.speed_limit,
                            "waypoint_index": value_speed_limit.waypoint_index,
                        }
                    )

                mission_dict[key] = speed_limits

        mission_serilaze = json.dumps(mission_dict, sort_keys=True).encode()

        return mission_serilaze

    @classmethod
    def signature_mission(cls, mission: Mission, secret_key: str) -> Mission:

        mission_serilaze = cls.__serilaze_mission(mission=mission)
        mission_copy = deepcopy(mission)
        signature = hashlib.sha256(mission_serilaze + secret_key.encode()).hexdigest()
        mission_copy.signature = signature

        return mission_copy

    @classmethod
    def verify_signature(cls, mission: Mission, secret_key: str) -> Mission:

        mission_serilaze = cls.__serilaze_mission(mission=mission)
        mission_copy = deepcopy(mission)
        signature = hashlib.sha256(mission_serilaze + secret_key.encode()).hexdigest()

        if mission_copy.signature != signature:
            return False
        
        return True


class SafetyBlock(BaseSafetyBlock):

    def _set_new_direction(self, direction: float):
        """
        Установка нового направления перемещения

        Args:
            direction (float): напрваление полученное от системного блока

        Returns:
            None:
        """
        # На случай компрометации миссии, мы не выполняем эти команды пока не придет правильная миссия
        if self._ignore_event_speed_direction:
            self._log_message(LOG_ERROR, 'Выполнение команды невозможно, миссия скомпроментирована')
            return

        # Проверка завершен ли маршрут
        if self._route.route_finished:
            self._direction = 0
            self._send_direction_to_consumers()
            return

        next_point = self._route.next_point()
        correct_direction = self._calculate_bearing(self._position, next_point)

        direction_diff = abs(direction - correct_direction)

        # Проверяем на сколько отличается расчитанное направления от полученного
        if direction_diff > 5 and direction_diff < 355:
            self._log_message(LOG_INFO, "Принудительное изменение направления")
            self._direction = correct_direction
        else:
            self._direction = direction

        self._send_direction_to_consumers()

    def _set_new_speed(self, speed: float):
        """
        Установка новой скорости

        Args:
            speed (float): значение скорости полученное от системы контроля

        Returns:
            None
        """

        # На случай компрометации миссии, мы не выполняем эти команды пока не придет правильная миссия
        if self._ignore_event_speed_direction:
            self._log_message(LOG_ERROR, 'Выполнение команды невозможно, миссия неправильная')
            return

        if speed > self._route.calculate_speed():
            self._log_message(LOG_INFO, "Принудительное изменение скорости")
            speed_limits = self._route.calculate_speed()
            self._speed = speed_limits
        else:
            self._speed = speed

        self._send_speed_to_consumers()

    def _lock_cargo(self, _):
        """
        Блокировка грузового отсека

        Разрешаем блокировку всегда, потому что вы не дали условия
        """
        self._send_lock_cargo_to_consumers()

    def _release_cargo(self, _):
        """разблокировка грузового отсека"""

        if self._route.route_finished:
            self._log_message(LOG_DEBUG, "По кайфу, можно разгружать")
            self._send_release_cargo_to_consumers()
        else:
            self._log_message(LOG_INFO, "ИДИ НАХУЙ")

    def _send_speed_to_consumers(self):
        """Метод для отправки скорости в привод движения"""

        event_to_consumers = Event(
            self.event_source_name,
            SERVOS_QUEUE_NAME,
            "set_speed",
            parameters=self._speed,
        )

        servos_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        servos_q.put(event_to_consumers)

    def _send_direction_to_consumers(self):
        """
        Метод для отправки направления в блок привода
        """

        event_to_consumers = Event(
            self.event_source_name,
            SERVOS_QUEUE_NAME,
            "set_direction",
            parameters=self._direction,
        )

        servos_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        servos_q.put(event_to_consumers)

    def _send_lock_cargo_to_consumers(self):
        """Метод для разблокировку грузового отсека"""

        event_to_cargo = Event(
            self.event_source_name,
            CARGO_BAY_QUEUE_NAME,
            operation="lock_cargo",
            parameters=None,
        )

        cargo_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        cargo_q.put(event_to_cargo)

    def _send_release_cargo_to_consumers(self):
        """Метод для отправки события открытия грузового отсека"""

        event_to_cargo = Event(
            self.event_source_name,
            CARGO_BAY_QUEUE_NAME,
            operation="release_cargo",
            parameters=None,
        )
        cargo_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        cargo_q.put(event_to_cargo)

    def _calculate_bearing(self, start: GeoPoint, end: GeoPoint) -> float:
        """_calculate_bearing возвращает направление перемещения

        Args:
            start (GeoPoint): откуда
            end (GeoPoint): куда

        Returns:
            float: направление в градусах 0..360
        """
        delta_longitude = end.longitude - start.longitude
        x = math.sin(math.radians(delta_longitude)) * math.cos(
            math.radians(end.latitude)
        )
        y = math.cos(math.radians(start.latitude)) * math.sin(
            math.radians(end.latitude)
        ) - math.sin(math.radians(start.latitude)) * math.cos(
            math.radians(end.latitude)
        ) * math.cos(
            math.radians(delta_longitude)
        )

        initial_bearing_rad = math.atan2(x, y)

        # Преобразуем радианы в градусы
        initial_bearing_deg = math.degrees(initial_bearing_rad)

        # Нормализуем значение в диапазоне [0, 360]
        compass_bearing = (initial_bearing_deg + 360) % 360

        return compass_bearing


### Задание 3.2. Монитор безопасности


В текущей реализации каждый блок сам решает куда и какие данные ему отправлять. Это приемлемо, когда каждый блок рассматривается как благонадёжный. Однако нельзя исключать ситуацию успешной компрометации какого-либо из блоков (одного или нескольких), в результате которой злоумышленник может получить полный контроль над частью системы.

В такой ситуации необходимо выстроить внутреннюю "поверхность защиты", частью этой системы уже является блок "Ограничитель", однако необходимо сделать так, чтобы соответствующие запросы проходили только через него, а не в обход.

Для решения этой задачи необходимо добавить новый блок "Монитор безопасности". Его задача - обеспечить контроль направления информационных потоков, допустить передачу данных только между теми блоками и в тех направлениях, которые задуманы архитектором системы. Тогда, например, попытка передать команду из системы управления напрямую в сервоприводы в обход ограничителя будет заблокирована.

Возможно, возникнет вопрос, а что помешает злоумышленнику отправить команду в обход этого монитора безопасности. В рамках этой учебной инфраструктуры особо ничего, однако при разработке реальной системы можно использовать специализированные микроядерные операционные системы, такие как, например, KasperskyOS, которые обеспечат обязательное вовлечение монитора безопасности в контроль взаимодействия между блоками.

Архитектура системы для решения задачи 3.2 должна выглядеть как на рис.8:

![Архитектура системы с монитором безопасности](images/ciac-arch-m32.png)

Рис. 8. Архитектура бортовых систем с монитором безопасности


Базовый класс модуля безопасности находится в файле src/security_monitor.py, его менять не нужно.

В следующей ячейке находится реализация класса SecurityMonitor


In [22]:
from src.security_monitory import BaseSecurityMonitor
from src.security_policy_type import SecurityPolicy


class SecurityMonitor(BaseSecurityMonitor):
    
    def __init__(self, queues_dir: QueuesDirectory):
        
        default_policies = [
            # Система связи -> Блок безопасности: миссия
            SecurityPolicy(
                COMMUNICATION_GATEWAY_QUEUE_NAME,
                SAFETY_BLOCK_QUEUE_NAME,
                'set_mission'
            ),
            # Система связи -> система контроля: миссия
            SecurityPolicy(
                COMMUNICATION_GATEWAY_QUEUE_NAME,
                CONTROL_SYSTEM_QUEUE_NAME,
                'set_mission'
            ),
            # Система контроля -> Блок безопасности: скорость
            SecurityPolicy(
                CONTROL_SYSTEM_QUEUE_NAME,
                SAFETY_BLOCK_QUEUE_NAME,
                'set_speed'
            ),
            # Система контроля -> Блок безопасности: направление
            SecurityPolicy(
                CONTROL_SYSTEM_QUEUE_NAME,
                SAFETY_BLOCK_QUEUE_NAME,
                'set_direction'
            ),
            # Система навигации -> Система контроля: позиция
            SecurityPolicy(
                NAVIGATION_QUEUE_NAME,
                CONTROL_SYSTEM_QUEUE_NAME,
                'position_update'
            ),
            # Система навигации -> Блок безопасности: позиция
            SecurityPolicy(
                NAVIGATION_QUEUE_NAME,
                SAFETY_BLOCK_QUEUE_NAME,
                'position_update'
            ),
            # Блок безопасности -> Привод движения: скорость
            SecurityPolicy(
                SAFETY_BLOCK_QUEUE_NAME,
                SERVOS_QUEUE_NAME,
                'set_speed'
            ),
            # Блок безопасности -> Привод движени: направление
            SecurityPolicy(
                SAFETY_BLOCK_QUEUE_NAME,
                SERVOS_QUEUE_NAME,
                'set_direction'
            ),
            # Блок безопасности -> Грузовой отсек: команда на открытие
            SecurityPolicy(
                SAFETY_BLOCK_QUEUE_NAME,
                CARGO_BAY_QUEUE_NAME,
                'release_cargo'
            ),
            # Блок безопасности -> Грузовой отсек: команда на закрытие
            SecurityPolicy(
                SAFETY_BLOCK_QUEUE_NAME,
                CARGO_BAY_QUEUE_NAME,
                'lock_cargo'
            )
        ]
        
        super().__init__(queues_dir)
        
        self._security_policies = []
        self._set_security_policies(default_policies)

    def _check_event(self, event: Event):
        """проверка события на допустимость политиками безопасности"""
        
        event_security_policie = SecurityPolicy(
            source=event.source,
            destination=event.destination,
            operation=event.operation
        )
        
        if event_security_policie in self._security_policies:
            return True
        
        return False
    
    def _set_security_policies(self, new_policies: list[SecurityPolicy]):
        self._security_policies += new_policies

Обратите внимание на этот код:

```python
    def _init_set_security_policies(self):
        """ инициализация политик безопасности """
        default_policies = [
            SecurityPolicy(
                source=COMMUNICATION_GATEWAY_QUEUE_NAME,
                destination=CONTROL_SYSTEM_QUEUE_NAME,
                operation='set_mission')
        ]
        self.set_security_policies(policies=default_policies)
```

В нём инициализируется политика безопасности, разрешающая запрос set_mission от коммуникационного шлюза в систему управления.

Код политики безопасности в строка 6-9:

```python
SecurityPolicy(
                source=COMMUNICATION_GATEWAY_QUEUE_NAME,
                destination=CONTROL_SYSTEM_QUEUE_NAME,
                operation='set_mission')
```

Шаблон политики безопасности описан в файле src/security_policy_type.py:

```python
@dataclass
class SecurityPolicy:
    """ политика безопасности """
    source: str         # отправитель запроса
    destination: str    # получатель
    operation: str      # запрашиваемая операция

```

По аналогии в методе \_init_set_security_policies допишите другие политики безопасности, чтобы разрешить взаимодействия согласно диаграмме на рис.7. выше.


Теперь необходимо изменить взаимодействие между блоками так, чтобы оно всегда проходило через монитор безопасности. Для этого потребуется изменить соответствующие реализации внутренних блоков.


Ниже пример для блока навигации. Обратите внимание на выделенные строки и комментарии.

```python
from src.navigation_system import BaseNavigationSystem
from src.config import CONTROL_SYSTEM_QUEUE_NAME, SECURITY_MONITOR_QUEUE_NAME


class NavigationSystem(BaseNavigationSystem):
    """ класс навигационного блока """
    def _send_position_to_consumers(self):
        # событие для системы управления,
        # поэтому в destination указываем CONTROL_SYSTEM_QUEUE_NAME
        event = Event(source=self.event_source_name,
>>>                      destination=CONTROL_SYSTEM_QUEUE_NAME,
                      operation="position_update", parameters=self._position
                      )
        # но событие отправляем сначала на проверку в монитор безопасности,
        # поэтому используем очередь с именем SECURITY_MONITOR_QUEUE_NAME
>>>        security_monitor_q: Queue = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        security_monitor_q.put(event)
        # а в систему управления наше событие монитор безопасности отправит сам, если найдёт разрешающую эту операцию политику безопасности

navigation_system = NavigationSystem(queues_dir=queues_dir)
```


from src.navigation_system import BaseNavigationSystem
from src.config import CONTROL_SYSTEM_QUEUE_NAME, SECURITY_MONITOR_QUEUE_NAME

class NavigationSystem(BaseNavigationSystem):
""" класс навигационного блока """
def \_send_position_to_consumers(self):  
 event = Event(source=self.event_source_name,
destination=CONTROL_SYSTEM_QUEUE_NAME,
operation="position_update", parameters=self.\_position
)
security_monitor_q: Queue = self.\_queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
security_monitor_q.put(event)

navigation_system = NavigationSystem(queues_dir=queues_dir)


В файле tests/module/test_security_monitor.py можно увидеть тест логики работы монитора безопасности. Запустить его можно следующей командой:


In [23]:
!pytest -svk security_monitor

platform linux -- Python 3.12.3, pytest-8.3.5, pluggy-1.5.0 -- /home/user/p/.venv/bin/python3
cachedir: .pytest_cache
rootdir: /mnt/duma2
configfile: pytest.ini
testpaths: tests
plugins: anyio-4.9.0
collected 1 item                                                               [0m

tests/module/test_security_monitor.py::test_security_policies [ИНФО][QUEUES] создан каталог очередей
[ИНФО][QUEUES] регистрируем очередь security
[ИНФО][SECURITY] создан монитор безопасности
[ИНФО][SECURITY] изменение политик безопасности: [SecurityPolicy(source='communication', destination='control', operation='set_mission')]
[ИНФО][SECURITY] изменение политик безопасности: [SecurityPolicy(source='communication', destination='control', operation='set_mission'), SecurityPolicy(source='communication', destination='control', operation='set_mission'), SecurityPolicy(source='communication', destination='safety', operation='set_mission'), SecurityPolicy(source='control', destination='safety', operation='set_spee

В одном кодовом блоке ниже (или нескольких - как будет удобнее) сделайте следующее

1. Измените реализацию монитора безопасности в части правил проверки передаваемых сообщений в мониторе безопасности
2. Измените параметры отправки сообщений между элементами бортовых систем (блоками 1-7), чтобы они всегда отправлялись в очередь монитора безопасности (отправитель и получатель при этом не меняются)
3. После всех изменений убедитесь в том, что машинка по-прежнему успешно выполняет маршрутное задание и доставляет груз
4. Дополнительные баллы принесут автоматические тесты правил безопасности в файле tests/module/test_security_monitor.py - можно проверить, что все новые политики работают как ожидаются, а недопустимые взаимодействия (т.е. таких стрелок нет на рис. 8) блокируются


In [24]:
mission_sender = MissionSender(queues_dir=queues_dir, client_id=car_id)
telemetry_sender = TelemetrySender(queues_dir=queues_dir, client_id=car_id)

sitl = SITL(
    queues_dir=queues_dir, position=home, car_id=car_id, post_telemetry=afcs_present
)
mission_planer = MissionPlanner(
    queues_dir=queues_dir, afcs_present=afcs_present, mission=mission
)
cargo = CargoBay(queues_dir=queues_dir)
servos = Servos(queues_dir=queues_dir)

safety_block = SafetyBlock(queues_dir=queues_dir)

security_monitor = SecurityMonitor(queues_dir)

system_wrapper = SystemComponentsContainer(
    [   security_monitor,
        mission_sender,
        telemetry_sender,
        sitl,
        mission_planer,
        navigation_system,
        servos,
        cargo,
        communication_gateway,
        control_system,
        safety_block,
    ]
)

control_system.enable_surprises()
system_wrapper.start()

sleep(100)

system_wrapper.stop()

system_wrapper.clean()

[ИНФО][QUEUES] регистрируем очередь planner.mqtt
[ИНФО][QUEUES] регистрируем очередь sitl.mqtt
[ИНФО][QUEUES] регистрируем очередь sitl
[ИНФО][SITL] симулятор создан, ID m3
[ИНФО][QUEUES] регистрируем очередь planner
[ИНФО][MISSION PLANNER] создана система планирования заданий
[ИНФО][QUEUES] регистрируем очередь cargo
[ИНФО][CARGO] создан компонент грузового отсека, отсек заблокирован
[ИНФО][QUEUES] регистрируем очередь servos
[ИНФО][SERVOS] создан компонент сервоприводов
[ИНФО][QUEUES] регистрируем очередь safety
[ИНФО][SAFETY] создан ограничитель
[ИНФО][QUEUES] регистрируем очередь security
[ИНФО][SECURITY] создан монитор безопасности
[ИНФО][SECURITY] старт блока грузового отсека
[ИНФО][MISSION_PLANNER.MQTT] старт клиента телеметрии
[ИНФО][MISSION_PLANNER.MQTT] клиент отправки маршрута создан и запущен[ИНФО][SITL.MQTT] старт клиента телеметрии

[ИНФО][SITL.MQTT] клиент отправки телеметрии создан и запущен[ИНФО][SITL] [SITL] старт симуляции

[ИНФО][MISSION PLANNER] старт системы плани

На этом модуль 3 завершён!

Если у вас всё получилось, то вы создали защищённую автономную машинку - ещё уже не так-то просто использовать для нанесения критического ущерба людям и имуществу, потому что самые критические режимы проверяются вашим Ограничителем, а если вы ещё и внедрили Монитор Безопасности, то теперь злоумышленнику гораздо сложнее атаковать внутренние блоки, даже если он смог проникнуть в какую-то бортовую систему или внедрить вредоносный код.

Осталось проверить эффективность получившейся защиты на полной трассе - в этом и заключается модуль 4 задания, для перехода к которому нужно открыть блокнот с именем cyberimmunity--autonomous-car-m4. Успеха!
