In [19]:
# MDP Code

In [20]:
import numpy as np

class MDP:
    '''A simple MDP class.  It includes the following members'''

    def __init__(self,T,R,discount):
        '''Constructor for the MDP class

        Inputs:
        T -- Transition function: |A| x |S| x |S'| array
        R -- Reward function: |A| x |S| array
        discount -- discount factor: scalar in [0,1)

        The constructor verifies that the inputs are valid and sets
        corresponding variables in a MDP object'''

        assert T.ndim == 3, "Invalid transition function: it should have 3 dimensions"
        self.nActions = T.shape[0]
        self.nStates = T.shape[1]
        assert T.shape == (self.nActions,self.nStates,self.nStates), "Invalid transition function: it has dimensionality " + repr(T.shape) + ", but it should be (nActions,nStates,nStates)"
        assert (abs(T.sum(2)-1) < 1e-5).all(), "Invalid transition function: some transition probability does not equal 1"
        self.T = T
        assert R.ndim == 2, "Invalid reward function: it should have 2 dimensions" 
        assert R.shape == (self.nActions,self.nStates), "Invalid reward function: it has dimensionality " + repr(R.shape) + ", but it should be (nActions,nStates)"
        self.R = R
        assert 0 <= discount < 1, "Invalid discount factor: it should be in [0,1)"
        self.discount = discount
        
    def valueIteration(self,initialV,nIterations=np.inf,tolerance=0.01):
        '''Value iteration procedure
        V <-- max_a R^a + gamma T^a V

        Inputs:
        initialV -- Initial value function: array of |S| entries
        nIterations -- limit on the # of iterations: scalar (default: infinity)
        tolerance -- threshold on ||V^n-V^n+1||_inf: scalar (default: 0.01)

        Outputs: 
        V -- Value function: array of |S| entries
        iterId -- # of iterations performed: scalar
        epsilon -- ||V^n-V^n+1||_inf: scalar'''
        
        # temporary values to ensure that the code compiles until this
        # function is coded
        # V = np.zeros(self.nStates)
        print('Value iteration...')
        V = initialV
        iterId = 0
        epsilon = 0
        while iterId < nIterations:
            new_V = (self.R + self.discount * self.T @ V).max(axis=0)
            assert new_V.shape == V.shape, f"{new_V.shape} should be {V.shape}"
            epsilon = np.linalg.norm(new_V - V, np.inf)
            V = new_V
            iterId += 1
            if epsilon < tolerance:
                break
#             if iterId % 100 == 0:
#                 print(f'iteration {iterId},\t epsilon = {epsilon}')
            
        print(f'[V,iterId,epsilon] = {[V,iterId,epsilon]}')
        return [V,iterId,epsilon]

    def extractPolicy(self,V):
        '''Procedure to extract a policy from a value function
        pi <-- argmax_a R^a + gamma T^a V

        Inputs:
        V -- Value function: array of |S| entries

        Output:
        policy -- Policy: array of |S| entries'''

        policy = np.argmax(self.R + self.discount * (self.T @ V), axis=0)
        # assert policy.shape == V.shape
        return policy

    def evaluatePolicy(self,policy):
        '''Evaluate a policy by solving a system of linear equations
        V^pi = R^pi + gamma T^pi V^pi

        Input:
        policy -- Policy: array of |S| entries

        Ouput:
        V -- Value function: array of |S| entries'''

        # temporary values to ensure that the code compiles until this
        # function is coded
        # V = np.zeros(self.nStates)
        # print(f'policy: {policy.shape}, {policy}')

        pi_state = np.arange(self.nStates)
        R_pi = self.R[policy, pi_state]
        # print(f'R_pi: {R_pi.shape}, R: {self.R.shape}')

        T_pi = self.T[policy, pi_state]
        # print(f'T_pi: {T_pi.shape}, T: {self.T.shape}')

        A = np.eye(self.nStates) - self.discount * T_pi
        # print(f'A: {A.shape}')

        V = np.linalg.solve(A, R_pi)
        # assert V.shape == policy.shape, f"got {V.shape} but expected {policy.shape}"
        return V
        
    def policyIteration(self,initialPolicy,nIterations=np.inf):
        '''Policy iteration procedure: alternate between policy
        evaluation (solve V^pi = R^pi + gamma T^pi V^pi) and policy
        improvement (pi <-- argmax_a R^a + gamma T^a V^pi).

        Inputs:
        initialPolicy -- Initial policy: array of |S| entries
        nIterations -- limit on # of iterations: scalar (default: inf)

        Outputs: 
        policy -- Policy: array of |S| entries
        V -- Value function: array of |S| entries
        iterId -- # of iterations peformed by modified policy iteration: scalar'''

        # temporary values to ensure that the code compiles until this
        # function is coded
        # policy = np.zeros(self.nStates)
        policy = initialPolicy
        V = np.zeros(self.nStates)
        iterId = 0
        while iterId < nIterations:
            V = self.evaluatePolicy(policy)
            new_policy = self.extractPolicy(V)
            if (new_policy == policy).all():
                break
            assert new_policy.shape == policy.shape
            policy = new_policy
            iterId += 1

        return [policy,V,iterId]
            
    def evaluatePolicyPartially(self,policy,initialV,nIterations=np.inf,tolerance=0.01):
        '''Partial policy evaluation:
        Repeat V^pi <-- R^pi + gamma T^pi V^pi

        Inputs:
        policy -- Policy: array of |S| entries
        initialV -- Initial value function: array of |S| entries
        nIterations -- limit on the # of iterations: scalar (default: infinity)
        tolerance -- threshold on ||V^n-V^n+1||_inf: scalar (default: 0.01)

        Outputs: 
        V -- Value function: array of |S| entries
        iterId -- # of iterations performed: scalar
        epsilon -- ||V^n-V^n+1||_inf: scalar'''

        policy = policy.astype(int)

        V = initialV
        iterId = 0
        epsilon = np.inf
        pi_state = np.arange(self.nStates)

        R_pi = self.R[policy, pi_state]
        # print(f'R_pi: {R_pi.shape}, R: {self.R.shape}')

        T_pi = self.T[policy, pi_state]
        # print(f'T_pi: {T_pi.shape}, T: {self.T.shape}')
        while iterId < nIterations and epsilon > tolerance:
            new_V = R_pi + self.discount * T_pi @ V
            delta_V = new_V - V
            epsilon = np.linalg.norm(delta_V, np.inf)
            # print(f'epsilon = {epsilon}')

            V = new_V
            iterId += 1

        return [V,iterId,epsilon]

    def modifiedPolicyIteration(self,initialPolicy,initialV,nEvalIterations=5,nIterations=np.inf,tolerance=0.01):
        '''Modified policy iteration procedure: alternate between
        partial policy evaluation (repeat a few times V^pi <-- R^pi + gamma T^pi V^pi)
        and policy improvement (pi <-- argmax_a R^a + gamma T^a V^pi)

        Inputs:
        initialPolicy -- Initial policy: array of |S| entries
        initialV -- Initial value function: array of |S| entries
        nEvalIterations -- limit on # of iterations to be performed in each partial policy evaluation: scalar (default: 5)
        nIterations -- limit on # of iterations to be performed in modified policy iteration: scalar (default: inf)
        tolerance -- threshold on ||V^n-V^n+1||_inf: scalar (default: 0.01)

        Outputs: 
        policy -- Policy: array of |S| entries
        V -- Value function: array of |S| entries
        iterId -- # of iterations peformed by modified policy iteration: scalar
        epsilon -- ||V^n-V^n+1||_inf: scalar'''

        # temporary values to ensure that the code compiles until this
        # function is coded
        policy = np.zeros(self.nStates)
        V = np.zeros(self.nStates)
        iterId = 0
        epsilon = np.inf

        while iterId < nIterations and epsilon > tolerance:
            V, _, _ = self.evaluatePolicyPartially(policy, V,  nIterations=nEvalIterations)
            policy = self.extractPolicy(V)
            new_V = (self.R + self.discount * self.T @ V).max(axis=0)
            epsilon = np.linalg.norm(new_V - V, np.inf)
            iterId += 1

        return [policy,V,iterId,epsilon]
        


# Maze

In [21]:
''' Construct a simple maze MDP

  Grid world layout:

  ---------------------
  |  0 |  1 |  2 |  3 |
  ---------------------
  |  4 |  5 |  6 |  7 |
  ---------------------
  |  8 |  9 | 10 | 11 |
  ---------------------
  | 12 | 13 | 14 | 15 |
  ---------------------

  Goal state: 11 
  Bad state: 9
  End state: 16

  The end state is an absorbing state that the agent transitions 
  to after visiting the goal state.

  There are 17 states in total (including the end state) 
  and 4 actions (up, down, left, right).'''

# Transition function: |A| x |S| x |S'| array
T = np.zeros([4,17,17])
a = 0.8;  # intended move
b = 0.1;  # lateral move

# up (a = 0)

T[0,0,0] = a+b;
T[0,0,1] = b;

T[0,1,0] = b;
T[0,1,1] = a;
T[0,1,2] = b;

T[0,2,1] = b;
T[0,2,2] = a;
T[0,2,3] = b;

T[0,3,2] = b;
T[0,3,3] = a+b;

T[0,4,4] = b;
T[0,4,0] = a;
T[0,4,5] = b;

T[0,5,4] = b;
T[0,5,1] = a;
T[0,5,6] = b;

T[0,6,5] = b;
T[0,6,2] = a;
T[0,6,7] = b;

T[0,7,6] = b;
T[0,7,3] = a;
T[0,7,7] = b;

T[0,8,8] = b;
T[0,8,4] = a;
T[0,8,9] = b;

T[0,9,8] = b;
T[0,9,5] = a;
T[0,9,10] = b;

T[0,10,9] = b;
T[0,10,6] = a;
T[0,10,11] = b;

T[0,11,16] = 1;

T[0,12,12] = b;
T[0,12,8] = a;
T[0,12,13] = b;

T[0,13,12] = b;
T[0,13,9] = a;
T[0,13,14] = b;

T[0,14,13] = b;
T[0,14,10] = a;
T[0,14,15] = b;

T[0,15,14] = b;
T[0,15,11] = a;
T[0,15,15] = b;

T[0,16,16] = 1;

# down (a = 1)

T[1,0,0] = b;
T[1,0,4] = a;
T[1,0,1] = b;

T[1,1,0] = b;
T[1,1,5] = a;
T[1,1,2] = b;

T[1,2,1] = b;
T[1,2,6] = a;
T[1,2,3] = b;

T[1,3,2] = b;
T[1,3,7] = a;
T[1,3,3] = b;

T[1,4,4] = b;
T[1,4,8] = a;
T[1,4,5] = b;

T[1,5,4] = b;
T[1,5,9] = a;
T[1,5,6] = b;

T[1,6,5] = b;
T[1,6,10] = a;
T[1,6,7] = b;

T[1,7,6] = b;
T[1,7,11] = a;
T[1,7,7] = b;

T[1,8,8] = b;
T[1,8,12] = a;
T[1,8,9] = b;

T[1,9,8] = b;
T[1,9,13] = a;
T[1,9,10] = b;

T[1,10,9] = b;
T[1,10,14] = a;
T[1,10,11] = b;

T[1,11,16] = 1;

T[1,12,12] = a+b;
T[1,12,13] = b;

T[1,13,12] = b;
T[1,13,13] = a;
T[1,13,14] = b;

T[1,14,13] = b;
T[1,14,14] = a;
T[1,14,15] = b;

T[1,15,14] = b;
T[1,15,15] = a+b;

T[1,16,16] = 1;

# left (a = 2)

T[2,0,0] = a+b;
T[2,0,4] = b;

T[2,1,1] = b;
T[2,1,0] = a;
T[2,1,5] = b;

T[2,2,2] = b;
T[2,2,1] = a;
T[2,2,6] = b;

T[2,3,3] = b;
T[2,3,2] = a;
T[2,3,7] = b;

T[2,4,0] = b;
T[2,4,4] = a;
T[2,4,8] = b;

T[2,5,1] = b;
T[2,5,4] = a;
T[2,5,9] = b;

T[2,6,2] = b;
T[2,6,5] = a;
T[2,6,10] = b;

T[2,7,3] = b;
T[2,7,6] = a;
T[2,7,11] = b;

T[2,8,4] = b;
T[2,8,8] = a;
T[2,8,12] = b;

T[2,9,5] = b;
T[2,9,8] = a;
T[2,9,13] = b;

T[2,10,6] = b;
T[2,10,9] = a;
T[2,10,14] = b;

T[2,11,16] = 1;

T[2,12,8] = b;
T[2,12,12] = a+b;

T[2,13,9] = b;
T[2,13,12] = a;
T[2,13,13] = b;

T[2,14,10] = b;
T[2,14,13] = a;
T[2,14,14] = b;

T[2,15,11] = b;
T[2,15,14] = a;
T[2,15,15] = b;

T[2,16,16] = 1;

# right (a = 3)

T[3,0,0] = b;
T[3,0,1] = a;
T[3,0,4] = b;

T[3,1,1] = b;
T[3,1,2] = a;
T[3,1,5] = b;

T[3,2,2] = b;
T[3,2,3] = a;
T[3,2,6] = b;

T[3,3,3] = a+b;
T[3,3,7] = b;

T[3,4,0] = b;
T[3,4,5] = a;
T[3,4,8] = b;

T[3,5,1] = b;
T[3,5,6] = a;
T[3,5,9] = b;

T[3,6,2] = b;
T[3,6,7] = a;
T[3,6,10] = b;

T[3,7,3] = b;
T[3,7,7] = a;
T[3,7,11] = b;

T[3,8,4] = b;
T[3,8,9] = a;
T[3,8,12] = b;

T[3,9,5] = b;
T[3,9,10] = a;
T[3,9,13] = b;

T[3,10,6] = b;
T[3,10,11] = a;
T[3,10,14] = b;

T[3,11,16] = 1;

T[3,12,8] = b;
T[3,12,13] = a;
T[3,12,12] = b;

T[3,13,9] = b;
T[3,13,14] = a;
T[3,13,13] = b;

T[3,14,10] = b;
T[3,14,15] = a;
T[3,14,14] = b;

T[3,15,11] = b;
T[3,15,15] = a+b;

T[3,16,16] = 1;

# Reward function: |A| x |S| array
R = -1 * np.ones([4,17]);

# set rewards
R[:,11] = 100;  # goal state
R[:,9] = -70;   # bad state
R[:,16] = 0;    # end state

# Discount factor: scalar in [0,1)
discount = 0.95


In [22]:

# MDP object
mdp =MDP(T,R,discount)


# Test Each Procedure

In [27]:
[V,nIterations,epsilon] = mdp.valueIteration(initialV=np.zeros(mdp.nStates),
                                             tolerance=0.01)


Value iteration...
[V,iterId,epsilon] = [array([ 66.4750799 ,  72.35601113,  78.52647962,  84.21513533,
        64.96977905,  71.61263898,  84.87838464,  91.78276307,
        55.06163193,  12.96639802,  91.19633961, 100.        ,
        65.26377228,  72.14936837,  85.60984747,  91.85958892,
         0.        ]), 18, 0.005916217653677336]


In [28]:
[policy,V,nIterations] = mdp.policyIteration(np.zeros(mdp.nStates,dtype=int))

In [29]:
policy, V, nIterations

(array([3, 3, 3, 1, 3, 3, 3, 1, 1, 3, 3, 0, 3, 3, 3, 0, 0]),
 array([ 66.48026629,  72.35814698,  78.52739639,  84.21554544,
         64.97315039,  71.6137027 ,  84.87872847,  91.78284995,
         55.06432671,  12.96679046,  91.19641953, 100.        ,
         65.26496129,  72.14957754,  85.60989813,  91.85960257,
          0.        ]),
 3)

In [26]:
[policy,V,nIterations,epsilon] = mdp.modifiedPolicyIteration(np.zeros(mdp.nStates,dtype=int),np.zeros(mdp.nStates),tolerance=0.01)
policy, V, nIterations

(array([3, 3, 3, 1, 3, 3, 3, 1, 1, 3, 3, 0, 3, 3, 3, 0, 0]),
 array([ 66.47162218,  72.35454333,  78.52585639,  84.21485392,
         64.9677271 ,  71.61193002,  84.87814924,  91.78270463,
         55.06093332,  12.96615491,  91.19629119, 100.        ,
         65.26344413,  72.14929159,  85.60983295,  91.85958651,
          0.        ]),
 5)