# Основной Jupyter Notebook
Версия: _1.2.0 от 26.09.2025_
# Введение

## Соревновательная задача

Вам предстоит решить как задачи робота, так и задачи кибериммунности.

### Цели и предположения безопасности

## Цели безопасности
Устойчивость кибериммунных систем к атакам и сбоям выражается в том, что поставленные цели безопасности не нарушаются ни при каких условиях (то есть – выполняются задачи кибериммунности).  Для АНТС предъявлены следующие ЦБ:

1.	АНТС выполняет задачи только под управлением аутентичного программного обеспечения.
2.	АНТС выполняет только аутентичные команды Диспетчера.
3.	АНТС при любых обстоятельствах соблюдает ограничения из маршрутного задания и правила перемещения в особых зонах.
4.	АНТС при любых обстоятельствах предоставляет точные данные о своих действиях, намерениях и окружающих внешних факторах.

## Предположения безопасности
Гипотетическими разработчиками базового ПО модуля безопасности согласованы следующие предположения безопасности (далее – ПБ) – утверждения о смежных системах, которые снимают с разработчиков часть задач для обеспечения ЦБ:

1.	Только авторизованный персонал имеет физический доступ к критическим узлам АНТС и внешним вспомогательным системам, что исключает их компрометацию
2.	Только авторизованные операторы имеют доступ к системе планирования заданий. Эти операторы обладают необходимой квалификацией и благонадёжны, не имея намерений причинить ущерб системе или третьим лицам.

## Допущения, принятые в рамках учебной инфраструктуры
В рамках работы с учебной инфраструктурой, необходимой для выполнения КЗ, задача для конкурсантов была облегчена путём вноса следующих допущений (не являющихся правильными предположениями безопасности, но приемлемыми в учебном случае):

1.	Аппаратное обеспечение модуля безопасности и его исполнительные узлы являются доверенными компонентами.
2.	ПО модуля безопасности также является доверенным (исходя из условностей соревновательной задачи).

## Доверенные и недоверенные системные элементы
Согласно используемой архитектуре безопасности, системные элементы АНТС были разделены на два домена безопасности:
### Доверенные системные элементы:
1.	ПО и аппаратная составляющая модуля безопасности.
2.	Внешняя система навигации АНТС.
3.	Хранилище данных модуля безопасности.
4.	Система планирования заданий (интерфейс конкурсанта).
5.	Диспетчер АНТС.
6.	Среда симуляции (ЦД).

### Недоверенные системные элементы:
1.	ПО автопилота АНТС (включая хранилище данных).
2.	Исполнительные механизмы АНТС (приводы, захват).
3.	Система связи АНТС-Диспетчер.


## 1. Инициализация коннектора для системы выполнения кода

In [1]:
# Добавляем директорию для равномерного импорта всего с корневой локации
import sys

sys.path.append('../')

# Импортируем библиотеку, которая автоматически настраивает коннектор для системы выполнения кода
from src.libs.LCTWrapTwin import start_module

# 2. Решение задач робота и кибериммунности

В рамках решения задач, система предоставляет вам следующие заготовленные интерфейсы и возможности:

- `UserMissionHandler`, расширяющий класс `MissionHandler` : интерфейс для обработки кода для выполнения задач робота.  Он содержит следующие методы, которые помогут вам сразу перейти к решению задач, не затрачивая время на инициализацию среды:
    - `config_cyber_obstacles()` - функция, которая должна вернуть конфигурацию киберпрепятствий. Формат конфигурации должен быть представлен в виде словаря с булевыми значениями (пример указан в блоке кода ниже - его можно модифицировать).
    - `mission_code()` - функция, вызываемая автоматически при начале заезда (заезд начинается после подачи команды от АСО, через кнопку или через команду `/start`. Подробнее - в КЗ или через команду /help в АСО). В функции должна содержаться последовательность действий, требуемых для решения задач робота (аналогично примеру в блоке кода ниже).
- `UserTrustedHandler`, расширяющий класс `TrustedHandler` : интерфейс для обработки кода для выполнения задач робота.  Он содержит следующие методы, которые помогут вам сразу перейти к решению задач, не затрачивая время на инициализацию среды:
    - `make_next_short_message(prev_message)` - функция, которая должна вернуть короткое сообщение, автоматически отправляемое в АСО (см. КЗ, CybP_02). Возвращаться должна строка, методы генерации которой вам предстоит проработать.
    - `trusted_code()` - функция, вызываемая автоматически при начале заезда (заезд начинается после подачи команды от АСО, через кнопку или через команду `/start`. Подробнее - в КЗ или через команду /help в АСО). В функции должны содержаться блоки кода (функции, последовательности действий и т.д.), требуемые для решения задач кибериммунности (аналогично примеру в блоке кода ниже).

Также, вы сможете воспользоваться «Готовыми Функциями Автоматизации»<sup>TM</sup> от создателей «Межрегиональных соревнований»®, представленными в следующем блоке примеров кода.
> Конкретный формат данных, возвращаемых и принимаемых этими функциями был частично засекречен, так что вам придётся узнать его самостоятельно в процессе работы

### Функции, доступные в UserMissionHandler

```python
self.ap_hook.do_move({"x": float, "y": float})
```
- Отдаёт команду на перемещение робота в указанную точку на полигоне. Выполнение функции завершается в момент попадания робота в указанную точку с радиусом в 20 мм.

```python
self.ap_hook.do_rotate({"x": float, "y": float})
```
- Отдаёт команду на поворот робота в направлении указанной точки на полигоне. Выполнение функции завершается в момент совпадения векторов направлений робота и прямой, проведённой из точки его текущего положения к заданной точке с точностью в 2 градуса.

```python
self.ap_hook.do_wait(strategy: str = "flag"|"time", duration: float)
```
- Отдаёт команду на приостановку работы программы. В зависимости от параметра `strategy`, ожидание осуществляется до истечения указанного времени или до получения флага от Доверенного модуля (см. `set_ap_wait_lock_release()` в функциях для UserTrustedHandler).

```python
self.get_message_from_trusted_module() -> any
```
- Возвращает список значений, переданных МБ к АП, очищая буфер (см. `send_message_to_ap()` в функциях для UserTrustedHandler).

```python
self.set_barrier_toggle()
```
- Отправляет запрос на переключение статуса шалгбаума

```python
self.set_robot_speed(float)
```
- Отправляет запрос на установку скорости робота.

```python
self.set_brush_speed(int)
```
- Отправляет запрос на установку скорости вращения щётки робота.

```python
self.get_camera_frame()
```
- Возвращает кадр с камеры на носу робота в формате `np.uint8`

----------------------------------

### Функции, доступные в UserTrustedHandler

```python
self.get_ap_code_hash() -> dict
```
- Возвращает результат хеширования файлов автопилота на данный момент.

```python
self.set_ap_force_reset()
```
- Отправляет запрос на перезагрузку прошивки автопилота *(как оно происходит? кто его знает...)*.  Данные после перезагрузки восстанавливаются из защищённой от записи резервной копии.

```python
self.get_drive_data() -> dict
```
- Возвращает текущее состояние приводов робота, полученное от автопилота (по доверенному каналу связи).

```python
self.get_drive_data() -> dict
```
- Возвращает текущее состояние приводов робота, полученное от автопилота (по доверенному каналу связи).

```python
self.set_drive_force_reset(dict)
```
- Отправляет запрос на перезагрузку прошивки привода. Данные после перезагрузки восстанавливаются из защищённой от записи резервной копии.

```python
self.set_emergency_stop(bool)
```
- Аппаратным образом отключает ВСЕ приводы робота, используя встроенные механизмы МБ (True - выключить, False - включить).

```python
self.set_speed_controller_reset()
```
- Отправляет в АП запрос на сброс контроллера скорости, восстанавливая конфигурационные значения из защищённой от записи резервной копии.

```python
self.get_system_messages() -> dict
```
- Возвращает список сообщений системы АП, отправленных с момента последнего запроса.

```python
self.send_message_to_ap(any)
```
- Добавляет в буфер данных любой объект, который вы передадите в функцию. Доступ к буферу из АП может быть получен через `get_message_from_trusted_module()` в UserMissionHandler.

```python
self.set_ap_wait_lock_release()
```
- Отправляет в АП флаг, завершающий ожидание, заданное через `strategy="flag"` в `do_wait()`.

```python
self.get_robot_status() -> dict
```
- Возвращает текущий статус навигационной системы робота: координаты и угол поворота.

```python
self.get_camera_frame()
```
- Возвращает кадр с камеры на носу робота в формате `np.uint8`

```python
self.get_brush_speed()
```
- Возвращает текущую скорость вращения щётки робота.

```python
self.fix_brush_speed()
```
- Отправляет запрос на перезагрузку контроллера скорости вращения щётки робота (не смешно).

# Важно - конкурсная информация
Для вывода сообщений от вашего кода используйте функции `log()`, `warn()` и `error()` так, как показано в блоке ниже.
Это сохранит сообщения в файл.

Обычные `print()` не будут считаться абсолютно во всех случаях, когда потребуется вывод сообщений.

In [2]:
import time
import hashlib
import cv2
import numpy as np
from src.libs.LCTWrapTwin.Modules.Handler import MissionHandler, TrustedHandler

class UserMissionHandler(MissionHandler):
    @staticmethod
    def config_cyber_obstacles():
        
        return {
            "CybP_01": True,  
            "CybP_02": True,  
            "CybP_03": True,  
            "CybP_04": True,  
            "CybP_05": True,  
            "CybP_06": True,  
        }

    def mission_code(self):
        """Основной код миссии робота"""
        
        self.lg.log("Начало выполнения миссии уборки снега")
        
        self.set_robot_speed(0.15)  
        self.set_brush_speed(100)   
        
        waypoints = [
            
            {"x": 0.6, "y": 0.6},
            {"x": 3.0, "y": 0.6},
            {"x": 3.0, "y": 1.0},
            {"x": 0.6, "y": 1.0},
            
            
            {"x": 0.6, "y": 1.4},
            {"x": 3.0, "y": 1.4},
            {"x": 3.0, "y": 1.8},
            {"x": 0.6, "y": 1.8},
            
            
            {"x": 0.6, "y": 2.2},
            {"x": 3.0, "y": 2.2},
            {"x": 3.0, "y": 2.6},
            {"x": 0.6, "y": 2.6},
            
            
            {"x": 0.6, "y": 0.6}
        ]
        
        for i, point in enumerate(waypoints):
            self.lg.log(f"Движение к точке {i+1}/{len(waypoints)}: {point}")
            
            
            self.check_for_obstacles()
            
            
            self.ap_hook.do_rotate(point)
            
            # Проверяем зоны перед движением
            current_zone = self.get_current_zone(point)
            if current_zone == "pedestrian":
                self.handle_pedestrian_zone()
            elif current_zone == "speed_limited":
                self.handle_speed_limited_zone()
            elif current_zone == "barrier":
                self.handle_barrier_zone()
            
            self.ap_hook.do_move(point)
            
            
            trusted_messages = self.get_message_from_trusted_module()
            if trusted_messages:
                self.process_trusted_messages(trusted_messages)
        
        self.lg.log("Миссия завершена, возврат в сервисную зону")
        self.set_brush_speed(0)  

    def check_for_obstacles(self):
        """Проверка препятствий с помощью компьютерного зрения"""
        frame = self.get_camera_frame()
        if frame is not None:

            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            
            
            lower_red = np.array([0, 120, 70])
            upper_red = np.array([10, 255, 255])
            mask = cv2.inRange(hsv, lower_red, upper_red)
            
            if np.sum(mask) > 5000:  
                self.lg.warn("Обнаружен пешеход, остановка")
                self.ap_hook.do_wait("time", 3.0)

    def get_current_zone(self, point):
        """Определение текущей зоны на основе координат"""
        x, y = point["x"], point["y"]
        
        if 1.2 <= x <= 1.8 and 1.2 <= y <= 1.8:
            return "pedestrian"
        elif 2.0 <= x <= 2.4 and 0.8 <= y <= 1.2:
            return "speed_limited"
        elif x == 1.6 and y == 2.0:
            return "barrier"
        return "normal"

    def handle_pedestrian_zone(self):
        """Обработка пешеходной зоны"""
        self.lg.log("Вход в пешеходную зону")
        self.set_robot_speed(0.05)  

    def handle_speed_limited_zone(self):
        """Обработка зоны ограниченной скорости"""
        self.lg.log("Вход в зону ограниченной скорости")
        self.set_robot_speed(0.1)

    def handle_barrier_zone(self):
        """Обработка зоны со шлагбаумом"""
        self.lg.log("Подъезд к шлагбауму")
        self.set_barrier_toggle()
        self.ap_hook.do_wait(strategy="time", duration=2.0)

    def process_trusted_messages(self, messages):
        """Обработка сообщений от доверенного модуля"""
        for msg in messages:
            if msg.get("type") == "emergency_stop":
                self.lg.error("Получена команда экстренной остановки")
                self.ap_hook.do_wait(strategy="flag", duration=10.0)
            elif msg.get("type") == "speed_correction":
                new_speed = msg.get("speed", 0.1)
                self.lg.warn(f"Корректировка скорости: {new_speed}")
                self.set_robot_speed(new_speed)


class UserTrustedHandler(TrustedHandler):
    def __init__(self, context):
        super().__init__(context)
        self.last_ap_hash = None
        self.message_counter = 0
        self.drive_signatures = {}
        self.normal_brush_speed = 100
        
    def trusted_code(self):
        """Основной код модуля безопасности"""
        
        self.lg.log("CybO_01: Инициализация модуля безопасности")
        
        while True:
            try:
                # CybP_01: Проверка целостности кода автопилота
                self.check_autopilot_integrity()
                
                # CybP_03: Проверка подписей приводов
                self.check_drive_signatures()
                
                # CybP_04: Мониторинг навесного оборудования
                self.monitor_brush_speed()
                
                # CybP_05: Проверка шлагбаумов (визуально)
                self.verify_barrier_status()
                
                # CybP_06: Контроль скоростного режима
                self.monitor_speed_limits()
                
                time.sleep(60)  
                
            except Exception as e:
                self.lg.error(f"Ошибка в trusted_code: {e}")
                time.sleep(1)  

    def check_autopilot_integrity(self):
        """CybP_01: Проверка целостности кода автопилота"""
        current_hash = self.get_ap_code_hash()
        
        if self.last_ap_hash is None:
            self.last_ap_hash = current_hash
            self.lg.log("CybP_01: Инициализация хеша автопилота")
            return
        
        if current_hash != self.last_ap_hash:
            self.lg.error("CybP_01: Обнаружена компрометация кода автопилота!")
            
            # Экстренная остановка
            self.set_emergency_stop(True)
            time.sleep(0.1)
            
            # Перезагрузка автопилота
            self.set_ap_force_reset()
            self.lg.log("CybP_01: Выполнена перезагрузка автопилота")
            
            # Возобновление работы
            time.sleep(1.0)
            self.set_emergency_stop(False)
            self.last_ap_hash = self.get_ap_code_hash()

    def check_drive_signatures(self):
        """CybP_03: Проверка подписей приводов"""
        try:
            drive_data = self.get_drive_data()
            
            if not drive_data:
                return
            for drive_info in drive_data:
                self.process_single_drive(drive_info)
                        
        except Exception as e:
            self.lg.error(f"CybP_03: Ошибка проверки приводов: {e}")
    
    def process_single_drive(self, drive_info):
        """Обработка данных одного привода"""
        drive_id = drive_info.get("drive_id", "unknown")
        signature = drive_info.get("signature")
        serial = drive_info.get("serial")
        
        if serial and signature is not None:
            expected_sig = self.calculate_crc8(str(serial).encode())
            
            if signature != expected_sig:
                self.lg.warn(f"CybP_03: Некорректная подпись привода {drive_id}")
                
                # Перезагрузка привода
                self.set_drive_force_reset({"drive_id": drive_id})
                self.lg.log(f"CybP_03: Перезагрузка привода {drive_id}")

    def monitor_brush_speed(self):
        """CybP_04: Мониторинг скорости щетки"""
        try:
            current_speed = self.get_brush_speed().get("speed")
            
            if current_speed is None:
                return
                    
            if current_speed > self.normal_brush_speed * 1.5:
                self.lg.warn(f"CybP_04: Превышение скорости щетки: {current_speed}")
                
                
                self.fix_brush_speed()
                
                
                self.send_message_to_ap({
                    "type": "brush_anomaly",
                    "action": "speed_reset"
                })
        except Exception as e:
            self.lg.error(f"CybP_04: Ошибка мониторинга щетки: {e}")

    def verify_barrier_status(self):
        """CybP_05: Визуальная проверка шлагбаумов"""
        frame = self.get_camera_frame()
        
        if frame is None:
            return
        
        gray = np.dot(frame[...,:3], [0.299, 0.587, 0.114]).astype(np.uint8)
        
        
        height, width = gray.shape
        mid_height = height // 2
        
        
        mid_section = gray[mid_height-10:mid_height+10, :]
        avg_brightness = np.mean(mid_section)
        
        if avg_brightness < 100:  
            self.lg.warn("CybP_05: Обнаружен закрытый шлагбаум!")
            
            # Блокируем движение
            self.send_message_to_ap({
                "type": "emergency_stop",
                "reason": "barrier_closed"
            })

    def monitor_speed_limits(self):
        """CybP_06: Контроль скоростного режима"""
        try:
            robot_status = self.get_robot_status()
            
            if not robot_status:
                return
            
            
            if isinstance(robot_status, dict):
                x = robot_status.get("x", 0)
                y = robot_status.get("y", 0) 
                speed = robot_status.get("speed", 0)
            elif isinstance(robot_status, (list, tuple)) and len(robot_status) >= 3:
                x, y, speed = robot_status[0], robot_status[1], robot_status[2]
            else:
                return
            
            
            try:
                x = float(x)
                y = float(y)
                speed = float(speed)
            except (ValueError, TypeError):
                return
            
            
            max_speed = self.get_max_speed_for_zone(x, y)
            
            if speed > max_speed:
                self.lg.warn(f"CybP_06: Превышение скорости: {speed} > {max_speed}")
                
                
                self.set_speed_controller_reset()
                
                
                self.send_message_to_ap({
                    "type": "speed_correction",
                    "speed": max_speed * 0.9
                })
        except Exception as e:
            self.lg.error(f"CybP_06: Ошибка контроля скорости: {e}")

    def get_max_speed_for_zone(self, x, y):
        """Определение максимальной скорости для зоны"""
        
        if 1.2 <= x <= 1.8 and 1.2 <= y <= 1.8:
            return 0.05
        
        elif 2.0 <= x <= 2.4 and 0.8 <= y <= 1.2:
            return 0.1
        
        return 0.2

    def make_next_short_message(self, prev_message: str):
        """CybP_02: Генерация коротких сообщений для heartbeat"""
        
        if not prev_message:
            return "INIT"
        
        
        crc = self.calculate_crc8(prev_message.encode())
        
        self.message_counter += 1
        
        
        if self.message_counter % 10 == 0:
            # Каждое 10-е сообщение проверяем более тщательно
            system_msgs = self.get_system_messages()
            if system_msgs and "comm_error" in str(system_msgs):
                self.lg.warn("CybP_02: Обнаружена компрометация канала связи")
                
                
                self.set_emergency_stop(True)
                time.sleep(5)
                self.set_emergency_stop(False)
        
        return f"{crc:02x}"

    @staticmethod
    def calculate_crc8(data):
        """Вычисление CRC8 контрольной суммы"""
        crc = 0
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x80:
                    crc = (crc << 1) ^ 0x07
                else:
                    crc <<= 1
                crc &= 0xFF
        return crc

Для запуска заезда вам необходимо вызвать функцию `start_module()`, передав им ваши UserMissionHandler и UserTrustedHandler.

In [None]:
start_module(UserMissionHandler, UserTrustedHandler)

[2025-10-02 23:37:09]::[AwC]::INIT::LCT-WRAP-CLIENT, версия 1.2.0
[2025-10-02 23:37:09]::[AwC]::INIT::UDP(c) <- (uid=KRdP) инициализирован.
[2025-10-02 23:37:09]::[AwC]::INIT::UDP(c) <- (uid=XSwB) инициализирован.
[2025-10-02 23:37:09]::[AwC]::INIT::Загрузка завершена.
[2025-10-02 23:37:09]::[AwC]::INIT::UDP(c) -> (uid=aJdc) инициализирован.
[2025-10-02 23:37:09]::[AwC]::INFO::(AP) Заезд инициализирован - ожидание старта
[2025-10-02 23:37:09]::[AwC]::INIT::Адрес сервера (command_receiver): http://127.0.0.1:13500
[2025-10-02 23:37:09]::[AwC]::INFO::(TM) Заезд инициализирован - ожидание старта
[2025-10-02 23:37:26]::[AwC]::INFO::Код заезда инициализирован
[2025-10-02 23:37:26]::[AwC]::INFO::Начало выполнения миссии уборки снега
[2025-10-02 23:37:26]::[AwC]::INFO::Движение к точке 1/13: {'x': 0.6, 'y': 0.6}
[2025-10-02 23:37:26]::[AwC]::INFO::Доверенный код инициализирован
[2025-10-02 23:37:26]::[AwC]::INFO::CybO_01: Инициализация модуля безопасности
[2025-10-02 23:37:26]::[AwC]::INFO::Cy

Exception in thread Thread-11 (mission_code):
Traceback (most recent call last):
  File "C:\Python312\Lib\threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "G:\Projects\LCT_hack\lct-client\.venv\Lib\site-packages\ipykernel\ipkernel.py", line 772, in run_closure
    _threading_Thread_run(self)
  File "C:\Python312\Lib\threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\karim\AppData\Local\Temp\ipykernel_16336\3371923906.py", line 59, in mission_code
  File "C:\Users\karim\AppData\Local\Temp\ipykernel_16336\3371923906.py", line 97, in check_for_obstacles
AttributeError: 'AGTSHookAp' object has no attribute 'do_wait'


[2025-10-02 23:38:16]::[AwC]::WARN::CybP_04: Превышение скорости щетки: 100
[2025-10-02 23:38:16]::[AwC]::INFO::Движение к точке 3/13: {'x': 3.0, 'y': 1.0}
[2025-10-02 23:38:16]::[AwC]::WARN::Обнаружен пешеход, остановка
[2025-10-02 23:38:17]::[AwC]::WARN::CybP_04: Превышение скорости щетки: 100
[2025-10-02 23:38:17]::[AwC]::WARN::CybP_04: Превышение скорости щетки: 100
[2025-10-02 23:38:18]::[AwC]::WARN::CybP_04: Превышение скорости щетки: 100
[2025-10-02 23:38:18]::[AwC]::WARN::CybP_04: Превышение скорости щетки: 100
[2025-10-02 23:38:19]::[AwC]::WARN::CybP_04: Превышение скорости щетки: 100
[2025-10-02 23:38:19]::[AwC]::WARN::CybP_04: Превышение скорости щетки: 100
[2025-10-02 23:38:20]::[AwC]::WARN::CybP_04: Превышение скорости щетки: 100
[2025-10-02 23:38:20]::[AwC]::WARN::CybP_04: Превышение скорости щетки: 100
[2025-10-02 23:38:21]::[AwC]::WARN::CybP_04: Превышение скорости щетки: 100
[2025-10-02 23:38:21]::[AwC]::WARN::CybP_04: Превышение скорости щетки: 100
[2025-10-02 23:38:2

Завершить заезд можно при помощи команды `/stop`, подаваемой в АСО.

При желании начать новый заезд вам необходимо ввести команду `/reset`, которая сбросит инциализаторы АСО. Затем необходимо перезапустить ЦД (закрыть и открыть заново).

Перезапускать код робота удобнее всего при помощи функции перезапуска ядра Jupyter Notebook (стрелочка круглая) и выполнения всех ячеек кода (две стрелочки в панели инструментов).