obs[0,1,2,3]->水平位置，水平速度，相对于竖直方向的角度，杆子的角速度

In [1]:
import gymnasium as gym
env = gym.make("CartPole-v1")
obs,info = env.reset()
print(obs)
print(info)

[ 0.01783977  0.04764322  0.02815316 -0.04374161]
{}


In [8]:
import gymnasium as gym
import torch
import numpy as np
import random
from collections import deque
import torch.nn as nn
import torch.optim as optim

class ImprovedDNQNet(nn.Module):
    def __init__(self, obs_dim, n_actions):
        super(ImprovedDNQNet, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions)
        )
    
    def forward(self, x):
        return self.net(x)

class ImprovedDNQAgent():
    def __init__(self, obs_dim, n_actions):
        self.n_actions = n_actions
        self.memory = deque(maxlen=100000)  # 更大的回放缓冲区
        self.batch_size = 128
        self.gamma = 0.99
        self.epsilon = 1.0
        self.min_epsilon = 0.01  # 更低的最终探索率
        self.epsilon_decay = 0.998  # 更慢的衰减
        self.lr = 1e-4  # 更小的学习率
        self.tau = 0.005  # 软更新参数

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        self.policy_net = ImprovedDNQNet(obs_dim, n_actions).to(self.device)
        self.target_net = ImprovedDNQNet(obs_dim, n_actions).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.lr)
        self.loss_fn = nn.SmoothL1Loss()  # 使用Huber损失代替MSE
        
    def select_action(self, state):
        if random.random() < self.epsilon:
            return random.randint(0, self.n_actions - 1)
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            q_value = self.policy_net(state)
        return q_value.argmax().item()
    
    def store(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    def train_step(self):
        if len(self.memory) < self.batch_size:
            return 0
            
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(np.array(states)).to(self.device)
        actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        next_states = torch.FloatTensor(np.array(next_states)).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)

        # 当前Q值
        current_q = self.policy_net(states).gather(1, actions)
        
        # 下一个状态的最大Q值（使用目标网络）
        with torch.no_grad():
            next_q = self.target_net(next_states).max(1)[0].unsqueeze(1)
            target_q = rewards + (1 - dones) * self.gamma * next_q

        # 计算损失
        loss = self.loss_fn(current_q, target_q)
        
        # 反向传播
        self.optimizer.zero_grad()
        loss.backward()
        
        # 梯度裁剪
        torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), 1.0)
        self.optimizer.step()
        
        return loss.item()
    
    def soft_update_target(self):
        """软更新目标网络"""
        for target_param, policy_param in zip(self.target_net.parameters(), self.policy_net.parameters()):
            target_param.data.copy_(self.tau * policy_param.data + (1.0 - self.tau) * target_param.data)
    
    def decay_epsilon(self):
        self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)

# 预填充回放缓冲区
def prefill_memory(agent, env, num_samples=10000):
    print("Prefilling replay memory...")
    state, _ = env.reset()
    for _ in range(num_samples):
        action = env.action_space.sample()  # 随机动作
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        # 调整奖励函数，鼓励保持平衡
        reward = reward if not done else -10
        
        agent.store(state, action, reward, next_state, done)
        
        if done:
            state, _ = env.reset()
        else:
            state = next_state
    print(f"Memory pre-filled with {len(agent.memory)} samples")

# 训练过程
env = gym.make("CartPole-v1")
obs_dim = env.observation_space.shape[0]
n_actions = env.action_space.n

agent = ImprovedDNQAgent(obs_dim, n_actions)

# 预填充经验回放
prefill_memory(agent, env, 5000)

n_episodes = 2000
print_interval = 500
scores = deque(maxlen=100)  # 记录最近100个episode的分数

for episode in range(n_episodes):
    state, _ = env.reset()
    total_reward = 0
    episode_loss = 0
    step_count = 0

    for t in range(500):  # CartPole的最大步长是500
        action = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        
        # 改进的奖励函数：越久不倒奖励越高
        if not done:
            reward = 1.0 + t * 0.01  # 随时间增加额外奖励
        else:
            reward = -10  # 失败惩罚
            
        agent.store(state, action, reward, next_state, done)
        
        # 更频繁的训练
        loss = agent.train_step()
        if loss:
            episode_loss += loss
            
        # 软更新目标网络（每步都更新一点）
        agent.soft_update_target()
        
        state = next_state
        total_reward += reward
        step_count += 1
        
        if done:
            break

    agent.decay_epsilon()
    scores.append(total_reward)
    
    if episode % print_interval == 0:
        avg_score = np.mean(scores) if scores else 0
        print(f"Episode {episode:4d} | Score: {total_reward:6.1f} | "
              f"Avg Score: {avg_score:6.1f} | Epsilon: {agent.epsilon:.3f} | "
              f"Steps: {step_count:3d}")

    # 提前停止条件：连续100个episode平均分达到480
    if len(scores) >= 100 and np.mean(scores) >= 480:
        print(f"Training completed! Achieved target score at episode {episode}")
        break

Using device: cuda
Prefilling replay memory...
Memory pre-filled with 5000 samples
Episode    0 | Score:    6.0 | Avg Score:    6.0 | Epsilon: 0.998 | Steps:  16
Episode  500 | Score:  137.5 | Avg Score:   38.4 | Epsilon: 0.367 | Steps: 100
Episode 1000 | Score:   58.3 | Avg Score:   39.6 | Epsilon: 0.135 | Steps:  55
Episode 1500 | Score:   28.3 | Avg Score:   25.9 | Epsilon: 0.050 | Steps:  34


In [None]:
# 测试训练好的智能体
print("\nTesting trained agent...")
state, _ = env.reset()
total_reward = 0
while True:
    env.render()
    action = agent.select_action(state)
    state, reward, terminated, truncated, _ = env.step(action)
    total_reward += reward
        break
        
print(f"Test score: {total_reward}")
env.close()


Testing trained agent...
Test score: 11.0


: 

In [None]:
for i in range(5):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    print(f"第{i+1}步: 动作={action}, 状态={obs}, 奖励={reward}")
print(111)

第1步: 动作=0, 状态=[ 0.01879263 -0.14787088  0.02727833  0.25768927], 奖励=1.0
第2步: 动作=1, 状态=[ 0.01583522  0.04685122  0.03243211 -0.02626638], 奖励=1.0
第3步: 动作=1, 状态=[ 0.01677224  0.24149342  0.03190678 -0.3085428 ], 奖励=1.0
第4步: 动作=1, 状态=[ 0.02160211  0.43614656  0.02573593 -0.5909949 ], 奖励=1.0
第5步: 动作=1, 状态=[ 0.03032504  0.6308989   0.01391603 -0.87546116], 奖励=1.0
