# Rainbow DQN Implementation

## Introduction

Hello! This is my implementation of [Rainbow DQN](https://arxiv.org/pdf/1710.02298), an algorithm published by DeepMind in October 2017.

__Note__: I didn't implement some features (such as Noisy Nets) because I didn't need them for the environments I used.

## Implementation

Now let's move on to the implementation!

In [1]:
# Run me :D
%pip install numpy torch torchvision gymnasium tqdm einops
%pip install gymnasium[box2d]
%pip install box2d box2d-kengz
%pip install pygame
%pip install --upgrade gymnasium pygame


Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
# Again... Don't think, RUN !
import numpy as np
import einops
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

import gymnasium as gym
from tqdm import tqdm

from ReplayBuffer import ReplayBuffer, ALPHA
import random

import matplotlib.pyplot as plt
from IPython.display import clear_output

These are some constants used during the training steps.

In [3]:
# Training hyperparameters.
EPISODES = 500   # number of training episodes
FRAME = 1000     # maximum number of environment steps per episode

GAMMA = 0.99     # discount factor applied to the bootstrapped target
LR = 5e-4        # learning rate handed to the optimizer

# Size of the replay buffer, and number of random transitions used to pre-fill it.
MEMORY_CAPACITY = 100

# Epsilon-greedy exploration schedule: epsilon starts at EPS_START, is
# multiplied by exp(-EPS_DECAY) at the start of every episode, and never
# drops below EPS_END.
EPS_START = 1.0
EPS_END = 0.05
EPS_DECAY = 0.01

# Number of gradient steps between two copies of the online network into the target network.
UPDATE_FREQUENCY = 1000

# Compute device: first CUDA GPU when one is available, CPU otherwise.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

Next comes the neural network, which follows the dueling architecture: one stream estimates the state value and another estimates the per-action advantages.

__NOTE__: Q(s, a) = V(s) + A(s, a), where the mean advantage is subtracted from A(s, a) before the two streams are added.

In [4]:
class RainbowNetwork(nn.Module):
    """Dueling Q-network with separate state-value and advantage streams.

    The streams are combined as
    ``Q(s, a) = V(s) + (A(s, a) - mean over a' of A(s, a'))``.

    Args:
        env: Environment whose ``observation_space.shape[0]`` gives the input
            size and whose ``action_space.n`` gives the number of actions.
    """

    def __init__(self, env: gym.Env) -> None:
        super().__init__()

        # Not read anywhere in this class; kept so existing code that
        # accesses the attribute keeps working.
        self.epsilon = 1.0

        n_observations = env.observation_space.shape[0]
        n_actions = env.action_space.n

        # State Value Stream: one scalar V(s) per state
        self.state_nn = nn.Sequential(
            nn.Linear(n_observations, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
        )
        # Advantage Stream: one value A(s, a) per action
        self.advantage_nn = nn.Sequential(
            nn.Linear(n_observations, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions),
        )

    def forward(self, x) -> torch.Tensor:
        """Return the Q-values for ``x``.

        Args:
            x: A single observation or a batch of observations; the last
                dimension must match the observation size.

        Returns:
            A tensor with the same leading dimensions as ``x`` and one entry
            per action on the last dimension.
        """
        # State Value prediction
        state_value = self.state_nn(x)

        # Advantage prediction
        advantage = self.advantage_nn(x)

        # Average over the action axis only. A global ``mean()`` would also
        # average over the batch axis, making each sample's Q-values depend
        # on the other samples in the batch.
        average_advantage = advantage.mean(dim=-1, keepdim=True)

        # Centering the advantages makes V and A identifiable
        return state_value + (advantage - average_advantage)


Utility functions for the training process:
*   Performing one gradient-descent step
*   Copying the online network's parameters into the target network

In [5]:
def gradient_descent(loss: torch.Tensor, optimizer: torch.optim.Optimizer) -> None:
    """Perform a single optimisation step that minimises ``loss``.

    Args:
        loss: Scalar tensor to differentiate.
        optimizer: Optimizer holding the parameters to update.
    """
    optimizer.zero_grad()  # discard gradients left over from the previous step
    loss.backward()        # compute fresh gradients for this loss
    optimizer.step()       # update the parameters from those gradients

def copy_nn_parameters(target: nn.Module, online: nn.Module) -> None:
    """Overwrite the state of ``target`` with the state of ``online``.

    Args:
        target: Module that receives the state.
        online: Module whose state is copied.
    """
    online_state = online.state_dict()
    target.load_state_dict(online_state)

We now create the `RainbowAgent` class: the agent that selects the actions.

It also owns the `ReplayBuffer` used during training, and provides a `fill_buffer` method
that pre-fills the buffer with `MEMORY_CAPACITY` transitions collected with random actions.

In [6]:
class RainbowAgent:
    """Agent that owns the replay buffer and picks actions epsilon-greedily."""

    def __init__(self, N) -> None:
        """Create the agent and its replay buffer.

        Args:
            N: Value forwarded to the ``ReplayBuffer`` constructor.
        """
        self.buffer = ReplayBuffer(N)

    def fill_buffer(
        self,
        env: gym.Env,
    ) -> None:
        """Pre-fill the replay buffer with ``MEMORY_CAPACITY`` random transitions.

        Args:
            env: Environment to sample random actions from and to step.
        """
        state, _ = env.reset()
        for _ in tqdm(range(MEMORY_CAPACITY)):
            action = env.action_space.sample()

            new_state, reward, terminated, truncated, _ = env.step(action)

            # Only true termination is stored as the "done" flag: a truncated
            # episode did not reach a terminal state.
            experience = (state, action, reward, terminated, new_state)
            # NOTE: 10e-5 is 1e-4.
            self.buffer.init_transition(experience=experience, priority=10e-5, proba=(1 / MEMORY_CAPACITY), weight=1.0)
            state = new_state

            # The environment must be reset after truncation as well as after
            # termination before it can be stepped again.
            if terminated or truncated:
                state, _ = env.reset()

    @staticmethod
    def get_action(
        state: np.ndarray,
        env: gym.Env,
        epsilon: float,
        network: nn.Module
    ) -> int:
        """
            Given a `state`, an `env`, an `epsilon` value and a `network`, compute the next action
            according to the epsilon-greedy policy built on the network's Q-values.

            Args:
                state: Current observation.
                env: Environment, used to sample a random action when exploring.
                epsilon: Exploration probability.
                network: Network mapping an observation to one Q-value per action.

            Returns:
                The index of the chosen action.
        """
        # Build the input on the device the network lives on, so the forward
        # pass also works when the network has been moved to a GPU.
        first_param = next(network.parameters(), None)
        target_device = first_param.device if first_param is not None else None
        states = torch.as_tensor(state, dtype=torch.float32, device=target_device)

        # Action selection never needs gradients.
        with torch.no_grad():
            q_values: torch.Tensor = network(states)
        greedy_action: int = int(q_values.argmax().item())

        return RainbowAgent.epsilon_greedy_policy(
            env,
            epsilon,
            greedy_action
        )

    @staticmethod
    def epsilon_greedy_policy(
        env: gym.Env,
        epsilon: float,
        greedy_action: int
    ) -> int:
        """
            Implementation of the epsilon-greedy policy.

            Returns `greedy_action` when a uniform draw in [0, 1) is strictly greater than
            `epsilon`, otherwise a random action sampled from `env.action_space`.
        """
        if random.random() > epsilon:
            return greedy_action
        return env.action_space.sample()

Initialization of everything we need (networks, agent, environment, ...).

In [7]:
# Load environment:
# NOTE(review): newer Gymnasium releases may reject "LunarLander-v2" in favour
# of a later version of the environment - confirm against the installed version.
env = gym.make("LunarLander-v2", render_mode="rgb_array")

# Load classes needed:
Rainbow = RainbowAgent(N=MEMORY_CAPACITY)
online_network = RainbowNetwork(env).to(device=device)
target_network = RainbowNetwork(env).to(device=device)

# Both networks are created with independent random weights. Start the target
# network as an exact copy of the online network so the first bootstrap
# targets are not produced by an unrelated random network.
target_network.load_state_dict(online_network.state_dict())

# Get the optimizer and the loss_fn:
optimizer = torch.optim.RMSprop(online_network.parameters(), lr=LR)
# reduction='none' keeps one loss value per sampled transition, so the values
# can be weighted and reused as priorities.
loss_fn = torch.nn.SmoothL1Loss(reduction='none')

This cell defines the training loop for our Rainbow DQN, which uses the environment initialized above.

In [8]:
def train_rainbow(
    env: gym.Env,
    rainbow: RainbowAgent,
    online_network: RainbowNetwork,
    target_network: RainbowNetwork,
    loss_fn: torch.nn.Module,
    optimizer: torch.optim.Optimizer
) -> None:
    """
        Train the Rainbow DQN model with the 2 networks, the agent and its `ReplayBuffer`.

        Runs up to `EPISODES` episodes of at most `FRAME` steps each. At every step the new
        transition is stored, a batch is retrieved from the buffer, one gradient step is taken
        on the online network and the priorities of the sampled transitions are updated.
        The target network is overwritten with the online network every `UPDATE_FREQUENCY`
        gradient steps.

        Args:
            env: Environment to interact with.
            rainbow: Agent providing the action selection and the replay buffer.
            online_network: Network being trained; also used to select actions.
            target_network: Network used to compute the bootstrapped targets.
            loss_fn: Element-wise loss (its output is multiplied by the sampling weights
                before being averaged).
            optimizer: Optimizer over the online network's parameters.

        NOTE: The env has to be opened before and be closed after the return of this function.
    """
    epsilon = EPS_START
    steps = 0

    for episode in tqdm(range(1, EPISODES + 1)):
        rewards = []
        state, _ = env.reset()

        ### Here we have some epsilon decay options ###
        # epsilon = max(0.1, epsilon - 1.0 / EPISODES * 2)
        epsilon = max(epsilon * np.exp(-EPS_DECAY), EPS_END)
        # BETA = 0.4 + (1.0 - 0.4) * (episode / EPISODES)

        for _ in range(1, FRAME + 1):
            ### Here we have some epsilon decay options ###
            # epsilon = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps / EPS_DECAY)
            # epsilon = max(0.1, epsilon * 0.999) # decay factor of epsilon

            # Taking an action
            action = rainbow.get_action(state, env, epsilon, online_network)

            # Observe env return values
            new_state, reward, termination, truncation, _ = env.step(action)
            rewards.append(reward)

            # Add the experience to the experience replay
            rainbow.buffer.store_transition((state, action, reward, termination, new_state))

            # Go to next state
            state = new_state
            # Sample experiences and get their states, actions, rewards and new states
            state_t, action_t, reward_t, termination_t, new_state_t, weights, indexes = rainbow.buffer.retrieve_transitions()

            # The target is a constant for the optimisation: computing it
            # without autograd keeps gradients out of the target network.
            with torch.no_grad():
                max_next_state = target_network(new_state_t).max(dim=1, keepdim=True)[0]
                y = reward_t + (GAMMA * max_next_state * (1 - termination_t))

            current_q = online_network(state_t).gather(dim=1, index=action_t)

            # Prediction first, target second.
            error = loss_fn(current_q, y)
            loss = torch.mean(error * weights)

            # Gradient Descent on loss with respect with the optimizer
            gradient_descent(loss, optimizer)

            # Increment step at each gradient descent
            steps += 1

            # Priorities are plain numbers for the buffer: detach them so the
            # buffer does not keep the autograd graph alive.
            new_priorities = error.detach() ** ALPHA
            rainbow.buffer.update_priority(new_priorities=new_priorities, indexes=indexes)

            if steps % UPDATE_FREQUENCY == 0:
                copy_nn_parameters(target_network, online_network)

            # If it is the end pass to the next episode
            if termination or truncation:
                break

        if episode % 100 == 0:
            print(f"We are at episode {episode}\t\
                mean reward is {np.mean(rewards):.3f}\
                and sum reward is {sum(rewards):.3f}\t\
                epsilon is {epsilon:.2f}"
            )
            # Early stop, only checked on the episodes that are logged
            if sum(rewards) >= 200:
                break

In [None]:
try:
    # Pre-fill the replay buffer with random transitions, then train.
    Rainbow.fill_buffer(env)
    train_rainbow(env, Rainbow, online_network, target_network, loss_fn, optimizer)
finally:
    # Close the environment even when training raises or is interrupted.
    env.close()

Here are a few testing functions for the Rainbow DQN. (Choose the one that works best in your conda environment.)

In [10]:
def test_rainbow_plt(
    env: gym.Env,
    rainbow: RainbowAgent,
    network: RainbowNetwork
):
    """
        Play 5 greedy episodes and draw every frame with matplotlib.

        * `env`: Gymnasium environment that MUST be in 'rgb_array' render_mode
        * `rainbow`: The Rainbow Agent that chooses the action
        * `network`: The Rainbow Network used to predict the Q-values

        NOTE: Frames are displayed through plt; use this function when the 'human'
        render_mode is not available.
    """
    greedy_epsilon = 0.0  # no exploration while testing
    for episode in range(1, 6):
        episode_rewards = []
        state, _ = env.reset()
        for _step in range(999):
            # Pick the action for the current state
            chosen_action = rainbow.get_action(state, env, greedy_epsilon, network)

            # Step the environment and move straight on to the returned state
            state, reward, termination, truncation, _ = env.step(chosen_action)

            # Keep track of every reward of the episode
            episode_rewards.append(reward)

            # Episode is over: skip the rendering and start the next one
            if termination or truncation:
                break

            # Replace the previously displayed frame with the current one
            clear_output(wait=True)
            frame = env.render()
            plt.imshow(frame)
            plt.show()

        # Only report on every 5th episode
        if episode % 5 != 0:
            continue
        print(f"Episode {episode:>6}: \tR:{np.mean(episode_rewards):>6.3f}")

def test_rainbow(
    env: gym.Env,
    rainbow: RainbowAgent,
    network: RainbowNetwork
):
    """
        Play 5 greedy episodes without any explicit rendering call.

        * `env`: Gymnasium environment that MUST be in 'human' render_mode
        * `rainbow`: The Rainbow Agent that chooses the action
        * `network`: The Rainbow Network used to predict the Q-values
    """
    greedy_epsilon = 0.0  # no exploration while testing
    for episode in range(1, 6):
        episode_rewards = []
        state, _ = env.reset()
        for _step in range(999):
            # Pick the action for the current state
            chosen_action = rainbow.get_action(state, env, greedy_epsilon, network)

            # Step the environment and move straight on to the returned state
            state, reward, termination, truncation, _ = env.step(chosen_action)

            # Keep track of every reward of the episode
            episode_rewards.append(reward)

            # Episode is over: start the next one
            if termination or truncation:
                break

        # Only report on every 5th episode
        if episode % 5 != 0:
            continue
        print(f"Episode {episode:>6}: \tR:{np.mean(episode_rewards):>6.3f}")


In [None]:
# env.close()
# Let's compute the real deal !

env = gym.make("LunarLander-v2", render_mode="rgb_array")
try:
    test_rainbow_plt(env, Rainbow, online_network)
finally:
    # Close the environment even when the evaluation raises or is interrupted.
    env.close()