In [1]:
from dataclasses import dataclass
from typing import Tuple, Mapping, Dict, Sequence, Iterable, Set, List

In [2]:
from rl.markov_decision_process import FiniteMarkovDecisionProcess
from rl.distribution import Categorical, Constant
import abc

In [29]:
from typing import Tuple, Mapping

# The three kinds of cell a maze layout can contain.
SPACE = 'SPACE'
BLOCK = 'BLOCK'
GOAL = 'GOAL'

# The 8x8 maze, keyed by coordinate pair. Each string below holds the cells
# for one value of the first coordinate, and each character position is one
# value of the second coordinate: '.' is SPACE, '#' is BLOCK, 'G' is GOAL.
maze_grid: Mapping[Tuple[int, int], str] = {
    (first, second): {'.': SPACE, '#': BLOCK, 'G': GOAL}[symbol]
    for first, line in enumerate((
        ".#......",
        ".##.####",
        ".#....#.",
        "...##.#.",
        ".#.#....",
        "##.#.#.#",
        ".###.#..",
        ".....##G",
    ))
    for second, symbol in enumerate(line)
}

In [123]:
@dataclass(frozen=True)
class GridState:
    '''An immutable (x, y) position in a grid maze.

    Being a frozen dataclass, instances compare by value and are hashable,
    so they can be stored in sets and used as dictionary keys.
    '''

    x: int
    y: int

    def get_neighbor(self, direction: str) -> 'GridState':
        '''Return the state one step away from this one.

        "Up" and "Down" decrease and increase ``y`` by one; "Left" and
        "Right" decrease and increase ``x`` by one. No bounds or wall check
        is made here, so the result may not be a valid maze cell.

        Raises:
            ValueError: if ``direction`` is not "Up", "Down", "Left" or
                "Right".
        '''
        offsets = {
            "Up": (0, -1),
            "Down": (0, 1),
            "Left": (-1, 0),
            "Right": (1, 0),
        }
        try:
            dx, dy = offsets[direction]
        except (KeyError, TypeError):
            # Raise explicitly instead of `assert False`: assertions are
            # stripped under `python -O`, which would make an invalid move
            # silently return None.
            raise ValueError(f"invalid move: {direction!r}") from None
        return GridState(self.x + dx, self.y + dy)

    def __lt__(self, other: object) -> bool:
        '''Order states lexicographically: first by ``x``, then by ``y``.'''
        if not isinstance(other, GridState):
            # Let Python try the reflected comparison or raise TypeError.
            return NotImplemented
        return (self.x, self.y) < (other.x, other.y)
    

In [118]:
class GridMazeMDP(FiniteMarkovDecisionProcess[GridState, str], abc.ABC):
    '''Finite MDP over the open cells of a grid maze.

    Every cell labelled 'SPACE' or 'GOAL' in the input map becomes a state.
    The goal state gets no outgoing transitions; every other state can move
    "Up", "Down", "Left" or "Right" into an adjacent state, each move being
    wrapped in a `Constant` distribution. Subclasses choose the reward by
    implementing `reward_func`.
    '''

    def __init__(
        self,
        state_map: Mapping[Tuple[int, int], str]
    ):
        '''Build the MDP from a map of coordinate pairs to cell labels.

        Args:
            state_map: maps each coordinate pair to a cell label. Cells
                labelled 'SPACE' or 'GOAL' become states; any other label
                is left out. If several cells are labelled 'GOAL', the
                first one in iteration order is used as the goal.

        Raises:
            ValueError: if ``state_map`` has no 'GOAL' cell.
        '''
        self.states: Set[GridState] = {
            GridState(pos[0], pos[1])
            for pos, kind in state_map.items()
            if kind == 'SPACE' or kind == 'GOAL'
        }
        self.moves = ["Up", "Down", "Left", "Right"]

        goal = next(
            (GridState(pos[0], pos[1])
             for pos, kind in state_map.items() if kind == 'GOAL'),
            None
        )
        if goal is None:
            # Fail with a clear message rather than an IndexError on `[0]`.
            raise ValueError("state_map must contain a 'GOAL' cell")
        self.goal: GridState = goal

        # The transition map calls reward_func, so subclasses must not rely
        # on attributes set after this point.
        super().__init__(self.get_action_transition_reward_map())

    def get_action_transition_reward_map(self):
        '''Return the state -> action -> (next state, reward) mapping.

        The goal state is skipped, so it never appears as a key. For every
        other state only the moves whose neighbour is itself a state are
        included; a state with no such neighbour maps to an empty dict.
        '''
        d: Dict[GridState, Dict[str, Constant[Tuple[GridState, float]]]] = {}

        for state in self.states:
            if state == self.goal:
                continue

            d1: Dict[str, Constant[Tuple[GridState, float]]] = {}
            for move in self.moves:
                next_state = state.get_neighbor(move)
                # Moves into blocked cells or off the map are not offered.
                if next_state in self.states:
                    d1[move] = Constant((next_state, self.reward_func(next_state)))

            d[state] = d1
        return d

    @abc.abstractmethod
    def reward_func(self, next_state) -> float:
        '''Return the reward for a move that lands in ``next_state``.'''

        
class GridMazeMDP_Dense(GridMazeMDP):
    '''Maze MDP variant in which every move earns a reward of -1.'''

    def reward_func(self, next_state) -> float:
        '''Return -1 regardless of the state the move lands in.'''
        step_reward = -1
        return step_reward

class GridMazeMDP_Sparse(GridMazeMDP):
    '''Maze MDP variant that rewards only the move landing on the goal.'''

    def reward_func(self, next_state) -> float:
        '''Return 1 when ``next_state`` is the goal state, otherwise 0.'''
        return 1 if next_state == self.goal else 0


In [124]:
from rl.dynamic_programming import value_iteration
from typing import (Callable, Iterable, Iterator, Optional, TypeVar)
from rl.iterate import converged, iterate, last
from rl.markov_process import NonTerminal
from rl.markov_decision_process import (FiniteMarkovDecisionProcess,
                                        FiniteMarkovRewardProcess)
from rl.policy import DeterministicFinitePolicy
from rl.dynamic_programming import value_iteration, almost_equal_vfs, greedy_policy_from_vf
# Type variables for the generic annotations in this cell: X is the element
# type of the iterators handled by tracked_converge / tracked_converged,
# S and A parameterize FiniteMarkovDecisionProcess[S, A] and V[S] below,
# and Y is not referenced in the visible cells.
X = TypeVar('X')
Y = TypeVar('Y')
A = TypeVar('A')
S = TypeVar('S')

# Default value of the `tolerance` argument of almost_equal_vfs.
DEFAULT_TOLERANCE = 1e-8

# A representation of a value function for a finite MDP with states of
# type S
V = Mapping[NonTerminal[S], float]

def tracked_converge(values: Iterator[X], done: Callable[[X, X], bool]) -> Iterator[X]:
    '''Yield values from the iterator until two consecutive ones satisfy
    the given done function, or until the iterator is exhausted.

    The value that triggers done is not yielded; instead a message with
    the zero-based index of that comparison is printed and iteration stops.
    An empty iterator (or one whose first value is None) yields nothing.
    Will loop forever if the input iterator doesn't end *or* converge.
    '''
    previous = next(values, None)
    if previous is None:
        return

    yield previous

    # Number of comparisons that have already failed the done test.
    i = 0
    for current in values:
        if done(previous, current):
            print(f'took {i} iterations to converge')
            return

        yield current
        previous = current
        i += 1

def tracked_converged(values: Iterator[X],
              done: Callable[[X, X], bool]) -> X:
    '''Run tracked_converge over the iterator and return the last value
    it produced.

    Raises ValueError when `last` reports no value (None), which the error
    message attributes to an empty iterator.
    Will loop forever if the input iterator doesn't end *or* converge.
    '''
    final_value = last(tracked_converge(values, done))

    if final_value is not None:
        return final_value

    raise ValueError("converged called on an empty iterator")

def almost_equal_vfs(
    v1: V[S],
    v2: V[S],
    tolerance: float = DEFAULT_TOLERANCE
) -> bool:
    '''Return whether the two value function tables are within the given
    tolerance of each other.

    The largest absolute difference ``abs(v1[s] - v2[s])`` over the states
    of ``v1`` must be strictly below ``tolerance``. Every state of ``v1`` is
    looked up in ``v2``. An empty ``v1`` gives a largest difference of 0.0
    instead of raising.

    Note: this definition shadows the `almost_equal_vfs` imported from
    `rl.dynamic_programming` earlier in the notebook.
    '''
    # `default` keeps max() from raising ValueError when v1 has no states.
    largest_gap = max((abs(v1[s] - v2[s]) for s in v1), default=0.0)
    return largest_gap < tolerance

def tracked_value_iteration_result(
    mdp: FiniteMarkovDecisionProcess[S, A],
    gamma: float
) -> Tuple[V[S], DeterministicFinitePolicy[S, A]]:
    '''Run value iteration on the MDP until consecutive value functions
    satisfy almost_equal_vfs, then derive the greedy policy from the result.

    Returns the converged value function together with that policy. The
    iteration count is printed by tracked_converge along the way.
    '''
    vf_stream = value_iteration(mdp, gamma)
    best_vf: V[S] = tracked_converged(vf_stream, done=almost_equal_vfs)

    best_policy: DeterministicFinitePolicy[S, A] = greedy_policy_from_vf(
        mdp, best_vf, gamma
    )
    return best_vf, best_policy

In [125]:
# Build one MDP per reward scheme from the same maze layout:
# gmdp uses the dense reward (-1 for every move), gmdp2 the sparse reward
# (1 for the move into the goal, 0 otherwise).
gmdp = GridMazeMDP_Dense(maze_grid)
gmdp2 = GridMazeMDP_Sparse(maze_grid)

In [128]:
# Solve both MDPs with the tracked value iteration; tracked_converge prints
# the iteration count for each run once convergence is detected.
print("dense rewards:")
# The dense-reward maze is solved without discounting (gamma=1).
vf, op = tracked_value_iteration_result(gmdp, gamma=1)
print("sparse rewards:")
# The sparse-reward maze is solved with gamma=0.9.
# NOTE(review): presumably the discount is what makes shorter paths
# preferable when only the move into the goal is rewarded - confirm.
vf_2, op_2 = tracked_value_iteration_result(gmdp2, gamma=0.9)

dense rewards:
took 16 iterations to converge
sparse rewards:
took 16 iterations to converge


In [129]:
# Compare the `action_for` attributes of the two greedy policies
# (presumably the state -> action tables; confirm against rl.policy).
op_2.action_for == op.action_for

True