In [1]:
import numpy as np

In [2]:
np.random.seed(0)
# 定义状态转移概率矩阵P
P = np.array([
    [0.9, 0.1, 0.0, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.6, 0.0, 0.4],
    [0.0, 0.0, 0.0, 0.0, 0.3, 0.7],
    [0.0, 0.2, 0.3, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
])

rewards = [-1, -2, -2, 10, 1, 0] # 定义奖励函数R(s)
gamma = 0.5 # 定义折扣因子

In [3]:
# 给定一条序列，计算从某个索引（起始状态）开始到序列最后（终止状态）得到的回报
def compute_return(start_index, chain, gamma):
    G = 0
    # 用reverse实现 γ^(i-start_index) 是因为贝尔曼方程用于计算状态值或动作值函数的更新，其中的累积奖励（G）的计算需要考虑未来的奖励。
    # 反向迭代的方式保证了在计算当前状态或动作的累积奖励时，已经考虑了所有未来奖励的折扣，可以保持一致性和准确性。
    for i in reversed(range(start_index, len(chain))):
        G = gamma * G + rewards[chain[i] - 1]
    return G

In [4]:
# 给一个状态序列，s1-s2-s3-s6
chain = [1, 2, 3, 6]
start_index = 0
G = compute_return(start_index, chain, gamma)
print("根据本序列计算得到回报为：%s" % G)

根据本序列计算得到回报为：-2.5


In [5]:
# 计算状态价值函数 V = (E - γP)^(-1)*R
def compute_value(P, R, gamma, states_num):
    R = np.array(R).reshape((-1, 1)) # 展成一列
    I = np.eye(states_num) # 生成一个对角矩阵
    C = I - np.multiply(gamma, P)
    C_1 = np.linalg.inv(C) # 矩阵求逆 用inv和pinv结果不同
    return np.dot(C_1, R)

In [6]:
V = compute_value(P, rewards, gamma, P.shape[0])
print("MRP中每个状态价值为\n", V)

MRP中每个状态价值为
 [[-2.01950168]
 [-2.21451846]
 [ 1.16142785]
 [10.53809283]
 [ 3.58728554]
 [ 0.        ]]


In [7]:
# MDP的计算 转化成MRP 用MRP的方法
S = ["s1", "s2", "s3", "s4", "s5"]  # 状态集合
A = ["保持s1", "前往s1", "前往s2", "前往s3", "前往s4", "前往s5", "概率前往"]  # 动作集合
# 状态转移函数p(s'|s,a) 前者是策略决定 后者是环境决定
P = {
    "s1-保持s1-s1": 1.0,
    "s1-前往s2-s2": 1.0,
    "s2-前往s1-s1": 1.0,
    "s2-前往s3-s3": 1.0,
    "s3-前往s4-s4": 1.0,
    "s3-前往s5-s5": 1.0,
    "s4-前往s5-s5": 1.0,
    "s4-概率前往-s2": 0.2,
    "s4-概率前往-s3": 0.4,
    "s4-概率前往-s4": 0.4,
}
# 奖励函数 这里定义成r(s,a) 这意味着无论从状态s转移到哪个状态s' 奖励的大小都是相同的
R = {
    "s1-保持s1": -1,
    "s1-前往s2": 0,
    "s2-前往s1": -1,
    "s2-前往s3": -2,
    "s3-前往s4": -2,
    "s3-前往s5": 0,
    "s4-前往s5": 10,
    "s4-概率前往": 1,
}
gamma = 0.5  # 折扣因子
MDP = (S, A, P, R, gamma)

# 策略1,随机策略
Pi_1 = {
    "s1-保持s1": 0.5,
    "s1-前往s2": 0.5,
    "s2-前往s1": 0.5,
    "s2-前往s3": 0.5,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.5,
    "s4-概率前往": 0.5,
}
# 策略2
Pi_2 = {
    "s1-保持s1": 0.6,
    "s1-前往s2": 0.4,
    "s2-前往s1": 0.3,
    "s2-前往s3": 0.7,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.1,
    "s4-概率前往": 0.9,
}


# 把输入的两个字符串通过"-"连接 便于使用上述定义的P、R变量
def join(str1, str2):
    return str1 + '-' + str2

In [8]:
# 本例中给定的是R(s',s,a) 故由Pi_1和P的MDP转化得的MRP 用MRP的方式进行计算
MRP = [
    [0.5, 0.5, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.5, 0.5],
    [0.0, 0.1, 0.2, 0.2, 0.5],
    [0.0, 0.0, 0.0, 0.0, 1.0],
]
MRP = np.array(MRP)
# 由R和Pi_1得到的奖励矩阵
R = [-0.5, -1.5, -1.0, 5.5, 0]

V = compute_value(MRP, R, gamma, len(S))
print("MRP中每个状态价值为\n", V)

MRP中每个状态价值为
 [[-1.22555411]
 [-1.67666232]
 [ 0.51890482]
 [ 6.0756193 ]
 [ 0.        ]]


In [9]:
def sample(MDP, Pi, timestep_max, number):
    """蒙特卡洛法采样序列
        MDP 马尔可夫参数
        Pi 策略
        timestep_max 限制最长时间步
        number 总共采样序列数
    """
    S, A, P, R, gamma = MDP
    episodes = []
    # 采样number次
    for _ in range(number):
        episode = []
        timestep = 0
        # 随机初始化一个状态s0 这里不让它一开始就是s5
        s = S[np.random.randint(0, 4)]
        # 如果采样数达到最大或到达最终状态s5 停止
        while timestep < timestep_max and s != "s5":
            timestep += 1
            rand, temp = np.random.rand(), 0
            # 由策略&当前状态 选择当前动作
            for a_opt in A:
                temp += Pi.get(join(s,a_opt), 0)
                if rand < temp:
                    a = a_opt
                    r = R.get(join(s, a_opt), 0)
                    break
            # 由状态转移矩阵&当前状态&当前动作 选择下一个状态
            rand, temp = np.random.rand(), 0
            for s_opt in S:
                temp += P.get(join(join(s,a), s_opt), 0)
                if rand < temp:
                    s_next = s_opt
                    break
            # 放到池子里
            episode.append((s, a, r, s_next))
            s = s_next
        episodes.append(episode)
    return episodes

In [10]:
# 采样5次 最大20步 示例
timestep_max, number = 20, 5
episodes = sample(MDP, Pi_1, timestep_max, number)
print("第一条序列：\n", episodes[0])
print("第二条序列：\n", episodes[1])
print("第三条序列：\n", episodes[2])
print("第四条序列：\n", episodes[3])
print("第五条序列：\n", episodes[4])

第一条序列：
 [('s1', '前往s2', 0, 's2'), ('s2', '前往s3', -2, 's3'), ('s3', '前往s5', 0, 's5')]
第二条序列：
 [('s4', '概率前往', 1, 's4'), ('s4', '前往s5', 10, 's5')]
第三条序列：
 [('s4', '前往s5', 10, 's5')]
第四条序列：
 [('s2', '前往s1', -1, 's1'), ('s1', '保持s1', -1, 's1'), ('s1', '前往s2', 0, 's2'), ('s2', '前往s3', -2, 's3'), ('s3', '前往s4', -2, 's4'), ('s4', '前往s5', 10, 's5')]
第五条序列：
 [('s2', '前往s3', -2, 's3'), ('s3', '前往s4', -2, 's4'), ('s4', '前往s5', 10, 's5')]


In [13]:
def MC(episodes, V, N, gamma):
    """
        蒙特卡洛法，每当s_i出现，计算对应的G_t，累加后取均值
    """
    for episode in episodes:
        G = 0
        # 从后往前计算
        for i in range(len(episode) - 1, -1, -1):
            (s, a, r, s_next) = episode[i]
            G = r + gamma * G
            N[s] += 1
            # 和对比老虎机一样 常用的累加计算方式
            V[s] += (G - V[s]) / N[s]

In [14]:
# 采样1000次 每次20步 可以自行修改
episodes = sample(MDP, Pi_1, 20, 1000)
gamma = 0.5
V = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
N = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
MC(episodes, V, N, gamma)
print("使用蒙特卡洛方法计算MDP的状态价值为\n", V)

使用蒙特卡洛方法计算MDP的状态价值为
 {'s1': -1.213994389377003, 's2': -1.6519564617977478, 's3': 0.5647939497667427, 's4': 6.193547684776152, 's5': 0}


In [None]:
import copy


class CliffWalkingEnv:
    """ 悬崖漫步环境"""
    def __init__(self, ncol=12, nrow=4):
        self.ncol = ncol  # 定义网格世界的列
        self.nrow = nrow  # 定义网格世界的行
        # 转移矩阵P[state][action] = [(p, next_state, reward, done)]包含下一个状态和奖励
        self.P = self.createP()

    def createP(self):
        # 初始化
        P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
        # 4种动作 change[0]:上 change[1]:下 change[2]:左 change[3]:右 坐标系原点(0,0)
        # 定义在左上角
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        for i in range(self.nrow):
            for j in range(self.ncol):
                for a in range(4):
                    # 位置在悬崖或者目标状态因为无法继续交互 任何动作奖励都为0
                    if i == self.nrow - 1 and j > 0:
                        P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0,
                                                    True)]
                        continue
                    # 其他位置
                    next_x = min(self.ncol - 1, max(0, j + change[a][0]))
                    next_y = min(self.nrow - 1, max(0, i + change[a][1]))
                    next_state = next_y * self.ncol + next_x
                    reward = -1
                    done = False
                    # 下一个位置在悬崖或者终点
                    if next_y == self.nrow - 1 and next_x > 0:
                        done = True
                        if next_x != self.ncol - 1:  # 下一个位置在悬崖
                            reward = -100
                    P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
        return P

In [None]:
class PolicyIteration:
    """ 策略迭代算法 """
    def __init__(self, env, theta, gamma):
        self.env = env
        self.v = [0] * self.env.ncol * self.env.nrow  # 初始化价值为0
        self.pi = [[0.25, 0.25, 0.25, 0.25]
                   for i in range(self.env.ncol * self.env.nrow)]  # 初始化为均匀随机策略
        self.theta = theta  # 策略评估收敛阈值
        self.gamma = gamma  # 折扣因子

    def policy_evaluation(self):  # 策略评估
        cnt = 1  # 计数器
        while 1:
            max_diff = 0
            new_v = [0] * self.env.ncol * self.env.nrow
            for s in range(self.env.ncol * self.env.nrow):
                qsa_list = []  # 开始计算状态s下的所有Q(s,a)价值
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        p, next_state, r, done = res
                        qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                    qsa_list.append(self.pi[s][a] * qsa)
                new_v[s] = sum(qsa_list)  # 状态价值函数和动作价值函数之间的关系
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
            self.v = new_v
            if max_diff < self.theta: break  # 满足收敛条件 退出评估迭代
            cnt += 1
        print("策略评估进行%d轮后完成" % cnt)

    def policy_improvement(self):  # 策略提升
        for s in range(self.env.nrow * self.env.ncol):
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    p, next_state, r, done = res
                    qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                qsa_list.append(qsa)
            maxq = max(qsa_list)
            cntq = qsa_list.count(maxq)  # 计算有几个动作得到了最大的Q值
            # 让这些动作均分概率
            self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]
        print("策略提升完成")
        return self.pi

    def policy_iteration(self):  # 策略迭代
        while 1:
            self.policy_evaluation()
            old_pi = copy.deepcopy(self.pi)  # 将列表进行深拷贝,方便接下来进行比较
            new_pi = self.policy_improvement()
            if old_pi == new_pi: break