In [1]:
!pip install gymnasium -U
!pip install gymnasium[atari] -U
!apt-get install -y swig
!pip install gymnasium[box2d]

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m26.3 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hInstalling collected packages: gymnasium
  Attempting uninstall: gymnasium
    Found existing installation: gymnasium 0.29.0
    Uninstalling gymnasium-0.29.0:
      Successfully uninstalled gymnasium-0.29.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kaggle-environments 1.16.11 requires gymnasium==0.29.0, but you have gymnasium 1.0.0 which is incompatible.
stable-baselines3 2.1.0 requires gymnasium<0.30,>=0.28.1, but you have gymnasium 1.0.0 which is incompatible.[0m[31m
[0mSuccessfully installed gymnasium-1.0.0
Collecting ale-py>=0.9 (from gymnasium[atari])
  Downloading ale_p

# Deep Convolutional Q-Learning for Ms Pac-Man

In [2]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
from torch.utils.data import DataLoader, TensorDataset

In [3]:
torch.cuda.device(torch.device)

<torch.cuda.device at 0x786e5247bc40>

In [4]:
torch.cuda.current_device()

0

## Building the AI

### Creating the architecture of the Neural Network

In [5]:
class Network(nn.Module):

    def __init__(self, action_size, seed = 47):                  # constructor to initialize the network class
        super(Network, self).__init__()                          # call the parent class constructor
        self.seed = torch.manual_seed(seed)                      # set the seed for the network
        self.conv1 = nn.Conv2d(3, 32, kernel_size=8, stride=4)   # first convolutional layer with 3 input channels, 32 feature channels, kernel size of 8 adn stride of 4
        self.bn1 = nn.BatchNorm2d(32)                            # first batch normalization layer with 32 input channels
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)  # second convolutional layer with 32 input channels, 64 feature channels, kernel size of 4 and stride of 2
        self.bn2 = nn.BatchNorm2d(64)                            # second batch normalization layer with 64 input channels
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)  # third convolutional layer with 64 input channels, 64 feature channels, kernel size of 3 and stride of 1
        self.bn3 = nn.BatchNorm2d(64)                            # third batch normalization layer with 64 input channels
        self.conv4 = nn.Conv2d(64, 128, kernel_size=3, stride=1) # fourth convolutional layer with 64 input channels, 128 feature channels, kernel size of 3 and stride of 1
        self.bn4 = nn.BatchNorm2d(128)                           # fourth batch normalization layer with 128 input channels

        self.fc1 = nn.Linear(128 * 10 * 10, 512)                 # first fully connected layer with 128 * 10 * 10 input features and 512 hidden layer features
        self.fc2 = nn.Linear(512, 256)                           # second fully connected layer with 512 hidden layer features and 256 hidden layer features
        self.fc3 = nn.Linear(256, action_size)                   # third fully connected layer with 256 hidden layer features and action_size output features

    def forward(self, state):                               # forward pass of the network
        x = F.relu(self.bn1(self.conv1(state)))             # first convolutional layer with batch normalization and relu activation
        x = F.relu(self.bn2(self.conv2(x)))                 # second convolutional layer with batch normalization and relu activation
        x = F.relu(self.bn3(self.conv3(x)))                 # third convolutional layer with batch normalization and relu activation
        x = F.relu(self.bn4(self.conv4(x)))                 # fourth convolutional layer with batch normalization and relu activation
        x = x.view(x.size(0), -1)                           # flatten the input tensor while keeping the first tensor dimension same
        x = F.relu(self.fc1(x))                             # first fully connected layer with relu activation
        x = F.relu(self.fc2(x))                             # second fully connected layer with relu activation
        return self.fc3(x)                                  # third fully connected layer without activation

## Training the AI

### Setting up the environment

In [6]:
import gymnasium as gym                                                       # import the gymnasium library
import ale_py                                                                 # import the ale python library which contains the atari environments

gym.register_envs(ale_py)                                                     # as of gymnasium v1.0.0 we need to register the envs before we enable them

env = gym.make('MsPacmanDeterministic-v0', full_action_space = False)         # set up the Ms Pacman game environment
state_shape = env.observation_space.shape                                     # define the shape of the input i.e the observation space (rows x cols)
state_size = env.observation_space.shape[0]                                   # gather the size of the input i.e the number of rows in our input
number_actions = env.action_space.n                                           # number of actions i.e the total actions the agent can take (the output size)

print('State shape {}, State size {}, Number of actions {}'.format(state_shape, state_size, number_actions))

State shape (210, 160, 3), State size 210, Number of actions 9


  logger.deprecation(


### Initializing the hyperparemeters

In [7]:
learning_rate = 5e-4
minibatch_size = 100
gamma = 0.99                    # Is the discount factor: if the value is closer to 0, the agent will look at the shrot term reward and might not train for the entire case: If it is closer to 1, then agent trains with the long term factor in mind

### Preprocessing the frames

In [8]:
from PIL import Image
from torchvision import transforms

def preprocess_frame(frame):
  frame = Image.fromarray(frame)                                                            # converting a numpy array values of the image into a PIL image object
  preprocess = transforms.Compose([transforms.Resize((128,128)), transforms.ToTensor()])    # creating a object to preprocess our PIL objects using the transforms modeule from torchvision by 1st resizing our frames to match the neural network layers and then converting the object into tensors
  return preprocess(frame).unsqueeze(0)                                                     # return the image as torch tensors by preprocessing them using the preprocess object we created in the previous line

### Implementing the DCQN class

In [9]:
class Agent():

  def __init__(self, action_size):
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")        # runs on gpu if torch detects a gpu else runs on cpu; Gpu trains faster
    self.action_size = action_size                                                      # declare the state_size and the action_size
    self.local_qnetwork = Network(action_size).to(self.device)                          # creating a local q learning network using the neural_network class we created earlier and sending it to the device
    self.target_qnetwork = Network(action_size).to(self.device)                         # creating a target q learning network using the neural_network class we created earlier and sending it to the device
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr = learning_rate)   # creating the Adam optimizer for our q learning network which takes the parameters of the local q network
    self.memory = deque(maxlen = 10000)                                                 # initializing the memory using the ReplayMemory class created

  def step(self, state, action, reward, next_state, done):                              # a method to store the step by step experiences of the agent as it interacts with the environment and update the agent behaviour with the intention to replay the experience from memory
    state = preprocess_frame(state)
    next_state = preprocess_frame(next_state)
    self.memory.append((state, action, reward, next_state, done))
    if len(self.memory) > minibatch_size:                                               # a checkpoint is created to see if the agent has had a certain number of experiences which is denoted by the minibatch size
      experiences = random.sample(self.memory, k=minibatch_size)                        # upon reaching the minibatch size of experience we take a random sample size of the experiences of the agent
      self.learn(experiences, gamma)                                                    # the random sample of the experiences are them sent to the learn method to be learnt from using our deep Q learning algorithm

  def act(self, state, epsilon = 0.):                                                   # a method created to make the agent explore the environment based on the current state and then take an action using the epsilon greedy method
    state = preprocess_frame(state).to(self.device)                                     # the current state is inputted using the preprocess frames function we created and the tensors are feed to the device based on availability
    self.local_qnetwork.eval()                                                          # an evaluation of the agent in the local network is calculated using the eval method og the nn modile from torch
    with torch.no_grad():
      action_values = self.local_qnetwork(state)                                        # using no_grad to ensure no gradients are calculated for faster inference and obtaining the q values using the local network to evaluate our agent before taking an action
    self.local_qnetwork.train()                                                         # a training of the model is done in the local q network to update the q values before the next step
    if random.random() > epsilon:                                                       # implementing the epsilon decay before the agent acts from the next state
      return np.argmax(action_values.cpu().data.numpy())
    else:
      return random.choice(np.arange(self.action_size))

  def learn(self, experiences, gamma):                                                  # a method to implement the learning method i.e the training of our agent in the environment
    states, actions, rewards, next_states, dones = zip(*experiences)

    states = torch.from_numpy(np.vstack(states)).float().to(self.device)                # np.vstack --> to vertically stack the elements .to(self.device) --> To move the stack of states into the GPU or CPU to train the tensors
    actions = torch.from_numpy(np.vstack(actions)).long().to(self.device)
    rewards = torch.from_numpy(np.vstack(rewards)).float().to(self.device)
    next_states = torch.from_numpy(np.vstack(next_states)).float().to(self.device)
    dones = torch.from_numpy(np.vstack(dones).astype(np.uint8)).float().to(self.device)

    next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)
    q_targets = rewards + (gamma * next_q_targets * (1 - dones))
    q_expected = self.local_qnetwork(states).gather(1, actions)
    loss = F.mse_loss(q_expected, q_targets)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

### Initializing the DCQN Agent

In [10]:
agent = Agent(number_actions)

### Training the DCQN Agent

In [11]:
agent.device

device(type='cuda', index=0)

In [12]:
number_episodes = 2000
max_num_timesteps_per_episode = 10000
epsilon_start_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_value = 0.995
epsilon = epsilon_start_value
scores_on_100_episodes = deque(maxlen = 100)

In [13]:
for episode in range(1, number_episodes+1):
  state, _ = env.reset()
  score = 0
  for t in range(max_num_timesteps_per_episode):
    action = agent.act(state, epsilon)
    next_state, reward, done, _, _ = env.step(action)
    agent.step(state, action, reward, next_state, done)
    state = next_state
    score += reward
    if done:
      break
  scores_on_100_episodes.append(score)
  epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)
  print('\rEpisode {}\t Average Score: {:.2f}'.format(episode, np.mean(scores_on_100_episodes)), end = "")
  if episode % 100 ==0:
    print('\rEpisode {}\t Average Score: {:.2f}'.format(episode, np.mean(scores_on_100_episodes)))
  if np.mean(scores_on_100_episodes) >= 500.0:
    print('\nEnvironment solved in {:d} episode!\t Average Score: {:.2f}'.format(episode, np.mean(scores_on_100_episodes)))
    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
    break

Episode 100	 Average Score: 294.30
Episode 200	 Average Score: 400.20
Episode 300	 Average Score: 452.30
Episode 400	 Average Score: 484.00
Episode 407	 Average Score: 500.10
Environment solved in 407 episode!	 Average Score: 500.10


## Visualizing the result

In [14]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name):
  env = gym.make(env_name, render_mode = 'rgb_array')
  state, _ = env.reset()
  done = False
  frames = []
  while not done:
    frame = env.render()
    frames.append(frame)
    action = agent.act(state)
    state, reward, done, _, _ = env.step(action.item())
  env.close()
  imageio.mimsave('mspacman.mp4', frames, fps=120)

show_video_of_model(agent, 'MsPacmanDeterministic-v4')

def show_video():
  mp4list = glob.glob('*.mp4')
  if len(mp4list) > 0:
    mp4 = mp4list[0]
    video = io.open(mp4, 'r+b').read()
    encoded = base64.b64encode(video)
    display(HTML(data='''<video alt="test" autoplay loop controls style="height: 400px;"><source src="data:video/mp4;base64,{0}" type="video/mp4" /></video>'''.format(encoded.decode('ascii'))))
  else:
    print("could not find video")

show_video()