In [1]:
# uncomment here if you need to install the relevant packages
# !pip install gymnasium 
# !pip install numpy

# Reinforcement Learning Lab 1: Tabular RL
---
In this lab session we will aim to accomplish the following:
- An introduction to [Gym(nasium)](https://gymnasium.farama.org); a commonly used package with benchmark RL environments
- Run and compare Monte-Carlo and Temporal Difference based algorithms;
- Write code that visualises the results of training the various algorithms.

Note that I've tried to highlight in **bold** parts of the lab that you can do yourself, or questions that you can discuss with the person sitting next to you, or ask me about in the lab. 

### First, we will begin by installing Gym and looking at it's functionality. 
Gym is an important framework in RL because it provides a standardised interface for interacting with RL environments, allowing researchers and practitioners to easily compare the performance of different RL algorithms on a wide range of environments. It also provides a collection of diverse and challenging environments that can be used for benchmarking and testing new algorithms.

Now, let's take a look at how a gym environment works! We will start by creating a copy of a Blackjack environment. For any regular Blackjack players, information about the specific rules of this Blackjack game can be found in the [documentation](https://gymnasium.farama.org/environments/toy_text/blackjack/), with a major difference between this implementation and that of a real casino is that the cards are drawn from an infinite deck. 

In [2]:
import gymnasium as gym
env = gym.make('Blackjack-v1')

As we can see, to create a copy of an environment we use the command `gym.make()` with a string of the environment name that we'd like to create. For more environment names that can be used, you can look at the Gym documentation that was linked above. The format is usually *env_name*-*version*, so be careful to specify the most recent version -- this again can be found in the documentation; for example, if we go to the documentation for the [Lunar Lander environment](https://gymnasium.farama.org/environments/box2d/lunar_lander/) we can scroll down to see the version history. 

The environment object provides us with information about the environment, most importantly about the state and action space:

In [3]:
print(env.observation_space)
print(env.action_space)

Tuple(Discrete(32), Discrete(11), Discrete(2))
Discrete(2)


From the output, we can see that the observation space is a 3-tuple. Here, `Discrete(N)` simply means that we have `N` discrete options. For the state space, this corresponds to the players current sum, the value of the dealers visible card, and whether or not the player holds a useable ace, respectively. The action space is simply whether to hit or stick. The rewards for the game are +1 for winning, +1.5 for winning with a 'natural' blackjack, -1 for losing and 0 for a draw. The episode ends whenever a player sticks, when a player hits and the sum of the cards exceeds 21, or if the player has blackjack. 

Now that we have the environment set up, we can play a game using a random policy!

In [4]:
import numpy as np
np.random.seed(3)
random_blackjack_scores = []
for episode in range(10000):
    state, _ = env.reset()  # this is how we reset the environment.
    done = False  # this tells us that the game is not over yet.
    score = 0  # to keep track of the rewards during the episode.
    while not done: 
        action = env.action_space.sample()  # this samples an action uniformly at random from the action space.
        next_state, reward, terminated, truncated, _ = env.step(action)  # here we take a step in the env using the sampled action.
        score += reward  # add on the observed reward.
        done = terminated or truncated  # tells us whether the episode is over.
        state = next_state  # here we reset the current state variable to be the next state from the previous transition.
    random_blackjack_scores.append(score)
print(f"The random policy got an average score of {np.mean(random_blackjack_scores)}")

The random policy got an average score of -0.4051


The above loop is something you will see a lot whilst learning Reinforcement Learning, and is why the gym framework is a valuable asset as it allows us a simple, clean way of looping through episodes of an environment. Note that when we reset the environment, `env.reset()` will actually return two objects -- the `state` and `info` (which we assign to an unnamed variable). The `info` is there to tell us if there are any problems with the environment or any bits of information that may be useful, and is only really used for debugging if something is going wrong. It is also returned when calling `env.step()`, but for now we don't need to worry about it. 

The other thing to note is that we are returned a `terminated` and a `truncated` flag by the environment when we take a step. This is not something we need to worry about for Blackjack, but in continuing problems (i.e. those with no terminal states), or those with very long time horizons, it can be useful to reset the environment whilst training before an episode ends. However, we don't want to treat this as a terminal state, and so the `terminated` flag is what we use to tell the algorithm whether or not the state is terminal, whereas the `truncated` flag is for our use in training to tell us whether or not we should reset the environment, so during training we now have two reset conditions: whether the state was truly terminal, or whether we are truncating the episode. 

Next we'll move on to Mote Carlo methods and see if we can beat the random policy. 

--- 

## Monte Carlo Control
We'll now focus on some Monte Carlo based algorithms. Monte Carlo Control is a family of RL algorithms that learns to find an optimal policy by repeatedly playing episodes of a given environment and estimating the expected return of each state-action pair based on the observed returns from those pairs in the played episodes, i.e. using a Monte Carlo approach to estimate the expected returns.

Below I have provided a class that implements an algorithm based on the first visit Monte Carlo estimates of the returns. First you should read through the code to see if you can understand it for yourself, and then I will add details of each method below. 

In [5]:
from gymnasium.spaces import Tuple
from itertools import product


class MonteCarloControl:
    def __init__(self, env, num_episodes=10000, gamma=0.99, epsilon=0.3):
        self.env = env
        self.num_episodes = num_episodes
        self.gamma = gamma
        self.num_actions = env.action_space.n
        self.Q = {}
        self.N = {}
        if type(env.observation_space) is Tuple:
            self.num_states = np.prod([a.n for a in env.observation_space])
            self.possible_states = [range(a.n) for a in env.observation_space]
            for state in product(*self.possible_states):
                self.Q[state] = np.random.uniform(-0.1, 0.1, self.num_actions)
                self.N[state] = np.zeros(self.num_actions)
                self.returns = {(s, a): [] for s in product(*self.possible_states) for a in range(self.num_actions)}
        else:
            self.possible_states = range(env.observation_space.n)
            self.num_states = env.observation_space.n
            for state in self.possible_states:
                self.Q[state] = np.random.uniform(-0.1, 0.1, self.num_actions)
                self.N[state] = np.zeros(self.num_actions)
                self.returns = {(s, a): [] for s in self.possible_states for a in range(self.num_actions)}
        self.policy = self.create_random_policy()
        self.epsilon = epsilon

    def create_random_policy(self):
        policy = {}
        for s in self.Q:
            policy[s] = np.random.dirichlet(np.ones(self.num_actions), size=1).flatten()
        return policy

    def get_action(self, state):
        probs = self.policy[state]
        action = np.random.choice(np.arange(len(probs)), p=probs)
        return action

    def generate_episode(self):
        episode = []
        state, _ = self.env.reset()
        done = False
        while not done:
            action = self.get_action(state)
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
            episode.append((state, action, reward))
            state = next_state
        return episode

    def update_Q(self, episode):
        G = 0
        for t in range(len(episode) - 1, -1, -1):
            state, action, reward = episode[t]
            G = self.gamma * G + reward
            if (state, action) not in [(x[0], x[1]) for x in episode[:t]]:
                self.N[state][action] += 1
                self.Q[state][action] += 1 / self.N[state][action] * (G - self.Q[state][action])
                self.policy[state] = np.zeros(self.num_actions)
                best_action = np.argmax(self.Q[state])
                self.policy[state] = np.array([self.epsilon / self.num_actions] * self.num_actions)
                self.policy[state][best_action] = 1 - self.epsilon + self.epsilon / self.num_actions

    def train(self):
        for episode in range(self.num_episodes):
            if (episode + 1) % 10000 == 0:
                print(f"Starting episode {episode + 1}")  # comment out if you don't want to have the progress printed
            episode = self.generate_episode()
            self.update_Q(episode)

    def generate_test_episode(self):
        state, _ = self.env.reset()
        done = False
        score = 0
        while not done:
            action = self.get_action(state)
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
            state = next_state
            score += reward
        return score


In the `init` we pass in the environment, how many episodes we'd like to train for, the discount factor ($\gamma$) and our $\epsilon$ parameter for the soft policy. Recall that a soft policy is one which assigns probability $\frac{\epsilon}{|\mathcal{A}|}$ to all non-greedy actions, and the remaining $1 - \epsilon + \frac{\epsilon}{|\mathcal{A}|}$ probability to the greedy value. Using the environment, we loop through all possible states and initialise our Q-table, `self.Q` and our visitation count `self.N`. We then also initialise the policy randomly using the `create_random_policy` method. 

Next, we have two straight forward methods: `get_action` and `generate_episode`. The former simply returns an action by sampling from the policy, whilst the latter runs through an episode similar to our above example, except here we append the `(state, action, reward)` tuple that we will use to update our Q-table. 

As the name suggests, the `update_Q` method is where we will use the episode information to update our Q-table. We receive an `episode` object, which is just a list of the tuples from the most recent episode. Using the list, we update Q using the first-visit Monte Carlo method, which will store the returns we saw from the *first* time we visited a state-action pair in the episode. 

To be more efficient, we do this dynamically using the following update: `NewEstimate = OldEstimate + 1/N * (NewValue - OldEstimate)`, where `NewValue` is the new return value we observe and `OldEstimate` is our previous estimate of the returns. Once we have update our Q-table, we then update our policy to be $\epsilon$-soft with respect to the new Q-Values. 

And there we have our first RL algorithm! Let's see if it can beat the random policy that we tested with earlier. 

In [6]:
np.random.seed(3)
env = gym.make('Blackjack-v1')
mc = MonteCarloControl(env, num_episodes=100000, epsilon=0.1)  # try playing around with epsilon if you like, but be careful not to go too high on the number of episodes else it will take a while to run
mc.train()
mc_test_scores = []
for ep in range(10000):
    mc_test_scores.append(mc.generate_test_episode())
print(f"Average test score was {np.mean(mc_test_scores)}")

Starting episode 10000
Starting episode 20000
Starting episode 30000
Starting episode 40000
Starting episode 50000
Starting episode 60000
Starting episode 70000
Starting episode 80000
Starting episode 90000
Starting episode 100000
Average test score was -0.0847


We can see that the average test score from 100,000 training episodes is about -0.1. This is an improvement over the random policy, but sadly you will still lose your money if you take this policy to the casino 😊

If you want to, you can change the environment to another of the environments from the Toy Text section of Gym and see how well the Monte Carlo algorithm fares (be careful not to choose any algorithms that truncate an episode!). 

---
## Temporal Difference Learning
We will now take a look at Temporal Difference (TD) Learning. One of the downsides of Monte Carlo learning is that we have to wait *until the end of an episode* before we receive any feedback about our actions. What if we don't want to wait this long? What if we are in a continuing task where there is no end to an episode? That is where TD Learning comes in, as rather than wait until the end of an episode to calculate the Monte Carlo returns, we simply bootstrap the remaining returns from our current estimate. The simplest example is the SARSA algorithm which makes the following update: $Q(s, a) \leftarrow Q(s, a) + \alpha \left[ r(s, a) + \gamma Q(s', a') - Q(s, a) \right]$, where $a'$ is an action which we sample from our current policy $\pi$, i.e. $a' \sim \pi(\cdot|s')$. We can see that our returns are calculated by bootstrapping the value from the next state $s'$, rather than waiting for the end of the episode.

Let's write some code for this and see how it compares to the Monte-Carlo method. As before, I will provide the code for you to look through to understand for yourself and then you can read my comments below on which each method is doing. 

In [7]:
class SARSA:
    def __init__(self, env, alpha, gamma, epsilon, num_episodes=100000):
        self.env = env
        self.alpha = alpha  # learning rate
        self.gamma = gamma  # discount factor
        self.epsilon = epsilon  # exploration rate
        self.num_episodes = num_episodes
        self.num_actions = env.action_space.n
        self.Q = {}
        if type(env.observation_space) is Tuple:
            self.num_states = np.prod([a.n for a in env.observation_space])
            self.possible_states = [range(a.n) for a in env.observation_space]
            for state in product(*self.possible_states):
                self.Q[state] = np.random.uniform(-0.1, 0.1, self.num_actions)
        else:
            self.possible_states = range(env.observation_space.n)
            self.num_states = env.observation_space.n
            for state in self.possible_states:
                self.Q[state] = np.random.uniform(-0.1, 0.1, self.num_actions)

    def choose_action(self, state):
        if np.random.uniform() < self.epsilon:
            action = self.env.action_space.sample()  # explore
        else:
            action = np.argmax(self.Q[state])  # exploit
        return action

    def update_Q(self, state, action, reward, next_state, next_action, terminated):
        td_target = reward + self.gamma * self.Q[next_state][next_action] * (1 - terminated)
        td_error = td_target - self.Q[state][action]
        self.Q[state][action] += self.alpha * td_error

    def train(self):
        for episode in range(self.num_episodes):
            if (episode + 1) % 10000 == 0:
                print(f"Starting episode {episode + 1}")
            state, _ = self.env.reset()
            action = self.choose_action(state)
            done = False
            while not done:
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                done = terminated or truncated
                next_action = self.choose_action(next_state)
                self.update_Q(state, action, reward, next_state, next_action, terminated)
                state = next_state
                action = next_action

    def generate_test_episode(self):
        state, _ = self.env.reset()
        done = False
        score = 0
        while not done:
            action = self.get_greedy_action(state)
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
            state = next_state
            score += reward
        return score

    def get_greedy_action(self, state):
        return np.argmax(self.Q[state])

The code follows the general class structure as the Monte Carlo method (in general, it is good practice to find a layout for your classes that you like and stick to it, so that any algorithms you write and publish are more readable). The only real changes are in the `train` method and the `update_Q` method. In the latter, we have just changed our update rule to be that of the SARSA algorithm. In the former, we have changed slightly the order of things -- now we take an action with the initial state, and pass in the next state to the `update_Q` method. Other than that, things are pretty much as they were!

Let's see how SARSA compared to the Monte-Carlo algorithm in Blackjack.

In [8]:
np.random.seed(3)
env = gym.make('Blackjack-v1')
sarsa = SARSA(env, 0.8, 0.99, 0.05, 100000)  # try playing around with the hyper-parameters to see if you can improve the score
sarsa.train()
sarsa_test_scores = []
for ep in range(10000):
    sarsa_test_scores.append(sarsa.generate_test_episode())
print(f"SARSA test score was {np.mean(sarsa_test_scores)}")


Starting episode 10000
Starting episode 20000
Starting episode 30000
Starting episode 40000
Starting episode 50000
Starting episode 60000
Starting episode 70000
Starting episode 80000
Starting episode 90000
Starting episode 100000
SARSA test score was -0.0584


On the seed we tested on, SARSA gets a similar score to the Monte-Carlo algorithm. **Would you have expected this in an environment like Blackjack? Why or why not?**

### Q-Learning
We are now going to test our first off-policy algorithm, Q-Learning. This algorithm was seminal to making *deep* RL successful, and we will look at the deep variant in our next lab, but for now we will stick with the tabular version. The update rule is similar to SARSA: $Q(s, a) \leftarrow Q(s, a) + \alpha \left[ r(s, a) + \gamma \max_{a'}Q(s', a') - Q(s, a) \right]$. We still choose actions according to some exploratory policy, but the value we bootstrap from comes from the greedy policy. **Why is Q-Learning considered an off-policy algorithm?**

To write code for Q-Learning, we need to make minimal modifications to the SARSA algorithm. **Below I've attached a child class of the SARSA algorithm and I'll leave it as an exercise to make the changes.** Once you have made the changes, run the Q-Learning algorithm and see how it performs compared to Monte-Carlo and SARSA. 

In [None]:
class QLearning(SARSA):
    def __init__(self, **kwargs):
        super(QLearning, self).__init__(**kwargs)

    def update_Q(self, state, action, reward, next_state, terminated):
        pass

    def train(self):
        pass

In [None]:
env = gym.make('Blackjack-v1')
q_learning = QLearning(env=env, alpha=0.8, gamma=0.99, epsilon=0.05, num_episodes=100000)  # again, try and play with the hyper-parameters
q_learning.train()
q_learning_test_scores = []
for ep in range(10000):
    q_learning_test_scores.append(q_learning.generate_test_episode())
print(f"Q-Learning test score was {np.mean(q_learning_test_scores)}")