### Random Walk

The agent is in an horizontal world and can move left or right with probability .5.
The reward is 0 for all transitions except for the transition to the right terminal state.

We eventually introduce randomness in the transitions by giving probability .1 to stay in the current state, probability 0.1 to move 2 steps at once and probability .8 to move to the next state.

In [1]:
import numpy as np
from numpy.random import choice
from functools import partial
import matplotlib.pyplot as plt
import copy
import sys

def move_left(current_state):
    return current_index-1
    
def move_right(current_state):
    return current_index+1



In [2016]:
def transition_matrix(n_states, n_actions):

    P = np.zeros((n_actions,n_states,n_states))
    for i in range(1,n_states-1):
        # move left :
        P[0,i,i] = 0. # 0.1
        if i-1>=0:
            P[0,i,i-1] = 1. #0.8
        if i-2>=0:
            P[0,i,i-2] = 0. #0.1

        # move right
        P[1,i,i] = 0. #0.1
        if i+1<n_states:
            P[1,i,i+1] = 1. #0.8
        if i+2<n_states:
            P[1,i,i+2] = 0. #0.1
            
    # Terminal states
    P[:,0,0]=1.
    P[:,n_states-1,n_states-1]=1.
    
    return P

def reward_matrix(n_states, n_actions):
    R = -1*np.ones((n_actions, n_states))
    R[0,1] = np.iinfo(np.int).min
    R[1,n_states-2] = 10
    R[:,0] = 0.
    R[:,n_states-1] = 0.
    return R
    
def make_random_policy(n_states, n_actions): 
    random_policy = np.ones((n_actions, n_states))/n_actions
    # Terminal states
    random_policy[:,0]=0.
    random_policy[:,n_states-1]=0.
    return random_policy

In [2017]:
n_states = 19
states = list(range(n_states))
actions = [move_left, move_right]
n_actions = len(actions)
P = transition_matrix(n_states, n_actions)
R = reward_matrix(n_states, n_actions)
random_policy = make_random_policy(n_states, n_actions)


### Policy Evaluation

Given a policy $\pi$ we want to evaluate the value function $v_{\pi}$. To do so we are going to use the Bellman expectation equation in an iterative manner to improve our value function :
$$ v_{k+1} = \sum_{a∈A}\pi(a/s)(R^a_s +\gamma \sum_{s'∈S}P^a_{ss'}v_{k}(s')) $$

In [2021]:
def markov_reward_process(R, P, policy):
    n_actions = P.shape[0]
    n_states = P.shape[1]
    P_ = np.zeros_like(P[0])
    R_ = np.sum(R*policy,axis=0)
    for s in range(n_states):
        P_[s,:] = policy[:,s] @ P[:,s,:]
    return R_, P_

def mdp_policy_evaluation(R, P, policy, epsilon= 10**-4, n_iter_max=2000, gamma=1, verbose=False):
    n_states = P.shape[1]
    R_, P_ = markov_reward_process(R, P, policy)
    V_k = np.zeros(n_states)
    delta = epsilon+1
    n_iter=1
    while delta>epsilon and n_iter<=n_iter_max:
        V_k_1 = R_ + gamma * P_ @ V_k
        delta = max(epsilon, np.max(np.abs(V_k_1-V_k)))
        V_k = V_k_1
        n_iter+=1
    if verbose:
        if delta>epsilon:
            print("Warning : Policy evaluation did not converge after %d steps"%(n_iter-1))
            print("delta : {}".format(delta))
        else:
            print("Policy evaluation did converge after %d steps"%(n_iter-1))
    return V_k_1

In [2023]:
v = mdp_policy_evaluation(R, P, random_policy, gamma=1,verbose=True)

Policy evaluation did converge after 1748 steps


## Policy Iteration

We have seen how to evaluate the value function $v_{\pi}$ of a given policy $\pi$ with respect to an existing MDP.
Once we have $v_{\pi}$ we can use it to improve or policy by acting greedily w.r.t $v_{\pi}$, i.e. we choose the action which leads to a state with maximal value function :
$$ \pi'(s) = arg\max_{a∈A}(R^a_{s}+\gamma \sum_{s'}P^a_{ss'}v_{\pi}(s'))$$


In [2034]:
def greedy(v,R,P,gamma):
    eps = np.finfo(np.float64).resolution
    n_actions = P.shape[0]
    n_states = P.shape[1]
    new_policy = np.zeros_like(R)
    Q = np.zeros_like(R)
    
    for s in range(1,n_states-1):
        for a in range(n_actions):           
            Q[a,s] = R[a,s] + gamma * np.sum(P[a,s,:] * v)
    for s in range(1,n_states-1):
        best_actions = np.where(np.abs(Q[:,s]-np.max(Q[:,s]))<=eps)[0]
        n_best = len(best_actions)
        for a in best_actions:
            new_policy[a,s]= 1./n_best
    return new_policy,Q

def mdp_policy_iteration(R, P, epsilon= 10**-7, n_iter_max=200, gamma=1, verbose=False):
    old_policy = make_random_policy(n_states, n_actions)

    policy_stable = False
    n_iter=0
    while not policy_stable:
        n_iter+=1
        new_v = mdp_policy_evaluation(R, P, old_policy, epsilon= epsilon, n_iter_max=n_iter_max, gamma=gamma)
        new_policy,Q = greedy(new_v,R,P,gamma)
        policy_stable = np.array_equal(old_policy, new_policy)
        old_policy = new_policy
    if verbose:
        print("Policy iteration did converge after %d steps"%(n_iter))
    return new_policy
        

In [2030]:
optimal_policy = mdp_policy_iteration(R, P, gamma=1,verbose=True)

Policy iteration did converge after 2 steps


In [2031]:
mdp_policy_evaluation(R,P,optimal_policy)

array([ 0., -6., -5., -4., -3., -2., -1.,  0.,  1.,  2.,  3.,  4.,  5.,
        6.,  7.,  8.,  9., 10.,  0.])

## Value Iteration

We will now use directly a serie of value functions $v_1\rightarrow v_2 \dots \rightarrow v_k \rightarrow v_{k+1}$ without explicitly using a policy :
$$ v_{k+1}(s) = \max_{a∈A}(R^a_{s}+\gamma \sum_{s'}P^a_{ss'}v_k(s'))$$


In [2032]:
def mdp_value_iteration(R, P, epsilon= 10**-7, n_iter_max=200, gamma=1, verbose=False):
    V_k = np.zeros_like(R[0])
    delta = epsilon+1
    n_iter=1
    while delta>epsilon and n_iter<=n_iter_max:
        
        V_k_1 = np.max(R + gamma * P @ V_k,axis=0)
        delta = max(epsilon, np.max(np.abs(V_k_1-V_k)))
        V_k = V_k_1
        n_iter+=1
    return V_k_1
        
    
        

In [2033]:
mdp_policy_iteration(R, P)

array([ 0., -6., -5., -4., -3., -2., -1.,  0.,  1.,  2.,  3.,  4.,  5.,
        6.,  7.,  8.,  9., 10.,  0.])