Note : 
We have two important functions : 
- $V^\pi(s)$ is the state-value function of MDP (Markov Decision Process). It's the expected return starting from state s following policy $\pi$. 
- $Q^\pi(s)$ is the action-value function. It is the expected return starting from state s, following policy $\pi$, taking action $a$

The Bellman Equations are : 

$V_{\pi}(s)=\sum_{a \in \mathcal{A}} \pi(a \mid s)\left(R_{s}^{a}+\sum_{s^{\prime} \in S} \gamma \mathcal{P}_{s s^{\prime}}^{a} V_{\pi}\left(s^{\prime}\right)\right)$ with $R$ the reward, $\gamma$ the discount, $P$ the transition matrix probability, $V$ the value function, $s$ the state, $a$ the action and $\pi$ the probability of making the decision in state $s$ (policy).

$q_{\pi}(s, a)=R_{s}^{a}+\sum_{s^{\prime} \in \mathcal{S}} \gamma \mathcal{P}_{s s^{\prime}}^{a} \underset{a \in \mathcal{A}}{\pi\left(a \mid s^{\prime}\right)} q_{\pi}\left(s^{\prime}, a\right)$ with $a$ the action, and $P$  the probability that action $a$ in state $s$ time t will lead to state $s′$ at time $t + 1$

Usually, we are interested in the optimal state function or action-state function in order to find then the best policy :

\begin{equation*}V^*_{\pi}(s)=\sum_{max  a} \pi(a \mid s)\left(R_{s}^{a}+\sum_{s^{\prime} \in S} \gamma \mathcal{P}_{s s^{\prime}}^{a} V_{\pi}\left(s^{\prime}\right)\right)\end{equation*} \\
\begin{equation*}
q^*_{\pi}(s, a)=R_{s}^{a}+ max_{a\prime} \sum_{s^{\prime} \in \mathcal{S}} \gamma \mathcal{P}_{s s^{\prime}}^{a} \underset{a \in \mathcal{A}}{\pi\left(a\prime \mid s^{\prime}\right)} q_{\pi}\left(s^{\prime}, a\prime\right)
\end{equation*}

Then we can extract the policy such as : 

\begin{equation*}\pi^* = argmax_a \left(R_{s}^{a}+\sum_{s^{\prime} \in S} \gamma \mathcal{P}_{s s^{\prime}}^{a} V_{\pi}\left(s^{\prime}\right)\right)\end{equation*} 

In this notebook, we will reproduce two dynamic programming algorithm which allow to find the optimal policy for a known environment. Our hypothesis are : 
- no move outside of the map (square of size 5x5)
- the coordinate (5,5) are the final destination, and the reward is equal to 1. The reward of the other positions are 0.
<img src="img/map.jpg">

# Value Iteration

<img src="img/value_iteration.png" width="400">

In [1]:
# Value Iteration
import numpy as np

table = np.zeros((5,5))
table[0, 1] = np.nan
table[0, 3] = np.nan
table[2, 1] = np.nan
table[3, 1] = np.nan
table[2,2] = np.nan
print("MAP : \n", table)
R = table.copy()
R[-1,-1] = 1.0
print("REWARD : \n", R)

MAP : 
 [[ 0. nan  0. nan  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0. nan nan  0.  0.]
 [ 0. nan  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]]
REWARD : 
 [[ 0. nan  0. nan  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0. nan nan  0.  0.]
 [ 0. nan  0.  0.  0.]
 [ 0.  0.  0.  0.  1.]]


In [5]:
def compute_max_state(V, R, discount, i, j):
    max_ = []
    max_i = V.shape[0]
    max_j = V.shape[1]
    # top
    if i-1>=0 and not np.isnan(V[i-1,j]):
        max_.append(R[i,j] + discount*V[i-1,j])
    else:
        max_.append(float("-inf"))
    
    # right
    if j+1<max_j and not np.isnan(V[i,j+1]):
        max_.append(R[i,j] + discount*V[i,j+1])
    else:
        max_.append(float("-inf"))
        
    # bot
    if i+1<max_i and not np.isnan(V[i+1,j]):
        max_.append(R[i,j] + discount*V[i+1,j])
    else:
        max_.append(float("-inf"))    
    # left
    if j-1>=0 and not np.isnan(V[i,j-1]):
        max_.append(R[i,j] + discount*V[i,j-1])
    else:
        max_.append(float("-inf"))
        
    return max_

def run_VI(table, R, discount=0.9, epsilon=0.01): # Value Iteration algo
    V = table.copy()
    temp = table.copy()
    k = 0
    while np.abs(np.nan_to_num(V - temp)).sum() > epsilon or k == 0:
        # look every state
        V = temp.copy()
        temp = table.copy()
        for i in range(table.shape[0]):
            for j in range(table.shape[1]):
                if not np.isnan(V[i,j]):
                    if i == table.shape[0]-1 and j == table.shape[1]-1:
                        temp[i,j] = R[i,j]
                    else:
                        temp[i,j] = max(compute_max_state(V, R, discount,i,j))
                
        print(f'K = {k}')
        print(temp)
        
        k += 1
    return temp
def extract_policy(V, R, policy, discount):
    mapping = {0:"↑", 1:"→", 2:"↓", 3:"←"}
    
    for i in range(V.shape[0]):
        for j in range(V.shape[1]):
            if not np.isnan(V[i,j]):
                if i == table.shape[0]-1 and j == table.shape[1]-1:
                    policy[i,j] = 0
                else:
                    policy[i,j] = mapping[np.argmax(compute_max_state(V, R, discount, i, j))]
    return policy
V = run_VI(table, R, discount=0.9, epsilon=0.01)
policy = extract_policy(V, R, table.copy().astype(object), discount=0.9)    
print('POLICY \n',policy)            
    

K = 0
[[ 0. nan  0. nan  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0. nan nan  0.  0.]
 [ 0. nan  0.  0.  0.]
 [ 0.  0.  0.  0.  1.]]
K = 1
[[0.  nan 0.  nan 0. ]
 [0.  0.  0.  0.  0. ]
 [0.  nan nan 0.  0. ]
 [0.  nan 0.  0.  0.9]
 [0.  0.  0.  0.9 1. ]]
K = 2
[[0.    nan 0.    nan 0.  ]
 [0.   0.   0.   0.   0.  ]
 [0.    nan  nan 0.   0.81]
 [0.    nan 0.   0.81 0.9 ]
 [0.   0.   0.81 0.9  1.  ]]
K = 3
[[0.      nan 0.      nan 0.   ]
 [0.    0.    0.    0.    0.729]
 [0.      nan   nan 0.729 0.81 ]
 [0.      nan 0.729 0.81  0.9  ]
 [0.    0.729 0.81  0.9   1.   ]]
K = 4
[[0.        nan 0.        nan 0.6561]
 [0.     0.     0.     0.6561 0.729 ]
 [0.        nan    nan 0.729  0.81  ]
 [0.        nan 0.729  0.81   0.9   ]
 [0.6561 0.729  0.81   0.9    1.    ]]
K = 5
[[0.          nan 0.          nan 0.6561 ]
 [0.      0.      0.59049 0.6561  0.729  ]
 [0.          nan     nan 0.729   0.81   ]
 [0.59049     nan 0.729   0.81    0.9    ]
 [0.6561  0.729   0.81    0.9     1.     ]]
K = 6
[[0.        

The policy seems to work

# Policy Iteration

<img src="img/Policy_iteration.png">

In [7]:
politique = table.copy()
politique = np.nan_to_num(politique, nan=-1).astype(object) # 0 top, 1 right, 2 = bot , 3 = left
mapping = {"↑":0, "→":1, "↓":2, "←":3}

politique[:-1, 0] = "↓"
politique[0,2] = "↓"
politique[0,4] = "↓"
politique[1, 3] = "↓"
politique[3,3] = "↓"
politique[1:-1, -1] = "↑"
politique[-1,:-1] = "→"
politique[3,2] = "→"
politique[1, 1:3] = "→"
politique[2,3] = "→"



def compute_value(V, R, policy, discount, mapping, i, j): # the policy is determinism in this exercise
    l = 0
    h = 0
    
    if policy[i,j] == 0:
        return R[i,j]
    
    if  mapping[policy[i,j]] == 0:
        h = -1
    elif mapping[policy[i,j]] == 1:
        l = 1
    elif mapping[policy[i,j]] == 2:
        h = 1
    elif mapping[policy[i,j]] == 3:
        l = -1
    
    return R[i,j] + discount * V[i+h, j+l]

def policy_evaluation(V, R, policy, mapping, discount, epsilon=0.1):
    temp = V.copy()
    
    k = 0
    delta = 0
    while delta > epsilon or k ==0:
        delta = 0
        V = temp.copy()
        #temp = V.copy()
        for i in range(V.shape[0]):
            for j in range(V.shape[1]):
                if not np.isnan(V[i,j]):
                    temp[i,j] = compute_value(V, R, policy, discount, mapping, i, j)
        #print(temp)
                    delta = max(delta, np.abs(np.nan_to_num(V[i,j]-temp[i,j])))
        k +=1
        #print(delta)
    V = temp.copy()
    return V

def compute_best_action(V, R, discount, i, j, previous_policy):
    max_ = []
    max_i = V.shape[0]
    max_j = V.shape[1]
    # top
    if i-1>=0 and not np.isnan(V[i-1,j]):
        max_.append(R[i,j] + discount*V[i-1,j])
    else:
        max_.append(-float("inf"))
    # right
    if j+1<max_j and not np.isnan(V[i,j+1]):
        max_.append(R[i,j] + discount*V[i,j+1])
    else:
        max_.append(-float("inf"))
    # bot
    if i+1<max_i and not np.isnan(V[i+1,j]):
        max_.append(R[i,j] + discount*V[i+1,j])
    else:
        max_.append(-float("inf"))    
    # left
    if j-1>=0 and not np.isnan(V[i,j-1]):
        max_.append(R[i,j] + discount*V[i,j-1])
    else:
        max_.append(-float("inf"))
        
    # if multiple argmax, and if previous policy is part of the argmax return it
    argmaxes = np.argwhere(max_ == np.amax(max_)).flatten()
    #print(i,j, max_)
    #print(argmaxes)
    for idx in argmaxes:
        if idx == previous_policy:
            return idx
    
    return np.argmax(np.array(max_))

def policy_improvement(V, R, policy, mapping, discount):
    reverse = {v:k for k,v in mapping.items()} # int to arrow
    new = policy.copy()
    for i in range(V.shape[0]):
        for j in range(V.shape[1]):
            if (i != V.shape[0]-1 or j != V.shape[1]-1) and not np.isnan(V[i,j]):
                new[i,j] = reverse[compute_best_action(V,R, discount=discount, i=i, j=j, 
                                                       previous_policy=mapping[policy[i,j]]) ]
    return new


def PolicyIteration(V,R,policy,mapping,discount,epsilon):
    new_policy = policy.copy()
    
    k = 0
    while k == 0 or (new_policy==policy).all() == False:
        policy = new_policy.copy()
        
        # evaluate policy
        V = policy_evaluation(V, R, policy, mapping, discount, epsilon=epsilon)
        # improve policy
        new_policy = policy_improvement(V, R, policy, mapping, discount)
        print(f" K = {k}")
        print(new_policy)
        print(V)
        k+=1
    return V, new_policy

In [8]:
politique, R

(array([['↓', -1.0, '↓', -1.0, '↓'],
        ['↓', '→', '→', '↓', '↑'],
        ['↓', -1.0, -1.0, '→', '↑'],
        ['↓', -1.0, '→', '↓', '↑'],
        ['→', '→', '→', '→', 0.0]], dtype=object),
 array([[ 0., nan,  0., nan,  0.],
        [ 0.,  0.,  0.,  0.,  0.],
        [ 0., nan, nan,  0.,  0.],
        [ 0., nan,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.,  1.]]))

In [12]:
V, P = PolicyIteration(table.copy(), R, politique.copy(), mapping, discount=0.9, epsilon=0.01)
print("Policy of POLICY ITERATION :\n ", P)

 K = 0
[['↓' -1.0 '↓' -1.0 '↓']
 ['↓' '←' '→' '↓' '↑']
 ['↓' -1.0 -1.0 '↓' '↑']
 ['↓' -1.0 '→' '↓' '↓']
 ['→' '→' '→' '→' 0.0]]
[[0.43046721        nan 0.                nan 0.        ]
 [0.4782969  0.         0.         0.         0.        ]
 [0.531441          nan        nan 0.         0.        ]
 [0.59049           nan 0.729      0.81       0.        ]
 [0.6561     0.729      0.81       0.9        1.        ]]
 K = 1
[['↓' -1.0 '↓' -1.0 '↓']
 ['↓' '→' '→' '↓' '←']
 ['↓' -1.0 -1.0 '↓' '↓']
 ['↓' -1.0 '→' '↓' '↓']
 ['→' '→' '→' '→' 0.0]]
[[0.43046721        nan 0.531441          nan 0.        ]
 [0.4782969  0.43046721 0.59049    0.6561     0.        ]
 [0.531441          nan        nan 0.729      0.        ]
 [0.59049           nan 0.729      0.81       0.9       ]
 [0.6561     0.729      0.81       0.9        1.        ]]
 K = 2
[['↓' -1.0 '↓' -1.0 '↓']
 ['↓' '→' '→' '↓' '↓']
 ['↓' -1.0 -1.0 '↓' '↓']
 ['↓' -1.0 '→' '↓' '↓']
 ['→' '→' '→' '→' 0.0]]
[[0.43046721        nan 0.531441  

The policy seems to work. We can see Policy iteration is faster than Value iteration.