<a href="https://colab.research.google.com/github/asuzukosi/ReinforcementLearningForAGI/blob/main/src/DQN/DeepReinforcmentLearning_DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Deep Q-Networks
In this notebook, we will explore implementations of Deep Q-Network algorithms, starting from the base (vanilla) DQN and working up to Rainbow DQN. We will apply these algorithms to various Gym environments, using 3 environments for each model.

#### Install required libraries
We will install the libraries required for the environments and models used in this notebook.

In [1]:
# Detect whether the notebook is running inside Google Colab: the
# `google.colab` package is only present in `sys.modules` on a Colab kernel.
import sys
IN_COLAB = "google.colab" in sys.modules

# System packages and the virtual display are only set up on Colab; on any
# other machine this whole branch is skipped.
if IN_COLAB:
    !apt-get install -y python3-opengl
    !apt install ffmpeg
    !apt install xvfb
    !pip install PyVirtualDisplay==3.0
    !pip install gymnasium==0.28.1
    from pyvirtualdisplay import Display

    # Start an invisible 400x400 virtual display
    # NOTE(review): presumably needed so rendering works on a headless Colab VM - confirm
    dis = Display(visible=0, size=(400, 400))
    dis.start()

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  freeglut3 libglu1-mesa
Suggested packages:
  libgle3 python3-numpy
The following NEW packages will be installed:
  freeglut3 libglu1-mesa python3-opengl
0 upgraded, 3 newly installed, 0 to remove and 18 not upgraded.
Need to get 824 kB of archives.
After this operation, 8,092 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 freeglut3 amd64 2.8.1-6 [74.0 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/main amd64 libglu1-mesa amd64 9.0.2-1 [145 kB]
Get:3 http://archive.ubuntu.com/ubuntu jammy/universe amd64 python3-opengl all 3.1.5+dfsg-1 [605 kB]
Fetched 824 kB in 1s (1,236 kB/s)
Selecting previously unselected package freeglut3:amd64.
(Reading database ... 120874 files and directories currently installed.)
Preparing to unpack .../freeglut3_2.8.1-6_amd64.deb ...
Unpacking freeglut3:am

In [5]:
# Python dependencies used by the rest of the notebook.
# NOTE(review): these installs are unpinned, unlike the Colab cell above which
# pins PyVirtualDisplay==3.0 and gymnasium==0.28.1 - consider pinning for reproducibility.
!pip install torch
!pip install pyglet
# the `classic-control` extra is requested for the CartPole environment used below
!pip install gymnasium[classic-control]
!pip install PyVirtualDisplay
# NOTE(review): moviepy is presumably required by gymnasium's RecordVideo wrapper - confirm
!pip install moviepy




#### Import required libraries
In this section we will import all the required libraries to implement our algorithms

In [6]:
import torch
import os
from typing import Dict, List, Tuple
import gymnasium as gym
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output


#### Replay Buffer
Replay buffer is used to store the previous experiences in order to stabilize training of the DQN.
A replay buffer can be implemented with one of the following data structures:
- numpy arrays
- deque
- list

In our implementation of the replay buffer we will use numpy arrays, as they benefit from locality of reference, which will speed up the retrieval of items.

In [7]:
class ReplayBuffer:
  """Fixed-capacity circular store of transitions backed by numpy arrays.

  Once `size` transitions have been written, each new one overwrites the
  oldest slot. `sample` draws `batch_size` distinct stored transitions
  uniformly at random.
  """

  def __init__(self, obs_dim:int, size:int, batch_size:int=32):
    # capacity of the buffer and number of transitions returned per sample
    self.max_size = size
    self.batch_size = batch_size
    # next slot to write to, and how many slots currently hold data
    self.ptr = 0
    self.size = 0
    # storage is pre-allocated as float32 arrays, one row/entry per transition
    self.obs_buf = np.zeros((size, obs_dim), dtype=np.float32)
    self.next_obs_buf = np.zeros((size, obs_dim), dtype=np.float32)
    self.acts_buf = np.zeros(size, dtype=np.float32)
    self.rews_buf = np.zeros(size, dtype=np.float32)
    self.done_buf = np.zeros(size, dtype=np.float32)

  def store(self, obs, action, reward, next_obs, done):
    """Write one transition at the cursor, then advance the cursor."""
    slot = self.ptr
    self.obs_buf[slot] = obs
    self.next_obs_buf[slot] = next_obs
    self.acts_buf[slot] = action
    self.rews_buf[slot] = reward
    self.done_buf[slot] = done
    # wrap around to slot 0 after the last slot has been written
    self.ptr = (slot + 1) % self.max_size
    # the fill level grows until the buffer is full and then stays put
    if self.size < self.max_size:
      self.size += 1

  def sample(self):
    """Return a dict of arrays for `batch_size` distinct random transitions."""
    picked = np.random.choice(self.size, size=self.batch_size, replace=False)
    return {
        "obs": self.obs_buf[picked],
        "acts": self.acts_buf[picked],
        "rews": self.rews_buf[picked],
        "next_obs": self.next_obs_buf[picked],
        "done": self.done_buf[picked],
    }

  def __len__(self):
    """Number of transitions currently stored."""
    return self.size

#### Network implementation
In this section we will focus on implementing the neural network for our DQN algorithm

In [8]:
class Network(nn.Module):
  """Fully connected network with two ReLU hidden layers of width `hidden_dim`.

  Maps an input of size `inp_dim` to `out_dim` raw outputs (one per action
  when used as the Q-network).
  """

  def __init__(self, inp_dim:int, out_dim:int, hidden_dim=128):
    super().__init__()
    stack = [
        nn.Linear(inp_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
        # the last layer stays linear: no softmax is applied to the outputs
        nn.Linear(hidden_dim, out_dim),
    ]
    self.layers = nn.Sequential(*stack)

  def forward(self, x):
    """Run `x` through the layer stack and return the raw outputs."""
    outputs = self.layers(x)
    return outputs

In [9]:
class DQNAgent:
  """
  Vanilla DQN agent: epsilon-greedy acting, a uniform replay buffer and a
  target network that is hard-synchronised every `target_update` updates.

  Args:
    env: Gymnasium-style environment. `observation_space.shape[0]` and
      `action_space.n` are read, so a flat observation vector and a discrete
      action space are required.
    memory_size: capacity of the replay buffer.
    batch_size: number of transitions used per gradient update.
    target_update: number of gradient updates between hard target syncs.
    epsilon_decay: fraction of (max_epsilon - min_epsilon) subtracted from
      epsilon after every gradient update.
    seed: seed passed to the first `env.reset` of every `train`/`test` call
      and to the action-space sampler.
    max_epsilon: initial exploration rate.
    min_epsilon: lower bound of the exploration rate.
    gamma: discount factor.
  """
  def __init__(self, env, memory_size:int, batch_size,
               target_update, epsilon_decay, seed,
               max_epsilon:float=1.0, min_epsilon:float=0.1,
               gamma:float=0.99):
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.n

    self.env = env
    # Random exploration draws from `action_space.sample()`, which has its own
    # RNG; seed it so that exploration is reproducible as well.
    self.env.action_space.seed(seed)

    self.memory = ReplayBuffer(obs_dim, memory_size, batch_size)
    self.batch_size = batch_size
    self.epsilon = max_epsilon
    self.epsilon_decay = epsilon_decay
    self.seed = seed
    self.max_epsilon = max_epsilon
    self.min_epsilon = min_epsilon
    self.target_update = target_update
    self.gamma = gamma  # discount factor

    self.device = "cuda" if torch.cuda.is_available() else "cpu"
    # online network (trained) and target network (frozen copy used for targets)
    self.dqn = Network(obs_dim, act_dim).to(self.device)
    self.dqn_target = Network(obs_dim, act_dim).to(self.device)
    self.dqn_target.load_state_dict(self.dqn.state_dict())
    self.dqn_target.eval()

    self.optimizer = optim.Adam(self.dqn.parameters())
    # partially built transition shared between `select_action` and `step`
    self.transition = list()
    # when True, transitions are not written to the replay buffer
    self.is_test = False

  def select_action(self, obs):
    """Pick an action for `obs` with the epsilon-greedy rule.

    With probability `epsilon` a random action is sampled from the action
    space; otherwise the action with the largest predicted Q-value is
    returned as a Python int. Outside test mode the (obs, action) pair is
    remembered so `step` can complete the transition.
    """
    if self.epsilon > np.random.random():
      selected_action = self.env.action_space.sample()
    else:
      # inference only: no autograd graph is needed to take an argmax
      with torch.no_grad():
        q_values = self.dqn(torch.FloatTensor(obs).to(self.device))
      selected_action = q_values.argmax().item()

    if not self.is_test:
      self.transition = [obs, selected_action]

    return selected_action

  def step(self, action):
    """Take an action and return `(next_state, reward, done)`.

    `done` is True when the episode ended for any reason (termination or
    truncation). Outside test mode the completed transition is written to
    the replay buffer.
    """
    next_state, reward, terminated, truncated, _ = self.env.step(action)
    done = terminated or truncated

    if not self.is_test:
      # Only a genuine termination is stored as terminal. An episode cut off
      # by the time limit is not a terminal state, so the learning target
      # must still bootstrap from `next_state`.
      self.transition += [reward, next_state, terminated]
      self.memory.store(*self.transition)

    return next_state, reward, done

  def update_model(self):
    """Run one gradient step on a sampled mini-batch and return the loss value."""
    samples = self.memory.sample()
    loss = self._compute_dqn_loss(samples)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

    return loss.item()

  def _compute_dqn_loss(self, samples):
    """Return the smooth-L1 loss between Q(s, a) and the one-step TD target.

    `samples` is the dict produced by `ReplayBuffer.sample` with keys
    "obs", "next_obs", "acts", "rews" and "done".
    """
    device = self.device  # for shortening the following lines
    state = torch.FloatTensor(samples["obs"]).to(device)
    next_state = torch.FloatTensor(samples["next_obs"]).to(device)
    # reshape to column vectors so they line up with the (batch, 1) Q-values
    action = torch.LongTensor(samples["acts"].reshape(-1, 1)).to(device)
    reward = torch.FloatTensor(samples["rews"].reshape(-1, 1)).to(device)
    done = torch.FloatTensor(samples["done"].reshape(-1, 1)).to(device)

    # Q-value of the action that was actually taken
    curr_q_value = self.dqn(state).gather(1, action)
    # best next-state value according to the target network; no gradient
    # flows through the target
    with torch.no_grad():
      next_q_value = self.dqn_target(next_state).max(dim=1, keepdim=True)[0]
    # terminal transitions (done == 1) do not bootstrap
    mask = 1 - done
    target = reward + self.gamma * next_q_value * mask

    return F.smooth_l1_loss(curr_q_value, target)

  def train(self, num_frames, plotting_interval:int=200):
    """
    Train the agent for `num_frames` environment steps.

    After every step, once the buffer holds at least `batch_size`
    transitions, one gradient update is performed, epsilon is decayed
    linearly and the target network is synced every `target_update` updates.
    Progress is plotted every `plotting_interval` frames. The environment is
    closed when training finishes.
    """
    self.is_test = False
    state, _ = self.env.reset(seed=self.seed)
    update_cnt = 0
    epsilons = []
    losses = []
    scores = []
    score = 0

    # 1-based so that exactly `num_frames` steps are taken
    for frame_idx in range(1, num_frames + 1):
      action = self.select_action(state)
      next_state, reward, done = self.step(action)

      state = next_state
      score += reward

      # if episode ends
      if done:
        # The seed was already applied by the first reset; passing it again
        # here would restart every episode from the same initial state.
        state, _ = self.env.reset()
        scores.append(score)
        score = 0

      # if training is ready
      if len(self.memory) >= self.batch_size:
        loss = self.update_model()
        losses.append(loss)
        update_cnt += 1

        # linearly decrease epsilon
        self.epsilon = max(
            self.min_epsilon, self.epsilon - (
                self.max_epsilon - self.min_epsilon
            ) * self.epsilon_decay
        )
        epsilons.append(self.epsilon)

        # if hard update is needed
        if update_cnt % self.target_update == 0:
          self._target_hard_update()

      # plotting
      if frame_idx % plotting_interval == 0:
        self._plot(frame_idx, scores, losses, epsilons)

    self.env.close()

  def test(self, video_folder:str):
    """Run one episode in test mode, recording it to `video_folder`.

    The environment is temporarily wrapped in `RecordVideo`; the original
    environment is put back afterwards even if the episode raises.
    """
    self.is_test = True

    # keep a handle on the unwrapped env so it can be restored
    naive_env = self.env
    # create new environment for recording video
    self.env = gym.wrappers.RecordVideo(self.env, video_folder=video_folder)

    try:
      state, _ = self.env.reset(seed=self.seed)
      done = False
      score = 0

      # we run only a single episode of the environment
      while not done:
        action = self.select_action(state)
        next_state, reward, done = self.step(action)

        state = next_state
        score += reward

      print("score: ", score)
    finally:
      self.env.close()
      # reset
      self.env = naive_env

  def _target_hard_update(self):
    """Hard update: target <- local."""
    self.dqn_target.load_state_dict(self.dqn.state_dict())

  def _plot(
      self,
      frame_idx: int,
      scores: List[float],
      losses: List[float],
      epsilons: List[float],
  ):
    """Plot the training progress: episode scores, losses and epsilon."""
    # no finished episode yet -> show nan instead of averaging an empty list
    mean_score = np.mean(scores[-10:]) if scores else float("nan")
    clear_output(True)
    plt.figure(figsize=(20, 5))
    plt.subplot(131)
    plt.title('frame %s. score: %s' % (frame_idx, mean_score))
    plt.plot(scores)
    plt.subplot(132)
    plt.title('loss')
    plt.plot(losses)
    plt.subplot(133)
    plt.title('epsilons')
    plt.plot(epsilons)
    plt.show()



#### Environment
Here we set up the environment for the experiment. We will be using the 2D CartPole environment.

In [None]:
# environment: CartPole-v1 with episodes limited to 200 steps; "rgb_array"
# rendering returns frames as arrays, which the RecordVideo wrapper used in
# DQNAgent.test needs in order to write a video.
env = gym.make("CartPole-v1", max_episode_steps=200, render_mode="rgb_array")

In [None]:
# Fix the random seeds so that repeated runs of the experiment are comparable.
seed = 777


def seed_torch(seed):
    """Seed PyTorch and, if cuDNN is enabled, request deterministic kernels."""
    torch.manual_seed(seed)
    if not torch.backends.cudnn.enabled:
        return
    torch.cuda.manual_seed(seed)
    # trade cuDNN autotuning for run-to-run determinism
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


np.random.seed(seed)
seed_torch(seed)

In [None]:
# Hyperparameters of the experiment
num_frames = 10_000         # environment steps per call to agent.train
memory_size = 1_000         # capacity of the replay buffer
batch_size = 32             # transitions per gradient update
target_update = 100         # gradient updates between target-network syncs
epsilon_decay = 1 / 2_000   # share of the epsilon range removed per update
video_folder = "videos/dqn"

agent = DQNAgent(
    env=env,
    memory_size=memory_size,
    batch_size=batch_size,
    target_update=target_update,
    epsilon_decay=epsilon_decay,
    seed=seed,
)

#### Train and Test DQN model
In this section we will train and test the DQN model iteratively on the cartpole environment

In [None]:
# Train the agent; the score / loss / epsilon plots refresh every
# `plotting_interval` frames (200 by default).
agent.train(num_frames=num_frames)

In [None]:
# Run a single test episode, record it into `video_folder` and print its score.
agent.test(video_folder)

In [None]:
# Second training round on the same agent: the network weights, replay memory
# and current epsilon carry over, since train() does not reset them.
agent.train(num_frames=num_frames)

In [None]:
# Test again after the additional training, recording another episode.
agent.test(video_folder)

#### Render Video
In this section we will render the video that was generated during the testing of the agent model

In [None]:
import base64
import glob
import io
import os

from IPython.display import HTML, display


def ipython_show_video(path: str) -> None:
    """Embed the video file at `path` in the notebook output.

    The file is read fully, base64-encoded and displayed as an inline HTML5
    `<video>` element with an MP4 data URI.

    Args:
        path: location of the video file.

    Raises:
        NameError: if `path` is not an existing regular file. (The exception
            type is kept as-is so existing callers keep working.)
    """
    if not os.path.isfile(path):
        raise NameError("Cannot access: {}".format(path))

    # Read-only binary mode: nothing is written, so write permission must not
    # be required, and the context manager closes the handle straight away.
    with open(path, "rb") as video_file:
        encoded = base64.b64encode(video_file.read())

    display(HTML(
        data="""
        <video width="320" height="240" alt="test" controls>
        <source src="data:video/mp4;base64,{0}" type="video/mp4"/>
        </video>
        """.format(encoded.decode("ascii"))
    ))


def show_latest_video(video_folder: str) -> List[str]:
    """Show every recorded `.mp4` in `video_folder` and return their paths.

    Args:
        video_folder: directory that is searched (non-recursively) for
            `*.mp4` files.

    Returns:
        The paths of the videos that were shown, sorted by name so the order
        is deterministic. The list is empty when the folder has no videos.
    """
    list_of_files = sorted(glob.glob(os.path.join(video_folder, "*.mp4")))
    for file in list_of_files:
        ipython_show_video(file)
    return list_of_files


# Display the videos recorded in `video_folder` and report which files were used.
list_of_files = show_latest_video(video_folder=video_folder)
print("Played:", list_of_files)