### Policy Iteration Algorithm

*For performing the Policy Iteration, we follow the steps below*:
1. Initialize a random policy
2. Calculate the utility or value of all state given the current policy.
3. With the new value(s) obtained for each state, compute the utility value for all possible actions i.e. up, right, down, left.
4. Select the action with the maximum value.
5. Compare this value with the value obtained from the initial policy.
* 5a. If new value > old_value -> update the policy
* 5b. Else if new value = old value -> Policy converged.

In [None]:
# Initialize random policies for the environment.
# Policy Evaluation Phase

def P_eval(initial_policy, v):
  for k in range(1,16):
    for row in range(n_rows):
        for col in range(n_cols):
            if (row, col) == terminals[0]:
              v[row, col] = 1
              continue

            elif (row, col) == terminals[1]:
              v[row, col] = -1
              continue

            elif (row, col) in walls:
              continue

            else:
              p_next_states = [] # Initialize list for plausible next states
              v_next_states = [] # Initialize list for the values of the plausible next states

              for i, a in enumerate(actions):
                next_row = row + row_change[i]
                next_col = col + col_change[i]
                if next_row < 0 or next_row >= n_rows or next_col < 0 or next_col >= n_cols or (next_row, next_col) in walls: # Checking to ensure boundaries of the environment is not violated.
                  next_state = (row, col)
                  p_next_states.append(next_state)
                  v_next_states.append(v[next_state])
                else:
                  next_state = (next_row, next_col)
                  p_next_states.append(next_state)
                  v_next_states.append(v[next_state])

              rand_action = initial_policy[row][col]

              act_prob = list(motion_probs[rand_action].values())
              temp_res = [act_prob[i] * v_next_states[i] for i in range(len(v_next_states))] # (P(s'|s,a)* V(s'))
              sum_temp_res = sum(temp_res) # ∑(P(s'|s,a)* V(s')
              v[row, col] = R[row, col] + gamma * sum_temp_res # R(s) + γ(∑(P(s'|s,a)* V(s'))
  return v

In [None]:
def P_improv(initial_policy, v):
  # Update the policy
  for row in range(n_rows):
    for col in range(n_cols):
        if (row, col) == terminals[0]:
          v[row, col] = 1
          continue

        elif (row, col) == terminals[1]:
          v[row, col] = -1
          continue

        elif (row, col) in walls:
          continue

        else:
          p_next_states = [] # Initialize list for plausible next states
          v_next_states = [] # Initialize list for the values of the plausible next states

          old_v = v[row, col]
          old_policy = initial_policy[row][col]

          for i, a in enumerate(actions):
              next_row = row + row_change[i]
              next_col = col + col_change[i]
              if next_row < 0 or next_row >= n_rows or next_col < 0 or next_col >= n_cols or (next_row, next_col) in walls: # Checking to ensure boundaries of the environment is not violated.
                  next_state = (row, col)
                  p_next_states.append(next_state)
                  v_next_states.append(v[next_state])
              else:
                  next_state = (next_row, next_col)
                  p_next_states.append(next_state)
                  v_next_states.append(v[next_state])

          v_check = []  # Initialize empty list for the value of state after taking an action from up, right, down, left.
          for a in actions:
            act_prob = list(motion_probs[a].values())
            temp_res = [act_prob[i] * v_next_states[i] for i in range(len(v_next_states))] # (P(s'|s,a)* V(s')
            sum_temp_res = sum(temp_res) # ∑(P(s'|s,a)* V(s')
            v_check.append(sum_temp_res)
          v_selected = max(v_check)
          v[row, col] = gamma * v_selected # γmax(∑(P(s'|s,a)* V(s'))

          # Check previous policy against new values
          if v[row, col] > old_v:
            # Get the action that produces this.
            max_index = np.argmax(v_check) # Extract the index of the highest value in that state: argmax(∑(P(s'|s,a)* V(s'))
            initial_policy[row][col] = actions[max_index] # Update the policy
          else:
            initial_policy[row][col] = old_policy
            continue
  return initial_policy

In [None]:
import numpy as np

# Define the rows and columns
n_rows = 3
n_cols = 5

# Define the terminal states and the walls/impossible states
terminals = [(0, 4), (2, 3)]
walls = [(0, 3), (1, 1)]

# Define the possible actions at each state
actions = ["UP", "RIGHT", "DOWN", "LEFT"]

# Define the row and column changes from each state to new states depending on actions taken
row_change = [-1, 0, 1, 0] # up, right, down, left
col_change = [0, 1, 0, -1] # up, right, down, left

# Define the motion model
motion_probs = {
    'UP':    {'UP': 0.8, 'RIGHT': 0.1, 'DOWN': 0.0, 'LEFT': 0.1},
    'RIGHT': {'UP': 0.1, 'RIGHT': 0.8, 'DOWN': 0.1, 'LEFT': 0.0},
    'DOWN':  {'UP': 0.0, 'RIGHT': 0.1, 'DOWN': 0.8, 'LEFT': 0.1},
    'LEFT':  {'UP': 0.1, 'RIGHT': 0.0, 'DOWN': 0.1, 'LEFT': 0.8}
}

# Definition and initialization of the reward function
R = np.full((n_rows, n_cols), -0.1)
R[0, 4] = 1 # Reward at terminal state I (Goal state)
R[2, 3] = -1 # Reward at terminal state II (Bad state)

# Define the discount factor
gamma = 0.8

threshold = 1e-3

# Definition and initialization of the value for each state.
v_original = np.zeros((n_rows, n_cols))

v_real = v_original.copy()

initial_policy = np.array([["LEFT", "UP", "DOWN", "NONE", "TERMINAL"],
                 ["DOWN", "NONE", "DOWN", "DOWN", "RIGHT"],
                 ["RIGHT", "UP", "RIGHT", "TERMINAL", "UP"]])

random_policy = initial_policy.copy()


In [None]:
status = False

Iter = 1

while status == False:

  print(f"Iteration: {iter}")

  v_new = P_eval(initial_policy, v_original)

  v_new_array = sum(v_new)

  updated_policy = P_improv(initial_policy, v_new)

  new_v = P_eval(updated_policy, v_original)

  new_v_array = sum(new_v)

  # Check if the absolute difference between each corresponding element is less than the threshold
  if np.all(np.abs(v_new_array - new_v_array) < threshold):
    print("Optimal policy reached\n")
    updated_policy
    status = True
  else:
    initial_policy = updated_policy
    iter += 1
    status = False

Iteration: 1972
Iteration: 1973
Iteration: 1974
Iteration: 1975
Iteration: 1976
Iteration: 1977
Optimal policy reached



In [None]:
updated_policy

array([['RIGHT', 'RIGHT', 'DOWN', 'NONE', 'TERMINAL'],
       ['UP', 'NONE', 'RIGHT', 'RIGHT', 'UP'],
       ['UP', 'LEFT', 'UP', 'TERMINAL', 'UP']], dtype='<U8')

In [None]:
random_policy

array([['LEFT', 'UP', 'DOWN', 'NONE', 'TERMINAL'],
       ['DOWN', 'NONE', 'DOWN', 'DOWN', 'RIGHT'],
       ['RIGHT', 'UP', 'RIGHT', 'TERMINAL', 'UP']], dtype='<U8')

In [None]:
new_v

array([[-0.28014603, -0.20490052, -0.11268263,  0.        ,  1.        ],
       [-0.3324915 ,  0.        ,  0.01988109,  0.22635815,  0.60663984],
       [-0.37520335, -0.4049159 , -0.19966937, -1.        ,  0.22635815]])

In [None]:
v_real

array([[0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.]])