<a href="https://colab.research.google.com/github/Tanay2109/Mastering-Pac_Man-a-DQN-based-Approach/blob/main/DQN_for_Pac_Man.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install dependencies: Gym with its Atari extra, the ALE Python bindings,
# AutoROM, and imageio (used further down to write the gameplay video).
!pip install gym[atari] ale-py autorom imageio
# Fetch the Atari ROMs via AutoROM, passing the flag that accepts its license
# up front.
!AutoROM --accept-license

# 📦 Imports
import gym
import numpy as np
# Compatibility shim: when this NumPy build has no `np.bool8` attribute, alias
# it to `np.bool_` so later lookups of `np.bool8` still resolve.
# NOTE(review): presumably needed because a library imported here (gym?) still
# references the alias on newer NumPy releases — confirm.
if not hasattr(np, 'bool8'):
    np.bool8 = np.bool_

import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import imageio
from IPython.display import HTML
from base64 import b64encode


# Environment setup: one shared Ms. Pac-Man environment plus module-level
# globals read by the code below.
# NOTE(review): render_mode="rgb_array" is presumably what lets env.render()
# return image frames for the video recorder — confirm against the installed
# Gym version.
env = gym.make("ALE/MsPacman-v5", render_mode="rgb_array")
n_actions = env.action_space.n  # number of discrete actions the env exposes
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # CUDA when available, else CPU

# DQN Model
class DQN(nn.Module):
    """Convolutional Q-network: a batch of image frames in, one Q-value per action out.

    Args:
        n_actions: Number of discrete actions (width of the output layer).
        input_shape: ``(channels, height, width)`` of a single input frame.
            Defaults to ``(3, 210, 160)``, the shape the network was
            originally hard-coded for.
    """

    def __init__(self, n_actions, input_shape=(3, 210, 160)):
        super().__init__()
        in_channels = input_shape[0]
        # Shape comments are for the default 3 x 210 x 160 input; with no
        # padding each layer yields floor((size - kernel) / stride) + 1.
        self.conv_layers = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=2),  # -> [32 x 102 x 77]
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),  # -> [64 x 50 x 37]
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=2),  # -> [64 x 24 x 18]
            nn.ReLU(),
        )

        # Size the first linear layer by pushing one blank frame through the
        # conv stack. no_grad: this probe is shape bookkeeping only, so no
        # autograd graph should be recorded for it.
        with torch.no_grad():
            probe = self.conv_layers(torch.zeros(1, *input_shape))
        fc_input_size = probe.flatten(1).shape[1]

        self.fc = nn.Sequential(
            nn.Linear(fc_input_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions),
        )

    def forward(self, x):
        """Return Q-values of shape ``(batch, n_actions)``.

        ``x`` is a channels-first batch ``(batch, C, H, W)``; values are
        divided by 255 before the convolutions, so raw 0-255 pixel
        intensities are expected.
        """
        x = x / 255.0  # scale pixel intensities down before the convolutions
        x = self.conv_layers(x)
        x = x.reshape(x.size(0), -1)  # flatten everything except the batch axis
        return self.fc(x)

# Replay Buffer
class ReplayBuffer:
    """Fixed-capacity store of ``(state, action, reward, next_state, done)`` transitions."""

    def __init__(self, capacity):
        # A deque with maxlen silently discards its oldest entry once full.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append a single transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns a 5-tuple ``(states, actions, rewards, next_states, dones)``
        in which ``states`` and ``next_states`` are stacked into NumPy arrays
        and the remaining three fields are plain tuples.
        """
        drawn = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one sequence per field.
        state_col, action_col, reward_col, next_state_col, done_col = zip(*drawn)
        state_batch = np.stack(state_col)
        next_state_batch = np.stack(next_state_col)
        return (state_batch, action_col, reward_col, next_state_batch, done_col)

    def __len__(self):
        """Number of transitions currently held."""
        return len(self.buffer)

# Epsilon-Greedy Action Selection
def select_action(state, policy_net, epsilon, n_actions):
    """Choose an action epsilon-greedily.

    Args:
        state: One observation laid out as ``(height, width, channels)``; it
            is permuted to channels-first and given a batch axis before being
            passed to ``policy_net``.
        policy_net: Network that returns Q-values for a batch of frames.
        epsilon: Probability of picking a uniformly random action.
        n_actions: Number of discrete actions to draw from when exploring.

    Returns:
        int: Index of the chosen action.
    """
    if random.random() < epsilon:
        return random.randint(0, n_actions - 1)

    # Build the input on whatever device the network lives on, rather than
    # relying on a module-level global that may disagree with the model.
    # A network with no parameters gets a CPU tensor.
    first_param = next(policy_net.parameters(), None)
    net_device = first_param.device if first_param is not None else torch.device("cpu")

    with torch.no_grad():
        frame = torch.as_tensor(state, dtype=torch.float32, device=net_device)
        batch = frame.permute(2, 0, 1).unsqueeze(0)  # HWC -> 1 x C x H x W
        return policy_net(batch).argmax().item()

# Training Loop
def train_dqn(env, episodes=300, gamma=0.99, batch_size=32, buffer_limit=10000,
              learning_rate=1e-4, target_update=10, epsilon_start=1.0,
              epsilon_min=0.05, epsilon_decay=0.995):
    """Train a DQN agent on ``env`` with experience replay and a target network.

    Args:
        env: Environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step()`` returns ``(obs, reward, terminated, truncated, info)``.
        episodes: Number of episodes to play.
        gamma: Discount factor applied to the bootstrapped next-state value.
        batch_size: Transitions sampled per optimisation step; learning
            starts once the buffer holds at least this many.
        buffer_limit: Replay buffer capacity.
        learning_rate: Adam learning rate.
        target_update: Copy policy weights into the target network every this
            many episodes.
        epsilon_start: Initial exploration rate.
        epsilon_min: Floor for the exploration rate.
        epsilon_decay: Multiplicative decay applied to epsilon after each episode.

    Returns:
        tuple: ``(policy_net, rewards)`` — the trained network and the list of
        per-episode total rewards.
    """
    # Take the action count from the environment that was actually passed in.
    num_actions = env.action_space.n
    policy_net = DQN(num_actions).to(device)
    target_net = DQN(num_actions).to(device)
    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()
    optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
    loss_fn = nn.MSELoss()  # built once instead of on every optimisation step
    memory = ReplayBuffer(buffer_limit)
    epsilon = epsilon_start
    rewards = []

    for episode in range(episodes):
        obs, _ = env.reset()
        total_reward = 0
        episode_over = False

        while not episode_over:
            action = select_action(obs, policy_net, epsilon, num_actions)
            next_obs, reward, terminated, truncated, _ = env.step(action)
            # A truncated episode must also end the loop; otherwise the env
            # would keep being stepped after it has finished.
            episode_over = terminated or truncated
            # Only `terminated` is stored: a truncated transition is not a
            # true terminal state, so its target should still bootstrap.
            memory.push(obs, action, reward, next_obs, terminated)
            obs = next_obs
            total_reward += reward

            if len(memory) >= batch_size:
                states, actions, rewards_, next_states, dones = memory.sample(batch_size)
                # Sampled frames are channels-last; the network wants N x C x H x W.
                states = torch.as_tensor(
                    states, dtype=torch.float32, device=device).permute(0, 3, 1, 2)
                next_states = torch.as_tensor(
                    next_states, dtype=torch.float32, device=device).permute(0, 3, 1, 2)
                actions = torch.as_tensor(
                    actions, dtype=torch.int64, device=device).unsqueeze(1)
                rewards_ = torch.as_tensor(rewards_, dtype=torch.float32, device=device)
                dones = torch.as_tensor(dones, dtype=torch.float32, device=device)

                # squeeze(1), not squeeze(): keeps the batch axis when batch_size == 1.
                q_values = policy_net(states).gather(1, actions).squeeze(1)
                # The TD target is a constant w.r.t. the policy weights, so it
                # is built without autograd; this also avoids back-propagating
                # through the target network on every step.
                with torch.no_grad():
                    next_q = target_net(next_states).max(1)[0]
                    expected_q = rewards_ + (1 - dones) * gamma * next_q

                loss = loss_fn(q_values, expected_q)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        rewards.append(total_reward)

        if episode % target_update == 0:
            target_net.load_state_dict(policy_net.state_dict())

        if episode % 10 == 0:
            print(f"Episode {episode}, Total reward: {total_reward:.2f}, Epsilon: {epsilon:.2f}")

    return policy_net, rewards

# Train the agent for 200 episodes (overriding train_dqn's default of 300).
# Keeps the two values train_dqn returns: the trained policy network and the
# list of per-episode reward totals.
trained_model, reward_log = train_dqn(env, episodes=200)

# Save Gameplay Video
def record_video(env, model, path="pacman.mp4", max_frames=500):
    """Record one greedy rollout of ``model`` and return it as an inline HTML video.

    Args:
        env: Environment to play in; ``env.render()`` is called once per step
            and its return value is collected as a video frame.
        model: Network used to pick actions (with ``epsilon=0.0``).
        path: Where the video file is written.
        max_frames: Upper bound on the number of recorded frames.

    Returns:
        IPython.display.HTML: a ``<video>`` element with the file embedded as
        a base64 data URL.
    """
    frames = []
    obs, _ = env.reset()
    for _ in range(max_frames):
        frames.append(env.render())
        action = select_action(obs, model, epsilon=0.0,
                               n_actions=env.action_space.n)
        obs, _, terminated, truncated, _ = env.step(action)
        # Stop on truncation as well as termination, so an env that has
        # already ended is never stepped again.
        if terminated or truncated:
            break
    imageio.mimsave(path, frames, fps=30)
    # Context manager so the file handle is closed deterministically.
    with open(path, 'rb') as video_file:
        mp4 = video_file.read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
    return HTML(f'<video width=400 controls><source src="{data_url}" type="video/mp4"></video>')

# Watch the agent play: records a rollout of the trained model to the default
# "pacman.mp4" and returns an HTML video element.
# NOTE(review): presumably relies on this being the cell's last expression so
# the notebook displays the returned HTML — confirm.
record_video(env, trained_model)


AutoROM will download the Atari 2600 ROMs.
They will be installed to:
	/usr/local/lib/python3.11/dist-packages/AutoROM/roms

Existing ROMs will be overwritten.
Episode 0, Total reward: 250.00, Epsilon: 0.99
Episode 10, Total reward: 250.00, Epsilon: 0.95
Episode 20, Total reward: 180.00, Epsilon: 0.90
Episode 30, Total reward: 320.00, Epsilon: 0.86
Episode 40, Total reward: 260.00, Epsilon: 0.81
Episode 50, Total reward: 490.00, Epsilon: 0.77
Episode 60, Total reward: 220.00, Epsilon: 0.74
Episode 70, Total reward: 280.00, Epsilon: 0.70
Episode 80, Total reward: 370.00, Epsilon: 0.67
Episode 90, Total reward: 280.00, Epsilon: 0.63
Episode 100, Total reward: 230.00, Epsilon: 0.60
Episode 110, Total reward: 260.00, Epsilon: 0.57
Episode 120, Total reward: 280.00, Epsilon: 0.55
Episode 130, Total reward: 290.00, Epsilon: 0.52
Episode 140, Total reward: 200.00, Epsilon: 0.49
Episode 150, Total reward: 130.00, Epsilon: 0.47
Episode 160, Total reward: 1020.00, Epsilon: 0.45
Episode 170, Tota

  logger.warn(
