基于动态规划的强化学习算法有两种：一种是**策略迭代**，一种是**价值迭代**。其中策略迭代由两部分组成：**策略评估**、**策略提升**。

这些算法需要事先知道环境的状态转移函数和奖励函数。

### 悬崖漫步环境

**悬崖漫步**：在一个网格图中，每个格子表示一种状态，智能体从起点出发到达一个指定终点，每次可以选择 4 种动作：上、下、左、右。如果智能体碰到墙壁则状态不改变，否则到达相应的状态。图中有一段是悬崖，如果掉入悬崖或者到达终点会结束动作并回到起点。每走一步的奖励是 -1，掉入悬崖的奖励是 -100。

下面用代码实现这个环境。

In [30]:
import copy

# 悬崖漫步环境
class CliffWalkingEnv:
    def __init__ (self, ncol=12, nrow=4):
        self.ncol = ncol # 定义网格图的列
        self.nrow = nrow # 定义网格图的行
        self.P = self.createP()

    def createP (self):
        P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]] # 四种动作
        for i in range(self.nrow):
            for j in range(self.ncol):
                for a in range(4):
                    if (i == self.nrow - 1 and j > 0): # 目前位于悬崖或者终点
                        P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0, True)]
                        continue
                    next_x = min(self.ncol - 1, max(0, j + change[a][0]))
                    next_y = min(self.nrow - 1, max(0, i + change[a][1]))
                    next_state = next_y * self.ncol + next_x
                    reward = -1
                    done = False
                    # 下一个位置在悬崖或终点
                    if (next_y == self.nrow - 1 and next_x > 0):
                        done = True
                        if (next_x != self.ncol - 1):
                            reward = -100
                    P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
        return P

### 策略迭代算法

#### 策略评估

根据贝尔曼期望方程 $$V^{\pi}(s)=\sum_{a\in A}\pi(a|s)\left(r(s,a)+\gamma\sum_{s'\in S}p(s'|s,a)V^{\pi}(s')\right)$$

可知可以根据下一个状态的价值来计算当前状态的价值，用子问题的解来求解当前问题，即 $$V^{k+1}(s)=\sum_{a\in A}\pi(a|s)\left(r(s,a)+\gamma\sum_{s'\in S}p(s'|s,a)V^k(s')\right)$$

选定任意初始值 $V^0$，可以证明当 $k\to\infty$ 时，序列 $\{V^k\}$ 会收敛到 $V^{\pi}$。在实际实现过程中，当某一轮的 $\max_{s\in S}|V^{k+1}(s)-V^k(s)|$ 非常小，则可以提前结束策略评估。

#### 策略提升

如果智能体在状态 $s$ 采取动作 $a$，之后的动作遵循策略 $\pi$，其动作价值 $Q^{\pi}(s,a)>V^{\pi}(s)$，则说明在状态 $s$ 采取动作 $a$ 比原来的策略更好。假设存在一个确定性策略 $\pi'$，在任意状态 $s$ 都满足 $$Q^{\pi}(s,\pi'(s))\geq V^{\pi}(s)$$

则有 $$V^{\pi'}(s)\geq V^{\pi}(s)$$

这便是策略提升定理。于是可以贪心地在每个状态选择动作价值最大的动作，即 $$\pi'(s)=\argmax_a Q^{\pi}(s,a)=\argmax_a \{r(s,a)+\gamma\sum_{s'}P(s'|s,a)V^{\pi}(s')\}$$

当策略提升后得到的策略 $\pi'$ 和之前的策略 $\pi$ 一样时，说明策略迭代达到了收敛，此时即是最优策略。

策略提升定理证明：

$$
\begin{aligned}
V^{\pi}(s)&\leq Q^{\pi}(s,\pi'(s))\\
&=\mathbb{E}_{\pi'}[R_t+\gamma V^{\pi}(S_{t+1})|S_t=s]\\
&\leq\mathbb{E}_{\pi'}[R_t+\gamma Q^{\pi}(S_{t+1},\pi'(S_{t+1}))|S_t=s]\\
&=\mathbb{E}_{\pi'}[R_t+\gamma R_{t+1}+\gamma^2 V^{\pi}(S_{t+2})|S_t=s]\\
&\ \ \vdots\\
&\leq\mathbb{E}_{\pi'}[R_t+\gamma R_{t+1}+\gamma^2R_{t+2}+\gamma^3R_{t+3}+\cdots|S_t=s]\\
&=V^{\pi'}(s)
\end{aligned}
$$

#### 策略迭代算法

结合策略评估和策略提升，得到以下策略迭代算法：

- 随机初始化策略 $\pi(s)$ 和价值函数 $V(s)$
- while $\Delta>\theta$ do：(策略评估循环)
- $\ \ \ \ \ \ \ \ \Delta\gets 0$ 
- $\ \ \ \ \ \ \ $ 对于每个状态 $s\in S$：
- $\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ v\gets V(s)$
- $\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ V(s)\gets r(s,\pi(s))+\gamma\sum_{s'}P(s'|s,\pi(s))V(s')$
- $\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \Delta\gets\max(\Delta,|v-V(s)|)$
- end while
- $\pi_{old}\gets\pi$
- 对于每一个状态 $s\in S$:
- $\ \ \ \ \ \ \ \ \pi(s)\gets\argmax_a r(s,a)+\gamma\sum_{s'}P(s'|s,a)V(s')$
- 若 $\pi_{old}=\pi$，则停止并返回 $V$ 和 $\pi$；否则转到策略评估循环

下面用代码实现这个过程。

In [31]:
# 策略迭代算法
class PolicyIteration:
    def __init__ (self, env, theta, gamma):
        self.env = env
        self.v = [0] * self.env.ncol * self.env.nrow # 初始化价值
        self.pi = [[0.25, 0.25, 0.25, 0.25] for i in range(self.env.ncol * self.env.nrow)] # 初始化策略为均匀随机策略
        self.theta = theta
        self.gamma = gamma

    # 策略评估
    def policy_evaluation (self):
        cnt = 1
        while 1:
            max_diff = 0
            new_v = [0] * self.env.ncol * self.env.nrow
            for s in range(self.env.ncol * self.env.nrow):
                qsa_list = [] # 计算该状态下的 Q(s, a) 动作价值
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        p, next_state, r, done = res
                        qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                    qsa_list.append(self.pi[s][a] * qsa)
                new_v[s] = sum(qsa_list) # 所有动作的动作价值函数之和即为状态价值函数
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
            self.v = new_v
            if (max_diff < self.theta):
                break # 满足收敛条件，退出评估迭代
            cnt += 1
        print("策略评估进行 %d 轮后完成" % cnt)
    
    # 策略提升
    def policy_improvement (self):
        for s in range(self.env.nrow * self.env.ncol):
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    p, next_state, r, done = res
                    qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                qsa_list.append(qsa)
            maxq = max(qsa_list)
            cntq = qsa_list.count(maxq)
            self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list] # 令动作价值最大的动作均分概率
        print("策略提升完成")
        return self.pi
    
    # 策略迭代
    def policy_iteration (self):
        while 1:
            self.policy_evaluation()
            old_pi = copy.deepcopy(self.pi)
            new_pi = self.policy_improvement()
            if (old_pi == new_pi):
                break

# 将策略可视化
def print_agent(agent, action_meaning, disaster=[], end=[]):
    print("状态价值：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]), end=' ')
        print()

    print("策略：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            if (i * agent.env.ncol + j) in disaster: # 悬崖状态
                print('****', end=' ')
            elif (i * agent.env.ncol + j) in end:  # 目标状态
                print('EEEE', end=' ')
            else:
                a = agent.pi[i * agent.env.ncol + j]
                pi_str = ''
                for k in range(len(action_meaning)):
                    pi_str += action_meaning[k] if a[k] > 0 else 'o'
                print(pi_str, end=' ')
        print()

env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>']
agent = PolicyIteration(env, theta=0.001, gamma=0.9)
agent.policy_iteration()
print_agent(agent, action_meaning, list(range(37, 47)), [47])


策略评估进行 60 轮后完成
策略提升完成
策略评估进行 72 轮后完成
策略提升完成
策略评估进行 44 轮后完成
策略提升完成
策略评估进行 12 轮后完成
策略提升完成
策略评估进行 1 轮后完成
策略提升完成
状态价值：
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000 
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
策略：
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo 
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 


### 价值迭代算法

可以发现策略迭代算法中的策略评估需要进行很多轮才能收敛到某一策略的状态函数，但是我们可以利用贝尔曼最优方程，将其写成迭代更新的方式 $$V^{k+1}(s)=\max_{a\in A}\{r(s,a)+\gamma\sum_{s'\in S}P(s'|s,a)V^k(s')\}$$

当 $V^{k+1}$ 和 $V^k$ 相等的时候，此时对应着最优状态价值函数 $V^*$。然后可以利用 $\pi(s)=\argmax_a\{r(s,a)+\gamma\sum_{s'}p(s'|s,a)V^{k+1}(s')\}$，恢复出最优策略。

价值迭代算法流程如下：

- 随机初始化 $V(s)$
- while $\Delta>\theta$ do：
- $\ \ \ \ \ \ \ \ \Delta\gets 0$
- $\ \ \ \ \ \ \ $ 对于每一个状态 $s\in S$：
- $\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ v\gets V(s)$
- $\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ V(s)\gets \max_a r(s,a)+\gamma\sum_{s'}P(s'|s,a)V(s')$
- $\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \Delta\gets\max(\Delta,|v-V(s)|)$
- end while
- 返回一个确定性策略 $\pi(s)=\argmax_a\{r(s,a)+\gamma\sum_{s'}p(s'|s,a)V^{k+1}(s')\}$

下面用代码实现。

In [32]:
# 价值迭代算法
class ValueIteration:
    def __init__ (self, env, theta, gamma):
        self.env = env
        self.v = [0] * self.env.ncol * self.env.nrow
        self.theta = theta
        self.gamma = gamma
        self.pi = [None for i in range(self.env.ncol * self.env.nrow)]
    
    def value_iteration (self):
        cnt = 0
        while 1:
            max_diff = 0
            new_v = [0] * self.env.ncol * self.env.nrow
            for s in range(self.env.ncol * self.env.nrow):
                qsa_list = []
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        p, next_state, r, done = res
                        qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                    qsa_list.append(qsa)
                new_v[s] = max(qsa_list) # 选择最大的动作价值函数
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
            self.v = new_v
            if (max_diff < self.theta):
                break
            cnt += 1
        print("价值迭代一共进行 %d 轮" % cnt)
        self.get_policy()
    
    # 根据价值函数导出策略
    def get_policy (self):
        for s in range(self.env.nrow * self.env.ncol):
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    p, next_state, r, done = res
                    qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                qsa_list.append(qsa)
            maxq = max(qsa_list)
            cntq = qsa_list.count(maxq)
            self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list] # 令动作价值最大的动作均分概率

env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>']
agent = ValueIteration(env, theta=0.001, gamma=0.9)
agent.value_iteration()
print_agent(agent, action_meaning, list(range(37, 47)), [47])

价值迭代一共进行 14 轮
状态价值：
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000 
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
策略：
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo 
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 


### 冰湖环境



冰湖是 OpenAI Gym 库中的一个环境，是一个大小为 4*4 的网格世界，起点状态 $S$ 在左上角，目标状态 $G$ 在右下角，中间还有若干冰洞 $H$。每个状态可以采取上、下、左、右 4 个动作，由于在冰面上行走，每次行走有一定概率滑行到附近的其他状态。到达冰洞或目标状态都会提前结束，每次行走的奖励是 0，到达目标的奖励是 1。

下面用代码实现。

In [33]:
import gym
env = gym.make("FrozenLake-v1") # 创建环境
env = env.unwrapped # 解封装才能访问状态转移矩阵P

holes = set()
ends = set()
for s in env.P:
    for a in env.P[s]:
        for s_ in env.P[s][a]:
            if s_[2] == 1.0: # 获得奖励为 1，代表是目标
                ends.add(s_[1])
            if s_[3] == True: # 若提前终止，说明是冰洞或目标
                holes.add(s_[1])
holes = holes - ends

print("冰洞的索引:", holes)
print("目标的索引:", ends)

冰洞的索引: {11, 12, 5, 7}
目标的索引: {15}


In [34]:
action_meaning = ['<', 'v', '>', '^']
agent = PolicyIteration(env, theta=1e-5, gamma=0.9)
agent.policy_iteration()
print_agent(agent, action_meaning, list(holes), list(ends))

策略评估进行 25 轮后完成
策略提升完成
策略评估进行 58 轮后完成
策略提升完成
状态价值：
 0.069  0.061  0.074  0.056 
 0.092  0.000  0.112  0.000 
 0.145  0.247  0.300  0.000 
 0.000  0.380  0.639  0.000 
策略：
<ooo ooo^ <ooo ooo^ 
<ooo **** <o>o **** 
ooo^ ovoo <ooo **** 
**** oo>o ovoo EEEE 


In [35]:
action_meaning = ['<', 'v', '>', '^']
agent = ValueIteration(env, theta=1e-5, gamma=0.9)
agent.value_iteration()
print_agent(agent, action_meaning, list(holes), list(ends))

价值迭代一共进行 60 轮
状态价值：
 0.069  0.061  0.074  0.056 
 0.092  0.000  0.112  0.000 
 0.145  0.247  0.300  0.000 
 0.000  0.380  0.639  0.000 
策略：
<ooo ooo^ <ooo ooo^ 
<ooo **** <o>o **** 
ooo^ ovoo <ooo **** 
**** oo>o ovoo EEEE 
