# MDP

In [29]:
import numpy as np

class MDP:
    '''A simple MDP class.  It includes the following members'''

    def __init__(self,T,R,discount):
        '''Constructor for the MDP class

        Inputs:
        T -- Transition function: |A| x |S| x |S'| array
        R -- Reward function: |A| x |S| array
        discount -- discount factor: scalar in [0,1)

        The constructor verifies that the inputs are valid and sets
        corresponding variables in a MDP object'''

        assert T.ndim == 3, "Invalid transition function: it should have 3 dimensions"
        self.nActions = T.shape[0]
        self.nStates = T.shape[1]
        assert T.shape == (self.nActions,self.nStates,self.nStates), "Invalid transition function: it has dimensionality " + repr(T.shape) + ", but it should be (nActions,nStates,nStates)"
        assert (abs(T.sum(2)-1) < 1e-5).all(), "Invalid transition function: some transition probability does not equal 1"
        self.T = T
        assert R.ndim == 2, "Invalid reward function: it should have 2 dimensions" 
        assert R.shape == (self.nActions,self.nStates), "Invalid reward function: it has dimensionality " + repr(R.shape) + ", but it should be (nActions,nStates)"
        self.R = R
        assert 0 <= discount < 1, "Invalid discount factor: it should be in [0,1)"
        self.discount = discount
        
    def valueIteration(self,initialV,nIterations=np.inf,tolerance=0.01):
        '''Value iteration procedure
        V <-- max_a R^a + gamma T^a V

        Inputs:
        initialV -- Initial value function: array of |S| entries
        nIterations -- limit on the # of iterations: scalar (default: infinity)
        tolerance -- threshold on ||V^n-V^n+1||_inf: scalar (default: 0.01)

        Outputs: 
        V -- Value function: array of |S| entries
        iterId -- # of iterations performed: scalar
        epsilon -- ||V^n-V^n+1||_inf: scalar'''
        
        V = initialV.copy()
        iterId = 0
        epsilon = np.inf

        while(iterId < nIterations and epsilon > tolerance):
            Ta_V = np.matmul(self.T, V)     # T和V矩陣相乘可一次性計算出了所有狀態下，採取所有動作的未來期望價值。
            All_values = self.R + (self.discount * Ta_V)   # 乘上gamma再加上reward可得|A| x |S|價值矩陣
            V_new = np.max((All_values), axis=0)       # 找出每個S的最大action value
            epsilon = np.max(np.abs(V_new - V))
            V = V_new
            iterId = iterId + 1

        return [V,iterId,epsilon]

    def extractPolicy(self,V):
        '''Procedure to extract a policy from a value function
        pi <-- argmax_a R^a + gamma T^a V

        Inputs:
        V -- Value function: array of |S| entries

        Output:
        policy -- Policy: array of |S| entries'''
        
        policy = np.zeros(self.nStates)

        # 計算|A| x |S|價值矩陣
        Ta_V = np.matmul(self.T, V)     
        All_values = self.R + (self.discount * Ta_V) 
        # 找出最大action value的位置
        policy = np.argmax(All_values, axis=0)
        return policy 

    def evaluatePolicy(self,policy):
        '''Evaluate a policy by solving a system of linear equations
        V^pi = R^pi + gamma T^pi V^pi

        Input:
        policy -- Policy: array of |S| entries

        Ouput:
        V -- Value function: array of |S| entries'''
        # 利用移項直接解V^pi = (I - gamma T^pi)^-1 R^pi
        # 先計算出policy的reward和狀態轉移矩陣
        R_policy = np.zeros(self.nStates)
        T_policy = np.zeros((self.nStates, self.nStates))
        for i in range(len(policy)):
            action = policy[i]
            R_policy[i] = self.R[action, i]
            T_policy[i] = self.T[action, i]
        gamma_T_policy = self.discount * T_policy
        # 算反矩陣(I - gamma T^pi)^-1
        invert = np.linalg.inv(np.identity(self.nStates) - gamma_T_policy)

        V = np.matmul(invert, R_policy)

        return V
        
    def policyIteration(self,initialPolicy,nIterations=np.inf):
        '''Policy iteration procedure: alternate between policy
        evaluation (solve V^pi = R^pi + gamma T^pi V^pi) and policy
        improvement (pi <-- argmax_a R^a + gamma T^a V^pi).

        Inputs:
        initialPolicy -- Initial policy: array of |S| entries
        nIterations -- limit on # of iterations: scalar (default: inf)

        Outputs: 
        policy -- Policy: array of |S| entries
        V -- Value function: array of |S| entries
        iterId -- # of iterations peformed by modified policy iteration: scalar'''

        policy = initialPolicy
        V = np.zeros(self.nStates)
        iterId = 0

        while(iterId < nIterations):
            V = self.evaluatePolicy(policy)
            new_policy = self.extractPolicy(V)
            iterId = iterId + 1
            if(np.array_equal(new_policy, policy)):
                break
            policy = new_policy.copy()

        return [policy,V,iterId]
            
    def evaluatePolicyPartially(self,policy,initialV,nIterations=np.inf,tolerance=0.01):
        '''Partial policy evaluation:
        Repeat V^pi <-- R^pi + gamma T^pi V^pi

        Inputs:
        policy -- Policy: array of |S| entries
        initialV -- Initial value function: array of |S| entries
        nIterations -- limit on the # of iterations: scalar (default: infinity)
        tolerance -- threshold on ||V^n-V^n+1||_inf: scalar (default: 0.01)

        Outputs: 
        V -- Value function: array of |S| entries
        iterId -- # of iterations performed: scalar
        epsilon -- ||V^n-V^n+1||_inf: scalar'''

        
        V = initialV
        iterId = 0
        epsilon = np.inf
        # 先計算出policy的reward和狀態轉移矩陣
        R_policy = np.zeros(self.nStates)
        T_policy = np.zeros((self.nStates, self.nStates))
        for i in range(len(policy)):
            action = policy[i]
            R_policy[i] = self.R[action, i]
            T_policy[i] = self.T[action, i]
        # 計算value直到收斂
        while(iterId < nIterations and epsilon > tolerance):
            iterId += 1
            V_new = R_policy + self.discount * np.matmul(T_policy, V)
            epsilon = np.max(np.abs(V_new - V))
            V = V_new


        return [V,iterId,epsilon]

    def modifiedPolicyIteration(self,initialPolicy,initialV,nEvalIterations=5,nIterations=np.inf,tolerance=0.01):
        '''Modified policy iteration procedure: alternate between
        partial policy evaluation (repeat a few times V^pi <-- R^pi + gamma T^pi V^pi)
        and policy improvement (pi <-- argmax_a R^a + gamma T^a V^pi)

        Inputs:
        initialPolicy -- Initial policy: array of |S| entries
        initialV -- Initial value function: array of |S| entries
        nEvalIterations -- limit on # of iterations to be performed in each partial policy evaluation: scalar (default: 5)
        nIterations -- limit on # of iterations to be performed in modified policy iteration: scalar (default: inf)
        tolerance -- threshold on ||V^n-V^n+1||_inf: scalar (default: 0.01)

        Outputs: 
        policy -- Policy: array of |S| entries
        V -- Value function: array of |S| entries
        iterId -- # of iterations peformed by modified policy iteration: scalar
        epsilon -- ||V^n-V^n+1||_inf: scalar'''

        policy = initialPolicy
        V = initialV
        iterId = 0
        epsilon = np.inf

        while(iterId < nIterations and epsilon > tolerance):
            iterId = iterId + 1
            V_new, _, _ = self.evaluatePolicyPartially(policy, V, nEvalIterations, tolerance)
            # extract Policy
            Ta_V = np.matmul(self.T, V_new)     
            All_values = self.R + (self.discount * Ta_V) 
            policy = np.argmax(All_values, axis=0)
            # use new policy to get value
            V_new_policy = [All_values[policy[i]][i] for i in range(len(policy))]
            epsilon = np.max(np.abs(V_new_policy - V_new))
            V = V_new_policy

        return [policy,V,iterId,epsilon]

# TestMDP.py

In [30]:
''' Construct simple MDP as described in Lecture 2a Slides 13-14'''
# Transition function: |A| x |S| x |S'| array
T = np.array([[[0.5,0.5,0,0],[0,1,0,0],[0.5,0.5,0,0],[0,1,0,0]],[[1,0,0,0],[0.5,0,0,0.5],[0.5,0,0.5,0],[0,0,0.5,0.5]]])
# Reward function: |A| x |S| array
R = np.array([[0,0,10,10],[0,0,10,10]])
# Discount factor: scalar in [0,1)
discount = 0.9        
# MDP object
mdp = MDP(T,R,discount)

'''Test each procedure'''
[V,nIterations,epsilon] = mdp.valueIteration(initialV=np.zeros(mdp.nStates))
print(V,nIterations,epsilon)

policy = mdp.extractPolicy(V)
print(policy)

V = mdp.evaluatePolicy(np.array([1,0,1,0]))
print(V)

[policy,V,iterId] = mdp.policyIteration(np.array([0,0,0,0]))
print(policy,V,iterId)

[V,iterId,epsilon] = mdp.evaluatePolicyPartially(np.array([1,0,1,0]),np.array([0,10,0,13]))
print(V,iterId,epsilon)

[policy,V,iterId,tolerance] = mdp.modifiedPolicyIteration(np.array([1,0,1,0]),np.array([0,10,0,13]))
print(policy,V,iterId,tolerance)

[31.49636306 38.51527513 43.935435   54.1128575 ] 58 0.009860138819838937
[0 1 1 1]
[ 0.          0.         18.18181818 10.        ]
[0 1 1 1] [31.58510431 38.60401638 44.02417625 54.20159875] 2
[ 0.          0.08727964 18.18181818 10.08727964] 45 0.009697737297875264
[0 1 1 1] [np.float64(31.50605365934906), np.float64(38.524965727978426), np.float64(43.945125603197766), np.float64(54.12254810271034)] 11 0.00878340549812151


# TestMDPmaze.py.

In [31]:

''' Construct a simple maze MDP

  Grid world layout:

  ---------------------
  |  0 |  1 |  2 |  3 |
  ---------------------
  |  4 |  5 |  6 |  7 |
  ---------------------
  |  8 |  9 | 10 | 11 |
  ---------------------
  | 12 | 13 | 14 | 15 |
  ---------------------

  Goal state: 14 
  Bad state: 9
  End state: 16

  The end state is an absorbing state that the agent transitions 
  to after visiting the goal state.

  There are 17 states in total (including the end state) 
  and 4 actions (up, down, left, right).'''

# Transition function: |A| x |S| x |S'| array
T = np.zeros([4,17,17])
a = 0.8;  # intended move
b = 0.1;  # lateral move

# up (a = 0)

T[0,0,0] = a+b;
T[0,0,1] = b;

T[0,1,0] = b;
T[0,1,1] = a;
T[0,1,2] = b;

T[0,2,1] = b;
T[0,2,2] = a;
T[0,2,3] = b;

T[0,3,2] = b;
T[0,3,3] = a+b;

T[0,4,4] = b;
T[0,4,0] = a;
T[0,4,5] = b;

T[0,5,4] = b;
T[0,5,1] = a;
T[0,5,6] = b;

T[0,6,5] = b;
T[0,6,2] = a;
T[0,6,7] = b;

T[0,7,6] = b;
T[0,7,3] = a;
T[0,7,7] = b;

T[0,8,8] = b;
T[0,8,4] = a;
T[0,8,9] = b;

T[0,9,8] = b;
T[0,9,5] = a;
T[0,9,10] = b;

T[0,10,9] = b;
T[0,10,6] = a;
T[0,10,11] = b;

T[0,11,10] = b;
T[0,11,7] = a;
T[0,11,11] = b;

T[0,12,12] = b;
T[0,12,8] = a;
T[0,12,13] = b;

T[0,13,12] = b;
T[0,13,9] = a;
T[0,13,14] = b;

T[0,14,16] = 1;

T[0,15,11] = a;
T[0,15,14] = b;
T[0,15,15] = b;

T[0,16,16] = 1;

# down (a = 1)

T[1,0,0] = b;
T[1,0,4] = a;
T[1,0,1] = b;

T[1,1,0] = b;
T[1,1,5] = a;
T[1,1,2] = b;

T[1,2,1] = b;
T[1,2,6] = a;
T[1,2,3] = b;

T[1,3,2] = b;
T[1,3,7] = a;
T[1,3,3] = b;

T[1,4,4] = b;
T[1,4,8] = a;
T[1,4,5] = b;

T[1,5,4] = b;
T[1,5,9] = a;
T[1,5,6] = b;

T[1,6,5] = b;
T[1,6,10] = a;
T[1,6,7] = b;

T[1,7,6] = b;
T[1,7,11] = a;
T[1,7,7] = b;

T[1,8,8] = b;
T[1,8,12] = a;
T[1,8,9] = b;

T[1,9,8] = b;
T[1,9,13] = a;
T[1,9,10] = b;

T[1,10,9] = b;
T[1,10,14] = a;
T[1,10,11] = b;

T[1,11,10] = b;
T[1,11,15] = a;
T[1,11,11] = b;

T[1,12,12] = a+b;
T[1,12,13] = b;

T[1,13,12] = b;
T[1,13,13] = a;
T[1,13,14] = b;

T[1,14,16] = 1;

T[1,15,14] = b;
T[1,15,15] = a+b;

T[1,16,16] = 1;

# left (a = 2)

T[2,0,0] = a+b;
T[2,0,4] = b;

T[2,1,1] = b;
T[2,1,0] = a;
T[2,1,5] = b;

T[2,2,2] = b;
T[2,2,1] = a;
T[2,2,6] = b;

T[2,3,3] = b;
T[2,3,2] = a;
T[2,3,7] = b;

T[2,4,0] = b;
T[2,4,4] = a;
T[2,4,8] = b;

T[2,5,1] = b;
T[2,5,4] = a;
T[2,5,9] = b;

T[2,6,2] = b;
T[2,6,5] = a;
T[2,6,10] = b;

T[2,7,3] = b;
T[2,7,6] = a;
T[2,7,11] = b;

T[2,8,4] = b;
T[2,8,8] = a;
T[2,8,12] = b;

T[2,9,5] = b;
T[2,9,8] = a;
T[2,9,13] = b;

T[2,10,6] = b;
T[2,10,9] = a;
T[2,10,14] = b;

T[2,11,7] = b;
T[2,11,10] = a;
T[2,11,15] = b;

T[2,12,8] = b;
T[2,12,12] = a+b;

T[2,13,9] = b;
T[2,13,12] = a;
T[2,13,13] = b;

T[2,14,16] = 1;

T[2,15,11] = a;
T[2,15,14] = b;
T[2,15,15] = b;

T[2,16,16] = 1;

# right (a = 3)

T[3,0,0] = b;
T[3,0,1] = a;
T[3,0,4] = b;

T[3,1,1] = b;
T[3,1,2] = a;
T[3,1,5] = b;

T[3,2,2] = b;
T[3,2,3] = a;
T[3,2,6] = b;

T[3,3,3] = a+b;
T[3,3,7] = b;

T[3,4,0] = b;
T[3,4,5] = a;
T[3,4,8] = b;

T[3,5,1] = b;
T[3,5,6] = a;
T[3,5,9] = b;

T[3,6,2] = b;
T[3,6,7] = a;
T[3,6,10] = b;

T[3,7,3] = b;
T[3,7,7] = a;
T[3,7,11] = b;

T[3,8,4] = b;
T[3,8,9] = a;
T[3,8,12] = b;

T[3,9,5] = b;
T[3,9,10] = a;
T[3,9,13] = b;

T[3,10,6] = b;
T[3,10,11] = a;
T[3,10,14] = b;

T[3,11,7] = b;
T[3,11,11] = a;
T[3,11,15] = b;

T[3,12,8] = b;
T[3,12,13] = a;
T[3,12,12] = b;

T[3,13,9] = b;
T[3,13,14] = a;
T[3,13,13] = b;

T[3,14,16] = 1;

T[3,15,11] = b;
T[3,15,15] = a+b;

T[3,16,16] = 1;

# Reward function: |A| x |S| array
R = -1 * np.ones([4,17]);

# set rewards
R[:,14] = 100;  # goal state
R[:,9] = -70;   # bad state
R[:,16] = 0;    # end state

# Discount factor: scalar in [0,1)
discount = 0.95
        
# MDP object
mdp = MDP(T,R,discount)

# Part1.1 value iteration
Policy: [3 3 1 1 1 3 1 1 1 1 1 2 3 3 0 1 0]  

Value Function: [ 46.29965697  51.42479664  57.30588155  56.31114662  45.65817215
  49.05317084  64.59396607  61.83943198  49.86781046  -1.60561036
  74.87615816  67.40578654  66.15113212  76.11799457 100.
  68.79302266   0.        ]  
  
Number of Iterations: 31

In [32]:
[V,nIterations,epsilon] = mdp.valueIteration(initialV=np.zeros(mdp.nStates),tolerance=0.01)
policy = mdp.extractPolicy(V)
print("Policy " + str(policy))
print("Value Function: " + str(V))
print("Number of Iterations: " + str(nIterations))


Policy [3 3 1 1 1 3 1 1 1 1 1 2 3 3 0 0 0]
Value Function: [ 57.40981959  62.35501737  67.77963975  64.96284383  58.62088109
  62.31802989  74.58594186  70.20153227  63.3314941    7.47046958
  82.89054368  75.58836359  75.79654065  83.65711283 100.
  72.869372     0.        ]
Number of Iterations: 21


# Part1.2 policy iteration
Policy: [3 3 1 1 1 3 1 1 1 1 1 2 3 3 0 1 0]  

Value Function: [ 46.30882335  51.43356663  57.31344666  56.32405724  45.65960854
  49.05853583  64.59847407  61.85069615  49.86807965  -1.60514155
  74.87805791  67.41607427  66.15129469  76.11809601 100.
  68.83116883   0.        ]  
  
Number of Iterations: 4

In [33]:
[policy,V,nIterations] = mdp.policyIteration(np.zeros(mdp.nStates,dtype=int))
print("Policy " + str(policy))
print("Value Function: " + str(V))
print("Number of Iterations: " + str(nIterations))

Policy [3 3 1 1 1 3 1 1 1 1 1 2 3 3 0 0 0]
Value Function: [ 57.41579164  62.35823345  67.78138742  64.96471086  58.62141068
  62.31944032  74.58645751  70.2024099   63.33161821   7.47053089
  82.89063387  75.58877302  75.79659742  83.65712755 100.
  72.87012982   0.        ]
Number of Iterations: 4


# Part1.3 modified policy iteration
|  number of iterations in partial policy evaluation    | Number of Iterations to converge  |
|  ----  | ----  |
| 1 | 11  |
|  2 | 8  |
|  3 | 7  |
|  4 | 6  |
|  5 | 6  |
|  6 | 6  |
|  7 | 5  |
|  8 | 5  |
|  9 | 6  |
|  10 | 5  |

In [None]:
for i in range(1, 11):
    [policy,V,nIterations,epsilon] = mdp.modifiedPolicyIteration(np.zeros(mdp.nStates,dtype=int),np.zeros(mdp.nStates), nEvalIterations=i, tolerance=0.01)
    print("nEvalIterations = " + str(i) + " Number of Iterations: " + str(nIterations))

nEvalIterations = 1 Number of Iterations: 11
nEvalIterations = 2 Number of Iterations: 8
nEvalIterations = 3 Number of Iterations: 7
nEvalIterations = 4 Number of Iterations: 6
nEvalIterations = 5 Number of Iterations: 6
nEvalIterations = 6 Number of Iterations: 6
nEvalIterations = 7 Number of Iterations: 5
nEvalIterations = 8 Number of Iterations: 5
nEvalIterations = 9 Number of Iterations: 6
nEvalIterations = 10 Number of Iterations: 5


: 

# Part1.4 Discussion
從結果可看出，增加evaluatePolicyPartially()的迭代次數，可以有效地減少外部主迴圈收斂所需的總迭代次數。原因是當nEvalIterations很小時，此時的行為類似Value iteration，每一次的價值函數 V 都只是對當前策略的一個粗略估計，因此需要多次迭代才能找到最優解。隨著nEvalIterations增加，行為越來越類似Policy iteration，對當前策略的價值估計越發準確，因此迭代次數也隨之降低。