In [1]:
from ex4.mdp import MDP
from ex4.misc import make_grader
from ex4.misc import plot_v_pi
from ex4.misc import expected_output_vi_finite_1, expected_output_vi_finite_2, expected_output_vi_infinite, expected_output_vi_infinite_stoch
import numpy as np
import matplotlib.pyplot as plt

### MDP

The MDP class for the cleaning robot is already implemented and doesn't need to be changed.

In [2]:
# Action indices. The numeric values matter: they select the rows and columns
# of RoboMDP.tp (order: down, right, up, left, stay).
DOWN, RIGHT, UP, LEFT, STAY = range(5)


class RoboMDP(MDP):
    """Grid-world MDP of the cleaning robot on a fixed 10x10 map.

    Every cell of the map carries a reward given by its symbol (see the class
    constants below). The robot can move in the four compass directions or
    stay; moves that would leave the grid are clamped to the border cell.
    Obstacles are not blocked explicitly, they only carry a large negative
    reward.

    The constructor fills ``self.rewards`` and ``self.stp``, which are expected
    to be provided by the ``MDP`` base class (see mdp.py).
    """

    # Reward per cell type; the letters match the symbols used in grid_world["map"].
    O = -1e3  # Obstacles, walls or inaccessible places
    D = 1  # Dirt
    W = -1  # Water
    C = -30  # Cat
    T = 10  # Toy
    E = 0  # Empty

    # "reward" is the row-major flattened reward grid, "map" the same grid as
    # printable symbols (one string per row, ' ' for an empty cell).
    grid_world = {
        "reward": [E, E, E, E, E, E, E, W, E, E,
                   E, O, E, E, E, E, E, O, D, O,
                   E, O, E, D, O, E, O, D, O, T,
                   E, E, E, O, O, E, E, C, E, E,
                   O, O, E, E, W, E, E, O, O, E,
                   E, E, O, E, E, E, E, O, O, E,
                   E, E, E, E, E, E, W, E, E, E,
                   E, E, E, E, E, O, E, E, D, W,
                   O, W, O, O, O, O, E, E, E, E,
                   D, E, E, E, D, O, E, O, O, O],
        "map": [
            "       W  ",
            " O     ODO",
            " O DO ODOT",
            "   OO  C  ",
            "OO  W  OO ",
            "  O    OO ",
            "      W   ",
            "     O  DW",
            "OWOOOO    ",
            "D   DO OOO"
        ]
    }

    n_actions = 5
    world_dim = (10, 10)  # (rows, columns)

    # Transition probabilities. Each row corresponds to one action, i.e. choosing action down has a 70% chance of
    # executing correctly, and a 10% chance each of executing right, left or stay instead. The opposite direction
    # is never executed, and stay is always executed exactly.
    # Usage: self.tp[actual_action, noisy_executed_action]

    #             down, right, up, left, stay
    tp = np.array([[0.7, 0.1, 0.0, 0.1, 0.1],  # down
                   [0.1, 0.7, 0.1, 0.0, 0.1],  # right
                   [0.0, 0.1, 0.7, 0.1, 0.1],  # up
                   [0.1, 0.0, 0.1, 0.7, 0.1],  # left
                   [0.0, 0.0, 0.0, 0.0, 1.0]])  # stay

    def __init__(self, discount: float, stochastic: bool):
        """Build the reward vector and the state-transition table.

        @param discount: discount factor, forwarded to the MDP base class
        @param stochastic: if True, the executed action is drawn according to
            ``tp``; if False, the chosen action is always executed as-is
        """
        super().__init__(discount, self.world_dim, self.n_actions, desc=self.grid_world['map'])

        def inc(row: int, col: int, action: int):
            """Return the (row, col) reached by executing `action`, clamped to the grid."""
            if action == DOWN:  # move down
                row = min(row + 1, self.world_dim[0] - 1)
            elif action == RIGHT:  # move right
                col = min(col + 1, self.world_dim[1] - 1)
            elif action == UP:  # move up
                row = max(row - 1, 0)
            elif action == LEFT:  # move left
                col = max(col - 1, 0)
            elif action == STAY:  # stay
                pass  # Not moving
            return row, col

        # We need to set the next state with its corresponding transition probability based on the current state and
        # action. Have a look at mdp.py for the implementation of self.stp
        for r in range(self.world_dim[0]):
            for c in range(self.world_dim[1]):
                s = self.to_s(r, c)
                self.rewards[s] = self.grid_world["reward"][s]
                for a in range(self.n_actions):
                    stp_list = self.stp[s, a]
                    if stochastic:
                        # One (next_state, probability) entry per executed action with non-zero probability.
                        # At the border several executed actions can end in the same state; they are kept as
                        # separate entries rather than merged.
                        for noisy_a in range(self.n_actions):
                            if self.tp[a, noisy_a] > 0:
                                next_r, next_c = inc(r, c, noisy_a)
                                next_state = self.to_s(next_r, next_c)
                                stp_list.append((next_state, self.tp[a, noisy_a]))
                    else:
                        next_r, next_c = inc(r, c, a)
                        next_state = self.to_s(next_r, next_c)
                        stp_list.append((next_state, 1.))

    # helper functions

    def to_s(self, row, col):
        """given a row and column, returns a linear index corresponding to the world dimension"""
        return row * self.world_dim[1] + col

    def to_rc(self, s):
        """given a linear state, returns a row and column corresponding to the world dimension"""
        return np.unravel_index(s, self.world_dim)

    def plot(self, save_path="/tmp/robo_mdp.pdf"):
        """Draw the reward grid with the map symbols on top and optionally save it.

        @param save_path: file the figure is written to; pass None to skip
            saving. Defaults to the previously hard-coded location.
        """
        n_rows, n_cols = self.world_dim

        # figsize is (width, height): the columns span the width, the rows the height.
        plt.figure(figsize=(n_cols, n_rows))
        plt.imshow(self.rewards.reshape((n_rows, n_cols)), cmap='gray', interpolation='none')
        ax = plt.gca()
        # Put the ticks on the cell borders so the grid lines separate the cells.
        ax.set_xticks(np.arange(n_cols) - .5)
        ax.set_yticks(np.arange(n_rows) - .5)
        ax.set_xticklabels([])
        ax.set_yticklabels([])
        Y, X = np.mgrid[0:n_rows, 0:n_cols]
        for x, y in zip(X.flatten(), Y.flatten()):
            plt.text(x, y, self.desc[y][x],
                     color='g', size=12, verticalalignment='center',
                     horizontalalignment='center', fontweight='bold')
        plt.grid(color='b', lw=2, ls='-')
        if save_path is not None:
            plt.savefig(save_path)


## Part 1: Value Iteration

### Implementing value iteration
For exercise 4, you'll implement value iteration, which has the following pseudocode:

---
Initialize $V_{(0)}^\ast(s)=0$, for all $s$

For $i=0, 1, 2, \dots$
- $V^\ast_{(i+1)}(s) = \max_a  \Big( r(s,a) + \gamma \sum_{s'} P(s' \mid s,a) \, V^\ast_{(i)}(s') \Big)$, for all $s$
- Until convergence or horizon $H$

---

We additionally define the sequence of greedy policies $\pi_{(0)}, \pi_{(1)}, \dots, \pi_{(n-1)}$, where
$$\pi^\ast_{(i)}(s) = \underset{a}{\arg \max} \Big( r(s,a) + \gamma \sum_{s'} P(s' \mid s,a) V^\ast_{(i)}(s')\Big)$$

Your code will return two lists: $[V_{(0)}, V_{(1)}, \dots, V_{(n)}]$ and $[\pi_{(0)}, \pi_{(1)}, \dots, \pi_{(n-1)}]$

To ensure that you get the same policies as the reference solution, choose the lower-index action to break ties in $\arg \max_a$. This is done automatically by np.argmax. This will only affect the "# chg actions" printout below--it won't affect the values computed.

<div class="alert alert-warning">
Warning: make a copy of your value function each iteration and use that copy for the update--don't update your value function in place.
Updating in-place is also a valid algorithm, sometimes called Gauss-Seidel value iteration or asynchronous value iteration, but it will cause you to get different results than our reference solution (which in turn will mean that our testing code won’t be able to help in verifying your code).
</div>

#### Hints
- The reward function can be accessed by $\texttt{mdp.rewards[s]}$, and the next states together with the probability of
 reaching them (given a state $s$ and an action $a$) by $\texttt{mdp.stp[s, a]}$
- Use the functions $\texttt{to_s}$ and $\texttt{to_rc}$ provided by the MDP class to convert between linear states
 $s \in \{0, \dots, |\mathcal{S}| - 1\}$ and row/column format, i.e. $\texttt{mdp.to_rc(0)}$ returns $(0, 0)$, while
 $\texttt{mdp.to_s(0, 0)}$ returns $0$
- For your implementation, the identities $V^\ast(s) = \max_a Q^\ast(s, a)$ and
 $\pi^\ast(s) = \underset{a}{\arg \max} \, Q^\ast(s, a)$ may be helpful
- The following NumPy functions may be useful: $\texttt{np.sum}$, $\texttt{np.max}$, $\texttt{np.argmax}$
- Try using list comprehensions as this will shorten your code
- For the infinite horizon case, we request 100 iterations of value iteration. However, the algorithm should converge
earlier.


In [3]:
def value_iteration(mdp: MDP, n_iter: int, grade_print=print) -> (list, list):
    """
    Run (synchronous) value iteration on the given MDP and record every iterate.

    Inputs:
    @param mdp: an MDP object; this scaffold only reads mdp.world_dim, the exercise code is
                expected to use mdp.rewards and mdp.stp as described in the hints
    @param n_iter: maximum number of VI iterations
    @param grade_print: callable that receives each line of the progress table as a string
                        (defaults to the built-in print)
    Outputs:
    @return: (value_functions, policies)

    If all n_iter iterations are executed, len(value_functions) == n_iter+1 and len(policies) == n_iter.
    The loop stops early as soon as max|V-Vprev| < 1e-3, in which case both lists are correspondingly shorter
    (value_functions always holds one more entry than policies because it starts with V_{(0)}).
    """
    vs = [np.zeros(shape=mdp.world_dim)]  # list of value functions contains the initial value function V_{(0)}, which is zero
    pis = []  # list containing sequence of previous greedy policies

    # Table header; the grader presumably compares these lines with the reference output, so the
    # format strings must not be changed.
    grade_print("Iteration |  max|V-Vprev|  | # chg actions | V[0]    ")
    grade_print("----------+----------------+---------------+---------")
    for it in range(0, n_iter):
        pi_old = pis[-1] if len(pis) > 0 else None  # \pi^{(it)} = Greedy[V^{(it-1)}]. Just used for printout
        v_old = vs[-1]  # previous iterate; read from this one and write into the fresh array `v` below
        
        # Exercise 4 a)
        # %%%%%%%% add your code here %%%%%%%%%%%

        pi = np.zeros(shape=mdp.world_dim)  # contains the actual actions
        v = np.zeros(shape=mdp.world_dim)  # new value function
        
        # Placeholder for the exercise solution. As long as it is not filled in, `v` stays all-zero,
        # so max_diff below is 0 and the loop terminates after its first pass.
        ...

        # %%%%%%%% add your code here %%%%%%%%%%%

        max_diff = np.abs(v - v_old).max()  # largest absolute change of any state value
        n_chg_actions = 0 if pi_old is None else (pi != pi_old).sum()  # number of states whose greedy action changed
        grade_print("{:4d}      | {:12.5f}   |   {:4d}        | {:8.3f}".format(it, max_diff, n_chg_actions, v[0, 0]))

        vs.append(v)
        pis.append(pi)

        # Convergence check: stop once the value function changed by less than 1e-3 everywhere.
        if max_diff < 1e-3:
            break

    return vs, pis

## 4 b) Finite Horizon Problem

In [None]:
# Deterministic robot world with discount 0.5; run at most 10 value-iteration steps.
# The printed table goes to the grader built from the first finite-horizon reference output.
robo_mdp_finite = RoboMDP(stochastic=False, discount=0.5)
v_finite, pi_finite = value_iteration(
    robo_mdp_finite,
    n_iter=10,
    grade_print=make_grader(expected_output_vi_finite_1),
)

# Plot the last value function together with the last greedy policy.
plot_v_pi(v_finite[-1], pi_finite[-1], robo_mdp_finite)

In [None]:
# Same deterministic world and 10-step budget, but with the weaker discount 0.9.
# NOTE: this rebinds robo_mdp_finite / v_finite / pi_finite from the previous cell.
robo_mdp_finite = RoboMDP(stochastic=False, discount=0.9)
v_finite, pi_finite = value_iteration(
    robo_mdp_finite,
    n_iter=10,
    grade_print=make_grader(expected_output_vi_finite_2),
)

# Plot the last value function together with the last greedy policy.
plot_v_pi(v_finite[-1], pi_finite[-1], robo_mdp_finite)

## 4 c) Infinite Horizon Problem

In [None]:
# Deterministic world with discount 0.8; up to 100 iterations are requested, and
# value_iteration stops earlier once the values change by less than 1e-3.
robo_mdp_infinite = RoboMDP(stochastic=False, discount=0.8)
v_infinite, pi_infinite = value_iteration(
    robo_mdp_infinite,
    n_iter=100,
    grade_print=make_grader(expected_output_vi_infinite),
)

# Plot the last value function together with the last greedy policy.
plot_v_pi(v_infinite[-1], pi_infinite[-1], robo_mdp_infinite)

## 4 d) Infinite Horizon Problem with Stochastic Transitions

In [None]:
# Stochastic transitions (noisy action execution) with discount 0.8, up to 100 iterations.
# NOTE(review): the variable names say "finite" although this cell is the infinite-horizon
# stochastic case; the names are kept so that any later reference to them keeps working.
robo_mdp_finite_stoch = RoboMDP(stochastic=True, discount=0.8)
v_finite_stoch, pi_finite_stoch = value_iteration(
    robo_mdp_finite_stoch,
    n_iter=100,
    grade_print=make_grader(expected_output_vi_infinite_stoch),
)

# Plot the last value function together with the last greedy policy.
plot_v_pi(v_finite_stoch[-1], pi_finite_stoch[-1], robo_mdp_finite_stoch)