<a href="https://colab.research.google.com/github/Qiongqi123/DQN-Atari-pong/blob/main/DQN_demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install ale-py and Gym with its `atari` and `accept-rom-license` extras.
# NOTE(review): the training loop in this notebook takes a single return value
# from env.reset() and unpacks four values from env.step(); confirm that the
# installed gym version (or the local `wrappers` module) provides that API.
!pip install ale-py
!pip install gym[atari,accept-rom-license]

In [None]:
# Mount Google Drive at /content/drive; later cells use paths under this
# mount point for the import path and the model checkpoint location.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Put the Drive "Colab Notebooks" folder on the import path
# (presumably where the local `wrappers` module lives - confirm).
import sys

notebooks_dir = '/content/drive/MyDrive/Colab Notebooks'
# Only append once so re-running this cell does not pile up duplicate entries.
if notebooks_dir not in sys.path:
    sys.path.append(notebooks_dir)

In [None]:
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import torch.autograd as autograd
from collections import namedtuple

import gym, random, pickle, os.path, math, glob

from wrappers import *

from collections import namedtuple
from itertools import count
import numpy as np
import time
import pandas as pd
import matplotlib.pyplot as plt
import os

In [None]:
# ReplayMemory is defined in this notebook, so the former
# `from memory import ReplayMemory` import is not needed.
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# One agent/environment interaction step as stored in the replay buffer.
# The typename must match the variable name: it is what repr() shows and what
# pickle uses to look the class up again when loading.
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

In [None]:
## Hyperparameters
epsilon = 0.9                      # standalone value; select_action computes its own epsilon from EPS_*
BATCH_SIZE = 32                    # transitions sampled per learning step
GAMMA = 0.99                       # discount factor applied to the next-state value
EPS_START = 1                      # exploration rate at step 0
EPS_END = 0.02                     # value the exploration rate decays toward
EPS_DECAY = 500_000                # time constant (in select_action calls) of the exponential decay
TARGET_UPDATE = 1_000              # copy online weights to the target network every this many steps
RENDER = False                     # call env.render() on every step when True
lr = 0.001                         # learning rate
INITIAL_MEMORY = 10_000            # steps to take before learning starts
MEMORY_SIZE = INITIAL_MEMORY * 10  # replay buffer capacity
n_episode = 2_000                  # number of training episodes

In [None]:
# Checkpoint location when running on Colab with Google Drive mounted.
MODEL_STORE_PATH = '/content/drive/My Drive/DQN_pytorch_pong'
modelname = 'DQN_Pong'
# Same file name pattern the trainer uses when it saves a checkpoint.
model_path = '{}/model/{}_episode.pt'.format(MODEL_STORE_PATH, modelname)

# For a local run, use the working directory instead:
# MODEL_STORE_PATH = os.getcwd()
# print(MODEL_STORE_PATH)
# modelname = 'DQN_Pong'
# model_path = MODEL_STORE_PATH + '/' + 'model/' + 'DQN_Pong_episode900.pt'

In [None]:
class ReplayMemory(object):
    """Fixed-capacity ring buffer of Transition tuples with uniform random sampling."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []   # grows until it holds `capacity` entries
        self.position = 0  # index the next push writes to

    def push(self, *args):
        """Store Transition(*args), overwriting the oldest entry once the buffer is full."""
        storage = self.memory
        # While the buffer is still growing, reserve the slot that is about to
        # be written so the indexed assignment below cannot go out of range.
        if self.capacity > len(storage):
            storage.append(None)
        storage[self.position] = Transition(*args)
        # Advance the write cursor, wrapping to 0 at capacity.
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return `batch_size` distinct stored entries chosen at random."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Number of entries currently stored."""
        return len(self.memory)

In [None]:
class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    No batch-normalisation layers are used. The layer attribute names
    (conv1, conv2, conv3, fc4, head) are the state-dict keys used by saved
    checkpoints.
    """

    def __init__(self, in_channels=4, n_actions=14):
        """
        Args:
            in_channels (int): number of channels of the input tensor.
            n_actions (int): width of the output layer, one Q-value per action.
        """
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        # The first linear layer expects the conv stack to emit 64 maps of 7 x 7.
        self.fc4 = nn.Linear(64 * 7 * 7, 512)
        self.head = nn.Linear(512, n_actions)

    def forward(self, x):
        """Return one Q-value per action for each item of the batch `x`."""
        # Cast to float and divide by 255 (maps 8-bit pixel values into [0, 1]).
        hidden = x.float() / 255
        for conv in (self.conv1, self.conv2, self.conv3):
            hidden = F.relu(conv(hidden))
        # Flatten everything except the batch dimension.
        flat = hidden.view(hidden.size(0), -1)
        return self.head(F.relu(self.fc4(flat)))

In [None]:
class DQN_agent():
    """DQN agent holding an online Q-network, a target network and a replay buffer.

    Relies on module-level names defined elsewhere in this notebook:
    ``device``, ``model_path``, ``Transition``, ``ReplayMemory``, ``DQN`` and
    the hyperparameters ``BATCH_SIZE``, ``GAMMA``, ``EPS_START``, ``EPS_END``
    and ``EPS_DECAY``. Copying the online weights into the target network
    during training is left to the caller.
    """

    def __init__(self,in_channels=4, action_space=[], learning_rate=1e-3, memory_size=100000, epsilon=0.99):
        """
        Args:
          in_channels: number of input channels passed to both networks.
          action_space: object exposing ``.n`` (the number of discrete
            actions), e.g. ``env.action_space``. It must be supplied: the
            default empty list has no ``.n`` attribute.
          learning_rate: learning rate of the RMSprop optimizer.
          memory_size: capacity of the replay buffer.
          epsilon: unused; the exploration rate is computed in
            ``select_action``. Kept so existing calls keep working.
        """
        self.in_channels = in_channels
        self.action_space = action_space
        # Number of discrete actions the agent can choose from.
        self.action_dim = self.action_space.n
        self.memory_buffer = ReplayMemory(memory_size)
        # Counts calls to select_action; drives the epsilon schedule.
        self.stepdone = 0
        # Place both networks on `device` so the agent also runs without CUDA.
        self.DQN = DQN(self.in_channels, self.action_dim).to(device)
        self.target_DQN = DQN(self.in_channels, self.action_dim).to(device)
        # Resume from a saved checkpoint when one exists.
        if os.path.exists(model_path):
            print(f"Loading model from {model_path}")
            # map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
            self.DQN.load_state_dict(torch.load(model_path, map_location=device))
        else:
            print(f"Model file not found at {model_path}. Starting with a new model.")
        # Start the target network as an exact copy of the online network in
        # both cases, so early updates do not bootstrap from unrelated weights.
        self.target_DQN.load_state_dict(self.DQN.state_dict())
        self.optimizer = optim.RMSprop(self.DQN.parameters(),lr=learning_rate, eps=0.001, alpha=0.95)

    def select_action(self, state):
        """Pick an action for `state` with an epsilon-greedy policy.

        Epsilon decays exponentially from EPS_START toward EPS_END as
        ``stepdone`` grows, with time constant EPS_DECAY.

        Args:
          state: batch of one observation, in the layout the DQN network expects.

        Returns:
          A ``torch.long`` tensor of shape (1, 1) holding the action index.
        """
        self.stepdone += 1
        epsilon = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * self.stepdone / EPS_DECAY)
        if random.random() < epsilon:
            # Explore: uniformly random action index in [0, action_dim).
            return torch.tensor([[random.randrange(self.action_dim)]], device=device, dtype=torch.long)
        # Exploit: index of the largest Q-value. No gradient is needed here.
        with torch.no_grad():
            q_values = self.DQN(state.to(device))
        # max(1) returns (values, indices) along the action dimension.
        return q_values.max(1)[1].view(1, 1)

    def learn(self):
        """Run one optimisation step on a random minibatch from the replay buffer.

        Does nothing while the buffer holds fewer than BATCH_SIZE transitions.
        Terminal transitions are those stored with ``next_state`` set to
        ``None``; their next-state value is taken as zero.
        """
        if len(self.memory_buffer) < BATCH_SIZE:
            return
        transitions = self.memory_buffer.sample(BATCH_SIZE)
        # Turn a list of Transitions into one Transition of per-field tuples.
        batch = Transition(*zip(*transitions))

        state_batch = torch.cat(batch.state).to(device)
        # Build each batch tensor in one call; int()/float() accept plain
        # numbers as well as single-element tensors.
        action_batch = torch.tensor(
            [int(a) for a in batch.action], device=device, dtype=torch.long
        ).unsqueeze(1)
        reward_batch = torch.tensor(
            [float(r) for r in batch.reward], device=device, dtype=torch.float32
        )

        # Q(s, a) of the action that was actually taken in each sampled state.
        state_action_values = self.DQN(state_batch).gather(1, action_batch)

        # True where the transition did not end the episode.
        non_final_mask = torch.tensor(
            [s is not None for s in batch.next_state],
            device=device, dtype=torch.bool)
        non_final_next_states = [s for s in batch.next_state if s is not None]
        next_state_values = torch.zeros(BATCH_SIZE, device=device)
        # torch.cat rejects an empty list, so skip the target network when
        # every sampled transition is terminal.
        if non_final_next_states:
            with torch.no_grad():
                next_q = self.target_DQN(torch.cat(non_final_next_states).to(device))
            next_state_values[non_final_mask] = next_q.max(1)[0]

        # Bellman target: r + GAMMA * max_a' Q_target(s', a').
        expected_state_action_values = (next_state_values * GAMMA) + reward_batch
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        # Clamp every gradient element to [-1, 1].
        torch.nn.utils.clip_grad_value_(self.DQN.parameters(), 1)
        self.optimizer.step()

In [None]:
class Trainer():
    """Runs the episode loop that collects experience and trains a DQN agent.

    ``train`` relies on module-level names defined elsewhere in this notebook:
    ``RENDER``, ``INITIAL_MEMORY``, ``TARGET_UPDATE``, ``MODEL_STORE_PATH``
    and ``modelname``.
    """

    def __init__(self, env, agent, n_episode):
        """
        Args:
          env: environment exposing reset(), step(), render() and close().
          agent: agent exposing select_action(), learn(), memory_buffer,
            stepdone, DQN and target_DQN.
          n_episode: number of episodes to train for.
        """
        self.env = env
        self.n_episode = n_episode
        self.agent = agent
        self.rewardlist = []  # total reward of each finished episode

    def get_state(self,obs):
        """Convert an observation into a batched, channel-first tensor.

        The transpose assumes `obs` converts to a rank-3 array whose last
        axis is the channel axis; the result has shape (1, C, H, W).
        """
        state = np.array(obs)
        state = state.transpose((2, 0, 1))
        state = torch.from_numpy(state)
        return state.unsqueeze(0)

    def train(self):
        """Train for ``n_episode`` episodes and record each episode's total reward.

        Learning starts once the agent has taken more than INITIAL_MEMORY
        steps. The target network is refreshed whenever the step count is a
        multiple of TARGET_UPDATE. A checkpoint is written and a progress line
        printed once at the end of every 50th episode. The environment is
        closed when training ends, including when it ends with an exception.
        """
        try:
            for episode in range(self.n_episode):
                obs = self.env.reset()
                state = self.get_state(obs)
                episode_reward = 0.0
                for t in count():
                    action = self.agent.select_action(state)
                    if RENDER:
                        self.env.render()
                    # Hand the environment a plain int rather than a (1, 1) tensor.
                    obs, reward, done, info = self.env.step(action.item())
                    episode_reward += reward
                    # A terminal transition is stored with next_state = None.
                    next_state = None if done else self.get_state(obs)
                    # Build the reward tensor on the CPU, where the buffer
                    # keeps it, with the same dtype as the network output.
                    reward = torch.tensor([reward], dtype=torch.float32)
                    self.agent.memory_buffer.push(state, action.to('cpu'), next_state, reward)
                    state = next_state
                    # Start learning only after the warm-up period.
                    if self.agent.stepdone > INITIAL_MEMORY:
                        self.agent.learn()
                        if self.agent.stepdone % TARGET_UPDATE == 0:
                            # Copy the online weights into the target network.
                            self.agent.target_DQN.load_state_dict(self.agent.DQN.state_dict())
                    if done:
                        break
                # Checkpoint and report once per episode, not once per step.
                if episode % 50 == 0:
                    os.makedirs(MODEL_STORE_PATH + '/model', exist_ok=True)
                    torch.save(self.agent.DQN.state_dict(), MODEL_STORE_PATH + '/' + "model/{}_episode.pt".format(modelname))
                    print('Total steps: {} \t Episode: {}/{} \t Total reward: {}'.format(self.agent.stepdone, episode, t, episode_reward))
                self.rewardlist.append(episode_reward)
        finally:
            # Close the environment once, after the last episode.
            self.env.close()

    def plot_reward(self):
        """Plot the total reward of every recorded episode."""
        plt.plot(self.rewardlist)
        plt.xlabel("episode")
        plt.ylabel("episode_reward")
        plt.title('train_reward')
        plt.show()

In [None]:
if __name__ == '__main__':
    # Create the Pong environment and wrap it with make_env
    # (presumably provided by the local `wrappers` module - confirm).
    env = gym.make("PongNoFrameskip-v4")
    env = make_env(env)
    action_space = env.action_space
    # The wrapped observation is indexed as channel-last: shape[2] is the channel count.
    state_channel = env.observation_space.shape[2]
    # Pass the configured hyperparameters explicitly so that editing `lr` or
    # `MEMORY_SIZE` in the hyperparameter cell actually changes the agent.
    agent = DQN_agent(
        in_channels=state_channel,
        action_space=action_space,
        learning_rate=lr,
        memory_size=MEMORY_SIZE,
    )
    trainer = Trainer(env, agent, n_episode)
    trainer.train()
    trainer.plot_reward()

In [None]:
# Disabled reference snippet, kept as a bare string literal so none of it runs:
# a sketch for recording rendered frames under a pyvirtualdisplay virtual
# display and writing them to game_video.mp4 with moviepy. The closing prose
# (in Chinese) says the frames are collected in `frames`, merged into
# game_video.mp4, and that the file can be downloaded from the Colab file browser.
# NOTE(review): as written, the recording loop runs after trainer.train() has
# returned and never reassigns `state` after env.step(); fix both before
# re-enabling this code.
'''
!pip install ale-py
!pip install gym[atari,accept-rom-license]
!pip install pyvirtualdisplay
!pip install moviepy
from moviepy.editor import ImageSequenceClip

from google.colab import drive
drive.mount('/content/drive')

# ... 其他导入和超参数设置 ...

# 创建虚拟显示器
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()

# ... DQN, DQN_agent, Trainer 类的定义 ...



if __name__ == '__main__':
    # 创建环境
    env = gym.make("PongNoFrameskip-v4")
    env = make_env(env)

    # ... 其他代码 ...

    frames = []  # 存储每一帧的画面

    trainer = Trainer(env, agent, n_episode)
    trainer.train()

    # 在训练过程中记录画面
    for episode in range(900, trainer.n_episode):
        obs = env.reset()
        state = trainer.get_state(obs)

        for t in count():
            action = agent.select_action(state)
            obs, reward, done, info = env.step(action)
            frames.append(env.render('rgb_array'))  # 记录画面

            if done:
                break

        # ... 其他代码 ...

    # 关闭环境和虚拟显示器
    env.close()
    display.stop()

    # 创建视频
    clip = ImageSequenceClip(frames, fps=30)
    clip.write_videofile("game_video.mp4")
这段代码在训练过程中将每一帧的画面添加到frames列表中，
并在训练结束后将这些画面合成为一个视频文件game_video.mp4。
你可以在Colab的文件管理器中找到这个视频文件，并下载到本地观看。
'''