In [1]:
import numpy as np
import random
import os
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from collections import deque
import copy
import pygame
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

pygame 2.6.1 (SDL 2.28.4, Python 3.12.7)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
import numpy as np
import random

class GridWorld:
    def __init__(self, grid_map, start, goal, max_steps=50000, goal_list=None):
        """
        初始化 GridWorld 环境
        """
        self.grid_map = grid_map
        self.base_grid_map = grid_map.copy()  # 保存基础地图（无动态障碍物）
        self.rows, self.cols = grid_map.shape
        self.max_steps = max_steps
        self.goal_list = goal_list if goal_list else []  # 目标点列表
        
        if start is not None and goal is not None:
            self.start = start
            self.goal = goal
            self.agent_pos = np.array(self.start, dtype=float)
            self.distance = np.linalg.norm(self.agent_pos - np.array(self.goal))
        else:
            self.reset_dynamic(start, goal)

    def reset_dynamic(self, num_obstacles=30, num_dynamic_obstacles=10):
        """
        每次 reset 时动态生成障碍物，并使用传入的起点和目标，或者随机生成。
        """
        self.grid_map = self.base_grid_map.copy()

        # 固定障碍物初始化
        valid_positions = [
            (i, j) for i in range(1, self.rows - 1) for j in range(1, self.cols - 1)
            if self.grid_map[i, j] == 0
        ]
        random_obstacles = random.sample(valid_positions, num_obstacles)
        for x, y in random_obstacles:
            self.grid_map[x, y] = 1

        # 动态障碍物初始化
        self.dynamic_obstacles = []
        for _ in range(num_dynamic_obstacles):
            x, y = random.choice(valid_positions)
            direction = random.choice(['up', 'down', 'left', 'right'])
            speed = random.randint(1, 2)

            self.dynamic_obstacles.append({
                'position': (x, y),
                'direction': direction,
                'speed': speed
            })

        # 确保起点和目标点不在障碍物中
        self.valid_positions = [
            (i, j) for i in range(1, self.rows - 1) for j in range(1, self.cols - 1)
            if self.grid_map[i, j] == 0
        ]
        self.agent_pos = np.array(self.start, dtype=float)
        self.steps = 0
        self.distance = np.linalg.norm(self.agent_pos - np.array(self.goal))

    def reset(self):
        """
        重置环境到初始状态
        """
        self.reset_dynamic()
        self.goal_list_copy = self.goal_list.copy()
        self.current_goal_index = 0
        self.goal = self.goal_list[self.current_goal_index] if self.goal_list else self.goal
        return self.get_state()

    def get_state(self):
        """
        获取当前状态
        """
        nearby_grid = np.ones((7, 7), dtype=int)
        x_min, x_max = max(0, int(self.agent_pos[0] - 3)), min(self.rows, int(self.agent_pos[0] + 4))
        y_min, y_max = max(0, int(self.agent_pos[1] - 3)), min(self.cols, int(self.agent_pos[1] + 4))
        r_min, r_max = 3 - (int(self.agent_pos[0]) - x_min), 3 + (x_max - int(self.agent_pos[0]))
        c_min, c_max = 3 - (int(self.agent_pos[1]) - y_min), 3 + (y_max - int(self.agent_pos[1]))

        nearby_grid[r_min:r_max, c_min:c_max] = self.grid_map[x_min:x_max, y_min:y_max]
        nearby_flat = nearby_grid.flatten()

        dx = self.goal[0] - self.agent_pos[0]
        dy = self.goal[1] - self.agent_pos[1]
        distance_to_goal = np.sqrt(dx**2 + dy**2)
        angle_to_goal = np.arctan2(dy, dx)

        # 当前状态
        current_state = np.concatenate(([distance_to_goal, angle_to_goal], nearby_flat))
        return current_state
    
    def step(self, action):
        """
        执行动作并更新环境状态
        """
        # 动作有效性检查（假设动作范围为 [-1, 1]）
        action = np.clip(action, -1, 1)
        
        # 将连续动作映射到移动量，假设最大移动量为 1 单元
        delta = action  # 可以根据需要调整比例

        # 更新位置
        new_pos = self.agent_pos + delta
        new_pos = np.clip(new_pos, 0, [self.rows - 1, self.cols - 1])

        # 检查是否碰到障碍物
        int_new_pos = tuple(new_pos.astype(int))
        if self.grid_map[int_new_pos] == 1:
            reward = -3.0
            done = True
            return self.get_state(), reward, done

        self.agent_pos = new_pos
        reward = -0.5
        done = False

        # 计算距离变化
        next_distance = np.linalg.norm(self.agent_pos - np.array(self.goal))
        if self.distance > next_distance:
            reward += 0.6
        else:
            reward -= 0.4
        self.distance = next_distance

        # 计算对齐奖励
        action_vector = np.array(delta)
        goal_vector = np.array(self.goal) - self.agent_pos
        goal_vector_norm = goal_vector / (np.linalg.norm(goal_vector) + 1e-5)
        
        if np.linalg.norm(goal_vector) > 0.1:
            alignment_reward = np.dot(action_vector, goal_vector_norm)
        else:
            alignment_reward = 0
        
        reward += alignment_reward * 0.4

        # 检查是否到达目标
        if self.distance < 0.5:
            reward += 10
            if self.current_goal_index + 1 < len(self.goal_list):
                self.current_goal_index += 1
                self.goal = self.goal_list[self.current_goal_index]  
            else:
                done = True

        # 更新动态障碍物的位置
        for obstacle in self.dynamic_obstacles:
            x, y = obstacle['position']
            direction = obstacle['direction']
            speed = obstacle['speed']
            
            if direction == 'up':
                new_ob_pos = (x - speed, y)
            elif direction == 'down':
                new_ob_pos = (x + speed, y)
            elif direction == 'left':
                new_ob_pos = (x, y - speed)
            elif direction == 'right':
                new_ob_pos = (x, y + speed)
            
            # 边界检查
            if 0 <= new_ob_pos[0] < self.rows and 0 <= new_ob_pos[1] < self.cols:
                if self.grid_map[new_ob_pos] == 1:
                    # 反向移动
                    if direction == 'up':
                        obstacle['direction'] = 'down'
                    elif direction == 'down':
                        obstacle['direction'] = 'up'
                    elif direction == 'left':
                        obstacle['direction'] = 'right'
                    elif direction == 'right':
                        obstacle['direction'] = 'left'
                else:
                    # 更新障碍物位置
                    self.grid_map[x, y] = 0  # 清除旧位置
                    self.grid_map[new_ob_pos] = 1  # 设置新位置
                    obstacle['position'] = new_ob_pos

        # 计算最小障碍物距离
        min_distance_to_obstacle = float('inf')
        for i in range(self.rows):
            for j in range(self.cols):
                if self.grid_map[i, j] == 1:  # 障碍物
                    distance_to_obstacle = np.linalg.norm(np.array([i, j]) - self.agent_pos)
                    min_distance_to_obstacle = min(min_distance_to_obstacle, distance_to_obstacle)

        # 奖励：距离障碍物越近，惩罚越大
        if min_distance_to_obstacle == 1.0:
            reward -= 2.0
        elif min_distance_to_obstacle <= 2.0:
            reward -= 1.5
        elif min_distance_to_obstacle <= 3.0:
            reward -= 0.2
        else:
            reward -= 0.01

        # 检查是否碰到动态障碍物
        for obstacle in self.dynamic_obstacles:
            if tuple(self.agent_pos.astype(int)) == obstacle['position']:
                reward -= 3.0
                done = True
                return self.get_state(), reward, done

        self.steps += 1
        if self.steps >= self.max_steps:
            done = True
            reward -= 10

        return self.get_state(), reward, done


In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action, hidden_dim=256):
        super(Actor, self).__init__()
        self.l1 = nn.Linear(state_dim, hidden_dim)
        self.l2 = nn.Linear(hidden_dim, hidden_dim)
        self.l3 = nn.Linear(hidden_dim, action_dim)
        
        self.max_action = max_action

    def forward(self, state):
        a = F.relu(self.l1(state))
        a = F.relu(self.l2(a))
        a = torch.tanh(self.l3(a))  # 输出范围 [-1, 1]
        return a * self.max_action  # 缩放到动作空间范围


In [4]:
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(Critic, self).__init__()
        # Q1 architecture
        self.l1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.l2 = nn.Linear(hidden_dim, hidden_dim)
        self.l3 = nn.Linear(hidden_dim, 1)
        
        # Q2 architecture
        self.l4 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.l5 = nn.Linear(hidden_dim, hidden_dim)
        self.l6 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        sa = torch.cat([state, action], dim=1)
        
        # Q1 forward
        q1 = F.relu(self.l1(sa))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)
        
        # Q2 forward
        q2 = F.relu(self.l4(sa))
        q2 = F.relu(self.l5(q2))
        q2 = self.l6(q2)
        
        return q1, q2

    def Q1(self, state, action):
        sa = torch.cat([state, action], dim=1)
        q1 = F.relu(self.l1(sa))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)
        return q1


In [5]:
from collections import deque
import random

class ReplayBuffer:
    def __init__(self, max_size=1_000_000):
        self.buffer = deque(maxlen=max_size)
    
    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return state, action, reward, next_state, done
    
    def size(self):
        return len(self.buffer)


In [6]:
class TD3:
    def __init__(
        self,
        state_dim,
        action_dim,
        max_action,
        actor_lr=5e-3,
        critic_lr=1e-4,
        gamma=0.99,
        tau=0.005,
        policy_noise=0.2,
        noise_clip=0.5,
        policy_freq=2,
    ):
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        
        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        
        self.max_action = max_action
        self.gamma = gamma
        self.tau = tau
        self.policy_noise = policy_noise
        self.noise_clip = noise_clip
        self.policy_freq = policy_freq
        
        self.replay_buffer = ReplayBuffer()
        
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        
        self.total_it = 0

    def select_action(self, state):
        state = torch.FloatTensor(state.reshape(1, -1)).to(device)
        return self.actor(state).cpu().data.numpy().flatten()
    
    def train(self, batch_size=256):
        if self.replay_buffer.size() < batch_size:
            return
        
        state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)
        
        state = torch.FloatTensor(state).to(device)
        action = torch.FloatTensor(action).to(device)
        reward = torch.FloatTensor(reward).unsqueeze(1).to(device)
        next_state = torch.FloatTensor(next_state).to(device)
        done = torch.FloatTensor(done).unsqueeze(1).to(device)
        
        with torch.no_grad():
            # 选择下一个动作，加入噪声
            noise = (torch.randn_like(action) * self.policy_noise).clamp(-self.noise_clip, self.noise_clip)
            next_action = (self.actor_target(next_state) + noise).clamp(-self.max_action, self.max_action)
            
            # 计算目标 Q 值
            target_Q1, target_Q2 = self.critic_target(next_state, next_action)
            target_Q = torch.min(target_Q1, target_Q2)
            target_Q = reward + ((1 - done) * self.gamma * target_Q).detach()
        
        # 当前 Q 值
        current_Q1, current_Q2 = self.critic(state, action)
        
        # 计算 critic 的损失
        critic_loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(current_Q2, target_Q)
        
        # 优化 critic
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        
        # Delayed policy updates
        if self.total_it % self.policy_freq == 0:
            # 计算 actor 的损失
            actor_loss = -self.critic.Q1(state, self.actor(state)).mean()
            
            # 优化 actor
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()
            
            # 更新目标网络
            for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
            
            for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
        
        self.total_it += 1
    
    def save(self, filename):
        torch.save(self.actor.state_dict(), filename + "_actor.pth")
        torch.save(self.critic.state_dict(), filename + "_critic.pth")
    
    def load(self, filename):
        self.actor.load_state_dict(torch.load(filename + "_actor.pth"))
        self.critic.load_state_dict(torch.load(filename + "_critic.pth"))

In [7]:
import os
import time
import numpy as np
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

def train_td3_agent(env, agent, episodes=1000, max_steps=1000, save_path="td3_model", log_dir="runs"):
    """
    使用 TD3 算法训练智能体，并记录训练信息到 TensorBoard。
    
    :param env: 环境实例 (GridWorld)
    :param agent: 智能体实例 (TD3)
    :param episodes: 训练的总回合数
    :param max_steps: 每回合的最大步数
    :param save_path: 模型保存路径（不带扩展名）
    :param log_dir: TensorBoard 日志保存目录
    """
    # 初始化 TensorBoard 日志记录器
    run_id = time.strftime("%Y%m%d-%H%M%S")
    log_dir = os.path.join(log_dir, f"run_{run_id}")
    os.makedirs(log_dir, exist_ok=True)
    writer = SummaryWriter(log_dir)
    
    best_reward = -float('inf')  # 记录最佳奖励
    step_counter = 0  # 全局步数计数器
    all_rewards = []  # 保存所有回合奖励
    all_losses = []  # 保存所有回合平均损失
    
    # 使用 tqdm 进度条显示训练过程
    with tqdm(total=episodes, desc="训练进度", unit="episode") as pbar:
        for episode in range(1, episodes + 1):
            state = env.reset()
            done = False
            total_reward = 0  # 当前回合总奖励
            episode_steps = 0  # 当前回合步数计数
            losses = []  # 当前回合的损失列表

            while not done:
                # 选择动作
                action = agent.select_action(state)
                
                # 环境执行动作并返回新状态、奖励和是否结束
                next_state, reward, done = env.step(action)
                
                # 存储经验
                agent.replay_buffer.add(state, action, reward, next_state, done)
                
                # 训练模型
                actor_loss, critic_loss = agent.train(batch_size=256)
                
                # 记录损失
                if actor_loss is not None and critic_loss is not None:
                    losses.append((actor_loss, critic_loss))
                
                state = next_state
                total_reward += reward
                episode_steps += 1
                step_counter += 1

                # 如果达到最大步数，则结束当前回合
                if episode_steps >= max_steps:
                    done = True

            # 记录奖励和损失到 TensorBoard
            writer.add_scalar("Reward/Episode", total_reward, episode)
            if actor_loss is not None and critic_loss is not None:
                avg_actor_loss = np.mean([loss[0] for loss in losses])
                avg_critic_loss = np.mean([loss[1] for loss in losses])
                writer.add_scalar("Loss/Actor", avg_actor_loss, episode)
                writer.add_scalar("Loss/Critic", avg_critic_loss, episode)
                all_losses.append((avg_actor_loss, avg_critic_loss))
            all_rewards.append(total_reward)
            
            # 更新 tqdm 进度条信息
            pbar.set_postfix({"总奖励": total_reward})
            pbar.update(1)
            
            # 如果当前回合的奖励是历史最高，则保存模型
            if total_reward > best_reward:
                best_reward = total_reward
                agent.save_model(save_path)
            
            # 每 10 个回合刷新 TensorBoard 数据
            if episode % 10 == 0:
                writer.flush()
    
    # 保存最终模型
    agent.save_model(save_path)
    
    # 关闭 TensorBoard 日志记录器
    writer.close()
    
    print(f"训练完成，模型已保存到 {save_path}_actor.pth 和 {save_path}_critic.pth")
    
    # 保存奖励和损失数据以便后续绘图
    np.save(os.path.join(log_dir, "rewards.npy"), all_rewards)
    np.save(os.path.join(log_dir, "actor_losses.npy"), [loss[0] for loss in all_losses])
    np.save(os.path.join(log_dir, "critic_losses.npy"), [loss[1] for loss in all_losses])



In [8]:
def train_td3(env, agent, episodes=1000, max_steps=1000, batch_size=256, save_every=100):
    import numpy as np
    import torch
    import random

    rewards_history = []
    for episode in range(1, episodes + 1):
        state = env.reset()
        episode_reward = 0
        for step in range(max_steps):
            # 选择动作
            action = agent.select_action(np.array(state))
            
            # 添加探索噪声
            noise = np.random.normal(0, 0.1, size=agent.actor.l3.out_features)
            action = (action + noise).clip(-agent.max_action, agent.max_action)
            
            # 执行动作
            next_state, reward, done = env.step(action)
            
            # 存储经验
            agent.replay_buffer.add(state, action, reward, next_state, float(done))
            
            state = next_state
            episode_reward += reward
            
            # 训练
            agent.train(batch_size)
            
            if done:
                break
        
        rewards_history.append(episode_reward)
        print(f"Episode {episode}, Reward: {episode_reward}")
        
        # 保存模型
        if episode % save_every == 0:
            agent.save("td3_gridworld")
    
    return rewards_history



In [9]:
def check_trained_model_td3(env, agent, model_path, max_steps=1000):
    """
    交互式测试训练好的模型，动态添加障碍物来评估避障能力。

    :param env: 环境实例 (GridWorld)
    :param agent: 智能体实例
    :param model_path: 已训练好的模型文件路径
    :param max_steps: 每次测试的最大步数
    """
    # 加载训练好的模型
    agent.load_model(model_path)
    start_time = time.time()
    print(f"Loaded model from {model_path}")

    # 设置 epsilon 为 0（测试时只选择最优动作）
    agent.epsilon = 0

    # 初始化 Pygame
    pygame.init()
    grid_size = 10  # 每个格子的像素大小
    screen = pygame.display.set_mode((env.cols * grid_size, env.rows * grid_size))
    pygame.display.set_caption("Trained Model Test")
    clock = pygame.time.Clock()

    # 初始化字体
    pygame.font.init()
    font = pygame.font.SysFont("Arial", 24)

    # 测试
    state = env.reset()
    done = False
    total_reward = 0
    path = [env.agent_pos]  # 路径记录
    steps = 0

    while not done and steps < max_steps:
        # 渲染环境
        screen.fill((255, 255, 255))  # 清空屏幕，设置白色背景

        # 绘制栅格地图
        for i in range(env.rows):
            for j in range(env.cols):
                color = (255, 255, 255)  # 默认白色
                if env.grid_map[i, j] == 1:
                    color = (0, 0, 0)  # 黑色障碍物
                pygame.draw.rect(screen, color, (j * grid_size, i * grid_size, grid_size, grid_size))

        # 绘制目标
        pygame.draw.rect(
            screen,
            (0, 0, 255),
            (env.goal[1] * grid_size, env.goal[0] * grid_size, grid_size, grid_size)
        )  # 蓝色目标点

        # 绘制路径轨迹
        for pos in path:
            pygame.draw.rect(
                screen,
                (200, 200, 200),  # 浅灰色轨迹
                (int(pos[1] * grid_size), int(pos[0] * grid_size), grid_size, grid_size)  # 使用矩形表示路径
            )

        # 绘制智能体
        car_x = int(env.agent_pos[1] * grid_size + grid_size // 2)
        car_y = int(env.agent_pos[0] * grid_size + grid_size // 2)
        car_radius = grid_size // 4

        # 绘制小车的主体
        pygame.draw.circle(screen, (255, 0, 0), (car_x, car_y), car_radius)

        # 智能体选择动作
        action = agent.select_action(state)  # 测试时选择最优动作
        next_state, reward, done = env.step(action)

        pygame.display.flip()  # 更新显示

        # 更新状态和累计奖励
        state = next_state
        total_reward += reward
        path.append(env.agent_pos)  # 记录路径
        steps += 1

        # 检查是否按下退出事件或者添加障碍物
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                return
            elif event.type == pygame.MOUSEBUTTONDOWN:
                # 获取鼠标点击的坐标
                mouse_x, mouse_y = pygame.mouse.get_pos()
                # 转换为对应的栅格坐标
                grid_x = mouse_x // grid_size
                grid_y = mouse_y // grid_size
                # 在地图中添加障碍物
                if env.grid_map[grid_y, grid_x] == 0:  # 只在空白位置添加障碍物
                    env.grid_map[grid_y, grid_x] = 1
                    print(f"Obstacle added at ({grid_y}, {grid_x})")
        
        # 限制帧率
        clock.tick(0.7)
    end_time = time.time()
    pygame.quit()  # 测试完成后关闭窗口

    # 输出测试结果
    print(f"Test Completed. Total Reward: {total_reward}, Steps Taken: {steps}")
    print(f"仿真完成，耗时 {end_time - start_time:.2f} 秒")

In [10]:
def check_trained_model_num_td3(env, agent, model_path, max_steps=1000, num_runs=100):
    """
    测试训练好的 TD3 模型在 GridWorld 环境中的表现，统计成功次数。
    
    :param env: 环境实例 (GridWorld)
    :param agent: 智能体实例 (TD3)
    :param model_path: 已训练好的模型文件路径（不带扩展名）
    :param max_steps: 每次测试的最大步数
    :param num_runs: 测试的总次数
    """
    # 加载训练好的模型
    agent.load_model(model_path)
    print(f"Loaded model from {model_path}_actor.pth and {model_path}_critic.pth")
    
    success_count = 0  # 记录成功次数
    
    # 设置为评估模式
    agent.actor.eval()
    agent.critic.eval()
    
    with torch.no_grad():
        # 测试多次
        for run in range(1, num_runs + 1):
            state = env.reset()
            done = False
            steps = 0
            total_reward = 0
            path = [env.agent_pos.copy()]  # 路径记录
            
            while not done and steps < max_steps:
                # 智能体选择动作
                action = agent.select_action(state)
                next_state, reward, done = env.step(action)
                
                # 更新状态和累计奖励
                state = next_state
                total_reward += reward
                path.append(env.agent_pos.copy())  # 记录路径
                steps += 1
                
                # 检查是否到达目标
                if np.linalg.norm(env.agent_pos - np.array(env.goal)) < 0.5:
                    success_count += 1
                    break  # 达到目标则结束当前测试
            
            print(f"Run {run}: Reward = {total_reward}, Steps = {steps}, Success = {done and total_reward >= 10}")
    
    # 输出成功次数
    print(f"测试完成。成功次数: {success_count}/{num_runs}")

In [11]:
def train_main():
    # 设置 CUDA_LAUNCH_BLOCKING 以便同步错误报告（仅调试时使用）
    os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

    # 加载网格地图
    grid_map = np.loadtxt("grid_map_final.txt", dtype=int)
    
    # 定义目标点列表
    nodes = [(51, 12), (55, 23), (55, 30), (32, 47), (9, 50), (9, 64)]
    
    # 初始化环境
    env = GridWorld(
        grid_map=grid_map,
        start=(58, 5),
        goal=(51, 12),
        goal_list=nodes
    )
    
    state_dim = env.get_state().shape[0]  # 状态维度，例如 51
    action_dim = 2  # 连续动作：dx 和 dy
    max_action = 1.0  # 动作范围 [-1, 1]
    
    # 初始化 TD3 Agent
    agent = TD3(
        state_dim=state_dim,
        action_dim=action_dim,
        max_action=max_action,
    )
    
    # 开始训练
    print("Starting training...")
    train_td3(
        env=env,
        agent=agent,
        episodes=2000,
        max_steps=10000,
    )
    
    # 保存最终模型
    agent.save_model("td3_gridworld_final")

train_main()

Starting training...
Episode 1, Reward: -19.454156605830832
Episode 2, Reward: -26.67165886176781
Episode 3, Reward: -18.00078806711348
Episode 4, Reward: -30.84581672395841
Episode 5, Reward: -24.218817535467444
Episode 6, Reward: -20.53843797319585
Episode 7, Reward: -9.353223146457182
Episode 8, Reward: -14.234742097330182
Episode 9, Reward: -12.91400888865037
Episode 10, Reward: -12.850032421278112
Episode 11, Reward: -15.651508267155567
Episode 12, Reward: -20.0462168449548
Episode 13, Reward: -19.066977694530358
Episode 14, Reward: -15.304154476387994
Episode 15, Reward: -14.263788723983284
Episode 16, Reward: -21.433091954159853
Episode 17, Reward: -25.20727133008628
Episode 18, Reward: -16.674222510148077
Episode 19, Reward: -21.528290816585763
Episode 20, Reward: -19.01593214859142
Episode 21, Reward: -18.027904546386004
Episode 22, Reward: -18.057076455183605
Episode 23, Reward: -20.474965214299033
Episode 24, Reward: -19.41148338529437
Episode 25, Reward: -14.229840614729024

KeyboardInterrupt: 

In [None]:
def main_test():
    # 初始化环境
    grid_map = np.loadtxt("grid_map_final.txt", dtype=int)
    nodes=[(51, 12), (55, 23), (55, 30), (32, 47), (9, 50), (9, 64)]
    env = GridWorld(grid_map , start=(58, 5) , goal=(51, 12),goal_list=nodes)
    
    state_dim = env.get_state().shape[0]  # 应为 51
    action_dim = 2  # 连续动作：dx 和 dy
    max_action = 1.0  # 动作范围 [-1, 1]
    
    # 初始化 TD3 Agent
    agent = TD3(state_dim, action_dim, max_action, device="cuda" if torch.cuda.is_available() else "cpu")
    
    # 加载训练好的模型
    trained_model_path = "td3_gridworld_final"  # 模型路径
    agent.load_model(trained_model_path)
    
    # 测试多个回合
    success_runs = check_trained_model_num_td3(env, agent, trained_model_path, max_steps=1000, num_runs=100)
    
    # 测试并动态渲染单次
    check_trained_model_td3(env, agent, trained_model_path, max_steps=1000)

main_test()

  self.actor.load_state_dict(torch.load(filename + "_actor.pth"))
  self.critic.load_state_dict(torch.load(filename + "_critic.pth"))


Loaded model from td3_gridworld_final_actor.pth and td3_gridworld_final_critic.pth
Run 1: Reward = 22.01634070009231, Steps = 24, Success = True
Run 2: Reward = 14.955437269089117, Steps = 23, Success = True
Run 3: Reward = 9.998472894139216, Steps = 28, Success = False
Run 4: Reward = -5.186766226810603, Steps = 14, Success = False
Run 5: Reward = 16.16634039368835, Steps = 24, Success = True
Run 6: Reward = 9.998472894139216, Steps = 28, Success = False
Run 7: Reward = 21.135962740260062, Steps = 24, Success = True
Run 8: Reward = 22.01634070009231, Steps = 24, Success = True
Run 9: Reward = 20.907906854607408, Steps = 24, Success = True
Run 10: Reward = 12.294992283396553, Steps = 22, Success = True
Run 11: Reward = 12.75574032921128, Steps = 33, Success = True
Run 12: Reward = -4.668823064451061, Steps = 3, Success = False
Run 13: Reward = 6.310747876838583, Steps = 13, Success = False
Run 14: Reward = 21.137571322097, Steps = 23, Success = True
Run 15: Reward = -91.79765708798266,

In [None]:
check_trained_model_td3(env, agent, "td3_gridworld_final", max_steps=1000)

  self.actor.load_state_dict(torch.load(filename + "_actor.pth"))
  self.critic.load_state_dict(torch.load(filename + "_critic.pth"))


Loaded model from td3_gridworld_final
Test Completed. Total Reward: 20.687011300547542, Steps Taken: 26
仿真完成，耗时 37.21 秒
