# Markov Decision Processes. 

This notebook describes a simple MDP class.  

It includes a constructor, value iteration and a procedure to extract a policy from the value function.

### Environments

We will construct two simple MDP environments in this notebook. The first will be the simple MDP, the second MDP will involve a classic grid world layout. 

In [1]:
import numpy as np
from numpy.linalg import inv

class MDP:
    def __init__(self, T, R, discount):
        """Constructor for the MDP class
        
        The constructor verifies that the inputs are valid and sets
        corresponding variables in an MDP object
        
        Args:
            T: Transition function, |A| x |S| x |S'| array
            R: Reward function, |A| x |S| array
            discount: discount factor, scalar in [0,1)
        """
        assert T.ndim == 3, "Invalid transition function: it should have 3 dimensions"
        self.nActions = T.shape[0]
        self.nStates = T.shape[1]
        assert T.shape == (self.nActions,self.nStates,self.nStates), "Invalid transition function: it has dimensionality " + repr(T.shape) + ", but it should be (nActions,nStates,nStates)"
        assert (abs(T.sum(2)-1) < 1e-5).all(), "Invalid transition function: some transition probability does not equal 1"
        self.T = T
        assert R.ndim == 2, "Invalid reward function: it should have 2 dimensions" 
        assert R.shape == (self.nActions,self.nStates), "Invalid reward function: it has dimensionality " + repr(R.shape) + ", but it should be (nActions,nStates)"
        self.R = R
        assert 0 <= discount < 1, "Invalid discount factor: it should be in [0,1)"
        self.discount = discount
        
        
    def valueIteration(self, initialV, nIterations=np.inf, tolerance=0.01, verbose=False):
        """Value iteration function
        
        Performs value iteration until convergence.
        
        Args:
            initialV: Initial value function, array of |S| entries (array of 0s)
            nIterations: limit on the # of iterations, scalar (default: infinity)
            tolerance: threshold on ||V^n-V^n+1||_inf, scalar (default: 0.01)
            verbose: set to true to print debugging logs

        Returns:
            A tuple of the value function, the number of iteration performed and the value of epsilon
        """
        # reshape initialV from |S| to |S| x 1
        V = initialV.reshape((self.nStates, 1)) 
        iterId = 0
        epsilon = 0
        
        # expand dims of the rewards to |A| x |S| x 1
        R_expanded = np.expand_dims(self.R, axis=2)
        
        # Note: I count the 0th iteration as an iteration. So if iterId = 5 => 6 iterations have already occurred 
        while iterId < nIterations:
            # self.T is |A| x |S| x |S|
            # V is |S| x 1 => we need to expand dim 0 of V and duplicate 0th dim 
            # self.nActions number of times for matmul to work.
            V_expanded = np.expand_dims(V, axis=0)
            # |A| x |S| x 1
            V_expanded = np.repeat(V_expanded, self.nActions, axis=0)
            # matmul(self.T, V) = |A| x |S| x 1
            V_new = self.discount * np.matmul(self.T, V_expanded)
            # add the reward for each state action pair
            V_new += R_expanded
            # take max through first dim to get to |S| x 1
            V_new = np.max(V_new, axis=0)
            
            
            # calculate epsilon 
            epsilon = np.sum(abs(V_new - V))
            
            if verbose: 
                print(f"iteration: {iterId}, epsilon: {epsilon}")
                print(f"V_new shape: {V_new.shape}, V_new:\n{V_new}")
            
            # check thresh
            if epsilon <= tolerance:
                break
                                    
            # update iteration num
            iterId += 1      
            V = V_new
            
        return [V.reshape(self.nStates), iterId, epsilon]
    
    
    def extractPolicy(self, V):
        """Procedure to extract a policy from a value function
        
        Args:
            V: Value function, array of |S| entries

        Returns:
            The policy, an array of |S| entries
        """
        # calculate gamma * matmul(T^a V)
        V_expanded = np.expand_dims(V.reshape((self.nStates, 1)) , axis=0)
        V_expanded = np.repeat(V_expanded, self.nActions, axis=0)
        future_rewards = self.discount * np.matmul(self.T, V_expanded)
        future_rewards += np.expand_dims(self.R, axis=2)

        return np.argmax(future_rewards, axis=0).reshape(self.nStates) 

### Environment: Advertise or Save

This code will run value iteration on a simple MDP

In [2]:
# Transition function: |A| x |S| x |S'| array
T = np.array([[[0.5,0.5,0,0],[0,1,0,0],[0.5,0.5,0,0],[0,1,0,0]],[[1,0,0,0],[0.5,0,0,0.5],[0.5,0,0.5,0],[0,0,0.5,0.5]]])


# Reward function: |A| x |S| array
R = np.array([[0,0,10,10],[0,0,10,10]])


# Discount factor: scalar in [0,1)
discount = 0.9        


# MDP object
mdp = MDP(T,R,discount)


# Compute value function by value iteration
[V,nIterations,epsilon] = mdp.valueIteration(initialV=np.zeros(mdp.nStates), nIterations=6, verbose=True)


# extract policy
policy = mdp.extractPolicy(V)
print(f'Policy: {policy}')

iteration: 0, epsilon: 20.0
V_new shape: (4, 1), V_new:
[[ 0.]
 [ 0.]
 [10.]
 [10.]]
iteration: 1, epsilon: 18.0
V_new shape: (4, 1), V_new:
[[ 0. ]
 [ 4.5]
 [14.5]
 [19. ]]
iteration: 2, epsilon: 14.175000000000002
V_new shape: (4, 1), V_new:
[[ 2.025]
 [ 8.55 ]
 [16.525]
 [25.075]]
iteration: 3, epsilon: 11.846249999999998
V_new shape: (4, 1), V_new:
[[ 4.75875]
 [12.195  ]
 [18.3475 ]
 [28.72   ]]
iteration: 4, epsilon: 10.251562500000002
V_new shape: (4, 1), V_new:
[[ 7.6291875]
 [15.0654375]
 [20.3978125]
 [31.180375 ]]
iteration: 5, epsilon: 9.226406249999997
V_new shape: (4, 1), V_new:
[[10.21258125]
 [17.46430313]
 [22.61215   ]
 [33.21018437]]
Policy: [0 1 1 1]


In [3]:
# Transition function: |A| x |S| x |S'| array
T = np.zeros([4,17,17])
a = 0.7  # intended move
b = 0.15  # lateral move

# up (a = 0)

T[0,0,0] = a+b
T[0,0,1] = b

T[0,1,0] = b
T[0,1,1] = a
T[0,1,2] = b

T[0,2,1] = b
T[0,2,2] = a
T[0,2,3] = b

T[0,3,2] = b
T[0,3,3] = a+b

T[0,4,4] = b
T[0,4,0] = a
T[0,4,5] = b

T[0,5,4] = b
T[0,5,1] = a
T[0,5,6] = b

T[0,6,5] = b
T[0,6,2] = a
T[0,6,7] = b

T[0,7,6] = b
T[0,7,3] = a
T[0,7,7] = b

T[0,8,8] = b
T[0,8,4] = a
T[0,8,9] = b

T[0,9,8] = b
T[0,9,5] = a
T[0,9,10] = b

T[0,10,9] = b
T[0,10,6] = a
T[0,10,11] = b

T[0,11,10] = b
T[0,11,7] = a
T[0,11,11] = b

T[0,12,12] = b
T[0,12,8] = a
T[0,12,13] = b

T[0,13,12] = b
T[0,13,9] = a
T[0,13,14] = b

T[0,14,16] = 1

T[0,15,11] = a
T[0,15,14] = b
T[0,15,15] = b

T[0,16,16] = 1

# down (a = 1)

T[1,0,0] = b
T[1,0,4] = a
T[1,0,1] = b

T[1,1,0] = b
T[1,1,5] = a
T[1,1,2] = b

T[1,2,1] = b
T[1,2,6] = a
T[1,2,3] = b

T[1,3,2] = b
T[1,3,7] = a
T[1,3,3] = b

T[1,4,4] = b
T[1,4,8] = a
T[1,4,5] = b

T[1,5,4] = b
T[1,5,9] = a
T[1,5,6] = b

T[1,6,5] = b
T[1,6,10] = a
T[1,6,7] = b

T[1,7,6] = b
T[1,7,11] = a
T[1,7,7] = b

T[1,8,8] = b
T[1,8,12] = a
T[1,8,9] = b

T[1,9,8] = b
T[1,9,13] = a
T[1,9,10] = b

T[1,10,9] = b
T[1,10,14] = a
T[1,10,11] = b

T[1,11,10] = b
T[1,11,15] = a
T[1,11,11] = b

T[1,12,12] = a+b
T[1,12,13] = b

T[1,13,12] = b
T[1,13,13] = a
T[1,13,14] = b

T[1,14,16] = 1

T[1,15,14] = b
T[1,15,15] = a+b

T[1,16,16] = 1

# left (a = 2)

T[2,0,0] = a+b
T[2,0,4] = b

T[2,1,1] = b
T[2,1,0] = a
T[2,1,5] = b

T[2,2,2] = b
T[2,2,1] = a
T[2,2,6] = b

T[2,3,3] = b
T[2,3,2] = a
T[2,3,7] = b

T[2,4,0] = b
T[2,4,4] = a
T[2,4,8] = b

T[2,5,1] = b
T[2,5,4] = a
T[2,5,9] = b

T[2,6,2] = b
T[2,6,5] = a
T[2,6,10] = b

T[2,7,3] = b
T[2,7,6] = a
T[2,7,11] = b

T[2,8,4] = b
T[2,8,8] = a
T[2,8,12] = b

T[2,9,5] = b
T[2,9,8] = a
T[2,9,13] = b

T[2,10,6] = b
T[2,10,9] = a
T[2,10,14] = b

T[2,11,7] = b
T[2,11,10] = a
T[2,11,15] = b

T[2,12,8] = b
T[2,12,12] = a+b

T[2,13,9] = b
T[2,13,12] = a
T[2,13,13] = b

T[2,14,16] = 1

T[2,15,11] = b
T[2,15,14] = a
T[2,15,15] = b

T[2,16,16] = 1

# right (a = 3)

T[3,0,0] = b
T[3,0,1] = a
T[3,0,4] = b

T[3,1,1] = b
T[3,1,2] = a
T[3,1,5] = b

T[3,2,2] = b
T[3,2,3] = a
T[3,2,6] = b

T[3,3,3] = a+b
T[3,3,7] = b

T[3,4,0] = b
T[3,4,5] = a
T[3,4,8] = b

T[3,5,1] = b
T[3,5,6] = a
T[3,5,9] = b

T[3,6,2] = b
T[3,6,7] = a
T[3,6,10] = b

T[3,7,3] = b
T[3,7,7] = a
T[3,7,11] = b

T[3,8,4] = b
T[3,8,9] = a
T[3,8,12] = b

T[3,9,5] = b
T[3,9,10] = a
T[3,9,13] = b

T[3,10,6] = b
T[3,10,11] = a
T[3,10,14] = b

T[3,11,7] = b
T[3,11,11] = a
T[3,11,15] = b

T[3,12,8] = b
T[3,12,13] = a
T[3,12,12] = b

T[3,13,9] = b
T[3,13,14] = a
T[3,13,13] = b

T[3,14,16] = 1

T[3,15,11] = b
T[3,15,15] = a+b

T[3,16,16] = 1


# Reward function: |A| x |S| array
R = -1 * np.ones([4,17])


# set rewards
R[:,14] = 100  # goal state
R[:,9] = -70   # bad state
R[:,16] = 0    # end state


# Discount factor: scalar in [0,1)
discount = 0.95
        
# MDP object
mdp = MDP(T,R,discount)

## MDP in a maze environment

This code will run your value iteration algorithm on the maze problem.  It will display the number of iterations, the value function and policy found.s

In [4]:
#value iteration
[V, nIterations, epsilon] = mdp.valueIteration(initialV=np.zeros(mdp.nStates),nIterations=10000,tolerance=0.01)
print(f'Number of iterations: {nIterations}')
print('Value Function')

with np.printoptions(precision=2):
    print(np.reshape(V[:16],(4,4)))
print('Policy:')
actions = np.array(['U','D','L','R'])
policy = mdp.extractPolicy(V)
print(np.reshape(actions[policy[:16]],(4,4)))

Number of iterations: 30
Value Function
[[ 49.68  55.28  61.58  65.88]
 [ 48.03  52.32  68.14  73.26]
 [ 50.23  -0.42  77.07  81.36]
 [ 66.36  76.31 100.    89.91]]
Policy:
[['R' 'R' 'D' 'D']
 ['R' 'U' 'D' 'D']
 ['D' 'R' 'R' 'D']
 ['R' 'R' 'U' 'L']]
