# 神经网络增强 RRT 算法教程

本教程展示如何使用神经网络增强 RRT 算法进行路径规划。主要内容包括：

1. 环境准备和依赖安装
2. 训练数据生成和预处理
3. 神经网络模型训练
4. 使用训练好的模型进行路径规划
5. 基于 Pygame 的可视化演示

## 环境准备

首先确保已安装所需的依赖包：

In [None]:
!pip install torch numpy pygame matplotlib tqdm shapely pyyaml

## 1. 导入必要的模块

In [1]:
import os
import sys
import torch
import numpy as np
from tqdm.notebook import tqdm

# 添加项目根目录到 Python 路径
current_dir = os.path.dirname(os.path.abspath('.'))
sys.path.append(current_dir)

from ml.data import DataGenerator, create_data_loaders, RRTDataset
from ml.models.rrt_nn import NeuralRRT, SamplingNetwork, EvaluationNetwork, OptimizationNetwork
from simulation.environment import Environment
from rrt.rrt_star import RRTStar

pygame 2.6.1 (SDL 2.28.4, Python 3.10.16)
Hello from the pygame community. https://www.pygame.org/contribute.html


## 2. 训练数据准备

### 2.1 加载已生成的训练数据

In [2]:
# 加载已生成的训练数据
data_path = os.path.join(current_dir, 'data/training/rrt_dataset.pt')

if os.path.exists(data_path):
    dataset = torch.load(data_path)
    print(f"成功加载数据集，样本数量: {len(dataset)}")
else:
    print("未找到预生成的数据集，将创建新的数据集")
    # 配置数据生成器参数
    generator = DataGenerator(
        env_width=500.0,
        env_height=300.0,
        grid_size=(64, 64),
        min_obstacles=3,
        max_obstacles=8,
        num_samples=100
    )
    
    print("数据生成器初始化完成")
    
    # 生成训练样本
    num_examples = 10
    examples = []
    
    with tqdm(total=num_examples, desc="生成训练样本") as pbar:
        while len(examples) < num_examples:
            example = generator.generate_example()
            if example is not None:
                examples.append(example)
                pbar.update(1)
    
    # 创建数据集
    dataset = generator.generate_dataset(examples)
    
    # 保存数据集
    os.makedirs(os.path.dirname(data_path), exist_ok=True)
    torch.save(dataset, data_path)
    print(f"数据集已保存到 {data_path}")

# 创建数据加载器
train_loader, val_loader = create_data_loaders(
    dataset,
    batch_size=2,  # 使用小批量以适应小数据集
    train_ratio=0.8
)

未找到预生成的数据集，将创建新的数据集
数据生成器初始化完成


ImportError: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html

### 2.2 查看数据集统计信息

In [None]:
def print_dataset_stats(dataset):
    """打印数据集统计信息"""
    print("\n数据集统计信息:")
    print(f"样本数量: {len(dataset)}")
    
    # 计算路径长度统计
    path_lengths = [ex.path_length for ex in dataset.examples]
    print("\n路径长度统计:")
    print(f"  最小值: {min(path_lengths):.2f}")
    print(f"  最大值: {max(path_lengths):.2f}")
    print(f"  平均值: {np.mean(path_lengths):.2f}")
    print(f"  标准差: {np.std(path_lengths):.2f}")
    
    # 计算平滑度统计
    smoothness = [ex.smoothness for ex in dataset.examples]
    print("\n平滑度统计:")
    print(f"  最小值: {min(smoothness):.2f}")
    print(f"  最大值: {max(smoothness):.2f}")
    print(f"  平均值: {np.mean(smoothness):.2f}")
    print(f"  标准差: {np.std(smoothness):.2f}")
    
    # 计算间隙统计
    clearance = [ex.clearance for ex in dataset.examples]
    print("\n障碍物间隙统计:")
    print(f"  最小值: {min(clearance):.2f}")
    print(f"  最大值: {max(clearance):.2f}")
    print(f"  平均值: {np.mean(clearance):.2f}")
    print(f"  标准差: {np.std(clearance):.2f}")

print_dataset_stats(dataset)

### 2.3 可视化训练样本

In [None]:
import matplotlib.pyplot as plt

# 可视化一个训练样本
def visualize_sample(example, index=0):
    # 创建环境
    env = Environment(width=500.0, height=300.0)
    
    # 添加障碍物
    for obs_dict in example.obstacles:
        if obs_dict['type'] == 'circle':
            env.add_obstacle(
                x=obs_dict['x'],
                y=obs_dict['y'],
                obstacle_type='circle',
                radius=obs_dict['radius']
            )
        elif obs_dict['type'] == 'rectangle':
            env.add_obstacle(
                x=obs_dict['x'],
                y=obs_dict['y'],
                obstacle_type='rectangle',
                width=obs_dict['width'],
                height=obs_dict['height'],
                angle=obs_dict['angle']
            )
    
    # 可视化环境和路径
    fig, ax = plt.subplots(figsize=(10, 6))
    env.plot_obstacles(ax)
    
    # 绘制起点和终点
    ax.plot(example.start[0], example.start[1], 'go', markersize=10, label='起点')
    ax.plot(example.goal[0], example.goal[1], 'ro', markersize=10, label='终点')
    
    # 绘制路径
    path_x = [p[0] for p in example.path]
    path_y = [p[1] for p in example.path]
    ax.plot(path_x, path_y, 'b-', linewidth=2, label='路径')
    
    # 绘制采样点
    valid_x = [p[0] for p in example.valid_samples[:100]]  # 限制点数以避免过度绘制
    valid_y = [p[1] for p in example.valid_samples[:100]]
    ax.scatter(valid_x, valid_y, c='g', s=2, alpha=0.3, label='有效采样点')
    
    invalid_x = [p[0] for p in example.invalid_samples[:100]]
    invalid_y = [p[1] for p in example.invalid_samples[:100]]
    ax.scatter(invalid_x, invalid_y, c='r', s=2, alpha=0.3, label='无效采样点')
    
    # 设置图表
    ax.set_xlim(0, 500)
    ax.set_ylim(0, 300)
    ax.set_aspect('equal')
    ax.legend()
    ax.set_title(f'训练样本 #{index+1}')
    
    plt.show()
    
    # 打印样本信息
    print(f"样本 #{index+1} 信息:")
    print(f"  路径长度: {example.path_length:.2f}")
    print(f"  平滑度: {example.smoothness:.2f}")
    print(f"  障碍物间隙: {example.clearance:.2f}")
    print(f"  路径点数: {len(example.path)}")
    print(f"  有效采样点数: {len(example.valid_samples)}")
    print(f"  无效采样点数: {len(example.invalid_samples)}")

# 可视化第一个样本
visualize_sample(dataset.examples[0], 0)

## 3. 模型训练

### 3.1 初始化模型

In [None]:
# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")

# 初始化神经网络
sampling_net = SamplingNetwork().to(device)
evaluation_net = EvaluationNetwork().to(device)
optimization_net = OptimizationNetwork().to(device)

# 创建优化器
learning_rate = 1e-3
optimizers = {
    'sampling': torch.optim.Adam(sampling_net.parameters(), lr=learning_rate),
    'evaluation': torch.optim.Adam(evaluation_net.parameters(), lr=learning_rate),
    'optimization': torch.optim.Adam(optimization_net.parameters(), lr=learning_rate)
}

### 3.2 训练模型

注意：由于我们的数据集很小，这里只进行少量训练，主要是为了演示流程。实际应用中应该使用更大的数据集和更多的训练轮次。

In [None]:
num_epochs = 10  # 使用较少的轮次以快速演示
save_dir = 'results/models'
os.makedirs(save_dir, exist_ok=True)

# 训练循环
for epoch in range(num_epochs):
    # 训练模式
    sampling_net.train()
    evaluation_net.train()
    optimization_net.train()
    
    train_loss = {'sampling': 0.0, 'evaluation': 0.0, 'optimization': 0.0}
    
    with tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}') as pbar:
        for batch in pbar:
            # 将数据移到设备上
            env_state = batch['env_state'].to(device)
            start = batch['start'].to(device)
            goal = batch['goal'].to(device)
            path = batch['path'].to(device)
            
            # 训练采样网络
            optimizers['sampling'].zero_grad()
            sample_loss = sampling_net.compute_loss(env_state, start, goal)
            sample_loss.backward()
            optimizers['sampling'].step()
            
            # 训练评估网络
            optimizers['evaluation'].zero_grad()
            eval_loss = evaluation_net.compute_loss(env_state, path)
            eval_loss.backward()
            optimizers['evaluation'].step()
            
            # 训练优化网络
            optimizers['optimization'].zero_grad()
            opt_loss = optimization_net.compute_loss(env_state, path)
            opt_loss.backward()
            optimizers['optimization'].step()
            
            # 更新损失
            train_loss['sampling'] += sample_loss.item()
            train_loss['evaluation'] += eval_loss.item()
            train_loss['optimization'] += opt_loss.item()
            
            # 更新进度条
            pbar.set_postfix({
                'sample_loss': sample_loss.item(),
                'eval_loss': eval_loss.item(),
                'opt_loss': opt_loss.item()
            })
    
    # 打印每个 epoch 的平均损失
    print(f"\nEpoch {epoch+1} 平均损失:")
    for key, value in train_loss.items():
        print(f"  {key}: {value/len(train_loader):.4f}")
    
    # 保存模型
    if (epoch + 1) % 5 == 0 or epoch == num_epochs - 1:
        torch.save(sampling_net.state_dict(), f"{save_dir}/sampling_net_epoch_{epoch+1}.pt")
        torch.save(evaluation_net.state_dict(), f"{save_dir}/evaluation_net_epoch_{epoch+1}.pt")
        torch.save(optimization_net.state_dict(), f"{save_dir}/optimization_net_epoch_{epoch+1}.pt")
        print(f"模型已保存到 {save_dir}")

## 4. 使用训练好的模型进行路径规划

### 4.1 创建测试环境

In [None]:
# 创建测试环境
test_env = Environment(width=500.0, height=300.0)

# 添加一些障碍物
test_env.add_obstacle(x=150, y=150, obstacle_type="circle", radius=30)
test_env.add_obstacle(x=300, y=100, obstacle_type="rectangle", width=50, height=40)
test_env.add_obstacle(x=250, y=200, obstacle_type="circle", radius=25)

# 设置起点和终点
start_point = (50, 50)
goal_point = (450, 250)

# 可视化环境
fig, ax = plt.subplots(figsize=(10, 6))
test_env.plot_obstacles(ax)
ax.plot(start_point[0], start_point[1], 'go', markersize=10, label='起点')
ax.plot(goal_point[0], goal_point[1], 'ro', markersize=10, label='终点')
ax.set_xlim(0, 500)
ax.set_ylim(0, 300)
ax.set_aspect('equal')
ax.legend()
ax.set_title('测试环境')
plt.show()

### 4.2 使用神经网络增强的 RRT 进行路径规划

In [None]:
# 创建神经网络增强的 RRT 规划器
neural_rrt = NeuralRRT(
    sampling_net=sampling_net,
    evaluation_net=evaluation_net,
    optimization_net=optimization_net,
    start=start_point,
    goal=goal_point,
    env=test_env,
    max_iterations=1000
)

# 进行路径规划
path = neural_rrt.plan()

if path:
    print("路径规划成功!")
    # 可视化规划结果
    fig, ax = plt.subplots(figsize=(10, 6))
    test_env.plot_obstacles(ax)
    ax.plot(start_point[0], start_point[1], 'go', markersize=10, label='起点')
    ax.plot(goal_point[0], goal_point[1], 'ro', markersize=10, label='终点')
    
    # 绘制路径
    path_x = [p[0] for p in path]
    path_y = [p[1] for p in path]
    ax.plot(path_x, path_y, 'b-', linewidth=2, label='神经网络增强 RRT 路径')
    
    ax.set_xlim(0, 500)
    ax.set_ylim(0, 300)
    ax.set_aspect('equal')
    ax.legend()
    ax.set_title('神经网络增强 RRT 路径规划结果')
    plt.show()
else:
    print("未找到有效路径")

### 4.3 与传统 RRT* 算法对比

In [None]:
# 使用传统 RRT* 进行规划
rrt_star = RRTStar(
    start=start_point,
    goal=goal_point,
    env=test_env,
    max_iterations=1000,
    step_size=10.0,
    goal_sample_rate=0.2
)

rrt_path = rrt_star.plan()

if rrt_path:
    print("RRT* 规划成功!")
    # 可视化规划结果
    fig, ax = plt.subplots(figsize=(10, 6))
    test_env.plot_obstacles(ax)
    ax.plot(start_point[0], start_point[1], 'go', markersize=10, label='起点')
    ax.plot(goal_point[0], goal_point[1], 'ro', markersize=10, label='终点')
    
    # 绘制路径
    path_x = [p[0] for p in rrt_path]
    path_y = [p[1] for p in rrt_path]
    ax.plot(path_x, path_y, 'g-', linewidth=2, label='RRT* 路径')
    
    ax.set_xlim(0, 500)
    ax.set_ylim(0, 300)
    ax.set_aspect('equal')
    ax.legend()
    ax.set_title('RRT* 路径规划结果')
    plt.show()
else:
    print("RRT* 未找到有效路径")

# 比较路径质量
if path and rrt_path:
    neural_length = sum(np.linalg.norm(np.array(path[i+1]) - np.array(path[i])) 
                       for i in range(len(path)-1))
    rrt_length = sum(np.linalg.norm(np.array(rrt_path[i+1]) - np.array(rrt_path[i])) 
                     for i in range(len(rrt_path)-1))
    
    print(f"\n路径长度对比:")
    print(f"神经网络增强 RRT: {neural_length:.2f}")
    print(f"传统 RRT*: {rrt_length:.2f}")
    print(f"改进比例: {((rrt_length - neural_length) / rrt_length * 100):.2f}%")

## 5. 总结

本教程展示了如何：

1. 准备训练数据
2. 训练神经网络模型
3. 使用训练好的模型进行路径规划
4. 与传统 RRT* 算法进行对比

神经网络增强的 RRT 算法通过学习环境特征和历史规划经验，可以：

- 生成更智能的采样策略
- 更准确地评估路径质量
- 优化生成的路径

这些改进使得算法能够更快地找到更优质的路径。