In [1]:
import numpy as np
import copy

In [2]:
def initialization():
    '''
    Returns:
            V - a 2d array initialized as 0
            R - array containing rewards for each state
            P - array denoting equiprobable random policy
            states - array containing tuples of states in the gridworld
            terminal_states - array containing terminal states of the gridworld
    '''
    num_rows = 4
    num_cols = 4
    states = []
    for i in range(num_rows):
        for j in range(num_cols):
            states.append((i,j))
    terminal_states = [(0,0),(3,3)]
    V = np.zeros([4,4])
    R = {}
    P = {}
    for state in states:
        if state in terminal_states:
            R[state] = 0
            P[state] = []
        else:
            R[state] = -1
            P[state] = ['L','R','D','U']
    return V,R,P,states,terminal_states

In [3]:
def get_state(action, state):
  if action == 'L':
    new_state = (state[0], state[1]-1)
  if action == 'R':
    new_state = (state[0], state[1]+1)
  if action == 'U':
    new_state = (state[0]-1, state[1])
  if action == 'D':
    new_state = (state[0]+1, state[1])
  if new_state not in states:
    new_state = state
  return new_state

In [4]:
def RewardFunction(state, action, R, states):
    reward = R[state]
    finalPosition = get_state(action,state)
    return finalPosition, reward

In [5]:
def policy_evaluation(V,R,P,states,terminal_states):
    '''
    Arguments:
            V is a 2d array initialized as 0
            R is the array containing rewards for each state
            P is the policy taken by the agent
            states - array containing tuples of states in the gridworld
            terminal_states - array containing terminal states of the gridworld
    Returns:
            V - The value function calculated for the policy P
    '''
    num_iterations = 0
    V = {}
    for state in states:
        V[state] = 0
    gamma = 1
    while num_iterations < 2000:
        deltas = []
        copyV = copy.deepcopy(V)
        deltaState = []
        for state in states:
          new_reward = 0
          actions = P[state]
          for action in actions:
            final_state, reward = RewardFunction(state, action, R, states)
            new_reward += (1/len(actions))*(reward+(gamma*V[final_state]))          
          deltaState.append(np.abs(copyV[state]-new_reward))
          copyV[state] = new_reward
        deltas.append(deltaState)
        V = copyV
        num_iterations += 1
        info = False
        if info:
          if num_iterations in [0,1,2,9, 99, 1999]:
            print("Iteration {}".format(num_iterations+1))
            print(V)
            print("")
    return V

In [6]:
V,R,P1,states,terminal_states = initialization()

In [11]:
'''Testing Policy Evaluation


P_t = [ [[], ['L'], ['L'], ['L', 'D']], [['U'], ['L', 'U'], ['L', 'R', 'U', 'D'], ['D']], [['U'], ['L', 'R', 'U', 'D'], ['R', 'D'], ['D']], [['R', 'U'], ['R'], ['R'], []]]
P_test = {}
for row in range(4):
  for col in range(4):
    s = (row,col)
    P_test[s] =  P_t[row][col]
print(P_test)
v_test = policy_evaluation(V, R, P_test, states, terminal_states)
V2 = np.zeros((4,4))
for state in states:
    V2[state[0]][state[1]] = v_test[state]
print(V2)
'''

{(0, 0): [], (0, 1): ['L'], (0, 2): ['L'], (0, 3): ['L', 'D'], (1, 0): ['U'], (1, 1): ['L', 'U'], (1, 2): ['L', 'R', 'U', 'D'], (1, 3): ['D'], (2, 0): ['U'], (2, 1): ['L', 'R', 'U', 'D'], (2, 2): ['R', 'D'], (2, 3): ['D'], (3, 0): ['R', 'U'], (3, 1): ['R'], (3, 2): ['R'], (3, 3): []}
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]


In [8]:
def get_array(V, R, P, state):
  actions = P[state]
  V_array = []
  for action in actions:
    new_state = get_state(action, state)
    V_state = R[state] + V[new_state]
    V_array.append(round(V_state, 2))
  return V_array

def get_indices(arr, max_):
  indices = []
  for x in range(len(arr)):
    if arr[x] == max_:
      indices.append(x)
  return indices

In [9]:
def policy_improvement(V,R,P,P1,states,terminal_states):
    '''
    Arguments: 
            V is the value function
            R is the array containing rewards for each state
            P is the equiprobable random policy
            P1 is the previous optimal policy
            states - array containing tuples of states in the gridworld
            terminal_states - array containing terminal states of the gridworld
    Returns:
            P - Optimal policy after performing policy improvement
            policy_stable - bool variable denoting if P = P1
    '''
    policy_stable = False
    # Iterate over all states to find the optimal policy
    for state in states:
      V_array = get_array(V, R, P1, state)
      if len(V_array):
        max_ = max(V_array)
        indices = get_indices(V_array, max_)
        a = P1[state]
        P[state] = [a[indice] for indice in indices] 
    if P == P1:
      policy_stable = True
    return P,policy_stable

In [10]:
V = policy_evaluation(V,R,P1,states,terminal_states)

# Initial value function
V1 = np.zeros((4,4))
for state in states:
    V1[state[0]][state[1]] = V[state]
print("Initial value function is: ")
print(np.round(V1))

# Perform policy iteration until the policy doesn't change for any state in an iteration

while True:
    # Equiprobable random policy
    P = {}
    for state in states:
        if not (state in terminal_states):
            P[state] = ['L','R','U','D']
        else:
            P[state] = []

    P1,policy_stable = policy_improvement(V,R,P,P1,states,terminal_states)
    # If policy stable is true, the policy hasn't changed for any state in an iteration
    if policy_stable:
        break
    V = policy_evaluation(V,R,P1,states,terminal_states)

# Print optimal value function
print("\nOptimal value function is: ")
V1 = np.zeros((4,4))
for state in states:
    V1[state[0]][state[1]] = V[state]
print(np.round(V1))


# Print optimal policy
# Each cell denotes the optimal action that needs to be taken from that state
P = [[[],[],[],[]],
     [[],[],[],[]],
     [[],[],[],[]],
     [[],[],[],[]]]
for v,a in P1.items():
    P[v[0]][v[1]] = a
print("\nThe optimal policy is:")
for row in P:
    print(row)


Initial value function is: 
[[  0. -14. -20. -22.]
 [-14. -18. -20. -20.]
 [-20. -20. -18. -14.]
 [-22. -20. -14.   0.]]

Optimal value function is: 
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]

The optimal policy is:
[[], ['L'], ['L'], ['L', 'D']]
[['U'], ['L', 'U'], ['L', 'D'], ['D']]
[['U'], ['R', 'U'], ['R', 'D'], ['D']]
[['R', 'U'], ['R'], ['R'], []]
