# Model free prediction Methos

In [7]:
import numpy as np

def join(str1, str2): # 这是一个简单的字符串拼接算法
    return str1 + '-' + str2

![image](Images/image1.png)
![image](Images/image2.png)

下面是对一个简单的MDP过程的基本信息做一些定义

In [8]:
np.random.seed(0)
# 定义状态转移概率矩阵P
P = [
    [0.9, 0.1, 0.0, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.6, 0.0, 0.4],
    [0.0, 0.0, 0.0, 0.0, 0.3, 0.7],
    [0.0, 0.2, 0.3, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
]
P = np.array(P)

rewards = [-1, -2, -2, 10, 1, 0]  # 定义奖励函数
gamma = 0.5  # 定义折扣因子

S = ["s1", "s2", "s3", "s4", "s5"]  # 状态集合
A = ["保持s1", "前往s1", "前往s2", "前往s3", "前往s4", "前往s5", "概率前往"]  # 动作集合
# 状态转移函数
P = {
    "s1-保持s1-s1": 1.0,
    "s1-前往s2-s2": 1.0,
    "s2-前往s1-s1": 1.0,
    "s2-前往s3-s3": 1.0,
    "s3-前往s4-s4": 1.0,
    "s3-前往s5-s5": 1.0,
    "s4-前往s5-s5": 1.0,
    "s4-概率前往-s2": 0.2,
    "s4-概率前往-s3": 0.4,
    "s4-概率前往-s4": 0.4,
}
# 奖励函数
R = {
    "s1-保持s1": -1,
    "s1-前往s2": 0,
    "s2-前往s1": -1,
    "s2-前往s3": -2,
    "s3-前往s4": -2,
    "s3-前往s5": 0,
    "s4-前往s5": 10,
    "s4-概率前往": 1,
}
gamma = 0.5  # 折扣因子
MDP = (S, A, P, R, gamma)

# 策略1,随机策略
Pi_1 = {
    "s1-保持s1": 0.5,
    "s1-前往s2": 0.5,
    "s2-前往s1": 0.5,
    "s2-前往s3": 0.5,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.5,
    "s4-概率前往": 0.5,
}
# 策略2
Pi_2 = {
    "s1-保持s1": 0.6,
    "s1-前往s2": 0.4,
    "s2-前往s1": 0.3,
    "s2-前往s3": 0.7,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.1,
    "s4-概率前往": 0.9,
}

## 蒙特卡洛方法

### 蒙特卡洛方法的采样的实现

首先进行状态的采样

In [46]:
def sample(MDP, Pi, timestep_max, number):
    """
    这个函数是根据一个已知的MDP环境进行采样的算法
    每个episode的组织方法是每一步都是一个元组（s,a,r,s_next）即当前状态,执行的动作,得到的奖励,以及下一步的状态
    :param MDP: MDP的基本参数, 这里是模拟环境, 在真实环境下没有状态转移矩阵等
    :param Pi: 具体的策略
    :param timestep_max: 限制最大的时间步
    :param number: 总共需要采样的序列数
    :return: 返回值为采样好的episode
    """
    S, A, P, R, gamma = MDP
    episodes = []
    for _ in range(number):
        episode = []
        timestep = 0
        s = S[np.random.randint(4)]  # 随机选择一个除s5以外的状态s作为起点
        # 当前状态为终止状态或者时间步太长时,一次采样结束
        while s != "s5" and timestep <= timestep_max:
            timestep += 1
            # 在状态s下根据策略选择动作, 因为是策略是一个随机分布下面的代码是根据随机成成一个动作
            rand, temp = np.random.rand(), 0
            for a_opt in A:
                temp += Pi.get(join(s, a_opt), 0) # 从策略字典中查询在某一状态下执行某一动作的概率, 如果没有这个则返回0
                if temp > rand:
                    a = a_opt
                    r = R.get(join(s, a), 0)
                    break
            # 根据状态转移概率得到下一个状态s_next
            rand, temp = np.random.rand(), 0
            for s_opt in S:
                temp += P.get(join(join(s, a), s_opt), 0)# 在s状态下执行a动作到达s_opt的概率
                if temp > rand:
                    s_next = s_opt
                    break
            episode.append((s, a, r, s_next))  # 把（s,a,r,s_next）元组放入序列中
            s = s_next  # s_next变成当前状态,开始接下来的循环
        episodes.append(episode)
    return episodes

In [47]:
episodes = sample(MDP, Pi_1, 20, 5)
print('第一条序列\n', episodes[0])
print('第二条序列\n', episodes[1])
print('第五条序列\n', episodes[4])

第一条序列
 [('s2', '前往s3', -2, 's3'), ('s3', '前往s5', 0, 's5')]
第二条序列
 [('s2', '前往s1', -1, 's1'), ('s1', '前往s2', 0, 's2'), ('s2', '前往s1', -1, 's1'), ('s1', '保持s1', -1, 's1'), ('s1', '保持s1', -1, 's1'), ('s1', '前往s2', 0, 's2'), ('s2', '前往s1', -1, 's1'), ('s1', '前往s2', 0, 's2'), ('s2', '前往s3', -2, 's3'), ('s3', '前往s4', -2, 's4'), ('s4', '前往s5', 10, 's5')]
第五条序列
 [('s4', '概率前往', 1, 's2'), ('s2', '前往s3', -2, 's3'), ('s3', '前往s4', -2, 's4'), ('s4', '概率前往', 1, 's4'), ('s4', '概率前往', 1, 's4'), ('s4', '概率前往', 1, 's2'), ('s2', '前往s3', -2, 's3'), ('s3', '前往s5', 0, 's5')]


### 蒙特卡洛方法主体的实现

我们采用累进更新形式的蒙特卡洛算法其公式为:
$$V(S_t)\leftarrow V(S_t)+\frac{1}{N(S_t)}\left(G_t-V(S_t)\right)$$

In [48]:
# 对所有采样序列计算所有状态的价值
def MC(episodes, V, N, gamma):
    """
    :param episodes: 采样好的episode
    :param V: 状态的价值函数应该是一个一维的向量
    :param N: 状态的计数器应该是一个矩阵
    :param gamma: 衰减因子
    :return: null
    """
    for episode in episodes:
        G = 0
        for i in range(len(episode) - 1, -1, -1):  #一个序列从后往前计算
            (s, a, r, s_next) = episode[i]
            G = r + gamma * G
            N[s] = N[s] + 1
            V[s] = V[s] + (G - V[s]) / N[s]

In [49]:
timestep_max = 20
# 采样1000次,可以自行修改
episodes = sample(MDP, Pi_1, timestep_max, 1000)
gamma = 0.5
V = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
N = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
MC(episodes, V, N, gamma)
print("使用蒙特卡洛方法计算MDP的状态价值为\n", V)

使用蒙特卡洛方法计算MDP的状态价值为
 {'s1': -1.1921303313152458, 's2': -1.660432153769328, 's3': 0.5369444492885036, 's4': 6.081644157323337, 's5': 0}


## 时序差分算法

### TD(0)时序差分算法

TD(O)时序差分算法的迭代公式为:
$$V(S_t)\leftarrow V(S_t)+\alpha\left({R_{t+1}+\gamma V(S_{t+1})}-V(S_t)\right)$$

In [50]:
def TDZero(episodes, V, N, gamma, alpha=1.0):
    """
    :param episodes: 采样好的episode
    :param V: 状态的价值函数应该是一个一维的向量
    :param N: 状态的计数器应该是一个矩阵
    :param gamma: 衰减因子
    :return: null
    """
    for episode in episodes:
        for i in range(len(episode)):  #一个序列从后往前计算
            (s, a, r, s_next) = episode[i]
            V[s] = V[s] + alpha*(r + gamma*V[s_next] - V[s]) 

In [51]:
timestep_max = 20
# 采样1000次,可以自行修改
episodes = sample(MDP, Pi_1, timestep_max, 1000)
gamma = 0.5
V = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
N = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
TDZero(episodes, V, N, gamma, alpha=0.5)
print("使用TD(0)方法计算MDP的状态价值为\n", V)

使用TD(0)方法计算MDP的状态价值为
 {'s1': -1.2284732210578542, 's2': -1.8065500547063014, 's3': -0.019549975641035618, 's4': 6.773078763112501, 's5': 0}


### $TD(\lambda)$算法

$TD(\lambda)$需要对每一个状态维护一个资格迹:
$$E_t(s)=\gamma \lambda E_{t-1}(s)+\mathbf{1}(S_t=s)$$
其迭代公式为:
$$V(s) \leftarrow V(s)+\alpha\delta_tE_t(s)$$
其中$\delta_t$为TD-error,$R_{t+1}+\gamma V(S_{t+1})-V(S_t)$

In [52]:
def TDLambda(episodes, V, N, E, gamma, alpha=1.0, l=0.5):
    """
    :param episodes: 采样好的episode
    :param V: 状态的价值函数应该是一个一维的向量
    :param N: 状态的计数器应该是一个矩阵
    :param gamma: 衰减因子
    :return: null
    """
    for episode in episodes:
        for i in range(len(episode)):  #一个序列从后往前计算
            (s, a, r, s_next) = episode[i]
            delta = r + gamma*V[s_next]-V[s]
            for e in E:
                E[e] = gamma*l*E[e]
                if e == s:
                    E[e] = E[e] + 1
            V[s] = V[s] + alpha*delta*E[s]

In [53]:
timestep_max = 20
# 采样1000次,可以自行修改
episodes = sample(MDP, Pi_1, timestep_max, 100)
gamma = 0.5
V = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
N = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
E = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
TDLambda(episodes, V, N, E, gamma, alpha=0.5, l=0.5)
print("使用TD(lambda)方法计算MDP的状态价值为\n", V)

使用TD(lambda)方法计算MDP的状态价值为
 {'s1': -0.7546977657290456, 's2': -1.57085726206068, 's3': 1.3744922429932185, 's4': 9.631480542689186, 's5': 0}
