In [1]:
import numpy as np
np.random.seed(0)
import scipy
import gym

In [2]:
env = gym.make('CliffWalking-v0')
env.seed(0)

[0]

In [3]:
def play_once(env, policy):
    total_reward = 0
    state = env.reset()
    while True:
        loc = np.unravel_index(state, env.shape)
        #print('状态 = {}, 位置 = {}'.format(state, loc), end=' ')
        action = np.random.choice(env.action_space.n, p=policy[state])
        next_state, reward, done, _ = env.step(action)
        #print('动作 = {}, 奖励 = {}'.format(action, reward))
        total_reward += reward
        if done:
            break
        state = next_state
    return total_reward

In [4]:
def v2q(env, v, s=None, gamma=1.): # 根据状态价值函数计算动作价值函数
    if s is not None: # 针对单个状态求解
        q = np.zeros(env.action_space.n)
        for a in range(env.action_space.n):
            for prob, next_state, reward, done in env.unwrapped.P[s][a]:
                q[a] += prob * \
                        (reward + gamma * v[next_state] * (1. - done))
    else: # 针对所有状态求解
        q = np.zeros((env.observation_space.n, env.action_space.n))
        for s in range(env.observation_space.n):
            q[s] = v2q(env, v, s, gamma)
    return q

def evaluate_policy(env, policy, gamma=1., tolerant=1e-6):
    v = np.zeros(env.observation_space.n) # 初始化状态价值函数
    while True:
        delta =0
        for s in range(env.observation_space.n):
            vs=sum(policy[s]*v2q(env,v,s,gamma))#v状态价值，s当前状态
            delta=max(delta,abs(v[s]-vs))
            v[s]=vs
        if delta<tolerant:
            break
    return v

In [5]:
def improve_policy(env, v, policy, gamma=1.):
    optimal = True
    for s in range(env.observation_space.n):
        olda=policy[s]
        q=v2q(env,v,s,gamma)
        m=np.max(q)
        policy[s]=0
        for i in range(env.action_space.n):
            if q[i]==m:
                policy[s][i]=1
        policy[s]/=np.sum(policy[s])#最大价值的动作平分概率，其余为0
        if not (olda==policy[s]).all():
            optimal=False
    # 课堂练习
    return optimal

In [6]:
def iterate_policy(env, gamma=1., tolerant=1e-6):
     # 初始化为任意一个策略
    policy = np.ones((env.observation_space.n, env.action_space.n)) \
            / env.action_space.n
    while True:
        v = evaluate_policy(env, policy, gamma, tolerant) # 策略评估
        if improve_policy(env, v, policy): # 策略改进
            break
    return policy, v

In [7]:
policy_pi, v_pi = iterate_policy(env)
print('状态价值函数 =')
print(v_pi.reshape(4,12))
print('最优策略 =')
print(np.argmax(policy_pi, axis=1).reshape(4,12))


状态价值函数 =
[[-65104.83314117 -65038.57833489 -64886.59708602 -64615.82401465
  -64172.32023635 -63470.84818323 -62382.03534589 -60720.8672752
  -58253.73785602 -54780.58207859 -50443.22378433 -46534.93775419]
 [-65167.08795145 -65120.30478046 -65001.38891151 -64784.55472454
  -64426.28851411 -63854.18897036 -62950.39058209 -61522.82862645
  -59255.76421692 -55640.78459788 -50010.15152246 -42622.65172613]
 [-65272.12593572 -65270.16392598 -65210.09905702 -65090.71745986
  -64890.09012716 -64565.22860396 -64038.50938757 -63160.29243348
  -61601.70578911 -58512.64057522 -51329.94598295 -31318.86590286]
 [-65375.12593272 -65399.38543103 -65409.11921379 -65379.2738145
  -65329.11698133 -65247.90160053 -65116.22179643 -64896.66755791
  -64507.02089681 -63734.75459334 -45570.54946209 -24199.2479589 ]]
最优策略 =
[[1 1 1 1 1 1 1 1 1 1 1 2]
 [0 1 1 1 1 1 1 1 1 1 1 2]
 [0 0 0 0 0 0 0 0 1 1 1 2]
 [0 0 0 0 0 0 0 0 0 0 1 1]]


In [8]:
episode_rewards = [play_once(env, policy_pi)  for _ in range(100)]
print("策略迭代 平均奖励：{}".format(np.mean(episode_rewards)))

策略迭代 平均奖励：-17.0


In [9]:
def iterate_value(env, gamma=1, tolerant=1e-6):
    v = np.zeros(env.observation_space.n)
    while True:
        delta=0
        for s in range(env.observation_space.n):
            vs=np.max(v2q(env,v,s,gamma))
            delta=max(delta,abs(v[s]-vs))
            v[s]=vs
        if delta<tolerant:
            break
    policy=v2q(env,v)
    for s in range(env.observation_space.n):
        m=np.argmax(policy[s])
        policy[s]=0
        policy[s][m]=1
    # 课堂练习
    return policy, v

In [10]:
policy_pi, v_pi = iterate_policy(env)
print('状态价值函数 =')
print(v_pi.reshape(4,12))
print('最优策略 =')
print(np.argmax(policy_pi, axis=1).reshape(4,12))


状态价值函数 =
[[-65104.83314117 -65038.57833489 -64886.59708602 -64615.82401465
  -64172.32023635 -63470.84818323 -62382.03534589 -60720.8672752
  -58253.73785602 -54780.58207859 -50443.22378433 -46534.93775419]
 [-65167.08795145 -65120.30478046 -65001.38891151 -64784.55472454
  -64426.28851411 -63854.18897036 -62950.39058209 -61522.82862645
  -59255.76421692 -55640.78459788 -50010.15152246 -42622.65172613]
 [-65272.12593572 -65270.16392598 -65210.09905702 -65090.71745986
  -64890.09012716 -64565.22860396 -64038.50938757 -63160.29243348
  -61601.70578911 -58512.64057522 -51329.94598295 -31318.86590286]
 [-65375.12593272 -65399.38543103 -65409.11921379 -65379.2738145
  -65329.11698133 -65247.90160053 -65116.22179643 -64896.66755791
  -64507.02089681 -63734.75459334 -45570.54946209 -24199.2479589 ]]
最优策略 =
[[1 1 1 1 1 1 1 1 1 1 1 2]
 [0 1 1 1 1 1 1 1 1 1 1 2]
 [0 0 0 0 0 0 0 0 1 1 1 2]
 [0 0 0 0 0 0 0 0 0 0 1 1]]


In [11]:
episode_rewards = [play_once(env, policy_pi)  for _ in range(100)]
print("策略迭代 平均奖励：{}".format(np.mean(episode_rewards)))

策略迭代 平均奖励：-17.0
