The goal of an agent in reinforcement learning (RL) is to maximise a numerical value, the cumulative reward, that represents some objective. To do so the agent must learn the optimal policy: a mapping from each state to the action to take in that state. One method used to solve these types of problems is Q Learning.

Q Learning is a value iteration method used to solve RL problems formalised as a finite Markov Decision Process (MDP) by taking advantage of value functions and, more specifically, the action-value function. The action-value function, or Q function, gives the Q value of a state-action pair: the expected return from taking that action in that state and following the policy afterwards.
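
To make the idea of a Q function concrete, here is a minimal sketch of a tabular Q function for a hypothetical problem with 3 states and 2 actions. The Q values are made up for illustration; the point is only that a policy can be read off by picking the highest-valued action in each state.

```python
import numpy as np

# Hypothetical Q table: one row per state, one column per action (values are made up)
q_table = np.array([
    [0.1, 0.5],   # state 0: action 1 looks better
    [0.7, 0.2],   # state 1: action 0 looks better
    [0.0, 0.3],   # state 2: action 1 looks better
])

# Greedy policy: in each state, choose the action with the largest Q value
greedy_policy = np.argmax(q_table, axis=1)
print(greedy_policy.tolist())
```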

Install packages

In [1]:
!pip install gym numpy 'gym[toy_text]' pyglet > /dev/null 2>&1  # > /dev/null 2>&1 sends stdout and stderr to /dev/null (the "void") instead of displaying below

Import packages

In [46]:
import gym
import numpy as np

print(gym.__version__)  # 0.26.1 as of time of writing

0.26.1


Seed

In [None]:
seed = None
np.random.seed(seed)
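# A fixed integer seed makes runs reproducible (epsilon draws via np.random, environment via reset(seed=...)); None gives a different run each time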

## The Environment

- The environment is the world in which the agent lives. It is formalised as a finite Markov Decision Process where the rewards and state transition probabilities are unknown to the agent.
- FrozenLake is a grid environment described as a frozen-over lake containing holes that the agent can fall through. The ice is also slippery, so the agent may slip in either direction perpendicular to the chosen action (meaning state transitions are stochastic, not deterministic).
- At every time step the agent has access to four actions: Up, Down, Left and Right. If the agent is on the edge of the grid and tries to move off it, it remains in the same state (or moves to a neighbouring state if it slips).
- This is a tabular, stochastic environment, and standard Q Learning is capable of solving it.
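
The grid mechanics described above can be sketched without gym. This is an illustrative re-implementation, not the library's code: the helper name `move` is hypothetical, and it assumes the 4x4 map with states numbered row by row from 0 to 15 and the action encoding 0 = Left, 1 = Down, 2 = Right, 3 = Up. Slipping is deliberately left out here.

```python
N_ROWS, N_COLS = 4, 4
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3

def move(state: int, action: int) -> int:
    """Return the state reached by a non-slipping move; moves off the grid leave the state unchanged."""
    row, col = divmod(state, N_COLS)
    if action == LEFT:
        col = max(col - 1, 0)
    elif action == DOWN:
        row = min(row + 1, N_ROWS - 1)
    elif action == RIGHT:
        col = min(col + 1, N_COLS - 1)
    elif action == UP:
        row = max(row - 1, 0)
    return row * N_COLS + col

print(move(0, RIGHT))  # one cell right of the start
print(move(0, LEFT))   # blocked by the edge, stays put
print(move(0, DOWN))   # one row down
```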

In [8]:
env_id = "FrozenLake-v1"
# env_id = "FrozenLake8x8-v1"
env = gym.make(env_id, render_mode='ansi')

Let's see the environment in action:

In [47]:
env.reset()  # Environment needs to be reset every time a new episode needs to start
print("Starting State", env.render())
for step in range(50):  # Time steps

    rand_action = env.action_space.sample()  # Samples a random action from all possible actions

    # The step function is the whole environment as a function. It takes in an action and outputs the next state, the reward,
    # whether the episode terminated or was truncated (time limit), and an info dict holding the state transition probability
    next_state, reward, terminated, truncated, info = env.step(rand_action)

    print(f"State at step {step} after action{env.render()}")  # the render function returns data to see what is happening in the environment

    if terminated or truncated:  # If the episode is over we need to reset to start the next episode
        print(f"Terminal state reached after action{env.render()}")
        env.reset()
        print("New Episode")
        print("Starting State", env.render())


Starting State 
[41mS[0mFFF
FHFH
FFFH
HFFG

State at step 0 after action  (Right)
[41mS[0mFFF
FHFH
FFFH
HFFG

State at step 1 after action  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG

State at step 2 after action  (Down)
[41mS[0mFFF
FHFH
FFFH
HFFG

State at step 3 after action  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG

State at step 4 after action  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG

State at step 5 after action  (Right)
[41mS[0mFFF
FHFH
FFFH
HFFG

State at step 6 after action  (Right)
SFFF
[41mF[0mHFH
FFFH
HFFG

State at step 7 after action  (Down)
SFFF
FHFH
[41mF[0mFFH
HFFG

State at step 8 after action  (Up)
SFFF
FHFH
[41mF[0mFFH
HFFG

State at step 9 after action  (Right)
SFFF
FHFH
FFFH
[41mH[0mFFG

Terminal state reached after action  (Right)
SFFF
FHFH
FFFH
[41mH[0mFFG

New Episode
Starting State 
[41mS[0mFFF
FHFH
FFFH
HFFG

State at step 10 after action  (Right)
SFFF
[41mF[0mHFH
FFFH
HFFG

State at step 11 after action  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG

State at ste

After running the previous cell we can see the agent moving randomly around its environment. We can also see seemingly strange behaviour: the agent chooses to move in a certain direction but does not end up in the expected state. This is because the environment is stochastic: the agent has a 1/3 (about 33%) chance of moving in its chosen direction and a 1/3 chance of slipping in each of the two perpendicular directions.
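
The slip behaviour can be sketched in isolation. This is a simplified model written for illustration (the function names `slippery_outcomes` and `sample_direction` are hypothetical), assuming the action encoding 0 = Left, 1 = Down, 2 = Right, 3 = Up, so that the two perpendicular directions are the neighbours of the chosen action modulo 4.

```python
import random

ACTION_NAMES = ["Left", "Down", "Right", "Up"]

def slippery_outcomes(action: int) -> list:
    """The three directions the agent may actually move in, each with probability 1/3."""
    return [(action - 1) % 4, action, (action + 1) % 4]

def sample_direction(action: int, rng: random.Random) -> int:
    """Sample the direction actually taken after choosing `action` on slippery ice."""
    return rng.choice(slippery_outcomes(action))

rng = random.Random(0)
chosen = 2  # Right
print([ACTION_NAMES[d] for d in slippery_outcomes(chosen)])  # the possible directions
print(ACTION_NAMES[sample_direction(chosen, rng)])           # one sampled direction
```

Choosing Right can therefore move the agent Down or Up, but never Left, which matches the surprising moves in the output above.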

Make the Agent
- Q Learning -> Find the optimal policy by learning the Q Values for each state-action pair
- $q_\pi(s, a) = \mathbb{E}_\pi\left[G_t \mid S_t = s, A_t = a\right] = \mathbb{E}_\pi\left[\sum_{k=0}^{\infty} \gamma^k R_{t+k+1} \mid S_t = s, A_t = a\right]$
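
As a small worked example of the return G_t inside the expectation, the sketch below sums discounted rewards for one made-up episode. The reward sequence is hypothetical, and `gamma = 0.99` matches the discount factor used by the agent in this notebook.

```python
def discounted_return(rewards, gamma):
    """G_t = sum over k of gamma^k * R_{t+k+1} for a finite list of future rewards."""
    return sum((gamma ** k) * r for k, r in enumerate(rewards))

# Hypothetical episode: no reward until the goal is reached on the fourth step
rewards = [0.0, 0.0, 0.0, 1.0]
print(discounted_return(rewards, gamma=0.99))
```

Because the only reward arrives three steps into the future, it is discounted by gamma cubed, so reaching the goal sooner yields a larger return.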

## The Agent
In Q Learning the agent's aim is to find the optimal policy by learning the Q values for each state-action pair using the Bellman optimality equation, and then acting greedily with respect to the learned optimal action-value function.
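
A single tabular Q Learning update can be sketched as follows. The transition is made up (state 14, action Right, reward 1, next state 15 on the 4x4 map), and `alpha` and `gamma` are set to the values used by the agent in this notebook; treat it as an illustration of the standard update rule rather than a full training loop.

```python
import numpy as np

alpha, gamma = 0.1, 0.99
q_table = np.zeros((16, 4))  # 16 states, 4 actions, all estimates start at 0

# Hypothetical transition: from state 14, action 2 (Right) reaches the goal (state 15) with reward 1
s, a, r, ns = 14, 2, 1.0, 15

# Standard Q Learning update: move Q(s, a) a fraction alpha towards the target
target = r + gamma * np.max(q_table[ns])
q_table[s, a] = (1 - alpha) * q_table[s, a] + alpha * target
print(q_table[s, a])
```

Only the visited state-action pair changes; every other entry keeps its previous estimate until it is visited itself.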

In [None]:
class QAgent:

    def __init__(self, q_env: gym.Env, agent_seed = None):
        self.env = q_env  # The environment the agent will live in
        self.seed = agent_seed

        self.obs_space = self.env.observation_space.n  # Number of states
        self.action_space = self.env.action_space.n  # Number of actions
        self.q_table = np.zeros((self.obs_space, self.action_space))  # Table of shape (States, Actions)

        self.alpha = 0.1  # Learning rate
        self.gamma = 0.99  # Expected return discount factor

        self.max_time_steps = 0
        self.max_epsilon = 1.0
        self.min_epsilon = 0.05


    def learn(self, max_time_steps):
        """
        Attempt to learn an optimal policy - Q Learning does so by estimating the Q Value of each state-action pair
        :param max_time_steps:
        :return:
        """
        self.max_time_steps = max_time_steps

        # Log data
        total_reward = 0
        total_episodes = 0
        episode_steps = 0
        episode_wins = 0

        rewards = []

        state, _ = self.env.reset(seed=self.seed)  # Get initial state
        for steps in range(max_time_steps):  # Run training until max steps is reached

            epsilon = max(self.min_epsilon, self.max_epsilon - ((self.max_epsilon / max_time_steps) * steps))
            if epsilon > np.random.random():
                action =  self.env.action_space.sample()
            else:
                action = self.get_action(state)  # Choose an action based on the current state


            # gym 0.26 returns (obs, reward, terminated, truncated, info); info["prob"] holds the transition probability
            next_state, reward, terminated, truncated, info = self.env.step(action)  # Take the action
            self._train(state, action, reward, next_state)  # Train the agent

            # Update data
            total_reward += reward
            state = next_state
            episode_steps += 1

            if terminated or truncated:  # If a terminal state or the time limit (max_episode_steps) is reached
                state, _ = self.env.reset()  # Get initial state; re-seeding here would replay the same randomness every episode
                total_episodes += 1  # Increment number of Episodes completed
                rewards.append(total_reward)

                print({
                    "Episode" : total_episodes,
                    "Reward": total_reward,
                    "Steps" : episode_steps
                })

                # Reset episodic data
                total_reward = 0
                episode_steps = 0

    def get_action(self, obs):
        """
        Check the Q Table to see which Action at State "obs" returns the greatest Q Value
        Purely greedy (exploit only): epsilon-greedy exploration is handled in learn
        :param obs: The current state the agent exists at
        :return: The action that is expected to return the largest Q Value
        """


        return np.argmax(self.q_table[obs, :])


    def test(self, episodes = 5):
        # Show the Agent interacting with its environment by following the greedy policy
        test_rewards = []

        for episode in range(episodes):
            state, _ = self.env.reset()  # Start a new episode
            test_total_reward = 0
            done = False
            while not done:  # Run until the episode terminates or is truncated
                action = self.get_action(state)  # Choose the greedy action for the current state
                state, reward, terminated, truncated, info = self.env.step(action)  # Take the action
                test_total_reward += reward
                done = terminated or truncated
            test_rewards.append(test_total_reward)

        return test_rewards

    def _train(self, s, a, r, ns):
        """
        Q Learning update (a sample-based form of value iteration)
        Move the Q Value of the state-action pair towards the Bellman optimality target
        => q*(s, a) = E[R_t+1 + gamma * max(a') q*(s', a')]
        => Q(s, a) <- (1 - alpha) * Q(s, a) + alpha * (r + gamma * max(a') Q(s', a'))
        :return: None
        """
        target = r + self.gamma * np.max(self.q_table[ns, :])  # Terminal states are never updated, so their row stays 0
        self.q_table[s, a] = (1 - self.alpha) * self.q_table[s, a] + self.alpha * target
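
The exploration schedule inside `learn` can be looked at on its own. The sketch below reproduces the linear decay from 1.0 down to a floor of 0.05; the function name `linear_epsilon` is hypothetical and only wraps the expression used in the class, and the 1,000-step budget is chosen just to keep the printout short.

```python
def linear_epsilon(step, max_time_steps, max_epsilon=1.0, min_epsilon=0.05):
    """Linearly decay epsilon from max_epsilon towards 0, but never below min_epsilon."""
    return max(min_epsilon, max_epsilon - (max_epsilon / max_time_steps) * step)

max_time_steps = 1_000
for step in (0, 500, 950, 999):
    print(step, linear_epsilon(step, max_time_steps))
```

With this schedule the agent acts almost entirely at random early on, and for roughly the final 5% of training steps it takes a random action only 5% of the time.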




Load the Environment

Load the agent

In [None]:
agent = QAgent(env, agent_seed=seed)

Train the agent

In [None]:
max_steps = 1_000_000
agent.learn(max_steps)

In [None]:
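agent.env = gym.make(env_id, render_mode="human")  # render_mode is fixed when the environment is created, so make a new one for display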
agent.test()