这一章主要是DP(Dynamic Programming)在RL中相关的东西

基于DP的强化学习算法主要有:策略迭代policy iteration & 价值迭代value iteration, policy iteration可以分为策略评估policy evaluation & 策略提升policy improvement

具体来说，策略迭代中的策略评估使用贝尔曼期望方程来得到一个策略的状态价值函数，这是一个动态规划的过程；而价值迭代直接使用贝尔曼最优方程来进行动态规划，得到最终的最优状态价值。

基于动态规划的这两种强化学习算法要求事先知道环境的状态转移函数和奖励函数，也就是需要知道整个马尔可夫决策过程。

在这样一个白盒环境中，不需要通过智能体和环境的大量交互来学习，可以直接用动态规划求解状态价值函数。

但是，现实中的白盒环境很少，这也是动态规划算法的局限之处，我们无法将其运用到很多实际场景中。

另外，策略迭代和价值迭代通常只适用于有限马尔可夫决策过程，即状态空间和动作空间是离散且有限的。

环境介绍:悬崖漫步Cliff Walking

悬崖漫步是一个非常经典的强化学习环境，它要求一个智能体从起点出发，避开悬崖行走，最终到达目标位置。有一个 4×12 的网格世界，每一个网格表示一个状态。智能体的起点是左下角的状态，目标是右下角的状态，智能体在每一个状态都可以采取 4 种动作：上、下、左、右。如果智能体采取动作后触碰到边界墙壁则状态不发生改变，否则就会相应到达下一个状态。环境中有一段悬崖，智能体掉入悬崖或到达目标状态都会结束动作并回到起点，也就是说掉入悬崖或者达到目标状态是终止状态。智能体每走一步的奖励是 −1，掉入悬崖的奖励是 −100。


In [None]:
import copy

class CliffWalkingEnv:
    """ 悬崖漫步环境"""
    def __init__(self, ncol=12, nrow=4):
        self.ncol = ncol  # 定义网格世界的列
        self.nrow = nrow  # 定义网格世界的行
        # 转移矩阵P[state][action] = [(p, next_state, reward, done)]包含下一个状态和奖励, done表示是否到达终止状态
        self.P = self.createP()

    def createP(self):
        # 初始化
        P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
        # 4种动作, change[0]:上,change[1]:下, change[2]:左, change[3]:右。坐标系原点(0,0)
        # 定义在左上角
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        for i in range(self.nrow):
            for j in range(self.ncol):
                for a in range(4):
                    # 位置在悬崖或者目标状态,因为无法继续交互,任何动作奖励都为0
                    if i == self.nrow - 1 and j > 0:
                        P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0,
                                                    True)]
                        continue
                    # 其他位置
                    next_x = min(self.ncol - 1, max(0, j + change[a][0]))
                    next_y = min(self.nrow - 1, max(0, i + change[a][1]))
                    next_state = next_y * self.ncol + next_x
                    reward = -1
                    done = False
                    # 下一个位置在悬崖或者终点
                    if next_y == self.nrow - 1 and next_x > 0:
                        done = True
                        if next_x != self.ncol - 1:  # 下一个位置在悬崖
                            reward = -100
                    P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
        return P

## 策略迭代算法

### 策略评估

贝尔曼期望方程:

\begin{aligned}V^{\pi}(s)&=\mathbb{E}_\pi[R_t+\gamma V^\pi(S_{t+1})|S_t=s]\\&=\sum_{a\in A}\pi(a|s)\left(r(s,a)+\gamma\sum_{s^{\prime}\in S}p(s^{\prime}|s,a)V^\pi(s^{\prime})\right)\end{aligned}

这个公式的作用是根据其他的价值函数计算当前的价值函数,可以将其修改为

\begin{aligned}V^{k+1}(s)=\sum_{a\in A}\pi(a|s)\left(r(s,a)+\gamma\sum_{s^{\prime}\in S}P(s^{\prime}|s,a)V^k(s^{\prime})\right)\end{aligned}

表示每一轮的迭代,可以证明V^k最终会收敛到V^pi,初始值V0可以任意指定

在实际编程中,可以设置阈值,如果 $\max_{s\in\mathcal{S}}|V^{k+1}(s)-V^k(s)|$ 比较小,就可以结束策略评估 

### 策略提升

建立在策略评估的基础上,可以通过最大化动作价值函数达到改进策略的目的(贪心策略 $\pi'$ ,视作确定性策略,将此策略代入Q,可以得出Q在此时与V等价)

假设: 

$$Q^\pi(s,\pi^{\prime}(s))\geq V^\pi(s)$$

可以得出

\begin{aligned}V^{\pi}(s)&\leq Q^\pi(s,\pi^{\prime}(s))\\&=\mathbb{E}_{\pi^{\prime}}[R_t+\gamma V^{\pi}(S_{t+1})|S_t=s]\\&\leq\mathbb{E}_{\pi^{\prime}}[R_t+\gamma Q^\pi(S_{t+1},\pi^{\prime}(S_{t+1}))|S_t=s]\\&=\mathbb{E}_{\pi^{\prime}}[R_t+\gamma R_{t+1}+\gamma^2V^\pi(S_{t+2})|S_t=s]\\&\leq\mathbb{E}_{\pi^{\prime}}[R_t+\gamma R_{t+1}+\gamma^2R_{t+2}+\gamma^3V^\pi(S_{t+3})|S_t=s]\\&:\\&\leq\mathbb{E}_{\pi^{\prime}}[R_t+\gamma R_{t+1}+\gamma^2R_{t+2}+\gamma^3R_{t+3}+\cdots|S_t=s]\\&=V^{\pi^{\prime}}(s)\end{aligned}

即为策略提升定理,根据此定理可以使用上面提到的贪心策略得出

$$\pi^{\prime}(s)=\arg\max_aQ^\pi(s,a)=\arg\max_a\{r(s,a)+\gamma\sum_{s^{\prime}}P(s^{\prime}|s,a)V^\pi(s^{\prime})\}$$

结合策略评估和策略提升，我们得到以下策略迭代算法：

·随机初始化策略 $\pi(s)$ 和价值函数 $V(s)$

$\cdot$ while $\Delta>\theta$ do: (策略评估循环)

$.\Delta\leftarrow0$

$\cdot$   对于每一个状态$s\in\mathcal{S}$:

$$\begin{aligned}&v\leftarrow V(S)\\&V(s)\leftarrow r(s,\pi(s))+\gamma\sum_{s^{\prime}}P(s^{\prime}|s,\pi(s))V(s^{\prime})\\&\Delta\leftarrow\max(\Delta,|v-V(s)|)\end{aligned}$$


. end while

$\cdot\pi_\mathrm{old}\leftarrow\pi$

·对于每一个状态$s\in\mathcal{S}$:

$$\pi(s)\leftarrow\arg\max_ar(s,a)+\gamma\sum_{s^{\prime}}P(s^{\prime}|s,a)V(s^{\prime})$$
·若$\pi_\mathrm{old}=\pi$,则停止算法并返回$V$ 和 $\pi;$ 否则转到策略评估循环

我们现在来看一下策略迭代算法的代码实现过程。

In [49]:
class PolicyIteration:
    def __init__(self, env, theta, gamma):
        self.env = env
        self.v = [0] * self.env.ncol * self.env.nrow  # 初始化价值
        self.pi = [[0.25, 0.25, 0.25, 0.25] for i in range(self.env.ncol * self.env.nrow)]  # 初始化策略
        self.theta = theta
        self.gamma = gamma

    def policy_evaluation(self):  # 策略评估
        cnt = 1  # 计数器
        while 1:
            max_diff = 0
            new_v = [0] * self.env.ncol * self.env.nrow
            for s in range(self.env.ncol * self.env.nrow):


                qsa_list = []  # 开始计算状态s下的所有Q(s,a)价值
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        p, next_state, r, done = res
                        qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                        # 本章环境比较特殊,奖励和下一个状态有关,所以需要和状态转移概率相乘
                    qsa_list.append(self.pi[s][a] * qsa)
                # 再乘策略就得到了v

                new_v[s] = sum(qsa_list)  # 状态价值函数和动作价值函数之间的关系
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
            self.v = new_v
            if max_diff < self.theta: break  # 满足收敛条件,退出评估迭代
            cnt += 1
        print("策略评估进行%d轮后完成" % cnt)

    def policy_improvement(self):  # 策略提升
        for s in range(self.env.nrow * self.env.ncol):
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    p, next_state, r, done = res
                    qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                qsa_list.append(qsa)
            maxq = max(qsa_list)
            cntq = qsa_list.count(maxq)  # 计算有几个动作得到了最大的Q值
            # 让这些动作均分概率
            self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]
        print("策略提升完成")
        return self.pi

    def policy_iteration(self):  # 策略迭代
        while 1:
            self.policy_evaluation()
            old_pi = copy.deepcopy(self.pi)  # 将列表进行深拷贝,方便接下来进行比较
            """
            - 浅度拷贝只复制最外层对象，内部的嵌套对象（如列表、字典等）仍然引用原来的对象。修改嵌套对象会影响原对象
            - 深度拷贝会递归复制所有层级的对象，原对象和新对象完全独立，互不影响。
            """
            new_pi = self.policy_improvement()
            if old_pi == new_pi: break

def print_agent(agent, action_meaning, disaster=[], end=[]):
    print("状态价值：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            # 为了输出美观,保持输出6个字符
            print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]), end=' ')
        print()

    print("策略：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            # 一些特殊的状态,例如悬崖漫步中的悬崖
            if (i * agent.env.ncol + j) in disaster:
                print('****', end=' ')
            elif (i * agent.env.ncol + j) in end:  # 目标状态
                print('EEEE', end=' ')
            else:
                a = agent.pi[i * agent.env.ncol + j]
                pi_str = ''
                for k in range(len(action_meaning)):
                    pi_str += action_meaning[k] if a[k] > 0 else 'o'
                print(pi_str, end=' ')
        print()


env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>']
theta = 0.001
gamma = 0.9
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
print_agent(agent, action_meaning, list(range(37, 47)), [47])

策略评估进行60轮后完成
策略提升完成
策略评估进行72轮后完成
策略提升完成
策略评估进行44轮后完成
策略提升完成
策略评估进行12轮后完成
策略提升完成
策略评估进行1轮后完成
策略提升完成
状态价值：
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000 
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
策略：
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo 
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 


## 价值迭代算法

从上面的代码运行结果中我们能发现，策略迭代中的策略评估需要进行很多轮才能收敛得到某一策略的状态函数，这需要很大的计算量，尤其是在状态和动作空间比较大的情况下。我们是否必须要完全等到策略评估完成后再进行策略提升呢？试想一下，可能出现这样的情况：虽然状态价值函数还没有收敛，但是不论接下来怎么更新状态价值，策略提升得到的都是同一个策略。如果只在策略评估中进行一轮价值更新，然后直接根据更新后的价值进行策略提升，这样是否可以呢？答案是肯定的，这其实就是本节将要讲解的价值迭代算法，**它可以被认为是一种策略评估只进行了一轮更新的策略迭代算法**。需要注意的是，价值迭代中不存在显式的策略，我们只维护一个状态价值函数。

$$V^{k+1}(s)=\max_{a\in\mathcal{A}}\{r(s,a)+\gamma\sum_{s^{\prime}\in\mathcal{S}}P(s^{\prime}|s,a)V^k(s^{\prime})\}$$

### 价值迭代算法流程如下：

· 随机初始化$V(s)$

. while $\Delta>\theta$ do :

$.\Delta\leftarrow0$

.  对于每一个状态$s\in\mathcal{S}:$

$$\begin{aligned}&v\leftarrow V(s)\\&V(s)\leftarrow\max_ar(s,a)+\gamma\sum_{s^{\prime}}P(s^{\prime}|s,a)V(s^{\prime})\\&\Delta\leftarrow\max(\Delta,|v-V(s)|)\end{aligned}$$

·end while

. 返回一个确定性策略

$$\pi(s)=\arg\max_a\{r(s,a)+\gamma\sum_{s^{\prime}}P(s^{\prime}|s,a)V(s^{\prime})\}$$

我们现在来编写价值迭代的代码。

In [50]:
class ValueIteration:
    """ 价值迭代算法 """
    def __init__(self, env, theta, gamma):
        self.env = env
        self.v = [0] * self.env.ncol * self.env.nrow  # 初始化价值为0
        self.theta = theta  # 价值收敛阈值
        self.gamma = gamma
        # 价值迭代结束后得到的策略
        self.pi = [None for i in range(self.env.ncol * self.env.nrow)]

    def value_iteration(self):
        cnt = 0
        while 1:
            max_diff = 0
            new_v = [0] * self.env.ncol * self.env.nrow
            for s in range(self.env.ncol * self.env.nrow):


                qsa_list = []  # 开始计算状态s下的所有Q(s,a)价值
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        p, next_state, r, done = res
                        qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                    qsa_list.append(qsa)  # 这一行和下一行代码是价值迭代和策略迭代的主要区别,策略迭代中为 qsa_list.append(self.pi[s][a] * qsa)
                new_v[s] = max(qsa_list)  # 策略迭代中, 这一行为 new_v[s] = sum(qsa_list)
                """
                简言之就是对策略的加权求和换成了取最大值,这也是最优方程和期望方程的区别
                """

                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
            self.v = new_v
            if max_diff < self.theta: break  # 满足收敛条件,退出评估迭代
            cnt += 1
        print("价值迭代一共进行%d轮" % cnt)
        self.get_policy()

    def get_policy(self):  # 根据价值函数导出一个贪婪策略
        for s in range(self.env.nrow * self.env.ncol):
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    p, next_state, r, done = res
                    qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                qsa_list.append(qsa)
            maxq = max(qsa_list)
            cntq = qsa_list.count(maxq)  # 计算有几个动作得到了最大的Q值
            # 让这些动作均分概率
            self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]


env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>']
theta = 0.001
gamma = 0.9
agent = ValueIteration(env, theta, gamma)
agent.value_iteration()
print_agent(agent, action_meaning, list(range(37, 47)), [47])

价值迭代一共进行14轮
状态价值：
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000 
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
策略：
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo 
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 


openai有一个gym库, 其中包含了很多有名的环境(比如Mujoco), 可以调用这些环境做一些操作

In [51]:
import gym
import pygame

desc = [
    "SFFF",
    "FHFH",
    "FFFH",
    "HFFG"
]

env = gym.make("FrozenLake-v1", is_slippery = False, render_mode="rgb_array")
env = env.unwrapped  # 使得能够访问转移矩阵
env.reset()



holes = set()
ends = set()

# 转移矩阵P[state][action] = [(p, next_state, reward, done)]包含下一个状态和奖励, done表示是否到达终止状态

for s in env.P:
    for a in env.P[s]:
        for s_ in env.P[s][a]:
            if s_[2] == 1.0:
                ends.add(s_[1])
            if s_[3] == True:
                holes.add(s_[1])

holes = holes - ends

print("冰洞的索引:", holes)
print("目标的索引:", ends)

for a in env.P[14]:  # 查看目标左边一格的状态转移信息
    print(env.P[14][a])

# 每个动作都会等概率“滑行”到 3 种可能的结果, 可以通过制定is_slippery修改

img = env.render()  # 返回一个 RGB 数组

# 用 pygame 显示图像（可选）
pygame.init()
screen = pygame.display.set_mode((img.shape[1], img.shape[0]))
pygame.surfarray.blit_array(screen, img.transpose(1, 0, 2))
pygame.display.update()

# 保持窗口打开
running = True
while running:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False
pygame.quit()

冰洞的索引: {11, 12, 5, 7}
目标的索引: {15}
[(1.0, 13, 0.0, False)]
[(1.0, 14, 0.0, False)]
[(1.0, 15, 1.0, True)]
[(1.0, 10, 0.0, False)]


In [52]:
# 尝试一下策略迭代
# 这个动作意义是Gym库针对冰湖环境事先规定好的
action_meaning = ['<', 'v', '>', '^']
theta = 1e-5
gamma = 0.9
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
print_agent(agent, action_meaning, [5, 7, 11, 12], [15])  # 此处应如实反映, 这里的修改并不能反映到env.中

策略评估进行25轮后完成
策略提升完成
策略评估进行7轮后完成
策略提升完成
策略评估进行1轮后完成
策略提升完成
状态价值：
 0.590  0.656  0.729  0.656 
 0.656  0.000  0.810  0.000 
 0.729  0.810  0.900  0.000 
 0.000  0.900  1.000  0.000 
策略：
ov>o oo>o ovoo <ooo 
ovoo **** ovoo **** 
oo>o ov>o ovoo **** 
**** oo>o oo>o EEEE 


In [53]:
# 尝试一下价值迭代
action_meaning = ['<', 'v', '>', '^']
theta = 1e-5
gamma = 0.9
agent = ValueIteration(env, theta, gamma)
agent.value_iteration()
print_agent(agent, action_meaning, [5, 7, 11, 12], [15])  # 此处应如实反映, 这里的修改并不能反映到env.中

价值迭代一共进行6轮
状态价值：
 0.590  0.656  0.729  0.656 
 0.656  0.000  0.810  0.000 
 0.729  0.810  0.900  0.000 
 0.000  0.900  1.000  0.000 
策略：
ov>o oo>o ovoo <ooo 
ovoo **** ovoo **** 
oo>o ov>o ovoo **** 
**** oo>o oo>o EEEE 
