# Deep Convolutional Q-Learning for Pac-Man





## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/953.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m122.9/953.9 kB[0m [31m3.6 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m952.3/953.9 kB[0m [31m14.3 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1
Collecting shimmy[atari]<1.0,>=0.1.0 (from gymnasium[accept-rom-license,atari])
  Downloading Shimmy-0.2.1-py3-none-any.whl (25 kB)
Collecting autorom[accept-r

### Importing the libraries

In [None]:
import os
import random
import numpy as np
import torch #Pytorch is the framework using which we build this model.
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
from torch.utils.data import DataLoader, TensorDataset

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
class Network(nn.Module):
  """Convolutional Q-network: maps a batch of RGB frames to one Q-value per action.

  Four Conv2d -> BatchNorm2d -> ReLU stages extract visual features (the "eyes"),
  and three fully connected layers turn them into action values (the "brain").
  The first fully connected layer expects 10 * 10 * 128 inputs, which is what the
  four convolutions produce for a 128x128 frame (spatial size 128 -> 31 -> 14 -> 12 -> 10).
  """

  def __init__(self, action_size, seed = 42):
    """Build the layers.

    Args:
      action_size: number of Q-values produced by the output layer.
      seed: value passed to torch.manual_seed before the layers are created, so the
        initial weights are reproducible.
    """
    super().__init__()
    self.seed = torch.manual_seed(seed)

    # Convolutional stage 1: 3 input channels (RGB) -> 32 feature maps.
    self.conv1 = nn.Conv2d(3, 32, kernel_size = 8, stride = 4)
    self.bn1 = nn.BatchNorm2d(32)
    # Convolutional stage 2: 32 -> 64 feature maps.
    self.conv2 = nn.Conv2d(32, 64, kernel_size = 4, stride = 2)
    self.bn2 = nn.BatchNorm2d(64)
    # Convolutional stage 3: 64 -> 64 feature maps.
    self.conv3 = nn.Conv2d(64, 64, kernel_size = 3, stride = 1)
    self.bn3 = nn.BatchNorm2d(64)
    # Convolutional stage 4: 64 -> 128 feature maps.
    self.conv4 = nn.Conv2d(64, 128, kernel_size = 3, stride = 1)
    self.bn4 = nn.BatchNorm2d(128)

    # Fully connected head: flattened features -> 512 -> 256 -> action_size.
    self.fc1 = nn.Linear(10 * 10 * 128, 512)
    self.fc2 = nn.Linear(512, 256)
    self.fc3 = nn.Linear(256, action_size)

  def forward(self, state):
    """Return the Q-values for a batch of frames.

    Args:
      state: tensor of shape (batch, 3, H, W); H and W must lead to a 10x10 map after
        the four convolutions (e.g. 128x128) so that it fits fc1.

    Returns:
      Tensor of shape (batch, action_size) with raw (un-activated) Q-values.
    """
    conv_stages = (
      (self.conv1, self.bn1),
      (self.conv2, self.bn2),
      (self.conv3, self.bn3),
      (self.conv4, self.bn4),
    )
    features = state
    for conv, norm in conv_stages:
      features = F.relu(norm(conv(features)))
    # Keep the batch dimension and flatten everything else for the linear layers.
    flat = features.view(features.size(0), -1)
    hidden = F.relu(self.fc1(flat))
    hidden = F.relu(self.fc2(hidden))
    return self.fc3(hidden)


## Part 2 - Training the AI

### Setting up the environment

In [None]:
import gymnasium as gym
#Several MsPacman variants are registered; this notebook uses 'MsPacmanDeterministic-v0'.
env = gym.make("MsPacmanDeterministic-v0", full_action_space = False) #Creates the Ms. Pac-Man environment. 'full_action_space = False' requests the reduced action set (9 actions in the recorded output) instead of the full Atari action set.
state_shape = env.observation_space.shape #Shape of one observation frame; the recorded output shows (210, 160, 3), presumably height x width x RGB channels.
state_size = env.observation_space.shape[0] #First dimension of the observation shape (210 in the recorded output). NOTE(review): for image observations this is not a state-vector length, and nothing else visible in this notebook uses it.
number_actions = env.action_space.n #Number of discrete actions available to the agent (9 in the recorded output).
print("State shape: ", state_shape)
print("State size: ", state_size)
print("Number of actions: ", number_actions)


  logger.deprecation(


State shape:  (210, 160, 3)
State size:  210
Number of actions:  9


### Initializing the hyperparameters

In [None]:
learning_rate = 5e-4 #Step size handed to the Adam optimizer in Agent.__init__.
minibatch_size = 64 #Number of experiences sampled from the replay memory for each learning update.
discount_factor = 0.99 #Gamma in the Bellman target: weight given to the estimated value of the next state. Values close to 1 make the agent value future rewards almost as much as immediate ones; values close to 0 make it short-sighted.

#No interpolation_parameter is defined because this notebook does not implement a soft update of the target network.
#No replay_buffer_size is defined either: instead of a ReplayMemory class, the Agent keeps its experiences in a deque with a fixed maximum length.

### Preprocessing the frames

In [None]:
#Experience replay is still used here, but without a dedicated ReplayMemory class. The states are now large RGB images instead of small vectors, so each frame is first preprocessed (resized and converted to a PyTorch tensor) into the format expected by the convolutional layers.
from PIL import Image #Image class is imported from the PIL (python imaging library) which allows to load images from files and create new images.
from torchvision import transforms #transforms module is imported from the torchvision library.

#The transform pipeline is stateless, so it is built once here instead of on every call (preprocess_frame runs several times per environment step).
_FRAME_TRANSFORM = transforms.Compose([transforms.Resize((128, 128)), transforms.ToTensor()])

def preprocess_frame(frame):
  """Convert one raw environment frame into a batched tensor for the network.

  Args:
    frame: image array accepted by PIL's Image.fromarray (the training loop passes
      the observations returned by the environment).

  Returns:
    Tensor with a leading batch dimension of size 1, followed by the channel
    dimension and a 128x128 spatial size. ToTensor also rescales 8-bit pixel
    values to floats in [0, 1].
  """
  image = Image.fromarray(frame) #NumPy frame -> PIL image, as required by Resize/ToTensor.
  return _FRAME_TRANSFORM(image).unsqueeze(0) #unsqueeze(0) adds the batch dimension.


### Implementing the DCQN class

In [None]:
class Agent():
  """Deep convolutional Q-learning agent with a deque-based replay memory.

  The local Q-network selects actions and is trained; the target Q-network supplies
  the bootstrap values for the Bellman target. The target network is kept in eval
  mode and is overwritten with the local network's weights every
  `target_update_interval` learning steps, so the targets follow what has been learned
  instead of staying frozen at the initial weights.

  Relies on the module-level names Network, preprocess_frame, learning_rate,
  minibatch_size and discount_factor.
  """

  #Method1: Creating the Q-networks, the optimizer and the replay memory.
  def __init__(self, action_size, target_update_interval = 1000):
    """
    Args:
      action_size: number of discrete actions (size of the network output).
      target_update_interval: number of learn() calls between two hard copies of the
        local weights into the target network. Must be >= 1.

    Raises:
      ValueError: if target_update_interval is smaller than 1.
    """
    if target_update_interval < 1:
      raise ValueError("target_update_interval must be >= 1")
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") #Use the GPU when available.
    self.action_size = action_size
    self.local_qnetwork = Network(action_size).to(self.device) #Trained network, used to select actions.
    self.target_qnetwork = Network(action_size).to(self.device) #Provides the next-state values for the Bellman target.
    self.target_qnetwork.load_state_dict(self.local_qnetwork.state_dict()) #Start both networks from identical weights.
    self.target_qnetwork.eval() #The target network is never trained: keep BatchNorm on its running statistics.
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr = learning_rate) #Only the local network is optimized.
    self.memory = deque(maxlen = 10000) #Replay memory; the oldest experiences are dropped once 10000 are stored.
    self.target_update_interval = target_update_interval
    self.learn_step_count = 0 #Number of learn() calls so far; drives the periodic target sync.

  #Method2: Stores an experience and triggers learning once enough experiences are available.
  def step(self, state, action, reward, next_state, done):
    """Store one transition (with preprocessed frames) and learn from a random minibatch.

    Args:
      state, next_state: raw frames; both are converted with preprocess_frame before storage.
      action, reward, done: stored unchanged.
    """
    state = preprocess_frame(state)
    next_state = preprocess_frame(next_state)
    self.memory.append((state, action, reward, next_state, done))
    if len(self.memory) > minibatch_size: #Learn only once more than one minibatch of experiences is stored.
      experiences = random.sample(self.memory, k = minibatch_size)
      self.learn(experiences, discount_factor)

  #Method3: Chooses an action with an epsilon-greedy policy.
  def act(self, state, epsilon = 0.):
    """Return an action index for the given raw frame.

    With probability epsilon a uniformly random action is returned; otherwise the
    action with the highest predicted Q-value. The exploration check comes first so
    that no forward pass is spent on a frame whose prediction would be discarded.
    """
    if random.random() <= epsilon:
      return random.choice(np.arange(self.action_size)) #Random action in [0, action_size).
    state = preprocess_frame(state).to(self.device) #preprocess_frame already adds the batch dimension.
    self.local_qnetwork.eval() #Inference mode for BatchNorm while predicting on a single frame.
    with torch.no_grad():
      action_values = self.local_qnetwork(state)
    self.local_qnetwork.train() #Back to training mode for the next learn() call.
    return np.argmax(action_values.cpu().numpy())

  #Method4: Updates the local Q-network from a minibatch of experiences.
  def learn(self, experiences, discount_factor):
    """Run one gradient step on the local network and periodically sync the target network.

    Args:
      experiences: sequence of (state, action, reward, next_state, done) tuples whose
        states are tensors with a leading batch dimension of 1 (as stored by step()).
      discount_factor: gamma used in the Bellman target.
    """
    states, actions, rewards, next_states, dones = zip(*experiences)
    #The stored frames are already tensors, so they are concatenated directly along the batch dimension.
    states = torch.cat(states).float().to(self.device)
    next_states = torch.cat(next_states).float().to(self.device)
    #Scalars are stacked into column vectors of shape (batch, 1).
    actions = torch.from_numpy(np.vstack(actions)).long().to(self.device)
    rewards = torch.from_numpy(np.vstack(rewards)).float().to(self.device)
    dones = torch.from_numpy(np.vstack(dones).astype(np.uint8)).float().to(self.device)
    with torch.no_grad(): #The target values are constants for the optimizer; no graph is needed.
      next_q_targets = self.target_qnetwork(next_states).max(1)[0].unsqueeze(1)
    q_targets = rewards + discount_factor * next_q_targets * (1 - dones) #Bellman target; no bootstrap on terminal transitions.
    q_expected = self.local_qnetwork(states).gather(1, actions) #Q-value of the action actually taken.
    loss = F.mse_loss(q_expected, q_targets)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    self.learn_step_count += 1
    if self.learn_step_count % self.target_update_interval == 0:
      #Hard update: copy weights and BatchNorm statistics from the local network.
      self.target_qnetwork.load_state_dict(self.local_qnetwork.state_dict())

#No soft_update method is defined for this agent.

### Initializing the DCQN agent

In [None]:
agent = Agent(number_actions) #Create the DCQN agent; number_actions is the size of the environment's action space read in the environment setup cell.


### Training the DCQN agent

In [None]:
#Initializing.
number_episodes = 2000 #Maximum number of episodes to train for.
maximum_number_timesteps_per_episode = 10000 #Hard cap on steps per episode so a single episode cannot run forever.
#A timestep is one agent-environment interaction; an episode is a sequence of timesteps.
epsilon_starting_value = 1.0 #Start fully exploratory.
epsilon_ending_value = 0.01 #Lower bound for epsilon.
epsilon_decay_value = 0.995 #Epsilon is multiplied by this factor after every episode until it reaches the lower bound.
epsilon = epsilon_starting_value
scores_on_100_episodes = deque(maxlen = 100) #Sliding window with the scores of the most recent 100 episodes.

for episode in range(1, number_episodes + 1):
  state, _ = env.reset() #New episode: reset() returns (observation, info); the info dict is not needed.
  score = 0 #Cumulative reward of the current episode.
  for t in range(maximum_number_timesteps_per_episode):
    action = agent.act(state, epsilon)
    #Gymnasium's step() returns (observation, reward, terminated, truncated, info).
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated #The episode is over in either case and must not be stepped further.
    #Only a true termination is stored as 'done': a truncated episode still has a meaningful next-state value to bootstrap from.
    agent.step(state, action, reward, next_state, terminated)
    state = next_state
    score += reward
    if done:
      break
  scores_on_100_episodes.append(score) #The oldest score is dropped automatically once the window holds 100 entries.
  epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)
  average_score = np.mean(scores_on_100_episodes) #Computed once per episode and reused below.
  print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score), end = "") #Live progress line, overwritten by the next episode.
  if episode % 100 == 0:
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score)) #Keep one permanent line every 100 episodes.
  if average_score >= 500.0: #Training stops once the windowed average reaches 500.
    #NOTE(review): 'episode - 100' reports the start of the averaging window; it is negative if the threshold is reached before episode 100.
    print('\n Environment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode - 100, average_score))
    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth') #Save the local network's weights (its state_dict).
    break

Episode 12	Average Score: 277.50

## Part 3 - Visualizing the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name, video_path='video.mp4', fps=30):
    """Play one greedy episode with the agent and save the rendered frames as a video.

    Args:
        agent: object exposing act(state), called with its default epsilon.
        env_name: environment id passed to gym.make.
        video_path: file written by imageio.mimsave (defaults to 'video.mp4').
        fps: frame rate of the written video (defaults to 30).
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())  # Capture the frame before acting on it.
            action = agent.act(state)
            state, _, terminated, truncated, _ = env.step(action)
            # An episode ends on termination OR truncation; stepping past either is invalid.
            done = terminated or truncated
    finally:
        # Release the emulator even if rendering or the agent raised.
        env.close()
    imageio.mimsave(video_path, frames, fps=fps)

show_video_of_model(agent, 'MsPacmanDeterministic-v0') #Record one episode of the agent on the same environment id that was used for training.

def show_video():
    """Embed the first ``*.mp4`` file found in the working directory in the cell output.

    The file is inlined as a base64 data URI inside an HTML <video> tag. If no
    ``.mp4`` file exists, "Could not find video" is printed instead.
    """
    mp4list = glob.glob('*.mp4')
    if not mp4list:
        print("Could not find video")
        return
    # Read-only binary mode inside a context manager, so the handle is always closed
    # and no write permission on the file is required.
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())
    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

show_video() #Embed the first .mp4 file found in the working directory in this cell's output.

  logger.deprecation(
  logger.warn(
