# Gridworld Example

Rules:
1. If you are at $A$, you get a reward of +10 irrespective of the action only if you land in $A'$
2. If you are at $B$, you get a reward of +5 irrespective of the action only if you land in $B'$
3. If you are at a boundary except for $A$ and $B$, you get -1 if you take an action that takes you out of bounds, but you remain in the same state
4. Any other step has zero reward


## The Bellman equation
$$
    v_\pi(s) = \sum_{a} \pi(a | s)\sum_{s', r} p(s', r | a, s) [r + \gamma v_\pi(s')]
$$

In [1]:
import einops
import numpy as np
from numba import jit
from scipy import optimize

## Suboptimal policy

To solve the gridworld example, we suppose that the agent selects all four actions with equal probabilty in all states ( $\forall a.\pi(a\vert s) = \frac{1}{4}$). By the Bellman equation, we have

$$
\begin{aligned}
    v_\pi(s) &= \sum_{a} \pi(a\vert s) \sum_{s',r} p(s', r \vert s, a)[r + \gamma v_\pi(s')]\\
             &= \frac{1}{4}\sum_{a, s', r} p(s', r \vert s, a) [r + \gamma v_\pi(s')]
\end{aligned}
$$
Equation $(1)$ above can be rewritten as
$$
    \sum_{s'}v_\pi(s') \left[\mathbb{1}(s' = s) - \frac{\gamma}{4}\sum_{a,r}p(s', r \vert s, a)\right] =
    \frac{1}{4} \sum_{a,s',r} r \cdot p(s',r\vert s,a)
$$

Let ${\bf v}_\pi \in \mathbb{R}^{\cal S}$,
$$
    A_{s',s} = \mathbb{1}(s' = s) - \frac{\gamma}{4}\sum_{a,r}p(s', r \vert s, a),
$$
and
$$
    {\bf b}_{s} = \frac{1}{4} \sum_{a,s',r} r \cdot p(s',r\vert s,a)
$$

we have
$$
    {\bf A}{\bf v}_\pi = {\bf b}.
$$

Hence, by building ${\bf A}$ and $\bf b$, ${\bf v}_\pi$ is estimated solving the system of equations above.

In [2]:
# actions, rewards, (*state)
n_actions = 4
n_rewards = 4
state_size = 25

In [3]:
lower_bound = 0
upper_bound = np.sqrt(state_size).astype(int) - 1

In [4]:
rewards = np.array([0, 5, 10, -1])
reward_map = {r: ix for ix, r in enumerate(rewards)}

actions = ["up", "right", "down", "left"]
actions_ix_map = {a: ix for ix, a in enumerate(actions)}

action_map = {
    "up": np.array([-1, 0]),
    "right": np.array([0, 1]),
    "down": np.array([1, 0]),
    "left": np.array([0, -1])
}

# mapping from special states to rewards
special_map = {
    1: 10,
    3: 5
}

# mapping from special states to terminal states
special_states = [1, 3]
special_state_map = {
    1: 21,
    3: 13
}

In [5]:
def get_pos(ix):
    """
    From state to coordinate position
    """
    col = ix % 5
    row = ix // 5
    state = np.asarray([row, col])
    return state


def get_state(position):
    """
    From coordinate position to state
    """
    row, col = position
    return 5 * row + col


def is_out_of_bounds(position, lb=0, ub=4):
    """
    Check if coordinate position is out of bounds
    """
    return (position < lb).any() or (position > ub).any()


def move_and_check(position, action):
    """
    Make a non-special move and check wheter
    we're out of bounds.
    If we're out of bounds, return the initial position
    """
    new_position = position + action_map[action]
    new_state = get_state(new_position)
            
    if is_out_of_bounds(new_position):
        return position
    else:
        return new_position


def move(state, action):
    # move to special state
    if state in special_map:
        new_state = special_state_map[state]
        new_position = get_pos(new_state)
    # make a move
    else:
        position = get_pos(state)
        new_position = move_and_check(position, action)
        
    return new_position

## Constructing $p(s', r | a , s)$
First build $p(s', r, s, a)$, then condition over $p(s,a) = \sum_{s',r}p(s',r, a, s)$.

In [6]:
# Constructing p(s',r | a, s)
p_gridworld = np.zeros((state_size, n_rewards, state_size, n_actions))

for state in range(state_size):
    for action in action_map:

        # obtain s' — new state
        new_pos = move(state, action)
        new_state = get_state(new_pos)
        
        # obtain r — reward
        if state in special_states:
            r = special_map[state]
        elif state == new_state:
            r = -1
        else:
            r = 0
        
        # r = r
        a_pos = actions_ix_map[action]
        r_pos = reward_map[r]
        p_gridworld[new_state, r_pos, state, a_pos] = 1

p_gridworld = p_gridworld / p_gridworld.sum(axis=(0,1), keepdims=True)
p_gridworld.shape

(25, 4, 25, 4)

In [7]:
# Σ_{s', r, a} r * p(s', r | a, s)
b = einops.einsum(p_gridworld, rewards, "s_prime r s a, r -> s") / 4

γ = 0.9
I = np.eye(state_size)
A = I - γ / 4 * einops.reduce(p_gridworld, "s_prime r s a -> s s_prime", "sum")

----

In [8]:
np.linalg.solve(A, b).reshape(5, 5).round(1)

array([[ 3.3,  8.8,  4.4,  5.3,  1.5],
       [ 1.5,  3. ,  2.3,  1.9,  0.5],
       [ 0.1,  0.7,  0.7,  0.4, -0.4],
       [-1. , -0.4, -0.4, -0.6, -1.2],
       [-1.9, -1.3, -1.2, -1.4, -2. ]])

## Optimal state-value function
**Bellman optimality equation**

$$
    v_{*}(s) = \max_{a\in\mathcal{A}} \sum_{s',r}p\left( s', r \vert s, a \right)\left[r + \gamma v_*(s')\right]
$$

In [9]:
def bellman_optimality(vs):
    rhs = (rewards[None, :] + γ * vs[:, None])
    rhs = einops.einsum(rhs, p_gridworld, "s_prime r, s_prime r s a -> s a")
    rhs = einops.reduce(rhs, "s a -> s", "max")
    
    return vs - rhs

In [10]:
vs = np.ones(25) # intial estimate of the optimal value function
vs_star = optimize.broyden1(bellman_optimality, vs)
vs_star.reshape(5, 5).round(1)

array([[22. , 24.4, 22. , 19.4, 17.5],
       [19.8, 22. , 19.8, 17.8, 16. ],
       [17.8, 19.8, 17.8, 16. , 14.4],
       [16. , 17.8, 16. , 14.4, 13. ],
       [14.4, 16. , 14.4, 13. , 11.7]])

### Optimal actions
Having computed $v_*(s)$ for all $s\in{\cal S}$, we compute
$$
    v_{*,a}(s) := \sum_{s',r}p(s',r | s, a)[r + \gamma v_*(s')]\ \forall a\in{\cal A}
$$
and keep, for a given state $s$, all $a$ such that
$$
    v_*(s) = v_{*,a}(s).
$$

In [11]:
# Compute expected returns for all actions at a given state
action_expected_return = rewards[None, :] + γ * vs_star[:, None]
action_expected_return = einops.einsum(action_expected_return, p_gridworld, "s_prime r, s_prime r s a -> a s")
# check which actions obtain the optimal value-function
optimal_actions = (action_expected_return == action_expected_return.max(axis=0))

In [17]:
# Up, right, left, down actions
actions_str = np.where(optimal_actions, np.array(["u","r","d","l"])[:, None], "").T
np.array([f'{"".join(row):4}' for row in actions_str]).reshape(5, 5)

array([['r   ', 'urdl', 'l   ', 'urdl', 'l   '],
       ['u   ', 'u   ', 'l   ', 'l   ', 'l   '],
       ['r   ', 'u   ', 'u   ', 'u   ', 'l   '],
       ['r   ', 'u   ', 'u   ', 'l   ', 'u   '],
       ['r   ', 'u   ', 'u   ', 'l   ', 'l   ']], dtype='<U4')