In [9]:
import copy

In [10]:
class CliffWalkingEnv:
    """ 悬崖漫步环境"""
    def __init__(self, ncol=12, nrow=4):
        self.ncol = ncol  # 定义网格世界的列
        self.nrow = nrow  # 定义网格世界的行
        # 转移矩阵P[state][action] = [(p, next_state, reward, done)]包含下一个状态和奖励
        self.P = self.createP()
        

    def createP(self):
        # 初始化
        P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
        # 4种动作, change[0]:上,change[1]:下, change[2]:左, change[3]:右。坐标系原点(0,0)
        # 定义在左上角
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        for i in range(self.nrow):
            for j in range(self.ncol):
                for a in range(4):
                    # 位置在悬崖或者目标状态,因为无法继续交互,任何动作奖励都为0
                    if i == self.nrow - 1 and j > 0:
                        P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0,
                                                    True)]
                        continue
                    # 其他位置
                    next_x = min(self.ncol - 1, max(0, j + change[a][0]))
                    next_y = min(self.nrow - 1, max(0, i + change[a][1]))
                    next_state = next_y * self.ncol + next_x
                    reward = -1
                    done = False
                    # 下一个位置在悬崖或者终点
                    if next_y == self.nrow - 1 and next_x > 0:
                        done = True
                        if next_x != self.ncol - 1:  # 下一个位置在悬崖
                            reward = -100
                    P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
        return P

In [32]:
class PolicyIteration:
    def __init__(self,env,theta,gamma):
        self.env = env
        self.theta = theta
        self.gamma = gamma
        self.v = [0] * env.ncol * env.nrow
        self.q = [[0]*4] * env.ncol * env.nrow
        self.pi = [[0.25]*4] * env.ncol * env.nrow
        self.cnt=0
        
    # 策略评估是完成了对于状态价值函数V的更新
    # new_v = v + sum(p(s,a) * (r(s,a) + sum(gamma * v(s,a'))))
    # 第一个sum是：对于状态s下，可能采取的动作a的sum
    # 第二个sum是：对于状态s下采取了动作a，下一时刻可能的状态s‘ （其实对于悬崖迷宫而言，下一时刻是可以确定的）

    # 对于策略评估，注意不需要对他进行一个学习率的考虑，因为我们是进行了当前状态价值函数的估计计算而不是学习 
    
    
    
    def policy_evaluation(self):
        t = 0
        new_v = self.v
        max_dff = 0
        while 1:
            t += 1
            for i in range(len(self.v)):
                for a in range(4):
                    new_q = 0;
                    next_v = self.env.P[i][a]
                    for next_s in (next_v):
                        (p, next_state, r, done) = next_s
                        new_q += p * (r + self.gamma * self.v[next_state]) if done else p*r     
                    print(self.pi[i])
                    new_v[i] += self.pi[i][a] * new_q
                    self.q[i][a] = new_q
            max_diff = abs(max([new_v[tmp] - self.v[tmp] for tmp in range(len(new_v))]))
            if max_diff < self.theta:
                print("the evaluation for policy is done after %d times" % t)
            else:
                self.v = new_v
            break
            
    # 在improvement中，我们同样不需要进行学习率的考量，因为我们直接贪心：将最优的动作价值函数的动作作为策略
    # self.pi = max(Q(s,a)) / (num(max(S,a)))
    def policy_improvement(self):
        for i in range(len(self.pi)):
            self.pi = [1 / (self.q[i].count(max(self.q[i]))) if a_q == max(self.q[i]) else 0 for a_q in self.q[i] ]     
        print("the improvement is done")
        return self.pi
    def policy_iteration(self):
        
        for _ in range(100):
            self.cnt += 1;
            self.policy_evaluation()
            old_pi = copy.deepcopy(self.pi)  # 将列表进行深拷贝,方便接下来进行比较
            new_pi = self.policy_improvement()
            if old_pi == new_pi: 
                print("the learning is done after %d iteration" % self.cnt)
                break
        

In [33]:
def print_agent(agent, action_meaning, disaster=[], end=[]):
    print("状态价值：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            # 为了输出美观,保持输出6个字符
            print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]),
                  end=' ')
        print()

    print("策略：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            # 一些特殊的状态,例如悬崖漫步中的悬崖
            if (i * agent.env.ncol + j) in disaster:
                print('****', end=' ')
            elif (i * agent.env.ncol + j) in end:  # 目标状态
                print('EEEE', end=' ')
            else:
                a = agent.pi[i * agent.env.ncol + j]
                pi_str = ''
                for k in range(len(action_meaning)):
                    pi_str += action_meaning[k] if a[k] > 0 else 'o'
                print(pi_str, end=' ')
        print()

In [34]:

env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>']
theta = 0.001
gamma = 0.9
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
print_agent(agent, action_meaning, list(range(37, 47)), [47])

[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]


TypeError: 'float' object is not subscriptable