In [1]:
import serial
import struct
import time

### Connection
# connect to a Betaflight drone via serial
port = '/dev/ttyACM0'
baudrate = 9600 # expected baudrate

ser = serial.Serial(port, baudrate, timeout=1)

In [2]:
### MSP commands
# out messages usually start with 1 (1**)
MSP_STATUS = 101
MSP_RAW_IMU = 102
# in messages usually start with 2 (2**)
MSP_SET_MOTOR = 214

In [3]:
### Main functions
def construct_msp_message(command: int, data: list = []) -> bytearray:
    """Creates expected formatting.

    Command frame format:
    Header: `$M<`
    Body: length of `data` + `command` + `data` values
    Tail: checksum

    Args:
        command (int): MSP command
        data (list, optional): list of params (length is varying). Defaults to [].

    Returns:
        bytearray: command frame
    """
    message = bytearray()
    # header
    message.append(ord('$'))
    message.append(ord('M'))
    message.append(ord('<'))
    # body
    message.append(len(data))
    message.append(command)
    checksum = len(data) ^ command
    for byte in data:
        message.append(byte)
        checksum ^= byte
    # tail
    message.append(checksum)
    return message

def send_msp_message(command: int, data: list = []):
    """Send prepared command via serial

    Args:
        command (int): MSP command
        data (list, optional): list of params. Defaults to [].
    """
    message = construct_msp_message(command, data)
    ser.write(message)

def read_msp_response() -> tuple:
    """Read data

    Returns:
        tuple: command code and its values
    """
    header = ser.read(3)  # read header
    if header != b'$M>':
        return None
    # read body
    data_length = ser.read(1)[0]
    code = ser.read(1)[0]
    data = ser.read(data_length)
    # read tail
    checksum = ser.read(1)[0]
    # check if the command is of the expected length
    calculated_checksum = data_length ^ code
    for byte in data:
        calculated_checksum ^= byte
    if calculated_checksum != checksum:
        return None

    return (code, data)

In [8]:
### Get data functions
def get_raw_imu() -> tuple:
    """Read raw IMU and decode it

    Returns:
        tuple | None: tuple - processed data, None - received data doesn't correspond to sent command
    """
    send_msp_message(MSP_RAW_IMU)  # send request
    time.sleep(0.1)  # wait for reply
    response = read_msp_response() # get reply

    if response and response[0] == MSP_RAW_IMU:
        imu = struct.unpack(f'<{len(response[1]) // 2}H', response[1])  # decode
        return imu
    return None

In [11]:
print(get_raw_imu())

None


In [28]:
### Set motors values functions
def set_motor_values(motor1: int, motor2: int, motor3: int, motor4: int):
    """Set throttle per motor

    Args:
        motor1 (int): motor 1 throttle
        motor2 (int): motor 2 throttle
        motor3 (int): motor 3 throttle
        motor4 (int): motor 4 throttle
    """
    motor_values = [motor1, motor2, motor3, motor4, 0, 0, 0, 0]
    send_msp_message(MSP_SET_MOTOR, struct.pack('<8H', *motor_values))

In [29]:
### Arming/Disarming
def arm():
    """Arm
    Uses small throttle values (1100).
    """
    set_motor_values(1100, 1100, 1100, 1100)

def disarm():
    """Disarm
    Uses min motor values (1000).
    """
    set_motor_values(1000, 1000, 1000, 1000)

In [30]:
### Flying functions
def takeoff():
    """Take Off 
    Uses mean throttle values (1500).
    """
    set_motor_values(1500, 1500, 1500, 1500)

def yaw(cw: bool = True, speed: int = 100, motors: list = []):
    """Yaw rotation

    Changes values for diagonal motors.

    Args:
        cw (bool, optional): if True CW rotation, if False - CCW. Defaults to True.
        speed (int, optional): power amount to add/subtract from motor values. Defaults to 100.
        motors (list, optional): current motor values (add to /subtract from). Defaults to [].
    """
    if cw:
        new_motors = [motors[0] + speed, motors[1] - speed, motors[2] - speed, motors[3] + speed]
    else:
        new_motors = [motors[0] - speed, motors[1] + speed, motors[2] + speed, motors[3] - speed]
    set_motor_values(*new_motors)

In [39]:
set_motor_values(1500, 500, 500, 1500)

In [40]:
set_motor_values(0, 0, 0, 0)

In [41]:
### Close connection when turning off the drone
ser.close()