关于马尔科夫决策过程和贝克曼公式，贝克曼最优公式的推导和使用参考：[强化学习的数学原理](https://www.bilibili.com/video/BV1sd4y167NS?spm_id_from=333.788.videopod.episodes&vd_source=afcebd8a8957f7c00644d992cf4c88b4&p=2)

讲的太好导致没有动力整理笔记（

In [4]:
import numpy as np
np.random.seed(0)

P = [[0.9, 0.1, 0.0, 0.0, 0.0, 0.0],
     [0.5, 0.0, 0.5, 0.0, 0.0, 0.0],
     [0.0, 0.0, 0.0, 0.6, 0.0, 0.4],
     [0.0, 0.0, 0.0, 0.0, 0.3, 0.7],
     [0.0, 0.2, 0.3, 0.5, 0.0, 0.0],
     [0.0, 0.0, 0.0, 0.0, 0.0, 1.0]]
P = np.array(P)

rewards = [-1, -2, -2, 10, 1, 0]
gamma = 0.5

def compute_return(start_index, chain, gamma):
    G = 0 
    for i in reversed(range(start_index, len(chain))):
        G = gamma * G + rewards[chain[i] - 1]
    return G

chain = [1, 2, 3, 6]
start_index = 0
G = compute_return(start_index, chain, gamma)
print(f"根据本序列计算得到回报为：{G}")

根据本序列计算得到回报为：-2.5


In [5]:
def compute(P, rewards, gamma, states_num):
    rewards = np.array(rewards).reshape((-1, 1))
    value = np.dot(np.linalg.inv(np.eye(states_num, states_num) - gamma * P),
                   rewards)
    return value

V = compute(P, rewards, gamma, 6)
print(f"MRP中每个状态价值分别为\n{V}")

MRP中每个状态价值分别为
[[-2.01950168]
 [-2.21451846]
 [ 1.16142785]
 [10.53809283]
 [ 3.58728554]
 [ 0.        ]]


In [6]:
S = ["s1", "s2", "s3", "s4", "s5"]  # 状态集合
A = ["保持s1", "前往s1", "前往s2", "前往s3", "前往s4", "前往s5", "概率前往"]  # 动作集合
# 状态转移函数
P = {
    "s1-保持s1-s1": 1.0,
    "s1-前往s2-s2": 1.0,
    "s2-前往s1-s1": 1.0,
    "s2-前往s3-s3": 1.0,
    "s3-前往s4-s4": 1.0,
    "s3-前往s5-s5": 1.0,
    "s4-前往s5-s5": 1.0,
    "s4-概率前往-s2": 0.2,
    "s4-概率前往-s3": 0.4,
    "s4-概率前往-s4": 0.4,
}
# 奖励函数
R = {
    "s1-保持s1": -1,
    "s1-前往s2": 0,
    "s2-前往s1": -1,
    "s2-前往s3": -2,
    "s3-前往s4": -2,
    "s3-前往s5": 0,
    "s4-前往s5": 10,
    "s4-概率前往": 1,
}
gamma = 0.5  # 折扣因子
MDP = (S, A, P, R, gamma)

# 策略1,随机策略
Pi_1 = {
    "s1-保持s1": 0.5,
    "s1-前往s2": 0.5,
    "s2-前往s1": 0.5,
    "s2-前往s3": 0.5,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.5,
    "s4-概率前往": 0.5,
}
# 策略2
Pi_2 = {
    "s1-保持s1": 0.6,
    "s1-前往s2": 0.4,
    "s2-前往s1": 0.3,
    "s2-前往s3": 0.7,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.1,
    "s4-概率前往": 0.9,
}


# 把输入的两个字符串通过“-”连接,便于使用上述定义的P、R变量
def join(str1, str2):
    return str1 + '-' + str2

In [7]:
gamma = 0.5
# 转化后的MRP的状态转移矩阵
P_from_mdp_to_mrp = [
    [0.5, 0.5, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.5, 0.5],
    [0.0, 0.1, 0.2, 0.2, 0.5],
    [0.0, 0.0, 0.0, 0.0, 1.0],
]

P_from_mdp_to_mrp = np.array(P_from_mdp_to_mrp)
R_from_mdp_to_mrp = [-0.5, -1.5, -1.0, 5.5, 0.0]
V = compute(P_from_mdp_to_mrp, R_from_mdp_to_mrp, gamma, 5)
print(f"根据策略1计算得到的MRP中每个状态价值分别为\n{V}")

根据策略1计算得到的MRP中每个状态价值分别为
[[-1.22555411]
 [-1.67666232]
 [ 0.51890482]
 [ 6.0756193 ]
 [ 0.        ]]


## 蒙特卡洛方法

在马尔科夫决策过程中，state value 其实就是从一个状态 s 出发，采取不同 action 后得到 return 的期望。

$$V^\pi(s)=\mathbb{E}_\pi[G_t|S_t=s]$$

在使用 value iteration 或者 policy iteration 求解最优策略时，都需要我们知道模型的具体参数，即状态转移概率矩阵 P 和奖励函数 R。

$$\mathcal{V}=\mathcal{R}+\gamma\mathcal{PV}$$

但是在实际问题中，我们往往并不知道模型的结构，就比如在一个迷宫中我们只能看到自己眼前的一小块区域，并不知道整个迷宫的路线。所以我们需要一个 model free 的算法来估计 state value。

这就是蒙特卡洛方法的核心：使用多次采样后求均值来估计 state value。
$$V^\pi(s)=\mathbb{E}_\pi[G_t|S_t=s]\approx\frac{1}{N}\sum_{i=1}^NG_t^{(i)}$$

具体计算过程如下：

(1) 随机指定一个初始策略 $\pi$

(2) 根据策略采样 $N$ 条序列

$$s_0^{(i)}\xrightarrow{a_0^{(i)}}r_0^{(i)},s_1^{(i)}\xrightarrow{a_1^{(i)}}r_1^{(i)},s_2^{(i)}\xrightarrow{a_2^{(i)}}\cdots\xrightarrow{a_{T-1}^{(i)}}r_{T-1}^{(i)},s_T^{(i)}$$

(3) 对每一条序列中的每一个时间步 $t$ 的状态进行以下操作：
- 更新状态$s$的计数器 $N(s)\leftarrow N(s)+1$
- 更新状态$s$的总回报 $M(s)\leftarrow M(s)+G_t$

(4) 每一个状态的 state value 被估计为回报的平均值 $V(s)=M(s)/N(s)$

(5) 使用 $V(s)$ 估计新的策略 $\pi$

(6) 重复上述步骤，直到 $V(s)$ 收敛到指定值

接下来定义一个采样函数：

In [14]:
def sample(MDP, Pi, timestep_max, number):
    S, A, P, R, gamma = MDP
    episodes = []
    for _ in range(number):
        episode = []
        timestep = 0
        s = S[np.random.randint(4)] # 随机选择初始状态
        while s != "s5" and timestep <= timestep_max:
            timestep += 1
            rand, temp = np.random.rand(), 0
            for a_opt in A:
                temp += Pi.get(join(s, a_opt), 0)
                if temp > rand:
                    a = a_opt
                    r = R.get(join(s, a), 0)
                    break
            rand, temp = np.random.rand(), 0
            
            # 根据状态转移概率得到下一个状态s_next
            for s_opt in S:
                temp += P.get(join(join(s, a), s_opt), 0)
                if temp > rand:
                    s_next = s_opt
                    break
            episode.append((s, a, r, s_next))
            s = s_next
        episodes.append(episode)
    return episodes

MDP = (S, A, P, R, gamma)

episodes = sample(MDP, Pi_1, 20, 5)
print('第一条序列\n', episodes[0])
print('第二条序列\n', episodes[1])
print('第五条序列\n', episodes[4])

第一条序列
 [('s3', '前往s4', -2, 's4'), ('s4', '前往s5', 10, 's5')]
第二条序列
 [('s3', '前往s5', 0, 's5')]
第五条序列
 [('s3', '前往s4', -2, 's4'), ('s4', '概率前往', 1, 's2'), ('s2', '前往s3', -2, 's3'), ('s3', '前往s4', -2, 's4'), ('s4', '概率前往', 1, 's4'), ('s4', '前往s5', 10, 's5')]


In [18]:
# 对所有采样序列计算所有 state value
def MC(dpisodes, V, N, gamma):
    for episode in episodes:
        G = 0
        for i in range(len(episode) - 1, -1, -1):
            (s, a, r, s_next) = episode[i]
            G = gamma * G + r
            N[s] += 1
            V[s] += (G - V[s]) / N[s]

timestep_max = 20
episodes = sample(MDP, Pi_1, timestep_max, 1000)
gamma = 0.5
V = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
N = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
MC(episodes, V, N, gamma)
print("使用蒙特卡洛方法计算MDP的状态价值为\n", V)

使用蒙特卡洛方法计算MDP的状态价值为
 {'s1': -1.2216331203480515, 's2': -1.7015842442548526, 's3': 0.4722391543181039, 's4': 6.012219554391401, 's5': 0}


## 占用度量


占用度量用来描述在给定策略下，智能体在各个状态停留的"频率"或"密度"。
$$\rho^\pi(s,a)=(1-\gamma)\sum_{t=0}^\infty\gamma^tP_t^\pi(s)\pi(a|s)$$

In [22]:
def occupancy(episodes, s, a, timestep_max, gamma):
    rho = 0
    total_times = np.zeros(timestep_max)  # 记录每个时间步t各被经历过几次
    occur_times = np.zeros(timestep_max)  # 记录(s_t,a_t)=(s,a)的次数
    for episode in episodes:
        for i in range(len(episode)):
            (s_opt, a_opt, r, s_next) = episode[i]
            total_times[i] += 1
            if s == s_opt and a == a_opt:
                occur_times[i] += 1
    for i in reversed(range(timestep_max)):
        if total_times[i]:
            rho += gamma**i * occur_times[i] / total_times[i]
    return (1 - gamma) * rho


gamma = 0.5
timestep_max = 1000

episodes_1 = sample(MDP, Pi_1, timestep_max, 1000)
episodes_2 = sample(MDP, Pi_2, timestep_max, 1000)
rho_1 = occupancy(episodes_1, "s4", "概率前往", timestep_max, gamma)
rho_2 = occupancy(episodes_2, "s4", "概率前往", timestep_max, gamma)
print(rho_1, rho_2)

0.11227417176395682 0.24183443514107383
