# 贝尔曼公式

In [None]:
import numpy as np

#状态转移概率矩阵
P = np.array([
    [0.9, 0.1, 0.0, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.6, 0.0, 0.4],
    [0.0, 0.0, 0.0, 0.0, 0.3, 0.7],
    [0.0, 0.2, 0.3, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
])

#到达每一个状态的奖励
R = np.array([-1, -2, -2, 10, 1, 0])

$G_t$为一个轨迹的discounted return
$$
\begin{aligned}
G_t &= R_{t+1}+\gamma R_{t+2} + \gamma^2 R_{t+3} + …… \\
	&= R_{t+1}+\gamma (R_{t+2} + \gamma R_{t+3} + ……) \\
	&= R_{t+1} + \gamma G_{t+1}
\end{aligned}
$$

In [None]:
#给定一条序列,计算回报,计算的就是discounted return
def value_by_chain(chain):
    s = 0
    for i, c in enumerate(chain):
        #给每一步的反馈做一个系数,随着步数往前衰减
        s += R[c] * 0.5**i

    #最终的反馈是所有步数衰减后的求和
    return s
value_by_chain(np.array([0, 1, 2, 5]))

贝尔曼公式为
- 代数形式
描述了不同状态值之间的关系，对于所有状态都成立，即一个状态对应一个式子
    $$
    \begin{aligned}
    v_\pi(s)
    &= \mathbb{E}[R_{t+1}|S_t=s] +\gamma\mathbb{E}[G_{t+1}|S_t=s], \\
    &= \underbrace{\sum_{a}\pi(a|s)\sum_r p(r|s,a)r}_{当前奖励的平均值} +\underbrace{\gamma \sum_a \pi(a|s)\sum_{s'}p(s'|s,a)v_{\pi}(s')}_{未来奖励的平均值}\\
    &= 	\sum_{a}\pi(a|s) \left[ \sum_{r}p(r|s,a)r+\gamma\sum_{s'}p(s’|s,a)v_{\pi}(s') \right], \forall s\in S.
    \end{aligned}
    $$

- 矩阵形式
    本次实验的贝尔曼公式（矩阵形式）
    $$
        v_{\pi}(s_i)=r_{\pi}(s_i)+\gamma \sum_{s_j}p_{\pi}(s_j|s_i)v_{\pi}(s_j)
    $$
    $$
    \underbrace{v_{\pi}(s)}_{value[i]} = \underbrace{r(s)}_{R[i]} + \underbrace{γP_{\pi}v_{\pi}(s')}_{γ=0.5,\ 0.5 * P[i].dot(value)}
    $$

In [None]:
#梯度下降法计算贝尔曼矩阵,近似值
# 由于对所有状态都成立，所以要遍历所有状态
def get_bellman():
    #初始化values
    value = np.ones([6])
    # 迭代200次
    for _ in range(200):
        # 6个动作
        for i in range(6):
            #每一行的概率和它对应的value相乘，乘以gamma，然后和奖励相加
            #反复计算，就收敛到了贝尔曼方程矩阵
            value[i] = R[i] + 0.5 * P[i].dot(value)

    return value
get_bellman()

In [None]:
#解析解贝尔曼方程矩阵，真实值
def get_bellman():
    mat = np.eye(*P.shape)
    mat -= 0.5 * P
    mat = np.linalg.inv(mat)

    return mat.dot(R)


get_bellman()

# 贝尔曼最优公式
贝尔曼最优公式是一个理论基础，它需要与特定的算法结合，才能实际应用于解决问题。

In [25]:
# 构造一个网格环境(3x3)
def EnvSetting(row, col):
    if row == 0 and col == 1:
        return 'trap'
    if row == 2 and col == 0:
        return 'trap'
    if row == 2 and col ==2:
        return 'terminal'
    return 'ground'

In [31]:
# 设值agent在每一个格子里的动作
def move(row, col, action):
    #如果当前已经在陷阱或者终点，则不能执行任何动作，反馈都是0
    if EnvSetting(row, col) in ['trap', 'terminal']:
        return row, col, 0
    #↑
    if action == 0:
        row -= 1
    #↓
    if action == 1:
        row += 1
    #←
    if action == 2:
        col -= 1
    #→
    if action == 3:
        col += 1
    #不允许走到地图外面去
    row = max(0, row)
    row = min(2, row)
    col = max(0, col)
    col = min(2, col)

    #是陷阱的话，奖励是-100，否则都是-1
    #这样强迫了机器尽快结束游戏,因为每走一步都要扣一分
    #结束最好是以走到终点的形式,避免被扣100分
    reward = -1
    if EnvSetting(row, col) == 'trap':
        reward = -100

    return row, col, reward

In [32]:
import numpy as np

#初始化每个格子的价值
values = np.zeros([3, 3])

#初始化每个格子下采用动作的概率
pi = np.ones([3, 3, 4]) * 0.25

In [33]:
#值评估,返回更新后的状态值矩阵
def valuesEvaluation():

    #初始化一个新的values,重新评估所有格子的分数
    new_values = np.zeros([3, 3])
    # 0.9是折扣因子γ
    gamma = 0.9
    #遍历所有格子
    for row in range(3):
        for col in range(3):
            # 如果下个状态是终点或者陷阱,就跳过计算
            if EnvSetting(row, col) in ['trap', 'terminal']:
                continue

            # 应用贝尔曼最优公式得出最佳价值
            maxValue = -float('inf')
            for action in range(4):
                next_row, next_col, reward = move(row, col, action)
                # 计算当前状态下执行动作后的预期价值
                value = reward + gamma * values[next_row, next_col]
                maxValue = max(maxValue, value)
            new_values[row, col] = maxValue

    return new_values

In [34]:
# 策略更新
def policyUpdate():
    #重新初始化每个格子下采用动作的概率,重新评估
    new_pi = np.zeros([3, 3, 4])
    # 0.9是折扣因子γ
    gamma = 0.9
    #遍历所有格子
    for row in range(3):
        for col in range(3):

            if EnvSetting(row, col) in ['trap', 'terminal']:
                continue

            #计算当前格子4个动作分别的分数
            action_value = np.zeros(4)

            #遍历所有动作
            for action in range(4):
                next_row, next_col, reward = move(row, col, action)
                # 计算当前状态下执行动作后的预期价值
                action_value[action] = reward + gamma * values[next_row, next_col]

            #计算当前状态下，达到最大分数的动作有几个
            count = (action_value == action_value.max()).sum()

            #让这些动作均分概率
            for action in range(4):
                if action_value[action] == action_value.max():
                    new_pi[row, col, action] = 1 / count
                else:
                    new_pi[row, col, action] = 0

    return new_pi

In [35]:
#循环迭代策略评估和策略提升，寻找最优解
for _ in range(10):
    for _ in range(100):
        values = valuesEvaluation()
    pi = policyUpdate()

values,pi

(array([[-3.439,  0.   , -1.9  ],
        [-2.71 , -1.9  , -1.   ],
        [ 0.   , -1.   ,  0.   ]]),
 array([[[0. , 1. , 0. , 0. ],
         [0. , 0. , 0. , 0. ],
         [0. , 1. , 0. , 0. ]],
 
        [[0. , 0. , 0. , 1. ],
         [0. , 0.5, 0. , 0.5],
         [0. , 1. , 0. , 0. ]],
 
        [[0. , 0. , 0. , 0. ],
         [0. , 0. , 0. , 1. ],
         [0. , 0. , 0. , 0. ]]]))

In [36]:
# 打印游戏，方便测试
def show(row, col, action):
    graph = [
        '□', '□', '○', '□', '□', '□', '○', '□', '❤'
    ]

    action = {0: '↑', 1: '↓', 2: '←', 3: '→'}[action]

    graph[row * 3 + col] = action

    graph = ''.join(graph)

    for i in range(0, 3 * 3, 3):
        print(graph[i:i + 3])


show(1, 1, 0)

□□○
□↑□
○□❤


In [40]:
from IPython import display
import time


def test():
    #起点在0,0
    row = 0
    col = 0

    #最多玩N步
    for _ in range(200):

        #选择一个动作
        action = np.random.choice(np.arange(4), size=1, p=pi[row, col])[0]

        #打印这个动作
        display.clear_output(wait=True)
        time.sleep(0.5)
        show(row, col, action)

        #执行动作
        row, col, reward = move(row, col, action)

        #获取当前状态，如果状态是终点或者掉陷阱则终止
        if EnvSetting(row, col) in ['trap', 'terminal']:
            break

In [41]:
test()

□□○
□□□
○→❤
