<a href="https://colab.research.google.com/github/EslamMahmoud001/AI-Deep_Learning/blob/main/Deep_Convolutional_Q_Learning_for_Pac_Man.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Deep Convolutional Q-Learning for Pac-Man

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!pip install ale-py
!apt-get install -y swig
!pip install gymnasium[box2d]

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m17.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0
Collecting ale-py>=0.9 (from gymnasium[accept-rom-license,atari])
  Downloading ale_py-0.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Downloading ale_py-0.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m12.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling colle

### Importing the libraries

In [None]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
from torch.utils.data import DataLoader, TensorDataset

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
class Network(nn.Module):
  """Convolutional Q-network mapping a batch of RGB frames to one Q-value per action.

  "Eyes": four Conv2d + BatchNorm2d stages, each followed by a ReLU.
  "Brain": three fully connected layers (ReLU after the first two, raw values out of the last).

  No padding is used, so every convolution produces
      out = floor((in - kernel_size) / stride) + 1
  For the 128 x 128 frames this notebook feeds in, the spatial size evolves as

      conv1 (kernel 8, stride 4): 128 -> 31   (32 channels)
      conv2 (kernel 4, stride 2):  31 -> 14   (64 channels)
      conv3 (kernel 3, stride 1):  14 -> 12   (64 channels)
      conv4 (kernel 3, stride 1):  12 -> 10   (128 channels)

  which gives 10 * 10 * 128 = 12,800 flattened features for the first linear layer.
  """

  def __init__(self, action_size, seed = 42):
    """Create the layers.

    Args:
      action_size: number of outputs of the last linear layer (one per action).
      seed: value passed to torch.manual_seed before the layers are created.
    """
    super().__init__()
    self.seed = torch.manual_seed(seed)

    # --- eyes: convolution + batch-normalisation stages ---
    # Stage 1: 3 input channels (RGB) -> 32 feature maps
    self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=8, stride=4)
    self.bn1 = nn.BatchNorm2d(num_features=32)

    # Stage 2: 32 -> 64 feature maps
    self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)
    self.bn2 = nn.BatchNorm2d(num_features=64)

    # Stage 3: 64 -> 64 feature maps
    self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)
    self.bn3 = nn.BatchNorm2d(num_features=64)

    # Stage 4: 64 -> 128 feature maps
    self.conv4 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1)
    self.bn4 = nn.BatchNorm2d(num_features=128)

    # --- brain: fully connected head ---
    # 12,800 inputs = 10 x 10 spatial positions x 128 channels (see class docstring);
    # the hidden widths 512 and 256 were chosen by experiment.
    flattened_features = 10 * 10 * 128
    self.fc1 = nn.Linear(flattened_features, 512)
    self.fc2 = nn.Linear(512, 256)
    self.fc3 = nn.Linear(256, action_size)

  def forward(self, state):
    """Propagate a batch of frames through the network.

    Args:
      state: tensor whose first dimension is the batch and whose second
        dimension holds the 3 colour channels expected by conv1.

    Returns:
      Tensor with one row per batch element and action_size columns.
    """
    features = state
    stages = (
        (self.conv1, self.bn1),
        (self.conv2, self.bn2),
        (self.conv3, self.bn3),
        (self.conv4, self.bn4),
    )
    # Each stage: convolution -> batch normalisation -> rectifier
    for convolution, batch_norm in stages:
      features = F.relu(batch_norm(convolution(features)))

    # Flatten everything except the batch dimension
    flat = features.view(features.shape[0], -1)

    hidden = F.relu(self.fc1(flat))
    hidden = F.relu(self.fc2(hidden))

    # Output layer: no activation is applied
    return self.fc3(hidden)


## Part 2 - Training the AI

### Setting up the environment

In [None]:
import ale_py
import gymnasium as gym
# Create the Ms. Pac-Man environment; full_action_space = False requests the game's reduced action set
env = gym.make('MsPacmanDeterministic-v0', full_action_space = False)
state_shape = env.observation_space.shape # shape of one raw observation (frame)
state_size = env.observation_space.shape[0] # first dimension of the observation shape
number_actions = env.action_space.n # number of discrete actions; passed to the agent as its action_size
print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)

State shape:  (210, 160, 3)
State size:  210
Number of actions:  9


  logger.deprecation(


### Initializing the hyperparameters

In [None]:
learning_rate = 5e-4 # Step size for the Adam optimizer; chosen by experimentation for training the Pac-Man agent
minibatch_size = 64 # Number of stored experiences sampled for one weight update
discount_factor = 0.99 # Close to 1 so the agent values accumulated future reward (not short-sighted)
# No dedicated replay-memory class is used here: the Agent keeps its experiences in a plain bounded deque
# No soft-update interpolation parameter is defined: the Agent does not soft-update its target network

### Preprocessing the frames

In [None]:
# Convert the raw environment frames (NumPy arrays) into PyTorch tensors
# so they can be fed into the convolutional network

from PIL import Image
from torchvision import transforms

def preprocess_frame(frame):
  """Turn one raw frame (NumPy array) into a tensor with a leading batch axis.

  The frame is converted to a PIL image, resized to a 128 x 128 square,
  converted to a tensor by ToTensor, and finally given a leading dimension
  of size 1 so that it forms a batch of one frame.
  """
  to_square_tensor = transforms.Compose([
      transforms.Resize((128, 128)),
      transforms.ToTensor(),
  ])
  image = Image.fromarray(frame)
  tensor = to_square_tensor(image)
  # unsqueeze(0) inserts the batch dimension in first position
  return tensor.unsqueeze(0)

### Implementing the DCQN class

In [None]:
class Agent():
  """Deep convolutional Q-learning agent.

  Holds a local Q-network (trained), a target Q-network (used to build the
  Q-targets), an Adam optimizer for the local network and a bounded buffer of
  past experiences. No state size is needed because the inputs are images.

  NOTE(review): nothing in this class copies weights from local_qnetwork to
  target_qnetwork. Both are built by Network(action_size) with the same default
  seed, so the target network keeps its initial weights for the whole run.
  Confirm this is intended.
  """

  def __init__(self, action_size):
    """Build the two networks, the optimizer and the experience buffer.

    Args:
      action_size: number of actions, i.e. number of Q-values the networks output.
    """
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.action_size = action_size

    self.local_qnetwork = Network(action_size).to(self.device)
    self.target_qnetwork = Network(action_size).to(self.device)

    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr = learning_rate)
    # Bounded experience buffer; the oldest entries are dropped once 10000 are stored.
    # Each entry keeps two preprocessed frames, so a full buffer is large
    # (roughly 4 GB for float32 3 x 128 x 128 frames).
    self.memory = deque(maxlen = 10000)

  def step(self, state, action, reward, next_state, done):
    """Store one transition and, once enough are stored, learn from a random minibatch.

    Args:
      state: raw frame observed before the action.
      action: action that was taken.
      reward: reward received.
      next_state: raw frame observed after the action.
      done: whether the episode ended with this transition.
    """
    # Frames are stored already preprocessed
    state = preprocess_frame(state)
    next_state = preprocess_frame(next_state)

    self.memory.append((state, action, reward, next_state, done))

    # Learn on every step as soon as the buffer holds more than one minibatch
    if len(self.memory) > minibatch_size:
      experiences = random.sample(self.memory, k = minibatch_size)
      self.learn(experiences, discount_factor)

  def act(self, state, epsilon = 0.):
    """Select an action with an epsilon-greedy policy.

    Args:
      state: raw frame to act on.
      epsilon: probability-like threshold; a random action is returned when
        random.random() <= epsilon, otherwise the action with the highest
        predicted Q-value.

    Returns:
      The index of the selected action.
    """
    state = preprocess_frame(state).to(self.device)

    # Evaluation mode must actually be *called*: the original `self.local_qnetwork.eval`
    # only referenced the method, so the BatchNorm layers kept running in training
    # mode on this single-frame batch.
    self.local_qnetwork.eval()

    # Inference only: no gradients are needed to pick an action
    with torch.no_grad():
      action_values = self.local_qnetwork(state)

    # Back to training mode for the next learning step
    self.local_qnetwork.train()

    if random.random() > epsilon:
      return np.argmax(action_values.cpu().data.numpy())
    else:
      return random.choice(np.arange(self.action_size))

  def learn(self, experiences, discount_factor):
    """Run one gradient step on the local network from a minibatch of experiences.

    Args:
      experiences: iterable of (state, action, reward, next_state, done) tuples.
      discount_factor: weight applied to the bootstrapped next-state value.
    """
    # Regroup the minibatch field by field
    states, actions, rewards, next_states, dones = zip(*experiences)

    # Stack each field into one tensor on the agent's device; scalars become column vectors
    states = torch.from_numpy(np.vstack(states)).float().to(self.device)
    actions = torch.from_numpy(np.vstack(actions)).long().to(self.device)
    rewards = torch.from_numpy(np.vstack(rewards)).float().to(self.device)
    next_states = torch.from_numpy(np.vstack(next_states)).float().to(self.device)
    dones = torch.from_numpy(np.vstack(dones).astype(np.uint8)).float().to(self.device)

    # Q-target: reward + discount * max_a Q_target(next_state, a), with the bootstrap
    # term zeroed where done == 1. detach() keeps gradients out of the target network;
    # max(1)[0] keeps the maximum values and drops their indices.
    next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)
    q_targets = rewards + (discount_factor * next_q_targets * (1 - dones))

    # Q-values predicted by the local network for the actions that were actually taken
    q_expected = self.local_qnetwork(states).gather(1, actions)

    # Mean-squared-error loss between predicted and target Q-values
    loss = F.mse_loss(q_expected, q_targets)

    # Reset accumulated gradients, back-propagate, take one optimizer step
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()


### Initializing the DCQN agent

In [None]:
# Build the agent with one Q-value output per action available in the environment
agent = Agent(action_size = number_actions)

### Training the DCQN agent

In [None]:
# Training schedule
number_episodes = 2000
max_number_timesteps_per_episode = 10000
# Epsilon-greedy exploration: start fully random, decay multiplicatively once per episode down to a floor
epsilon_starting_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_value = 0.995
epsilon = epsilon_starting_value
# Sliding window holding the scores of the most recent 100 episodes
scores_on_100_episodes = deque(maxlen = 100)

for episodes in range(1, number_episodes + 1):
  # Reset the environment to its initial state
  state, _ = env.reset()

  # Cumulative reward of this episode
  score = 0

  for t in range(max_number_timesteps_per_episode):
    action = agent.act(state, epsilon)
    next_state, reward, terminated, truncated, _ = env.step(action)

    # Only a genuine terminal state is stored as `done`, so it alone switches off
    # bootstrapping in the Q-target
    agent.step(state=state, action=action, reward=reward, next_state=next_state, done=terminated)
    state = next_state
    score += reward

    # The episode is over on termination OR truncation; a truncated environment
    # must not be stepped again before it is reset
    done = terminated or truncated
    if done:
      break

  scores_on_100_episodes.append(score)
  epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)

  # Computed once per episode and reused by the prints and the stopping test
  average_score = np.mean(scores_on_100_episodes)

  # Dynamic print: overwritten in place, kept on its own line every 100 episodes
  print('\rEpisode {}\tAverage Score: {:.2f}'.format(episodes, average_score), end = "")
  if episodes % 100 == 0:
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episodes, average_score))

  # Stop as soon as the windowed average reaches 500 and save the trained weights
  if average_score >= 500.0:
    print('\nEnviroment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episodes - 100, average_score))

    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
    break #exit training

Episode 100	Average Score: 323.80
Episode 200	Average Score: 364.60
Episode 300	Average Score: 383.40
Episode 400	Average Score: 427.40
Episode 499	Average Score: 443.00

## Part 3 - Visualizing the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env_name):
    """Play one episode with `agent` and save the rendered frames to 'video.mp4'.

    Args:
        agent: object whose act(state) method returns the action to take.
        env_name: environment id passed to gym.make.

    The episode ends when the environment reports termination or truncation.
    The environment is closed even if the rollout raises.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            # Capture the frame for the current state before acting on it
            frames.append(env.render())
            action = agent.act(state)
            state, _, terminated, truncated, _ = env.step(action)
            # Also stop on truncation; otherwise a truncated episode would loop forever
            done = terminated or truncated
    finally:
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

# Record one episode played by `agent` (act is called with its default epsilon of 0) into video.mp4
show_video_of_model(agent, 'MsPacmanDeterministic-v0')

def show_video():
    """Embed the first '*.mp4' file found in the working directory in the cell output.

    The file is base64-encoded into an inline HTML <video> element. If no
    '.mp4' file exists, "Could not find video" is printed instead.
    """
    mp4list = glob.glob('*.mp4')
    if not mp4list:
        print("Could not find video")
        return

    # Read-only binary mode inside a context manager: the file is only read, and the
    # handle is closed as soon as its contents are loaded
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())

    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

# Display the recorded video inline in the notebook output
show_video()