# Кибериммунная автономность$\\$Создание конструктивно защищённого автономного наземного транспортного средства$\\$Модуль 4

## О документе

Версия 1.03

Модуль 4 для регионального этапа соревнований по кибериммунной автономности

### Модуль 4. Следование по трассе с киберпрепятствиями

Для успешного выполнения этого задания необходимое активировать специальный режим работы виртуальной машинки - в рамках этого задания можно менять только блоки, отвечающие за безопасность (ограничитель, монитор безопасности), **другие блоки менять запрещено**.

При прохождении маршрута будут имитироваться атаки со стороны злоумышленников, которые будут пытаться нарушить цели безопасности. Важно, чтобы им это не удалось.

1. Запустите свою машинку и убедитесь, что она проходит всю трассу без нарушений ограничений скорости. При необходимости измените логику работы блока безопасности
2. Добавьте контроль доставки груза в модуле SafetyBlock - убедитесь, что груз доставляется до конечной точки маршрута.
3. Если в модуле 3 вы реализовали монитор безопасности - не забудьте его перенести в этот модуль, это принесёт дополнительные баллы!

Активация киберпрепятствий в системе управления:

после инициализации системы управления добавьте следующую строку
```python
control_system.enable_surprises()
```

В этом блоке добавьте все ваши реализации изменённых бортовых систем

In [None]:
from src.queues_dir import QueuesDirectory
from src.security_monitory import BaseSecurityMonitor
from src.security_policy_type import SecurityPolicy
from src.communication_gateway import BaseCommunicationGateway
from src.control_system import BaseControlSystem
from src.safety_block import BaseSafetyBlock
from src.config import LOG_ERROR, LOG_INFO, LOG_DEBUG
from src.navigation_system import BaseNavigationSystem
from src.config import CONTROL_SYSTEM_QUEUE_NAME, SECURITY_MONITOR_QUEUE_NAME, SAFETY_BLOCK_QUEUE_NAME, COMMUNICATION_GATEWAY_QUEUE_NAME, NAVIGATION_QUEUE_NAME, CARGO_BAY_QUEUE_NAME, SERVOS_QUEUE_NAME
from src.event_types import Event
import math



class SafetyBlock(BaseSafetyBlock):
    """ класс ограничений безопасности """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cargo_locked = True

    def _set_new_direction(self, direction: float):
        """ установка нового направления перемещения """
       # self._log_message(LOG_INFO, f"текущие координаты: {self._position}")
       # self._log_message(LOG_DEBUG, f"маршрутное задание: {self._mission}")
       # self._log_message(LOG_DEBUG, f"состояние маршруте: {self._route}")
        
        if not self._mission or not self._position or not self._route:
            self._log_message(LOG_ERROR, "Неизвестный маршрут или положение")
            self._stop_afcs()
            self._send_speed_to_consumers()
            self._send_lock_cargo_to_consumers()
            return
        
        if not self._route.next_point():
            self._log_message(LOG_INFO, "Нет следующей точки маршрута.")
            self._stop_afcs()
            self._send_direction_to_consumers()
            self._send_speed_to_consumers()
            self._send_lock_cargo_to_consumers()
            return       

        next_direction = self._calculate_direction()
        error_direction = 10

        if (abs(next_direction - direction) > error_direction):
            self._log_message(LOG_ERROR, f"Внештатное изменение направления движения. Запрошенное {direction}, расчётное {next_direction}")
            self._send_lock_cargo_to_consumers()

        self._direction = next_direction
        self._send_direction_to_consumers()

    def _set_new_speed(self, speed: float):
        """ установка новой скорости """
        
        if not self._mission or not self._position or not self._route or not self._route.next_point():
            self._stop_afcs()
            self._send_speed_to_consumers()
            self._send_lock_cargo_to_consumers()
            return
        
        allowed_speed = self._route.calculate_speed()
        if allowed_speed < speed:
            self._log_message(LOG_ERROR, f"Внештатное превышение скорости: {speed}. Разрешенная скорость: {allowed_speed}")
            self._speed = allowed_speed
            self._send_lock_cargo_to_consumers()
        else:
            self._speed = speed
        
        self._send_speed_to_consumers()
        


    def _send_speed_to_consumers(self):
        self._log_message(LOG_DEBUG, "отправляем скорость получателям")
        servos_q_name = SERVOS_QUEUE_NAME

        # отправка сообщения с желаемой скоростью
        event_speed = Event(
            source=self.event_source_name,
            destination=servos_q_name,
            operation="set_speed",
            parameters=self._speed
        )

        security_monitor_q = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        security_monitor_q.put(event_speed)

    def _send_direction_to_consumers(self):
        self._log_message(LOG_DEBUG, "отправляем направление получателям")

        servos_q_name = SERVOS_QUEUE_NAME

        event_direction = Event(
            source=self.event_source_name,
            destination=servos_q_name,
            operation="set_direction",
            parameters=self._direction
        )
        
        security_monitor_q = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        security_monitor_q.put(event_direction)
    
    def _stop_afcs(self):
        self._direction = 0
        self._speed = 0
        self._log_message(LOG_ERROR, "АНТС принудительно остановлено.")

    def _calculate_direction(self):
        current_lattitude = math.radians(self._position.latitude)
        next_lattitude = math.radians(self._route.next_point().latitude)
        delta_longtitude = math.radians(self._route.next_point().longitude - self._position.longitude)

        x = math.sin(delta_longtitude) * math.cos(next_lattitude)
        y = math.cos(current_lattitude) * math.sin(next_lattitude) - \
            math.sin(current_lattitude) * math.cos(next_lattitude) * math.cos(delta_longtitude)

        return (math.degrees(math.atan2(x, y)) + 360) % 360
    
    def _lock_cargo(self, _):
        self._send_lock_cargo_to_consumers()

    def _release_cargo(self, _):
        if not self._mission or not self._position or not self._route:
            self._log_message(LOG_ERROR, "Неизвестный путь или местоположение!")
            self._speed = 0
            self._send_speed_to_consumers()
            self._send_lock_cargo_to_consumers()
            return 

        if not self._route.route_finished:
            self._log_message(LOG_INFO, "Маршрут не завершён, выгрузка невозможна!")
            self._send_lock_cargo_to_consumers()
            return
        
        self._send_release_cargo_to_consumers()

    def _send_lock_cargo_to_consumers(self):
        """ Блокировка грузового отсека (если он не заблокирован) """
        if self._cargo_locked:
            return

        self._log_message(LOG_INFO, "Блокировка грузового отсека")
        self._cargo_locked = True  # Отмечаем, что отсек заблокирован
        
        event = Event(
            source=self.event_source_name,
            destination=CARGO_BAY_QUEUE_NAME,
            operation="lock_cargo",
            parameters=None
        )
        security_monitor_q = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        security_monitor_q.put(event)

    def _send_release_cargo_to_consumers(self):
        """ Разблокировка грузового отсека (если он заблокирован) """
        if not self._cargo_locked:
            return

        self._log_message(LOG_INFO, "Разблокировка грузового отсека")
        self._cargo_locked = False  # Отмечаем, что отсек разблокирован

        event = Event(
            source=self.event_source_name,
            destination=CARGO_BAY_QUEUE_NAME,
            operation="release_cargo",
            parameters=None
        )
        security_monitor_q = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        security_monitor_q.put(event)

class SecurityMonitor(BaseSecurityMonitor):
    """ класс монитора безопасности """

    def __init__(self, queues_dir):
        super().__init__(queues_dir)
        self._init_set_security_policies()

    def _init_set_security_policies(self):
        """ инициализация политик безопасности """
        default_policies = [
            SecurityPolicy(
                source=COMMUNICATION_GATEWAY_QUEUE_NAME,
                destination=CONTROL_SYSTEM_QUEUE_NAME,
                operation='set_mission'
            ),
            SecurityPolicy(
                source=COMMUNICATION_GATEWAY_QUEUE_NAME,
                destination=SAFETY_BLOCK_QUEUE_NAME,
                operation='set_mission'
            ),
            SecurityPolicy(
                source=CONTROL_SYSTEM_QUEUE_NAME,
                destination=SAFETY_BLOCK_QUEUE_NAME,
                operation='set_speed'
            ),
            SecurityPolicy(
                source=CONTROL_SYSTEM_QUEUE_NAME,
                destination=SAFETY_BLOCK_QUEUE_NAME,
                operation='set_direction'
            ),
            SecurityPolicy(
                source=CONTROL_SYSTEM_QUEUE_NAME,
                destination=SAFETY_BLOCK_QUEUE_NAME,
                operation="release_cargo"
            ),
            SecurityPolicy(
                source=CONTROL_SYSTEM_QUEUE_NAME,
                destination=SAFETY_BLOCK_QUEUE_NAME,
                operation="lock_cargo"
            ),
            SecurityPolicy(
                source=CONTROL_SYSTEM_QUEUE_NAME,
                destination=NAVIGATION_QUEUE_NAME,
                operation="request_position"
            ),
            SecurityPolicy(
                source=SAFETY_BLOCK_QUEUE_NAME,
                destination=SERVOS_QUEUE_NAME,
                operation='set_speed'
            ),
            SecurityPolicy(
                source=SAFETY_BLOCK_QUEUE_NAME,
                destination=SERVOS_QUEUE_NAME,
                operation='set_direction'
            ),
            SecurityPolicy(
                source=SAFETY_BLOCK_QUEUE_NAME,
                destination=CARGO_BAY_QUEUE_NAME,
                operation='lock_cargo'
            ),
            SecurityPolicy(
                source=SAFETY_BLOCK_QUEUE_NAME,
                destination=CARGO_BAY_QUEUE_NAME,
                operation='release_cargo'
            ),
            SecurityPolicy(
                source=NAVIGATION_QUEUE_NAME,
                destination=SAFETY_BLOCK_QUEUE_NAME,
                operation="position_update"
            ),
            SecurityPolicy(
                source=NAVIGATION_QUEUE_NAME,
                destination=CONTROL_SYSTEM_QUEUE_NAME,
                operation="position_update"
            ),
        ]
        self.set_security_policies(policies=default_policies)        
        
    def set_security_policies(self, policies):
        """ установка новых политик безопасности """
        self._security_policies = policies
        self._log_message(LOG_INFO, f"изменение политик безопасности: {policies}")

    def _check_event(self, event: Event):
        """ проверка входящих событий """
        self._log_message(LOG_DEBUG, f"проверка события {event}, по умолчанию выполнение запрещено")

        authorized = False
        request = SecurityPolicy(
            source=event.source,
            destination=event.destination,
            operation=event.operation
        )

        if request in self._security_policies:
            self._log_message(LOG_DEBUG, "событие разрешено политиками, выполняем")
            authorized = True

        if authorized is False:
            self._log_message(LOG_ERROR, f"событие не разрешено политиками безопасности! {event}")
        return authorized

class CommunicationGateway(BaseCommunicationGateway):
    """CommunicationGateway класс для реализации логики взаимодействия
    с системой планирования заданий

    Работает в отдельном процессе, поэтому создаётся как наследник класса Process
    """
    def _send_mission_to_consumers(self):
        """ метод для отправки сообщения с маршрутным заданием в систему управления """
        
        control_q_name = CONTROL_SYSTEM_QUEUE_NAME
        safety_q_name = SAFETY_BLOCK_QUEUE_NAME

        event = Event(
            source=BaseCommunicationGateway.event_source_name,
            destination=control_q_name,
            operation="set_mission", 
            parameters=self._mission
        )
        safety_event = Event(
            source=BaseCommunicationGateway.event_source_name,
            destination=safety_q_name,
            operation="set_mission",
            parameters=self._mission
        )

        security_monitor_q = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        security_monitor_q.put(event)
        security_monitor_q.put(safety_event)



class ControlSystem(BaseControlSystem):
    """ControlSystem блок расчёта управления """

    def _send_speed_and_direction_to_consumers(self, speed, direction):
        safety_q_name = SAFETY_BLOCK_QUEUE_NAME

        event_speed = Event(
            source=self.event_source_name,
            destination=safety_q_name,
            operation="set_speed",
            parameters=speed
        )
        event_direction = Event(
            source=self.event_source_name,
            destination=safety_q_name,
            operation="set_direction",
            parameters=direction
        )

        security_monitor_q = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        security_monitor_q.put(event_direction)
        security_monitor_q.put(event_speed)

    def _lock_cargo(self):
        """ заблокировать грузовой отсек """
        safety_q_name = SAFETY_BLOCK_QUEUE_NAME

        safety_event = Event(
            source=BaseControlSystem.event_source_name,
            destination=safety_q_name,
            operation="lock_cargo",
            parameters=None
        )

        security_monitor_q = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        security_monitor_q.put(safety_event)


    def _release_cargo(self):
        """ открыть грузовой отсек """
        safety_q_name = SAFETY_BLOCK_QUEUE_NAME
        safety_event = Event(
            source=BaseControlSystem.event_source_name,
            destination=safety_q_name,
            operation="release_cargo",
            parameters=None
        )

        security_monitor_q = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        security_monitor_q.put(safety_event)


class NavigationSystem(BaseNavigationSystem):
    """ класс навигационного блока """
    def _send_position_to_consumers(self):        
        control_q_name = CONTROL_SYSTEM_QUEUE_NAME
        safety_q_name = SAFETY_BLOCK_QUEUE_NAME

        event = Event(
            source=BaseNavigationSystem.event_source_name,
            destination=control_q_name,
            operation="position_update",
            parameters=self._position
        )
        safety_event = Event(
            source=BaseNavigationSystem.event_source_name,
            destination=safety_q_name,
            operation="position_update",
            parameters=self._position
        )

        security_monitor_q = self._queues_dir.get_queue(SECURITY_MONITOR_QUEUE_NAME)
        security_monitor_q.put(event)
        security_monitor_q.put(safety_event)

Если у вас настроена и работает СУПА, установите в True значение переменной afcs_present

In [None]:
afcs_present = True

Поменяем идентификатор машинки для этого модуля

In [None]:
car_id = "m4"

В следующем блоке измените маршрут на ваш, его можно скопировать из модуля 2.
 
Для проверки работы систем безопасности будут активированы киберпрепятствия

```python
control_system.enable_critical_surprises()
```

In [None]:
wpl_file_content =   """QGC WPL 110
0	0	3	16	0	5	0	0	59.797417  30.272833	0	1
1	0	3	16	0	5	0	0	59.796326  30.272271	0	1
2	0	3	16	0	5	0	0	59.795488  30.273582	0	1
3	1	3	16	0	5	0	0	59.796054  30.281757	0	1
4	0	3	16	0	5	0	0	59.793197  30.301331	0	1
5	0	3	16	0	5	0	0	59.790573  30.319421	0	1
6	0	3	16	0	5	0	0	59.790740  30.322878	0	1
7	0	3	16	0	5	0	0	59.791476  30.325086	0	1
8	0	3	16	0	5	0	0	59.792483  30.326700	0	1
9	0	3	16	0	5	0	0	59.793644  30.327449	0	1
10	0	3	16	0	5	0	0	59.794873  30.327295	0	1
11	0	3	16	0	5	0	0	59.796180  30.326181	0	1
12	0	3	16	0	5	0	0	59.797457  30.324914	0	1
13	0	3	16	0	5	0	0	59.798261  30.324664	0	1
14	0	3	16	0	5	0	0	59.805876  30.324288	0	1
15	0	3	16	0	5	0	0	59.807699  30.324409	0	1
16	0	3	16	0	5	0	0	59.808593  30.325071	0	1
17	0	3	16	0	5	0	0	59.809175  30.326384	0	1
18	0	3	16	0	5	0	0	59.809566  30.328536	0	1
19	0	3	16	0	5	0	0	59.810362  30.337626	0	1
20	0	3	16	0	5	0	0	59.811015  30.340912	0	1
21	0	3	16	0	5	0	0	59.813538  30.348603	0	1
22	0	3	16	0	5	0	0	59.814648  30.353608	0	1
23	0	3	16	0	5	0	0	59.815124  30.360039	0	1
24	0	3	16	0	5	0	0	59.814962  30.368629	0	1
25	0	3	16	0	5	0	0	59.815292  30.373556	0	1
26	0	3	16	0	5	0	0	59.815549  30.381010	0	1
27	0	3	16	0	5	0	0	59.815004  30.382473	0	1
28	0	3	16	0	5	0	0	59.814160  30.382929	0	1
29	0	3	16	0	5	0	0	59.813526  30.382001	0	1
30	0	3	16	0	5	0	0	59.814168  30.380607	0	1
31	0	3	16	0	5	0	0	59.814885  30.380658	0	1
32	0	3	16	0	5	0	0	59.816907  30.381505	0	1
33	0	3	16	0	5	0	0	59.817948  30.381302	0	1
34	0	3	16	0	5	0	0	59.823817  30.378484	0	1
35	0	3	16	0	5	0	0	59.827748  30.375428	0	1
36	0	3	16	0	5	0	0	59.829736  30.374360	0	1
"""


wpl_file = "module4.wpl"

with open(wpl_file, "w") as f:
    f.write(wpl_file_content)

In [None]:
from time import sleep
from geopy import Point as GeoPoint

from src.servos import Servos
from src.sitl import SITL
from src.cargo_bay import CargoBay
from src.mission_planner import Mission
from src.mission_planner import MissionPlanner
from src.mission_planner_mqtt import MissionSender
from src.sitl_mqtt import TelemetrySender
from src.system_wrapper import SystemComponentsContainer
from src.wpl_parser import WPLParser
from src.mission_type import GeoSpecificSpeedLimit

wpl_file = "module4.wpl"

parser = WPLParser(wpl_file)    
points = parser.parse()
print(points)

speed_limits = [
    GeoSpecificSpeedLimit(0, 60),
    GeoSpecificSpeedLimit(19, 110),
    GeoSpecificSpeedLimit(26, 60),
]

home = GeoPoint(latitude=points[0].latitude, longitude=points[0].longitude) 
mission = Mission(home=home, waypoints=points, speed_limits=speed_limits, armed=True)

queues_dir = QueuesDirectory()  

if afcs_present:
    mission_sender = MissionSender(
        queues_dir=queues_dir, 
        client_id=car_id, 
        log_level=LOG_ERROR
    )
    telemetry_sender = TelemetrySender(
        queues_dir=queues_dir, 
        client_id=car_id, 
        log_level=LOG_ERROR
    )

mission_planner = MissionPlanner(
    queues_dir, 
    afcs_present=afcs_present, 
    mission=mission
)

sitl = SITL(
    queues_dir=queues_dir, 
    position=home,
    car_id=car_id, 
    post_telemetry=afcs_present, 
    log_level=LOG_ERROR
)

communication_gateway = CommunicationGateway(queues_dir=queues_dir, log_level=LOG_ERROR)
control_system = ControlSystem(queues_dir=queues_dir, log_level=LOG_INFO)
navigation_system = NavigationSystem(queues_dir=queues_dir, log_level=LOG_ERROR)
servos = Servos(queues_dir=queues_dir, log_level=LOG_ERROR)
cargo_bay = CargoBay(queues_dir=queues_dir, log_level=LOG_INFO)
safety_block = SafetyBlock(queues_dir=queues_dir, log_level=LOG_INFO)
security = SecurityMonitor(queues_dir=queues_dir)

system_components = SystemComponentsContainer(
components=[
        mission_sender,
        telemetry_sender,
        sitl,
        mission_planner,
        navigation_system,
        servos,
        cargo_bay,
        communication_gateway,
        control_system,
        safety_block,
        security
    ] if afcs_present else [
        sitl,
        mission_planner,
        navigation_system,
        servos,
        cargo_bay,
        communication_gateway,
        control_system,
        safety_block,
        security
    ]
)


control_system.enable_surprises()
system_components.start()

sleep(630)

system_components.stop()

system_components.clean()

Убедитесь, что 
1. ваша машинка успешно прошла весь заданный маршрут
2. не превысила ограничения скорости
3. успешно доставила груз

Если всё так - поздравляем, вы справились с заданием! Обязательно зафиксируйте все изменения в репозитории!



Мы будем признательны за обратную связь - любые комментарии, которые вы можете дать по итогам выполнения этого задания. 

Например, 

- было ли задание понятным по шкале 1..10 (1 - ничего не понятно, 10 - вопросов вообще не было, всё понятно)?
- было ли задание интересным по шкале 1..10 (1 - скука смертная, 10 - лучшее, что вам пока встречалось на олимпиадах)? 
- что бы вы предложили изменить, чтобы сделает его более интересным?
- по шкале 1..10 насколько сложным оно было для вас?
- что было самым трудным в задании? 

Авторы наиболее развёрнутых и интересных комментариев получат особенный приз от Лаборатории Касперского!

Дополнительная информация о кибериммунной разработке
- https://os.kaspersky.ru/cyber-immune-development/ 
- https://github.com/sergey-sobolev/cyberimmune-systems/wiki/%D0%9A%D0%B8%D0%B1%D0%B5%D1%80%D0%B8%D0%BC%D0%BC%D1%83%D0%BD%D0%B8%D1%82%D0%B5%D1%82
- канал в телеграм: https://t.me/learning_cyberimmunity
