In [1]:
import gymnasium as gym 
import numpy as np 
import torch
import torch.nn as nn 
import torch.optim as optim 
import random 
from collections import deque
import matplotlib.pyplot as plt 

In [2]:
# Q-network used by the DQN agent
class DQN(nn.Module):
    """Fully connected network that maps a state vector to one Q-value per action.

    Args:
        state_dim: Number of input features.
        action_dim: Number of outputs (one Q-value per action).
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden = 128  # width shared by both hidden layers
        # The attribute names double as state_dict keys, so they stay fc1/fc2/fc3.
        self.fc1 = nn.Linear(state_dim, hidden)
        self.fc2 = nn.Linear(hidden, hidden)
        self.fc3 = nn.Linear(hidden, action_dim)

    def forward(self, x):
        """Apply two ReLU-activated hidden layers followed by a linear output layer."""
        features = x
        for layer in (self.fc1, self.fc2):
            features = layer(features).relu()
        return self.fc3(features)

In [3]:
# Experience replay buffer
class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with maxlen discards the oldest entry once it is full.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append a single (state, action, reward, next_state, done) transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them column-wise.

        Returns:
            A 5-tuple ``(states, actions, rewards, next_states, dones)``.
            ``states`` and ``next_states`` are NumPy arrays; the other three
            are plain tuples.
        """
        batch = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one sequence per field.
        states, actions, rewards, next_states, dones = zip(*batch)
        return np.array(states), actions, rewards, np.array(next_states), dones

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

In [4]:
# DQN agent
class DQNAgent:
    """Epsilon-greedy DQN agent with a replay buffer and a target network.

    The online network (``q_network``) is trained with Adam on an MSE
    temporal-difference loss; ``target_network`` is a copy that is re-synced
    from the online network every ``target_update_freq`` steps.

    Args:
        state_dim: Size of the observation vector.
        action_dim: Number of discrete actions.
        lr: Adam learning rate.
        gamma: Discount factor.
        epsilon: Initial exploration rate.
        epsilon_min: Lower bound for the exploration rate.
        epsilon_decay: Multiplicative decay applied to epsilon on every
            call to :meth:`update` that performs a gradient step.
        batch_size: Number of transitions per gradient step.
        buffer_capacity: Maximum number of transitions kept in the buffer.
        target_update_freq: The target network is synced whenever the
            ``step`` passed to :meth:`update` is a multiple of this value.
    """

    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99,
                 epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995,
                 batch_size=64, buffer_capacity=10000, target_update_freq=100):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Online network and target network (the target starts as an exact copy)
        self.q_network = DQN(state_dim, action_dim).to(self.device)
        self.target_network = DQN(state_dim, action_dim).to(self.device)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)

        # Hyperparameters
        self.gamma = gamma  # discount factor
        self.epsilon = epsilon  # current exploration rate
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size
        self.memory = ReplayBuffer(buffer_capacity)
        self.target_update_freq = target_update_freq  # target-network sync period

    def select_action(self, state):
        """Return an action index chosen epsilon-greedily for a single state.

        With probability ``self.epsilon`` a uniformly random action is
        returned; otherwise the argmax of the online network's Q-values.
        """
        if random.random() < self.epsilon:
            return random.randrange(self.action_dim)
        # Add a batch dimension: (state_dim,) -> (1, state_dim)
        state = torch.FloatTensor(state).to(self.device).unsqueeze(0)
        with torch.no_grad():
            q_values = self.q_network(state)
        return q_values.argmax().item()

    def update(self, step):
        """Run one gradient step on a sampled batch, then decay epsilon.

        Does nothing until the buffer holds at least ``batch_size``
        transitions.

        Args:
            step: Global step counter; the target network is synced when
                ``step % target_update_freq == 0``.
        """
        if len(self.memory) < self.batch_size:
            return

        # Sample a batch from the replay buffer and move it to the device
        state, action, reward, next_state, done = self.memory.sample(self.batch_size)
        state = torch.FloatTensor(state).to(self.device)
        action = torch.LongTensor(action).to(self.device)
        reward = torch.FloatTensor(reward).to(self.device)
        next_state = torch.FloatTensor(next_state).to(self.device)
        done = torch.FloatTensor(done).to(self.device)

        # Q(s, a) of the actions that were actually taken
        q_values = self.q_network(state).gather(1, action.unsqueeze(1)).squeeze(1)

        # The TD target is a constant for the loss, so build it without autograd.
        with torch.no_grad():
            next_q_values = self.target_network(next_state).max(1)[0]
            # (1 - done) removes the bootstrap term for transitions flagged as done
            target = reward + (1 - done) * self.gamma * next_q_values

        # Compute the loss and update the online network
        loss = nn.MSELoss()(q_values, target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay the exploration rate (once per gradient step)
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        # Periodically sync the target network
        if step % self.target_update_freq == 0:
            self.target_network.load_state_dict(self.q_network.state_dict())

    def save_model(self, path="dqn_cartpole.pth"):
        """Write the online network's state_dict to ``path``."""
        torch.save(self.q_network.state_dict(), path)

    def load_model(self, path="dqn_cartpole.pth"):
        """Load weights from ``path`` into the online network and copy them to the target.

        ``map_location`` remaps stored tensors onto this agent's device, so a
        checkpoint saved on a GPU can be loaded on a CPU-only machine.
        """
        state_dict = torch.load(path, map_location=self.device)
        self.q_network.load_state_dict(state_dict)
        self.target_network.load_state_dict(self.q_network.state_dict())
        

In [None]:
# Training function
def train_dqn(env_name="CartPole-v1", num_episodes=500, max_steps=500):
    """Train a DQNAgent, save its weights and a reward plot, and return the agent.

    Args:
        env_name: Gymnasium environment id passed to ``gym.make``
            (e.g. "CartPole-v1" or "LunarLander-v3").
        num_episodes: Number of training episodes.
        max_steps: Upper bound on the number of steps per episode.

    Returns:
        The trained ``DQNAgent``.

    Side effects:
        Prints one progress line per episode, saves the model through
        ``agent.save_model()`` and writes ``training_rewards.png``.
    """
    env = gym.make(env_name)
    scores = []
    try:
        agent = DQNAgent(state_dim=env.observation_space.shape[0], action_dim=env.action_space.n)
        step = 0  # global step counter, drives target-network syncing

        for episode in range(num_episodes):
            state, _ = env.reset()
            episode_reward = 0

            for _ in range(max_steps):
                action = agent.select_action(state)
                next_state, reward, terminated, truncated, _info = env.step(action)
                # Store only true termination as the done flag: a time-limit
                # truncation is not a terminal state, so its target must still
                # bootstrap from next_state.
                agent.memory.push(state, action, reward, next_state, terminated)
                agent.update(step)
                state = next_state
                episode_reward += reward
                step += 1

                # The episode itself ends on either signal.
                if terminated or truncated:
                    break
            scores.append(episode_reward)
            print(f"Episode {episode+1}/{num_episodes}, Reward: {episode_reward}, Epsilon: {agent.epsilon:.3f}")
    finally:
        # Release the environment even if training is interrupted.
        env.close()

    # Save the model
    agent.save_model()

    # Plot the per-episode reward curve
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(scores)
    ax.set_title("Training Rewards Over Episodes")
    ax.set_xlabel("Episode")
    ax.set_ylabel("Total Reward")
    ax.grid()
    fig.savefig("training_rewards.png")
    plt.close(fig)

    return agent

In [None]:
# Testing and visualisation
def test_dqn(agent, env_name="CartPole-v1", num_test_episodes=5, max_steps=500, render_mode="human"):
    """Run the agent greedily for a few rendered episodes and return the episode rewards.

    Exploration is disabled (epsilon set to 0) only for the duration of the
    test; the agent's previous epsilon is restored afterwards so the caller's
    agent is not silently modified.

    Args:
        agent: Object exposing ``epsilon`` and ``select_action(state)``.
        env_name: Gymnasium environment id passed to ``gym.make``.
        num_test_episodes: Number of evaluation episodes.
        max_steps: Upper bound on the number of steps per episode.
        render_mode: Render mode forwarded to ``gym.make``.

    Returns:
        List with the total reward of each test episode.
    """
    env = gym.make(env_name, render_mode=render_mode)
    saved_epsilon = agent.epsilon
    agent.epsilon = 0.0  # disable exploration while testing
    test_rewards = []

    try:
        for episode in range(num_test_episodes):
            state, _ = env.reset()
            episode_reward = 0
            done = False
            t = 0

            while not done and t < max_steps:
                action = agent.select_action(state)
                state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                episode_reward += reward
                t += 1

            test_rewards.append(episode_reward)
            print(f"Test Episode {episode+1}, Reward: {episode_reward}")
    finally:
        # Restore the exploration rate and release the window even on error.
        agent.epsilon = saved_epsilon
        env.close()

    return test_rewards


In [7]:
# Train the agent; the returned agent is reused by the evaluation cell below.
print("开始训练 DQN...")
agent = train_dqn()


开始训练 DQN...
Episode 1/500, Reward: -299.126988862112, Epsilon: 0.818
Episode 2/500, Reward: -97.74559447140948, Epsilon: 0.496
Episode 3/500, Reward: -347.97912661826894, Epsilon: 0.335
Episode 4/500, Reward: -296.8380310808734, Epsilon: 0.205
Episode 5/500, Reward: -142.6587821434033, Epsilon: 0.072
Episode 6/500, Reward: -288.9314364642813, Epsilon: 0.040
Episode 7/500, Reward: -411.19273878949207, Epsilon: 0.023
Episode 8/500, Reward: -150.4581861657603, Epsilon: 0.010
Episode 9/500, Reward: -202.26465086400765, Epsilon: 0.010
Episode 10/500, Reward: -220.62561064863672, Epsilon: 0.010
Episode 11/500, Reward: -267.9975616884445, Epsilon: 0.010
Episode 12/500, Reward: -173.96618840155534, Epsilon: 0.010
Episode 13/500, Reward: -154.1930321596106, Epsilon: 0.010
Episode 14/500, Reward: -195.1619088582769, Epsilon: 0.010
Episode 15/500, Reward: -129.82235751654457, Epsilon: 0.010
Episode 16/500, Reward: -241.58506382186482, Epsilon: 0.010
Episode 17/500, Reward: -136.1887743264152, Eps

In [8]:
# Run the rendered test episodes
print("\n开始测试和可视化...")
test_rewards = test_dqn(agent)

# Summarise performance as the mean reward over the test episodes
avg_reward = np.mean(test_rewards)
print("\n模型评估：")
print(f"平均测试奖励：{avg_reward:.2f}")
print("训练奖励曲线已保存为 'training_rewards.png'")

# Verdict tiers: above 475, above 300, otherwise
print(
    "表现优秀！代理成功学会了保持杆子平衡，接近最大奖励 500。"
    if avg_reward > 475
    else "表现良好！代理能够保持平衡一段时间，但还有改进空间。"
    if avg_reward > 300
    else "表现一般。代理需要更多训练以提升性能。"
)


开始测试和可视化...
Test Episode 1, Reward: 81.08594187782563
Test Episode 2, Reward: 100.94688221944253
Test Episode 3, Reward: 109.91905886062199
Test Episode 4, Reward: 66.27508625447068
Test Episode 5, Reward: 69.63976619530172

模型评估：
平均测试奖励：85.57
训练奖励曲线已保存为 'training_rewards.png'
表现一般。代理需要更多训练以提升性能。


In [9]:
# Scratch example: build and display a namedtuple record
from collections import namedtuple

# Keep the record type and its instance under separate names so the
# instance does not shadow the type.
Experience = namedtuple("Experience", field_names=["state", "action", "reward"])

experience = Experience(2, 1, 10)
experience

Experience(state=2, action=1, reward=10)