<a href="https://colab.research.google.com/github/Jeady1565/A.I-Projects/blob/main/DCQL_Pac_Man_V2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install Dependencies
!pip install gymnasium "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install ale-py gymnasium[box2d] imageio[ffmpeg]

# Import Libraries
import os
import gymnasium as gym
import ale_py
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
from torchvision import transforms
from PIL import Image
import imageio
import base64
import io
from IPython.display import HTML, display

# ------------------ Environment Setup ------------------ #
# Create the Ms. Pac-Man environment without the full action space
# (the recorded run of this cell reports 9 actions).
env = gym.make(
    'ALE/MsPacman-v5',
    full_action_space=False,
)

# Observation shape and number of discrete actions, reused by later cells.
state_shape, number_actions = env.observation_space.shape, env.action_space.n

print(f"State Shape: {state_shape}, Actions: {number_actions}")

# ------------------ Frame Preprocessing ------------------ #
# Built once instead of on every call: preprocess_frame runs several times per
# environment step, and the Resize -> ToTensor pipeline is the same each time.
_FRAME_TRANSFORM = transforms.Compose([transforms.Resize((128, 128)), transforms.ToTensor()])


def preprocess_frame(frame):
    """Convert one raw environment frame into a tensor the network can consume.

    Args:
        frame: Image array accepted by ``PIL.Image.fromarray`` (the environment
            in this notebook reports observations of shape (210, 160, 3)).

    Returns:
        torch.Tensor: The frame resized to 128x128 and converted with
        ``ToTensor``, with a leading batch dimension of size 1 added.
    """
    image = Image.fromarray(frame)
    return _FRAME_TRANSFORM(image).unsqueeze(0)  # Keep batch dimension



Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
swig is already the newest version (4.0.2-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 21 not upgraded.
State Shape: (210, 160, 3), Actions: 9


In [None]:
# ------------------ Dueling Double DQN Model ------------------ #
class DuelingDQN(nn.Module):
    """Convolutional dueling Q-network for 3-channel 128x128 image input.

    Three conv + batch-norm stages feed two fully connected layers, whose
    output is split into a state-value stream and an advantage stream. The
    streams are recombined as ``Q = V + (A - mean_a A)``.

    Args:
        action_size: Number of discrete actions (width of the advantage head).
        seed: Value passed to ``torch.manual_seed`` before the layers are
            created. Note that this reseeds the global torch RNG.
    """

    def __init__(self, action_size, seed=42):
        super(DuelingDQN, self).__init__()
        self.seed = torch.manual_seed(seed)

        # Convolutional Layers
        self.conv1 = nn.Conv2d(3, 32, kernel_size=8, stride=4)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.bn3 = nn.BatchNorm2d(64)

        # **Dynamically calculate feature size for fully connected layers**
        self._feature_size = self._get_conv_output((3, 128, 128))

        # Fully Connected Layers
        self.fc1 = nn.Linear(self._feature_size, 512)
        self.fc2 = nn.Linear(512, 256)
        self.advantage = nn.Linear(256, action_size)
        self.value = nn.Linear(256, 1)

    def _get_conv_output(self, shape):
        """Compute size of the flattened feature map dynamically.

        Only the convolutions are applied: batch norm never changes a tensor's
        shape, and running it here in training mode on an all-zero dummy input
        would write meaningless values into its running statistics.

        Args:
            shape: Input shape without the batch dimension, e.g. ``(3, 128, 128)``.

        Returns:
            int: Number of features per sample after the last convolution.
        """
        with torch.no_grad():
            x = torch.zeros(1, *shape)  # Create a dummy tensor with the same input shape
            x = self.conv1(x)
            x = self.conv2(x)
            x = self.conv3(x)
            return int(np.prod(x.shape[1:]))  # Flattened size

    def forward(self, state):
        """Return Q-values for a batch of states.

        Args:
            state: Float tensor of shape ``(batch, 3, 128, 128)``; the first
                linear layer is sized for exactly this spatial resolution.

        Returns:
            torch.Tensor: Q-values of shape ``(batch, action_size)``.
        """
        x = F.relu(self.bn1(self.conv1(state)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = x.view(x.size(0), -1)  # Flatten dynamically
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))

        # Compute Advantage and Value streams
        advantage = self.advantage(x)
        value = self.value(x)

        # Centre the advantages per sample (over the action dimension only).
        # A global mean would mix every sample in the batch into each Q-value.
        return value + (advantage - advantage.mean(dim=1, keepdim=True))



In [None]:
# ------------------ Agent with Replay Buffer & Soft Updates ------------------ #
class Agent:
    """Double-DQN agent with a uniform replay buffer and soft target updates.

    Args:
        action_size: Number of discrete actions.
        buffer_size: Maximum number of transitions kept in replay memory.
        batch_size: Number of transitions sampled per learning step.
        gamma: Discount factor.
        lr: Adam learning rate for the online network.
        tau: Interpolation factor for the soft target update.
    """

    def __init__(self, action_size, buffer_size=100000, batch_size=64, gamma=0.99, lr=5e-4, tau=1e-3):
        self.action_size = action_size
        self.gamma = gamma
        self.tau = tau
        self.batch_size = batch_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Networks
        self.qnetwork_local = DuelingDQN(action_size).to(self.device)
        self.qnetwork_target = DuelingDQN(action_size).to(self.device)
        # Make the initial sync explicit instead of relying on both
        # constructors happening to use the same default seed.
        self.qnetwork_target.load_state_dict(self.qnetwork_local.state_dict())
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=lr)

        # Replay Memory
        self.memory = deque(maxlen=buffer_size)

    @staticmethod
    def _pack_frame(frame):
        """Return a compact copy of ``frame`` for replay storage.

        A float32 frame whose values are exact multiples of 1/255 (which is
        what ``ToTensor`` produces from an 8-bit image) is stored as uint8,
        a quarter of the size. The round trip is verified, so any frame that
        would lose information is returned unchanged.
        """
        if frame.dtype != torch.float32:
            return frame
        packed = frame.mul(255).round().clamp(0, 255).to(torch.uint8)
        if torch.equal(packed.float().div(255), frame):
            return packed
        return frame

    def _frames_to_batch(self, frames):
        """Concatenate stored frames into one float32 batch on ``self.device``."""
        if all(f.dtype == torch.uint8 for f in frames):
            # Fast path: move the small uint8 batch first, then decode it.
            return torch.cat(frames).to(self.device).float().div(255)
        # Mixed or float storage: decode each frame according to its own dtype.
        decoded = [f.float().div(255) if f.dtype == torch.uint8 else f.float() for f in frames]
        return torch.cat(decoded).to(self.device)

    def act(self, state, epsilon=0.0):
        """Choose an action epsilon-greedily for a raw environment frame.

        Args:
            state: Raw frame as accepted by ``preprocess_frame``.
            epsilon: Probability of picking a uniformly random action.

        Returns:
            int: Index of the selected action.
        """
        # Epsilon-Greedy Exploration: decide first so that exploratory steps
        # skip the (unused) forward pass entirely.
        if random.random() <= epsilon:
            return random.randrange(self.action_size)

        state = preprocess_frame(state).to(self.device)
        self.qnetwork_local.eval()
        with torch.no_grad():
            action_values = self.qnetwork_local(state)
        self.qnetwork_local.train()
        return int(np.argmax(action_values.cpu().data.numpy()))

    def step(self, state, action, reward, next_state, done):
        """Store one transition and, once enough are stored, run a learning step.

        Args:
            state: Raw frame observed before the action.
            action: Action index that was taken.
            reward: Environment reward; clipped to [-1, 1] before storage.
            next_state: Raw frame observed after the action.
            done: Whether ``next_state`` is terminal.
        """
        state = self._pack_frame(preprocess_frame(state))
        next_state = self._pack_frame(preprocess_frame(next_state))
        reward = np.clip(reward, -1.0, 1.0)  # Reward Clipping
        self.memory.append((state, action, reward, next_state, done))

        if len(self.memory) > self.batch_size:
            experiences = random.sample(self.memory, self.batch_size)
            self.learn(experiences)

    def learn(self, experiences):
        """Run one Double-DQN update followed by a soft target update.

        Args:
            experiences: Iterable of ``(state, action, reward, next_state, done)``
                tuples. Frames are tensors with a leading batch dimension of 1,
                either float or uint8 (as produced by ``_pack_frame``).
        """
        states, actions, rewards, next_states, dones = zip(*experiences)
        states = self._frames_to_batch(states)
        actions = torch.tensor(actions).long().unsqueeze(1).to(self.device)
        rewards = torch.tensor(rewards).float().unsqueeze(1).to(self.device)
        next_states = self._frames_to_batch(next_states)
        dones = torch.tensor(dones).float().unsqueeze(1).to(self.device)

        # Compute Double DQN Target: the online network selects the action and
        # the target network evaluates it. No gradient flows through the
        # target, so no autograd graph is built for it.
        with torch.no_grad():
            best_actions = self.qnetwork_local(next_states).argmax(dim=1, keepdim=True)
            Q_targets_next = self.qnetwork_target(next_states).gather(1, best_actions)
            Q_targets = rewards + (self.gamma * Q_targets_next * (1 - dones))

        # Get current Q estimates
        Q_expected = self.qnetwork_local(states).gather(1, actions)

        # Compute Loss & Optimize
        loss = F.mse_loss(Q_expected, Q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Soft Target Update: target <- tau * local + (1 - tau) * target
        with torch.no_grad():
            for target_param, local_param in zip(self.qnetwork_target.parameters(), self.qnetwork_local.parameters()):
                target_param.copy_(self.tau * local_param + (1 - self.tau) * target_param)



In [None]:
# ------------------ Train the Agent ------------------ #
agent = Agent(number_actions)
num_episodes = 2000
max_timesteps = 10000
eps_start = 1.0
eps_end = 0.01
eps_decay = 0.999
epsilon = eps_start
scores_on_100_episodes = deque(maxlen=100)

for episode in range(1, num_episodes + 1):
    state, _ = env.reset()
    score = 0

    for t in range(max_timesteps):
        action = agent.act(state, epsilon)
        next_state, reward, terminated, truncated, _ = env.step(action)
        # Only a true terminal state is stored as "done": it zeroes the
        # bootstrap term in the TD target. Truncation merely ends the episode.
        agent.step(state, action, reward, next_state, terminated)
        state = next_state
        score += reward

        if terminated or truncated:
            break

    scores_on_100_episodes.append(score)
    epsilon = max(eps_end, eps_decay * epsilon)  # decay exploration once per episode
    average_score = np.mean(scores_on_100_episodes)

    print(f"\rEpisode {episode}\tAverage Score: {average_score:.2f}", end="")
    if episode % 100 == 0:
        print(f"\nEpisode {episode}\tAverage Score: {average_score:.2f}")

    # Require a full 100-episode window; otherwise a lucky early episode would
    # report a negative "solved in" count.
    window_full = len(scores_on_100_episodes) == scores_on_100_episodes.maxlen
    if window_full and average_score >= 500.0:
        print(f"\nEnvironment solved in {episode - 100} episodes! Average Score: {average_score:.2f}")
        break

# Save the weights whether or not the target score was reached, so a run that
# uses up all episodes does not lose the trained network.
torch.save(agent.qnetwork_local.state_dict(), 'checkpoint.pth')


Episode 100	Average Score: 268.00
Episode 100	Average Score: 268.00
Episode 200	Average Score: 277.30
Episode 200	Average Score: 277.30
Episode 300	Average Score: 295.40
Episode 300	Average Score: 295.40
Episode 400	Average Score: 286.20
Episode 400	Average Score: 286.20
Episode 500	Average Score: 325.80
Episode 500	Average Score: 325.80
Episode 600	Average Score: 419.20
Episode 600	Average Score: 419.20
Episode 700	Average Score: 367.30
Episode 700	Average Score: 367.30
Episode 800	Average Score: 463.60
Episode 800	Average Score: 463.60
Episode 900	Average Score: 450.90
Episode 900	Average Score: 450.90
Episode 961	Average Score: 500.50
Environment solved in 861 episodes! Average Score: 500.50


In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder
!pip install imageio[ffmpeg]

def show_video_of_model(agent, env_name):
    """Play one greedy episode and save the rendered frames to ``video.mp4``.

    Args:
        agent: Object whose ``act(state)`` returns an action for a raw frame.
        env_name: Gymnasium environment id to create with
            ``render_mode='rgb_array'``.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            action = agent.act(state)
            state, _, terminated, truncated, _ = env.step(action)
            # Stop on truncation too; checking only termination could loop
            # forever once the environment stops producing a terminal state.
            done = terminated or truncated
    finally:
        # Release the emulator even if the rollout raises.
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30, format='FFMPEG')

show_video_of_model(agent, 'ALE/MsPacman-v5')

def show_video():
    """Embed the first ``*.mp4`` file in the working directory in the notebook.

    The file is base64-encoded into an inline HTML ``<video>`` element. If no
    mp4 file is found, a message is printed instead.
    """
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read-only binary mode inside a context manager: the handle is closed
        # deterministically and no write permission on the file is required.
        with open(mp4, 'rb') as video_file:
            video = video_file.read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()



