In [1]:
import random
import gym
import numpy as np
import collections
from tqdm import tqdm
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt

class ReplayBuffer:
    ''' 经验回放池 '''
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)     # 队列,先进先出

    def add(self, state, action, reward, next_state, done):  # 将数据加入buffer
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):  # 从buffer中采样数据,数量为batch_size
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)  # 类似于解包
        return np.array(state), action, reward, np.array(next_state), done  

    def size(self):  # 目前buffer中数据的数量
        return len(self.buffer)
    

class ConvolutionalQnet(torch.nn.Module):
    ''' 加入卷积层的Q网络 '''
    def __init__(self, action_dim, in_channels=4):
        super(ConvolutionalQnet, self).__init__()
        self.conv1 = torch.nn.Conv2d(in_channels, 32, kernel_size=8, stride=4)
        self.conv2 = torch.nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = torch.nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc4 = torch.nn.Linear(22528, 512)
        self.head = torch.nn.Linear(512, action_dim)

    def forward(self, x):
        x = x / 255
        x = F.relu(self.conv1(x))   # torch.Size([1, 32, 51, 39])
        x = F.relu(self.conv2(x))   # torch.Size([1, 64, 24, 18])
        x = F.relu(self.conv3(x))   # torch.Size([1, 64, 22, 16])
        x = x.view(x.size(0), -1)   # 展平 torch.Size([1, 22528])
        x = F.relu(self.fc4(x))
        return self.head(x)
    # 输入 state 的形状应该是 (batch_size, 4, 210, 160)
    
class DQN:
    ''' DQN算法 '''
    def __init__(self, action_dim, learning_rate, gamma, epsilon, target_update, device):
        self.action_dim = action_dim
        # 当前Q网络
        self.q_net = ConvolutionalQnet(action_dim).to(device)  # 更新为只传入 action_dim
        # 目标网络
        self.target_q_net = ConvolutionalQnet(action_dim).to(device)  # 更新为只传入 action_dim
        # 使用Adam优化器
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=learning_rate)
        self.gamma = gamma  # 折扣因子
        self.epsilon = epsilon  # epsilon-贪婪策略
        self.target_update = target_update  # 目标网络更新频率
        self.count = 0  # 计数器,记录更新次数
        self.device = device

    def take_action(self, state):  # epsilon-贪婪策略采取动作
        if np.random.random() < self.epsilon:
            action = np.random.randint(self.action_dim)
        else:
            state = torch.tensor(state, dtype=torch.float).to(self.device)  # 确保输入维度正确
            action = self.q_net(state).argmax().item()  # 通过当前网络，返回具有最大Q值的动作
        return action

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        # print("update State shape:", state.shape)  # 输出状态的形状
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)

        q_values = self.q_net(states).gather(1, actions)  # Q值
        # 下个状态的最大Q值
        max_next_q_values = self.target_q_net(next_states).max(1)[0].view(-1, 1)  # 目标Q，最大q值的action
        q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)  # TD误差目标
        
        dqn_loss = F.mse_loss(q_values, q_targets)  # 均方误差损失函数
        self.optimizer.zero_grad()  # PyTorch中默认梯度会累积,这里需要显式将梯度置为0
        dqn_loss.backward()  # 反向传播更新参数
        self.optimizer.step()

        if self.count % self.target_update == 0:
            self.target_q_net.load_state_dict(self.q_net.state_dict())  # 更新目标网络
        self.count += 1

lr = 2e-3
num_episodes = 500
hidden_dim = 128
gamma = 0.98
epsilon = 0.01
target_update = 10
buffer_size = 10000
minimal_size = 500
batch_size = 64
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'SpaceInvaders-v0'
env = gym.make('SpaceInvaders-v0')
random.seed(0)
np.random.seed(0)
env.seed(0)
torch.manual_seed(0)

replay_buffer = ReplayBuffer(buffer_size)
action_dim = env.action_space.n

agent = DQN(action_dim, lr, gamma, epsilon, target_update, device)

return_list = []
for i in range(10):
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            episode_return = 0
            state = env.reset()                         # [210,160,3]
            state = np.stack([state] * 4, axis=0)       # 堆叠4帧 [4,210,160,3]
            state = state[..., 0]
            state = np.expand_dims(state, axis=0)       # (1, 4, 210,160)
            
            done = False
            while not done:
                action = agent.take_action(state)
                next_state, reward, done, _ = env.step(action)
                
                next_state = np.stack([next_state] * 4, axis=0)  # 堆叠4帧
                next_state = next_state[..., 0]  # 忽略颜色通道
                next_state = np.expand_dims(next_state, axis=0)  # (1, 4, 210, 160)
                # print("State shape:", next_state.shape)  # 输出状态的形状
                replay_buffer.add(state.squeeze(0), action, reward, next_state.squeeze(0), done) # 加入经验池之前要去掉batch_size这一维度
                state = next_state

                episode_return += reward
                # 当buffer数据的数量超过一定值后,才进行Q网络训练
                if replay_buffer.size() > minimal_size:
                    b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)
                    transition_dict = {
                        'states': b_s,
                        'actions': b_a,
                        'next_states': b_ns,
                        'rewards': b_r,
                        'dones': b_d
                    }
                    agent.update(transition_dict)
            return_list.append(episode_return)
            
            if (i_episode + 1) % 10 == 0:
                pbar.set_postfix({
                    'episode':
                    '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return':
                    '%.3f' % np.mean(return_list[-10:])
                })
            pbar.update(1)

  from .autonotebook import tqdm as notebook_tqdm
  f"The environment {id} is out of date. You should consider "
  "Initializing wrapper in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
  "Initializing environment in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
  "Core environment is written in old step API which returns one bool instead of two. "
Iteration 0:   2%|▏         | 1/50 [02:31<2:03:34, 151.31s/it]


KeyboardInterrupt: 