In [1]:
class Environment:
    def __init__(self, cells_per_row = 11):
        self.discount = 1
        self.actions = ['up', 'down', 'left', 'right']
        self.grid_size = cells_per_row ** 11
        self.states = range(0, self.grid_size)
        self._cells_per_row = cells_per_row
        
    def is_terminal_state(self, state):
        return state == 0 or state == self.grid_size - 1

    def tick(self, current_state, action):
        # get next state, considering boundaries
        if self.is_terminal_state(current_state):
            return current_state
        
        if action == 'up':
            next_state = current_state - self._cells_per_row
            if next_state < 0:
                return current_state
        elif action == 'down':
            next_state = current_state + self._cells_per_row
            if next_state >= self.grid_size:
                return current_state
        elif action == 'left':
            if current_state % self._cells_per_row == 0:
                return current_state
            next_state = current_state - 1
        elif action == 'right':
            if (current_state+1) % self._cells_per_row == 0:
                return current_state
            next_state = current_state + 1
        
        return next_state
    
    def reward(self, current_state):
        if self.is_terminal_state(current_state):
            return 0
        else:
            return -1

        
class RandomPolicy:
    def __init__(self, environment):
        self.environment = environment
    
    def action(self, current_state):
        if self.environment.is_terminal_state(current_state):
            return {'stay': 1}
        return {e: 1 / len(self.environment.actions) for e in self.environment.actions}

In [2]:
# tests
e = Environment()
assert(e.tick(0, 'up') == 0)
assert(e.tick(0, 'left') == 0)
assert(e.tick(8, 'left') == 8)
assert(e.tick(11, 'right') == 11)
assert(e.tick(14, 'down') == 14)
assert(e.tick(5, 'up') == 1)
assert(e.tick(5, 'down') == 9)
assert(e.tick(5, 'left') == 4)
assert(e.tick(5, 'right') == 6)

p = RandomPolicy(e)
print(p.action(0))
print(p.action(1))

AssertionError: 

In [4]:
# 1. policy evaluation should converge
e = Environment()
p = RandomPolicy(e)


def evaluate_policy(policy, iterations):
    values = [0] * e.grid_size    
    for i in range(iterations):
        new_values = [0] * e.grid_size
        for state in e.states:
            for (action, probability) in policy.action(state).items():
                next_state = e.tick(state, action)
                next_state_value = values[next_state]
                reward = e.reward(state)
                new_values[state] += probability * (reward + next_state_value)
        values = new_values
    return values

print([round(e, 0) for e in evaluate_policy(p, 100)]) # this should match values from the slides

MemoryError: 