In [20]:
class QTable:
    def __init__(self, n_states, n_actions):
        self.table = [[0] * n_actions for _ in range(n_states)]

    def get_greedy_action_policy(self, state):
        action_values = self.table[state]
        max_q_actions = [
            action 
            for action, q in enumerate(action_values) 
            if q == max(action_values)
        ]
        return [
            1/len(max_q_actions) if action in max_q_actions else 0 
            for action in range(len(action_values))
        ]
    
    def print(self):
        for state, actions in enumerate(self.table):
            print('State', state, '->', actions)

class StateValueTable:
    def __init__(self, n_states):
        self.table = [0] * n_states

    def get_greedy_action_policy(self, action_to_next_state_dynamics, exit_dynamics):
        exit_actions = [action for action, next_state in enumerate(action_to_next_state_dynamics) if next_state in exit_dynamics]
        greedy_actions = exit_actions
        if not greedy_actions:
            actions_next_state_values = [self.table[next_state] for next_state in action_to_next_state_dynamics]
            max_next_state_value = max([next_state_value for next_state_value in actions_next_state_values])
            max_actions = [action for action, next_state_value in enumerate(actions_next_state_values) if next_state_value == max_next_state_value]
            greedy_actions = max_actions
        return [
            1/len(greedy_actions) if action in greedy_actions else 0 
            for action in range(len(action_to_next_state_dynamics))
        ]

    def print(self):
        print(self.table)

class PolicyFunctions:
    @staticmethod
    def combine_policy_with_exploratory_policy(action_policy, exploration_rate):
        exploratory_action_policy_part = [exploration_rate/len(action_policy)] * len(action_policy)
        action_policy_part = [action_policy[action] * (1 - exploration_rate) for action in range(len(action_policy))]
        return [exploratory_action_policy_part[action] + action_policy_part[action] for action in range(len(action_policy))]

    @staticmethod
    def get_max_actions(state_action_value_function):
        max_q = max(state_action_value_function)
        max_actions = [action for action, q in enumerate(state_action_value_function) if q == max_q]
        return max_actions
    
    @staticmethod
    def single_action_as_action_probabilities(chosen_action, n_actions):
        return [
            1 if action == chosen_action else 0 
            for action in range(n_actions)
        ]
    

In [21]:
from AbstractAgent import AbstractAgent
from Runner import Runner
from AbstractEnvironment import AbstractEnvironment
from OneDMazeEnvironment import OneDMazeEnvironment

In [22]:
def test_agent(agent: AbstractAgent, environment: AbstractEnvironment):
    runner = Runner(agent)
    runner.run(environment)
    runner.run(environment, n_episodes=1, verbose=True, learn=False)
    agent.print()
    
environment = OneDMazeEnvironment()

In [23]:
class QLearningAgent(AbstractAgent):
    def __init__(self, environment: AbstractEnvironment, learning_rate=0.1, discount_factor=0.999, exploration_rate=0.1):
        self.q_table = QTable(environment.get_states_count(), environment.get_actions_count())
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate

    def get_action_policy(self, state):
        greedy_action_policy = self.q_table.get_greedy_action_policy(state)
        return PolicyFunctions.combine_policy_with_exploratory_policy(greedy_action_policy, self.exploration_rate)

    def post_act_learning(self, state, action, reward, next_state):
        max_next_q = max(self.q_table.table[next_state])
        self.q_table.table[state][action] += self.learning_rate * (reward + self.discount_factor * max_next_q - self.q_table.table[state][action])

    def print(self):
        self.q_table.print()

test_agent(QLearningAgent(environment), environment)


Episode: 1
[ |A| | |E]
[ | |A| |E]
[ | | |A|E]
[ | | | |A]
State 0 -> [0.0, 0.12990514265019082]
State 1 -> [0.0006284797834766312, 0.7005051493802641]
State 2 -> [0, 2.6363716982910006]
State 3 -> [0, 6.5132155990000005]
State 4 -> [0, 0]


In [24]:
import random


class DoubleQLearning(AbstractAgent):
    def __init__(self, environment: AbstractEnvironment, learning_rate=0.1, discount_factor=0.999, exploration_rate=0.1):
        self.q_table1 = QTable(environment.get_states_count(), environment.get_actions_count())
        self.q_table2 = QTable(environment.get_states_count(), environment.get_actions_count())
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate

    def get_action_policy(self, state):
        greedy_action_policy = [
            (q1 + q2) / 2 
            for q1, q2 in zip(self.q_table1.table[state], self.q_table2.table[state])
        ]
        return PolicyFunctions.combine_policy_with_exploratory_policy(greedy_action_policy, self.exploration_rate)

    def post_act_learning(self, state, action, reward, next_state):
        if random.choice([True, False]):
            q_table = self.q_table1.table
            source_q_table = self.q_table2.table
        else:
            q_table = self.q_table2.table
            source_q_table = self.q_table1.table
        max_action = q_table[next_state].index(max(q_table[next_state]))
        q_table[state][action] += self.learning_rate * (reward + self.discount_factor * source_q_table[next_state][max_action] - q_table[state][action])

    def print(self):
        self.q_table1.print()
        self.q_table2.print()

test_agent(DoubleQLearning(environment), environment)

Episode: 1
[ |A| | |E]
[ | |A| |E]
[ | | |A|E]
[ | | | |A]
State 0 -> [0.0, 0.0]
State 1 -> [0.0, 0.08300374316999999]
State 2 -> [0.0, 1.259487252]
State 3 -> [0.0, 4.0951]
State 4 -> [0, 0]
State 0 -> [0.0, 0.0]
State 1 -> [0.0, 0.0]
State 2 -> [0.0, 0.5872122]
State 3 -> [0, 4.0951]
State 4 -> [0, 0]


In [25]:
class SarsaAgent(AbstractAgent):
    def __init__(self, environment: AbstractEnvironment, learning_rate=0.1, discount_factor=0.999, exploration_rate=0.1):
        self.q_table = QTable(environment.get_states_count(), environment.get_actions_count())
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.prior = None

    def get_action_policy(self, state):
        greedy_action_policy = self.q_table.get_greedy_action_policy(state)
        return PolicyFunctions.combine_policy_with_exploratory_policy(greedy_action_policy, self.exploration_rate)

    def post_act_learning(self, state, action, reward, next_state):
        if self.prior is not None:
            prior_state, prior_action, prior_reward = self.prior
            prior_q = self.q_table.table[prior_state][prior_action]
            current_q = 0
            if state is not None and action is not None:
                current_q = self.q_table.table[state][action]
            self.q_table.table[prior_state][prior_action] += self.learning_rate * (prior_reward + self.discount_factor * current_q - prior_q)
        self.prior = state, action, reward
    
    def post_episode_learning(self):
        self.post_act_learning(None, None, 0, None)
        self.prior = None

    def print(self):
        self.q_table.print()

test_agent(SarsaAgent(environment), environment)

Episode: 1
[ |A| | |E]
[ | |A| |E]
[ | | |A|E]
[ | | | |A]
State 0 -> [0.005004282726007624, 0.12756851177600004]
State 1 -> [0.0, 0.6837181994082898]
State 2 -> [0.0, 2.4851379147597004]
State 3 -> [0.16786949971974302, 6.5132155990000005]
State 4 -> [0, 0]


In [26]:
class ExpectedSarsaAgent(AbstractAgent):
    def __init__(self, environment: AbstractEnvironment, learning_rate=0.1, discount_factor=0.999, exploration_rate=0.1):
        self.q_table = QTable(environment.get_states_count(), environment.get_actions_count())
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate

    def get_action_policy(self, state):
        greedy_action_policy = self.q_table.get_greedy_action_policy(state)
        return PolicyFunctions.combine_policy_with_exploratory_policy(greedy_action_policy, self.exploration_rate)

    def post_act_learning(self, state, action, reward, next_state):
        current_q = self.q_table.table[state][action]
        expected_q = sum(self.q_table.table[next_state]) / len(self.q_table.table[next_state])
        self.q_table.table[state][action] += self.learning_rate * (reward + self.discount_factor * expected_q - current_q)

    def print(self):
        self.q_table.print()

test_agent(ExpectedSarsaAgent(environment), environment)

Episode: 1
[ |A| | |E]
[ | |A| |E]
[ | | |A|E]
[ | | | |A]
State 0 -> [0.0, 0.021805324858840496]
State 1 -> [0.0, 0.2511367525286398]
State 2 -> [0.015664774090507302, 1.5327158977016397]
State 3 -> [0.07494574527075888, 6.5132155990000005]
State 4 -> [0, 0]


In [27]:
from OneDMazeDynamics import OneDMazeDynamics


class StateValueTemporalDifferenceZero(AbstractAgent):
    def __init__(self, environment: AbstractEnvironment, learning_rate=0.2, discount_factor=0.9, exploration_rate=0.1):
        self.state_value_table = StateValueTable(environment.get_states_count())
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.environment = environment

    def get_action_policy(self, state):
        action_to_next_state_dynamics = OneDMazeDynamics.get_action_to_next_state_dynamics(state)
        goal_dynamics = environment.get_exits()
        greedy_action_policy = self.state_value_table.get_greedy_action_policy(action_to_next_state_dynamics, goal_dynamics)
        return PolicyFunctions.combine_policy_with_exploratory_policy(greedy_action_policy, self.exploration_rate)

    def post_act_learning(self, state, _, reward, next_state):
        next_state_value = self.state_value_table.table[next_state]
        current_state = self.state_value_table.table[state]
        self.state_value_table.table[state] += self.learning_rate * (reward + self.discount_factor * next_state_value - current_state)

    def print(self):
        self.state_value_table.print()

test_agent(StateValueTemporalDifferenceZero(environment), environment)

Episode: 1
[ |A| | |E]
[ | |A| |E]
[ | | |A|E]
[ | | | |A]
[0.8871716125163522, 2.5621334514892804, 5.617713254400001, 8.926258176000001, 0]


In [28]:
class TemporalDifferenceNStepsToExpectedSarsa(AbstractAgent):
    def __init__(self, environment: AbstractEnvironment, learning_rate=0.2, discount_factor=0.9, exploration_rate=0.1, n_steps=2):
        self.q_table = QTable(environment.get_states_count(), environment.get_actions_count())
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.n_steps = n_steps
        self.priors = []

    def get_action_policy(self, state):
        greedy_action_policy = self.q_table.get_greedy_action_policy(state)
        return PolicyFunctions.combine_policy_with_exploratory_policy(greedy_action_policy, self.exploration_rate)

    def post_act_learning(self, state, action, reward, next_state):
        self.priors.append((state, action, reward, next_state))
        if len(self.priors) >= self.n_steps:
            self._update_once_from_priors()

    def post_episode_learning(self):
        while(self.priors):
            self._update_once_from_priors()

    def _update_once_from_priors(self):
        returns = 0
        target_state, target_action, _, _ = self.priors[0]
        for step in range(min(self.n_steps, len(self.priors))):
            _, _, step_reward, _ = self.priors[step]
            returns += step_reward * self.discount_factor ** step
        if len(self.priors) == self.n_steps:
            _, _, _, last_step_next_state = self.priors[self.n_steps-1]
            last_step_expected_q = sum(self.q_table.table[last_step_next_state]) / len(self.q_table.table[last_step_next_state])
            returns += last_step_expected_q * self.discount_factor ** self.n_steps
        target_current_q = self.q_table.table[target_state][target_action]
        self.q_table.table[target_state][target_action] += self.learning_rate * (returns - target_current_q)
        self.priors.pop(0)

    def print(self):
        self.q_table.print()

test_agent(TemporalDifferenceNStepsToExpectedSarsa(environment, n_steps=2), environment)

Episode: 1
[ |A| | |E]
[ | |A| |E]
[ | | |A|E]
[ | | | |A]
State 0 -> [0.0, 2.275173868032]
State 1 -> [0.0, 2.3105951729049603]
State 2 -> [0.6311552762880001, 8.0336323584]
State 3 -> [0.0, 8.926258176000001]
State 4 -> [0, 0]


In [29]:
import sys


class MonteCarlo(TemporalDifferenceNStepsToExpectedSarsa):
    def __init__(self, environment: AbstractEnvironment, learning_rate=0.2, discount_factor=0.9, exploration_rate=0.1):
        super().__init__(environment, learning_rate, discount_factor, exploration_rate, n_steps=sys.maxsize)

test_agent(MonteCarlo(environment), environment)

Episode: 1
[ |A| | |E]
[ | |A| |E]
[ | | |A|E]
[ | | | |A]
State 0 -> [4.835314518837193, 6.154993527460396]
State 1 -> [0.9845226927719948, 7.020513602387068]
State 2 -> [1.4420670819931014, 8.07854713307136]
State 3 -> [2.9160000000000004, 8.926258176000001]
State 4 -> [0, 0]


In [30]:
class BellmansEquations:
    @staticmethod
    def evaluate_and_improve_policy_iteration(n_states, n_actions, discount, environment_probabilities, environment_rewards, max_evaluate_iterations=100, max_improve_iterations=100, min_evaluation_delta=1e-4):
        policy = [0] * n_states
        for iteration_index in range(max_improve_iterations):
            state_value_function = BellmansEquations._evaluate_policy(n_states, policy, max_evaluate_iterations, discount, min_evaluation_delta, environment_probabilities, environment_rewards)
            policy, is_stable = BellmansEquations._improve_policy(n_states, n_actions, policy, state_value_function, discount, environment_probabilities, environment_rewards)
            if is_stable:
                print(f'Policy improvement stopped at iteration {iteration_index}'); 
                break
        return policy

    @staticmethod
    def _evaluate_policy(n_states, policy, max_iterations, discount, min_evaluation_delta, environment_probabilities, environment_rewards):
        state_value_function = [0] * n_states
        for iteration_index in range(max_iterations):
            max_delta = 0
            for state in range(n_states):
                prior_state_value_function = state_value_function[state]
                action = policy[state]
                state_value_function[state] += BellmansEquations._action_value_update(state, action, state_value_function, environment_probabilities, environment_rewards, discount)
                max_delta = max(max_delta, abs(prior_state_value_function-state_value_function[state]))
            if max_delta < min_evaluation_delta: 
                print(f'Policy evaluation stopped at iteration {iteration_index}'); 
                break
        return state_value_function
    
    @staticmethod
    def _improve_policy(n_states, n_actions, policy, state_value_function, discount, environment_probabilities, environment_rewards):
        action_value_function = [[0] * n_actions for _ in range(n_states)]
        is_stable = True
        for state in range(n_states):
            for action in range(n_actions):
                action_value_function[state][action] = BellmansEquations._action_value_update(state, action, state_value_function, environment_probabilities, environment_rewards, discount)
            new_max_actions = PolicyFunctions.get_max_actions(action_value_function[state])
            if policy[state] != new_max_actions[0]:
                is_stable = False
            policy[state] = new_max_actions[0]
        return policy, is_stable
    
    @staticmethod
    def policy_value_iteration(n_states, n_actions, discount, environment_probabilities, environment_rewards, max_iterations=100, min_evaluation_delta=1e-4):
        state_value_function = [0] * n_states
        policy = [0] * n_states
        for iteration_index in range(max_iterations):
            max_delta = 0
            for state in range(n_states):
                prior_state_value_function = state_value_function[state]
                action_value_function = [
                    BellmansEquations._action_value_update(state, action, state_value_function, environment_probabilities, environment_rewards, discount)
                    for action in range(n_actions)
                ]
                state_value_function[state] = max(action_value_function)
                max_delta = max(max_delta, abs(prior_state_value_function-state_value_function[state]))
                policy[state] = action_value_function.index(max(action_value_function))
            if max_delta < min_evaluation_delta: 
                print(f'Stopped at iteration {iteration_index}'); 
                break
        return policy
    
    @staticmethod
    def _action_value_update(state, action, state_value_function, environment_probabilities, environment_rewards, discount):
        return sum([
            (next_state_reward_probability * (environment_rewards[reward_index] + discount * state_value_function[next_state]))
            for next_state, next_state_reward_probabilities in enumerate(environment_probabilities[state][action])
            for reward_index, next_state_reward_probability in enumerate(next_state_reward_probabilities)
        ])

In [40]:
class PolicyIterationAgent(AbstractAgent):
    def __init__(self, environment: OneDMazeEnvironment, discount=0.9):
        self.n_actions = environment.get_actions_count()
        environment_probabilities = OneDMazeDynamics.build_environment_probabilities(environment)
        environment_rewards = environment.get_reward_values()
        self.policy = BellmansEquations.evaluate_and_improve_policy_iteration(environment.get_states_count(), self.n_actions, discount, environment_probabilities, environment_rewards)

    def get_action_policy(self, state):
        policy_action = self.policy[state]
        return PolicyFunctions.single_action_as_action_probabilities(policy_action, self.n_actions)

    def print(self):
        print(self.policy)

test_agent(PolicyIterationAgent(environment), environment)


Policy evaluation stopped at iteration 0
Policy improvement stopped at iteration 0
Episode: 1
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | 

In [45]:
class ValueIterationAgent(AbstractAgent):
    def __init__(self, environment: OneDMazeEnvironment, discount=0.9):
        self.n_actions = environment.get_actions_count()
        environment_probabilities = OneDMazeDynamics.build_environment_probabilities(environment)
        environment_rewards = environment.get_reward_values()
        self.policy = BellmansEquations.policy_value_iteration(environment.get_states_count(), self.n_actions, discount, environment_probabilities, environment_rewards)

    def get_action_policy(self, state):
        policy_action = self.policy[state]
        return PolicyFunctions.single_action_as_action_probabilities(policy_action, self.n_actions)

    def print(self):
        print(self.policy)
    
test_agent(ValueIterationAgent(environment), environment)

Stopped at iteration 57
Episode: 1
[ |A| | |E]
[ | |A| |E]
[ | | |A|E]
[ | | | |A]
[1, 1, 1, 1, 0]


In [33]:
import math


class SoftmaxFunctions:
    @staticmethod
    def actions_softmax(state_q, temperature):
        action_exponentials = [math.exp(q / temperature) for q in state_q]
        exponentials_sum = sum(action_exponentials)
        return [action_exponential / exponentials_sum for action_exponential in action_exponentials]

    @staticmethod
    def action_softmax(state_q, action, temperature):
        softmaxed_actions = SoftmaxFunctions.actions_softmax(state_q, temperature)
        return softmaxed_actions[action]


In [34]:
class ReinforceAgent(AbstractAgent):
    def __init__(self, environment: AbstractEnvironment, discount=0.9, learning_rate=0.1, softmax_temperature=0.5):
        self.learning_rate = learning_rate
        self.softmax_temperature = softmax_temperature
        self.discount = discount
        self.critic_state_value_table = StateValueTable(environment.get_states_count())
        self.actor_q_table = QTable(environment.get_states_count(), environment.get_actions_count())
        self.priors = []

    def get_action_policy(self, state):
        return SoftmaxFunctions.actions_softmax(self.actor_q_table.table[state], self.softmax_temperature)

    def post_act_learning(self, state, action, reward, next_state):
        self.priors.append((state, action, reward, next_state))

    def post_episode_learning(self):
        step_discount = self.discount ** (len(self.priors)-1)
        returns = 0
        for step in range(len(self.priors)-1, -1, -1):
            state, action, reward, _ = self.priors[step]
            returns = reward + step_discount * returns
            state_value_delta = returns - self.critic_state_value_table.table[state]
            self.critic_state_value_table.table[state] += self.learning_rate * state_value_delta
            action_softmax = SoftmaxFunctions.action_softmax(self.actor_q_table.table[state], action, self.softmax_temperature)
            self.actor_q_table.table[state][action] += self.learning_rate * state_value_delta * step_discount * (1 - action_softmax) / self.softmax_temperature
            step_discount /= self.discount
        self.priors = []

    def print(self):
        self.actor_q_table.print()

test_agent(ReinforceAgent(environment), environment)

Episode: 1
[ |A| | |E]
[ | |A| |E]
[ | | |A|E]
[ | | | |A]
State 0 -> [1.0554059492597547, 1.6138520321979528]
State 1 -> [0.0023655956006757434, 1.058932816142716]
State 2 -> [-0.0030990059157742483, 1.1562005702561202]
State 3 -> [-0.14736275187686232, 1.1460945933307476]
State 4 -> [0, 0]


In [35]:
class ActorCriticAgent(AbstractAgent):
    def __init__(self, environment: AbstractEnvironment, discount=0.9, learning_rate=0.1, softmax_temperature=0.5):
        self.learning_rate = learning_rate
        self.softmax_temperature = softmax_temperature
        self.discount = discount
        self.critic_state_value_table = StateValueTable(environment.get_states_count())
        self.actor_q_table = QTable(environment.get_states_count(), environment.get_actions_count())

    def get_action_policy(self, state):
        return SoftmaxFunctions.actions_softmax(self.actor_q_table.table[state], self.softmax_temperature)

    def post_act_learning(self, state, action, reward, next_state):
        state_value = self.critic_state_value_table.table[state]
        next_state_value = self.critic_state_value_table.table[next_state]
        state_value_delta = reward + self.discount * next_state_value - state_value
        self.critic_state_value_table.table[state] += self.learning_rate * state_value_delta
        action_softmax = SoftmaxFunctions.action_softmax(self.actor_q_table.table[state], action, self.softmax_temperature)
        self.actor_q_table.table[state][action] += self.learning_rate * state_value_delta * (1 - action_softmax) / self.softmax_temperature

    def print(self):
        self.actor_q_table.print()

test_agent(ActorCriticAgent(environment), environment)
    

Episode: 1
[A| | | |E]
[A| | | |E]
[A| | | |E]
[A| | | |E]
[ |A| | |E]
[ | |A| |E]
[ | | |A|E]
[ | | | |A]
State 0 -> [-0.003106927674276576, 0.12759786577229343]
State 1 -> [-0.05755221284297138, 0.4900252515350916]
State 2 -> [-0.31684825117116694, 1.0025585369715866]
State 3 -> [0, 1.7020922904038942]
State 4 -> [0, 0]
