In [None]:
from pymavlink import mavutil
import math

# 连接到 SITL 模拟器
connection = mavutil.mavlink_connection('udp:192.168.2.1:14550')

# 等待心跳信号，确保连接成功
connection.wait_heartbeat()

# 获取当前位置的 GPS 信息
msg = connection.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
latitude = msg.lat / 1e7  # 将纬度转为实际值
longitude = msg.lon / 1e7  # 将经度转为实际值
altitude = msg.alt / 1e3   # 转换为米

# 获取当前的航向信息
heading_msg = connection.recv_match(type='VFR_HUD', blocking=True)
heading = heading_msg.heading  # 航向角度 (0 - 360度)

# 计算前方 10 米处的目标位置
def calculate_new_position(lat, lon, distance_m, bearing_deg):
    R = 6378.1 * 1000  # 地球半径（米）
    bearing = math.radians(bearing_deg)  # 将角度转换为弧度

    lat1 = math.radians(lat)  # 当前纬度（弧度）
    lon1 = math.radians(lon)  # 当前经度（弧度）

    # 计算新纬度
    lat2 = math.asin(math.sin(lat1) * math.cos(distance_m / R) +
                     math.cos(lat1) * math.sin(distance_m / R) * math.cos(bearing))

    # 计算新经度
    lon2 = lon1 + math.atan2(math.sin(bearing) * math.sin(distance_m / R) * math.cos(lat1),
                             math.cos(distance_m / R) - math.sin(lat1) * math.sin(lat2))

    lat2 = math.degrees(lat2)
    lon2 = math.degrees(lon2)
    
    return lat2, lon2

# 计算目标点 (前进 10 米)
target_lat, target_lon = calculate_new_position(latitude, longitude, 10, heading)
print(f"Target position: lat {target_lat}, lon {target_lon}")

# 发送航点指令到飞控
connection.mav.mission_item_send(
    connection.target_system,
    connection.target_component,
    0,  # 序列号
    mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,  # 坐标框架
    mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,  # 命令: 航点
    0, 0,  # 当前航点, 自动继续
    0, 0, 0, 0,  # 参数留空
    target_lat,  # 目标纬度
    target_lon,  # 目标经度
    altitude)  # 目标高度（保持当前高度）

print("Waypoint command sent")

# 设置为自动模式，开始执行航点任务
mode_id = connection.mode_mapping()['AUTO']
connection.set_mode(mode_id)
print("Switched to AUTO mode, proceeding to waypoint")


In [44]:
import time
import sys
master = mavutil.mavlink_connection('udp:192.168.2.2:14550')
master.wait_heartbeat()

def set_rc_channel_pwm(channel_id, pwm=1500):
    if channel_id < 1 or channel_id > 18:
        print("Channel does not exist.")
        return
    rc_channel_values = [65535 for _ in range(18)]
    rc_channel_values[channel_id - 1] = pwm
    master.mav.rc_channels_override_send(
        master.target_system,
        master.target_component,
        *rc_channel_values)

# while True:
#     try:
#         set_rc_channel_pwm(5, 1600)
#         time.sleep(0.2)
#         print('command sent')
#     except Exception as error:
#         print(error)
#         sys.exit(0)

In [64]:
"""
Example of how to use RC_CHANNEL_OVERRIDE messages to force input channels
in Ardupilot. These effectively replace the input channels (from joystick
or radio), NOT the output channels going to thrusters and servos.
"""

# Import mavutil
from pymavlink import mavutil

# Create the connection
master = mavutil.mavlink_connection('udp:192.168.2.2:14550')
# Wait a heartbeat before sending commands
master.wait_heartbeat()

def is_armed():
    try:
        return bool(master.wait_heartbeat().base_mode & 0b10000000)
    except:
        return False


while not is_armed():
    master.arducopter_arm()


# Create a function to send RC values
# More information about Joystick channels
# here: https://www.ardusub.com/operators-manual/rc-input-and-output.html#rc-inputs
def set_rc_channel_pwm(channel_id, pwm=1500):
    """ Set RC channel pwm value
    Args:
        channel_id (TYPE): Channel ID
        pwm (int, optional): Channel pwm value 1100-1900
    """
    if channel_id < 1 or channel_id > 18:
        print("Channel does not exist.")
        return

    # Mavlink 2 supports up to 18 channels:
    # https://mavlink.io/en/messages/common.html#RC_CHANNELS_OVERRIDE
    rc_channel_values = [65535 for _ in range(8)]
    rc_channel_values[channel_id - 1] = pwm
    master.mav.rc_channels_override_send(
        master.target_system,                # target_system
        master.target_component,             # target_component
        *rc_channel_values)                  # RC channel list, in microseconds.


# Set some yaw
#set_rc_channel_pwm(5, 1800)

In [66]:
set_rc_channel_pwm(5, 1400)

In [None]:
import gym
import numpy as np
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from IPython import display
env = gym.make("CartPole-v1", render_mode='rgb_array')
def system_dynamics(state, u, dt=0.02):
    # CartPole的状态是 [位置, 速度, 杆角, 杆角速度]
    x, x_dot, theta, theta_dot = state
    
    # 系统参数
    gravity = 9.8
    masscart = 1.0
    masspole = 0.1
    total_mass = masspole + masscart
    length = 0.5  # 半长
    polemass_length = masspole * length
    
    force = u
    temp = (force + polemass_length * theta_dot**2 * np.sin(theta)) / total_mass
    theta_acc = (gravity * np.sin(theta) - np.cos(theta) * temp) / (length * (4/3 - masspole * np.cos(theta)**2 / total_mass))
    x_acc = temp - polemass_length * theta_acc * np.cos(theta) / total_mass
    
    # 更新状态
    x_new = x + dt * x_dot
    x_dot_new = x_dot + dt * x_acc
    x_dot_new = x_dot_new[0]
    theta_new = theta + dt * theta_dot
    theta_dot_new = theta_dot + dt * theta_acc
    theta_dot_new = theta_dot_new[0]
    return np.array([x_new, x_dot_new, theta_new, theta_dot_new])
def mpc_controller(state, horizon=10, dt=0.02):
    # 定义目标函数（最小化杆角和控制输入）
    def cost(u_seq):
        u_seq = np.reshape(u_seq, (horizon, 1))
        state_pred = np.copy(state)
        total_cost = 0.0
        
        for u in u_seq:
            state_pred = system_dynamics(state_pred, u, dt)
            # 成本函数：让杆角尽量接近0，控制力尽量小
            total_cost += (state_pred[2]**2 + 0.01 * (u**2))
            total_cost += state_pred[0]**2
        
        return total_cost
    
    # 初始控制序列
    u0 = np.zeros(horizon)
    
    # 使用优化算法求解最优控制输入序列
    result = minimize(cost, u0, method='SLSQP', bounds=[(-10, 10)] * horizon)
    
    # 返回第一个控制输入
    return result.x[0]
# 定义一个函数，用于在Jupyter Notebook中显示环境帧
def show_env(env, step=0):
    plt.figure(figsize=(6,4))
    plt.imshow(env.render())  # 获取当前环境的图像
    plt.axis('off')
    plt.title(f"Step: {step}")
    display.display(plt.gcf())  # 显示当前帧
    display.clear_output(wait=True)  # 清除之前的输出

env.reset()
env.state = np.array([0.0, 0.0, 0.0, 0.0])  # [x, x_dot, theta, theta_dot]
total_reward = 0
done = False
state = env.state
step = 0
while not done:
    show_env(env, step)
    
    # 获取当前状态
    # print(state)
    current_state = np.array(state)
    
    # 通过MPC计算最优控制输入
    action = mpc_controller(current_state)
    
    # Gym的动作是离散的，所以将连续的控制输入映射为离散动作
    action = 1 if action > 0 else 0
    
    # 执行动作
    temp = env.step(action)
    state, reward, done, _, _ = temp
    total_reward += reward
    
    if done:
        print("Total reward:", total_reward)
        break
    step += 1
env.close()


In [1]:
from pymavlink import mavutil
import time

# 创建 MAVLink 连接
master = mavutil.mavlink_connection('udp:192.168.2.1:14550')

# 等待心跳包，以确认与飞行控制器连接
master.wait_heartbeat()
print("Heartbeat received. Connection established.")

def set_servo_pwm(servo_channel, pwm_value):
    """
    发送命令来设置特定舵机的 PWM 值。

    :param servo_channel: 要控制的舵机通道号 (通常 1-8)。
    :param pwm_value: PWM 值 (范围 1000-2000)。
    """
    master.mav.command_long_send(
        master.target_system,       # 目标系统 ID
        master.target_component,    # 目标组件 ID
        mavutil.mavlink.MAV_CMD_DO_SET_SERVO,  # 命令
        0,                          # 确认（不需要）
        servo_channel,              # 参数 1: 舵机通道
        pwm_value,                  # 参数 2: PWM 值
        0, 0, 0, 0, 0              # 剩余的参数不使用
    )
def arm_vehicle():
    """
    解锁飞控
    """
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
        0,  # 确认
        1,  # 1 表示解锁，0 表示上锁
        0, 0, 0, 0, 0, 0  # 剩余参数不使用
    )
    # 等待状态变更
    master.motors_armed_wait()
    print("Vehicle armed!")

def disarm_vehicle():
    """
    上锁飞控
    """
    master.mav.command_long_send(
        master.target_system,
        master.target_component,
        mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
        0,  # 确认
        0,  # 1 表示解锁，0 表示上锁
        0, 0, 0, 0, 0, 0  # 剩余参数不使用
    )

    # 等待状态变更
    master.motors_disarmed_wait()
    print("Vehicle disarmed!")   

Heartbeat received. Connection established.


In [None]:

try:
    # 控制第一个舵机通道，将其 PWM 设置为 1500
    arm_vehicle()
    set_servo_pwm(1, 1600)
    print("Servo 1 set to 1600 PWM")
    time.sleep(5)
    # 控制第二个舵机通道，将其 PWM 设置为 1700
    # set_servo_pwm(1, 1500)
    # print("Servo 2 set to 1500 PWM")
    # time.sleep(1)
    # # 上锁飞控
    # disarm_vehicle()

    # 保持运行，以确保 QGroundControl 保持连接
except:
    pass
       
