In [1]:
class BasePolicy:
    """Baseline policy: every action is sampled from the action space."""

    def __init__(self, action_space):
        """
        Remember the space that actions will be drawn from.

        :param action_space: OpenAI Gym Space object
        """
        self.action_space = action_space

    def get_action(self, state):
        """
        Choose an action for the given state.

        The state is not consulted; the result is whatever
        ``action_space.sample()`` returns.

        :param state: current environment state (unused by this policy)
        :return: action to take based on the policy
        """
        space = self.action_space
        return space.sample()

In [2]:
class BaseAgent:
    """
    Abstract Agent class.

    Every method defined here is a stub that does nothing and returns
    ``None``; the docstrings describe the contract concrete agents are
    expected to implement.
    """

    # The ``Space`` annotations are quoted so that defining this class does not
    # require ``Space`` to be importable at definition time.
    # NOTE: the ``params`` default is a shared dict object; this stub never
    # touches it, and implementations should copy it before modifying.
    def __init__(self, observation_space: "Space", action_space: "Space", name="BaseAgent", params={'gamma': 0.95},
                 specs=None):
        """
        Describe the constructor arguments; this base implementation stores nothing.

        :param observation_space: Environment's observation space
        :param action_space: Environment's action space
        :param name: Agent's Name
        :param params: Hyper-parameters etc
        :param specs: Specifies space types agent is compatible with (eg Discrete for RMax)
        """

    def learn(self, state, reward=None, done=False):
        """
        Returns an action during agent's training

        :param state: new environment state
        :param reward: reward due to arriving in state
        :param done: boolean indicating whether the episode is complete
        :return action: an action to take on the environment
        """

    def predict(self, state):
        """
        Returns the agent's optimal action for a state

        :param state: new environment state
        :return action: agent's optimal action
        """

    def start_of_episode(self, state):
        """
        Begins an episode

        :param state: environment start state
        :return: action to take
        """

    def end_of_episode(self):
        """
        Ends an episode
        """

    def _stepwise_update(self, state, reward):
        """
        Per-step update hook; agent implements this

        :param state: new environment state
        :param reward: reward due to arriving in state
        """

    def _episodic_update(self):
        """
        End-of-episode update hook; agent implements this
        """


In [None]:
'''
RMaxAgentClass.py: Class for an RMaxAgent from [Strehl, Li and Littman 2009].

Notes:
    - Assumes WLOG reward function codomain is [0,1] (so RMAX is 1.0)
'''

# Python imports.
import random
import numpy as np
from itertools import product

# Local classes.
from core.agents.base import BaseAgent
from core.agents.models import RMaxModel
from core.agents.policies import ExploreLeastKnown, DiscreteTabularPolicy
from core.utils import constants, specs


# Default hyper-parameters for RMaxAgent; values passed by the caller are
# layered on top of these.
RMAX_DEFAULTS = dict(
    # There's no exploration in R-Max
    epsilon=0,
    # discount factor
    gamma=0.95,
    # number of occurrences of (state, action) pairs before it is marked as known
    known_threshold=5,
    # maximum reward
    max_reward=1,
    # precision parameter for policy iterations
    epsilon_one=0.99,
    # maximum number of backups per experience/transition during training
    max_stepwise_backups=20,
    # maximum number of backups at the end of an episode
    max_episodic_backups=0,
)

# Compatibility spec handed to BaseAgent: both the observation space and the
# action space are declared as discrete.
RMAX_SPEC = specs.AgentSpec(observation_space=constants.SpaceTypes.DISCRETE,
                            action_space=constants.SpaceTypes.DISCRETE)

class RMaxAgent(BaseAgent):
    '''
    Implementation for an R-Max Agent [Strehl, Li and Littman 2009]

    Keeps an RMaxModel of the environment and a tabular prediction policy
    whose Q-table is refreshed by value-iteration style backups over the
    model's known rewards and transitions.
    '''

    def __init__(self, observation_space, action_space, name="RMax Agent", parameters=None, starting_policy=None):
        '''
        :param observation_space: Environment's observation space
        :param action_space: Environment's action space
        :param name: Agent's Name
        :param parameters: hyper-parameter overrides layered on top of RMAX_DEFAULTS
                           (None means "no overrides")
        :param starting_policy: policy to use for prediction; when None, a
                                DiscreteTabularPolicy with default value 1/(1-gamma) is created
        '''
        # Build a fresh dict per instance so neither RMAX_DEFAULTS nor the
        # caller's dict is shared or modified.
        overrides = {} if parameters is None else parameters
        BaseAgent.__init__(self, observation_space, action_space, name,
                           params=dict(RMAX_DEFAULTS, **overrides), specs=RMAX_SPEC)

        # Policy Setup
        # Compare against None explicitly so a supplied policy object that
        # happens to evaluate falsy is still honoured.
        if starting_policy is not None:
            self.predict_policy = starting_policy
        else:
            # 1/(1-gamma) is the discounted return of receiving reward 1 forever
            # (the module assumes rewards lie in [0, 1]).
            self.predict_policy = DiscreteTabularPolicy(
                self.observation_space,
                self.action_space,
                default_value=1/(1-self.gamma),
            )

        # Upper bound on backups per update, derived from epsilon_one and gamma:
        # log(1 / (epsilon_one * (1 - gamma))) / (1 - gamma), truncated to int.
        self.backup_lim = int(np.log(1/(self.params['epsilon_one'] * (1 - self.gamma))) / (1 - self.gamma))
        self.policy_iterations = 0

        # Model Setup
        self.model = RMaxModel(
            observation_space,
            action_space,
            default_reward=self.params['max_reward'],
            limit=self.params['known_threshold'],
        )

        # Training-time policy: built from the prediction policy and the model.
        self.learn_policy = ExploreLeastKnown(
            action_space=self.action_space,
            policy=self.predict_policy,
            model=self.model,
        )

    def _stepwise_update(self, state, reward):
        '''
        Feed one transition into the model and, if warranted, re-plan.

        The previous (state, action) pair is only recorded while it is still
        unknown to the model. Backups are run when, after the update, the
        model reports the previous state as known.

        :param state: new environment state
        :param reward: reward due to arriving in state
        '''
        if self.model.is_known(self.prev_state, self.prev_action):
            return

        self.model.update(self.prev_state, self.prev_action, reward, state)
        if self.model.is_known_state(self.prev_state):
            # Never exceed the bound computed in __init__, nor the configured
            # per-step budget.
            budget = min(self.backup_lim, self.params['max_stepwise_backups'])
            self._vectorized_iterate_policy(num_steps=budget)

    def _episodic_update(self):
        '''
        Run the configured number of end-of-episode backups.
        '''
        self._vectorized_iterate_policy(num_steps=self.params['max_episodic_backups'])

    def _vectorized_iterate_policy(self, num_steps):
        '''
        Apply ``num_steps`` whole-table backups to the prediction policy.

        Each step sets
        ``q_table = known_rewards + gamma * dot(known_transitions, max_q_values)``.

        :param num_steps: number of backups to perform (0 performs none)
        '''
        for _ in range(num_steps):
            # Sanity check: at least one known reward must be below 1.
            # NOTE(review): presumably 1 stands for the assumed maximum reward - confirm.
            assert (self.model.known_rewards < 1).any()
            expected_next_value = np.dot(self.model.known_transitions, self.predict_policy.get_max_q_values())
            self.predict_policy.q_table = self.model.known_rewards + self.gamma*expected_next_value
