# 编程作业
- 使用第三课的编程作业中的格子迷宫环境。需要实现以下算法。 (在更新算法时，必须时无模型的，即不能使用 , ), 可以与第三课的作业进行对比
- (在策略) 使用增量式蒙特卡洛的方法，计算随机策略的值函数。
- 使用常量步长，计算随机策略的值函数。对比不同的值函数初始化下 (比如把值函数的初始值设为一个均值很大的正态分布等常量步长和一般增量式蒙特卡洛的收敛性。
- 实现 GLIE 蒙特卡洛优化算法，求出最优策略。
- 尝试在蒙特卡洛方法中使用贪婪的策略提升,是否能找出最优策略?跟什么有关?
- (离策略) 选择行为策略 μ 为随机策略，然后使用增量式离策略每次拜访蒙特卡洛优化算法，求出此时的最优策略。

In [2]:
import numpy as np
from code3_DP import Env
import time
import copy
WORLD_SIZE = 5
N_ACTIONS = 4  # [North, South, West, East]
gamma = 0.9

In [3]:
class Agent:
    def random_policy(self, s):
        return np.random.randint(0, 4)

In [4]:
def reset(self):
    return np.random.randint(0, WORLD_SIZE, size=2)

Agent.reset = reset

In [5]:
def MCSample(env, policy, init_s, T=100):
    episode = []
    s = init_s
    for t in range(T):
        a = policy(s)
        s_n, r = env.step(s, a)
        episode.append(dict(s=s, a=a, r=r))
        s = s_n
    return episode

In [6]:
env = Env()
agent = Agent()
MCSample(env, agent.random_policy, [0, 0], T=10)

[{'s': [0, 0], 'a': 0, 'r': -1.0},
 {'s': [0, 0], 'a': 2, 'r': -1.0},
 {'s': [0, 0], 'a': 3, 'r': 0.0},
 {'s': [0, 1], 'a': 0, 'r': 10.0},
 {'s': [4, 1], 'a': 2, 'r': 0.0},
 {'s': [4, 0], 'a': 3, 'r': 0.0},
 {'s': [4, 1], 'a': 0, 'r': 0.0},
 {'s': [3, 1], 'a': 0, 'r': 0.0},
 {'s': [2, 1], 'a': 2, 'r': 0.0},
 {'s': [2, 0], 'a': 1, 'r': 0.0}]

In [7]:
random_V = random_policy_evaluation(2000, 200)
print(random_V)

[[ 3.3258646   8.839572    4.5535564   5.311384    1.6135999 ]
 [ 1.5139716   2.992304    2.31242     1.8493847   0.5372376 ]
 [ 0.12444662  0.77103907  0.7184072   0.35925037 -0.4071942 ]
 [-0.91814077 -0.4002903  -0.32358122 -0.5665105  -1.2136256 ]
 [-1.811033   -1.3002293  -1.1849865  -1.3909183  -1.981357  ]]


![](http://ww4.sinaimg.cn/large/006tNc79ly1g5ghw0qhnmj30wk0erwio.jpg)

In [8]:
def random_policy_evaluation(max_time_step=1000, T=200, V_init=None, alpha=None):
    """
    这里alpha常量步长算法中的步长。如果alpha=None,表示不使用常量步长。
    """
    if V_init is None:
        V = np.zeros((WORLD_SIZE, WORLD_SIZE), dtype=np.float32)
    else:
        V = V_init
    env = Env()
    agent = Agent()
    if alpha is None:
        s_count = np.zeros((WORLD_SIZE, WORLD_SIZE), dtype=np.float32)
    for _ in range(max_time_step):

        init_s = agent.reset()
        episode = MCSample(env, agent.random_policy, init_s, T=T)
        g = 0
        drop_step = 0
        # 为了降低计算复杂度，可以反向遍历，可以只用维护一个g变量
        for transition in episode[::-1]:
            s, a, r = transition['s'], transition['a'], transition['r']
            g = r + gamma * g
            drop_step += 1
            if drop_step > T / 2:
                if alpha is None:
                    s_count[s[0], s[1]] += 1
                    V[s[0], s[1]] = V[s[0], s[1]] + (g - V[s[0], s[1]]) / s_count[s[0], s[1]]
                else:
                    V[s[0], s[1]] = V[s[0], s[1]] + (g - V[s[0], s[1]]) * alpha
    return V

In [11]:
random_V = random_policy_evaluation(2000, 200)
print(random_V)

[[ 3.3351688   8.803972    4.4152913   5.3230157   1.6214232 ]
 [ 1.5292075   3.0158775   2.2903326   1.9095056   0.60355246]
 [ 0.04469734  0.7028334   0.6592238   0.36669415 -0.33925962]
 [-0.95392966 -0.4575108  -0.3852042  -0.558248   -1.1887605 ]
 [-1.8218725  -1.3281966  -1.2188294  -1.4320998  -2.0372064 ]]


![](http://ww1.sinaimg.cn/large/006tNc79ly1g5ghxws3jlj30m6055gmk.jpg)

In [12]:
random_V = random_policy_evaluation(2000, 200, alpha=0.1)
print(random_V)

[[ 5.011682    8.85752     5.1491265   4.8443146   0.3194144 ]
 [-0.21691914  2.823933    3.7166386   2.9417336   0.8124478 ]
 [-1.0914559   0.5311721   0.2672691   0.3495525   0.40145755]
 [-1.3017095   0.13145821 -0.6047236  -0.8255357  -0.6181143 ]
 [-2.6748776  -0.29108274 -1.0828218  -1.5781964  -1.2179748 ]]


可以看出常量步长下的误差更大


接下来我们开始实现GLIE的MC优化算法，来寻找最优的策略。和之前的算法不同，GLIE有这么几个关键点：

- 1 为了能够从值函数导出策略，我们需要更新的是Q函数，而不是V函数。
- 2 我们需要保证探索（比如 ε -greedy)
- 3 随着训练的进行，策略需要收敛到贪婪策略（比如，不断地衰减 ε 的值)
实现GLIE如下

首先我们先实现一个通过当前Q值和 ε 的值返回一个策略的函数

In [13]:
def epsilon_greedy_policy(self, Q, epsilon):
    """Q: [x, y, a]
    """ 
    def _policy(s):
        Qs = Q[s[0], s[1]]
        if np.random.uniform(0, 1) > epsilon:
            return np.argmax(Qs)
        else:
            return np.random.randint(0, 4)
    return _policy

Agent.epsilon_greedy_policy = epsilon_greedy_policy

In [14]:
def GLIE(max_time_step=1000, T=200):
    env = Env()
    agent = Agent()
    counts = np.zeros((WORLD_SIZE, WORLD_SIZE, N_ACTIONS), dtype=np.float32)
    Q = np.random.random((WORLD_SIZE, WORLD_SIZE, N_ACTIONS))
    epsilon = 0.99
    for k in range(1, max_time_step):
        epsilon *= 0.99
        policy = agent.epsilon_greedy_policy(Q, epsilon)
        init_s = agent.reset()
        episode = MCSample(env, policy, init_s, T=T)
        g = 0
        drop_step = 0
        for transition in episode[::-1]:
            s, a, r = transition['s'], transition['a'], transition['r']
            g = r + gamma * g
            drop_step += 1
            if drop_step > T / 2:
                counts[s[0], s[1], a] += 1
                Q[s[0], s[1], a] += 1.0 / counts[s[0], s[1], a] * (g - Q[s[0], s[1], a])
    return Q

In [15]:
optim_Q = GLIE(2000, 200)
optim_policy = np.argmax(optim_Q, axis=-1)
print(optim_policy)

[[3 0 2 1 2]
 [3 0 2 0 2]
 [0 0 0 0 0]
 [0 0 0 0 2]
 [0 0 0 0 0]]


![](http://ww2.sinaimg.cn/large/006tNc79ly1g5gi1r6k1oj30rp0ay3zw.jpg)

In [16]:
def greedy_Optim(max_time_step=1000, T=200):
    env = Env()
    agent = Agent()
    counts = np.zeros((WORLD_SIZE, WORLD_SIZE, N_ACTIONS), dtype=np.float32)
    Q = np.random.random((WORLD_SIZE, WORLD_SIZE, N_ACTIONS))
    for k in range(1, max_time_step):
        epsilon = -1
        policy = agent.epsilon_greedy_policy(Q, epsilon)
        init_s = agent.reset()
        episode = MCSample(env, policy, init_s, T=T)
        g = 0
        drop_step = 0
        for transition in episode[::-1]:
            s, a, r = transition['s'], transition['a'], transition['r']
            g = r + gamma * g
            drop_step += 1
            if drop_step > T / 2:
                counts[s[0], s[1], a] += 1
                Q[s[0], s[1], a] += 1.0 / counts[s[0], s[1], a] * (g - Q[s[0], s[1], a])
    return Q

In [17]:
optim_Q = greedy_Optim(3000, 200)
optim_policy = np.argmax(optim_Q, axis=-1)
print(optim_policy)

[[3 3 2 0 2]
 [0 0 0 0 2]
 [0 0 0 0 0]
 [0 1 0 0 0]
 [0 0 0 0 0]]


接下来我们实现增量式离策略每次拜访优化算法：

In [18]:
def increment_off_policy(max_time_step=1000, T=200):
    env = Env()
    agent = Agent()
    counts = np.zeros((WORLD_SIZE, WORLD_SIZE, N_ACTIONS), dtype=np.float32)
    Q = np.random.random((WORLD_SIZE, WORLD_SIZE, N_ACTIONS))
    for k in range(max_time_step):
        init_s = agent.reset()
        episode = MCSample(env, agent.random_policy, init_s, T=T)  # 使用随机策略mu进行探索
        g = 0
        W = 1
        drop_step = 0
        for transition in episode[::-1]:
            s, a, r = transition['s'], transition['a'], transition['r']
            g = r + gamma * g
            drop_step += 1
            if drop_step > T / 2:
                counts[s[0], s[1], a] += W
                Q[s[0], s[1], a] += W / counts[s[0], s[1], a] * (g - Q[s[0], s[1], a])
                if a != np.argmax(Q[s[0], s[1]]):
                    break
                W /= 0.25
    return Q

In [20]:
optim_Q = increment_off_policy(10000, 200)
optim_policy = np.argmax(optim_Q, axis=-1)
print(optim_policy)

[[3 1 2 0 1]
 [1 0 0 2 1]
 [3 0 0 2 2]
 [3 0 2 2 2]
 [0 3 0 3 2]]
