**This notebook includes implementations of the following algorithms:**


*   Value Iteration Algorithm
*   Policy Iteration Algorithm

The notebook also includes a test gridworld game where the two algorithms are implemented to extract the optimal policy using value iteration and policy iteration.

Below is the commented implementation where each section is in a seperate notebook.


In [25]:
import numpy as np

**Grid world class representing the dynamics of the grid (enviroment) including the following:**


*   Grid size
*   Immediate rewards
*   Possible actions
*   States of the game which are indeed the cells in the grid





In [26]:
class gridworld:

  def __init__(self, grid_size):
    self.grid_size = grid_size
    rewards = -(np.ones((grid_size, grid_size)))
    rewards[0,2] = 10
    self.rewards = rewards
    self.actions = ["UP", "DOWN", "LEFT", "RIGHT"]
    self.action_prob = {"UP": (0.8, 0.1, 0.1), "DOWN": (0.8, 0.1, 0.1),
               "LEFT": (0.8, 0.1, 0.1), "RIGHT": (0.8, 0.1, 0.1)}

  def next_state(self, state, action):
    x, y = state
    if action == 'UP':
        return [(x-1, y), (x, y-1), (x, y+1)]
    elif action == 'DOWN':
        return [(x+1, y), (x, y-1), (x, y+1)]
    elif action == 'LEFT':
        return [(x, y-1), (x-1, y), (x+1, y)]
    elif action == 'RIGHT':
        return [(x, y+1), (x-1, y), (x+1, y)]
    return [state, state, state]

  def is_valid(self, state):
      x,y = state
      return 0 <= x < self.grid_size and 0 <= y < self.grid_size

**In this section, the class agent algorithms is implemented including the following:**


*   Value Iteration: finds the optimal value function for each state
*   Policy Extraction: finds the optimal policy based on the output of the value iteration.
*   Policy Iteration



In [27]:
class agent_algorithms:
  def __init__(self, grid: gridworld):
    self.grid = grid
    self.discount_factor = 0.99 # takes the future highly in consideration.


  def value_iteration(self):
    state_values = np.zeros((3,3)) #initializng the value function to zero.
    while True:
      delta = 0
      for row in range(0,3):
        for col in range(0,3):
          max_value = float("-inf")
          for action in self.grid.actions:
            value = 0
            for prob, new_state in zip(self.grid.action_prob[action], self.grid.next_state((row,col), action)):
              x1,y1 = new_state
              if self.grid.is_valid(new_state):
                value += prob * self.discount_factor * state_values[x1,y1]
              else:
                value += prob * self.discount_factor * state_values[row,col]
              value += self.grid.rewards[row,col]
            if value > max_value:
              state_values[row,col] = value
              delta = max(delta, abs(value - state_values[row, col]))
      if delta < 1e-4:
            break
    return state_values

  def extract_policy(self, state_values):
    policy = np.empty((3, 3), dtype=str)
    for row in range(3):
        for col in range(3):
            max_value = float("-inf")
            best_action = None
            for action in self.grid.actions:
                value = 0
                for prob, new_state in zip(self.grid.action_prob[action], self.grid.next_state((row,col), action)):
                    x1, y1 = new_state
                    if self.grid.is_valid(new_state):
                        value += prob * (self.discount_factor * state_values[x1, y1])
                    else:
                        value += prob * (self.discount_factor * state_values[row, col])
                    value+= self.grid.rewards[row, col]
                if value > max_value:
                    max_value = value
                    best_action = action
            policy[row, col] = best_action
    return policy

  def policy_iteration(self):
        state_values = np.zeros((3, 3))  # initializng the value function to zero.
        policy = np.random.choice(self.grid.actions, size=(3, 3))  # random initial policy
        while True:
            # Policy Evaluation
            while True:
                delta = 0
                for row in range(0, 3):
                    for col in range(0, 3):
                        action = policy[row, col]
                        value = 0
                        for prob, new_state in zip(self.grid.action_prob[action], self.grid.next_state((row, col), action)):
                            x1, y1 = new_state
                            if self.grid.is_valid(new_state):
                                value += prob * (self.discount_factor * state_values[x1, y1])
                            else:
                                value += prob * (self.discount_factor * state_values[row, col])
                            value += self.grid.rewards[row, col]
                        delta = max(delta, abs(value - state_values[row, col]))
                        state_values[row, col] = value
                if delta < 1e-4:
                    break

            # Policy Improvement
            policy_stable = True
            for row in range(0, 3):
                for col in range(0, 3):
                    old_action = policy[row, col]
                    max_value = float("-inf")
                    best_action = None
                    for action in self.grid.actions:
                        value = 0
                        for prob, new_state in zip(self.grid.action_prob[action], self.grid.next_state((row, col), action)):
                            x1, y1 = new_state
                            if self.grid.is_valid(new_state):
                                value += prob * (self.discount_factor * state_values[x1, y1])
                            else:
                                value += prob * (self.discount_factor * state_values[row, col])
                            value += self.grid.rewards[row, col]
                        if value > max_value:
                            max_value = value
                            best_action = action
                    policy[row, col] = best_action
                    if old_action != best_action:
                        policy_stable = False
            if policy_stable:
                break

        return state_values, policy

**In this section**, The test cases are implemented using different values for the variable reward r and a discount factor = 0.99 (The future is highly accounted in the calculation)

In [28]:
reward_list = [100,3,0,-3]
grid = gridworld(grid_size= 3)
for i in range(4):
  grid.rewards[0,0] = reward_list[i]
  agent = agent_algorithms(grid)
  print(f"State values when r = {reward_list[i]}")
  result = agent.value_iteration()
  policy = agent.extract_policy(result)
  print(result)
  print()
  print(policy)
  print()
  print()

State values when r = 100
[[358.7825073   25.15314214  77.62951928]
 [ 32.51946822  -0.50983893   8.07629415]
 [  1.76496803  -3.26683906  -6.51799654]]

[['L' 'L' 'U']
 ['U' 'L' 'U']
 ['U' 'L' 'U']]


State values when r = 3
[[10.76347522 -2.47201665 57.87009329]
 [-1.93441595 -3.24472965  2.85531541]
 [-4.1282869  -4.05353689 -8.38854082]]

[['L' 'R' 'R']
 ['U' 'R' 'U']
 ['U' 'U' 'U']]


State values when r = 0
[[ 0.         -3.326403   57.25897703]
 [-3.         -3.3293139   2.69384184]
 [-4.31055252 -4.07786775 -8.4463927 ]]

[['L' 'R' 'R']
 ['U' 'R' 'U']
 ['U' 'U' 'U']]


State values when r = -3
[[-10.76347522  -4.18078935  56.64786076]
 [ -4.06558405  -3.41389815   2.53236827]
 [ -4.49281814  -4.10219861  -8.50424459]]

[['D' 'R' 'R']
 ['R' 'R' 'U']
 ['U' 'U' 'U']]




**This section** tests the **Policy Iteration** algorithm. It iterates over a list of reward values, sets each value as the reward in the top-left corner of the grid, computes the optimal policy and state values using Policy Iteration, and prints the results, showing how varying rewards affect the learned optimal policies and expected cumulative rewards.

In [29]:
# Testing the Policy Iteration algorithm
reward_list = [100, 3, 0, -3]
grid = gridworld(grid_size=3)

for i in range(4):
    grid.rewards[0, 0] = reward_list[i]
    agent = agent_algorithms(grid)
    print(f"State values and policy when r = {reward_list[i]}:")
    result, policy = agent.policy_iteration()
    print(result)
    print()
    print(policy)
    print()

State values and policy when r = 100:
[[26319.52754445 25947.7635734  25626.09615308]
 [25947.7635734  25624.94123665 25338.22122842]
 [25585.80882725 25304.89920855 25050.00690979]]

[['UP' 'LEFT' 'LEFT']
 ['UP' 'UP' 'UP']
 ['UP' 'UP' 'UP']]

State values and policy when r = 3:
[[2535.02473987 2559.73349111 2600.12416359]
 [2502.50966115 2525.47099703 2559.73358189]
 [2470.16757945 2491.24601732 2520.46874987]]

[['RIGHT' 'RIGHT' 'RIGHT']
 ['UP' 'UP' 'UP']
 ['UP' 'UP' 'UP']]

State values and policy when r = 0:
[[2522.82081172 2558.58354384 2599.0796592 ]
 [2491.55032642 2523.36141987 2558.58364149]
 [2460.22803927 2488.46084859 2519.15189624]]

[['RIGHT' 'RIGHT' 'RIGHT']
 ['UP' 'UP' 'UP']
 ['UP' 'UP' 'UP']]

State values and policy when r = -3:
[[2511.98041709 2558.20121661 2598.73247959]
 [2486.8594086  2522.65920518 2558.20130559]
 [2456.84177819 2487.89828187 2518.75400136]]

[['RIGHT' 'RIGHT' 'RIGHT']
 ['RIGHT' 'RIGHT' 'UP']
 ['RIGHT' 'RIGHT' 'UP']]

