# Grid World Example
> A gridworld is a small game where the player moves in a 2D grid and tries to reach a certain goal cell at the end in order to win.

> This example concerns itself with dynamic programming (dp) methods in reinforcement learning.

> In this code, only policy evaluation and policy iteration are implemented. You are expected to implement value iteration yourself.

## Imports
- numpy to handle arrays and other mathematical operations

In [0]:
import numpy as np

## Global Variables
- Actions

In [0]:
# Integer codes for the four moves the agent can make on the grid.
MOVE_UP, MOVE_DOWN, MOVE_LEFT, MOVE_RIGHT = range(4)

# Every available action, listed in code order.
ACTIONS = (MOVE_UP, MOVE_DOWN, MOVE_LEFT, MOVE_RIGHT)

# Printable label for each action code.
ACTION_NAME = dict(zip(ACTIONS, ("UP", "DOWN", "LEFT", "RIGHT")))

## Environment Class (GridWorld)
A class containing the MDP representing the gridworld problem.

|Parameter|Description|
|-------------|-------------------------------------------------------------------------------------------------------|
|shape|Tuple containing the shape of the grid (row, column)|
|final_idx|Tuple containing index of final row and final column|
|num_states|Number of states in the MDP|
|num_actions|Number of actions in the MDP|
|rewards|A grid containing the rewards for each state.<br/> By default the start cell and the last cell of the grid are +1 and every other cell is 0.|
|start_state|The index of the state where the agent starts.<br/>By default the first cell of the grid.|
|discount|The discount factor for the MDP.<br/>By default 0.9.|
|terminals|List of all the terminal states. <br/>By default it contains the first cell of the grid.|
|\_v|The current known value function of the MDP. Initially all 0s.|


In [0]:
class GridWorld:
    """Deterministic grid-world MDP solved with dynamic programming.

    States are ``(row, column)`` cells of the grid. Every action moves the
    agent by one cell; a move that would leave the grid keeps the agent on
    the border cell. The reward of a transition is the reward stored for the
    cell the agent ends up in.
    """

    def __init__(self, shape=(4, 4), rewards_grid=None, start=(0, 0),
                 discount_factor=0.9, terminals=None):
        """Initialize a GridWorld environment

        Parameters
        ----------
        shape: tuple
          shape of grid as (rows, columns)
        rewards_grid: numpy.ndarray
          reward function; when None, the start cell and the last cell
          get reward 1 and every other cell gets 0
        start: tuple
          start state
        discount_factor: float
          value strictly between 0 and 1 to discount future rewards
        terminals: list
          all terminal states; when None, only the cell (0, 0) is terminal

        Raises
        ------
        ValueError
            If shape or start is not a tuple of length 2, if
            discount_factor is not strictly between 0 and 1, or if
            rewards_grid does not have the given shape.
        """
        # Error handling
        shape_ok = isinstance(shape, tuple) and len(shape) == 2
        start_ok = isinstance(start, tuple) and len(start) == 2
        if not (shape_ok and start_ok):
            raise ValueError('shape and start arguments must be a tuple of length 2')
        # The chained comparison also rejects NaN.
        if not 0 < discount_factor < 1:
            raise ValueError('discount_factor must be a number between 0 and 1')
        if rewards_grid is not None and rewards_grid.shape != shape:
            raise ValueError('rewards grid shape is not compatible with grid shape')

        # Metadata parameters
        self.shape = shape
        self.final_idx = (shape[0] - 1, shape[1] - 1)

        # MDP variables (assume environment with no randomness)
        self.num_states = np.prod(shape)
        self.num_actions = len(ACTIONS)
        self.rewards = rewards_grid
        self.start_state = start
        self.discount = discount_factor
        # A None default avoids sharing one list between all instances.
        # Entries are copied as tuples so they compare equal to the tuple
        # indices used when sweeping over the grid.
        if terminals is None:
            terminals = [(0, 0)]
        self.terminals = [tuple(t) for t in terminals]

        # Set rewards in case of none.
        if rewards_grid is None:
            self.rewards = np.zeros(self.shape)
            self.rewards[self.start_state] = 1
            self.rewards[self.final_idx] = 1

        # Learning parameters
        self._v = np.zeros(self.shape)

    def reset_v(self):
        """Reset the currently known value function to all zeros.
        """
        self._v = np.zeros(self.shape)

    def step(self, action, state):
        """Take an action in a state and return the result

        Parameters
        ----------
        action: int
            The action to be taken
        state: tuple
            The state (row, column) to take the action in

        Returns
        -------
        tuple
            The state the agent ends up in when taking the action

        Raises
        ------
        ValueError
            If the action is not one of the known actions.
        """
        row, col = state[0], state[1]
        if action == MOVE_UP:
            row -= 1
        elif action == MOVE_DOWN:
            row += 1
        elif action == MOVE_LEFT:
            col -= 1
        elif action == MOVE_RIGHT:
            col += 1
        else:
            raise ValueError('Action not available')

        # Make sure the resulting state is within bounds
        row = min(self.final_idx[0], max(0, row))
        col = min(self.final_idx[1], max(0, col))
        return (row, col)

    def evaluate_policy(self, policy, theta=0.01):
        """An implementation of the policy evaluation algorithm

        Sweeps over all non-terminal states, updating the value estimate in
        place, until the largest change in a sweep is at most theta. The
        result is also stored as the currently known value function.

        Parameters
        ----------
        policy: numpy.ndarray
            policy to be evaluated, indexed as (row, column, action) and
            holding the probability of taking each action in each state
        theta: float
            Threshold for stopping the iteration

        Returns
        -------
        numpy.ndarray
            value function based on current policy
        """
        v = np.zeros(self.shape)
        delta = theta + 1
        while delta > theta:
            delta = 0
            for s in np.ndindex(*self.shape):
                # Terminal states keep a value of 0.
                if s in self.terminals:
                    continue
                new_val = 0
                for action in range(self.num_actions):
                    s_ = self.step(action, s)
                    # Bootstrap from the estimate being built (v), not from
                    # a value function left over from an earlier evaluation.
                    new_val += policy[s + (action,)] * (
                        self.rewards[s_] + self.discount * v[s_])
                delta = max(delta, abs(v[s] - new_val))
                v[s] = new_val
        self._v = v
        return v

    def policy_iteration(self):
        """An implementation of the policy iteration algorithm

        Starts from the uniform random policy and alternates policy
        evaluation with greedy policy improvement until the greedy action
        of every non-terminal state is already chosen with probability 1.

        Returns
        -------
        numpy.ndarray
            learned policy for the agent, indexed as (row, column, action)
        """
        # Size the policy from the grid itself so any grid shape works.
        pi = np.full(self.shape + (self.num_actions,), 1.0 / self.num_actions)
        stable = False
        while not stable:
            stable = True
            v = self.evaluate_policy(pi)
            for s in np.ndindex(*self.shape):
                if s in self.terminals:
                    continue
                action_values = []
                for action in range(self.num_actions):
                    s_ = self.step(action, s)
                    action_values.append(self.rewards[s_] + self.discount * v[s_])
                # argmax returns the first maximum, so ties are broken the
                # same way on every pass.
                best = int(np.argmax(action_values))
                if pi[s + (best,)] != 1:
                    stable = False
                # Make the policy deterministic (one-hot) in this state.
                pi[s] = 0
                pi[s + (best,)] = 1
        return pi
      

## Example

In [0]:
# 5x5 reward grid: +1 in the top-left corner, -1 in the bottom-right corner,
# 0 everywhere else.
rewards = np.zeros((5, 5), dtype=int)
rewards[0, 0] = 1
rewards[-1, -1] = -1

# Both rewarded corners end the episode.
terminals = [(0, 0), (4, 4)]

In [5]:
# Build the world from the reward grid and terminal cells defined above.
env = GridWorld(shape=rewards.shape, rewards_grid=rewards, terminals=terminals)
# Start from an all-zero value function before learning.
env.reset_v()
# Run policy iteration; the learned policy is kept in `pi`.
pi = env.policy_iteration()
# Show the value function stored by the last policy evaluation.
print(env._v)

[[0.       1.       0.9      0.81     0.729   ]
 [1.       0.9      0.81     0.729    0.6561  ]
 [0.9      0.81     0.729    0.6561   0.59049 ]
 [0.81     0.729    0.6561   0.59049  0.531441]
 [0.729    0.6561   0.59049  0.531441 0.      ]]
