In [1]:
from schorl_utils.envs import *
from schorl_utils.functions import Train, Agent
from schorl_utils.net import generate_mlpnet, show_net_structure
import gym

# 以CartPole-v1环境为例：连续状态，离散动作环境
env = gym.make('CartPole-v1', new_step_api=True)
print(f"状态空间 : {env.observation_space}\n obs shape : {env.observation_space.shape}")
print(f"动作空间 : {env.action_space}\n aciton nums : {env.action_space.n}")

状态空间 : Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
 obs shape : (4,)
动作空间 : Discrete(2)
 aciton nums : 2


---
# 策略学习算法
策略学习算法要学习的是一个策略函数，我们定义一个参数为$\theta$的策略函数，

该函数的输入状态，输出是动作概率分布。

算法在训练过程中追求的目标是寻找一个最优的策略，即一组参数使得策略在当前环境中的回报期望最大化。

我们用状态值函数来表征某状态下当前策略的好坏程度，那么策略在环境中的回报期望就是对状态值函数的期望：

$\pi(s)=$
$J(\theta)=\mathbb{E}[V^{\pi_{\theta}}(s_0)]$


---
## 下面创建策略网络
如果是离散动作空间，输出增加一个softmax层，将输出的动作向量变为概率分布；后续可以使用epsilon贪心策略来平衡探索利用问题

如果是连续动作空间，可以增加一个tahn层，将输出变为(-1,1)内的值，再根据动作的范围进行放缩

连续动作在做动作选择的时候可以通过增加噪声来平衡探索利用

In [2]:
# 创建一个全连接网络作为策略网络，此处离散动作空间，故增加一个softmax层
policyNet = generate_mlpnet([env.observation_space.shape[0], 
                128, 64, env.action_space.n],
                if_softmax=True)

print("策略网络")
show_net_structure(policyNet, (env.observation_space.shape[0],))

策略网络
The structure of the net:
Flatten output shape : torch.Size([5, 4])
Linear output shape : torch.Size([5, 128])
Tanh output shape : torch.Size([5, 128])
Linear output shape : torch.Size([5, 64])
Tanh output shape : torch.Size([5, 64])
Linear output shape : torch.Size([5, 2])
Softmax output shape : torch.Size([5, 2])


---
回顾一下动作价值函数和状态价值函数
#### 动作价值函数 Action-value function
折扣奖励<br>
$U_t=R_t+\gamma \cdot R_{t+1} + \gamma^2 \cdot R_{t+2} + \gamma^3 \cdot \R_{t+3} + ... $

动作值函数<br>
$Q_\pi(s_t,a_t)=\mathbb{E}[U_t|S_t=s_t, A_t=a_t]$
> 动作值函数，是当前状态st下选择动作at的期望回报

#### 状态价值函数
$V_\pi(s_t)=\mathbb{E}[Q_\pi(s_t,A)]=\sum_a\pi(a|s_t) \cdot Q_\pi(s_t,a)$
> 状态值函数，是当前状态下该策略的期望回报，状态值函数是动作值函数关于策略的所有动作的期望

---
### 使用神经网络近似策略函数
则此时的状态值函数为：<br>
$V_\pi(s_t;\theta)=\sum_a\pi(a|s_t;\theta) \cdot Q_\pi(s_t,a)$

定义$J(\theta)$为对策略网络的评价：<br>
$J(\theta)=\mathbb{E}[V_\pi(s_t;\theta)]$

### 采用梯度上升来更新$\theta$以提升策略
我们对$J(\theta)$求导，但是计算$J(\theta)$是不切实际的，我们无法遍历所有情况也不会遍历所有情况，所以这个期望是不可能准确算出来。这里采用对$V_\pi(s;\theta)$的导数来近似$J(\theta)$的导数。由于s可以看作是随机采样的其中一个状态，故此时梯度是随机梯度。

离散动作：$\frac{\partial V(s;\theta)}{\partial \theta} = \sum_a 
    \frac{\partial \pi(a|s;\theta)}{\partial \theta} \cdot Q_\pi(s,a)$

连续动作：$\frac{\partial V(s;\theta)}{\partial \theta} = \mathbb{E}_{A\sim\pi(\cdot|s;\theta)}
    [\frac{\partial log \pi(A|s;\theta)}{\partial \theta} \cdot Q_\pi(s,A)]$

由于连续动作时，无法准确积分求真实的期望，所以采用蒙特卡洛采样来近似期望。随机采样一组或者多组的动作来计算这个期望。

---
策略梯度类算法的大致过程
1. 获得状态$s_t$
2. 根据策略$\pi(\cdot|s_t;\theta_t)$随机采样动作$a_t$
3. 计算$Q(s_t, a_t)$
4. 对网络求导(神经网络框架自带功能)，$d_{\theta,t}=\frac{\partial log\pi(a_t|s_t;\theta)}{\partial \theta}|_{\theta=\theta_t}$
5. 近似计算策略梯度，$g(a_t,\theta_t)=Q(s_t,a_t)\cdot d_{\theta,t}$
6. 更新策略网络，$\theta_{t+1} = \theta_t + \beta \cdot g(a_t,\theta_t)$ , $\beta$学习率


### 对Q(s_t,a_t)的计算方式不同，主要有以下两类算法
#### 一、REINFORCE算法
1. 跑完一局游戏，采样获得一个轨迹，$s_1,a_1,r_1,s_2,a_2,r_2,...$
2. 计算累计折扣奖励，$u_t = \sum^T_{k=t}\gamma^k r_k$
3. Q(s_t,a_t)严格意义上是累计折扣奖励的期望，此处用一局的累计折扣近似期望

reinforce算法过程：<br>
- 采样一条轨迹
- for step in reversed(轨迹):<br>
-   - 计算$u_{step}$
-   - 更新参数，（$\theta$+求对数后的导数*对应的Q值）

由于计算的Q可能波动较大，一般减去一个base来使之稳定一点，比如V(s),减去Q的均值也可以。

#### 二、AC算法
用神经网络来近似Q网络，计算q值。<br>
好处是算法可以单步执行了，而且随着训练进行神经网络估计得q值比采样得方式要稳定。

In [3]:
import numpy as np
from schorl_utils.functions import UPDATE_MODE
# 此处以reinforce算法实现

class Reinforce(Agent):
    def __init__(self, 
            net, 
            nums_action
            ) -> None:
        super().__init__()  # 初始化默认参数
        self.policy = net
        self.nums_action = nums_action
        self.policy_optim = self.optim(self.policy.parameters(), self.lr)
    
    def get_action(self, state:torch.tensor):
        # 数据格式转换的操作在Train内部已经实现
        probs = self.policy(state)  # 此处返回的是各个动作相应的softmax后的概率组合
        action_dist = torch.distributions.Categorical(probs)  # 按概率分布的采样工具
        action = action_dist.sample().item()
        return action, probs[0,action] # 采样一个动作,后续要用该动作的概率
        # 也可以直接这里返回 action_dist.log_prob(action) 即对数概率

    def update(self, obs_track:list, *args):
        # reinforce算法 蒙特卡洛积分，用ut代替Q
        # Q = sum([reward_track[i]*self.gamma**i for i in range(len(reward_track))])

        probs_track = args[0]
        reward_track = [i.reward for i in obs_track]

        self.policy_optim.zero_grad()
        # 对每步分别进行loss反向传播累计
        for step in range(len(reward_track)):
            this_step = reward_track[step:]
            Q = sum([this_step[i]*self.gamma**i for i in range(len(this_step))])
            log_prob = torch.log(probs_track[step][0])
            loss = -log_prob * Q
            # 在王树森老师的书中，此处的Q是整场游戏的Q
            # 整场的Q，算法学习效果较差，学习慢，数据利用率低
            # 在张伟楠老师的代码中，此处的Q是每一步的累计Q 
            loss.backward()
        self.policy_optim.step()

        return loss


In [4]:
class ReinfroceCartpoleTrain(Train):
    def __init__(self, env, agent:Reinforce) -> None:
        super().__init__(env, agent)
        self.update_mode = UPDATE_MODE.MULTI_STEP   # reinforce是多步更新
    
    def run_episode(self):
        done = False
        reward_list = []
        prob_list = []
        state = self.env.reset()
        while not done:
            action, prob = self.agent(state)
            got = self.env.step(action)
            reward_list.append(got[1])
            prob_list.append(prob)
            state = got[0]
            done = got[2]
        
        self.env.close()
        loss = self.agent.update(reward_list, prob_list)

        return {'accumulated reward':sum(reward_list), 'loss':loss}

In [5]:
num_episodes = 500

net = policyNet

agent = Reinforce(net, env.action_space.n)

train = ReinfroceCartpoleTrain(env=env, agent=agent)
train.tblogpath = './reforcelog'

train.train(num_episodes)

  return torch.tensor(data, dtype=self.data_type).to(device=self.device)
Iteration 0: 100%|██████████| 50/50 [00:00<00:00, 95.06it/s, episode=50, mean reward=1.000] 
Iteration 1: 100%|██████████| 50/50 [00:02<00:00, 18.85it/s, episode=100, mean reward=1.000]
Iteration 2: 100%|██████████| 50/50 [00:05<00:00,  9.16it/s, episode=150, mean reward=1.000]
Iteration 3: 100%|██████████| 50/50 [00:25<00:00,  1.92it/s, episode=200, mean reward=1.000]
Iteration 4: 100%|██████████| 50/50 [00:19<00:00,  2.60it/s, episode=250, mean reward=1.000]
Iteration 5: 100%|██████████| 50/50 [00:17<00:00,  2.87it/s, episode=300, mean reward=1.000]
Iteration 6: 100%|██████████| 50/50 [00:15<00:00,  3.20it/s, episode=350, mean reward=1.000]
Iteration 7: 100%|██████████| 50/50 [00:03<00:00, 16.56it/s, episode=400, mean reward=1.000]
Iteration 8: 100%|██████████| 50/50 [00:02<00:00, 23.93it/s, episode=450, mean reward=1.000]
Iteration 9: 100%|██████████| 50/50 [00:02<00:00, 18.89it/s, episode=500, mean reward=1.00

In [6]:
agent.save_net(agent.policy ,'./model/reforceCartpole.pt')

In [7]:
!tensorboard --logdir=./reforcelog --port 8123
# open web browser and visit 127.0.0.1:8123

^C


In [9]:
import gym

env = gym.make('CartPole-v1', new_step_api=True)

net = generate_mlpnet([env.observation_space.shape[0], 
                128, 64, env.action_space.n],
                if_softmax=True)
model = torch.load('./model/reforceCartpole.pt')
net.load_state_dict(model)

done = False
state = env.reset()
while not done:
    env.render()
    state = torch.tensor(np.array([state]), dtype=torch.float).to(get_device())
    action = net(state).argmax().item()
    got = env.step(action=action)
    state = got[0]
    done = got[2]
env.close()

# 策略梯度算法进阶

引入 baseline 提升算法稳定性：在梯度更新时，将Q换成(Q-b)，其中b可以是状态价值函数，即使用 Q - (Q的期望)来减小随机采样不确定性带来的收敛差的问题。

$\nabla_{\theta}J(\theta) = \mathbb{E}_s[\mathbb{E}_{A\sim\pi(\cdot|s;\theta)}
    [\frac{\partial log \pi(A|s;\theta)}{\partial \theta} \cdot (Q_\pi(s,A) - b)]]$

理论可以证明，减去基线的随机梯度是无偏的。

#### 对 REINFORCE算法
增加一个神经网络来估计V作为baseline<br>\
注意这里和AC算法的不同，这里的这个神经网络只是作为baseline并没有对策略做评价，这一点和AC算法本质不同。