In [1]:
pip install gymnasium



In [3]:
from time import sleep
import numpy as np
from IPython.display import clear_output
import gymnasium as gym
from gymnasium.envs.registration import register
import torch
from torch import nn
import torch.optim as optim
import torch.nn.functional as F

In [4]:
# Colab only: mount Google Drive at /gdrive so files stored there can be
# reached from this notebook (the later %cd cell relies on this mount point).
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


In [5]:
# Make the Drive folder holding the MiniPacMan module the working directory,
# so the `from MiniPacManGymV2 import ...` cell below can find it.
# NOTE(review): this is a user-specific absolute path; adjust it to your own Drive layout.
%cd /gdrive/MyDrive/SP 25/Reinforcement Learning/DQN

/gdrive/MyDrive/SP 25/Reinforcement Learning/DQN


In [20]:
#Import MiniPacMan environment class definition
# from MiniPacManGym import MiniPacManEnv
from MiniPacManGymV2 import MiniPacManEnv

In [21]:
# Register MiniPacMan under the id "MiniPacMan-v0" so gym.make() can build it.
# Re-running this cell re-registers the same id, which is what produces the
# "Overriding environment ... already in registry" warning.
register(
    id="MiniPacMan-v0",
    entry_point=MiniPacManEnv,  # the environment class itself (imported above), not a "module:Class" string
    max_episode_steps=20          # per-episode step limit; episodes reaching it are reported as truncated
)

  logger.warn(f"Overriding environment {new_spec.id} already in registry.")


In [22]:
# Build the registered MiniPacMan environment. The extra keyword arguments are
# passed through to the environment.
# NOTE(review): frozen_ghost=False presumably lets the ghost move; confirm in MiniPacManEnv.
env = gym.make("MiniPacMan-v0", render_mode="human", frozen_ghost=False)

In [23]:
class QNetwork(nn.Module):
    """Fully connected network mapping an observation to one Q-value per action.

    Args:
        actions: Number of discrete actions, i.e. the size of the output layer.
        input_dim: Number of features per observation after flattening.
            Defaults to ``6 * 6``, the value that used to be hard-coded, so
            ``QNetwork(actions)`` builds exactly the same architecture as before.
    """

    def __init__(self, actions, input_dim=6 * 6):
        super().__init__()
        # Layer attribute names are unchanged so previously saved state_dicts still load.
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, actions)

    def forward(self, x):
        """Compute Q-values for a batch of observations.

        Args:
            x: Tensor whose first dimension is the batch. All remaining
                dimensions are flattened and must total ``input_dim`` elements.

        Returns:
            Tensor of shape ``(batch, actions)``.
        """
        # flatten() falls back to a copy when the memory layout requires it, so
        # non-contiguous inputs (e.g. a transposed batch) work; .view() would raise.
        x = x.flatten(start_dim=1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        # No activation on the output layer: Q-values are unbounded.
        return self.fc4(x)

In [24]:
class ReplayBuffer:
    """Fixed-capacity store of transitions with uniform random sampling.

    Once ``capacity`` transitions are stored, each new transition overwrites
    the oldest one in place (ring buffer), so ``push`` is O(1). After the
    buffer has wrapped around, ``self.buffer`` is therefore no longer in
    chronological order; ``len(self.buffer)`` is still the number of stored
    transitions.

    Args:
        capacity: Maximum number of transitions kept. Must be positive.

    Raises:
        ValueError: If ``capacity`` is not positive.
    """

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError(f"capacity must be positive, got {capacity}")
        self.capacity = capacity
        self.buffer = []
        # Index of the slot that holds the oldest transition once the buffer is full.
        self._write_pos = 0

    def push(self, state, action, reward, next_state, done):
        """Store one transition, evicting the oldest one if the buffer is full."""
        transition = (state, action, reward, next_state, done)
        if len(self.buffer) < self.capacity:
            self.buffer.append(transition)
        else:
            self.buffer[self._write_pos] = transition
        self._write_pos = (self._write_pos + 1) % self.capacity

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A tuple ``(states, actions, rewards, next_states, dones)`` where
            ``states`` and ``next_states`` are the stored tensors stacked along
            a new leading dimension, ``actions`` is a tuple of the stored
            actions, ``rewards`` is a float32 tensor, and ``dones`` is a tensor
            built from the stored done flags.

        Raises:
            ValueError: From ``np.random.choice`` when ``batch_size`` exceeds
                the number of stored transitions (sampling is without replacement).
        """
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        states, actions, rewards, next_states, dones = zip(
            *(self.buffer[i] for i in indices)
        )
        return (
            torch.stack(states),
            actions,
            # Pin the dtype: otherwise it follows the env's reward type (int64 or
            # float64), which can mismatch the float32 network output in the loss.
            torch.tensor(rewards, dtype=torch.float32),
            torch.stack(next_states),
            torch.tensor(dones),
        )

In [25]:
# Online network (the one being trained) and a separate target network, each
# with 4 outputs, i.e. one Q-value per action.
q_network = QNetwork(4)
target_network = QNetwork(4)
# Start the target network as an exact copy of the online network's weights.
target_network.load_state_dict(q_network.state_dict())

# Only q_network's parameters are handed to the optimizer; target_network is
# changed solely by copying weights into it.
optimizer = optim.Adam(q_network.parameters(), lr=0.001)
loss_fn = nn.MSELoss()

In [28]:
# DQN training loop: act epsilon-greedily, store every transition in a replay
# buffer, and after each environment step fit q_network to bootstrapped
# targets computed with target_network.

# Hyperparameters -- play with any of these!
# gamma: discount factor applied to the bootstrapped next-state value.
gamma=0.95
# buffer_size: capacity of the replay buffer.
buffer_size=5000
# batch_size: transitions sampled per update; learning starts once the buffer holds this many.
batch_size=128
# num_episodes: number of training episodes.
num_episodes=10000
# C: target network sync period, measured in episodes (not steps).
C = 500

RB=ReplayBuffer(buffer_size) # replay buffer, starts empty
epsilon=1 # probability of taking a random action; starts fully exploratory

for e in range(num_episodes):
  new_obs,info=env.reset()
  # Convert the observation to a float32 tensor and add a leading dimension of size 1.
  new_obs=torch.tensor(new_obs,dtype=torch.float32).unsqueeze(0)

  done=False
  truncated=False
  steps=0

  while not done and not truncated: # loop for one episode
    obs=new_obs

    # Choose an action epsilon-greedily.
    t=np.random.random()
    if t>epsilon:
      with torch.no_grad():
        action=torch.argmax(q_network(obs)).item() # exploitation: greedy w.r.t. the online network
    else:
      # exploration: random action drawn from the environment's action space
      action= env.action_space.sample()

    # Take a step and store the transition.
    new_obs,reward, done, truncated, info=env.step(action)
    new_obs=torch.tensor(new_obs,dtype=torch.float32).unsqueeze(0)
    # Only `done` is stored, not `truncated`, so a truncated step still
    # bootstraps from its next state in the target below.
    RB.push(obs,action,reward,new_obs,done)
    steps+=1

    # One gradient update per environment step once enough transitions are stored.
    if len(RB.buffer)>=batch_size:
      states, actions, rewards, next_states, dones=RB.sample(batch_size)

      # Current Q-value: the online network's output for the action actually taken.
      actions = torch.tensor(actions, dtype=torch.long)
      current_q = q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

      # Target Q-value: reward + gamma * max over actions of the target network's
      # next-state output; the (1 - done) factor removes the bootstrap term for
      # transitions stored with done=True. No gradients flow through the target.
      with torch.no_grad():
        max_next_q = target_network(next_states).max(1)[0]  # [0] selects the max values, not their indices
        target_q = rewards + (1 - dones.float()) * gamma * max_next_q

      # Mean squared error between current and target Q-values.
      loss = loss_fn(current_q, target_q)

      # Q-network update: clear old gradients, backpropagate, take an optimizer step.
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

  # Decay epsilon once per episode: the new value is the average of a term that
  # falls linearly with the episode index and the previous epsilon scaled by
  # 0.997, never going below 0.01.
  epsilon = max(0.01, 0.5 * (1 - e / num_episodes) + 0.5 * epsilon * 0.997)

  # Copy the online weights into the target network every C episodes
  # (this also fires at e == 0).
  if e % C == 0:
        target_network.load_state_dict(q_network.state_dict())
        print(f"episode {e}: updated target network")

  # Periodic reporting every 100 episodes. `steps` and `reward` describe only
  # the episode that just finished; `win` is judged from its final reward.
  # NOTE(review): assumes a final reward of 10 means a win; confirm against MiniPacManEnv.
  if e>0 and e%100==0:
    print(f'episode: {e}, steps: {steps}, epsilon: {epsilon},win: {reward==10}')


episode 0: updated target network
episode: 100, steps: 2, epsilon: 0.9871379878311228,win: False
episode: 200, steps: 2, epsilon: 0.9771678981003151,win: False
episode: 300, steps: 1, epsilon: 0.9671978083695076,win: False
episode: 400, steps: 1, epsilon: 0.9572277186387,win: False
episode 500: updated target network
episode: 500, steps: 2, epsilon: 0.9472576289078924,win: False
episode: 600, steps: 2, epsilon: 0.937287539177085,win: False
episode: 700, steps: 1, epsilon: 0.9273174494462774,win: False
episode: 800, steps: 2, epsilon: 0.9173473597154698,win: False
episode: 900, steps: 1, epsilon: 0.9073772699846623,win: False
episode 1000: updated target network
episode: 1000, steps: 1, epsilon: 0.8974071802538546,win: False
episode: 1100, steps: 1, epsilon: 0.8874370905230471,win: False
episode: 1200, steps: 3, epsilon: 0.8774670007922394,win: False
episode: 1300, steps: 1, epsilon: 0.8674969110614319,win: False
episode: 1400, steps: 3, epsilon: 0.8575268213306243,win: False
episode 15

In [32]:
# Play one episode with the trained network acting greedily (no exploration),
# redrawing the board after every move.
obs, info = env.reset()
done = truncated = False

while not (done or truncated):
    env.render()
    # Float32 tensor with a leading dimension of size 1, as q_network expects a batch.
    obs_batch = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        q_values = q_network(obs_batch)
    action = q_values.argmax().item()
    obs, reward, done, truncated, info = env.step(action)
    # Pause so each frame is visible, then wipe the cell output before the next draw.
    sleep(1)
    clear_output(wait=True)

# Show the final state, then release the environment.
env.render()
env.close()

xxxxxx
x····x
x····x
x····x
x·ᗧ··x
xxxxxx

