In [1]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}

<IPython.core.display.Javascript object>

In [2]:
from IPython.display import IFrame

IFrame(width=560, height=315, src="https://www.youtube.com/embed/zSOMeug_i_M")

## Grid World

Here I define a simple grid world environment

In [3]:
from enum import Enum
from typing import Tuple

%matplotlib notebook
import matplotlib.pyplot as plt
import numpy as np


class Actions(Enum):
    UP = 0
    RIGHT = 1
    DOWN = 2
    LEFT = 3


class GridWorld:
    """
    Grid world which has solid ground, holes, and goals. Solid ground gives 0
    reward, holes give negative reward, goals give positive reward.
    """
    
    def __init__(self, height: int, width: int, num_goals=1, num_holes=0,
                 goal_reward: float = 1., hole_reward: int = -1.):
        self.shape = (height, width)
        self.hole_reward = hole_reward
        self.goal_reward = goal_reward
        # Reward is given whenever we land in a grid cell. By default
        # reward is zero
        self.rewards = np.zeros(shape=(height, width))
        # Set goals and holes
        indices = np.random.choice(
            np.arange(height*width), size=num_goals+num_holes)
        goal_indices = indices[:num_goals]
        hole_indices = indices[num_goals:]
        self.rewards[
            np.unravel_index(goal_indices, (height, width))] = goal_reward
        self.rewards[
            np.unravel_index(hole_indices, (height, width))] = hole_reward        

    def evaluate_state_action(self, state: Tuple[int, int], action: Actions):
        """
        Get the expected reward and next state probablities for taking an
        `action` while in an initial `state`
        The next state probabilities are represented in a dictionary. Keys are
        tuples of (y, x co-ords) and values are the associated probabilities.
        This method assumes we have a model of the environment, therefore we
        can compute the expected reward analytically
        For now the action has a deterministic outcome.
        TODO Implement non-deterministic outcomes
        """
        new_y = state[0]
        new_x = state[1]
        if action == Actions.UP:
            new_y = max(new_y - 1, 0)
        elif action == Actions.RIGHT:
            new_x = min(new_x + 1, self.shape[1] - 1)
        elif action == Actions.DOWN:
            new_y = min(new_y + 1, self.shape[0] - 1)
        elif action == Actions.LEFT:
            new_x = max(new_x - 1, 0)
        next_state_probabilities = {(new_y, new_x): 1.}
        reward = self.rewards[new_y, new_x]
        return next_state_probabilities, reward

    def render(self):
        """
        Blue for solid ground
        Green for goals
        Black for holes
        """
        grid = np.zeros((*self.shape, 3)) + np.array([0.1, 0.9, 1])
        # Color in the goals
        grid[np.where(self.rewards == self.goal_reward)] = np.array([0, 0.7, 0.1])
        # Color in holes
        grid[np.where(self.rewards == self.hole_reward)] = np.array([0.2, 0.2, 0.2])
        fig, ax = plt.subplots(figsize=(5, 5))
        ax.imshow(grid, vmin=0, vmax=1)
        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_title("Grid World")
        fig.canvas.draw()

env = GridWorld(8, 8, num_goals=5, num_holes=5)
env.render()

<IPython.core.display.Javascript object>

## Policy specification

Specification for how policies should be defined, and a helper for visualization.

In [19]:
class PolicyHelper:
    """
    Use this to define policies for the purposes of this notebook
    """
    def __init__(self, env: GridWorld, policy: np.ndarray):
        # `policy` is a 3d array (following the shape of the environment)
        # The first two dimensions are the spatial dimensions, and the last
        # dimension represents the probability of taking up, right, down, left
        # as an action
        self.env = env
        self.policy = policy
    
    @classmethod
    def uniformly_random(cls, env: GridWorld):
        policy = np.random.uniform(size=(*env.shape, len(Actions)))
        policy = policy / policy.sum(-1)[:, :, None]  # normalize
        return cls(env, policy)

    @classmethod
    def uniform(cls, env: GridWorld, action: Actions):
        policy = np.zeros(shape=(*env.shape, len(Actions)))
        policy[:, :, action.value] = 1
        return cls(env, policy)

    def render(self, figure_title: str = '', redraw=False):
        """
        Draw a grid of arrows with magnitudes representing
        the probability of taking an action
        """
        res = 40  # kind of like resolution
        grid = np.ones((env.shape[0]*res, env.shape[1]*res, 3))
        grid[:, res::res] *= 0
        grid[res::res] *= 0
        if redraw:
            self.ax.clear()
        else:
            self.fig, self.ax = plt.subplots(figsize=(5, 5))
        self.ax.imshow(grid, vmin=0, vmax=1)
        self.ax.set_xticks([])
        self.ax.set_yticks([])
        if len(figure_title):
            self.ax.set_title(figure_title)
        for ix, p in np.ndenumerate(self.policy):
            if p > 0:
                plt.arrow(
                    x = ix[1]*res + res/2, y= ix[0]*res + res/2,
                    dx = p * 0.4 * res * [0, 1, 0, -1][ix[2]],
                    dy = p * 0.4 * res * [-1, 0, 1, 0][ix[2]],
                    length_includes_head=True, width=0.05*res*np.sqrt(p),
                    head_length=0.1*res*np.sqrt(p),
                    head_width=0.2*res*np.sqrt(p), linewidth=0)
        self.fig.canvas.draw()


env = GridWorld(8, 8, num_goals=5, num_holes=5)
policy_helper = PolicyHelper.uniformly_random(env)
# policy_helper = PolicyHelper.uniform(env, action=Actions.DOWN)
policy_helper.render()

<IPython.core.display.Javascript object>

## Policy evaluation

In the next cell I demonstrate
1. How to evalute the policy exactly by solving a linear system of equations derived from applying Bellman's equation to all states.
2. How to evaluate the policy with dynamic programming.
3. Applying the above to a randomly generated gridworld and checking that values from both methods are the same.

In [51]:
from typing import Optional
from time import sleep

import seaborn as sns


def evaluate_policy_analytically(env: GridWorld, policy: np.ndarray, gamma: float):
    """
    Evaluate policy by solving system of linear equations
    If you are having trouble understanding the computation of expectation
    values, this video might help https://youtu.be/uUoGPvxPkQU
    """
    # `coeffs` will be a numpy array of coefficients for each state-value
    # `consts` will store the constant terms
    coeffs = []
    consts = []
    # Look at the Bellman equation for each state
    # Reminder of Bellman eqn
    # v(s) = sum_a(
    #     pi(a|s) * sum_r,s'(
    #         p(r, s' | s, a) * (r + gamma * v(s'))
    #     )
    # )
    # or compactly: v(s) = E[R_{t+1} + gamma * v(S_{t+1}) | s, pi]
    for ravelled_state_ix in range(np.prod(env.shape)):
        # Expectation value of reward given current state and policy
        # aka E[R_{t+1} | s, pi]
        er_sp = 0
        # Initialise next set of coefficients...
        coeffs.append(np.zeros(np.prod(env.shape)))
        # ... remembering that we need to set the coefficient for the current state
        coeffs[-1][ravelled_state_ix] = 1
        # Convert index to grid world coords
        state_ix = np.unravel_index(ravelled_state_ix, env.shape)
        # Step through outer sum of Bellman eqn
        for action in Actions:
            # Probability of action
            p_a = policy[state_ix][action.value]
            if p_a == 0:
                continue
            # `er_sa` is the expectation value of reward given state and action
            # aka E[R_{t+1} | s, a]
            next_state_probabilities, er_sa = env.evaluate_state_action(
                state_ix, action)
            # Add to the series expansion of er_sp
            er_sp += p_a * er_sa
            
            # Step through inner sum of Bellman eqn for next state probs
            for next_state_ix, p_s in next_state_probabilities.items():
                # Add to the series expansion of coefficients
                # We use a -ve sign because we need to move coefficients to LHS of Bellman
                coeffs[-1][np.ravel_multi_index(next_state_ix, env.shape)] \
                    -= p_a * p_s * gamma
        consts.append(er_sp)
        
    consts = np.array(consts)
        
    return np.linalg.solve(coeffs, consts).reshape(*env.shape)
             


class DynamicPolicyEvaluator:
    def __init__(self, env: GridWorld, policy: np.ndarray):
        self.env = env
        self.update_policy(policy)

    def update_policy(self, policy: np.ndarray):
        self.policy = policy
    
    def evaluate(self, gamma: float, num_iterations: Optional[int] = None,
                 vis_interval: Optional[int] = None):
        """
        Must specify gamma as the discount factor
        Specifying `iterations` will go for that many iterations or till
        convergence (whichever comes first)
        If you are having trouble understanding the computation of expectation
        values, this video might help https://youtu.be/uUoGPvxPkQU
        """
        # Store whether the evaluation converged
        converged = False
        # Initialize all values to zero
        values = np.zeros(self.env.shape)
        # One time step goes through all states in the policy and does an
        # update on the estimated value
        # Stopping condition is either `num_iterations` being reached or ALL
        # estimated values having converged
        k = 0  # iteration counter
        avg_diffs = [] # store average diff to visualize convergence
        # Will keep a copy of previous values each iteration to check convergence
        prev_values = values.copy()  
        while True:
            for state_ix, _ in np.ndenumerate(values):
                # Expectation value of reward given current state and policy
                # aka E[R_{t+1} | s, pi]
                er_sp = 0
                # Expectation value of state-value of next state given current state and policy
                # aka E[v(S_{t+1}) | s, pi]
                ev_sp = 0
                # Compute by expanding the expectation values over all possible actions
                for action in Actions:
                    # Probability of action
                    p_a = self.policy[state_ix][action.value]
                    if p_a == 0:
                        continue
                    # `er_sa` is the expectation value of reward given state and action
                    # aka E[R_{t+1} | s, a]
                    next_state_probabilities, er_sa = self.env.evaluate_state_action(
                        state_ix, action)
                    # Compute `ev_sa` aka E[v(S_{t+1}) | s, a]
                    ev_sa = 0
                    for next_state_ix, p_s in next_state_probabilities.items():
                        ev_sa += p_s * values[next_state_ix]
                    # Add to the series expansion of er_sp
                    er_sp += p_a * er_sa
                    # Add to the series expansion of ev_sp
                    ev_sp += p_a * ev_sa
                # Apply the update rule
                updated_value = er_sp + gamma * ev_sp
                # Update values
                values[state_ix] = updated_value
            # Update the average diff in values
            avg_diffs.append((values - prev_values).mean())
            k += 1
            # Visualize
            if vis_interval is not None and (k % vis_interval) == 0:
                self.render(values, figure_title=f'Estimated values: Iteration {k}',
                            redraw = (k > vis_interval))
                sleep(1)
            # Stop if num_iterations is reached
            if num_iterations is not None and k == num_iterations:
                break
            # Or if stopping condition is reached
            if prev_values is not None and np.allclose(prev_values, values, rtol=1e-4):
                converged = True
                break
            # Keep a copy of the values to check convergence
            prev_values = values.copy()
        return values, avg_diffs, converged

    def render(self, values, figure_title='', redraw=False):
        """
        Show grid with estimated values
        """
        if redraw:
            self.ax.clear()
        else:
            self.fig, self.ax = plt.subplots(figsize=(5, 5))
        sns.heatmap(values, ax=self.ax, cbar=False, annot=True, fmt='.2f')
        self.ax.set_xticks([])
        self.ax.set_yticks([])
        self.ax.set_title(figure_title)
        self.fig.canvas.draw()


GAMMA = 0.9
env = GridWorld(8, 8, num_goals=5, num_holes=5)
policy_helper = PolicyHelper.uniformly_random(env)
# policy_helper = PolicyHelper.uniform(env, Actions.DOWN)
analytical_solution = evaluate_policy_analytically(env, policy_helper.policy, GAMMA)
policy_helper.render()
env.render()
evaluator = DynamicPolicyEvaluator(env, policy_helper.policy)
# evaluator.render()
values, avg_diffs, converged = evaluator.evaluate(GAMMA, 200, vis_interval=10)
print(f"Policy evaluation did {'' if converged else 'not '}converge")

fig, ax = plt.subplots(figsize=(5, 3))
ax.plot(avg_diffs)
ax.set_xlabel('Iteration #')
ax.set_ylabel('Average of all value updates')
fig.canvas.draw()

if np.allclose(values, analytical_solution, atol=1e-02):
    print("✓ Values from policy iteration DO match the analytical solution")
else:
    print("✘ Values from policy iteration DO NOT match the analytical solution...")
    print(f"... max diff was {np.abs(values - analytical_solution).max()}")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Policy evaluation did converge


<IPython.core.display.Javascript object>

✓ Values from policy iteration DO match the analytical solution


## Policy Iteration

Making use of the DynamicPolicyEvaluator to do policy iteration via greedification. Will visualize how the policy changes over time.

Note that there seems to be a nice side-effect of the fact that running into a wall makes you stay on the spot. I did not even observe this myself until I was puzzled by the fact that the optimal policy does not try to return to the same goal patch if it is not near a wall. That's because the agent would only get the goal reward one in every two steps. But with a wall goal, the agent can get a reward for every step by running into the wall (and therefore staying on the spot). Try with a small gamma though (maybe 0.1) and you will find that sometimes the optimal thing to do IS to jump on and off a non-wall goal, if there is no wall goal nearby.

In [67]:
class GreedyPolicyIterator:
    """
    Policy iteration by applying dynamic policy evaluation with greedification steps
    """
    def __init__(self, env: GridWorld, policy_helper: PolicyHelper):
        self.env = env
        self.policy_helper = policy_helper
        
    def iterate(self, gamma: float, num_policy_iterations: Optional[int] = None,
                num_evaluation_iterations: Optional[int] = None,
                vis_interval: Optional[int] = None, vis_delay=1.):
        self.policy_helper.render(figure_title=f'Policy: Iteration 0')
        k = 0  # iteration counter
        converged = False  # we'll return whether it converged or not
        prev_values = None  # will keep a copy of previous values each iteration to check convergence
        avg_values = [] # store average value to visualize monotonic growth / convergence
        while True:
            # Evaluate the policy
            evaluator = DynamicPolicyEvaluator(self.env, self.policy_helper.policy)
            values, _, policy_eval_converged = evaluator.evaluate(gamma, num_evaluation_iterations)
            if not policy_eval_converged and num_evaluation_iterations is None:
                print(f"Warning: Policy evaluation did not converge for step {k}")
            # For each state, evaluate argmax of the state-action value function,
            # and based on that, update the policy for that state
            for state_ix, _ in np.ndenumerate(values):
                # State-action value function. We'll fill this in for each action
                q_sa = []
                for action in Actions:
                    # `er_sa` is the expectation value of reward given state and action
                    # aka E[R_{t+1} | s, a]
                    next_state_probabilities, er_sa = self.env.evaluate_state_action(
                        state_ix, action)
                    # Compute `ev_sa` aka E[v(S_{t+1}) | s, a]
                    ev_sa = 0
                    for next_state_ix, p_s in next_state_probabilities.items():
                        ev_sa += p_s * values[next_state_ix]
                    # Append the result for this action to q_sa. We will argmax over actions
                    q_sa.append(er_sa + ev_sa)
                best_action = max(zip(Actions, q_sa), key=lambda x: x[1])[0]
                # Clear the policy for this state...
                self.policy_helper.policy[state_ix[0], state_ix[1], :] *= 0
                # ... and select the best action
                self.policy_helper.policy[state_ix[0], state_ix[1], best_action.value] = 1.
            # Update the average values
            avg_values.append(values.mean())
            k += 1
            # Visualize
            if vis_interval is not None and (k % vis_interval) == 0:
                self.policy_helper.render(
                    figure_title=f'Policy: Iteration {k}', redraw=True)
                sleep(vis_delay)
            # Stop if num_iterations is reached
            if num_policy_iterations is not None and k == num_policy_iterations:
                break
            # Or if stopping condition is reached
            if prev_values is not None and np.allclose(prev_values, values, rtol=1e-4):
                converged = True
                break
            # Keep a copy of the values to check convergence
            prev_values = values.copy()

        return avg_values, converged
    

GAMMA = 0.9
env = GridWorld(8, 8, num_goals=5, num_holes=5)
policy_helper = PolicyHelper.uniformly_random(env)
# policy_helper = PolicyHelper.uniform(env, Actions.DOWN)
env.render()
iterator = GreedyPolicyIterator(env, policy_helper)
avg_values, converged = iterator.iterate(
    GAMMA, num_policy_iterations=100, num_evaluation_iterations=2, vis_interval=1, vis_delay=0.05)
print(f"Policy iteration did {'' if converged else 'not '}converge")

fig, ax = plt.subplots(figsize=(5, 3))
ax.plot(avg_values, label='Avg value at iteration')
ax.plot(range(1, len(avg_values)), np.diff(avg_values, 1), label='Diff')
ax.set_xlabel('Iteration #')
ax.legend()
fig.canvas.draw()


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Policy iteration did not converge


<IPython.core.display.Javascript object>