In [10]:
from typing import Tuple, Sequence, Set, Mapping, Dict, Callable, Optional,TypeVar
from dataclasses import dataclass
from operator import itemgetter
from rl.distribution import Categorical, Choose, Constant
from rl.markov_decision_process import FiniteMarkovDecisionProcess
from rl.markov_decision_process import StateActionMapping
from rl.markov_decision_process import FinitePolicy
from rl.dynamic_programming import value_iteration_result, V

In [14]:
# SARSA
# using interface from our final

# Generic type parameters shared by the control algorithms in this notebook:
# S annotates states and A annotates actions.
S = TypeVar("S")
A = TypeVar("A")
def get_sarsa_vf_and_policy(
    states_actions_dict: Mapping[S, Optional[Set[A]]],
    sample_func: Callable[[S, A], Tuple[S, float]],
    terminals,
    epsilon_greedy_action: Callable[
        [S, Mapping[S, Mapping[A, float]], float], A
    ],
    episodes: int = 10000,
    step_size: float = 0.01,
    gamma: float = 1.0
) -> Tuple[V[S], FinitePolicy[S, A]]:
    """Run tabular SARSA and return the greedy value function and policy.

    Args:
        states_actions_dict: maps every state to its set of available
            actions; states mapped to ``None`` get no Q-table entry.
        sample_func: ``sample_func(state, action)`` returns one sampled
            ``(next_state, reward)`` transition.
        terminals: container of terminal states (only ``in`` is used).
            Terminal states are treated as having zero value.
        epsilon_greedy_action: called as
            ``epsilon_greedy_action(state, q, epsilon)`` with the *whole*
            Q-table ``q`` (state -> action -> value); returns the action
            to take in ``state``.
        episodes: number of episodes to simulate.
        step_size: learning rate applied to each TD error.
        gamma: discount factor applied to the bootstrapped next-step value.
            The default of 1.0 gives the undiscounted update.

    Returns:
        ``(vf, policy)`` where ``vf[s]`` is ``max_a q[s][a]`` and ``policy``
        deterministically picks an action achieving that maximum, for every
        state that has a Q-table entry.
    """
    q: Dict[S, Dict[A, float]] = {
        s: {a: 0. for a in actions}
        for s, actions in states_actions_dict.items()
        if actions is not None
    }
    nt_states: Set[S] = set(q)
    uniform_states: Choose[S] = Choose(nt_states)

    for episode_num in range(episodes):
        # Exploration rate decays as 1, 1/2, 1/3, ... over the episodes.
        epsilon: float = 1.0 / (episode_num + 1)
        state: S = uniform_states.sample()
        # The first action must be chosen for the sampled start state.
        action = epsilon_greedy_action(state, q, epsilon)
        while state not in terminals:
            next_state, reward = sample_func(state, action)
            if next_state in terminals:
                # No Q-table entry exists for terminal states: their value is
                # zero, so the target is the reward alone. The update must
                # still be applied, otherwise terminal rewards are never
                # learned.
                next_action = None
                target = reward
            else:
                next_action = epsilon_greedy_action(next_state, q, epsilon)
                target = reward + gamma * q[next_state][next_action]
            q[state][action] += step_size * (target - q[state][action])
            # When next_state is terminal the loop condition ends the episode.
            state, action = next_state, next_action

    vf_dict: V[S] = {s: max(d.values()) for s, d in q.items()}
    policy: FinitePolicy[S, A] = FinitePolicy(
        {s: Constant(max(d.items(), key=itemgetter(1))[0])
         for s, d in q.items()}
    )
    return (vf_dict, policy)

In [16]:
#Q-learning
def get_q_learning_vf_and_policy(
    states_actions_dict: Mapping[S, Optional[Set[A]]],
    sample_func: Callable[[S, A], Tuple[S, float]],
    terminals,
    epsilon_greedy_action: Callable[
        [S, Mapping[S, Mapping[A, float]], float], A
    ],
    episodes: int = 10000,
    step_size: float = 0.01,
    epsilon: float = 0.1,
    gamma: float = 1.0
) -> Tuple[V[S], FinitePolicy[S, A]]:
    """Run tabular Q-learning and return the greedy value function and policy.

    Args:
        states_actions_dict: maps every state to its set of available
            actions; states mapped to ``None`` get no Q-table entry.
        sample_func: ``sample_func(state, action)`` returns one sampled
            ``(next_state, reward)`` transition.
        terminals: container of terminal states (only ``in`` is used).
            Terminal states are treated as having zero value.
        epsilon_greedy_action: called as
            ``epsilon_greedy_action(state, q, epsilon)`` with the *whole*
            Q-table ``q`` (state -> action -> value); returns the behaviour
            action to take in ``state``.
        episodes: number of episodes to simulate.
        step_size: learning rate applied to each TD error.
        epsilon: fixed exploration rate handed to ``epsilon_greedy_action``.
        gamma: discount factor applied to the bootstrapped next-state value.
            The default of 1.0 gives the undiscounted update.

    Returns:
        ``(vf, policy)`` where ``vf[s]`` is ``max_a q[s][a]`` and ``policy``
        deterministically picks an action achieving that maximum, for every
        state that has a Q-table entry.
    """
    q: Dict[S, Dict[A, float]] = {
        s: {a: 0. for a in actions}
        for s, actions in states_actions_dict.items()
        if actions is not None
    }
    nt_states: Set[S] = set(q)
    uniform_states: Choose[S] = Choose(nt_states)

    for _ in range(episodes):
        state: S = uniform_states.sample()
        # `terminals` is the function argument; there is no `self` here.
        while state not in terminals:
            action = epsilon_greedy_action(state, q, epsilon)
            next_state, reward = sample_func(state, action)
            if next_state in terminals:
                # No Q-table entry exists for terminal states: their value is
                # zero, so the target is the reward alone. The update must
                # still be applied, otherwise terminal rewards are never
                # learned.
                target = reward
            else:
                target = reward + gamma * max(q[next_state].values())
            q[state][action] += step_size * (target - q[state][action])
            # When next_state is terminal the loop condition ends the episode.
            state = next_state

    vf_dict: V[S] = {s: max(d.values()) for s, d in q.items()}
    policy: FinitePolicy[S, A] = FinitePolicy(
        {s: Constant(max(d.items(), key=itemgetter(1))[0])
         for s, d in q.items()}
    )
    return (vf_dict, policy)