In [1]:
# 动态规划算法求解最优策略
import copy

In [8]:
'''悬崖漫步环境定义'''
class CliffWalkingEnv:
    def __init__(self,ncol=12, nrow=4):
        # 定义网格。行和列
        self.ncol = ncol
        self.nrow = nrow
        self.actions = [[0, -1], [0, 1], [-1, 0], [1, 0]] # 行动
        # 转移矩阵p
        # 元素结构 (p, next_state, reward, done) 概率，下一个状态，奖励
        # 访问方法 p[state][action] 由当前状态和action共同决定
        self.P = self.createP()
    
    def createP(self):
        # 建空list,外层是状态数 row * col，内层是动作数(4,上下左右移动)
        # 4种动作, change[0]:上,change[1]:下, change[2]:左, change[3]:右。坐标系原点(0,0)
        # (0,0)在左下角
        P = [[[]for j in range(4)]for i in range(self.ncol * self.nrow)]
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]

        # 丰富状态
        for i in range(self.nrow):
            for j in range(self.ncol):
                for a in range(4):
                    # 当前状态是终止状态：到达目的地或者在悬崖,任何动作奖励都为0
                    # 其下一个状态就是本身
                    if i == self.nrow - 1 and  j > 0:
                        P[i*self.ncol + j][a] = [(1,i * self.ncol + j,0,True)]
                        continue
                    
                    # 其他状态
                    reward = -1 # 奖励
                    done=False
                    p = 1
                    next_col = min(self.ncol-1,max(0,j + change[a][0])) # 限定范围
                    next_row = min(self.nrow-1,max(0,i + change[a][1]))
                    next_state = next_row * self.ncol + next_col

                    # 如果下一个位置在悬崖或者终点，需要更新reward和done
                    if next_row == self.nrow - 1 and next_col > 0:
                        done = True
                        # 在悬崖
                        if next_col != self.ncol - 1:
                            reward = -100
                    
                    P[i * self.ncol + j][a] = [(p,next_state,reward,done)]
        
        return P

In [27]:
'''策略迭代算法'''
class PolicyIteration:
    def __init__(self,env,theta,gamma):
        # MDP需要有：S,A,GAMMA,P, R 环境env能够提供

        self.env = env # 环境
        self.theta = theta # 收敛阈值
        self.gamma = gamma # 折扣因子

        # 状态价值函数，和状态有关，大小 env.ncol * env.nrow
        self.v = [0] * self.env.ncol * self.env.nrow 
        # 策略，初始化 是 均匀策略
        self.pi = [[0.25, 0.25, 0.25, 0.25] for i in range(self.env.ncol * self.env.nrow)]
    
    '''策略迭代是策略评估和策略提升交替进行'''
    def policy_evaluation(self):
        '''策略评估,修改的是state-value，self.v'''
        '''流程：根据策略函数pi,先生成动作价值函数，然后计算状态价值函数，然后更新，比较'''
        cnt = 1
        while 1:
            # 更新state-value function
            max_diff = 0 # 新状态和旧状态value之间最大的差距
            new_v = [0] * self.env.ncol * self.env.nrow
            # 遍历所有状态
            for s in range(self.env.ncol * self.env.nrow):
                qsa_list = []
                for a in range(4):
                    # 遍历行为,此时s,a已经确定
                    qsa = 0
                    # 遍历这种s，a状态下的转移可能性，计算qsa
                    for res in self.env.P[s][a]:
                        p, next_state, r,done = res
                        qsa += p * (r + self.gamma* self.v[next_state] * (1 - done))
                    qsa_list.append(self.pi[s][a] * qsa)
                
                # 根据state-value 和 action-value的关系，计算特定状态s下的state-value
                new_v[s] = sum(qsa_list)
                max_diff = max(max_diff,abs(new_v[s] - self.v[s]))
            
            self.v = new_v #更新状态value
            #终止条件，state-value不再变化
            if max_diff < self.theta: break
            cnt += 1
        print("策略评估进行%d轮后完成" % cnt)
                        
    def policy_improvement(self):
        '''策略提升,修改的是当前策略,self.pi'''
        '''以状态为单位进行更新'''
        for s in range(self.env.ncol * self.env.nrow):
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    p, next_state, r, done = res
                    qsa += p*(r + self.gamma * self.v[next_state]*(1-done))
                qsa_list.append(qsa)
            
            #这里确定，在当前状态s下，采取的能使return最大的action，也就是action-value最高的行为
            maxq = max(qsa_list)
            cntq = qsa_list.count(maxq) # 最大action-value的个数
            # 这些动作平分概率，更新策略函数
            self.pi[s] = [1/cntq if q == maxq else 0 for q in qsa_list]
        
        print("策略提升完成")
        return self.pi

    # 策略迭代
    def policy_iteration(self):
        while 1:
            # 策略评估，也就是根据当前策略更新状态价值函数直到稳定
            self.policy_evaluation()

            # 策略提升，按照更新的状态价值，更新新的策略函数
            old_pi = copy.deepcopy(self.pi) # 保留之前的pi用于比较
            new_pi = self.policy_improvement()

            # 终止条件，pi策略不再更新
            if old_pi == new_pi: break

In [32]:
'''价值迭代算法'''
class ValueIteration:
    def __init__(self, env, theta, gamma):
        self.env = env
        self.theta = theta
        self.gamma = gamma

        self.v = [0] * self.env.nrow * self.env.ncol
        # 价值迭代结束后，会得到pi，一开始没有pi
        self.pi = [None for i in range(self.env.nrow * self.env.ncol)]

    
    def value_iteration(self):
        '''更新state-value'''
        cnt = 0
        while 1:
            max_diff = 0
            new_v = [0] * self.env.nrow * self.env.ncol

            for s in range(self.env.nrow * self.env.ncol):
                qsa_list = []
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        p, next_state, r,done = res
                        qsa += p * (r + self.gamma * self.v[next_state]*(1-done))
                    qsa_list.append(qsa)
                
                #!!!!!!这是价值迭代和策略迭代的主要区别
                new_v[s] = max(qsa_list) # 直接就确定，选择使得价值最大的动作
                max_diff = max(max_diff,abs(new_v[s] - self.v[s]))
            self.v = new_v
            if max_diff < self.theta: break
            cnt += 1
        print("价值迭代一共进行%d轮" % cnt)

        # 更新了state-value以后，更新policy
        self.get_policy()
    
    def get_policy(self):
        # 根据稳定的状态价值函数，导出对应的策略，这里和策略提升的过程是一样的
        for s in range(self.env.nrow * self.env.ncol):
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    p, next_state,r,done = res
                    qsa += p * (r + self.gamma*self.v[next_state]*(1-done))
                qsa_list.append(qsa)
            
            # 找到最大value的行为
            maxq = max(qsa_list)
            cntq = qsa_list.count(maxq)
            # 均分概率
            self.pi[s] = [1/cntq if q == maxq else 0 for q in qsa_list]
                    

In [19]:
# 工具函数，用于打印policy 打印当前策略在每个状态下的价值以及智能体会采取的动作

def print_agent(agent, action_meaning, disaster = [],end = []):
    '''
    disaster : 非正常终止的状态，比如悬崖状态
    end：目标终止状态
    '''
    # 打印agent.v
    print('状态价值(state-value)：')
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]), end=' ')
        print()
    
    # 打印policy
    print("策略policy：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            
            s = i * agent.env.ncol + j
            if s in disaster:
                print('****', end=' ')
            elif s in end:
                print('EEEE', end=' ')
            else:
                a = agent.pi[s] # 当前状态下，策略分布
                pi_str = ''
                # 对每种动作进行遍历
                for k in range(len(action_meaning)):
                    pi_str += action_meaning[k] if a[k] > 0 else 'o'
                print(pi_str,end=" ")
        print()

In [31]:
# 定义环境
col = 12
row = 4
env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>'] # 与环境中的action对应
theta = 0.001
gamma = 0.9
agent = PolicyIteration(env,theta,gamma)
agent.policy_iteration()


# 计算悬崖状态和end
end = [row * col -1]
disaster_s = (row-1) * col + 1
disaster_e = row*col - 1
disaster = list(range(disaster_s,disaster_e))

print_agent(agent,action_meaning,disaster,end)

策略评估进行60轮后完成
策略提升完成
策略评估进行72轮后完成
策略提升完成
策略评估进行44轮后完成
策略提升完成
策略评估进行12轮后完成
策略提升完成
策略评估进行1轮后完成
策略提升完成
状态价值(state-value)：
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000 
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
策略policy：
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo 
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 


In [33]:
# 价值迭代
env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>']
theta = 0.001
gamma = 0.9

# 计算悬崖状态和end
col = 12
row = 4
end = [row * col -1]
disaster_s = (row-1) * col + 1
disaster_e = row*col - 1
disaster = list(range(disaster_s,disaster_e))

agent = ValueIteration(env,theta,gamma)
agent.value_iteration()
print_agent(agent,action_meaning,disaster,end)

价值迭代一共进行14轮
状态价值(state-value)：
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000 
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
策略policy：
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo 
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 
