# Основной Jupyter Notebook
Версия: _1.2.0 от 26.09.2025_
# Введение

## Соревновательная задача

Вам предстоит решить как задачи робота, так и задачи кибериммунности.

### Цели и предположения безопасности

## Цели безопасности
Устойчивость кибериммунных систем к атакам и сбоям выражается в том, что поставленные цели безопасности не нарушаются ни при каких условиях (то есть – выполняются задачи кибериммунности).  Для АНТС предъявлены следующие ЦБ:

1.	АНТС выполняет задачи только под управлением аутентичного программного обеспечения.
2.	АНТС выполняет только аутентичные команды Диспетчера.
3.	АНТС при любых обстоятельствах соблюдает ограничения из маршрутного задания и правила перемещения в особых зонах.
4.	АНТС при любых обстоятельствах предоставляет точные данные о своих действиях, намерениях и окружающих внешних факторах.

## Предположения безопасности
Гипотетическими разработчиками базового ПО модуля безопасности согласованы следующие предположения безопасности (далее – ПБ) – утверждения о смежных системах, которые снимают с разработчиков часть задач для обеспечения ЦБ:

1.	Только авторизованный персонал имеет физический доступ к критическим узлам АНТС и внешним вспомогательным системам, что исключает их компрометацию
2.	Только авторизованные операторы имеют доступ к системе планирования заданий. Эти операторы обладают необходимой квалификацией и благонадёжны, не имея намерений причинить ущерб системе или третьим лицам.

## Допущения, принятые в рамках учебной инфраструктуры
В рамках работы с учебной инфраструктурой, необходимой для выполнения КЗ, задача для конкурсантов была облегчена путём вноса следующих допущений (не являющихся правильными предположениями безопасности, но приемлемыми в учебном случае):

1.	Аппаратное обеспечение модуля безопасности и его исполнительные узлы являются доверенными компонентами.
2.	ПО модуля безопасности также является доверенным (исходя из условностей соревновательной задачи).

## Доверенные и недоверенные системные элементы
Согласно используемой архитектуре безопасности, системные элементы АНТС были разделены на два домена безопасности:
### Доверенные системные элементы:
1.	ПО и аппаратная составляющая модуля безопасности.
2.	Внешняя система навигации АНТС.
3.	Хранилище данных модуля безопасности.
4.	Система планирования заданий (интерфейс конкурсанта).
5.	Диспетчер АНТС.
6.	Среда симуляции (ЦД).

### Недоверенные системные элементы:
1.	ПО автопилота АНТС (включая хранилище данных).
2.	Исполнительные механизмы АНТС (приводы, захват).
3.	Система связи АНТС-Диспетчер.


## 1. Инициализация коннектора для системы выполнения кода

In [None]:
# Добавляем директорию для равномерного импорта всего с корневой локации
import sys

sys.path.append('../')

# Импортируем библиотеку, которая автоматически настраивает коннектор для системы выполнения кода
from src.libs.LCTWrapTwin import start_module

# 2. Решение задач робота и кибериммунности

В рамках решения задач, система предоставляет вам следующие заготовленные интерфейсы и возможности:

- `UserMissionHandler`, расширяющий класс `MissionHandler` : интерфейс для обработки кода для выполнения задач робота.  Он содержит следующие методы, которые помогут вам сразу перейти к решению задач, не затрачивая время на инициализацию среды:
    - `config_cyber_obstacles()` - функция, которая должна вернуть конфигурацию киберпрепятствий. Формат конфигурации должен быть представлен в виде словаря с булевыми значениями (пример указан в блоке кода ниже - его можно модифицировать).
    - `mission_code()` - функция, вызываемая автоматически при начале заезда (заезд начинается после подачи команды от АСО, через кнопку или через команду `/start`. Подробнее - в КЗ или через команду /help в АСО). В функции должна содержаться последовательность действий, требуемых для решения задач робота (аналогично примеру в блоке кода ниже).
- `UserTrustedHandler`, расширяющий класс `TrustedHandler` : интерфейс для обработки кода для выполнения задач робота.  Он содержит следующие методы, которые помогут вам сразу перейти к решению задач, не затрачивая время на инициализацию среды:
    - `make_next_short_message(prev_message)` - функция, которая должна вернуть короткое сообщение, автоматически отправляемое в АСО (см. КЗ, CybP_02). Возвращаться должна строка, методы генерации которой вам предстоит проработать.
    - `trusted_code()` - функция, вызываемая автоматически при начале заезда (заезд начинается после подачи команды от АСО, через кнопку или через команду `/start`. Подробнее - в КЗ или через команду /help в АСО). В функции должны содержаться блоки кода (функции, последовательности действий и т.д.), требуемые для решения задач кибериммунности (аналогично примеру в блоке кода ниже).

Также, вы сможете воспользоваться «Готовыми Функциями Автоматизации»<sup>TM</sup> от создателей «Межрегиональных соревнований»®, представленными в следующем блоке примеров кода.
> Конкретный формат данных, возвращаемых и принимаемых этими функциями был частично засекречен, так что вам придётся узнать его самостоятельно в процессе работы

### Функции, доступные в UserMissionHandler

```python
self.ap_hook.do_move({"x": float, "y": float})
```
- Отдаёт команду на перемещение робота в указанную точку на полигоне. Выполнение функции завершается в момент попадания робота в указанную точку с радиусом в 20 мм.

```python
self.ap_hook.do_rotate({"x": float, "y": float})
```
- Отдаёт команду на поворот робота в направлении указанной точки на полигоне. Выполнение функции завершается в момент совпадения векторов направлений робота и прямой, проведённой из точки его текущего положения к заданной точке с точностью в 2 градуса.

```python
self.ap_hook.do_wait(strategy: str = "flag"|"time", duration: float)
```
- Отдаёт команду на приостановку работы программы. В зависимости от параметра `strategy`, ожидание осуществляется до истечения указанного времени или до получения флага от Доверенного модуля (см. `set_ap_wait_lock_release()` в функциях для UserTrustedHandler).

```python
self.get_message_from_trusted_module() -> any
```
- Возвращает список значений, переданных МБ к АП, очищая буфер (см. `send_message_to_ap()` в функциях для UserTrustedHandler).

```python
self.set_barrier_toggle()
```
- Отправляет запрос на переключение статуса шалгбаума

```python
self.set_robot_speed(float)
```
- Отправляет запрос на установку скорости робота.

```python
self.set_brush_speed(int)
```
- Отправляет запрос на установку скорости вращения щётки робота.

```python
self.get_camera_frame()
```
- Возвращает кадр с камеры на носу робота в формате `np.uint8`

----------------------------------

### Функции, доступные в UserTrustedHandler

```python
self.get_ap_code_hash() -> dict
```
- Возвращает результат хеширования файлов автопилота на данный момент.

```python
self.set_ap_force_reset()
```
- Отправляет запрос на перезагрузку прошивки автопилота *(как оно происходит? кто его знает...)*.  Данные после перезагрузки восстанавливаются из защищённой от записи резервной копии.

```python
self.get_drive_data() -> dict
```
- Возвращает текущее состояние приводов робота, полученное от автопилота (по доверенному каналу связи).

```python
self.get_drive_data() -> dict
```
- Возвращает текущее состояние приводов робота, полученное от автопилота (по доверенному каналу связи).

```python
self.set_drive_force_reset(dict)
```
- Отправляет запрос на перезагрузку прошивки привода. Данные после перезагрузки восстанавливаются из защищённой от записи резервной копии.

```python
self.set_emergency_stop(bool)
```
- Аппаратным образом отключает ВСЕ приводы робота, используя встроенные механизмы МБ (True - выключить, False - включить).

```python
self.set_speed_controller_reset()
```
- Отправляет в АП запрос на сброс контроллера скорости, восстанавливая конфигурационные значения из защищённой от записи резервной копии.

```python
self.get_system_messages() -> dict
```
- Возвращает список сообщений системы АП, отправленных с момента последнего запроса.

```python
self.send_message_to_ap(any)
```
- Добавляет в буфер данных любой объект, который вы передадите в функцию. Доступ к буферу из АП может быть получен через `get_message_from_trusted_module()` в UserMissionHandler.

```python
self.set_ap_wait_lock_release()
```
- Отправляет в АП флаг, завершающий ожидание, заданное через `strategy="flag"` в `do_wait()`.

```python
self.get_robot_status() -> dict
```
- Возвращает текущий статус навигационной системы робота: координаты и угол поворота.

```python
self.get_camera_frame()
```
- Возвращает кадр с камеры на носу робота в формате `np.uint8`

```python
self.get_brush_speed()
```
- Возвращает текущую скорость вращения щётки робота.

```python
self.fix_brush_speed()
```
- Отправляет запрос на перезагрузку контроллера скорости вращения щётки робота (не смешно).

# Важно - конкурсная информация
Для вывода сообщений от вашего кода используйте функции `log()`, `warn()` и `error()` так, как показано в блоке ниже.
Это сохранит сообщения в файл.

Обычные `print()` не будут считаться абсолютно во всех случаях, когда потребуется вывод сообщений.

In [None]:
# ЭТОТ БЛОК КОДА МОЖНО И НУЖНО МОДИФИЦИРОВАТЬ
# Как обычно, подсказки - в импортах
import time
import cv2
import numpy as np
import threading

from src.libs.LCTWrapTwin.Modules.Handler import MissionHandler, TrustedHandler


class UserMissionHandler(MissionHandler):
    @staticmethod
    def config_cyber_obstacles():
        # Сервисная функция, в которой вы ОБЯЗАНЫ передать конфигурацию киберпрепятствий.
        # Изменять можно только сами значения флагов - если функция вернёт что-то кроме данного объекта, возникнет ошибка.

        return {
            "CybP_01": True,
            "CybP_02": True,
            "CybP_03": True,
            "CybP_04": True,
            "CybP_05": True,
            "CybP_06": True,
        }

    def detect_pedestrians(self, frame):
        # Распознавание пешеходов с помощью HOG
        hog = cv2.HOGDescriptor()
        hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
        boxes, weights = hog.detectMultiScale(frame, winStride=(8,8))
        return len(boxes) > 0

    def detect_barrier(self, frame):
        # Распознавание шлагбаума через обнаружение линий
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        lines = cv2.HoughLinesP(edges, 1, np.pi/180, 100, minLineLength=100, maxLineGap=10)
        return lines is not None and len(lines) > 0

    def detect_traffic_light_color(self, frame):
        # Распознавание цвета светофора через NumPy
        h, w = frame.shape[:2]
        roi = frame[0:h//4, w//2 - 50: w//2 + 50]
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
        # Красный
        lower_red = np.array([0,50,50])
        upper_red = np.array([10,255,255])
        mask_red = cv2.inRange(hsv, lower_red, upper_red)
        if cv2.countNonZero(mask_red) > 100:
            return 'red'
        # Зеленый
        lower_green = np.array([40,50,50])
        upper_green = np.array([80,255,255])
        mask_green = cv2.inRange(hsv, lower_green, upper_green)
        if cv2.countNonZero(mask_green) > 100:
            return 'green'
        return 'unknown'

    def check_speed_zone(self, x, y):
        # Проверка зоны скорости
        if x > 1.5 or y > 1.5:
            return 0.05
        return 0.1

    def mission_code(self):
        # Ваш код, функции и переменные для решения задач робота должны быть здесь.

        # Для вывода логов используйте эти функции - это позволит вам синхронизировать вывод сообщений с сохранением в файл
        self.lg.log("Сообщение обычное")
        self.lg.warn("Сообщение о предупреждении")
        self.lg.error("Сообщение об ошибке")

        self.lg.log("Запуск миссии с модулями безопасности")

        self.set_robot_speed(0.1)
        self.set_brush_speed(100)

        # Основной цикл миссии
        waypoints = [
            {"x": 0.6, "y": 0.6},
            {"x": 0.6, "y": 3.0},
            {"x": 0.6, "y": 0.6}
        ]

        for wp in waypoints:
            # Проверка камеры перед движением
            frame = self.get_camera_frame()
            if self.detect_pedestrians(frame):
                self.lg.warn("Пешеход обнаружен, остановка на 5 секунд")
                self.ap_hook.do_wait("time", 5)
            if self.detect_barrier(frame):
                self.lg.log("Шлагбаум обнаружен, запрос на открытие")
                self.set_barrier_toggle()
                self.ap_hook.do_wait("time", 2)  # Ждем открытия
            color = self.detect_traffic_light_color(frame)
            if color == 'red':
                self.lg.log("Красный свет, ожидание")
                self.ap_hook.do_wait("time", 10)
            elif color == 'green':
                self.lg.log("Зеленый свет, продолжаем")

            # Установка скорости в зависимости от зоны
            speed = self.check_speed_zone(wp["x"], wp["y"])
            self.set_robot_speed(speed)

            self.ap_hook.do_rotate(wp)
            self.ap_hook.do_move(wp)

    # Вы можете добавить ваши собственные функции, которые будут вызываться mission_code (подсказка - если вы вынесете функционал в отдельную функцию, его проще будет оценить в случае разногласий).
    # Автоматически вызывается только функция mission_code (один раз при вызове start_module())


class UserTrustedHandler(TrustedHandler):
    def __init__(self, context):
        super().__init__(context)
        self.expected_hash = None  # Для CybP_01
        self.drive_signatures = {}  # Для CybP_03

    def check_hash(self, current_hash):
        if self.expected_hash is None:
            self.expected_hash = current_hash
            return True
        return current_hash == self.expected_hash

    def check_drive_signature(self, drive_data):
        # Простая проверка подписи (CRC или хэш)
        import hashlib
        hash_func = hashlib.md5()
        hash_func.update(str(drive_data).encode())
        sig = hash_func.hexdigest()
        if 'drive' not in self.drive_signatures:
            self.drive_signatures['drive'] = sig
            return True
        return self.drive_signatures['drive'] == sig

    def visual_check_barrier(self, frame):
        # Визуальная проверка шлагбаума (простая - наличие линий)
        return self.detect_barrier(frame)  # Используем ту же функцию

    def detect_barrier(self, frame):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        lines = cv2.HoughLinesP(edges, 1, np.pi/180, 100, minLineLength=100, maxLineGap=10)
        return lines is not None and len(lines) > 0

    def is_speed_zone(self, x, y):
        return x > 1.5 or y > 1.5

    def trusted_code(self):
        # Ваш код, функции и переменные для решения задач кибериммунности должны быть здесь.

        self.lg.log("Запуск доверенного кода")

        def monitor():
            while True:
                # CybP_01: Проверка хэша кода АП
                hash_current = self.get_ap_code_hash()
                if not self.check_hash(hash_current):
                    self.lg.error("CybP_01: Код АП скомпрометирован")
                    self.set_ap_force_reset()

                # CybP_03: Проверка приводов
                drive = self.get_drive_data()
                if not self.check_drive_signature(drive):
                    self.lg.error("CybP_03: Приводы скомпрометированы")
                    self.set_drive_force_reset(drive)

                # CybP_04: Проверка скорости щетки
                brush_speed = self.get_brush_speed()
                if brush_speed > 120:
                    self.lg.warn("CybP_04: Скорость щетки нарушена")
                    self.fix_brush_speed()

                # CybP_05: Визуальная проверка шлагбаума
                frame = self.get_camera_frame()
                if not self.visual_check_barrier(frame):
                    self.lg.warn("CybP_05: Шлагбаум не открыт")

                # CybP_06: Проверка скорости в зонах
                status = self.get_robot_status()
                speed = status.get('speed', 0)
                if self.is_speed_zone(status['x'], status['y']) and speed > 0.05:
                    self.lg.warn("CybP_06: Нарушение скорости")
                    self.set_speed_controller_reset()

                # Отправка статуса в АП
                self.send_message_to_ap(status)

                time.sleep(1)

        threading.Thread(target=monitor, daemon=True).start()

    def make_next_short_message(self, prev_message: str):
        # Короткое сообщение для heartbeat (CybP_02)
        # Простая генерация на основе времени
        import hashlib
        msg = str(time.time()) + prev_message
        return hashlib.md5(msg.encode()).hexdigest()[:16]

    # Вы можете добавить ваши собственные функции, которые будут вызываться trusted_code (подсказка - если вы вынесете функционал в отдельную функцию, его проще будет оценить в случае разногласий).
    # Автоматически вызываются только функции trusted_code (один раз при вызове start_module()) и make_next_short_message (каждую секунду).

Для запуска заезда вам необходимо вызвать функцию `start_module()`, передав им ваши UserMissionHandler и UserTrustedHandler.

In [None]:
start_module(UserMissionHandler, UserTrustedHandler)

Завершить заезд можно при помощи команды `/stop`, подаваемой в АСО.

При желании начать новый заезд вам необходимо ввести команду `/reset`, которая сбросит инциализаторы АСО. Затем необходимо перезапустить ЦД (закрыть и открыть заново).

Перезапускать код робота удобнее всего при помощи функции перезапуска ядра Jupyter Notebook (стрелочка круглая) и выполнения всех ячеек кода (две стрелочки в панели инструментов).