In [3]:
import numpy as np
np.random.seed(0)
# 定义状态转移概率矩阵P
P = [
    [0.9, 0.1, 0.0, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.6, 0.0, 0.4],
    [0.0, 0.0, 0.0, 0.0, 0.3, 0.7],
    [0.0, 0.2, 0.3, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
]
P = np.array(P)

rewards = [-1, -2, -2, 10, 1, 0]  # 定义奖励函数
gamma = 0.5  # 定义折扣因子


# 给定一条序列,计算从某个索引（起始状态）开始到序列最后（终止状态）得到的回报
def compute_return(start_index, chain, gamma):
    G = 0
    for i in reversed(range(start_index, len(chain))):
        G = gamma * G + rewards[chain[i] - 1]
    return G


# 一个状态序列,s1-s2-s3-s6
chain = [1, 2, 3, 6]
start_index = 0
G = compute_return(start_index, chain, gamma)
print("根据本序列计算得到回报为：%s。" % G)

根据本序列计算得到回报为：-2.5。


In [4]:
def compute(P, rewards, gamma, states_num):
    ''' 利用贝尔曼方程的矩阵形式计算解析解,states_num是MRP的状态数 '''
    rewards = np.array(rewards).reshape((-1, 1))  #将rewards写成列向量形式
    value = np.dot(np.linalg.inv(np.eye(states_num, states_num) - gamma * P),
                   rewards)
    return value


V = compute(P, rewards, gamma, 6)
print("MRP中每个状态价值分别为\n", V)

MRP中每个状态价值分别为
 [[-2.01950168]
 [-2.21451846]
 [ 1.16142785]
 [10.53809283]
 [ 3.58728554]
 [ 0.        ]]


In [5]:
S = ["s1", "s2", "s3", "s4", "s5"]  # 状态集合
A = ["保持s1", "前往s1", "前往s2", "前往s3", "前往s4", "前往s5", "概率前往"]  # 动作集合
# 状态转移函数
P = {
    "s1-保持s1-s1": 1.0,
    "s1-前往s2-s2": 1.0,
    "s2-前往s1-s1": 1.0,
    "s2-前往s3-s3": 1.0,
    "s3-前往s4-s4": 1.0,
    "s3-前往s5-s5": 1.0,
    "s4-前往s5-s5": 1.0,
    "s4-概率前往-s2": 0.2,
    "s4-概率前往-s3": 0.4,
    "s4-概率前往-s4": 0.4,
}
# 奖励函数
R = {
    "s1-保持s1": -1,
    "s1-前往s2": 0,
    "s2-前往s1": -1,
    "s2-前往s3": -2,
    "s3-前往s4": -2,
    "s3-前往s5": 0,
    "s4-前往s5": 10,
    "s4-概率前往": 1,
}
gamma = 0.5  # 折扣因子
MDP = (S, A, P, R, gamma)
# 策略1,随机策略
Pi_1 = {
    "s1-保持s1": 0.5,
    "s1-前往s2": 0.5,
    "s2-前往s1": 0.5,
    "s2-前往s3": 0.5,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.5,
    "s4-概率前往": 0.5,
}
# 策略2
Pi_2 = {
    "s1-保持s1": 0.6,
    "s1-前往s2": 0.4,
    "s2-前往s1": 0.3,
    "s2-前往s3": 0.7,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.1,
    "s4-概率前往": 0.9,
}
# 把输入的两个字符串通过“-”连接,便于使用上述定义的P、R变量
def join(str1, str2):
    return str1 + '-' + str2

In [6]:
gamma = 0.5
P_from_mdp_to_mrp = [
    [0.5, 0.5, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.5, 0.5],
    [0.0, 0.1, 0.2, 0.2, 0.5],
    [0.0, 0.0, 0.0, 0.0, 1.0],
]
P_from_mdp_to_mrp = np.array(P_from_mdp_to_mrp)
R_from_mdp_to_mrp = [-0.5, -1.5, -1.0, 5.5, 0]

V = compute(P_from_mdp_to_mrp, R_from_mdp_to_mrp, gamma, 5)
print("MDP中每个状态价值分别为\n", V)

MDP中每个状态价值分别为
 [[-1.22555411]
 [-1.67666232]
 [ 0.51890482]
 [ 6.0756193 ]
 [ 0.        ]]


In [None]:
def sample(MDP ,Pi, timestep_max, number):
    ''' 采样函数,策略Pi,限制最长时间步timestep_max,总共采样序列数number '''
    S, A, P, R, gamma = MDP# 提取MDP相关参数
    episodes = []# 储存总采样序列
    for i in range(number):# 循环number次， 进行序列采样
        episode = []# 初始化采样序列
        timestep = 0# 初始数时间节点
        s = S[np.random.randint(4)]# 随机选择一个除s5以外的状态s作为起点
        while s != "s5" and timestep <= timestep_max:#* 循环，当前状态为终止状态或者时间步太长时,一次采样结束，注意，逻辑错误，错误搞成了while s== "s5" or timestep >= timestep_max:
            timestep += 1# 时间步增加
            rand, temp = np.random.rand(), 0 #* 创建随机概率与策略概率
            for a in A:# 在状态s下根据策略选择动作并获得奖励（累加轮盘赌选择）
                temp += Pi.get(join(s,a), 0)#
                if rand < temp:#* 此处应该是小于而非大于
                    r = R[join(s,a)]#
                    break#
            rand, temp = np.random.rand(), 0#* 重置随机概率与策略概率
            for  s_next in S:# 根据状态转移概率得到下一个状态s_next（累加轮盘赌选择）
                temp += P.get(join(join(s ,a), s_next), 0)#
                if rand < temp:#*
                    break#
            episode.append((s ,a, r, s_next))# 把（s,a,r,s_next）元组放入序列中
            s = s_next# s_next变成当前状态,开始接下来的循环
        episodes.append(episode)# 扩展总采样序列
    return episodes# 返回总采样序列

episodes = sample(MDP ,Pi_1, 20, 5)# 采样5次,每个序列最长不超过20步
print('第一条序列\n', episodes[0])
print('第二条序列\n', episodes[1])
print('第五条序列\n', episodes[4])

第一条序列
 [('s2', '前往s3', -2, 's3'), ('s3', '前往s4', -2, 's4'), ('s4', '前往s5', 10, 's5')]
第二条序列
 [('s4', '概率前往', 1, 's2'), ('s2', '前往s3', -2, 's3'), ('s3', '前往s5', 0, 's5')]
第五条序列
 [('s4', '前往s5', 10, 's5')]


In [None]:
def MC(episodes, V, N, gamma):
    """对所有采样序列计算所有状态的价值"""
    for episode in episodes:# 循环，提取每一幕的数据
        G = 0# 初始化回报
        for i in range(len(episode) -1 , -1 , -1): #* 一个序列从后往前计算回报
            (s, a, r, s_next) = episode[i] #*
            G = gamma * G + r#
            N[s] += 1#
            V[s] = V[s] + 1/N[s]*(G - V[s])#

timestep_max = 20
episodes = sample(MDP ,Pi_1, 20, 1000)# 采样1000次,可以自行修改
V = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}# 初始化价值函数
N = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}# 初始化状态被访问次数
MC(episodes, V, N, gamma)# 运行蒙特卡洛方法
print("使用蒙特卡洛方法计算MDP的状态价值为\n", V)

使用蒙特卡洛方法计算MDP的状态价值为
 {'s1': -1.22211291493596, 's2': -1.663971217074737, 's3': 0.5021584130065688, 's4': 6.0046082707616, 's5': 0}


In [None]:
def occupancy(timestep_max, episodes):
    ''' 计算状态动作对（s,a）出现的频率,以此来估算策略的占用度量 '''
    rho = 0# 初始化占用度量
    total_times = np.zeros(timestep_max)# 记录每个时间步t各被经历过几次
    occur_times = np.zeros(timestep_max)# 记录(s_t,a_t)=(s,a)的次数
    for episode in episodes:# 循环，提取每一幕的数据
        for i in range(len(episode)):#循环，提取每一时间步的数据
            (s, a, r, s_next) = episode[i]# 提取状态，动作，奖励，下一个状态
            total_times[i] += 1 # 累计时间步t被经历的次数
            if s == "s4" and a == "概率前往":# 如果状态和动作与输入的相同
                occur_times[i] += 1# 累计时间步t状态动作对出现的次数
    for i in range(timestep_max):# 计算
        if total_times[i] != 0:# 如果时间步t被经历过
            rho += gamma**i * (occur_times[i]/total_times[i])#* 计算占用度量
    return (1 - gamma) * rho# 返回占用度量

gamma = 0.5
timestep_max = 1000
np.random.seed(0)
episodes_1 = sample(MDP, Pi_1, timestep_max, 1000)
episodes_2 = sample(MDP, Pi_2, timestep_max, 1000)
rho_1 = occupancy(timestep_max, episodes_1)# 计算episodes_1"s4", "概率前往"的占用度量
rho_2 = occupancy(timestep_max, episodes_2)# 计算episodes_2"s4", "概率前往"的占用度量
print(rho_1, rho_2)

0.10933789486751244 0.22662902209217917
