In [1]:
import pandas as pd
import itertools
import numpy as np

In [2]:
## Define the gridworld
# Grid size: M rows (row 0 is the top row, counting from the top-left corner) by N columns
M, N = 4, 4
# State space: every (row, col) cell, listed row by row
S = [(row, col) for row in range(M) for col in range(N)]
nstates = len(S)
# Terminal states
terminal_states = [(0, 0), (3, 3)]
# Action space as (row offset, col offset) pairs
A = [(-1, 0), (0, 1), (1, 0), (0, -1)] # N, E, S, W

In [3]:
## Function to return the transition probability
def P_t(start_state, action, end_state, n_rows=None, n_cols=None, terminals=None):
  """Deterministic transition probability P(end_state | start_state, action).

  Parameters
  ----------
  start_state, end_state : tuple
      (row, col) grid cells.
  action : tuple
      (row offset, col offset) applied to start_state.
  n_rows, n_cols : int, optional
      Grid dimensions. Default to the notebook-level M and N.
  terminals : list of tuple, optional
      Terminal states. Defaults to the notebook-level terminal_states.

  Returns
  -------
  int
      1 if the move from start_state under action ends in end_state, else 0.
      A move that would leave the grid keeps the agent in start_state.
      From a terminal state the result is 0 for every end_state.
  """
  n_rows = M if n_rows is None else n_rows
  n_cols = N if n_cols is None else n_cols
  terminals = terminal_states if terminals is None else terminals

  # If start state is a terminal state
  if start_state in terminals:
    return 0

  # Otherwise calculate the new proposed state according to the action
  row = start_state[0] + action[0]
  col = start_state[1] + action[1]

  # Check each coordinate against its own axis limit. Testing whether any
  # coordinate equals M, N or -1 misclassifies valid cells when M != N.
  inside_grid = 0 <= row < n_rows and 0 <= col < n_cols

  # If the proposed state is outside the grid (illegal action), stay in place
  if not inside_grid:
    return 1 if start_state == end_state else 0

  # Otherwise the action is legal: the agent lands on the proposed state
  return 1 if (row, col) == end_state else 0

In [4]:
## Function to return reward
def reward(start_state, action, end_state):
  """Reward for one transition: 0 when leaving a terminal state, -1 otherwise.

  Only start_state is inspected; action and end_state are accepted but
  do not affect the result.
  """
  return 0 if start_state in terminal_states else -1

In [5]:
## Creating a dictionary with:
## - keys from a list
## - values are empty lists (one separate list per key)
# dict.fromkeys(S, []) would bind every key to the SAME list object, so an
# append through one key would show up under all of them. The comprehension
# creates a fresh list for each state.
pi_optimal = {state: [] for state in S}
print(pi_optimal)

{(0, 0): [], (0, 1): [], (0, 2): [], (0, 3): [], (1, 0): [], (1, 1): [], (1, 2): [], (1, 3): [], (2, 0): [], (2, 1): [], (2, 2): [], (2, 3): [], (3, 0): [], (3, 1): [], (3, 2): [], (3, 3): []}


In [9]:
## Synchronous value iteration to find optimal policy

gamma = 1.0
# Initialize optimal policy for all states: one independent list per state.
# (dict.fromkeys(S, []) would make every state share a single list object.)
pi_optimal = {state: [] for state in S}
# Initialize the state value function to zero
v_old = np.zeros(nstates)
tol = 1e-05 # stopping tolerance
normdiff = np.inf

maxiter = 1000
# Sweep counter; named n_iter so the builtin iter() is not shadowed
# for the rest of the notebook.
n_iter = 1

while normdiff > tol and n_iter <= maxiter:
  # Start each sweep at -inf so the first action evaluated always wins
  v_new = np.ones(nstates)*(-np.inf)
  for i in range(len(S)):
    for a in range(len(A)):
      # Expected one-step return of taking A[a] in S[i]
      innersum = 0
      for j in range(len(S)):
        innersum += P_t(S[i],A[a], S[j]) * (reward(S[i],A[a], S[j]) + gamma*v_old[j])
      if v_new[i] < innersum:
        # Strictly better action: it replaces the current greedy set
        v_new[i] = innersum
        pi_optimal[S[i]] = [A[a]]
      elif innersum == v_new[i]:
        # Exact tie: keep this action alongside the current best ones
        pi_optimal[S[i]].append(A[a])
  n_iter = n_iter+1
  normdiff = np.linalg.norm(v_new - v_old)
  v_old = np.copy(v_new)

In [10]:
## Print optimal policy
# Each state maps to the list of actions that tied for the best value in the last sweep
pi_optimal

{(0, 0): [(-1, 0), (0, 1), (1, 0), (0, -1)],
 (0, 1): [(0, -1)],
 (0, 2): [(0, -1)],
 (0, 3): [(1, 0), (0, -1)],
 (1, 0): [(-1, 0)],
 (1, 1): [(-1, 0), (0, -1)],
 (1, 2): [(-1, 0), (0, 1), (1, 0), (0, -1)],
 (1, 3): [(1, 0)],
 (2, 0): [(-1, 0)],
 (2, 1): [(-1, 0), (0, 1), (1, 0), (0, -1)],
 (2, 2): [(0, 1), (1, 0)],
 (2, 3): [(1, 0)],
 (3, 0): [(-1, 0), (0, 1)],
 (3, 1): [(0, 1)],
 (3, 2): [(0, 1)],
 (3, 3): [(-1, 0), (0, 1), (1, 0), (0, -1)]}

In [11]:
## Map actions to unicode characters representing arrows
d = {(-1, 0):'\u2191', (0, 1):'\u2192', (1, 0):'\u2193', (0, -1):'\u2190'}
for key, val in pi_optimal.items():
  # pi_optimal is overwritten in place, so on a re-run the values are already
  # arrow strings. Skip those to keep the cell idempotent.
  if isinstance(val, str):
    continue
  # d[item] raises KeyError on an unknown action instead of passing None to join
  pi_optimal[key] = ''.join(d[item] for item in val)

In [12]:
# Lay the per-state arrow strings out as an M x N grid (S lists the cells row by row)
np.array(list(pi_optimal.values())).reshape((M, N))

array([['↑→↓←', '←', '←', '↓←'],
       ['↑', '↑←', '↑→↓←', '↓'],
       ['↑', '↑→↓←', '→↓', '↓'],
       ['↑→', '→', '→', '↑→↓←']], dtype='<U4')

In [13]:
# State values from the last value-iteration sweep, arranged as an M x N grid
v_new.reshape((M,N))

array([[ 0., -1., -2., -3.],
       [-1., -2., -3., -2.],
       [-2., -3., -2., -1.],
       [-3., -2., -1.,  0.]])

5 x 5 Gridworld:
(Thanks to Srikant sir for suggesting function-based coding.)

In [None]:
## Define the gridworld
# Grid size: M rows (row 0 is the top row, counting from the top-left corner) by N columns
M, N = 5, 5
gamma = 0.9
# State space: every (row, col) cell, listed row by row
S = [(row, col) for row in range(M) for col in range(N)]
nstates = len(S)
# Terminal states
# NOTE(review): same cells as the 4 x 4 world above; looks carried over, confirm they are intended here
terminal_states = [(0, 0), (3, 3)]
# Action space as (row offset, col offset) pairs
A = [(-1, 0), (0, 1), (1, 0), (0, -1)] # N, E, S, W

#Special States:
# 'start' and 'end' are parallel lists: start[k] is paired with end[k]
special_states = {
    'start': [(0, 1), (0, 3)],
    'end': [(4, 1), (2, 3)],
}
# Reward associated with each special start state
special_state_rewards = {(0, 1): 10, (0, 3): 5}


In [22]:
def action_to_unicode(action_dict):
  """Render each state's list of actions as a string of arrow characters.

  Parameters
  ----------
  action_dict : dict
      Maps a state to an iterable of actions, each one of
      (-1, 0), (0, 1), (1, 0), (0, -1).

  Returns
  -------
  dict
      A new dict with the same keys in the same order, whose values are the
      concatenated arrows for that state's actions. action_dict is left
      untouched, so the function can safely be called again on the same policy.

  Raises
  ------
  KeyError
      If an action is not one of the four listed above.
  """
  d = {(-1, 0):'\u2191', (0, 1):'\u2192', (1, 0):'\u2193', (0, -1):'\u2190'}
  # Build a fresh dict rather than overwriting the caller's action lists
  return {key: ''.join(d[item] for item in val) for key, val in action_dict.items()}

In [23]:
def value_iteration(gridworld_func, P_t_func, reward_func):
  """Synchronous value iteration returning state values and greedy actions.

  Parameters
  ----------
  gridworld_func : callable
      Called with no arguments; must return an 8-tuple
      (M, N, S, A, nstates, gamma, _, _). Only S (list of states),
      A (list of actions), nstates and gamma are used here.
  P_t_func : callable
      P_t_func(start_state, action, end_state) -> transition probability.
  reward_func : callable
      reward_func(start_state, action, end_state) -> reward.

  Returns
  -------
  v_new : numpy.ndarray
      Value of each state from the last sweep, in the order of S.
  pi_optimal : dict
      Maps each state to the list of actions that tied for the best value in
      the last sweep. Ties are detected by exact equality.

  Notes
  -----
  Sweeps stop when the Euclidean norm of the change in values is at most
  1e-05, or after 1000 sweeps.
  """
  _, _, S, A, nstates, gamma, _, _ = gridworld_func()

  # One independent list per state. dict.fromkeys(S, []) would make every
  # state share a single list object.
  pi_optimal = {state: [] for state in S}
  # Initialize the state value function to zero
  v_old = np.zeros(nstates)
  tol = 1e-05 # stopping tolerance
  normdiff = np.inf

  maxiter = 1000
  n_iter = 1  # sweep counter; avoids shadowing the builtin iter()

  while normdiff > tol and n_iter <= maxiter:
    # Start each sweep at -inf so the first action evaluated always wins
    v_new = np.ones(nstates)*(-np.inf)
    for i, state in enumerate(S):
      for action in A:
        # Expected one-step return of taking `action` in `state`
        innersum = 0
        for j, next_state in enumerate(S):
          innersum += P_t_func(state, action, next_state) * (reward_func(state, action, next_state) + gamma*v_old[j])
        if v_new[i] < innersum:
          # Strictly better action: it replaces the current greedy set
          v_new[i] = innersum
          pi_optimal[state] = [action]
        elif innersum == v_new[i]:
          # Exact tie: keep this action alongside the current best ones
          pi_optimal[state].append(action)
    n_iter += 1
    normdiff = np.linalg.norm(v_new - v_old)
    v_old = np.copy(v_new)

  return v_new, pi_optimal

In [14]:
# Wrapping the gridworld definition in a function (Srikant sir's idea):

## Define the 5 x 5 gridworld
def gridworld_5x5():
  """Build the specification of the 5 x 5 gridworld.

  Returns
  -------
  tuple
      (M, N, S, A, nstates, gamma, special_states, special_state_rewards):
      row count, column count, list of (row, col) states listed row by row,
      list of (row offset, col offset) actions in N, E, S, W order, number of
      states, the value 0.9 for gamma, the paired special start/end states
      (A, B and A', B'), and the reward attached to each special start state.
  """
  n_rows, n_cols = 5, 5  # rows are counted from the top-left corner
  states = [(row, col) for row in range(n_rows) for col in range(n_cols)]
  actions = [(-1, 0), (0, 1), (1, 0), (0, -1)]  # N, E, S, W
  discount = 0.9
  # 'start' and 'end' are parallel lists: start[k] is paired with end[k]
  jumps = {
      'start': [(0, 1), (0, 3)],
      'end': [(4, 1), (2, 3)],
  }
  jump_rewards = {(0, 1): 10, (0, 3): 5}

  return n_rows, n_cols, states, actions, len(states), discount, jumps, jump_rewards

In [18]:
def P_t_5x5(start_state, action, end_state):
  """Deterministic transition probability for the 5 x 5 gridworld.

  Returns 1 if taking `action` in `start_state` ends in `end_state`, else 0.
  From a special start state every action leads to its paired special end
  state. A move that would leave the grid keeps the agent where it is.
  """
  _, _, S, _, _, _, special_states, _ = gridworld_5x5()

  # Special states: the action is ignored, the destination is the paired end state
  starts = special_states['start']
  if start_state in starts:
    destination = special_states['end'][starts.index(start_state)]
    return int(end_state == destination)

  # Cell the action points at
  proposed_state = tuple(np.add(start_state, action))

  # An off-grid proposal (illegal action) leaves the agent in start_state
  landing_state = proposed_state if proposed_state in S else start_state
  return int(landing_state == end_state)

In [19]:
## Function to return reward
def reward_5x5(start_state, action, end_state):
  """Reward for taking `action` in `start_state` on the 5 x 5 gridworld.

  A special start state yields its own reward whatever the action. Otherwise
  the reward is -1 if the action would leave the grid and 0 if it stays on it.
  end_state is accepted but does not affect the result.
  """
  _, _, S, _, _, _, special_states, special_state_rewards = gridworld_5x5()

  # Special states pay their fixed reward regardless of the action
  if start_state in special_states['start']:
    return special_state_rewards[start_state]

  # Cell the action points at
  proposed_state = tuple(np.add(start_state, action))

  # Legal move -> 0, attempt to leave the grid -> -1
  return 0 if proposed_state in S else -1

In [24]:
def optimal_policy_5x5():
  """Run value iteration on the 5 x 5 gridworld and print the results.

  Prints the state values as an M x N array, then the greedy actions of
  each state as an M x N array of arrow strings. Returns None.
  """
  n_rows, n_cols, *_ = gridworld_5x5()
  grid_shape = (n_rows, n_cols)

  values, policy = value_iteration(gridworld_5x5, P_t_5x5, reward_5x5)
  print(values.reshape(grid_shape))

  arrow_strings = list(action_to_unicode(policy).values())
  print(np.array(arrow_strings).reshape(grid_shape))

In [25]:
# Run value iteration on the 5 x 5 gridworld and print its value grid and arrow policy
optimal_policy_5x5()

[[21.97747666 24.41941851 21.97747666 19.41941851 17.47747666]
 [19.77972899 21.97747666 19.77972899 17.80175609 16.02157612]
 [17.80175609 19.77972899 17.80175609 16.02157612 14.41941851]
 [16.02157612 17.80175609 16.02157612 14.41941851 12.97747666]
 [14.41941851 16.02157612 14.41941851 12.97747666 11.67972899]]
[['→' '↑→↓←' '←' '↑→↓←' '←']
 ['↑→' '↑' '↑←' '←' '←']
 ['↑→' '↑' '↑←' '↑←' '↑←']
 ['↑→' '↑' '↑←' '↑←' '↑←']
 ['↑→' '↑' '↑←' '↑←' '↑←']]
