In [1]:
# install the gym module that contains the NChain environment
#!pip install gym==0.13.1

In [2]:
# load the necessary python modules
import numpy as np
import matplotlib.pyplot as plt
from tabulate import tabulate
import random
import gym
import sys
import warnings
import time

# ignore warnings
if not sys.warnoptions:
    warnings.simplefilter("ignore")

# fix the seeds so the training runs are repeatable on "Restart & Run All":
# the agent's epsilon-greedy policy draws from both `random` and `np.random`
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# initialize the nchain environment
env = gym.make('NChain-v0')
env.seed(SEED)  # seed the environment's own random number generator as well

# get 10 randomly sampled actions
# NOTE(review): the action/observation spaces may use their own RNG that is not
# seeded here, so this sample can still differ between runs -- confirm for the pinned gym version
[env.action_space.sample() for _ in range(10)]

[1, 1, 0, 0, 0, 0, 1, 1, 1, 0]

In [3]:
# get 10 randomly sampled states from the environment's observation space
# (the list is the last expression of the cell, so the notebook displays it)
[env.observation_space.sample() for _ in range(10)]

[3, 3, 0, 2, 2, 1, 0, 3, 4, 3]

In [4]:
class Qagent(object):
    """
    Tabular Q-learning agent with an epsilon-greedy policy.

    The agent stores one value per (state, action) pair in ``qtable`` and
    updates it from observed transitions. Exploration is controlled by
    ``epsilon``, which can be decayed exponentially per episode.
    """

    def __init__(
        self,
        _action_size: int,
        _state_size: int,
        _learning_parameters: dict,
        _exploration_parameters: dict,
        _name: str = "agent",
        _color: str = "r",
    ) -> None:
        """ initialize the q-learning agent

        Args:
            _action_size (int): number of actions the agent can take
            _state_size (int): number of states the env has
            _learning_parameters (dict): learning parameters of the agent;
                must contain the keys "learning_rate" and "gamma"
            _exploration_parameters (dict): exploration parameters for the agent;
                must contain the keys "epsilon", "max_epsilon", "min_epsilon" and "decay_rate"
            _name (str, optional):  set the name of the Q-Agent. Defaults to "agent".
            _color (str, optional): set the color of the agent for plotting. Defaults to "r".

        Raises:
            KeyError: if one of the required dictionary keys is missing
        """
        self.name = _name
        self.color = _color

        self.action_size = _action_size
        self.state_size = _state_size
        # one row per state, one column per action; all values start at zero
        self.qtable = np.zeros((_state_size, _action_size))

        self.learning_rate = _learning_parameters["learning_rate"]
        self.gamma = _learning_parameters["gamma"]

        self.epsilon = _exploration_parameters["epsilon"]
        self.max_epsilon = _exploration_parameters["max_epsilon"]
        self.min_epsilon = _exploration_parameters["min_epsilon"]
        self.decay_rate = _exploration_parameters["decay_rate"]

    def update_qtable(
        self, state: int, new_state: int, action: int, reward: int, done: bool
    ) -> None:
        """
        update the q-table: Q(s,a) = Q(s,a) + lr  * [R(s,a) + gamma * max Q(s',a') - Q (s,a)]

        Args:
          state (int): current state of the environment
          new_state (int): new state of the environment
          action (int): current action taken by agent
          reward (int): current reward received from env
          done (boolean): variable indicating if env is done
        """
        # temporal-difference error; (1 - done) drops the bootstrap term
        # gamma * max Q(s', a') when the transition ended the episode
        td_error = (
            reward
            + self.gamma * np.max(self.qtable[new_state, :]) * (1 - done)
            - self.qtable[state, action]
        )
        self.qtable[state, action] = (
            self.qtable[state, action] + self.learning_rate * td_error
        )

    def update_epsilon(self, episode: int) -> None:
        """
        reduce epsilon, exponential decay from max_epsilon towards min_epsilon:
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * exp(-decay_rate * episode)

        Args:
          episode (int): number of episode
        """
        self.epsilon = self.min_epsilon + (
            self.max_epsilon - self.min_epsilon
        ) * np.exp(-self.decay_rate * episode)

    def get_action(self, state: int) -> int:
        """
        select action e-greedy.exploration-exploitation trade-off:
        - exploitation, max value for given state
        - exploration, random choice

        Args:
          state (int): current state of the environment/agent

        Returns:
          action (int): action that the agent will take in the next step
        """
        # note: the coin flip uses the stdlib `random` module while the random
        # action uses `np.random`, so both generators must be seeded for
        # reproducible runs
        if random.uniform(0, 1) >= self.epsilon:
            # ties are resolved by np.argmax, i.e. the lowest action index wins
            action = np.argmax(self.qtable[state, :])
        else:
            action = np.random.choice(self.action_size)
        # convert the numpy integer to a plain int, as promised by the annotation
        return int(action)

    def __str__(self, tablefmt="fancy_grid") -> str:
        """render the q-table as text, preceded by the agent's name.

        Args:
          tablefmt (str, optional): table format passed on to tabulate.
            Defaults to "fancy_grid".

        Returns:
          str: the agent name, a newline and the formatted q-table
        """
        headers = [f"Action {action}" for action in range(self.action_size)]
        show_index = [f"State {state}" for state in range(self.state_size)]
        # honour the requested format instead of always using "fancy_grid"
        table = tabulate(self.qtable, headers=headers, showindex=show_index, tablefmt=tablefmt)
        return f"{self.name}\n{table}"

In [5]:
def learn_to_play(
    agent: "Qagent",
    _max_game_steps: int = 10,
    _total_episodes: int = 1000,
    _env=None,
    _log_every: int = 300,
) -> "Qagent":
    """
    implementation of the q-learning algorithm, here the q-table values are calculated

    Args:
      agent (Qagent): agent to train; its get_action, update_qtable and update_epsilon
        methods and its epsilon and qtable attributes are used
      _max_game_steps (int): number of steps an agent can take, before the environment is reset
      _total_episodes (int): total of training episodes (the number of trials an agent can do)
      _env (optional): environment offering reset() -> state and
        step(action) -> (new_state, reward, done, info). Defaults to None, in which
        case the notebook-level variable ``env`` is used.
      _log_every (int, optional): print the elapsed time every ``_log_every`` episodes;
        0 or None switches the progress output off. Defaults to 300.

    Returns:
      Qagent: the same agent object that was passed in. Per-episode statistics are
        attached to it as the attributes rewards, last_states, epsilons and q_averages.
    """
    # fall back to the notebook-level environment only when none was passed in
    game_env = env if _env is None else _env

    rewards = np.zeros(_total_episodes)
    epsilons = np.zeros(_total_episodes)
    last_states = np.zeros(_total_episodes)
    q_averages = np.zeros(_total_episodes)

    start = time.time()

    for episode in range(_total_episodes):

        state = game_env.reset()
        game_rewards = 0

        # for each episode loop over the max number of steps that are possible
        # take an action and observe the outcome state (new_state), reward and stopping criterion
        for step in range(_max_game_steps):

            action = agent.get_action(state)
            new_state, reward, done, _ = game_env.step(action)
            agent.update_qtable(state, new_state, action, reward, done)
            state = new_state
            game_rewards += reward

            if done:
                break

        # the statistics are recorded before epsilon is decayed, i.e. epsilons[episode]
        # is the value that was actually used while playing this episode
        rewards[episode] = game_rewards
        last_states[episode] = state
        epsilons[episode] = agent.epsilon
        # NOTE: despite the name, this stores the SUM of all q-table entries, not their mean
        q_averages[episode] = np.sum(agent.qtable)

        # reduce epsilon, for exploration-exploitation tradeoff
        agent.update_epsilon(episode)

        if _log_every and episode % _log_every == 0:
            elapsed_time = round((time.time() - start), 1)
            print(f"elapsed time [sec]: {elapsed_time}, episode: {episode}")

    agent.rewards = rewards
    agent.last_states = last_states
    agent.epsilons = epsilons
    agent.q_averages = q_averages
    return agent
# sizes of the discrete action and state spaces, used to shape the q-table
action_size = env.action_space.n
state_size = env.observation_space.n

# Set the training parameters
# `unwrapped` reaches the underlying environment no matter how many wrappers
# gym.make put around it, so the attribute is set on the object that reads it
env.unwrapped.slip = 0.0  # avoid slipping on the chain

max_game_steps = 10  # Set number of steps an agent can take, before the environment is reset,
total_episodes = 1000  # Set total of training episodes (the number of trials an agent can do)


In [6]:
# Agent 1: starts fully exploratory (epsilon decays slowly from 1) and uses a
# high discount factor, so future rewards weigh heavily in the q-values
name = 'Smart Agent 1 - the agent explores and takes future rewards into account'
color = "orange"

learning_parameters = dict(learning_rate=0.8, gamma=0.9)
exploration_parameters = dict(
    epsilon=1,
    max_epsilon=1,
    min_epsilon=0.0,
    decay_rate=0.008,
)

# build the agent first, then train it on the notebook-level environment
q_agent_1 = Qagent(
    action_size,
    state_size,
    learning_parameters,
    exploration_parameters,
    name,
    color,
)
q_agent_1 = learn_to_play(
    q_agent_1,
    _max_game_steps=max_game_steps,
    _total_episodes=total_episodes,
)

elapsed time [sec]: 0.0, episode: 0
elapsed time [sec]: 0.1, episode: 300
elapsed time [sec]: 0.1, episode: 600
elapsed time [sec]: 0.1, episode: 900


In [7]:
# Agent 2: same learning rate as agent 1, but a very small discount factor,
# so almost only the immediate reward enters the q-values
name = 'Greedy Agent 2 - the agent cares only about immediate rewards (small gamma)'
color =  "m"

learning_parameters = {
    'learning_rate': 0.8,
    'gamma': 0.01
}
exploration_parameters = {
    # start at max_epsilon: update_epsilon() resets epsilon to
    # min_epsilon + (max_epsilon - min_epsilon) * exp(-decay_rate * episode) after
    # every episode, so a larger start value would only make episode 0 an outlier
    'epsilon': 0.5,
    'max_epsilon': 0.5,
    'min_epsilon': 0.0,
    'decay_rate': 0.008
}

q_agent_2 = Qagent(action_size, state_size, learning_parameters, exploration_parameters, name, color)
q_agent_2 = learn_to_play(q_agent_2, _max_game_steps=max_game_steps, _total_episodes=total_episodes)

elapsed time [sec]: 0.0, episode: 0
elapsed time [sec]: 0.0, episode: 300
elapsed time [sec]: 0.1, episode: 600
elapsed time [sec]: 0.1, episode: 900


In [8]:
# Agent 3: high discount factor like agent 1, but a low exploration ceiling and a
# fast decay, so the agent acts (almost) purely greedily from the beginning
name = "Shy Agent 3 - the agent doesn't explore the environment (small epsilon)"
color = "b"

learning_parameters = {
    'learning_rate': 0.8,
    'gamma': 0.9
}
exploration_parameters = {
    # start at max_epsilon: with the former start value of 1 the whole first
    # episode was played randomly, which contradicts the "doesn't explore" setup
    'epsilon': 0.2,
    'max_epsilon': 0.2,
    'min_epsilon': 0.0,
    'decay_rate': 0.5
}

q_agent_3 = Qagent(action_size, state_size, learning_parameters, exploration_parameters, name, color)
q_agent_3 = learn_to_play(q_agent_3, _max_game_steps=max_game_steps, _total_episodes=total_episodes)

elapsed time [sec]: 0.0, episode: 0
elapsed time [sec]: 0.0, episode: 300
elapsed time [sec]: 0.1, episode: 600
elapsed time [sec]: 0.1, episode: 900


In [10]:
# from helper_functions.visualize_plays import VisualizePlays
# visualize the different agents
#plays = VisualizePlays(q_agent_1, q_agent_2, q_agent_3)
#plays.plot()

In [11]:
#!pip install helper_functions

In [12]:
# show the learned q-table of agent 1; print() uses Qagent.__str__, which renders name + table
print(q_agent_1)

Smart Agent 1 - the agent explores and takes future rewards into account
╒═════════╤════════════╤════════════╕
│         │   Action 0 │   Action 1 │
╞═════════╪════════════╪════════════╡
│ State 0 │      65.61 │     61.049 │
├─────────┼────────────┼────────────┤
│ State 1 │      72.9  │     61.049 │
├─────────┼────────────┼────────────┤
│ State 2 │      81    │     61.049 │
├─────────┼────────────┼────────────┤
│ State 3 │      90    │     61.049 │
├─────────┼────────────┼────────────┤
│ State 4 │     100    │     61.049 │
╘═════════╧════════════╧════════════╛


In [13]:
# show the learned q-table of agent 2; print() uses Qagent.__str__, which renders name + table
print(q_agent_2)

Greedy Agent 2 - the agent cares only about immediate rewards (small gamma)
╒═════════╤════════════╤════════════╕
│         │   Action 0 │   Action 1 │
╞═════════╪════════════╪════════════╡
│ State 0 │  0.020202  │    2.0202  │
├─────────┼────────────┼────────────┤
│ State 1 │  0.019137  │    2.0202  │
├─────────┼────────────┼────────────┤
│ State 2 │  0.020202  │    1.93939 │
├─────────┼────────────┼────────────┤
│ State 3 │  0.0185613 │    2.0202  │
├─────────┼────────────┼────────────┤
│ State 4 │  0         │    2.00339 │
╘═════════╧════════════╧════════════╛
