# 第七课 冰壶比赛中PPO算法的实现

## 7.1 PPO算法简介

在第六课介绍了DQN及其相关的一系列算法，这些算法的特点是利用深度强化学习模型来拟合状态-动作价值函数，默认使用的策略一般是ε-贪心策略，通过逐渐更新状态-动作函数的估计来完成对应的策略迭代和价值迭代的过程。

本课会介绍一个更加直接的思路来进行强化学习，也就是所谓的策略梯度（Policy Gradient）。这类算法会使用一个深度学习模型来根据当前智能体所处的环境状态直接生成对应的决策，策略梯度的存在让策略可以输出连续的动作，就解决了DQN算法只能适用离散空间的问题。同时策略网络因为可以直接输出对应的动作，在实际预测过程中就不再需要状态-动作价值函数了。

### 7.1.1 原始策略梯度（Vanilla Policy Gradient，VPG）算法

在强化学习的过程中，决策的每一步都能获得一定的奖励。如果有这样一个算法，每当奖励为正时，智能体就让对应概率分布的概率变大；反之，则令对应的概率变小。这样就需要搭建一个预测策略的概率分布$π(a|s)$的深度学习模型并设计一个损失函数，这个损失函数同时考虑了每一步获取的奖励和对应的策略概率分布，通过极小化损失函数，来训练一个能够把奖励最大化的策略网络。

因为优化损失函数需要使用损失函数对于策略网络参数的梯度，因此这类算法被称为策略梯度算法。VPG算法就是使用一个策略网络来拟合具体的策略概率分布$π(a|s)$，使得在这个概率分布下，获取的奖励尽可能多。

在策略梯度中，智能体能够控制的是每一步的动作，优化的也是策略网络的参数。假设策略梯度网络的参数为θ，则策略优化的目标函数如式(7-1)所示。

$$J(θ)=\Bbb E_{a\simπ(a|s;θ)} [R(a)] \tag{7-1}$$

其中，$J(θ)$代表的是在$π(a|s;θ)$策略下，采样得到对应的动作，然后根据对应的动作获取奖励。为了能够对这个函数进行策略梯度优化，需要对这个函数求梯度。梯度求取结果如式(7-2)所示，因为该式涉及策略对数概率分布的梯度，所以对应的算法称为策略梯度算法。

$$\nabla_θ J(θ) = \Bbb E_{a\simπ(a|s;θ)}[\nabla_θ \log π(a|s;θ)R(a)] \tag{7-2}$$

在实际应用中，需要做的是构建具体的策略网络的深度学习模型，用深度学习模型做出决策，记录对应决策获得的奖励，然后根据式(7-3)计算对应的策略梯度的损失函数，反向传播得到对应的策略网络的梯度，用随机梯度优化的方法优化对应的策略网络即可。

$$L = -\frac{1}{BT}\sum_{b=1}^B \sum_{t=1}^T \log π(a_t|s_t;θ)R_b(a_t) \tag{7-3}$$

### 7.1.2 演员-评价者（Actor-Critic，AC）算法

VPG算法主要的问题来源在于策略梯度算法对于基准（对回报函数期望的估计）的估计往往存在很大的偏置。为了解决这个问题，Actor-Critic算法引入了第二个深度学习模型，用这个深度学习模型来估计基准函数，以提高策略梯度算法的效率。

在Actor-Critic算法的策略梯度网络中，需要同时考虑当前的状态$s_t$和当前智能体的动作$a_t$。这里的回报函数和当前状态与智能体的动作同时相关，而且还有了一个新添加的基准项，这个基准项和当前的动作$a_t$无关，仅和当前智能体所处的强化学习环境的状态$s_t$有关。

也就是说，我们可以在策略梯度中引入一个和策略网络参数无关的基准函数$B(s_t)$，当回报函数$G(a,s_t)$减去这个基准函数的时候，整个策略网络的策略梯度不会发生任何改变。如式(7-4)所示。

$$\nabla_θ J(θ) = \Bbb E_{a\simπ(a|s;θ)}[\nabla_θ \log π(a|s;θ)(G(a,s_t)-B(s_t))] \tag{7-4}$$

其中，基准函数在这里应该设定为回报函数的期望，当回报函数大于基准函数的时候，对应的策略选择的动作就是比较好的动作，策略梯度应该增加这个动作的概率。反之，策略梯度会减少这个动作的概率。

为了实现式(7-4)中策略梯度的计算，需要有两个网络，一个网络即策略网络，这个网络与VPG算法中使用的策略网络类似，对应的损失函数如式(7-5)所示；第二个网络即价值网络，这个网络使用了另外一个网络来拟合对应的价值函数。假设对应的基准函数表示为$B(s_i;\phi)$，其中$\phi$为基准函数的参数，则对应的损失函数如式(7-6)所示。

$$L_{actor} = -\frac{1}{BT}\sum_{b=1}^B \sum_{t=1}^T \log π(a_t|s_t;θ)(G(a,s_t)-B(s_t)) \tag{7-5}$$
$$L_{critic} = -\frac{1}{BT}\sum_{b=1}^B \sum_{t=1}^T (G(a,s_t)-B(s_t;\phi)) \tag{7-6}$$

可以看到，第二个网络使用了MSE损失函数，这个函数的目标是为了让基准函数$B(s_t;\phi)$的值等于回报函数r(a_t,s_t)的期望。第一个网络是策略网络，训练的函数 π(a_t|s_t;θ)负责“表演”，称为“演员”；第二个网络是价值网络，训练的函数$B(s_t;\phi)$负责衡量策略函数给出的动作的好坏，称为“评论家”。这也是算法中“演员-评论家”名字的来源。

在这两个策略梯度损失函数中，如何估计回报函数$G(a_t,s_t)$是非常重要的，因为回报函数的值的准确计算需要考虑到未来获取的折扣奖励，而未来折扣奖励的数据是很长甚至是无穷多的，无法准确计算，所以需要考虑使用深度学习模型来拟合未来的折扣奖励。

这和在DQN中碰到的问题类似，只是DQN参与估计的是Q函数，而这里参数估计需要用到的是基准函数。DQN算法中可以采用单步估计或多部估计来估计Q函数，类似的，在优势演员-评论家（Advantage Actor-Critic, A2C）算法中，也使用了多步估计的方法来计算回报函数，从而计算对应的优势函数。

假设使用策略梯度做了一个n步的决策，动作分别是$(a_1, a_2, …, a_n)$，对应获取的奖励分别为$(r_1, r_2, …, r_n)$，假设折扣因子为γ，那么可以得到对应的回报函数的估计，如式(7-7)所示。

$$G(a_t,s_t) = \sum_{i=t}^n γ^{i-t}r_i + γ^nB(s_{n+1})\tag{7-7}$$

结合式(7-5)和式(7-7)可知，优势函数的估计$G(a,s_t)-B(s_t)$和决策的采样长度n有关。当长度n比较长时，回报函数$G(a_t,s_t)$的估计比较准确，但是对应优势函数的波动也比较大，这种情况称为低偏置、高方差。如果减少决策采样长度n，对应的回报函数$G(a_t,s_t)$的估计就会不准，但梯度波动也比较小，这种情况称为高偏置、低方差。实际应用时需要根据具体情况斟酌选取。

### 7.1.3 近端策略优化（Proximal Policy Optimization，PPO）算法

虽然AC算法的稳定性和收敛性比VPG算法有了很大提高，但相对来说策略梯度的波动，以及奖励在算法运行过程中的波动还是很大。另外，还有个问题就是策略地图对采样数据的利用效率不太高，需要考虑如何利用一定的采样数据尽量增加优化的步数，让对应的策略尽可能获得更多的奖励。

为了减少策略梯度的波动，同时尽量提高优势函数比较大的动作的概率，我们可以考虑在初始策略附近的一个置信区间内对策略进行优化，也就是初始的策略和优化过程中的策略尽量不要偏离过大，让这个策略对应的动作能够尽量增加正的优势函数产生的概率，减少负的优势函数产生的概率。这就是基于置信区间的策略梯度算法，包括置信区间策略优化（Trust Region Policy Optimization, TRPO）算法和近端策略优化（Proximal Policy Optimization，PPO）算法的基本思想。

PPO与TRPO旨在解决相同的问题：在策略梯度定理的步长的选取中，如何选取合适的步长，使得更新的参数尽可能对应最好的策略，但也不至于走得太远，以至于导致性能崩溃。TRPO和PPO的核心思想都是引入重要性采样，提高样本效率；同时，通过某种方式来约束新旧策略间的差异不要太大。

#### 广义优势估计（Generalized Advantage Estimation，GAE）

TRPO和PPO中都是采用GAE的方法对优势函数进行估计，具体而言，其计算公式为

$$\hat{A_t}^{GAE(γ,λ)}=\sum_{l=0}^{\infty}(γλ)^lδ_{t+l}^V=\sum_{l=0}^{\infty}(γλ)^l[r_{t+l}+γV(s_{t+l+1})-V(s_{t+l})]\tag{7-8}$$

其估计advantage的方法与$TD(λ)$类似，从公式上可以看出GAE中分别考虑了状态$s_t$后续各个时刻的优势值，然后按照距离当前状态的远近加权求和，从而起到了平滑作用。为了便于理解，考虑两种极限情况：

1. $λ=0$时，$\hat{A_t}=δ_t=r_t+γV(s_{t+1})-V(s_{t})$，优势值便是使用$TD(0)$估计的Q值与V值的差；
2. $λ=1$时，$\hat{A_t}=\sum_{l=0}^{\infty}(γ)^lδ_{t+l}=\sum_{l=0}^{\infty}γ^lr_{t+l}-V(s_{t+l})$，优势值则是使用蒙特卡洛方法估计的收益$G_t$值与V值的差。

可以看出λ作为GAE算法的调整因子，它越接近1时，方差越大，偏差越小，接近0时反之。这也是GAE的一个优势，即可以更加不同环境的情况让我们可以调整参数来找到更合理的advantage。

实际求取GAE的时候，不需要在整个流程中进行平均，只需选取关联性较大的N步即可。
 
#### PPO惩罚

TRPO在目标函数中，另外增加了一个约束条件。在推导该式的过程中， 涉及到了一个将KL散度作为惩罚项的极值问题，转化为KL散度作为约束条件的优化问题的过程，将KL散度作为惩罚项的问题。公式如下：

$$\max\limits_θ  \hat{\Bbb E}_t \big[\frac{π_θ(a_t|s_t)}{π_{θ_{old}}(a_t|s_t)}\hat A_t-βKL[π_{θ_{old}}(\cdot|s_t),π_θ(\cdot|s_t)]\big] \tag{7-9}$$

PPO1算法用拉格朗日乘数法直接将KL散度的限制放入了目标函数，将有约束的优化问题转为无约束的优化问题，在迭代的过程中不断更新KL散度前的系数，使用几个阶段的小批量SGD，优化KL惩罚目标，其更新方式即为式(7-9)。

为了对β进行动态调整，PPO1算法还提出了自适应KL散度（Adaptive KL Divergence）的思想。具体做法是，在每个epoch对KL惩罚目标进行优化后，计算$d=\hat{\Bbb E}_t \big[KL[π_{θ_{old}}(\cdot|s_t),π_θ(\cdot|s_t)]\big]$
：

- 如果$d<\frac{δ}{1.5}$，则$β\leftarrow \frac{β}{2}$；
- 如果$d>1.5δ$，则$β\leftarrow 2β$；
- 否则，$β$保持不变。

在这里，更新的$β$用于下一次迭代时的参数更新。

#### PPO截断

PPO2在限制新的策略参数与旧的策略参数的距离上，相比于PPO1更加直接。区别于PPO1使用KL散度的方式进行限制，PPO2直接在目标函数上进行限制，如式(7-10)所示。

$$L^{CLIP}(θ)=\hat{\Bbb E}_t[\min (r_t(θ)\hat{A}_t, clip(r_t(θ),1-ε,1+ε)\hat{A}_t] \tag{7-10}$$

其中，$r_t(θ)]=\frac{π_θ(a_t,s_t)}{π_{θ_old}(a_t,s-t)}$称为概率比，易得$r_t(θ_{old})=1$。$clip(r_t(θ),1-ε,1+ε)$指的是将$r_t(θ)$限制在$[1-ε,1+ε]$的范围内。ε为超参数，表示进行截断操作的范围，一般取ε=0.2。这样，就始终保证了新旧策略的比值在[0.8, 1.2]的范围内，保证了两个策略的差距不会太大。

PPO2中，较为精妙的一点是在clip操作后乘了$\hat{A}_t$（以下用A表示），而优势函数是有正负的。如下面两张图所示， $\frac{π_θ(a_t,s_t)}{π_{θ_old}(a_t,s-t)}$ 是绿色的线；clip(r_t(θ),1-ε,1+ε) 是蓝色的线；在绿色的线与蓝色的线中间，我们要取一个最小的结果。

如左图所示，假设前面乘上的项A>0，取最小的结果，就是红色的这条线。如右图所示，如果A<0，取最小结果的以后，就得到红色的这条线。

<center><img src="img/PPO_clip.png" width=600></center>

虽然Clip操作已经很大程度上限制了新策略与旧策略之间的差距，但最终新旧策略依然有可能相差太远。不同的PPO算法采用了不同的技巧来避免这种情况。比如说，提前挺值得策略：如果新策略和旧策略的平均KL散度超过了某个阈值，则停止采取更新参数的步骤。

### 7.1.4 PPO算法伪代码

<img src="img/PPO_code.png">

其中，第3-5行实现了采样数据、计算优势函数（GAE的方式）和策略初始化。第6-9行采用公式（7-9）实现了Actor网络（策略网络）更新，参数M的含义为策略网络的参数个数。第10-13行实现了Critic网络（价值函数网络）更新，通过Critic网络的预测值为$V_\phi(s_t)$，Label为
$\sum_{t'>t}γ^{t'-t}r_{t'}$。第14-17行实现了对权重进行调整。

## 7.2 PPO算法在数字冰壶中的实现

导入算法实现所需要的模块

In [1]:
# -*- coding: utf-8 -*-
import math
import numpy as np
import torch
from collections import deque
import torch.nn as nn
from torch.autograd import Variable

### 7.2.1 网络搭建

#### 7.2.1.1 网络输入设置

根据场上的冰壶与营垒圆心的距离又近至远进行排序，每个冰壶包含五个信息：x坐标、y坐标、离营垒圆心的距离、投掷顺序、是否为有效得分壶，共80个特征作为网络输入。

In [2]:
#获取某一冰壶距离营垒圆心的距离
def get_dist(x, y):
    House_x = 2.375
    House_y = 4.88
    return math.sqrt((x-House_x)**2+(y-House_y)**2)

#根据冰壶比赛服务器发送来的场上冰壶位置坐标列表获取得分情况并生成信息状态数组
def get_infostate(position):
    House_R = 1.830
    Stone_R = 0.145

    init = np.empty([8], dtype=float)
    gote = np.empty([8], dtype=float)
    both = np.empty([16], dtype=float)
    #计算双方冰壶到营垒圆心的距离
    for i in range(8):
        init[i] = get_dist(position[4 * i], position[4 * i + 1])
        both[2*i] = init[i] 
        gote[i] = get_dist(position[4 * i + 2], position[4 * i + 3])
        both[2*i+1] = gote[i]
    #找到距离圆心较远一方距离圆心最近的壶
    if min(init) <= min(gote):
        win = 0                     #先手得分
        d_std = min(gote)
    else:
        win = 1                     #后手得分
        d_std = min(init)
    
    infostate = []  #状态数组
    init_score = 0  #先手得分
    #16个冰壶依次处理
    for i in range(16):
        x = position[2 * i]         #x坐标
        y = position[2 * i + 1]     #y坐标
        dist = both[i]              #到营垒圆心的距离
        sn = i % 2 + 1              #投掷顺序
        if (dist < d_std) and (dist < (House_R+Stone_R)) and ((i%2) == win):
            valid = 1               #是有效得分壶
            #如果是先手得分
            if win == 0:
                init_score = init_score + 1
            #如果是后手得分
            else:
                init_score = init_score - 1
        else:
            valid = 0               #不是有效得分壶
        #仅添加有效壶
        if x!=0 or y!=0:
            infostate.append([x, y, dist, sn, valid])
    #按dist升序排列
    infostate = sorted(infostate, key=lambda x:x[2])
    
    #无效壶补0
    for i in range(16-len(infostate)):
        infostate.append([0,0,0,0,0])

    #返回先手得分和转为一维的状态数组
    return init_score, np.array(infostate).flatten()

#### 7.2.1.2 网络输出设置

如下所示范例代码继承torch.nn.Module类，搭建了两个相同结构的四层神经网络——Actor网络和Critic网络。网络每一层都是线性层（全连接层），实现将80维的输入张量映射为256维张量再经tanh函数激活，继而映射为64维张量再经tanh函数激活，继而映射为10维张量再经tanh函数激活，最终映射为3维的输出张量。

其中Critic的网络输出就是神经网络输出层的数据，是当前状态的估计价值。Actor的网络输出则是以神经网络输出层的数据为均值，以std为标准差，经由torch.normal()函数实现正态分布近似后得到最终投掷动作。这里的标准差std越小越接近于均值，算法越倾向于利用，反之则越倾向于探索。范例代码中将std设置为固定值1，在实际训练中建议对其进行动态设置，来达到前期重视探索，后期重视利用的目的。

在训练开始时，Actor网络各层的权重均为随机数值，因此网络的输出也是完全随机的，有很大概率根本就不在投掷动作参数的取值范围内，因此需要对网络的输出按照相应参数的取值范围截断后再作为动作参数使用。可以预见，在训练前期，大部分的动作参数都落在了截断区间的上下限，因此PPO算法和DQN算法相比，需要更多个训练周期后才能输出合理的动作参数。

In [None]:
#创建Actor网络类继承自nn.Module
class Actor(nn.Module):
    def __init__(self):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(80, 128)       # 定义全连接层1
        self.fc2 = nn.Linear(128, 64)       # 定义全连接层2
        self.fc3 = nn.Linear(64, 10)        # 定义全连接层3
        self.out = nn.Linear(10, 3)         # 定义输出层
        self.out.weight.data.mul_(0.1)      # 初始化输出层权重

    def forward(self, x):
        x = self.fc1(x)                     # 输入张量经全连接层1传递
        x = torch.tanh(x)                   # 经tanh函数激活
        x = self.fc2(x)                     # 经全连接层2传递
        x = torch.relu(x)                   # 经tanh函数激活
        x = self.fc3(x)                     # 经全连接层3传递
        x = torch.tanh(x)                   # 经tanh函数激活

        mu = self.out(x)                    # 经输出层传递得到输出张量
        logstd = torch.zeros_like(mu)       # 生成shape和mu相同的全0张量
        std = torch.exp(logstd)             # 生成shape和mu相同的全1张量
        return mu, std, logstd

    def choose_action(action):
    # 映射 action[0] 到 [2.6, 5.5] 区间
        action[0] = scipy.stats.truncnorm(-2.6, 5.5, loc=, scale=0.45).rvs() + 2.6

    # 映射 action[1] 到 [-1, 1] 区间
        action[1] = scipy.stats.truncnorm(-4, 4, loc=0, scale=0.5).rvs()
        action[2] = np.clip(action[2], -3.14, 3.14)
        return action
    
    


#创建Critic网络类继承自nn.Module
class Critic(nn.Module):
    def __init__(self):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(80, 128)       # 定义全连接层1
        self.fc2 = nn.Linear(128, 64)       # 定义全连接层2
        self.fc3 = nn.Linear(64, 10)        # 定义全连接层3
        self.out = nn.Linear(10, 1)         # 定义输出层
        self.out.weight.data.mul_(0.1)      # 初始化输出层权重

    def forward(self, x):
        x = self.fc1(x)                     # 输入张量经全连接层1传递
        x = torch.tanh(x)                   # 经tanh函数激活
        x = self.fc2(x)                     # 经全连接层2传递
        x = torch.sigmoid(x)                   # 经tanh函数激活
        x = self.fc3(x)                     # 经全连接层3传递
        x = torch.tanh(x)                   # 经tanh函数激活
        return self.out(x)                  # 经输出层传递得到输出张量

### 7.2.2 PPO模型搭建

模型训练超参数设置

In [None]:
BATCH_SIZE = 32                             # 批次尺寸
GAMMA = 0.92                                 # 奖励折扣因子
LAMDA = 0.9                                 # GAE算法的调整因子
EPSILON = 0.12                               # 截断调整因子

#生成动态学习率
def LearningRate(x):
    lr_start = 0.0001                       # 起始学习率
    lr_end = 0.0005                         # 终止学习率
    lr_decay = 20000                        # 学习率衰减因子
    return lr_end + (lr_start - lr_end) * math.exp(-1. * x / lr_decay)

模型搭建

In [None]:
# 输出连续动作的概率分布
def log_density(x, mu, std, logstd):
    var = std.pow(2)
    log_density = -(x - mu).pow(2) / (2 * var) - 0.5 * math.log(2 * math.pi) - logstd
    return log_density.sum(1, keepdim=True)

# 使用GAE方法计算优势函数
def get_gae(rewards, masks, values):
    rewards = torch.Tensor(rewards)
    masks = torch.Tensor(masks)
    returns = torch.zeros_like(rewards)
    advants = torch.zeros_like(rewards)
    running_returns = 0
    previous_value = 0
    running_advants = 0

    for t in reversed(range(0, len(rewards))):
        running_returns = rewards[t] + GAMMA * running_returns * masks[t]
        running_tderror = rewards[t] + GAMMA * previous_value * masks[t] - values.data[t]
        running_advants = running_tderror + GAMMA * LAMDA * running_advants * masks[t]

        returns[t] = running_returns
        previous_value = values.data[t]
        advants[t] = running_advants
    advants = (advants - advants.mean()) / advants.std()
    return returns, advants

# 替代损失函数
def surrogate_loss(actor, advants, states, old_policy, actions, index):
    mu, std, logstd = actor(torch.Tensor(states))
    new_policy = log_density(actions, mu, std, logstd)
    old_policy = old_policy[index]
    ratio = torch.exp(new_policy - old_policy)
    surrogate = ratio * advants
    return surrogate, ratio

# 训练模型
def train_model(actor, critic, memory, actor_optim, critic_optim):
    memory = np.array(memory, dtype=object)
    states = np.vstack(memory[:, 0])
    actions = list(memory[:, 1])
    rewards = list(memory[:, 2])
    masks = list(memory[:, 3])
    values = critic(torch.Tensor(states))
    loss_list = []
    # step 1: get returns and GAEs and log probability of old policy
    returns, advants = get_gae(rewards, masks, values)
    mu, std, logstd = actor(torch.Tensor(states))
    old_policy = log_density(torch.Tensor(np.array(actions)), mu, std, logstd)
    old_values = critic(torch.Tensor(states))
    criterion = torch.nn.MSELoss()
    n = len(states)
    arr = np.arange(n)
    # step 2: get value loss and actor loss and update actor & critic
    for epoch in range(10):
        np.random.shuffle(arr)
        for i in range(n // BATCH_SIZE):
            batch_index = arr[BATCH_SIZE * i: BATCH_SIZE * (i + 1)]
            batch_index = torch.LongTensor(batch_index)
            inputs = torch.Tensor(states)[batch_index]
            returns_samples = returns.unsqueeze(1)[batch_index]
            advants_samples = advants.unsqueeze(1)[batch_index]
            actions_samples = torch.Tensor(np.array(actions))[batch_index]
            oldvalue_samples = old_values[batch_index].detach()
            loss, ratio = surrogate_loss(actor, advants_samples, inputs,
                                         old_policy.detach(), actions_samples,
                                         batch_index)
            values = critic(inputs)
            clipped_values = oldvalue_samples + torch.clamp(values - oldvalue_samples, -EPSILON, EPSILON)
            critic_loss1 = criterion(clipped_values, returns_samples)
            critic_loss2 = criterion(values, returns_samples)
            critic_loss = torch.max(critic_loss1, critic_loss2).mean()

            clipped_ratio = torch.clamp(ratio, 1.0 - EPSILON, 1.0 + EPSILON)
            clipped_loss = clipped_ratio * advants_samples
            actor_loss = -torch.min(loss, clipped_loss).mean()

            loss = actor_loss + critic_loss

            loss_list.append(loss)
            critic_optim.zero_grad()
            critic_loss.backward(retain_graph=True)
            critic_optim.step()

            actor_optim.zero_grad()
            actor_loss.backward()
            actor_optim.step()

    return 0, sum(loss_list)/10

### 7.2.3 PPO模型训练/部署

训练代码，其中根据传回的信息自动判断是先手还是后手，对于奖励的设置目前仅对最后一壶根据比赛得分进行设置，其余的奖励均为0，后续可根据规则、经验等对奖励进行自行设计

In [None]:
import time, os
from AIRobot import AIRobot

class PPORobot(AIRobot):
    def __init__(self, key, name, round_max):
        super().__init__(key, name, show_msg=False)

        #初始化并加载先手actor模型
        self.init_actor = Actor()
        self.init_actor_file = 'model/PPO_init_actor.pth'
        if os.path.exists(self.init_actor_file):
            print("加载模型文件 %s" % (self.init_actor_file))
            self.init_actor.load_state_dict(torch.load(self.init_actor_file))

        #初始化并加载先手critic模型
        self.init_critic = Critic()
        self.init_critic_file = 'model/PPO_init_critic.pth'
        if os.path.exists(self.init_critic_file):
            print("加载模型文件 %s" % (self.init_critic_file))
            self.init_critic.load_state_dict(torch.load(self.init_critic_file))

        #初始化并加载后手actor模型
        self.dote_actor = Actor()
        self.dote_actor_file = 'model/PPO_dote_actor.pth'
        if os.path.exists(self.dote_actor_file):
            print("加载模型文件 %s" % (self.dote_actor_file))
            self.dote_actor.load_state_dict(torch.load(self.dote_actor_file))
  
        #初始化并加载后手critic模型
        self.dote_critic = Critic()        
        self.dote_critic_file = 'model/PPO_dote_critic.pth'
        if os.path.exists(self.dote_critic_file):
            print("加载模型文件 %s" % (self.dote_critic_file))
            self.dote_critic.load_state_dict(torch.load(self.dote_critic_file))
          
        self.memory = deque()               # 清空经验数据
        self.round_max = round_max          # 最多训练局数
        self.log_file_name = 'log/PPO_log/traindata_' + time.strftime("%y%m%d_%H%M%S") + '.log' # 日志文件     

    #根据当前比分获取奖励分数
    def get_reward(self, this_score):
        House_R = 1.830
        Stone_R = 0.145
        # 以投壶后得分减去投壶前得分为奖励分
        reward = this_score - self.last_score
        if (reward == 0):
            x = self.position[2*self.shot_num]
            y = self.position[2*self.shot_num+1]
            dist = self.get_dist(x, y)
            #对于大本营内的壶按照距离大本营圆心远近给奖励分
            if dist < (House_R+Stone_R):
                reward = 1 - dist / (House_R+Stone_R)
        return reward
        
    #处理投掷状态消息
    def recv_setstate(self, msg_list):
        #当前完成投掷数
        self.shot_num = int(msg_list[0])
        #总对局数
        self.round_total = int(msg_list[2])

        #达到最大局数则退出训练
        if self.round_num == self.round_max:
            self.on_line = False
            return
        
        #每一局开始时将历史比分清零
        if (self.shot_num == 0):
            self.last_score = 0
        this_score = 0
            
        #根据先后手选取模型并设定当前选手第一壶是当局比赛的第几壶
        if self.player_is_init:
            first_shot = 0
            self.actor = self.init_actor
            self.critic = self.init_critic
        else:
            first_shot = 1
            self.actor = self.dote_actor
            self.critic = self.dote_critic
            
        #当前选手第1壶投出前
        if self.shot_num == first_shot:
            init_score, self.s1 = get_infostate(self.position)      # 获取当前得分和状态描述
            self.action = self.actor.choose_action(self.s1)         # 根据状态获取对应的动作参数列表
            self.last_score = (1-2*first_shot)*init_score           # 先手为正/后手为负
        #当前选手第1壶投出后
        if self.shot_num == first_shot+1:
            init_score, _ = get_infostate(self.position)            # 获取当前得分和状态描述
            this_score = (1-2*first_shot)*init_score                # 先手为正/后手为负
            reward = self.get_reward(this_score)                    # 获取动作奖励
            self.memory.append([self.s1, self.action, reward, 1])   # 保存经验数据
        #当前选手第2壶投出前
        if self.shot_num == first_shot+2:
            init_score, self.s2 = get_infostate(self.position)      # 获取当前得分和状态描述
            self.action = self.actor.choose_action(self.s2)         # 根据状态获取对应的动作参数列表
            self.last_score = (1-2*first_shot)*init_score           # 先手为正/后手为负
        #当前选手第2壶投出后
        if self.shot_num == first_shot+3:
            init_score, _ = get_infostate(self.position)            # 获取当前得分和状态描述
            this_score = (1-2*first_shot)*init_score                # 先手为正/后手为负
            reward = self.get_reward(this_score)                    # 获取动作奖励
            self.memory.append([self.s2, self.action, reward, 1])   # 保存经验数据
        #当前选手第3壶投出前
        if self.shot_num == first_shot+4:
            init_score, self.s3 = get_infostate(self.position)      # 获取当前得分和状态描述
            self.action = self.actor.choose_action(self.s3)         # 根据状态获取对应的动作参数列表
            self.last_score = (1-2*first_shot)*init_score           # 先手为正/后手为负
        #当前选手第3壶投出后
        if self.shot_num == first_shot+5:
            init_score, _ = get_infostate(self.position)            # 获取当前得分和状态描述
            this_score = (1-2*first_shot)*init_score                # 先手为正/后手为负
            reward = self.get_reward(this_score)                    # 获取动作奖励
            self.memory.append([self.s3, self.action, reward, 1])   # 保存经验数据
        #当前选手第4壶投出前
        if self.shot_num == first_shot+6:
            init_score, self.s4 = get_infostate(self.position)      # 获取当前得分和状态描述
            self.action = self.actor.choose_action(self.s4)         # 根据状态获取对应的动作参数列表
            self.last_score = (1-2*first_shot)*init_score           # 先手为正/后手为负
        #当前选手第4壶投出后
        if self.shot_num == first_shot+7:
            init_score, _ = get_infostate(self.position)            # 获取当前得分和状态描述
            this_score = (1-2*first_shot)*init_score                # 先手为正/后手为负
            reward = self.get_reward(this_score)                    # 获取动作奖励
            self.memory.append([self.s4, self.action, reward, 1])   # 保存经验数据
        #当前选手第5壶投出前
        if self.shot_num == first_shot+8:
            init_score, self.s5 = get_infostate(self.position)      # 获取当前得分和状态描述
            self.action = self.actor.choose_action(self.s5)         # 根据状态获取对应的动作参数列表
            self.last_score = (1-2*first_shot)*init_score           # 先手为正/后手为负
        #当前选手第5壶投出后
        if self.shot_num == first_shot+9:
            init_score, _ = get_infostate(self.position)            # 获取当前得分和状态描述
            this_score = (1-2*first_shot)*init_score                # 先手为正/后手为负
            reward = self.get_reward(this_score)                    # 获取动作奖励
            self.memory.append([self.s5, self.action, reward, 1])   # 保存经验数据
        #当前选手第6壶投出前
        if self.shot_num == first_shot+10:
            init_score, self.s6 = get_infostate(self.position)      # 获取当前得分和状态描述
            self.action = self.actor.choose_action(self.s6)         # 根据状态获取对应的动作参数列表
            self.last_score = (1-2*first_shot)*init_score           # 先手为正/后手为负
        #当前选手第6壶投出后
        if self.shot_num == first_shot+11:
            init_score, _ = get_infostate(self.position)            # 获取当前得分和状态描述
            this_score = (1-2*first_shot)*init_score                # 先手为正/后手为负
            reward = self.get_reward(this_score)                    # 获取动作奖励
            self.memory.append([self.s6, self.action, reward, 1])   # 保存经验数据
        #当前选手第7壶投出前
        if self.shot_num == first_shot+12:
            init_score, self.s7 = get_infostate(self.position)      # 获取当前得分和状态描述
            self.action = self.actor.choose_action(self.s7)         # 根据状态获取对应的动作参数列表
            self.last_score = (1-2*first_shot)*init_score           # 先手为正/后手为负
        #当前选手第7壶投出后
        if self.shot_num == first_shot+13:
            init_score, _ = get_infostate(self.position)            # 获取当前得分和状态描述
            this_score = (1-2*first_shot)*init_score                # 先手为正/后手为负
            reward = self.get_reward(this_score)                    # 获取动作奖励
            self.memory.append([self.s7, self.action, reward, 1])   # 保存经验数据
        #当前选手第8壶投出前
        if self.shot_num == first_shot+14:
            init_score, self.s8 = get_infostate(self.position)      # 获取当前得分和状态描述
            self.action = self.actor.choose_action(self.s8)         # 根据状态获取对应的动作参数列表
            
        if self.shot_num == 16:
            if self.score > 0:
                reward = 10 * self.score                            # 获取动作奖励
            else:
                reward = 0      
            self.memory.append([self.s8, self.action, reward, 0])   # 保存经验数据
            
            self.round_num += 1
            #如果处于训练模式且有12局数据待训练
            if (self.round_max > 0) and (self.round_num % 12 == 0):
                #训练模型
                actor_optim = torch.optim.Adam(self.actor.parameters(), lr=LearningRate(self.round_num))
                critic_optim = torch.optim.Adam(self.critic.parameters(), lr=LearningRate(self.round_num),
                                                weight_decay=0.0001)
                self.actor.train(), self.critic.train()
                _, loss = train_model(self.actor, self.critic, self.memory, actor_optim, critic_optim)
                #保存模型
                if self.player_is_init:
                    torch.save(self.actor.state_dict(), self.init_actor_file)
                    torch.save(self.critic.state_dict(), self.init_critic_file)
                else:
                    torch.save(self.actor.state_dict(), self.dote_actor_file)
                    torch.save(self.critic.state_dict(), self.dote_critic_file)
                print('============= Checkpoint Saved =============')
                #清空训练数据
                self.memory = deque()
                
            #将本局比分和当前loss值写入日志文件
            log_file = open(self.log_file_name, 'a+')
            log_file.write("score "+str(self.score)+" "+str(self.round_num)+"\n")
            if self.round_num % 12 == 0:
                log_file.write("loss "+str(float(loss))+" "+str(self.round_num)+"\n")
            log_file.close()
            
    def get_bestshot(self):
        return  "BESTSHOT " + str(self.action)[1:-1].replace(',', '')

#根据数字冰壶服务器界面中给出的连接信息修改CONNECTKEY，注意这个数据每次启动都会改变。
key = "test2023_2_1a5e475b-6a1c-499e-ae43-b67d72f2ab69"

myrobot = PPORobot(key, "PPORobot", round_max=10000)
myrobot.recv_forever()

### 7.2.5 训练过程曲线的绘制

读取日志文件中的数据，绘制训练过程中的比分变化曲线和loss值变化曲线。

In [None]:
#导入matplotlib函数库
import matplotlib.pyplot as plt

#定义两个曲线的坐标数组
score_x, score_y = [], []
loss_x, loss_y = [], []

#读取日志文件
log_file = open(myrobot.log_file_name, 'r')
for line in log_file.readlines():
    var_name, var_value, round_num = line.split(' ')
    #存储比分曲线数据
    if var_name == 'score':
        score_x.append(int(round_num))
        score_y.append(int(var_value))
    #存储loss曲线数据
    if var_name == 'loss':
        loss_x.append(int(round_num))
        loss_y.append(float(var_value))

#分两个子图以散点图的方式绘制比分曲线和loss值曲线
fig, axes = plt.subplots(2,1)
axes[0].scatter(np.array(score_x),np.array(score_y),s=5)
axes[1].scatter(np.array(loss_x),np.array(loss_y),s=5)

plt.show()

## 小结

本课介绍了从PPO算法及各种相关算法的原理，并给出了如何在数字冰壶比赛中应用PPO算法的范例代码。

作为强化学习算法的一种重要思想，策略梯度类型算法让我们能够处理连续的动作空间，对于解决很多连续动作空间的控制问题非常有效。同时，作为一种算法思想，策略梯度的方法也渗透到了深度学习领域的很多思想中。在很多深度学习模型中，引入强化学习的框架，并且使用策略梯度的算法能够有效提高算法的效果。

特别地，演员-评论家算法将价值网络和策略网络结合到一起，用这两个网络共同解决了一些强化学习领域的实际问题。