In [None]:
import copy


class CliffWalkingEnv:
    """ 悬崖漫步环境"""
    def __init__(self, ncol=12, nrow=4):
        self.ncol = ncol  # 定义网格世界的列
        self.nrow = nrow  # 定义网格世界的行
        # 转移矩阵P[state][action] = [(p, next_state, reward, done)]包含下一个状态和奖励
        self.P = self.createP()

    def createP(self):
        # 初始化
        P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
        # 4种动作, change[0]:上,change[1]:下, change[2]:左, change[3]:右。坐标系原点(0,0)
        # 定义在左上角
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        for i in range(self.nrow):
            for j in range(self.ncol):
                for a in range(4):
                    # 位置在悬崖或者目标状态,因为无法继续交互,任何动作奖励都为0
                    if i == self.nrow - 1 and j > 0:
                        # 第一个参数是概率，表示在当前状态下采取动作a后，以概率p转移到下一个状态
                        P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0,
                                                    True)]
                        continue
                    # 其他位置
                    next_x = min(self.ncol - 1, max(0, j + change[a][0]))
                    next_y = min(self.nrow - 1, max(0, i + change[a][1]))
                    next_state = next_y * self.ncol + next_x
                    reward = -1
                    done = False
                    # 下一个位置在悬崖或者终点
                    if next_y == self.nrow - 1 and next_x > 0:
                        done = True
                        if next_x != self.ncol - 1:  # 下一个位置在悬崖
                            reward = -100
                    P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
        return P

In [None]:
class PolicyIteration:
    def __init__(self, env, gamma, theta):
        self.env = env
        self.gamma = gamma
        self.theta = theta
        self.V = [0 for _ in range(env.nrow * env.ncol)]
        self.pi = [[0.25 for _ in range(4)] for _ in range(env.nrow * env.ncol)] # 初始化策略
    
    def policy_evaluation(self):
        cnt = 1  # 记录迭代次数
        while True:
            max_diff = 0
            new_v = [0] * (self.env.nrow * self.env.ncol)
            for s in range(self.env.nrow * self.env.ncol):
                qsa_list = []
                for a in range(4):
                    qsa = 0
                    for prob, next_state, reward, done in self.env.P[s][a]:
                        qsa += prob * (reward + self.gamma * self.V[next_state] * (1 - done))
                    qsa_list.append(qsa * self.pi[s][a])
                
                new_v[s] = sum(qsa_list)
                max_diff = max(max_diff, abs(new_v[s] - self.V[s]))
            self.V = new_v
            if max_diff < self.theta:
                break
            cnt += 1
        return cnt

1