In [1]:
import numpy as np
np.random.seed(0)

In [2]:
# 定义状态转移概率矩阵P
P = [
    [0.9, 0.1, 0.0, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.6, 0.0, 0.4],
    [0.0, 0.0, 0.0, 0.0, 0.3, 0.7],
    [0.0, 0.2, 0.3, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
]

P = np.array(P)

rewards = [-1, -2, -2, 10, 1, 0]  # 定义奖励函数
gamma = 0.5  # 定义折扣因子

In [4]:
# 给定一条序列,计算从某个索引（起始状态）开始到序列最后（终止状态）得到的回报

from copy import deepcopy
def compute_return(start_index, chain, gamma):
    G = 0
    ret = []
    for i in reversed(range(start_index, len(chain))): ## 运算的方向要倒着来
        g_ = deepcopy(G)
        G = gamma * G + rewards[chain[i] - 1]       ## 从后往前依次运算求出结果
        ret.append([G, gamma, g_, rewards[chain[i] - 1], chain[i]]) ## 保存
    '''
    逆向，先算后面的，
     0.0 = 0.5 *  0     +   0         R_6
    -2.0 = 0.5 *  0     +  -2         R_6->R_3
    -3.0 = 0.5 * -2.0   +  -2         R_3->R_2
    -2.5 = 0.5 * -3.0   +  -1         R_2->R_1
   ret=    [[ 0.0, 0.5,  0,    0, 6],
            [-2.0, 0.5,  0.0, -2, 3],
            [-3.0, 0.5, -2.0, -2, 2],
            [-2.5, 0.5, -3.0, -1, 1]]
    '''
    return G

In [5]:
# 一个状态序列,s1-s2-s3-s6
chain = [1, 2, 3, 6]
start_index = 0
G = compute_return(start_index, chain, gamma)
print("根据本序列计算得到回报为：%s。" % G)

根据本序列计算得到回报为：-2.5。


In [6]:
def compute(P, rewards, gamma, states_num):
    ''' 利用贝尔曼方程的矩阵形式计算解析解,states_num是MRP的状态数 '''
    rewards = np.array(rewards).reshape((-1, 1))  #将rewards写成列向量形式
    value = np.dot(np.linalg.inv(np.eye(states_num, states_num) - gamma * P),
                   rewards) # 对应求解析解的公式，V=(1-γP)^(-1)r
    return value


V = compute(P, rewards, gamma, 6)
print("MRP中每个状态价值分别为\n", V)

MRP中每个状态价值分别为
 [[-2.01950168]
 [-2.21451846]
 [ 1.16142785]
 [10.53809283]
 [ 3.58728554]
 [ 0.        ]]


In [7]:
S = ["s1", "s2", "s3", "s4", "s5"]  # 状态集合
A = ["保持s1", "前往s1", "前往s2", "前往s3", "前往s4", "前往s5", "概率前往"]  # 动作集合
# 状态转移函数
P = {
    "s1-保持s1-s1": 1.0,
    "s1-前往s2-s2": 1.0,
    "s2-前往s1-s1": 1.0,
    "s2-前往s3-s3": 1.0,
    "s3-前往s4-s4": 1.0,
    "s3-前往s5-s5": 1.0,
    "s4-前往s5-s5": 1.0,
    "s4-概率前往-s2": 0.2,
    "s4-概率前往-s3": 0.4,
    "s4-概率前往-s4": 0.4,
}
# 奖励函数
R = {
    "s1-保持s1": -1,
    "s1-前往s2": 0,
    "s2-前往s1": -1,
    "s2-前往s3": -2,
    "s3-前往s4": -2,
    "s3-前往s5": 0,
    "s4-前往s5": 10,
    "s4-概率前往": 1,
}
gamma = 0.5  # 折扣因子
MDP = (S, A, P, R, gamma)

# 策略1,随机策略
Pi_1 = {
    "s1-保持s1": 0.5,
    "s1-前往s2": 0.5,
    "s2-前往s1": 0.5,
    "s2-前往s3": 0.5,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.5,
    "s4-概率前往": 0.5,
}
# 策略2
Pi_2 = {
    "s1-保持s1": 0.6,
    "s1-前往s2": 0.4,
    "s2-前往s1": 0.3,
    "s2-前往s3": 0.7,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.1,
    "s4-概率前往": 0.9,
}


# 把输入的两个字符串通过“-”连接,便于使用上述定义的P、R变量
def join(str1, str2):
    return str1 + '-' + str2

In [8]:
gamma = 0.5
# 转化后的MRP的状态转移矩阵
P_from_mdp_to_mrp = [
    [0.5, 0.5, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.5, 0.5],
    [0.0, 0.1, 0.2, 0.2, 0.5],
    [0.0, 0.0, 0.0, 0.0, 1.0],
]

'''
P(1|1) = Pi_1["s1-保持s1"] * P["s1-保持s1-s1"] = 1 * 0.5 = 0.5
P(2|1) = Pi_1["s1-前往s2"] * P["s1-前往s2-s2"] = 1 * 0.5 = 0.5
P(1|2) = Pi_1["s2-前往s1"] * P["s2-前往s1-s1"] = 1 * 0.5 = 0.5
P(3|2) = Pi_1["s2-前往s3"] * P["s2-前往s3-s3"] = 1 * 0.5 = 0.5
P(4|3) = Pi_1["s3-前往s4"] * P["s3-前往s4-s4"] = 1 * 0.5 = 0.5
P(5|3) = Pi_1["s3-前往s5"] * P["s3-前往s5-s5"] = 1 * 0.5 = 0.5
P(2|4) = Pi_1["s4-概率前往"] * P["s4-概率前往-s2"] = 0.5 * 0.2 = 0.1
P(3|4) = Pi_1["s4-概率前往"] * P["s4-概率前往-s3"] = 0.5 * 0.4 = 0.2
P(4|4) = Pi_1["s4-概率前往"] * P["s4-概率前往-s4"] = 0.5 * 0.4 = 0.2
P(5|4) = Pi_1["s4-前往s5"] * P["s4-前往s5-s5"] = 0.5 * 1 = 0.5
P(5|5) = 1
'''

P_from_mdp_to_mrp = np.array(P_from_mdp_to_mrp)


R_from_mdp_to_mrp = [-0.5, -1.5, -1.0, 5.5, 0]
'''
r(s1) = Pi_1["s1-保持s1"] * R["s1-保持s1"] + Pi_1["s1-前往s2"] * R["s1-前往s2"] = -1 * 0.5 + 0 * 0.5 = -0.5
r(s2) = Pi_1["s2-前往s1"] * R["s2-前往s1"] + Pi_1["s2-前往s3"] * R["s2-前往s3"] = -1 * 0.5 + -2 * 0.5 = -1.5
r(s3) = Pi_1["s3-前往s4"] * R["s3-前往s4"] + Pi_1["s3-前往s5"] * R["s3-前往s5"] = -2 * 0.5 + 0 * 0.5 = -1
r(s4) = Pi_1["s4-前往s5"] * R["s4-前往s5"] + Pi_1["s4-概率前往"] * R["s4-概率前往"] = 10 * 0.5 + 1 * 0.5 = 5.5
r(s5) = 0
也就求出了R_from_mdp_to_mrp 奖励函数的
'''

V = compute(P_from_mdp_to_mrp, R_from_mdp_to_mrp, gamma, 5)
print("MDP中每个状态价值分别为\n", V)

MDP中每个状态价值分别为
 [[-1.22555411]
 [-1.67666232]
 [ 0.51890482]
 [ 6.0756193 ]
 [ 0.        ]]


In [9]:
def sample(MDP, Pi, timestep_max, number):
    ''' 采样函数,策略Pi,限制最长时间步timestep_max,总共采样序列数number '''
    S, A, P, R, gamma = MDP
    episodes = []
    for _ in range(number):
        episode = []
        timestep = 0
        s = S[np.random.randint(4)]  # 随机选择一个除s5以外的状态s作为起点
        # 当前状态为终止状态或者时间步太长时,一次采样结束
        while s != "s5" and timestep <= timestep_max:
            timestep += 1
            rand, temp = np.random.rand(), 0   ## 初始化概率，以及动作概率是0
            # 在状态s下根据策略选择动作
            for a_opt in A:       ## 遍历每个动作
                temp += Pi.get(join(s, a_opt), 0)    ## 拿到状态+动作的名称，然后从策略Pi_1内拿到对应的动作概率
                if temp > rand:   ## 该状态和动作对应的概率，满足概率条件，则执行动作action
                    a = a_opt        ## 执行动作的action，a是动作名称
                    r = R.get(join(s, a), 0)    ## 拿到状态+动作的名称，然后从状态+动作的奖励 R 内拿到对应的动作奖励
                    break                       ## 已经执行了动作，退出动作的循环
            rand, temp = np.random.rand(), 0    ## 初始化概率，以及状态的概率
            # 根据状态转移概率得到下一个状态s_next
            for s_opt in S:   ## 遍历状态列表
                temp += P.get(join(join(s, a), s_opt), 0)  ## 组合了状态+动作+转移的状态名称，然后从状态+动作+转移状态的概率 P 内拿到对应的转移概率
                if temp > rand:      ##    满足概率条件，则执行状态转移
                    s_next = s_opt   ##    转移到的状态是 s_next
                    break            ##    已经转移了状态，退出状态的循环
            ## s是当前的状态，a是执行的动作，r是执行动作的奖励，s_next是执行动作以后转移到的状态
            episode.append((s, a, r, s_next))  # 把（s,a,r,s_next）元组放入序列中
            s = s_next  # s_next变成当前状态,开始接下来的循环
        episodes.append(episode)
    return episodes

In [10]:
def occupancy(episodes, s, a, timestep_max, gamma):
    ''' 计算状态动作对（s,a）出现的频率,以此来估算策略的占用度量 '''
    rho = 0
    total_times = np.zeros(timestep_max)  # 记录每个时间步t各被经历过几次
    occur_times = np.zeros(timestep_max)  # 记录(s_t,a_t)=(s,a)的次数
    for episode in episodes:              ## 遍历每个采样的内容
        for i in range(len(episode)):     ## 遍历每个采样内部的状态、动作的内容
            (s_opt, a_opt, r, s_next) = episode[i]   ## 拿到每个采样内的 状态、动作、动作的奖励、下一个状态
            total_times[i] += 1                      ## 统计序列所在index的次数
            if s == s_opt and a == a_opt:            ## 状态、动作和给定的相同
                occur_times[i] += 1                  ## 也就是（状态，动作）对的次数+1
    for i in reversed(range(timestep_max)):       ## 逆序算占用度量
        if total_times[i]:                        ## 序列所在的index有值
            rho += gamma**i * occur_times[i] / total_times[i]        ## 按照公式运算
    return (1 - gamma) * rho

In [11]:
gamma = 0.5
timestep_max = 1000

episodes_1 = sample(MDP, Pi_1, timestep_max, 1000)
episodes_2 = sample(MDP, Pi_2, timestep_max, 1000)
rho_1 = occupancy(episodes_1, "s4", "概率前往", timestep_max, gamma)
rho_2 = occupancy(episodes_2, "s4", "概率前往", timestep_max, gamma)
print(rho_1, rho_2)

0.10933789486751247 0.22662902209217917


[1]: https://zhuanlan.zhihu.com/p/655615836

[1]: https://zhuanlan.zhihu.com/p/655615836