# Управление БЛА с помощью Pymavlink

## Описание проекта

Этот проект представляет собой пример управления беспилотным летательным аппаратом (БЛА) с использованием библиотеки pymavlink на Python. Проект включает в себя подключение к БЛА, получение телеметрии, выполнение различных команд управления, таких как взлет, посадка, перемещение в заданную точку, изменение высоты, получение информации о состоянии дрона и другие полезные команды.

1. Импорт необходимых библиотек

Импортируем все необходимые библиотеки для работы с БЛА и управления им.

In [None]:
from datetime import datetime
from pymavlink import mavutil
import time
import threading

2. Создание подключения к БЛА

Создаем подключение по протоколу MavLink к полетному контроллеру.

Для установления связи с БЛА и отправки команд управления.

Мы используем **mavutil.mavlink_connection** для создания подключения и ожидаем **heartbeat** сигнал для подтверждения, что соединение установлено.

In [None]:
def do_connect(connect='tcp:127.0.0.1:14550'):
    master = mavutil.mavlink_connection(connect)
    master.wait_heartbeat()
    return master

3. Получение телеметрии

Запрашиваем информацию из полетного контроллера и обрабатываем полученные сообщения телеметри

Для мониторинга состояния БЛА в реальном времени.

Мы используем отдельный поток для получения телеметрии, чтобы не блокировать основной поток выполнения команд.и.

In [None]:
def get_telemetry():
    while True:
        try:
            msg = master.recv_match(blocking=True, timeout=1)
            if msg is not None:
                process_telemetry(msg)
        except Exception as e:
            print(f'Потеряно подключение телеметрии: {e}')

def process_telemetry(msg):
    if msg.get_type() == 'GLOBAL_POSITION_INT':
        altitude = round(msg.relative_alt / 1000, 1)
        gps_altitude = round(msg.alt / 1000, 1)
        longitude = msg.lon / 1e7
        latitude = msg.lat / 1e7
        print(get_time())
        print(f'Высота: {altitude}м', f'Высота (GPS): {gps_altitude}м', f'Lon: {longitude}', f'Lat: {latitude}', sep='\n')
        print()

def get_time():
    now = datetime.now()
    return f'{now.hour:02d}:{now.minute:02d}:{now.second:02d}'

4. Подключение к БЛА и запуск потока телеметрии

Подключаемся к БЛА и запускаем поток для получения телеметри

Для начала работы с БЛА и мониторинга его состояния.

Мы создаем подключение и запускаем поток телеметрии, чтобы начать получать данные о состоянии БЛА.и.

In [None]:
master = do_connect()
telemetry_thread = threading.Thread(target=get_telemetry)
telemetry_thread.daemon = True
telemetry_thread.start()

5. Реализация цикла с командами управления БЛА

Реализуем цикл, который принимает команды от пользователя и выполняет их.

Для интерактивного управления БЛА через командную строку.

Мы используем цикл **while True**, который ждет ввода команды от пользователя и выполняет соответствующую функцию.

In [None]:
import drone_commands as cmd

while True:
    command = input('введите команду: ')

    if command == 'exit':
        break
    elif command == 'arm':
        cmd.arm(master)
    elif command == 'takeoff':
        cmd.takeoff(master, 10)
    elif command == 'land':
        cmd.land(master)
    elif command.startswith('goto'):
        try:
            _, lat, lon, alt = command.split()
            cmd.goto(master, float(lat), float(lon), float(alt))
        except ValueError:
            print("Неверный формат команды. Используйте: goto <lat> <lon> <alt>")
    elif command.startswith('change_altitude'):
        try:
            _, alt = command.split()
            cmd.change_altitude(master, float(alt))
        except ValueError:
            print("Неверный формат команды. Используйте: change_altitude <alt>")
    elif command == 'get_status':
        cmd.get_status(master)
    elif command == 'get_battery_info':
        cmd.get_battery_info(master)
    elif command == 'get_speed_info':
        cmd.get_speed_info(master)
    elif command == 'get_heading_info':
        cmd.get_heading_info(master)
    elif command == 'get_gps_info':
        cmd.get_gps_info(master)
    elif command == 'get_flight_mode':
        cmd.get_flight_mode(master)
    elif command.startswith('set_speed'):
        try:
            _, speed = command.split()
            cmd.set_speed(master, float(speed))
        except ValueError:
            print("Неверный формат команды. Используйте: set_speed <speed>")
    elif command.startswith('set_heading'):
        try:
            _, heading = command.split()
            cmd.set_heading(master, float(heading))
        except ValueError:
            print("Неверный формат команды. Используйте: set_heading <heading>")
    elif command == 'return_to_launch':
        cmd.return_to_launch(master)
    elif command == 'pause_flight':
        cmd.pause_flight(master)
    elif command == 'continue_flight':
        cmd.continue_flight(master)
    else:
        print('Неизвестная команда!')
        continue
    print("Выполняю команду -", command)

6. Реализация команд управления БЛА

Команда включения моторов **arm**

Для подготовки дрона к полету.

Мы отправляем команду **MAV_CMD_COMPONENT_ARM_DISARM** с параметром **1** для включения моторов.)

In [None]:
def arm(master):
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
        0,
        1, 0, 0, 0, 0, 0, 0)
    ack_msg = master.recv_match(type='COMMAND_ACK', blocking=True)
    if ack_msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
        print("Дрон успешно заармирован")
    else:
        print("Ошибка при армировании дрона")

Команда взлета **takeoff**

Запускает взлет дрона на заданную высоту.

Мы устанавливаем режим **GUIDED** и отправляем команду **MAV_CMD_NAV_TAKEOFF** с указанием высоты взлета.

In [None]:
def takeoff(master, alt):
    master.set_mode_guided()
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
        0,
        0, 0, 0, 0, 0, 0, alt)
    ack_msg = master.recv_match(type='COMMAND_ACK', blocking=True)
    if ack_msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
        print("Дрон взлетает")
    else:
        print("Ошибка при взлете")

Команда посадки **land**

Для завершения полета и безопасной посадки дрона.

Мы устанавливаем режим **LAND** и ожидаем подтверждения выполнения команды.

In [None]:
def land(master):
    master.set_mode_land()
    ack_msg = master.recv_match(type='COMMAND_ACK', blocking=True)
    if ack_msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
        print("Дрон начинает посадку")
    else:
        print("Ошибка при посадке")

Команда перемещения в заданную точку **goto**

Перемещает дрон в заданную точку с указанными координатами и высотой.

Мы отправляем команду **MAV_CMD_NAV_WAYPOINT** с указанием координат и высоты.

In [None]:
def goto(master, lat, lon, alt):
    master.mav.mission_item_send(
        master.target_system,
        master.target_component,
        0,
        mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
        mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
        2, 0, 0, 0, 0, 0,
        lat, lon, alt)
    ack_msg = master.recv_match(type='COMMAND_ACK', blocking=True)
    if ack_msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
        print(f"Дрон летит к точке: Lat {lat}, Lon {lon}, Alt {alt}")
    else:
        print("Ошибка при перемещении к точке")

Команда изменения высоты **change_altitude**

Для корректировки высоты полета дрона.

Мы отправляем команду **MAV_CMD_DO_CHANGE_ALTITUDE** с указанием новой высоты.

In [None]:
def change_altitude(master, alt):
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_DO_CHANGE_ALTITUDE,
        0,
        alt, 0, 0, 0, 0, 0, 0)
    ack_msg = master.recv_match(type='COMMAND_ACK', blocking=True)
    if ack_msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
        print(f"Дрон меняет высоту на {alt}м")
    else:
        print("Ошибка при изменении высоты")

Команда получения состояния дрона **get_status**

Для мониторинга состояния дрона в реальном времени.

Мы запрашиваем сообщение **SYS_STATUS** для получения информации о состоянии дрона.

In [None]:
def get_status(master):
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
        0,
        mavutil.mavlink.MAVLINK_MSG_ID_SYS_STATUS, 0, 0, 0, 0, 0, 0)
    msg = master.recv_match(type='SYS_STATUS', blocking=True)
    if msg:
        print(f"Состояние дрона: {msg}")
    else:
        print("Не удалось получить состояние дрона")

Команда получения информации о батарее **get_battery_info**

Для мониторинга заряда батареи дрона.

Мы запрашиваем сообщение **BATTERY_STATUS** для получения информации о батарее.

In [None]:
def get_battery_info(master):
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
        0,
        mavutil.mavlink.MAVLINK_MSG_ID_BATTERY_STATUS, 0, 0, 0, 0, 0, 0)
    msg = master.recv_match(type='BATTERY_STATUS', blocking=True)
    if msg:
        print(f"Информация о батарее: {msg}")
    else:
        print("Не удалось получить информацию о батарее")

Команда получения информации о скорости **get_speed_info**

Для мониторинга скорости полета дрона.

Мы запрашиваем сообщение **VFR_HUD** для получения информации о скорости.

In [None]:
def get_speed_info(master):
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
        0,
        mavutil.mavlink.MAVLINK_MSG_ID_VFR_HUD, 0, 0, 0, 0, 0, 0)
    msg = master.recv_match(type='VFR_HUD', blocking=True)
    if msg:
        print(f"Информация о скорости: {msg}")
    else:
        print("Не удалось получить информацию о скорости")

Команда получения информации о курсе **get_heading_info**

Получает информацию о текущем курсе дрона.

Мы запрашиваем сообщение **ATTITUDE** для получения информации о курсе.

In [None]:
def get_heading_info(master):
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
        0,
        mavutil.mavlink.MAVLINK_MSG_ID_ATTITUDE, 0, 0, 0, 0, 0, 0)
    msg = master.recv_match(type='ATTITUDE', blocking=True)
    if msg:
        print(f"Информация о курсе: {msg}")
    else:
        print("Не удалось получить информацию о курсе")

Команда получения информации о состоянии GPS **get_gps_info**

Получает информацию о состоянии GPS, включая количество видимых спутников.

Мы запрашиваем сообщение **GPS_RAW_INT** для получения информации о GPS.

In [None]:
def get_gps_info(master):
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
        0,
        mavutil.mavlink.MAVLINK_MSG_ID_GPS_RAW_INT, 0, 0, 0, 0, 0, 0)
    msg = master.recv_match(type='GPS_RAW_INT', blocking=True)
    if msg:
        print(f"Информация о GPS: {msg}")
    else:
        print("Не удалось получить информацию о GPS")

Команда получения текущего режима полета **get_flight_mode**

Для мониторинга текущего режима полета дрона.

Мы запрашиваем сообщение **HEARTBEAT** для получения информации о режиме полета.

In [None]:
def get_flight_mode(master):
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
        0,
        mavutil.mavlink.MAVLINK_MSG_ID_HEARTBEAT, 0, 0, 0, 0, 0, 0)
    msg = master.recv_match(type='HEARTBEAT', blocking=True)
    if msg:
        print(f"Текущий режим полета: {msg.custom_mode}")
    else:
        print("Не удалось получить информацию о режиме полета")

Команда установки скорости полета **set_speed**

Мы отправляем команду **MAV_CMD_DO_CHANGE_SPEED** с указанием новой скорости.

In [None]:
def set_speed(master, speed):
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_DO_CHANGE_SPEED,
        0,
        0, speed, -1, 0, 0, 0, 0)
    ack_msg = master.recv_match(type='COMMAND_ACK', blocking=True)
    if ack_msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
        print(f"Скорость полета установлена на {speed} м/с")
    else:
        print("Ошибка при установке скорости полета")

Команда установки курса **set_heading**

Для управления направлением полета дрона.

Мы отправляем команду MAV_CMD_CONDITION_YAW с указанием нового курса.

In [None]:
def set_heading(master, heading):
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_CONDITION_YAW,
        0,
        heading, 0, 0, 0, 0, 0, 0)
    ack_msg = master.recv_match(type='COMMAND_ACK', blocking=True)
    if ack_msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
        print(f"Курс установлен на {heading} градусов")
    else:
        print("Ошибка при установке курса")

Команда возврата на базу **return_to_launch**

Для безопасного возврата дрона на точку взлета.

Мы устанавливаем режим **RTL** (Return to Launch) и ожидаем подтверждения выполнения команды.

In [None]:
def return_to_launch(master):
    master.set_mode_rtl()
    ack_msg = master.recv_match(type='COMMAND_ACK', blocking=True)
    if ack_msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
        print("Дрон возвращается на базу")
    else:
        print("Ошибка при возврате на базу")

Команда паузы в полете **pause_flight**

Для временной остановки полета дрона.

Мы устанавливаем режим **HOLD** (пауза) и ожидаем подтверждения выполнения команды.

In [None]:
def pause_flight(master):
    master.set_mode_hold()
    ack_msg = master.recv_match(type='COMMAND_ACK', blocking=True)
    if ack_msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
        print("Полет приостановлен")
    else:
        print("Ошибка при приостановке полета")

Команда продолжения полета после паузы **continue_flight**

Для возобновления полета после временной остановки.

Мы устанавливаем режим **GUIDED** и ожидаем подтверждения выполнения команды.

In [None]:
def continue_flight(master):
    master.set_mode_guided()
    ack_msg = master.recv_match(type='COMMAND_ACK', blocking=True)
    if ack_msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
        print("Полет возобновлен")
    else:
        print("Ошибка при возобновлении полета")

7. Завершение работы

Для корректного завершения работы и освобождения ресурсов.

Мы используем команду **break** для выхода из цикла и завершения работы программы.