In [352]:
import numpy as np
from tqdm import tqdm
from itertools import product
from copy import deepcopy

In [450]:
ACTIONS = ['up', 'down', 'left', 'right']


class GridWorld:
    ACTION_MAP = {
        'up': (-1, 0),
        'down': (1, 0),
        'left': (0, -1),
        'right': (0, 1),
    }

    def __init__(self, nrow, ncol):
        self.nrow = nrow
        self.ncol = ncol
        self.shape = (nrow, ncol)
        self.grid = [[0]*ncol for _ in range(nrow)]
    
    def __iter__(self):
        for cell in product(range(self.nrow), range(self.ncol)):
            yield cell
    
    def is_terminal(self, state):
        return state in [(0, 0), (self.nrow-1, self.ncol-1)]

    def get_move(self, action):
        return self.ACTION_MAP[action.lower()]

    def get_reward(self, state):
        if self.is_terminal(state):
            return 0
        else:
            return -1
    
    def get_new_state(self, state, move):
        _row = min(max(state[0] + move[0], 0), self.nrow-1)
        _col = min(max(state[1] + move[1], 0), self.ncol-1)
        new_state = (_row, _col)
        return new_state
    
    def get_four_neighbors(self, state):
        return [
            self.get_new_state(state, self.get_move('up')),
            self.get_new_state(state, self.get_move('down')),
            self.get_new_state(state, self.get_move('left')),
            self.get_new_state(state, self.get_move('right')),
        ]

    def step(self, state, action):
        move = self.get_move(action)
        new_state = self.get_new_state(state, move)
        reward = self.get_reward(new_state)
        return new_state, reward
    


def random_policy(state):
    """Uniform policy across action space"""
    return np.ones(len(ACTIONS)) / len(ACTIONS)


def evaluate_cell_mc(grid, cell, policy, iter):
    V = []  
    for _ in range(iter):
        s0 = cell
        R = 0
        while True:
            if grid.is_terminal(s0):
                break

            action = np.random.choice(ACTIONS, p=policy(s0))
            s1, r = grid.step(s0, action)
            s0 = s1
            R += r
        V.append(R) # value per step
        
    return np.mean(V)


def evaluate_grid_mc(grid, policy, iter=2000):
    """Monte Carlo evaluation of policy value"""
    vs = []
    for cell in grid:
        col_v = evaluate_cell_mc(grid, cell, policy, iter=iter)  
        vs.append(col_v)
    
    return np.array(vs).reshape(grid.shape)


def evaluate_grid_pi(grid, policy, delta=0.001):
    """Policy evaluation using bellman equation updates"""
    V = deepcopy(grid.grid)
    delta = 0.001
    while True:  # iterations until error less than delta
        V2 = deepcopy(V)
        _delta = 0
        for s0 in grid:  # calculate value each cell
            if grid.is_terminal(s0):
                V2[s0[0]][s0[1]] = 0
                continue

            neighbors = grid.get_four_neighbors(s0)

            # update using previous iteration
            rs = [grid.get_reward(s0) for _ in neighbors]  # reward on current cell
            vs = [V[s1[0]][s1[1]] for s1 in neighbors]  # values from all actions
            pis = policy(s0)
            v = np.sum(np.c_[rs, vs].sum(1) * pis)
            V2[s0[0]][s0[1]] = v

            _delta = max(_delta, abs(v - V[s0[0]][s0[1]]))

        V = deepcopy(V2)
        if _delta < delta:
            break

    return np.array(V)

In [448]:
grid = GridWorld(4, 4)
np.round(evaluate_grid_pi(grid, random_policy, delta=0.001), 0)

array([[  0., -14., -20., -22.],
       [-14., -18., -20., -20.],
       [-20., -20., -18., -14.],
       [-22., -20., -14.,   0.]])

In [449]:
grid = GridWorld(4, 4)
np.round(evaluate_grid_mc(grid, random_policy, iter=2000), 0)

array([[  0., -13., -19., -22.],
       [-13., -17., -20., -19.],
       [-19., -19., -17., -13.],
       [-21., -20., -13.,   0.]])