In [1]:
import numpy as np

# 动态规划寻找最优策略
## 策略评估、策略迭代、价值迭代
- 预测：
已知一个马尔科夫决策过程$MDP<S,A,P,R,\gamma>$和一个策略$\pi$，或者是给定一个马尔科夫奖励过程$MRP<S,P_{\pi},R_{\pi},\gamma>$，求解基于该策略的价值函数$v_{\pi}$
- 控制：
已知一个马尔科夫决策过程$MDP<S,A,P,R,\gamma>$，求解最优价值函数$v_{*}$和最优策略$\pi_{*}$
## 1. 定义小型方格世界环境

In [2]:
class ENV(object):
    def __init__(self):
        self.action = np.arange(0,4,1)
        self.state = np.arange(0,16,1)
        self.action_space_n = len(self.action)
        self.state_space_n = len(self.state)
        self.P = self.get_P()
        
    def get_P(self):
        rst = dict()
        for s in self.state:
            action_dict = dict()
            for a in self.action:
                next_state = self.get_next_state(a, s)
                reward = -1
                flag = False
                if next_state == 15:
                    flag = True
                    reward = 0  
                action_dict[a] = [[self.get_next_state(a, s), reward, 1.0, flag]]
            rst[s] = action_dict
        return rst
                
    def get_next_state(self, action, state):
        if action == 0:#up
            return (lambda x: x if x - 4 < 0 else x - 4)(state)
        elif action == 1:#down
            return (lambda x: x if x + 4 > 15 else x + 4)(state)
        elif action == 2:#left
            return (lambda x: x if x % 4 == 0 else x - 1)(state)
        else:
            return (lambda x: x if x in [3, 7, 11, 15] else x + 1)(state)
        

In [3]:
env = ENV()

In [4]:
env.P

{0: {0: [[0, -1, 1.0, False]],
  1: [[4, -1, 1.0, False]],
  2: [[0, -1, 1.0, False]],
  3: [[1, -1, 1.0, False]]},
 1: {0: [[1, -1, 1.0, False]],
  1: [[5, -1, 1.0, False]],
  2: [[0, -1, 1.0, False]],
  3: [[2, -1, 1.0, False]]},
 2: {0: [[2, -1, 1.0, False]],
  1: [[6, -1, 1.0, False]],
  2: [[1, -1, 1.0, False]],
  3: [[3, -1, 1.0, False]]},
 3: {0: [[3, -1, 1.0, False]],
  1: [[7, -1, 1.0, False]],
  2: [[2, -1, 1.0, False]],
  3: [[3, -1, 1.0, False]]},
 4: {0: [[0, -1, 1.0, False]],
  1: [[8, -1, 1.0, False]],
  2: [[4, -1, 1.0, False]],
  3: [[5, -1, 1.0, False]]},
 5: {0: [[1, -1, 1.0, False]],
  1: [[9, -1, 1.0, False]],
  2: [[4, -1, 1.0, False]],
  3: [[6, -1, 1.0, False]]},
 6: {0: [[2, -1, 1.0, False]],
  1: [[10, -1, 1.0, False]],
  2: [[5, -1, 1.0, False]],
  3: [[7, -1, 1.0, False]]},
 7: {0: [[3, -1, 1.0, False]],
  1: [[11, -1, 1.0, False]],
  2: [[6, -1, 1.0, False]],
  3: [[7, -1, 1.0, False]]},
 8: {0: [[4, -1, 1.0, False]],
  1: [[12, -1, 1.0, False]],
  2: [[8, 

In [5]:
env.action

array([0, 1, 2, 3])

In [6]:
env.state

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15])

In [7]:
print(env.state_space_n)
print(env.action_space_n)

16
4


# 2. 策略评估[小型方格世界]
### 基于均一随机策略
$$
V_{k+1}(s) = \sum_{a \in A}\pi (a|s) (R_{s}^{a} + \gamma \sum_{s^{'} \in S} P^{a}_{s^{'}}v_{k}(s^{'}))
$$
迭代过程中价值函数更新方式:https://subaochen.github.io/reinforcement%20learning/2019/06/16/policy-evaluation/

In [8]:
def convergence_flag(value_table1, value_table2, threshold = 1e-2):
    if np.sum(np.fabs(value_table1-value_table2)) < threshold:
        return True
    else:
        return False

In [9]:
value_table = np.zeros(16)
pi = 0.25
gamma = 1.0
value_table
iteration_num = 100000

In [10]:
for k in range(iteration_num):
    value_table_tmp = np.copy(value_table)
    for s in range(env.state_space_n):
        if s not in [0, 15]:
            value_s = []
            for a in env.action:
                q_tmp = []
                rewards = 0
                for next_state_info in env.P[s][a]:
                    next_state, reward, P, done =  next_state_info
                    rewards += reward * P
                    q_tmp.append(P * value_table_tmp[next_state])
                #print("{}*({} + {} * {} * {})".format(pi, rewards, gamma, P, value_table[next_state]))
                value_s.append(pi * (rewards+ gamma * np.sum(q_tmp)))
            value_table[s] = np.sum(value_s)
    value_table_print = np.copy(value_table)
    value_table_print = value_table_print.reshape([4, 4])
    if k < 10:
        print("[Time k={}]".format(k))
        print(value_table_print)
    if convergence_flag(value_table_tmp, value_table):
        break

[Time k=0]
[[ 0.   -1.   -1.   -1.  ]
 [-1.   -1.   -1.   -1.  ]
 [-1.   -1.   -1.   -0.75]
 [-1.   -1.   -0.75  0.  ]]
[Time k=1]
[[ 0.     -1.75   -2.     -2.    ]
 [-1.75   -2.     -2.     -1.9375]
 [-2.     -2.     -1.875  -1.4375]
 [-2.     -1.9375 -1.4375  0.    ]]
[Time k=2]
[[ 0.       -2.4375   -2.9375   -2.984375]
 [-2.4375   -2.875    -2.953125 -2.84375 ]
 [-2.9375   -2.953125 -2.71875  -2.0625  ]
 [-2.984375 -2.84375  -2.0625    0.      ]]
[Time k=3]
[[ 0.        -3.0625    -3.828125  -3.9375   ]
 [-3.0625    -3.6953125 -3.84375   -3.7109375]
 [-3.828125  -3.84375   -3.5078125 -2.65625  ]
 [-3.9375    -3.7109375 -2.65625    0.       ]]
[Time k=4]
[[ 0.         -3.64648438 -4.66796875 -4.85351562]
 [-3.64648438 -4.453125   -4.68554688 -4.53710938]
 [-4.66796875 -4.68554688 -4.25       -3.21875   ]
 [-4.85351562 -4.53710938 -3.21875     0.        ]]
[Time k=5]
[[ 0.         -4.19189453 -5.46337891 -5.72802734]
 [-4.19189453 -5.16601562 -5.47705078 -5.32373047]
 [-5.46337891 -

In [11]:
value_table.reshape([4, 4])

array([[  0.        , -13.72157009, -19.56329161, -21.48474559],
       [-13.72157009, -17.6033761 , -19.48645972, -19.40944546],
       [-19.56329161, -19.48645972, -17.37260687, -13.26003163],
       [-21.48474559, -19.40944546, -13.26003163,   0.        ]])

# 3. 策略迭代[小型方格世界]
我们在策略评估过程中使用贪婪策略更新我们的策略$\pi$, 即$\pi^{'} = greedy(\pi)$
- 问题：为什么取$\pi^{'} = greedy(\pi)$收敛可到达最优解证明：</br>

见：笔记 p35

In [12]:
def get_next_state(action, state):
    if action == 0:#up
        return (lambda x: x if x - 4 < 0 else x - 4)(state)
    elif action == 1:#down
        return (lambda x: x if x + 4 > 15 else x + 4)(state)
    elif action == 2:#left
        return (lambda x: x if x % 4 == 0 else x - 1)(state)
    else:
        return (lambda x: x if x in [3, 7, 11, 15] else x + 1)(state)
def get_max_state(value_table, actions, state):
    max_value = -1000
    rst = 0
    for action in actions:
        next_state = get_next_state(action, state)
        if value_table[next_state] > max_value:
            max_value = value_table[next_state]
            rst = next_state
    return rst

In [13]:
value_table = np.zeros(16)
pi = 0.25
gamma = 1.0
value_table
iteration_num = 100000

In [14]:
for k in range(iteration_num):
    value_table_tmp = np.copy(value_table)
    for s in range(env.state_space_n):
        if s not in [0, 15]:
            value_s = []
            next_state_max = get_max_state(value_table_tmp, env.action, s)
            for a in env.action:
                q_tmp = []
                rewards = 0
                for next_state_info in env.P[s][a]:
                    next_state, reward, P, done =  next_state_info
                    if next_state == next_state_max:#选择能到达最大value状态的随机一个action
                        rewards += reward * 1
                        q_tmp.append(1 * value_table_tmp[next_state])
                        break
                #print("{}*({} + {} * {} * {})".format(pi, rewards, gamma, P, value_table[next_state]))
                value_s.append(pi * (rewards + gamma * np.sum(q_tmp)))
            value_table[s] = np.sum(value_s)
    value_table_print = np.copy(value_table)
    value_table_print = value_table_print.reshape([4, 4])
    if k <10:
        print("[Time k={}]".format(k))
        print(value_table_print)
    if convergence_flag(value_table_tmp, value_table):
        break

[Time k=0]
[[ 0.   -0.25 -0.25 -0.5 ]
 [-0.25 -0.25 -0.25 -0.25]
 [-0.25 -0.25 -0.25 -0.25]
 [-0.25 -0.25 -0.25  0.  ]]
[Time k=1]
[[ 0.     -0.25   -0.3125 -0.3125]
 [-0.25   -0.3125 -0.3125 -0.3125]
 [-0.3125 -0.3125 -0.3125  0.    ]
 [-0.3125 -0.3125  0.      0.    ]]
[Time k=2]
[[ 0.       -0.25     -0.3125   -0.65625 ]
 [-0.25     -0.3125   -0.328125 -0.25    ]
 [-0.3125   -0.328125 -0.25      0.      ]
 [-0.328125 -0.25     -0.25      0.      ]]
[Time k=3]
[[ 0.     -0.25   -0.3125 -0.3125]
 [-0.25   -0.3125 -0.3125 -0.25  ]
 [-0.3125 -0.3125 -0.25    0.    ]
 [-0.3125 -0.3125  0.      0.    ]]
[Time k=4]
[[ 0.       -0.25     -0.3125   -0.3125  ]
 [-0.25     -0.3125   -0.3125   -0.25    ]
 [-0.3125   -0.3125   -0.25      0.      ]
 [-0.328125 -0.25     -0.25      0.      ]]
[Time k=5]
[[ 0.     -0.25   -0.3125 -0.3125]
 [-0.25   -0.3125 -0.3125 -0.25  ]
 [-0.3125 -0.3125 -0.25    0.    ]
 [-0.3125 -0.3125  0.      0.    ]]
[Time k=6]
[[ 0.       -0.25     -0.3125   -0.3125  ]
 [

In [15]:
value_table.reshape([4, 4])

array([[ 0.    , -0.25  , -0.3125, -0.3125],
       [-0.25  , -0.3125, -0.3125, -0.25  ],
       [-0.3125, -0.3125, -0.25  ,  0.    ],
       [-0.3125, -0.3125,  0.    ,  0.    ]])

# 4. 价值迭代：
$$
v_{*}(s)=max_{a \in A}(R_{s}^{a} + \gamma \sum_{s^{'} \in S}P_{ss^{'}}^{a}v_{*}(s^{'}))
$$
- 求解时对于某一个state来说：

迭代以下公式，对于某个state的value来说直到$v_{k+1}(s)$与$v_{k}(s)$的$\delta$基本为0时得到state s下最优$v_{*}(s)$
$$
v_{k+1}(s)=max_{a \in A}(R_{s}^{a} + \gamma \sum_{s^{'} \in S}P_{ss^{'}}^{a}v_{k}(s^{'}))
$$
## (a)已知终态与终态价值根据如下公式迭代求解：
从终态开始往前迭代使用DP求解之前状态的最优$v(s)$
## 环境定义：[小型方格世界最短路径问题]
从除$s_{0}$的任何位置出发以最短路径到达$s_{0}$

终点为$s_{0}$,到达终点reward为0，其余state reward为-1

In [16]:
class ENV(object):
    def __init__(self):
        self.action = np.arange(0,4,1)
        self.state = np.arange(0,16,1)
        self.action_space_n = len(self.action)
        self.state_space_n = len(self.state)
        self.P = self.get_P()
        
    def get_P(self):
        rst = dict()
        for s in self.state:
            action_dict = dict()
            for a in self.action:
                next_state = self.get_next_state(a, s)
                reward = -1
                flag = False
                if next_state == 0:
                    flag = True
                    reward = 0  
                action_dict[a] = [[self.get_next_state(a, s), reward, 1.0, flag]]
            rst[s] = action_dict
        return rst
                
    def get_next_state(self, action, state):
        if action == 0:#up
            return (lambda x: x if x - 4 < 0 else x - 4)(state)
        elif action == 1:#down
            return (lambda x: x if x + 4 > 15 else x + 4)(state)
        elif action == 2:#left
            return (lambda x: x if x % 4 == 0 else x - 1)(state)
        else:
            return (lambda x: x if x in [3, 7, 11, 15] else x + 1)(state)

In [17]:
env = ENV()
env.P

{0: {0: [[0, 0, 1.0, True]],
  1: [[4, -1, 1.0, False]],
  2: [[0, 0, 1.0, True]],
  3: [[1, -1, 1.0, False]]},
 1: {0: [[1, -1, 1.0, False]],
  1: [[5, -1, 1.0, False]],
  2: [[0, 0, 1.0, True]],
  3: [[2, -1, 1.0, False]]},
 2: {0: [[2, -1, 1.0, False]],
  1: [[6, -1, 1.0, False]],
  2: [[1, -1, 1.0, False]],
  3: [[3, -1, 1.0, False]]},
 3: {0: [[3, -1, 1.0, False]],
  1: [[7, -1, 1.0, False]],
  2: [[2, -1, 1.0, False]],
  3: [[3, -1, 1.0, False]]},
 4: {0: [[0, 0, 1.0, True]],
  1: [[8, -1, 1.0, False]],
  2: [[4, -1, 1.0, False]],
  3: [[5, -1, 1.0, False]]},
 5: {0: [[1, -1, 1.0, False]],
  1: [[9, -1, 1.0, False]],
  2: [[4, -1, 1.0, False]],
  3: [[6, -1, 1.0, False]]},
 6: {0: [[2, -1, 1.0, False]],
  1: [[10, -1, 1.0, False]],
  2: [[5, -1, 1.0, False]],
  3: [[7, -1, 1.0, False]]},
 7: {0: [[3, -1, 1.0, False]],
  1: [[11, -1, 1.0, False]],
  2: [[6, -1, 1.0, False]],
  3: [[7, -1, 1.0, False]]},
 8: {0: [[4, -1, 1.0, False]],
  1: [[12, -1, 1.0, False]],
  2: [[8, -1, 1.0,

In [18]:
value_table = np.zeros(16) + -0xffff
value_table[0] = 0
gamma = 1.0
final_state = 0
value_table.reshape([4, 4])

array([[     0., -65535., -65535., -65535.],
       [-65535., -65535., -65535., -65535.],
       [-65535., -65535., -65535., -65535.],
       [-65535., -65535., -65535., -65535.]])

In [19]:
def one_step_lookahead(state, value_table):
    q_table = np.zeros(env.action_space_n)
    for a in range(env.action_space_n):
        for next_state, reward, prob, done in env.P[state][a]:
            #print([prob, next_state, reward, done])
            q_table[a] += prob * (reward + gamma * value_table[next_state])
            #print(q_table)
    return q_table
def get_next_state(action, state):
    if action == 0:#up
        return (lambda x: x if x - 4 < 0 else x - 4)(state)
    elif action == 1:#down
        return (lambda x: x if x + 4 > 15 else x + 4)(state)
    elif action == 2:#left
        return (lambda x: x if x % 4 == 0 else x - 1)(state)
    else:
        return (lambda x: x if x in [3, 7, 11, 15] else x + 1)(state)
one_step_lookahead(4, value_table)

array([     0., -65536., -65536., -65536.])

In [20]:
already_processed_state = [final_state]
def get_nearby_state(s):
    rst = []
    for action in env.action:
        next_state = get_next_state (action, s)
        if next_state not in already_processed_state:
            rst.append(next_state)
    return rst

In [21]:
episode = 1
while True:
    print("[Episode {}]:".format(episode))
    process_state_queue = [final_state]
    while len(process_state_queue) != 0:
        max_value_next_state = process_state_queue.pop(0)
        nearby_states = get_nearby_state(max_value_next_state)
        for state in nearby_states:
            q_value = one_step_lookahead(state, value_table)
            value_state_old = np.copy(value_table)
            value_table[state] = np.max(q_value)
            print("After update state:{}".format(state))
            print(value_table.reshape([4, 4]))
            process_state_queue.insert(0, state)
            already_processed_state.append(state)
    print("\n\n\n")
    already_processed_state = [final_state]
    if convergence_flag(value_state_old, value_table):
        break
print("[Final Value Table] is:")
print(value_table.reshape([4, 4]))

[Episode 1]:
After update state:4
[[     0. -65535. -65535. -65535.]
 [     0. -65535. -65535. -65535.]
 [-65535. -65535. -65535. -65535.]
 [-65535. -65535. -65535. -65535.]]
After update state:1
[[     0.      0. -65535. -65535.]
 [     0. -65535. -65535. -65535.]
 [-65535. -65535. -65535. -65535.]
 [-65535. -65535. -65535. -65535.]]
After update state:5
[[ 0.0000e+00  0.0000e+00 -6.5535e+04 -6.5535e+04]
 [ 0.0000e+00 -1.0000e+00 -6.5535e+04 -6.5535e+04]
 [-6.5535e+04 -6.5535e+04 -6.5535e+04 -6.5535e+04]
 [-6.5535e+04 -6.5535e+04 -6.5535e+04 -6.5535e+04]]
After update state:2
[[ 0.0000e+00  0.0000e+00 -1.0000e+00 -6.5535e+04]
 [ 0.0000e+00 -1.0000e+00 -6.5535e+04 -6.5535e+04]
 [-6.5535e+04 -6.5535e+04 -6.5535e+04 -6.5535e+04]
 [-6.5535e+04 -6.5535e+04 -6.5535e+04 -6.5535e+04]]
After update state:6
[[ 0.0000e+00  0.0000e+00 -1.0000e+00 -6.5535e+04]
 [ 0.0000e+00 -1.0000e+00 -2.0000e+00 -6.5535e+04]
 [-6.5535e+04 -6.5535e+04 -6.5535e+04 -6.5535e+04]
 [-6.5535e+04 -6.5535e+04 -6.5535e+04

## 2.个体不知道终态位置，通过迭代每次对所有状态价值进行更新：

In [22]:
value_table = np.zeros(16)
pi = 0.25
gamma = 1.0
value_table.reshape([4, 4])

array([[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]])

In [23]:
episode = 1
while True:
    print("[Episode {}]:".format(episode))
    for state in env.state:
        q_value = one_step_lookahead(state, value_table)
        value_state_old = np.copy(value_table)
        value_table[state] = np.max(q_value)
    print("After update state:{}".format(state))
    print(value_table.reshape([4, 4]))
    print("\n\n\n")
    if convergence_flag(value_state_old, value_table):
        break
print("[Final Value Table] is:")
print(value_table.reshape([4, 4]))

[Episode 1]:
After update state:15
[[ 0.  0. -1. -1.]
 [ 0. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]]




[Episode 1]:
After update state:15
[[ 0.  0. -1. -2.]
 [ 0. -1. -2. -2.]
 [-1. -2. -2. -2.]
 [-2. -2. -2. -2.]]




[Episode 1]:
After update state:15
[[ 0.  0. -1. -2.]
 [ 0. -1. -2. -3.]
 [-1. -2. -3. -3.]
 [-2. -3. -3. -3.]]




[Episode 1]:
After update state:15
[[ 0.  0. -1. -2.]
 [ 0. -1. -2. -3.]
 [-1. -2. -3. -4.]
 [-2. -3. -4. -4.]]




[Episode 1]:
After update state:15
[[ 0.  0. -1. -2.]
 [ 0. -1. -2. -3.]
 [-1. -2. -3. -4.]
 [-2. -3. -4. -5.]]




[Episode 1]:
After update state:15
[[ 0.  0. -1. -2.]
 [ 0. -1. -2. -3.]
 [-1. -2. -3. -4.]
 [-2. -3. -4. -5.]]




[Final Value Table] is:
[[ 0.  0. -1. -2.]
 [ 0. -1. -2. -3.]
 [-1. -2. -3. -4.]
 [-2. -3. -4. -5.]]
