# Pac Man

Training a double dueling deep convolutional Q-learning AI to solve [Pac Man](https://ale.farama.org/environments/pacman/) from [Gymnasium](https://gymnasium.farama.org/)

## Installing packages and importing libraries

### Installing NumPy and PyTorch

In [None]:
%pip install numpy
%pip install torch
%pip install torchvision

### Installing Gymnasium

In [None]:
%pip install gymnasium
%pip install ale-py
%pip install swig # Necessary to build the wheel for box2d-py
%pip install gymnasium[box2d] # Contains lunar lander environment

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m12.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0
Collecting ale-py
  Downloading ale_py-0.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Downloading ale_py-0.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m19.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: ale-py
Successfully installed ale

### Importing Libraries

In [None]:
import os
import random
import numpy as np
from collections import deque

# Pytorch stuff

import torch
import torch.nn as nn # Neural network library
import torch.optim as optim # Optimizer to train AI
import torch.nn.functional as F # Activation function
from torch.utils.data import DataLoader, TensorDataset


## Building AI

### Neural Net Architecture

In [None]:
class DuelingDCQN(nn.Module):
  """
  Dueling Deep Convolutional Q-Network.

  A convolutional feature extractor feeds two fully connected heads: a value
  stream that scores the state itself and an advantage stream that scores each
  action relative to the others. The two are recombined into Q-values in
  `forward`.

  The linear layers are sized for 3-channel 128x128 inputs, which the four
  convolutions reduce to a 128x10x10 feature map.

  Args:
    action_size: Number of discrete actions (width of the Q-value output).
  """

  def __init__(self, action_size) -> None:
    super().__init__()

    # Only parameterised layers live in the Sequential; the ReLU activations
    # are applied in forward() so the layer indices (and therefore the
    # state_dict keys) stay the same as in previously saved checkpoints.
    self.feature = nn.Sequential(
        nn.Conv2d(3, 32, kernel_size=8, stride=4),
        nn.BatchNorm2d(32),
        nn.Conv2d(32, 64, kernel_size=4, stride=2),
        nn.BatchNorm2d(64),
        nn.Conv2d(64, 64, kernel_size=3, stride=1),
        nn.BatchNorm2d(64),
        nn.Conv2d(64, 128, kernel_size=3, stride=1),
        nn.BatchNorm2d(128),
      )

    # Value stream (outputs a single scalar)
    self.value_stream = nn.Sequential(
        nn.Linear(10 * 10 * 128, 512),
        nn.ReLU(),
        nn.Linear(512, 1)  # Single value
    )

    # Advantage stream (outputs advantage for each action)
    self.advantage_stream = nn.Sequential(
        nn.Linear(10 * 10 * 128, 512),
        nn.ReLU(),
        nn.Linear(512, action_size)
    )

  def forward(self, state): # PyTorch needs this to be called forward to work
    """
    Compute Q-values for a batch of states.

    Args:
      state: Float tensor of shape (batch, 3, 128, 128).

    Returns:
      Tensor of shape (batch, action_size) holding one Q-value per action.
    """
    features = state
    for layer in self.feature:
      features = layer(features)
      # Without a non-linearity after each conv/batch-norm stage the stacked
      # convolutions are just a composition of affine maps.
      if isinstance(layer, nn.BatchNorm2d):
        features = F.relu(features)

    features = features.flatten(start_dim=1)  # Flatten everything except the batch dimension

    value = self.value_stream(features)
    advantage = self.advantage_stream(features)

    # Combine value and advantage streams; subtracting the mean advantage makes
    # the value/advantage decomposition identifiable.
    return value + (advantage - advantage.mean(dim=1, keepdim=True))

## Training AI

### Set up Pac Man environment

In [None]:
import gymnasium as gym
import ale_py

# Deterministic variant with the reduced action space: less computationally expensive
env = gym.make(
    "MsPacmanDeterministic-v4",
    full_action_space=False,
)

# Dimensions the network and agent are built around
pac_man_state_shape, pac_man_action_size = env.observation_space.shape, env.action_space.n

# Sanity-check the environment: 210x160 RGB frames and 9 discrete actions
assert pac_man_state_shape == (210, 160, 3)
assert pac_man_action_size == 9

### Initialize hyperparameters

In [None]:
learning_rate = 5e-4 # Step size handed to the Adam optimizer inside Agent
mini_batch_size = 64 # Number of experiences sampled from replay memory for each learning update
discount_factor = 0.99 # Gamma: how strongly future rewards count in the Q-value target

# Tau (the soft-update interpolation rate) was removed because soft updates did not improve Pac-Man results

### Preprocessing frames

In [None]:
# Convert images to PyTorch tensors

from PIL import Image
import torchvision.transforms as transforms

def preprocess_frame(frame: np.ndarray):
  """
  Turn a raw environment frame into a batched PyTorch tensor.

  The frame is wrapped in a PIL image, resized to a 128x128 square and
  converted to a tensor; a leading batch dimension of size 1 is then added.
  """
  # Resize to a square first, then convert the PIL image to a (normalized) tensor
  to_square_tensor = transforms.Compose([
      transforms.Resize((128, 128)),
      transforms.ToTensor(),
  ])

  pil_image = Image.fromarray(frame)

  # unsqueeze(0) prepends the batch dimension
  return to_square_tensor(pil_image).unsqueeze(0)

### Double Deep Q Network

In [None]:
class Agent():
  """
  Double deep convolutional Q-learning agent with gradient clipping.

  The local network chooses the best next action and a separate target network
  evaluates it, which reduces the overestimation bias of plain Q-learning. The
  target network is a frozen copy of the local network that is re-synchronized
  (hard update) every `target_update_every` learning updates.
  """

  def __init__(self, action_size, target_update_every=1000) -> None:
    """
    Args:
      action_size: Number of discrete actions available in the environment.
      target_update_every: Number of learning updates between hard copies of
        the local network's weights into the target network. Must be >= 1.

    Raises:
      ValueError: If `target_update_every` is smaller than 1.
    """
    if target_update_every < 1:
      raise ValueError("target_update_every must be at least 1")

    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # GPU acceleration if possible

    self.action_size = action_size

    # Q learning
    self.q_network = DuelingDCQN(action_size).to(self.device) # Local Q network
    self.target_network = DuelingDCQN(action_size).to(self.device) # Target Q network

    # Start the target network as an exact copy of the local network; otherwise
    # the bootstrap targets would come from unrelated random weights.
    self.target_network.load_state_dict(self.q_network.state_dict())
    self.target_network.eval() # Never trained directly; only refreshed by hard updates

    self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate) # Optimizer for Q network

    self.memory = deque(maxlen=10000) # Replay memory

    self.target_update_every = target_update_every
    self.time_step = 0 # Number of learning updates so far; drives target network syncs

  def step(self, state, action, reward, next_state, done):
    """
    Store one transition in replay memory and, once enough transitions are
    available, learn from a random mini-batch.

    Args:
      state: Raw frame observed before acting.
      action: Action that was taken.
      reward: Reward received for the action.
      next_state: Raw frame observed after acting.
      done: Whether the episode ended with this transition.
    """
    state = preprocess_frame(state)
    next_state = preprocess_frame(next_state)

    self.memory.append((state, action, reward, next_state, done))

    # Learning
    if len(self.memory) > mini_batch_size:
      experiences = random.sample(self.memory, k=mini_batch_size)
      self.learn(experiences, discount_factor)

  def act(self, state, epsilon=0):
    """
    Select action based on given state in environment using epsilon-greedy action selection policy.

    Epsilon-greedy is standard in Deep Q learning over softmax. It is also simpler and less computationally expensive.

    Args:
      state: Raw frame to act on.
      epsilon: Probability-like threshold for choosing a random action.

    Returns:
      The chosen action as a NumPy integer.
    """
    # Exploration: no need to run the network at all for a random action
    if random.random() <= epsilon:
      return random.choice(np.arange(self.action_size))

    state = preprocess_frame(state).to(self.device)

    self.q_network.eval()
    with torch.no_grad(): # Disable gradient computation (make sure in inference mode)
      action_values = self.q_network(state)
    self.q_network.train()

    # Exploitation: action with the highest estimated Q-value
    return np.argmax(action_values.cpu().numpy())

  def _to_column(self, values, dtype):
    """Pack a sequence of scalars into a (batch, 1) tensor on the agent's device."""
    column = np.asarray(values, dtype=dtype).reshape(-1, 1)
    return torch.from_numpy(column).to(self.device)

  def learn(self, experiences, gamma):
    """
    Update Q-values based on sampled experiences

    Args:
      experiences: Iterable of (state, action, reward, next_state, done) tuples
        where each state carries a leading batch dimension of size 1.
      gamma: Discount factor applied to the bootstrapped next-state value.
    """

    states, actions, rewards, next_states, dones = zip(*experiences)

    # Stack the per-transition (1, C, H, W) states into (batch, C, H, W) batches
    states = torch.cat([torch.as_tensor(s) for s in states]).float().to(self.device)
    next_states = torch.cat([torch.as_tensor(s) for s in next_states]).float().to(self.device)

    actions = self._to_column(actions, np.int64)
    rewards = self._to_column(rewards, np.float32)
    dones = self._to_column(dones, np.float32) # Boolean flags become 0.0 / 1.0

    # The targets are constants for the optimizer, so build them without
    # tracking gradients (this also keeps gradients off the target network).
    with torch.no_grad():
      # Double DQN: the local network picks the action ...
      next_actions = self.q_network(next_states).argmax(1, keepdim=True)
      # ... and the target network evaluates it
      next_q_targets = self.target_network(next_states).gather(1, next_actions)
      # No bootstrapping from terminal states
      target_q_values = rewards + (gamma * next_q_targets * (1 - dones))

    # Compute current Q-values
    predicted_q_values = self.q_network(states).gather(1, actions)

    # Calculate loss
    loss = F.mse_loss(predicted_q_values, target_q_values)

    # Backpropagate loss and update weights
    self.optimizer.zero_grad() # Reset gradients from the previous update
    loss.backward() # Back propagate
    nn.utils.clip_grad_norm_(self.q_network.parameters(), 1.0)  # Gradient clipping
    self.optimizer.step() # Update weights

    # Periodic hard update of the target network (soft update does not improve Pac-Man results)
    self.time_step += 1
    if self.time_step % self.target_update_every == 0:
      self.target_network.load_state_dict(self.q_network.state_dict())

### Train an Agent

In [None]:
# Initialize the agent (no frame stacking is performed; each state is a single preprocessed frame)
agent = Agent(action_size=pac_man_action_size)

In [None]:
# Initialize training hyperparameters

num_episodes = 2000 # Upper bound on the number of training episodes
max_time_steps_per_episode = 10000 # Per-episode step cap; need many time steps for Pac-Man

# Epsilon greedy hyperparameters
epsilon_start = 1.0 # Initial epsilon for the first episode
epsilon_decay = 0.995 # Multiplied into epsilon after every episode (slow decay)
epsilon_min = 0.01 # Floor below which epsilon is not decayed

epsilon = epsilon_start # Current exploration rate, updated by the training loop

# Rolling window holding the scores of the most recent 100 episodes
window_of_scores = deque(maxlen=100)

In [None]:
# Sanity check: push an all-zero batch through the network and inspect the output shape.
# The batch is created on the agent's device so the check also works when the network is on the GPU.
dummy_state = torch.zeros((64, 3, 128, 128), device=agent.device)  # Batch size 64, 3 channels

agent.q_network.eval()  # Keep BatchNorm running statistics untouched by the all-zero batch
with torch.no_grad():  # No gradients needed for a shape check
  output = agent.q_network(dummy_state)
agent.q_network.train()  # Back to training mode for the training loop

print(output.shape)  # Check if the network processes the input correctly

torch.Size([64, 9])


In [None]:
# Final training loop

for episode in range(1, num_episodes+1):
  # Reset environment to initial state
  state, _ = env.reset()
  score = 0

  # Agent learning

  for _ in range(max_time_steps_per_episode):
    action = agent.act(state=state, epsilon=epsilon)

    next_state, reward, terminated, truncated, _ = env.step(action)

    # Clip the reward to its sign to stabilize training. (Standardizing a single
    # scalar with its own mean/std always yields 0, which would hide every reward.)
    clipped_reward = float(np.sign(reward))

    # Only a true terminal state should stop bootstrapping; a truncated episode
    # (e.g. a time limit) still has a meaningful next-state value.
    agent.step(state, action, clipped_reward, next_state, terminated)

    state = next_state
    score += reward # Track the real game score, not the clipped learning signal

    if terminated or truncated:
      break

  window_of_scores.append(score)
  epsilon = max(epsilon_min, epsilon_decay * epsilon) # Decay epsilon

  average_score = np.mean(window_of_scores)

  # Print stuff to get feedback that agent is working
  print(f"\rEpisode: {episode}\tScore: {score}\tAverage Score: {average_score}", end="") # \r allows newly printed line to over-ride previous one

  if episode % 100 == 0:
    print("")

  if average_score >= 500: # Score is score on Pac-Man
    print(f"\nEnvironment solved in {episode} episodes!\t Average Score: {average_score}")

    torch.save(agent.q_network.state_dict(), "pac_man_model.pth") # Save parameters to PyTorch file

    break # No more training needed


## Visualization

### Imports

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gymnasium.wrappers import RecordVideo

### Video

In [None]:

def show_video_of_model(agent, env_name):
    """
    Play one episode with the agent and save the rendered frames to 'video.mp4'.

    Args:
        agent: Object exposing `act(state)` that returns an integer-like action.
        env_name: Gymnasium environment id to create with rgb_array rendering.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            action = agent.act(state) # Default epsilon of 0: the agent acts greedily
            state, _, terminated, truncated, _ = env.step(int(action))
            # Stop on either end-of-episode signal so we never step a finished episode
            done = terminated or truncated
    finally:
        env.close() # Release the environment even if acting or rendering fails

    imageio.mimsave('video.mp4', frames, fps=30)

# Record one episode of the trained agent (default epsilon of 0) to video.mp4
show_video_of_model(agent, 'MsPacmanDeterministic-v4')

def show_video():
    """
    Embed the first .mp4 file found in the working directory in the notebook output.

    The video bytes are base64-encoded into an inline HTML <video> element.
    Prints a message instead when no .mp4 file exists.
    """
    mp4list = glob.glob('*.mp4')
    if not mp4list:
        print("Could not find video")
        return

    # Read-only binary access is enough; the context manager closes the file handle
    with open(mp4list[0], 'rb') as video_file:
        video = video_file.read()

    encoded = base64.b64encode(video)
    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

# Display a saved .mp4 from the working directory inline in the notebook
show_video()