In [None]:
import gym
import cv2

import time
import json
import random
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from collections import deque

In [None]:
# --- Environment and hardware -------------------------------------------
ENVIRONMENT = "ALE/Pong-v5"
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
RENDER_GAME_WINDOW = False  # call environment.render() on every step

# --- Checkpointing --------------------------------------------------------
SAVE_MODELS = True  # write weights, epsilon and the training log to disk
MODEL_PATH = "./pong-cnn-"  # common prefix of every file that is written/read
SAVE_MODEL_INTERVAL = 10  # save a checkpoint every this many episodes
LOAD_MODEL_FROM_FILE = True  # resume from an existing checkpoint
LOAD_FILE_EPISODE = 430  # episode number of the checkpoint to resume from

# --- Run control ----------------------------------------------------------
TRAIN_MODEL = True  # train while playing; set to False for evaluation runs
MAX_EPISODE = 100_000  # upper bound of the episode counter
MAX_STEP = 100_000  # maximum number of steps within a single episode

# --- Replay memory --------------------------------------------------------
MAX_MEMORY_LEN = 50_000  # capacity of the replay buffer
MIN_MEMORY_LEN = 40_000  # transitions required before training starts
BATCH_SIZE = 64  # minibatch size sampled from the replay buffer

# --- Learning hyperparameters ---------------------------------------------
GAMMA = 0.97  # discount factor
ALPHA = 2.5e-4  # optimizer learning rate
EPSILON_DECAY = 0.99  # multiplicative epsilon decay factor

In [None]:
class CNN(nn.Module):
    """Dueling Q-network: a convolutional trunk followed by two linear heads.

    The trunk is three Conv2d + BatchNorm2d + ReLU stages applied to a
    4-channel input. Its flattened output feeds an advantage head (one output
    per action) and a state-value head (a single output), which are combined
    as ``Q = V + (A - mean_over_actions(A))``.
    """

    def __init__(self, h, w, output_size):
        """Create the layers.

        Args:
            h: Height of the input images in pixels.
            w: Width of the input images in pixels.
            output_size: Number of Q-values to output (one per action).
        """
        super().__init__()

        # Convolutional trunk; the spatial size is tracked after every layer
        # so the first linear layer can be sized correctly.
        self.conv1 = nn.Conv2d(in_channels=4, out_channels=32, kernel_size=8, stride=4)
        self.bn1 = nn.BatchNorm2d(32)
        convw, convh = self.conv2d_size_calc(w, h, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)
        self.bn2 = nn.BatchNorm2d(64)
        convw, convh = self.conv2d_size_calc(convw, convh, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)
        self.bn3 = nn.BatchNorm2d(64)
        convw, convh = self.conv2d_size_calc(convw, convh, kernel_size=3, stride=1)

        linear_input_size = convw * convh * 64

        # Advantage head: one value per action
        self.Alinear1 = nn.Linear(in_features=linear_input_size, out_features=128)
        self.Alrelu = nn.LeakyReLU()
        self.Alinear2 = nn.Linear(in_features=128, out_features=output_size)

        # State-value head: a single scalar per sample
        self.Vlinear1 = nn.Linear(in_features=linear_input_size, out_features=128)
        self.Vlrelu = nn.LeakyReLU()
        self.Vlinear2 = nn.Linear(in_features=128, out_features=1)

    def conv2d_size_calc(self, w, h, kernel_size=5, stride=2):
        """Return the (width, height) produced by an unpadded, undilated convolution.

        Args:
            w: Input width.
            h: Input height.
            kernel_size: Side length of the square kernel.
            stride: Convolution stride.

        Returns:
            Tuple ``(next_w, next_h)``.
        """
        next_w = (w - (kernel_size - 1) - 1) // stride + 1
        next_h = (h - (kernel_size - 1) - 1) // stride + 1
        return next_w, next_h

    def forward(self, x):
        """Compute Q-values for a batch of inputs.

        Args:
            x: Tensor of shape (batch, 4, height, width).

        Returns:
            Tensor of shape (batch, output_size) holding one Q-value per action.
        """
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))

        x = x.view(x.size(0), -1)  # flatten everything except the batch axis

        # Neither head applies an activation to its final layer.
        advantage = self.Alinear2(self.Alrelu(self.Alinear1(x)))
        value = self.Vlinear2(self.Vlrelu(self.Vlinear1(x)))

        # Centre the advantages per sample (over the action axis). A global
        # mean() would mix samples, making each Q-value depend on whatever
        # else happens to be in the minibatch.
        return value + (advantage - advantage.mean(dim=1, keepdim=True))

In [None]:
class Agent:
    """Epsilon-greedy double-DQN agent with an experience-replay buffer.

    An online network is trained on minibatches sampled from the replay
    buffer; a separate target network evaluates the action that the online
    network picks for the next state.
    """

    def __init__(self, environment):
        """Read sizes from the environment and build models, optimizer and memory.

        Args:
            environment: Object exposing ``observation_space.shape`` indexed as
                (height, width, channels) and ``action_space.n``.
        """
        # Raw observation size as reported by the environment
        obs_shape = environment.observation_space.shape
        self.state_size_h = obs_shape[0]
        self.state_size_w = obs_shape[1]
        self.state_size_c = obs_shape[2]

        # Number of discrete actions
        self.action_size = environment.action_space.n

        # Size of a frame after preprocessing
        self.target_h = 80
        self.target_w = 64

        # Crop window as [top, bottom, left, right]: drops the first 20 rows
        self.crop_dim = [20, self.state_size_h, 0, self.state_size_w]

        self.gamma = GAMMA  # discount factor
        self.alpha = ALPHA  # learning rate

        self.epsilon = 1  # initial exploration probability
        self.epsilon_decay = EPSILON_DECAY  # multiplicative decay factor
        self.epsilon_minimum = 0.05  # decay stops once epsilon is at or below this

        # Replay buffer; the oldest transitions are discarded when it is full
        self.memory = deque(maxlen=MAX_MEMORY_LEN)

        # Online network (trained) and target network (a copy kept in eval mode)
        self.online_model = CNN(h=self.target_h, w=self.target_w, output_size=self.action_size).to(DEVICE)
        self.target_model = CNN(h=self.target_h, w=self.target_w, output_size=self.action_size).to(DEVICE)
        self.target_model.load_state_dict(self.online_model.state_dict())
        self.target_model.eval()

        self.optimizer = optim.Adam(self.online_model.parameters(), lr=self.alpha)

    def preProcess(self, image):
        """Turn a raw colour frame into a cropped, resized, normalized grayscale frame.

        NOTE: earlier revisions converted with BGR weights and reshaped the
        result to (target_w, target_h), which scrambled pixel rows. Weights
        trained on that layout are not compatible with the corrected frames.

        Args:
            image: Colour frame of shape (height, width, 3). Gym/ALE frames are
                RGB and presumably uint8.

        Returns:
            float32 array of shape (target_h, target_w); values lie in [0, 1]
            for 8-bit input.
        """
        # Gym delivers RGB frames, so use the RGB (not BGR) channel weights.
        frame = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        top, bottom, left, right = self.crop_dim
        frame = frame[top:bottom, left:right]
        # cv2.resize takes dsize as (width, height) and returns an array of
        # shape (height, width); it must not be reshaped to (width, height).
        frame = cv2.resize(frame, (self.target_w, self.target_h))
        # float32 halves the replay-buffer footprint compared with float64.
        return frame.astype(np.float32) / 255

    def act(self, state):
        """Choose an action epsilon-greedily.

        Args:
            state: Array of stacked frames, shape (4, height, width).

        Returns:
            Integer action index in ``[0, action_size)``.
        """
        if random.uniform(0, 1) <= self.epsilon:
            # Explore: uniformly random action
            return random.randrange(self.action_size)

        # Exploit: greedy action according to the online network
        with torch.no_grad():
            batch = torch.tensor(state, dtype=torch.float, device=DEVICE).unsqueeze(0)
            q_values = self.online_model(batch)
        return torch.argmax(q_values).item()

    def train(self):
        """Run one optimization step on a minibatch sampled from the replay buffer.

        Nothing is trained until the buffer holds at least ``MIN_MEMORY_LEN``
        transitions.

        Returns:
            Tuple ``(loss, max_q)``: the minibatch loss and the largest Q-value
            the online network predicted for the sampled states. Both are
            ``0`` while the buffer is still filling up.
        """
        if len(self.memory) < MIN_MEMORY_LEN:
            return 0, 0

        batch = random.sample(self.memory, BATCH_SIZE)
        state, action, reward, next_state, done = zip(*batch)

        # Every stored state carries a leading axis of size 1, so
        # concatenating along axis 0 yields the batch dimension.
        state = torch.tensor(np.concatenate(state), dtype=torch.float, device=DEVICE)
        next_state = torch.tensor(np.concatenate(next_state), dtype=torch.float, device=DEVICE)
        action = torch.tensor(action, dtype=torch.long, device=DEVICE)
        reward = torch.tensor(reward, dtype=torch.float, device=DEVICE)
        done = torch.tensor(done, dtype=torch.float, device=DEVICE)

        # Q(s, a) of the actions that were actually taken
        state_q_values = self.online_model(state)
        selected_q_value = state_q_values.gather(1, action.unsqueeze(1)).squeeze(1)

        # The bootstrap target is a constant with respect to the loss, so no
        # autograd graph is needed for it.
        with torch.no_grad():
            # Double DQN: the online net picks the next action, the target net scores it.
            best_next_action = self.online_model(next_state).argmax(dim=1, keepdim=True)
            next_q_value = self.target_model(next_state).gather(1, best_next_action).squeeze(1)
            # Bellman target; (1 - done) removes the bootstrap term at terminal states.
            expected_q_value = reward + self.gamma * next_q_value * (1 - done)

        # Mean squared TD error
        loss = (selected_q_value - expected_q_value).pow(2).mean()

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Return plain floats so callers never accumulate tensors.
        return loss.item(), torch.max(state_q_values).item()

    def storeResults(self, state, action, reward, nextState, done):
        """Append one transition to the replay buffer.

        A leading axis of size 1 is added to both states so that sampled
        states can later be concatenated into a batch.
        """
        self.memory.append([state[None, :], action, reward, nextState[None, :], done])

    def adaptiveEpsilon(self):
        """Multiply epsilon by the decay factor while it is above the minimum."""
        if self.epsilon > self.epsilon_minimum:
            self.epsilon *= self.epsilon_decay

In [None]:
# Set up the environment and the agent.
# The environment is created with render_mode='rgb_array'; the agent reads the
# observation shape and the number of actions from it.
environment = gym.make(ENVIRONMENT, render_mode='rgb_array')
agent = Agent(environment)

In [None]:
# Optionally resume from a checkpoint: restore the network weights and the
# exploration rate, and continue the episode counter after the saved episode.
if LOAD_MODEL_FROM_FILE:
    checkpoint_prefix = MODEL_PATH + str(LOAD_FILE_EPISODE)

    # map_location lets a checkpoint that was saved on a GPU be restored on a
    # CPU-only machine (and vice versa).
    # NOTE(review): torch.load unpickles the file - only load trusted checkpoints.
    agent.online_model.load_state_dict(torch.load(checkpoint_prefix + ".pkl", map_location=DEVICE))
    # Keep the target network in sync; otherwise it would keep its random
    # initial weights until the end of the first resumed episode.
    agent.target_model.load_state_dict(agent.online_model.state_dict())

    with open(checkpoint_prefix + '.json') as infile:
        param = json.load(infile)
        # Fall back to the current epsilon if the file has no 'epsilon' entry,
        # instead of silently setting it to None.
        agent.epsilon = param.get('epsilon', agent.epsilon)

    startEpisode = LOAD_FILE_EPISODE + 1

else:
    startEpisode = 1

In [None]:
# Main loop: play episodes, store every transition, optionally train after
# each step, and log / checkpoint at the end of each episode.
last_100_ep_reward = deque(maxlen=100)  # total rewards of the most recent 100 episodes
total_step = 1  # cumulative step counter across all episodes
for episode in range(startEpisode, MAX_EPISODE):

    startTime = time.time()
    state = environment.reset()[0]  # reset() returns (observation, info)

    state = agent.preProcess(state)
    # The initial state repeats the first frame four times.
    state = np.stack((state, state, state, state))

    total_max_q_val = 0
    total_reward = 0
    total_loss = 0
    for step in range(MAX_STEP):

        if RENDER_GAME_WINDOW:
            environment.render()

        # Choose an action and advance the environment
        action = agent.act(state)
        next_state, reward, done, truncated, info = environment.step(action)

        next_state = agent.preProcess(next_state)
        # Newest frame first, followed by the three most recent older frames
        next_state = np.stack((next_state, state[0], state[1], state[2]))

        # Only true termination is stored as `done`: a truncated episode did
        # not reach a terminal state, so its target should still bootstrap.
        agent.storeResults(state, action, reward, next_state, done)

        state = next_state

        if TRAIN_MODEL:
            loss, max_q_val = agent.train()
        else:
            loss, max_q_val = 0, 0

        # float() keeps the running sum a plain number even if a tensor is returned.
        total_loss += float(loss)
        total_max_q_val += max_q_val
        total_reward += reward
        total_step += 1
        if total_step % 1000 == 0:
            agent.adaptiveEpsilon()  # decay epsilon once every 1000 steps

        # An episode ends on termination OR truncation; the environment must
        # not be stepped again after either without a reset.
        if done or truncated:
            currentTime = time.time()
            time_passed = currentTime - startTime
            current_time_format = time.strftime("%H:%M:%S", time.gmtime())
            epsilonDict = {'epsilon': agent.epsilon}

            if SAVE_MODELS and episode % SAVE_MODEL_INTERVAL == 0:  # periodic checkpoint
                weightsPath = MODEL_PATH + str(episode) + '.pkl'
                epsilonPath = MODEL_PATH + str(episode) + '.json'

                torch.save(agent.online_model.state_dict(), weightsPath)
                with open(epsilonPath, 'w') as outfile:
                    json.dump(epsilonDict, outfile)

            if TRAIN_MODEL:
                # Sync the target network with the online network once per episode
                agent.target_model.load_state_dict(agent.online_model.state_dict())

            last_100_ep_reward.append(total_reward)
            # `step` is 0-based, so the number of steps taken is step + 1;
            # dividing by `step` was off by one and failed when step == 0.
            episode_steps = step + 1
            avg_max_q_val = total_max_q_val / episode_steps

            outStr = "Episode:{} Time:{} Reward:{:.2f} Loss:{:.2f} Last_100_Avg_Rew:{:.3f} Avg_Max_Q:{:.3f} Epsilon:{:.2f} Duration:{:.2f} Step:{} CStep:{}".format(
                episode, current_time_format, total_reward, total_loss, np.mean(last_100_ep_reward), avg_max_q_val, agent.epsilon, time_passed, episode_steps, total_step
            )

            print(outStr)

            if SAVE_MODELS:
                outputPath = MODEL_PATH + "out" + '.txt'
                with open(outputPath, 'a') as outfile:
                    outfile.write(outStr+"\n")

            break
