# Double DQN for CartPole with Image Input

使用图像输入的 Double DQN 算法来解决 CartPole 问题

In [None]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
import cv2
from collections import deque
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
class CartPoleImageWrapper(gym.Wrapper):
    """Replace the wrapped environment's observations with rendered frames.

    Each observation is the frame returned by ``env.render()``, converted to
    grayscale and resized to ``(height, width, 1)``. Every rendered frame is
    also displayed in an OpenCV window named ``'CartPole'``.

    ``reset`` returns only the observation and ``step`` returns the 4-tuple
    ``(obs, reward, done, info)``, where ``done`` is true when the wrapped
    environment reports either termination or truncation.

    Args:
        env: Environment to wrap. Its ``render()`` must return an RGB image
            array (e.g. an env created with ``render_mode="rgb_array"``).
        width: Width in pixels of the produced observation.
        height: Height in pixels of the produced observation.
    """

    def __init__(self, env, width=84, height=84):
        super().__init__(env)
        self.width = width
        self.height = height
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(height, width, 1), dtype=np.uint8
        )
        # Preview window used by render_frame(); destroyed again in close().
        cv2.namedWindow('CartPole', cv2.WINDOW_NORMAL)
        cv2.resizeWindow('CartPole', 600, 400)

    def render_frame(self):
        """Render the env, show the frame, and return it as an observation.

        Returns:
            Grayscale array of shape ``(height, width, 1)``.

        Raises:
            RuntimeError: If ``env.render()`` returns ``None``.
        """
        frame = self.env.render()
        if frame is None:
            raise RuntimeError(
                "env.render() returned None; create the environment with "
                "render_mode='rgb_array'"
            )

        # The frame is RGB, but OpenCV's display routines interpret arrays
        # as BGR, so swap the channels for the preview window only.
        cv2.imshow('CartPole', cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        cv2.waitKey(1)

        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        gray = cv2.resize(gray, (self.width, self.height))
        # Append a trailing channel axis: (H, W) -> (H, W, 1).
        return np.expand_dims(gray, axis=-1)

    def reset(self, **kwargs):
        """Reset the wrapped env and return the first image observation.

        Args:
            **kwargs: Forwarded unchanged to the wrapped ``env.reset``
                (for example ``seed=...``).

        Returns:
            The image observation only; the wrapped env's own return value
            is discarded.
        """
        self.env.reset(**kwargs)
        return self.render_frame()

    def step(self, action):
        """Advance the env one step and return ``(obs, reward, done, info)``.

        The wrapped env's vector observation is discarded in favour of the
        rendered frame, and ``terminated``/``truncated`` are merged into a
        single ``done`` flag.
        """
        _, reward, terminated, truncated, info = self.env.step(action)
        done = terminated or truncated
        obs = self.render_frame()
        return obs, reward, done, info

    def close(self):
        """Destroy the preview window and close the wrapped environment."""
        try:
            cv2.destroyAllWindows()
        finally:
            # Release the environment even if window teardown fails.
            self.env.close()

In [None]:
class DQN(nn.Module):
    """Convolutional Q-network: image batch in, one value per action out.

    Three convolution + ReLU stages feed a two-layer fully connected head.

    Args:
        input_shape: ``(height, width)`` of a single-channel input image;
            used only to size the first linear layer.
        n_actions: Number of outputs produced by the final linear layer.
    """

    def __init__(self, input_shape, n_actions):
        super().__init__()

        feature_layers = [
            nn.Conv2d(1, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        ]
        self.conv = nn.Sequential(*feature_layers)

        # Size the head from the actual flattened conv output.
        flat_features = self._get_conv_out(input_shape)

        head_layers = [
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions),
        ]
        self.fc = nn.Sequential(*head_layers)

    def _get_conv_out(self, shape):
        """Return the element count of the conv output for one zero image."""
        probe = torch.zeros(1, 1, *shape)
        conv_result = self.conv(probe)
        return int(np.prod(conv_result.size()))

    def forward(self, x):
        """Compute per-action values for a channels-last batch ``x``.

        ``x`` is cast to float, divided by 255, and permuted from
        ``(N, H, W, C)`` to ``(N, C, H, W)`` before the convolutions.
        """
        scaled = x.float() / 255.0
        channels_first = scaled.permute(0, 3, 1, 2)
        batch_size = channels_first.size()[0]
        features = self.conv(channels_first)
        flattened = features.reshape(batch_size, -1)
        return self.fc(flattened)

In [None]:
class ReplayBuffer:
    """Fixed-capacity store of transitions with uniform random sampling.

    Once ``capacity`` transitions are held, each new one evicts the oldest.
    """

    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` tuple."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and stack them by field.

        Returns:
            Tuple of five NumPy arrays: states, actions, rewards,
            next_states, dones.
        """
        batch = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one sequence per field.
        states, actions, rewards, next_states, dones = zip(*batch)
        fields = (states, actions, rewards, next_states, dones)
        return tuple(np.array(field) for field in fields)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

In [None]:
# Hyperparameters
# Number of transitions sampled from replay memory per optimization step.
BATCH_SIZE = 128
# Discount factor applied to the bootstrapped next-state value.
GAMMA = 0.999
# Exploration rate at step 0 of the epsilon-greedy schedule.
EPSILON_START = 0.9
# Exploration rate the exponential decay converges towards.
EPSILON_END = 0.01
# Time constant (in environment steps) of the exponential epsilon decay.
EPSILON_DECAY = 3000
# The target network is synced with the policy network every this many episodes.
TARGET_UPDATE = 50
# Learning rate passed to the Adam optimizer.
LEARNING_RATE = 1e-4
# Capacity of the replay buffer, in transitions.
MEMORY_SIZE = 100000
# Number of training episodes.
EPISODES = 1000

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
def train(save_path='/Users/xiaodong.guo/workspace/python/RLPlayground/ddqn/best_model.pth'):
    """Train a Double DQN agent on CartPole using rendered frames as input.

    Runs ``EPISODES`` episodes with an exponentially decaying epsilon-greedy
    policy, optimizing the policy network after every environment step and
    syncing the target network every ``TARGET_UPDATE`` episodes. Whenever an
    episode matches or beats the best total reward seen so far, a checkpoint
    is written to ``save_path``.

    Args:
        save_path: File the best-model checkpoint is written to.

    Returns:
        List with the total reward of every completed episode, in order.
    """
    env = CartPoleImageWrapper(gym.make('CartPole-v1', render_mode="rgb_array"))

    policy_net = DQN(env.observation_space.shape[:-1], env.action_space.n).to(device)
    target_net = DQN(env.observation_space.shape[:-1], env.action_space.n).to(device)
    target_net.load_state_dict(policy_net.state_dict())

    optimizer = optim.Adam(policy_net.parameters(), lr=LEARNING_RATE)
    criterion = nn.MSELoss()
    memory = ReplayBuffer(MEMORY_SIZE)

    steps_done = 0
    episode_rewards = []
    best_reward = float('-inf')

    def select_action(state, eps_threshold):
        """Epsilon-greedy choice; returns a ``(1, 1)`` long tensor."""
        if random.random() > eps_threshold:
            with torch.no_grad():
                state = torch.FloatTensor(state).unsqueeze(0).to(device)
                return policy_net(state).max(1)[1].view(1, 1)
        else:
            return torch.tensor([[random.randrange(env.action_space.n)]],
                                device=device, dtype=torch.long)

    def optimize_model():
        """Run one gradient step on a replay batch, once enough data exists."""
        if len(memory) < BATCH_SIZE:
            return

        states, actions, rewards, next_states, dones = memory.sample(BATCH_SIZE)

        states = torch.FloatTensor(states).to(device)
        actions = torch.LongTensor(actions).to(device)
        rewards = torch.FloatTensor(rewards).to(device)
        next_states = torch.FloatTensor(next_states).to(device)
        dones = torch.FloatTensor(dones).to(device)

        # Double DQN target: the policy net picks the next action and the
        # target net evaluates it. The target is a constant for the loss,
        # so it is computed without gradient tracking.
        with torch.no_grad():
            next_actions = policy_net(next_states).max(1)[1].unsqueeze(1)
            next_state_values = target_net(next_states).gather(1, next_actions)
            # (1 - done) removes the bootstrap term for finished episodes.
            expected_state_action_values = (
                rewards.unsqueeze(1)
                + GAMMA * next_state_values * (1 - dones.unsqueeze(1))
            )

        state_action_values = policy_net(states).gather(1, actions.unsqueeze(1))

        loss = criterion(state_action_values, expected_state_action_values)
        if steps_done % 100 == 0:
            print(f"Step {steps_done}, Loss: {loss.item():.4f}")

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    try:
        for episode in range(EPISODES):
            state = env.reset()
            total_reward = 0

            while True:
                # Exponential decay from EPSILON_START towards EPSILON_END.
                epsilon = EPSILON_END + (EPSILON_START - EPSILON_END) * \
                    np.exp(-1. * steps_done / EPSILON_DECAY)
                steps_done += 1

                action = select_action(state, epsilon)
                next_state, reward, done, _ = env.step(action.item())

                memory.push(state, action.item(), reward, next_state, done)
                state = next_state
                total_reward += reward

                optimize_model()

                if done:
                    print(f"Episode {episode}, Total Reward: {total_reward}, epsilon: {epsilon}")
                    episode_rewards.append(total_reward)

                    # Save the best model so far (ties overwrite the
                    # checkpoint with the more recent weights).
                    if total_reward >= best_reward:
                        best_reward = total_reward
                        torch.save({
                            'episode': episode,
                            'model_state_dict': policy_net.state_dict(),
                            'optimizer_state_dict': optimizer.state_dict(),
                            'reward': total_reward,
                            # Stored as a plain float rather than a NumPy
                            # scalar so the file loads under weights_only.
                            'epsilon': float(epsilon)
                        }, save_path)
                    break

            if episode % TARGET_UPDATE == 0:
                target_net.load_state_dict(policy_net.state_dict())

    finally:
        env.close()

    return episode_rewards

# Load a saved checkpoint and play test episodes with it.
def load_and_play(model_path='/Users/xiaodong.guo/workspace/python/RLPlayground/ddqn/best_model.pth',
                  episodes=10):
    """Load a saved policy network and play greedy test episodes.

    Args:
        model_path: Checkpoint file to load. It must contain the keys
            ``'model_state_dict'``, ``'episode'`` and ``'reward'``.
        episodes: Number of test episodes to play.
    """
    env = CartPoleImageWrapper(gym.make('CartPole-v1', render_mode="rgb_array"))

    try:
        # Build the model.
        policy_net = DQN(env.observation_space.shape[:-1], env.action_space.n).to(device)

        # Load the weights. map_location lets a checkpoint written on a GPU
        # machine be restored on whatever device is in use here.
        checkpoint = torch.load(model_path, map_location=device)
        policy_net.load_state_dict(checkpoint['model_state_dict'])
        policy_net.eval()  # switch to evaluation mode

        print(f"Loading model from episode {checkpoint['episode']} with reward {checkpoint['reward']}")

        for episode in range(episodes):
            state = env.reset()
            total_reward = 0

            while True:
                # Always take the action the model values most.
                with torch.no_grad():
                    state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
                    action = policy_net(state_tensor).max(1)[1].view(1, 1)

                state, reward, done, _ = env.step(action.item())
                total_reward += reward

                if done:
                    print(f"Test Episode {episode}, Total Reward: {total_reward}")
                    break
    finally:
        # Always release the environment and its preview window.
        env.close()

In [None]:
# Train the agent and save the model
rewards = train()

# Load the saved model and test it
load_and_play()

In [None]:
# Plot the per-episode rewards together with a moving average.
plt.figure(figsize=(10, 5))
plt.plot(rewards, alpha=0.6, label='Raw Rewards')

# Compute the moving average. With fewer than `window_size` episodes the
# 'valid' convolution would not line up with the x-range below (and an empty
# reward list makes np.convolve raise), so the curve is skipped in that case.
window_size = 10
if len(rewards) >= window_size:
    moving_avg = np.convolve(rewards, np.ones(window_size) / window_size, mode='valid')
    # The first full window ends at episode index window_size - 1.
    plt.plot(range(window_size - 1, len(rewards)), moving_avg,
             label=f'{window_size}-Episode Moving Average')

plt.title('Training Progress')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.legend()
plt.grid(True)
plt.show()