# 策略迭代 Policy Iteration 基于贝尔曼公式

## 悬崖漫步环境

In [1]:
import copy
class CliffWalkingEnv:
    """ 悬崖漫步环境"""
    def __init__(self, ncol=12, nrow=4):
        self.ncol = ncol  # 定义网格世界的列
        self.nrow = nrow  # 定义网格世界的行
        # 转移矩阵P[state][action] = [(p, next_state, reward, done)]包含下一个状态和奖励
        self.P = self.createP()

    def createP(self):
        # 初始化P
        #P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
        nS = self.nrow * self.ncol   # 48 个状态
        nA = 4                       # 4 个动作
        P = []                       # 最外层大列表
        for i in range(nS):          # i = 0,1,2,…,47
            tmp = []                 # 准备放当前状态的所有动作
            for j in range(nA):      # j = 0,1,2,3
                tmp.append([])       # 先放一个空列表
            P.append(tmp)            # 把这一行挂到大列表上

        # 4种动作, change[0]，即[0,-1]是向上走,change[1]:下, change[2]:左, change[3]:右。坐标系原点(0,0)
        # 定义在左上角，坐标系原点 (0,0) 定义在左上角，往右和下坐标值会变大
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]

        for i in range(self.nrow):
            for j in range(self.ncol):
                for a in range(4):
                    # 位置在悬崖或者目标状态,因为无法继续交互,任何动作奖励都为0
                    if i == self.nrow - 1 and j > 0:
                        P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0,True)]
                        continue
                    # 其他位置
                    next_x = min(self.ncol - 1, max(0, j + change[a][0]))
                    next_y = min(self.nrow - 1, max(0, i + change[a][1]))
                    next_state = next_y * self.ncol + next_x
                    reward = -1
                    done = False
                    # 下一个位置在悬崖或者终点
                    if next_y == self.nrow - 1 and next_x > 0:
                        done = True
                        if next_x != self.ncol - 1:  # 下一个位置在悬崖
                            reward = -100
                    P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
        return P

In [2]:
env = CliffWalkingEnv(ncol=12, nrow=4)
env.P

[[[(1, 0, -1, False)],
  [(1, 12, -1, False)],
  [(1, 0, -1, False)],
  [(1, 1, -1, False)]],
 [[(1, 1, -1, False)],
  [(1, 13, -1, False)],
  [(1, 0, -1, False)],
  [(1, 2, -1, False)]],
 [[(1, 2, -1, False)],
  [(1, 14, -1, False)],
  [(1, 1, -1, False)],
  [(1, 3, -1, False)]],
 [[(1, 3, -1, False)],
  [(1, 15, -1, False)],
  [(1, 2, -1, False)],
  [(1, 4, -1, False)]],
 [[(1, 4, -1, False)],
  [(1, 16, -1, False)],
  [(1, 3, -1, False)],
  [(1, 5, -1, False)]],
 [[(1, 5, -1, False)],
  [(1, 17, -1, False)],
  [(1, 4, -1, False)],
  [(1, 6, -1, False)]],
 [[(1, 6, -1, False)],
  [(1, 18, -1, False)],
  [(1, 5, -1, False)],
  [(1, 7, -1, False)]],
 [[(1, 7, -1, False)],
  [(1, 19, -1, False)],
  [(1, 6, -1, False)],
  [(1, 8, -1, False)]],
 [[(1, 8, -1, False)],
  [(1, 20, -1, False)],
  [(1, 7, -1, False)],
  [(1, 9, -1, False)]],
 [[(1, 9, -1, False)],
  [(1, 21, -1, False)],
  [(1, 8, -1, False)],
  [(1, 10, -1, False)]],
 [[(1, 10, -1, False)],
  [(1, 22, -1, False)],
  [(1, 9, -

## Policy Iteration

In [3]:
class PolicyIteration:
    """ 策略迭代算法 """
    def __init__(self, env, theta, gamma):
        self.env = env
        self.v = [0] * self.env.ncol * self.env.nrow  # 初始化48个state的state value都为0
        # 初始化策略，策略迭代要初始化策略
        self.pi = [[0.25, 0.25, 0.25, 0.25]
                   for i in range(self.env.ncol * self.env.nrow)]  # 初始化为均匀随机策略
        #pi是个二维列表，包含48个元素，每个元素是包含4个元素的一维列表，表示每个状态采取动作的概率
        self.theta = theta  # 策略评估收敛阈值
        self.gamma = gamma  # 折扣因子

    def policy_evaluation(self):   # 策略评估，迭代法计算根据贝尔曼求state value
        cnt = 1   #计数器，为什么这里是1不是0，因为后面只有不满足结束条件才+1,当满足条件跳出循环时直接跳出来不加1了
        while 1:  #无限循环，只有break是才跳出循环
            max_diff = 0
            new_v = [0] * self.env.ncol * self.env.nrow    #初始化每个状态的state value,全都是0
            for s in range(self.env.ncol * self.env.nrow):  #s从0-47，48个状态
                qsa_list = []         # 开始计算状态s下的所有Q(s,a)价值，里面会存4个值，一个状态下4个动作的qsa
                for a in range(4):    #确定了一个状态，开始求它的4个动作的qsa
                    qsa = 0
                    for p, next_state, r, done in self.env.P[s][a]:    #调取P中的(p,next_state,reward,done),s,a到达next_state的概率p，这就是model-based
                        qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))  #这里算q就用到了p，而MC算q不用p,是采样求平均
                        # 本章环境比较特殊,奖励和下一个状态有关,所以需要和状态转移概率相乘
                    qsa_list.append(self.pi[s][a] * qsa)
                new_v[s] = sum(qsa_list)  # 状态价值函数和动作价值函数之间的关系，求出这个状态的state value,但是个中间量，不是最终的
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
            self.v = new_v
            if max_diff < self.theta: break  # 满足收敛条件,退出评估迭代
            cnt += 1
        print("策略评估进行%d轮后完成" % cnt)
        #这里输出的策略评估进行了多少轮完成的意思是求解每个state的state value使用迭代法迭代了多少轮

    def policy_improvement(self):  # 策略提升
        for s in range(self.env.nrow * self.env.ncol):    #每一个状态
            qsa_list = []            #里面会存4个值，一个状态下4个动作的qsa
            for a in range(4):       #确定了一个状态，开始求它的4个动作的qsa
                qsa = 0
                for p, next_state, r, done in self.env.P[s][a]:
                    qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                qsa_list.append(qsa)
            maxq = max(qsa_list)
            count_q = qsa_list.count(maxq)  # 计算有几个动作得到了最大的Q值
            # 让这些动作均分概率
            self.pi[s] = [1 / count_q if q == maxq else 0    for q in qsa_list]
            # 首先1/count_q 已经确定了概率是多少，然后看是在4个位置中的哪个，如果这个q=maxq,那么这个位置就是这个概率，否则是0
        print("策略提升完成")
        return self.pi

    def policy_iteration(self):  # 策略迭代
        while 1:  #无线循环，除非触发后面的break
            self.policy_evaluation()            #调用policy_evaluation()，就是用迭代求解贝尔曼方程，求每个state的state value
            old_pi = copy.deepcopy(self.pi)     # 将列表进行深拷贝,方便接下来进行比较 ，把上一步的那个策略记录下来
            new_pi = self.policy_improvement()  #调用policy_improvement()改进策略，新策略存起来
            if old_pi == new_pi: break          #直到新旧两次策略一样，停止

## 算法文字描述
先随便初始化一个策略，然后算出这个策略下的v,计算方法就是迭代法(或解析法)
<br>然后计算每个状态的每个动作值q(s,a),要用到刚刚的v，用贝尔曼公式
<br>然后根据算出来的每个状态的每个动作值q(s,a)，选择最大的q(s,a),更新策略，
<br>如果这个策略和前一次的不一样，则再计算这个策略下的v,重复前面的操作，知道前后两次策略一样为止

## 训练

In [4]:
#环境
env = CliffWalkingEnv()
theta = 0.001
gamma = 0.9
#实例化算法
agent = PolicyIteration(env, theta, gamma)
#开始迭代
agent.policy_iteration()    #至此迭代就结束了，已经找出来策略，下面是打印

策略评估进行60轮后完成
策略提升完成
策略评估进行72轮后完成
策略提升完成
策略评估进行44轮后完成
策略提升完成
策略评估进行12轮后完成
策略提升完成
策略评估进行1轮后完成
策略提升完成


In [5]:
agent.pi

[[0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 1.0, 0, 0],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 0.5, 0, 0.5],
 [0, 1.0, 0, 0],
 [0, 0, 0, 1.0],
 [0, 0, 0, 1.0],
 [0, 0, 0, 1.0],
 [0, 0, 0, 1.0],
 [0, 0, 0, 1.0],
 [0, 0, 0, 1.0],
 [0, 0, 0, 1.0],
 [0, 0, 0, 1.0],
 [0, 0, 0, 1.0],
 [0, 0, 0, 1.0],
 [0, 0, 0, 1.0],
 [0, 1.0, 0, 0],
 [1.0, 0, 0, 0],
 [0.25, 0.25, 0.25, 0.25],
 [0.25, 0.25, 0.25, 0.25],
 [0.25, 0.25, 0.25, 0.25],
 [0.25, 0.25, 0.25, 0.25],
 [0.25, 0.25, 0.25, 0.25],
 [0.25, 0.25, 0.25, 0.25],
 [0.25, 0.25, 0.25, 0.25],
 [0.25, 0.25, 0.25, 0.25],
 [0.25, 0.25, 0.25, 0.25],
 [0.25, 0.25, 0.25, 0.25],
 [0.25, 0.25, 0.25, 0.25]]

## 至此迭代完成，已经找出了策略

In [6]:
action_meaning = ['^', 'v', '<', '>']
def print_agent(agent, action_meaning, disaster=[], end=[]):
    print("状态价值：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            # 为了输出美观,保持输出6个字符
            print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]), end=' ')    #对state value数值进行操作
        print()  #打印完一行换行

    print("策略：")
    for i in range(agent.env.nrow):   #先看第一行
        for j in range(agent.env.ncol):  #第一行的各个列
            # 一些特殊的状态,例如悬崖漫步中的悬崖
            if (i * agent.env.ncol + j) in disaster:   #如果是悬崖
                print('****', end=' ')
            elif (i * agent.env.ncol + j) in end:      # 目标状态
                print('EEEE', end=' ')
            else:
                a = agent.pi[i * agent.env.ncol + j]  #第一行，正好i=0,看是第几列
                pi_str = ''
                for k in range(len(action_meaning)):
                    pi_str += action_meaning[k] if a[k] > 0 else 'o'
                print(pi_str, end=' ')
        print()

In [7]:
#打印每个状态的state value和策略
print_agent(agent, action_meaning, list(range(37, 47)), [47])

状态价值：
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000 
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
策略：
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo 
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 
