In [6]:
import copy


class CliffWalkingEnv:
    """ 
    悬崖漫步环境
    有一个 4×12 的网格世界，每一个网格表示一个状态。
    智能体的起点是左下角的状态，目标是右下角的状态，
    智能体在每一个状态都可以采取 4 种动作：上、下、左、右。
    如果智能体采取动作后触碰到边界墙壁则状态不发生改变，否则就会相应到达下一个状态。
    环境中有一段悬崖，智能体掉入悬崖或到达目标状态都会结束动作并回到起点，也就是说掉入悬崖或者达到目标状态是终止状态。
    智能体每走一步的奖励是 −1，掉入悬崖的奖励是 −100。
    """
    def __init__(self, ncol=12, nrow=4):
        '''
        初始化悬崖漫步环境
        ncol: 网格世界的列
        nrow: 网格世界的行
        '''
        
        # 定义网格世界的列
        self.ncol = ncol
        
        # 定义网格世界的行
        self.nrow = nrow  
        
        # 创建转移矩阵P
        # P[state][action] = [(p(状态转移概率), next_state(下一个状态), reward(奖励), done(结束标识)]
        self.P = self.createP()

    def createP(self):
        '''
        创建状态转移矩阵
        
        P[state][action] = [(p, next_state, reward, done)]
        state_num = self.nrow * self.ncol，整个棋盘的格子数
        action_num = 4, 上、下、左、右
        '''
        
        # 初始化状态转移矩阵P
        # P[state][action]
        # state_num = self.nrow * self.ncol 状态的数量
        # action_num = 4, 上、下、左、右 动作的数量
        P = [
                [
                    [] for j in range(4)
                ] for i in range(self.nrow * self.ncol)
            ]
        
        
        # 4种动作, change[0]:上,change[1]:下, change[2]:左, change[3]:右。
        # 坐标系原点(0,0)定义在左上角
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        
        # 遍历地图每一行
        for i in range(self.nrow):
            # 遍历地图每一列
            for j in range(self.ncol):
                # 遍历每一个动作
                for a in range(4):
                    # 当前位置在悬崖或者目标状态,因为无法继续交互,任何动作奖励都为0
                    if i == self.nrow - 1 and j > 0:
                        # 设置对应的状态转移矩阵的元素
                        # P[state][action] = [(p, next_state, reward, done)]
                        P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0, True)]
                        continue
                        
                    # 通过动作获得下一个坐标
                    # 范围[0, self.ncol - 1]
                    next_x = min(self.ncol - 1, max(0, j + change[a][0]))  
                    # 范围[0, self.nrow - 1]
                    next_y = min(self.nrow - 1, max(0, i + change[a][1]))  
                    
                    # 下一个状态
                    next_state = next_y * self.ncol + next_x
                    
                    # 进入下一个状态的奖励
                    reward = -1
                    done = False
                    
                    # 如果下一个位置在悬崖或者终点
                    if next_y == self.nrow - 1 and next_x > 0:
                        done = True
                        
                        # 下一个位置在悬崖
                        # 下边界在悬崖, 右边界不在悬崖
                        if next_x != self.ncol - 1:  
                            reward = -100
                    
                    # 设置对应的状态转移矩阵的元素
                    # P[state][action] = [(p, next_state, reward, done)]
                    P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
        return P

In [7]:
class PolicyIteration:
    """ 
    策略迭代算法 
    """
    def __init__(self, env, theta, gamma):
        """
        初始化
        env: 环境
        theta: 策略评估收敛阈值
        gamma: 回报折扣因子
        """
        
        # 环境
        self.env = env
        
        # 初始化状态价值为 [0] * 地图格子数
        self.v = [0] * self.env.ncol * self.env.nrow  
        
        # 初始化策略为均匀随机策略
        # 即在每个状态下选择不同动作的概率
        self.pi = [
            [0.25, 0.25, 0.25, 0.25]
            for i in range(self.env.ncol * self.env.nrow)]  
        
        # 策略评估收敛阈值
        self.theta = theta  
        
        # 折扣因子
        self.gamma = gamma  

    def policy_evaluation(self):  
        """
        策略评估过程
        更新状态价值函数 v(s)
        """
        # 计数器
        cnt = 1
        
        # 开始策略评估过程
        while 1:
            max_diff = 0
            
            # 创建新的状态价值
            new_v = [0] * self.env.ncol * self.env.nrow
            
            # 对于每个状态s
            for s in range(self.env.ncol * self.env.nrow):
                # 开始计算状态s下的所有Q(s,a)价值
                qsa_list = []  
                
                # 遍历动作，获得当前状态s下该动作a的价值
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        # 获得(s, a) 对应的概率， 下一个状态，奖励，结束标识
                        p, next_state, r, done = res
                        
                        # 计算获得 Q(s, a),奖励和下一个状态有关,所以需要和状态转移概率相乘
                        # 这里主要更新的点就在于v函数的更新，每次完整遍历后会更新一遍v,从而导致下一次的策略评估发生变化
                        qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                        
                    # 将qsa添加到qsa_list,需要乘以对应的动作概率，即策略函数
                    qsa_list.append(self.pi[s][a] * qsa)
                    
                # 状态价值函数和动作价值函数之间的关系
                # 当前的状态的价值 = 该状态下每个动作的价值求和
                new_v[s] = sum(qsa_list)  
                
                # 更新状态价值的最大差，用来判断是否应该停止评估
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
                
            # 更新状态价值函数
            self.v = new_v
            
            # 满足收敛条件,退出评估迭代
            if max_diff < self.theta: 
                break  
                
            # 未收敛，迭代次数+1
            cnt += 1
            
        print("策略评估进行%d轮后完成" % cnt)

    def policy_improvement(self): 
        """
        策略提升
        更新策略 pi(s)
        """
        # 遍历所有的状态
        for s in range(self.env.nrow * self.env.ncol):
            # 遍历该状态下的所有动作，获得每个动作的Q(s,a)
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    # 获得(s, a) 对应的概率， 下一个状态，奖励，结束标识
                    p, next_state, r, done = res
                    
                    # 计算获得 Q(s, a), 奖励和下一个状态有关,所以需要和状态转移概率相乘
                    qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                
                qsa_list.append(qsa)
                
            # 获得最大的 Q(s,a), 可能有多个动作获得了最大的期望
            maxq = max(qsa_list)
            
            # 计算有几个动作得到了最大的Q值
            cntq = qsa_list.count(maxq)
            
            # 让这些maxq动作均分概率
            self.pi[s] = [1 / cntq if q == maxq 
                          else 0 
                          for q in qsa_list]
        print("策略提升完成")
        
        # 返回更新后的策略 pi
        return self.pi

    def policy_iteration(self):
        '''
        策略迭代
        1. 策略评估 
        2. 策略提升
        '''
        
        while 1:
            # 策略评估，更新状态价值函数 v
            self.policy_evaluation()
            
            # 深拷贝策略pi, 方便后续比较
            old_pi = copy.deepcopy(self.pi)  
            
            # 更新策略 pi
            new_pi = self.policy_improvement()
            
            # 判断是否结束迭代
            if old_pi == new_pi: 
                print("策略迭代完成！！！！！！")
                break

In [8]:
def print_agent(agent, action_meaning, disaster=[], end=[]):
    '''
    打印状态价值与策略
    agent: 智能体
    action_meaning: 动作意义
    disaster: 悬崖状态列表
    end: 结束状态列表
    
    '''
    
    # 打印各个状态价值
    print("状态价值：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            # 为了输出美观,保持输出6个字符
            print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]),
                  end=' ')
        print()

    # 打印各个状态下的策略
    print("策略：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            # 打印悬崖状态
            if (i * agent.env.ncol + j) in disaster:
                print('****', end=' ')
            # 打印目标状态
            elif (i * agent.env.ncol + j) in end:  
                print('EEEE', end=' ')
            # 打印其他状态
            else:
                # 获取该状态下的策略，即各个动作的概率
                a = agent.pi[i * agent.env.ncol + j]
                
                # 遍历四个动作 
                # 当该动作的概率大于0时，打印该动作符号，否则打印 'o'
                pi_str = ''
                for k in range(len(action_meaning)):
                    pi_str += action_meaning[k] if a[k] > 0 else 'o'
                print(pi_str, end=' ')
                
        print()


# 创建悬崖漫步环境环境
env = CliffWalkingEnv()

# 创建动作符号
action_meaning = ['^', 'v', '<', '>']

# 策略评估收敛阈值
theta = 0.001
# 回报折扣因子
gamma = 0.9

# 创建策略迭代智能体
agent = PolicyIteration(env, theta, gamma)

# 智能体进行策略迭代
agent.policy_iteration()

# 设置悬崖状态
disaster = list(range(37, 47))
# 设置目标状态
end = [47]

# 打印迭代后的智能体的状态价值与策略
print_agent(agent, action_meaning, disaster, end)

# 策略评估进行60轮后完成
# 策略提升完成
# 策略评估进行72轮后完成
# 策略提升完成
# 策略评估进行44轮后完成
# 策略提升完成
# 策略评估进行12轮后完成
# 策略提升完成
# 策略评估进行1轮后完成
# 策略提升完成
# 状态价值：
# -7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710
# -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900
# -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000
# -7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000
# 策略：
# ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
# ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
# ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo
# ^ooo **** **** **** **** **** **** **** **** **** **** EEEE

策略评估进行60轮后完成
策略提升完成
策略评估进行72轮后完成
策略提升完成
策略评估进行44轮后完成
策略提升完成
策略评估进行12轮后完成
策略提升完成
策略评估进行1轮后完成
策略提升完成
策略迭代完成！！！！！！
状态价值：
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000 
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
策略：
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo 
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 


In [9]:
class ValueIteration:
    """ 
    价值迭代算法
    可以看做一种策略评估只进行了一轮更新的策略迭代算法
    """
    
    def __init__(self, env, theta, gamma):
        '''
        初始化
        env: 环境
        theta: 价值收敛阈值
        gamma: 回报折扣因子
        '''
        
        # 设置环境
        self.env = env
        
        # 初始化所有状态的价值为 0
        self.v = [0] * self.env.ncol * self.env.nrow 
        
        # 价值收敛阈值
        self.theta = theta
        
        # 回报折扣因子
        self.gamma = gamma
        
        # 价值迭代结束后得到的策略
        self.pi = [None for i in range(self.env.ncol * self.env.nrow)]

    def value_iteration(self):
        '''
        价值迭代
        '''
        
        cnt = 0
        while 1:
            max_diff = 0
            
            # 新建状态价值
            new_v = [0] * self.env.ncol * self.env.nrow
            
            # 遍历所有状态
            for s in range(self.env.ncol * self.env.nrow):
                
                # 开始计算状态s下的所有Q(s,a)价值
                qsa_list = []  
                # 遍历该状态下的四个动作
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        p, next_state, r, done = res
                        
                        # 计算 Q(s, a), 这一步需要考虑转换到下一状态的状态转移概率
                        qsa += p * (r + self.gamma * self.v[next_state] *(1 - done))
                        
                    # 这里如果是策略迭代，需要乘以每个动作的概率
                    # 价值迭代不需要
                    qsa_list.append(qsa)  
                    
                # 获取当前状态的最大 Q(s, a)
                # 这里是价值迭代与策略迭代最大的区别
                # 对于价值迭代来说，这里的策略是明确的，直接选取最大的Q(s,a),不需要动作概率
                new_v[s] = max(qsa_list)
                
                # 状态价值的更新最大差
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
                
            # 更新状态价值
            self.v = new_v
            
            # 检查状态价值更新最大差，满足收敛条件,退出评估迭代
            if max_diff < self.theta: 
                break
            
            # 迭代次数+1
            cnt += 1
            
            
        print("价值迭代一共进行%d轮" % cnt)
        
        # 根据状态价值获取策略
        self.get_policy()

    def get_policy(self):
        '''
        根据价值函数导出一个贪婪策略
        '''
        # 遍历所有状态
        for s in range(self.env.nrow * self.env.ncol):
            # 获取该状态s下每个动作的Q(s,a),即动作价值函数
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    p, next_state, r, done = res
                    qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                qsa_list.append(qsa)
                
            # 获取最大的动作价值
            maxq = max(qsa_list)
            
            # 计算有几个动作得到了最大的Q值
            cntq = qsa_list.count(maxq)
            
            # 让这些动作均分概率
            self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]


# 创建环境
env = CliffWalkingEnv()

# 定义动作符号
action_meaning = ['^', 'v', '<', '>']

# 价值收敛阈值
theta = 0.001

# 回报折扣因子
gamma = 0.9

# 创建价值迭代智能体
agent = ValueIteration(env, theta, gamma)

# 进行价值迭代
agent.value_iteration()

# 打印最终的状态价值与策略
print_agent(agent, action_meaning, list(range(37, 47)), [47])

# 价值迭代一共进行14轮
# 效率比策略迭代更快
# 状态价值：
# -7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710
# -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900
# -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000
# -7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000
# 策略：
# ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
# ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
# ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo
# ^ooo **** **** **** **** **** **** **** **** **** **** EEEE

价值迭代一共进行14轮
状态价值：
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000 
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
策略：
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo 
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 


In [10]:
import gym
# 创建 gym 环境
env = gym.make("FrozenLake-v1")  
# 创建环境后需要reset,否则无法正常显示
env.reset()
# 解封装才能访问状态转移矩阵P
env = env.unwrapped 
# 环境渲染,通常是弹窗显示或打印出可视化的环境
env.render()  

# 冰湖环境的洞
holes = set()
# 冰湖环境的目标
ends = set()

# 遍历状态
for s in env.P:
    # 遍历动作
    for a in env.P[s]:
        # 获得 状态-动作对
        for s_ in env.P[s][a]:
            # [p, next_state, reward, done]
            # 获得奖励为1,代表是目标
            if s_[2] == 1.0:  
                ends.add(s_[1])
            # 洞或者目标结束done, 检查是否是洞或者目标，最终需要洞的堆减去目标的堆
            if s_[3] == True:
                holes.add(s_[1])
                
# 洞和目标都是done的状态，真正的洞需要减去目标
holes = holes - ends
print("冰洞的索引:", holes)
print("目标的索引:", ends)

# 查看目标左边一格的状态转移信息
# 每一个动作可能会导致不同的前往不同的状态，由于 is_slippery
for a in env.P[14]:  
    print(env.P[14][a])

    

# SFFF
# FHFH
# FFFH
# HFFG
# 起点状态（索引为 0）在左上角
# 冰洞的索引: {11, 12, 5, 7}
# 目标的索引: {15}
# 每个动作都会等概率“滑行”到 3 种可能的结果
# [(0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False),
#  (0.3333333333333333, 14, 0.0, False)]
# [(0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False),
#  (0.3333333333333333, 15, 1.0, True)]
# [(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True),
#  (0.3333333333333333, 10, 0.0, False)]
# [(0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False),
#  (0.3333333333333333, 13, 0.0, False)]

冰洞的索引: {11, 12, 5, 7}
目标的索引: {15}
[(0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False)]
[(0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True)]
[(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)]
[(0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False)]


In [11]:
# 这个动作意义是Gym库针对冰湖环境事先规定好的
action_meaning = ['<', 'v', '>', '^']

# 策略价值收敛阈值
theta = 1e-5

# 回报折扣因子
gamma = 0.9

# 获取策略迭代智能体
agent = PolicyIteration(env, theta, gamma)

# 进行策略迭代
agent.policy_iteration()

# 打印策略迭代之后的状态价值和策略
hole_as_disaster = [5, 7, 11, 12]
end = [15]
print_agent(agent, action_meaning, hole_as_disaster, end)

# 策略评估进行25轮后完成
# 策略提升完成
# 策略评估进行58轮后完成
# 策略提升完成
# 状态价值：
#  0.069  0.061  0.074  0.056
#  0.092  0.000  0.112  0.000
#  0.145  0.247  0.300  0.000
#  0.000  0.380  0.639  0.000
# 策略：
# <ooo ooo^ <ooo ooo^
# <ooo **** <o>o ****
# ooo^ ovoo <ooo ****
# **** oo>o ovoo EEEE

# 这个最优策略很看上去比较反直觉，其原因是这是一个智能体会随机滑向其他状态的冰冻湖面。
# 例如，在目标左边一格的状态，采取向右的动作时，它有可能会滑到目标左上角的位置，
# 从该位置再次到达目标会更加困难，所以此时采取向下的动作是更为保险的，并且有一定概率能够滑到目标。

策略评估进行25轮后完成
策略提升完成
策略评估进行58轮后完成
策略提升完成
策略迭代完成！！！！！！
状态价值：
 0.069  0.061  0.074  0.056 
 0.092  0.000  0.112  0.000 
 0.145  0.247  0.300  0.000 
 0.000  0.380  0.639  0.000 
策略：
<ooo ooo^ <ooo ooo^ 
<ooo **** <o>o **** 
ooo^ ovoo <ooo **** 
**** oo>o ovoo EEEE 


In [13]:
# 定义动作符号
action_meaning = ['<', 'v', '>', '^']

# 价值收敛阈值
theta = 1e-5

# 回报折扣因子
gamma = 0.9

# 价值迭代智能体
agent = ValueIteration(env, theta, gamma)

# 进行价值迭代
agent.value_iteration()

# 打印状态价值和策略
hole_as_disaster = [5, 7, 11, 12]
end = [15]
print_agent(agent, action_meaning, hole_as_disaster, end)

# 价值迭代一共进行60轮
# 状态价值：
#  0.069  0.061  0.074  0.056
#  0.092  0.000  0.112  0.000
#  0.145  0.247  0.300  0.000
#  0.000  0.380  0.639  0.000
# 策略：
# <ooo ooo^ <ooo ooo^
# <ooo **** <o>o ****
# ooo^ ovoo <ooo ****
# **** oo>o ovoo EEEE

# 价值迭代算法的结果和策略迭代算法的结果完全一致
# 靠滑行的概率来完成

价值迭代一共进行60轮
状态价值：
 0.069  0.061  0.074  0.056 
 0.092  0.000  0.112  0.000 
 0.145  0.247  0.300  0.000 
 0.000  0.380  0.639  0.000 
策略：
<ooo ooo^ <ooo ooo^ 
<ooo **** <o>o **** 
ooo^ ovoo <ooo **** 
**** oo>o ovoo EEEE 
