# 策略迭代 & 价值迭代

In [1]:
import gym
import numpy as np

In [2]:
env = gym.make('FrozenLake-v1')
print('观察空间 = {}'.format(env.observation_space))
print('动作空间 = {}'.format(env.action_space)) # 左下右上
print('观测空间大小 = {}'.format(env.observation_space.n))
print('动作空间大小 = {}'.format(env.action_space.n))
print(env.unwrapped.P[14][2]) # unwrapped 是一个获取原始对象的方法,在这里不适用也可以

观察空间 = Discrete(16)
动作空间 = Discrete(4)
观测空间大小 = 16
动作空间大小 = 4
[(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)]


In [3]:
def play_policy(env, policy, render=False):
    total_reward = 0
    observation, _ = env.reset()
    while True:
        if render:
            env.render()
        action = np.random.choice(env.action_space.n, p=policy[observation])
        observation, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward
        if terminated or truncated:
            break
    return total_reward

使用随机策略玩一把

In [4]:
random_policy = np.ones((env.observation_space.n, env.action_space.n)) /env.action_space.n
# 向四个方向的概率都是一样的
episode_rewards = [play_policy(env, random_policy) for _ in range(10000)]
print(f'使用随机策略下的平均奖励是: {np.mean(episode_rewards)}')

使用随机策略下的平均奖励是: 0.0145


策略评估

In [5]:
def v2q_Recursion(env, v, s=None, gamma=1.0):
    # 效率慢
    if s is not None:
        q = np.zeros(env.action_space.n)
        for a in range(env.action_space.n):
            for prob, next_state, reward, terminated in env.P[s][a]:
                q[a] += prob * (reward + gamma * v[next_state] * (1 - terminated))
    else:
        q = np.zeros((env.observation_space.n, env.action_space.n))
        for s in range(env.observation_space.n):
            q[s] = v2q_Recursion(env, v, s, gamma)
    return q

def v2q(env, v, gamma=1.0):
    q = np.zeros((env.observation_space.n, env.action_space.n))
    for s in range(env.observation_space.n):
        for a in range(env.action_space.n):
            for prob, next_state, reward, terminated in env.P[s][a]:
                q[s,a] += prob * (reward + gamma * v[next_state] * (1 - terminated))
                # 加上 terminated 之后就把结束状态考虑进去了
    return q

def evaluate_policy(env, policy, gamma=1.0, tolerant=1e-6):
    v = np.zeros(env.observation_space.n)
    while True:
        delta = 0
        for s in range(env.observation_space.n):
            vs = sum(policy[s] * v2q_Recursion(env, v, s, gamma))
            # 因为每次都会更新 v 所以算出来的值是不一样的, 上面的那种写法会很方便的求出q
            delta = max(delta, abs(v[s]-vs))
            v[s] = vs
        if delta < tolerant:
            break
    return v

测试

In [6]:
v_random = evaluate_policy(env, random_policy)
print('状态价值函数:')
print(v_random.reshape(4,4))
print('动作价值函数:')
print(v2q_Recursion(env, v_random))

状态价值函数:
[[0.0139372  0.01162942 0.02095187 0.01047569]
 [0.01624741 0.         0.04075119 0.        ]
 [0.03480561 0.08816967 0.14205297 0.        ]
 [0.         0.17582021 0.43929104 0.        ]]
动作价值函数:
[[0.01470727 0.01393801 0.01393801 0.01316794]
 [0.00852221 0.01162969 0.01086043 0.01550616]
 [0.02444416 0.0209521  0.02405958 0.01435233]
 [0.01047585 0.01047585 0.00698379 0.01396775]
 [0.02166341 0.01701767 0.0162476  0.01006154]
 [0.         0.         0.         0.        ]
 [0.05433495 0.04735099 0.05433495 0.00698396]
 [0.         0.         0.         0.        ]
 [0.01701767 0.04099176 0.03480569 0.04640756]
 [0.0702086  0.11755959 0.10595772 0.05895286]
 [0.18940397 0.17582024 0.16001408 0.04297362]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.08799662 0.20503708 0.23442697 0.17582024]
 [0.25238807 0.53837042 0.52711467 0.43929106]
 [0.         0.         0.         0.        ]]


In [7]:
def improve_policy(env, v, policy, gamma=1.0):
    optimal = True
    for s in range(env.observation_space.n):
        q = v2q_Recursion(env, v, s, gamma)
        a = np.argmax(q)
        if policy[s][a] != 1.0:
            optimal = False
            policy[s] = 0.0
            policy[s][a] = 1.0
    return optimal

对随机策略进行改进

In [8]:
policy = random_policy.copy()
optimal = improve_policy(env, v_random, policy) # optimal == True 表示已经到达了最优了, 没有更新
print(policy)

[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]


In [9]:
bepisode_rewards = [play_policy(env, policy) for _ in range(100)]
print(f'使用优化后的策略下的平均奖励是: {np.mean(episode_rewards)}')

使用优化后的策略下的平均奖励是: 0.0145


策略迭代

In [10]:
def iterate_policy(env, gamma=1.0, tolerant=1e-9): # 迭代中又有迭代, 所以速度很慢
    # 生成一个随机的动作
    policy = np.ones((env.observation_space.n, env.action_space.n)) / env.action_space.n
    while True: # 一直迭代到收敛为止
        v = evaluate_policy(env, policy, gamma, tolerant) # 一次迭代
        if improve_policy(env, v, policy):
            break
    return policy, v

In [11]:
policy, v = iterate_policy(env)
print(v.reshape(4, 4))
print(np.argmax(policy, axis=1).reshape(4, 4))

[[0.8235294  0.82352939 0.82352939 0.82352938]
 [0.8235294  0.         0.52941175 0.        ]
 [0.8235294  0.8235294  0.76470587 0.        ]
 [0.         0.88235293 0.94117647 0.        ]]
[[0 3 3 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]


In [12]:
episode_rewards = [play_policy(env, policy)  for _ in range(100)]
print("策略迭代 平均奖励：{}".format(np.mean(episode_rewards)))

策略迭代 平均奖励：0.8


价值迭代

In [15]:
def iterate_value(env, gamma=1.0, tolerant=1e-6):
    v = np.zeros(env.observation_space.n)
    while True:
        delta = 0
        for s in range(env.observation_space.n):
            vmax = max(v2q_Recursion(env, v, s, gamma))
            delta = max(delta, abs(v[s] - vmax))
            v[s] = vmax
        if delta < tolerant: 
            break
    policy = np.zeros((env.observation_space.n, env.action_space.n))
    for s in range(env.observation_space.n):
        a = np.argmax(v2q_Recursion(env, v, s, gamma))
        policy[s][a] = 1.0
    return policy, v

In [16]:
policy, v = iterate_value(env)
print(v.reshape(4, 4))
print(np.argmax(policy, axis=1).reshape(4, 4))

[[0.82351232 0.82350671 0.82350281 0.82350083]
 [0.82351404 0.         0.52940011 0.        ]
 [0.82351673 0.82352018 0.76469779 0.        ]
 [0.         0.88234653 0.94117321 0.        ]]
[[0 3 3 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]


In [17]:
episode_rewards = [play_policy(env, policy) for _ in range(100)]
print("价值迭代 平均奖励：{}".format(np.mean(episode_rewards)))

价值迭代 平均奖励：0.76
