#Ty Turner - Math 5366 - Data Science 2
#Using a DQN to simulate an Agent playing Atari-PacMan

In [None]:
# Install necessary libraries
!pip install numpy torch torchvision opencv-python pillow
!pip install --upgrade gymnasium[atari,accept-rom-license] autorom
!pip install --upgrade ale-py
!AutoROM --accept-license

# Imports
import gym
import pygame
import numpy as np
import random
from collections import deque
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import cv2
import gymnasium as gym
import ale_py
import os
import shutil
import imageio
import datetime

Collecting autorom
  Downloading AutoROM-0.6.1-py3-none-any.whl.metadata (2.4 kB)
Collecting gymnasium[accept-rom-license,atari]
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium[accept-rom-license,atari])
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Collecting ale-py>=0.9 (from gymnasium[accept-rom-license,atari])
  Downloading ale_py-0.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Downloading AutoROM-0.6.1-py3-none-any.whl (9.4 kB)
Downloading ale_py-0.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m

  from pkg_resources import resource_stream, resource_exists
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)


In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the checkpoint paths used later in
# this notebook live under /content/drive/MyDrive.
drive.mount('/content/drive')

  and should_run_async(code)


Mounted at /content/drive


In [None]:
# Deep Q-Network Class
class DQN(nn.Module):
    """Convolutional Q-network: a stack of frames in, one Q-value per action out.

    Three convolution layers feed a two-layer fully connected head.

    Args:
        input_shape: ``(channels, height, width)`` of one stacked observation.
            If only the channel count is supplied, the 84x84 layout (a 7x7
            feature map after the convolutions) is assumed.
        num_actions: Number of discrete actions, i.e. the width of the output.
    """

    def __init__(self, input_shape, num_actions):
        super(DQN, self).__init__()
        num_input_channels = input_shape[0]  # stack_size x channels
        self.conv1 = nn.Conv2d(num_input_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc = nn.Sequential(
            # Sized from the actual conv output so non-84x84 inputs also work.
            nn.Linear(self._feature_size(input_shape), 512),
            nn.ReLU(),
            nn.Linear(512, num_actions)
        )

    def _features(self, x):
        """Run the convolution stack and flatten each sample to a vector."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        return x.view(x.size(0), -1)

    def _feature_size(self, input_shape):
        """Return the flattened conv output length for ``input_shape``."""
        if len(input_shape) < 3:
            # No spatial size given: keep the historical 84x84 assumption.
            return 64 * 7 * 7
        with torch.no_grad():
            probe = torch.zeros(1, *input_shape)
            return self._features(probe).shape[1]

    def forward(self, x):
        """Return Q-values of shape ``[batch, num_actions]`` for a batch ``x``."""
        return self.fc(self._features(x))

    def predict(self, state):
        """
        Predict the best action for a given state.

        Args:
            state: One unbatched observation (tensor, array or nested list).

        Returns:
            The index of the action with the highest Q-value, as an ``int``.
        """
        with torch.no_grad():  # Disable gradient computation for inference
            # Build the input on the same device as the weights; as_tensor
            # avoids the copy-construct warning when `state` is a tensor.
            device = next(self.parameters()).device
            batch = torch.as_tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            return torch.argmax(self(batch)).item()

    def save_model(self, filepath, optimizer=None):
        """
        Saves the model's state dict and optionally the optimizer's state dict.

        Args:
            filepath: Destination path handed to ``torch.save``.
            optimizer: Optimizer whose state is stored alongside the weights;
                when omitted, ``None`` is stored under 'optimizer_state_dict'.
        """
        torch.save({
            'model_state_dict': self.state_dict(),
            'optimizer_state_dict': optimizer.state_dict() if optimizer is not None else None
        }, filepath)
        print(f"Model saved to {filepath}")

class ReplayMemory:
    """Bounded buffer of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with maxlen drops its oldest entry once `capacity` is reached.
        self.memory = deque([], maxlen=capacity)

    def remember(self, state, action, reward, next_state, done):
        """Append one (state, action, reward, next_state, done) transition."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return `batch_size` distinct stored transitions chosen at random."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return self.memory.__len__()

In [None]:
class CustomPacmanEnv(gym.Env):
    """MsPacman-v4 wrapper that replaces the emulator reward with a shaped one.

    Each step forwards the action to the base environment, discards its
    reward, and computes a new reward from detected events, the change in
    distance to the nearest "reward" cell, lack of movement and inactivity.

    NOTE(review): positions are found by matching raw observation values
    (1/2 = pellets, 3 = Pac-Man, 4 = ghosts) while the declared observation
    space is an RGB image; confirm these identifiers mean what the helpers
    assume for the frames the base environment actually returns.
    """

    # Reward contribution for each event label that `self.event` may hold.
    _EVENT_REWARDS = {
        "eat_pellet": 75,
        "eat_power_pellet": 200,
        "eat_ghost": 200,
        "collect_fruit": 500,
        "clear_maze": 10000,
        "caught_by_ghost": -500,
    }

    def __init__(self, render_mode=None):
        super().__init__()  # Initialize the base Gym environment
        self.render_mode = render_mode
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(210, 160, 3), dtype=np.uint8)
        self.action_space = gym.spaces.Discrete(5)  # Example action space for Pacman, adjust as needed

        # The wrapped emulator environment that actually plays the game.
        self.base_env = gym.make("MsPacman-v4", render_mode=render_mode)
        self.event = None  # Label of the event detected on the latest step
        self.previous_pellets = None  # Last seen info["pellets_remaining"], if any
        self.previous_nearest_reward_distance = float('inf')
        self.previous_pacman_position = None
        self.steps_since_last_event = 0  # Steps taken since an event was detected
        self.max_inactive_steps = 100  # Inactivity penalty applies beyond this

    def reset(self, *, seed=None, options=None):
        """Reset the base environment and all per-episode bookkeeping.

        Args:
            seed: Optional seed forwarded to the base environment.
            options: Optional reset options forwarded to the base environment.

        Returns:
            The ``(state, info)`` pair produced by the base environment.
        """
        state, info = self.base_env.reset(seed=seed, options=options)
        self.event = None
        self.previous_pellets = None
        self.previous_nearest_reward_distance = float('inf')
        self.previous_pacman_position = self.get_pacman_position(state)
        self.steps_since_last_event = 0
        return state, info

    def step(self, action):
        """Advance one step and return the observation with the shaped reward.

        Returns:
            ``(next_state, reward, done, truncated, info)`` where ``reward``
            is the custom reward (the base environment's reward is dropped)
            and ``info`` gains a "rewards_remaining" entry.
        """
        next_state, _, done, truncated, info = self.base_env.step(action)

        # Detect events and track how long nothing has happened.
        self.event = self.detect_event(next_state, info)
        if self.event:
            self.steps_since_last_event = 0
        else:
            self.steps_since_last_event += 1

        # If Pac-Man's cell is also a reward cell, blank it in the observation.
        pacman_position = self.get_pacman_position(next_state)
        if pacman_position is not None:
            reward_positions = self.get_reward_positions(next_state)
            on_reward = (
                len(reward_positions) > 0
                and bool((reward_positions == pacman_position).all(axis=1).any())
            )
            if on_reward:
                next_state[pacman_position[0], pacman_position[1]] = 0

        info["rewards_remaining"] = len(self.get_reward_positions(next_state))

        # Shaped reward, computed on the (possibly edited) observation.
        current_nearest_reward_distance = self.calculate_nearest_reward_distance(next_state)
        reward = self.custom_reward(current_nearest_reward_distance, next_state)

        if self.steps_since_last_event > self.max_inactive_steps:
            reward -= .1  # Apply penalty for inactivity

        # Remember this step's measurements for the next comparison.
        self.previous_nearest_reward_distance = current_nearest_reward_distance
        self.previous_pacman_position = self.get_pacman_position(next_state)

        return next_state, reward, done, truncated, info

    def detect_event(self, state, info):
        """
        Detects and returns the current event based on state and info.

        NOTE(review): every branch depends on a key in ``info``; confirm the
        base environment supplies these keys, otherwise no event ever fires.

        Returns:
            One of "eat_pellet", "eat_power_pellet", "eat_ghost",
            "caught_by_ghost", or ``None`` when nothing was detected.
        """
        pellets = info.get("pellets_remaining")
        if pellets is not None:
            previous = self.previous_pellets
            self.previous_pellets = pellets
            # The first reading only establishes the baseline.
            if previous is not None and pellets < previous:
                return "eat_pellet"
        if info.get("power_pellet_eaten"):
            return "eat_power_pellet"
        if info.get("ghost_eaten"):
            return "eat_ghost"
        if info.get("caught_by_ghost"):
            return "caught_by_ghost"
        return None

    @staticmethod
    def get_ghost_positions(state):
        """
        Extract positions of all ghosts from the state.
        Assumes ghosts have a specific identifier in the state (e.g., 4).
        """
        return np.argwhere(state == 4)  # Assuming ghosts are represented by the value 4

    def calculate_nearest_reward_distance(self, state):
        """
        Calculate the distance from Pacman to the nearest reward.

        Returns:
            The smallest Euclidean distance between Pac-Man's index and any
            reward index, or ``inf`` when Pac-Man or rewards cannot be found.
        """
        pacman_position = self.get_pacman_position(state)
        reward_positions = self.get_reward_positions(state)

        # An explicit length check avoids NumPy's ambiguous array truthiness.
        if pacman_position is None or len(reward_positions) == 0:
            return float('inf')

        offsets = reward_positions - pacman_position
        return float(np.linalg.norm(offsets, axis=1).min())

    def get_pacman_position(self, state):
        """
        Extract Pacman's position from the state.
        Assumes the state is a grid or image where Pacman has a unique identifier.

        Returns:
            The first index where the state equals 3, or ``None`` if absent.
        """
        matches = np.argwhere(state == 3)  # Example: Assume Pacman is represented by the value 3
        return matches[0] if len(matches) > 0 else None

    def get_reward_positions(self, state):
        """
        Extract positions of all rewards (e.g., pellets, power pellets) from the state.
        Assumes rewards have specific identifiers in the state.
        """
        return np.argwhere((state == 1) | (state == 2))  # Example: Pellets = 1, Power Pellets = 2

    def custom_reward(self, current_distance, state):
        """
        Assign rewards based on the current event, distance to nearest reward, and movement.

        Args:
            current_distance: Distance to the nearest reward on this step.
            state: Observation used to look up Pac-Man's current position.
        """
        reward = self._EVENT_REWARDS.get(self.event, 0)

        # Closeness-based reward
        if current_distance != float('inf'):
            distance_change = self.previous_nearest_reward_distance - current_distance
            if distance_change > 0:  # Pacman moved closer to the reward
                reward += 8.0
            elif distance_change < 0:  # Pacman moved further from the reward
                reward -= 5.0

        # Penalize lack of movement
        if self.previous_pacman_position is not None:
            if np.array_equal(self.previous_pacman_position, self.get_pacman_position(state)):
                reward -= 2

        return reward

    def render(self, mode="human"):
        """Delegate rendering to the base environment.

        ``mode`` is kept only for call compatibility: the render mode is the
        one given at construction time, so it is not forwarded.
        """
        return self.base_env.render()

    def close(self):
        """Close the base environment."""
        self.base_env.close()

In [None]:
# Preprocess frame using PyTorch transforms (as you described)
def preprocess_frame(frame):
    """Convert one raw frame to a single-channel 84x84 tensor.

    The frame goes through, in order: PIL conversion, grayscale with one
    output channel, resize to 84x84, and conversion to a tensor.
    """
    steps = [
        T.ToPILImage(),
        T.Grayscale(num_output_channels=1),  # Ensure single channel
        T.Resize((84, 84)),
        T.ToTensor(),
    ]
    pipeline = T.Compose(steps)
    processed = pipeline(frame)  # Shape: [1, 84, 84]
    return processed

# Stack multiple frames
def stack_frames(frames, new_frame, stack_size=4):
    """Add `new_frame` to the frame history and return the stacked tensor.

    Args:
        frames: Existing list of frames, or None to start a new history.
            A list passed in is appended to in place.
        new_frame: Newest frame; also used to pad a short history.
        stack_size: Number of frames kept in the stack.

    Returns:
        ``(stacked_frames, frames)``: the frames concatenated along dim 0
        and the updated history list.
    """
    history = [] if frames is None else frames
    history.append(new_frame)

    surplus = len(history) - stack_size
    if surplus > 0:
        # Too many frames: keep only the most recent `stack_size`.
        history = history[-stack_size:]
    else:
        # Too few frames: repeat the current frame until the stack is full.
        history.extend([new_frame] * (-surplus))

    return torch.cat(history, dim=0), history

In [None]:
def adjust_model_to_action_space(model, n_actions):
    """
    Make the last layer of ``model.fc`` output ``n_actions`` values.

    When the width already matches, the model is returned untouched.
    Otherwise the last layer is replaced by a new ``nn.Linear`` with the
    same input width. The model is modified in place and returned.
    """
    head = model.fc[-1]
    if head.out_features == n_actions:
        return model

    print(f"Adjusting model output layer from {head.out_features} to {n_actions} actions.")
    replacement = nn.Linear(head.in_features, n_actions)
    model.fc[-1] = replacement
    return model

def safe_load_model(model, checkpoint_path, n_actions):
    """
    Loads a model from the checkpoint, adjusting for changes in action space.

    Only checkpoint tensors whose name and shape match the model are loaded;
    everything else keeps the model's current values, and the skipped names
    are printed so a partially restored network is not mistaken for a full one.

    Args:
        model: Network to load into; must expose ``fc`` as used by
            ``adjust_model_to_action_space``.
        checkpoint_path: File holding a dict with a 'model_state_dict' entry.
        n_actions: Required width of the output layer.

    Returns:
        The model, with its output layer matching ``n_actions``.
    """
    # weights_only matches load_model and avoids unpickling arbitrary objects;
    # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
    checkpoint = torch.load(checkpoint_path, map_location="cpu", weights_only=True)
    saved_state = checkpoint['model_state_dict']
    current_state = model.state_dict()

    # Filter out mismatched keys (e.g., output layers with different shapes)
    compatible = {
        name: tensor
        for name, tensor in saved_state.items()
        if name in current_state and tensor.size() == current_state[name].size()
    }
    skipped = sorted(set(saved_state) - set(compatible))
    if skipped:
        print(f"Skipped checkpoint entries with no matching shape: {skipped}")

    # Load the filtered state dictionary
    current_state.update(compatible)
    model.load_state_dict(current_state)

    # Adjust the output layer if necessary
    model = adjust_model_to_action_space(model, n_actions)

    print(f"Loaded model with adjustments for {n_actions} actions.")
    return model

# Function to load or create a new model
def initialize_model(input_shape, n_actions, optimizer=None, is_training=True):
    """Interactively create the online/target networks, optionally from a file.

    Prompts for whether to load a saved model, for the episode count, and
    (when loading) for the checkpoint path.

    Note that both returned networks are newly constructed here. An
    optimizer that was built on some other DQN instance does not update
    them; the caller has to bind its optimizer to the returned model.

    Args:
        input_shape: Shape passed to ``DQN``.
        n_actions: Number of actions passed to ``DQN``.
        optimizer: Optional optimizer that receives the checkpoint's
            optimizer state when one is stored.
        is_training: When True and a checkpoint is loaded, a timestamped
            save directory is created next to the checkpoint.

    Returns:
        ``(model, target_model, task_count, save_dir)``; ``save_dir`` is
        ``None`` unless a checkpoint was loaded with ``is_training`` set.
    """
    model = DQN(input_shape, n_actions)
    target_model = DQN(input_shape, n_actions)

    # Stays None unless a checkpoint is loaded for training.
    save_dir = None

    choice = input("Would you like to load a saved model? (yes/no): ").strip().lower()
    task_count = int(input("Enter the number of episodes to train the model: "))

    if choice != "yes":
        print("Starting with a new model.")
    else:
        model_path = input("Enter the path to the saved model file: ").strip()
        if not os.path.isfile(model_path):
            print(f"File not found at {model_path}. Starting with a new model.")
        else:
            model = safe_load_model(model, model_path, n_actions)

            # Create a unique save directory for this session
            if is_training:
                base_name = os.path.splitext(os.path.basename(model_path))[0]
                timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
                save_dir = os.path.join(os.path.dirname(model_path), f"{base_name}_{timestamp}")

                if not os.path.exists(save_dir):
                    os.makedirs(save_dir)
                    print(f"Directory created at {save_dir}")

            # The optimizer state is optional: save_model stores None when no
            # optimizer was given, so its absence must not abort the session.
            checkpoint = torch.load(model_path, map_location="cpu", weights_only=True)
            optimizer_state = checkpoint.get('optimizer_state_dict')
            if optimizer is not None and optimizer_state is not None:
                optimizer.load_state_dict(optimizer_state)
                print("Model loaded successfully.")
            else:
                print("No optimizer state restored; the optimizer is left as given.")
            print(f"Loaded model from {model_path}.")

    # Synchronize the target model with the main model
    target_model.load_state_dict(model.state_dict())

    return model, target_model, task_count, save_dir

def load_model(filepath, model, optimizer=None):
    """
    Loads the model's state dict and optionally the optimizer's state dict.

    Args:
        filepath: Checkpoint file containing a 'model_state_dict' entry and,
            optionally, an 'optimizer_state_dict' entry.
        model: Network that receives the saved weights.
        optimizer: Optional optimizer; its state is restored only when the
            checkpoint actually stores one.

    Returns:
        ``(model, optimizer)``, the same objects that were passed in.
    """
    # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
    checkpoint = torch.load(filepath, map_location="cpu", weights_only=True)
    model.load_state_dict(checkpoint['model_state_dict'])

    # save_model stores None when no optimizer was supplied, so both a missing
    # key and a None value mean "nothing to restore" rather than a failure.
    optimizer_state = checkpoint.get('optimizer_state_dict')
    if optimizer is not None and optimizer_state is not None:
        optimizer.load_state_dict(optimizer_state)
        print("Model loaded successfully.")
    else:
        print("No optimizer state restored; the optimizer is left as given.")
    print(f"Model loaded from {filepath}")
    return model, optimizer

In [None]:
# Training Function
def train_dqn(dqn_model, target_model, memory, optimizer, batch_size, gamma, loss_fn):
    """Run one optimisation step on a minibatch drawn from replay memory.

    Does nothing while the memory holds fewer than `batch_size` transitions.
    Targets are ``reward + gamma * max_a Q_target(next_state, a)``, with the
    bootstrap term zeroed for transitions flagged as done.

    Args:
        dqn_model: Network being trained.
        target_model: Network used to evaluate next states.
        memory: Object supporting ``len()`` and ``sample(batch_size)``.
        optimizer: Optimizer that applies the gradient step.
        batch_size: Number of transitions per minibatch.
        gamma: Discount factor.
        loss_fn: Loss applied to (selected Q-values, targets).
    """
    if len(memory) < batch_size:
        return  # Not enough experience collected yet

    transitions = memory.sample(batch_size)
    state_seq, action_seq, reward_seq, next_state_seq, done_seq = zip(*transitions)

    # Batch everything along a new leading dimension.
    state_batch = torch.stack(state_seq)
    next_state_batch = torch.stack(next_state_seq)
    reward_batch = torch.tensor(reward_seq, dtype=torch.float32)
    keep_mask = 1 - torch.tensor(done_seq, dtype=torch.bool).float()
    action_index = torch.tensor(action_seq).view(-1, 1)

    # Bootstrap targets come from the target network; no gradients needed.
    with torch.no_grad():
        best_next_q = target_model(next_state_batch).max(1)[0]
        td_target = reward_batch + keep_mask * gamma * best_next_q

    # Q-values of the actions that were actually taken.
    chosen_q = dqn_model(state_batch).gather(1, action_index).squeeze(1)

    optimizer.zero_grad()
    batch_loss = loss_fn(chosen_q, td_target)
    batch_loss.backward()
    optimizer.step()

# Function to trigger video recording based on the highest reward
def best_episode_trigger(episode_id, reward, highest_reward):
    """Report whether `reward` beats the best reward seen so far.

    The function cannot update the caller's record: `highest_reward` is a
    plain argument, so the caller must store the new best itself whenever
    True is returned.

    Args:
        episode_id: Identifier of the episode, used only in the message.
        reward: Reward achieved by that episode.
        highest_reward: Best reward recorded before this episode.

    Returns:
        True when `reward` is strictly greater than `highest_reward`.
    """
    if reward <= highest_reward:
        return False  # Skip recording
    print(f"New highest reward: {reward}, recording episode {episode_id}")
    return True  # Record this episode

In [None]:
# Function to clear the contents of a folder
def clear_folder(folder_path):
    """Leave `folder_path` as an existing, empty directory.

    Anything already at that path is removed with ``shutil.rmtree`` before
    the directory is created again.
    """
    already_there = os.path.exists(folder_path)
    if already_there:
        # Drop the whole tree rather than deleting entries one by one.
        shutil.rmtree(folder_path)
    os.makedirs(folder_path, exist_ok=True)

In [None]:
# Main Training Loop
def train_pacman():
    """Train a DQN agent on the custom Pac-Man environment.

    Prompts (through ``initialize_model``) for an optional checkpoint and the
    number of episodes, then runs epsilon-greedy training with experience
    replay. The target network is refreshed every 5 episodes, a timestamped
    checkpoint is written every 25 episodes, and the final model is saved to
    Google Drive.
    """
    env = CustomPacmanEnv(render_mode="rgb_array")  # Use the custom environment
    n_actions = env.action_space.n
    input_shape = (4, 84, 84)  # Stack size x resized frame dimensions
    memory_capacity = 20000
    memory = ReplayMemory(memory_capacity)

    # Hyperparameters
    gamma = 0.99
    learning_rate = 0.00025
    epsilon = 1.0
    epsilon_min = 0.01
    epsilon_decay = 0.995
    batch_size = 32
    target_update_every = 5
    checkpoint_every = 25
    final_model_path = '/content/drive/MyDrive/Colab Notebooks/Math5366 - Final Project/PACMAN_dqn_model.pth'

    loss_fn = nn.MSELoss()

    # initialize_model needs an optimizer to receive any saved optimizer
    # state, but it returns freshly built networks. Use a throwaway network
    # here and rebind the optimizer to the real one below.
    optimizer = optim.Adam(DQN(input_shape, n_actions).parameters(), lr=learning_rate)

    # Load or create the model
    is_training = True
    model, target_model, n_episodes, file_loc = initialize_model(input_shape, n_actions, optimizer, is_training)
    target_model.eval()

    # Bind the optimizer to the parameters that are actually trained, carrying
    # over whatever state initialize_model restored. Without this, the
    # gradient steps would be applied to a network nobody uses.
    restored_state = optimizer.state_dict()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    optimizer.load_state_dict(restored_state)

    # A fresh model has no session directory; fall back to the final save location.
    checkpoint_dir = file_loc if file_loc is not None else os.path.dirname(final_model_path)

    try:
        for episode in range(n_episodes):
            state, _ = env.reset()
            state = preprocess_frame(state)  # Preprocess the initial state
            stacked_state, frame_stack = stack_frames(None, state, stack_size=4)

            done = False
            total_reward = 0

            while not done:
                # Epsilon-greedy action selection
                if np.random.rand() <= epsilon:
                    action = env.action_space.sample()
                else:
                    with torch.no_grad():
                        action = torch.argmax(model(stacked_state.unsqueeze(0))).item()

                # Take action in the environment
                next_state, reward, done, truncated, info = env.step(action)
                done = done or truncated

                # Preprocess and stack the next state
                next_state = preprocess_frame(next_state)
                stacked_next_state, frame_stack = stack_frames(frame_stack, next_state, stack_size=4)

                # Store the transition and learn from replayed experience
                memory.remember(stacked_state, action, reward, stacked_next_state, done)
                train_dqn(model, target_model, memory, optimizer, batch_size, gamma, loss_fn)

                stacked_state = stacked_next_state
                total_reward += reward

            # Decay exploration once per episode
            epsilon = max(epsilon_min, epsilon * epsilon_decay)

            # Update target model every 5 episodes
            if (episode + 1) % target_update_every == 0:
                target_model.load_state_dict(model.state_dict())

            # Save with timestamp every 25 episodes (modulo, not bitwise AND)
            if (episode + 1) % checkpoint_every == 0:
                timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
                os.makedirs(checkpoint_dir, exist_ok=True)
                filename = os.path.join(checkpoint_dir, f'PACMAN_dqn_model_{timestamp}.pth')
                model.save_model(filename, optimizer)
                print(f"Model with timestamp saved after episode {episode+1}: {filename}")

            print(f"Episode {episode+1}/{n_episodes}, Total Reward: {total_reward}, Epsilon: {epsilon}")

        # Save the trained model
        model.save_model(final_model_path, optimizer)
    finally:
        # Release the emulator even if training is interrupted.
        env.close()

def train_pacman_with_video():
    """Train like ``train_pacman`` while recording every episode to video.

    Every episode is recorded into a local video folder by a single
    ``RecordVideo`` wrapper. After training, the video of the episode with
    the highest total reward is copied into a separate "best episode" folder.
    """
    video_folder = '/content/pacman_training_videos'  # Local folder to save all videos
    best_episode_video_folder = "/content/best_pacman_training_video"
    # Both folders must exist and be empty before anything is recorded.
    clear_folder(video_folder)
    clear_folder(best_episode_video_folder)

    env = CustomPacmanEnv(render_mode="rgb_array")  # Use the custom environment
    n_actions = env.action_space.n
    input_shape = (4, 84, 84)  # Stack size x resized frame dimensions
    memory_capacity = 20000
    memory = ReplayMemory(memory_capacity)

    # Hyperparameters
    gamma = 0.99
    learning_rate = 0.00025
    epsilon = 1.0
    epsilon_min = 0.01
    epsilon_decay = 0.995
    batch_size = 32
    target_update_every = 5
    checkpoint_every = 25
    final_model_path = '/content/drive/MyDrive/Colab Notebooks/Math5366 - Final Project/PACMAN_dqn_model.pth'

    loss_fn = nn.MSELoss()

    # Wrap once: re-wrapping per episode would nest recorders inside each other.
    env = gym.wrappers.RecordVideo(env, video_folder=video_folder, episode_trigger=lambda episode_id: True)

    # initialize_model needs an optimizer to receive any saved optimizer
    # state, but it returns freshly built networks. Use a throwaway network
    # here and rebind the optimizer to the real one below.
    optimizer = optim.Adam(DQN(input_shape, n_actions).parameters(), lr=learning_rate)

    # Load or create the model
    is_training = True
    model, target_model, n_episodes, file_loc = initialize_model(input_shape, n_actions, optimizer, is_training)
    target_model.eval()

    # Bind the optimizer to the parameters that are actually trained.
    restored_state = optimizer.state_dict()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    optimizer.load_state_dict(restored_state)

    # A fresh model has no session directory; fall back to the final save location.
    checkpoint_dir = file_loc if file_loc is not None else os.path.dirname(final_model_path)

    # Track the highest reward and the episode that achieved it
    highest_reward = float('-inf')
    best_episode = None

    try:
        for episode in range(n_episodes):
            state, _ = env.reset()
            state = preprocess_frame(state)  # Preprocess the initial state
            stacked_state, frame_stack = stack_frames(None, state, stack_size=4)

            done = False
            total_reward = 0

            while not done:
                # Epsilon-greedy action selection
                if np.random.rand() <= epsilon:
                    action = env.action_space.sample()
                else:
                    with torch.no_grad():
                        action = torch.argmax(model(stacked_state.unsqueeze(0))).item()

                # Take action in the environment
                next_state, reward, done, truncated, info = env.step(action)
                done = done or truncated

                # Preprocess and stack the next state
                next_state = preprocess_frame(next_state)
                stacked_next_state, frame_stack = stack_frames(frame_stack, next_state, stack_size=4)

                # Store the transition and learn from replayed experience
                memory.remember(stacked_state, action, reward, stacked_next_state, done)
                train_dqn(model, target_model, memory, optimizer, batch_size, gamma, loss_fn)

                stacked_state = stacked_next_state
                total_reward += reward

            # Decay exploration once per episode
            epsilon = max(epsilon_min, epsilon * epsilon_decay)

            # Update target model every 5 episodes
            if (episode + 1) % target_update_every == 0:
                target_model.load_state_dict(model.state_dict())

            # Save with timestamp every 25 episodes (modulo, not bitwise AND)
            if (episode + 1) % checkpoint_every == 0:
                timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
                os.makedirs(checkpoint_dir, exist_ok=True)
                filename = os.path.join(checkpoint_dir, f'PACMAN_dqn_model_{timestamp}.pth')
                model.save_model(filename, optimizer)
                print(f"Model with timestamp saved after episode {episode+1}: {filename}")

            # Remember which episode is the best so far; its video is copied
            # once recording has finished.
            if total_reward > highest_reward:
                highest_reward = total_reward
                best_episode = episode
                print(f"New highest reward: {total_reward}, recording episode {episode + 1}")

            print(f"Episode {episode+1}/{n_episodes}, Total Reward: {total_reward}, Epsilon: {epsilon}")

        # Save the trained model
        model.save_model(final_model_path, optimizer)
    finally:
        # Closing the wrapper also finalises any video still being written.
        env.close()

    if best_episode is not None:
        # Assumes RecordVideo names its files "<prefix>-episode-<id>.mp4" with
        # ids counted from 0 - TODO confirm for the installed gymnasium version.
        suffix = f"-episode-{best_episode}.mp4"
        for name in os.listdir(video_folder):
            if name.endswith(suffix):
                shutil.copy2(os.path.join(video_folder, name), best_episode_video_folder)

In [None]:
# Run training. train_pacman() prompts for an optional checkpoint to resume
# from and for the number of episodes before it starts.
train_pacman()
# Alternative entry point that also records episode videos:
#train_pacman_with_video()

Would you like to load a saved model? (yes/no): yes
Enter the number of episodes to train the model: 200
Enter the path to the saved model file: /content/drive/MyDrive/Colab Notebooks/Math5366 - Final Project/PACMAN_dqn_model.pth


  checkpoint = torch.load(model_path)  # Load the checkpoint here
  checkpoint = torch.load(checkpoint_path)
  if not reward_positions:  # If there are no rewards left, return a high value


Loaded model with adjustments for 5 actions.
Directory created at /content/drive/MyDrive/Colab Notebooks/Math5366 - Final Project/PACMAN_dqn_model_2024-12-10_19-20-01
Model loaded successfully.
Loaded model from /content/drive/MyDrive/Colab Notebooks/Math5366 - Final Project/PACMAN_dqn_model.pth.
Episode 1/200, Total Reward: -2855.7999999999706, Epsilon: 0.995
Model saved to /content/drive/MyDrive/Colab Notebooks/Math5366 - Final Project/PACMAN_dqn_model_2024-12-10_19-20-01/PACMAN_dqn_model_2024-12-10_19-21-43.pth
Model with timestamp saved after episode 2: /content/drive/MyDrive/Colab Notebooks/Math5366 - Final Project/PACMAN_dqn_model_2024-12-10_19-20-01/PACMAN_dqn_model_2024-12-10_19-21-43.pth
Episode 2/200, Total Reward: -2417.1999999999784, Epsilon: 0.990025
Episode 3/200, Total Reward: -2284.599999999981, Epsilon: 0.985074875
Model saved to /content/drive/MyDrive/Colab Notebooks/Math5366 - Final Project/PACMAN_dqn_model_2024-12-10_19-20-01/PACMAN_dqn_model_2024-12-10_19-23-21.pth

In [None]:
# Evaluate a saved agent on the unmodified MsPacman-v4 environment (real game
# score), recording every episode to video.

# Actions the agent was trained with. CustomPacmanEnv exposes Discrete(5), so
# the network must be built with 5 outputs here as well; sizing it from the
# raw environment's action space would make safe_load_model drop the trained
# output layer and replace it with a randomly initialised one.
possible_actions = [0, 1, 2, 3, 4]

# Initialize the Pacman environment with render_mode as "rgb_array"
env = gym.make("MsPacman-v4", render_mode="rgb_array")

n_actions = len(possible_actions)
input_shape = (4, 84, 84)  # Stack size x resized frame dimensions

# Hyperparameters (only epsilon and learning_rate are used during evaluation;
# the rest are kept so that later cells relying on these names still work)
gamma = 0.99
learning_rate = 0.00025
epsilon = 0.1
epsilon_min = 0
epsilon_decay = 0.995
batch_size = 32
n_episodes = 10

video_folder = '/content/pacman_eval_videos'  # Local folder to save videos
clear_folder(video_folder)

# Folder reserved for the best evaluation episode
highest_reward = float('-inf')
best_episode_video_folder = "/content/best_pacman_eval_video"
clear_folder(best_episode_video_folder)

# Wrap the environment to record the video
env = gym.wrappers.RecordVideo(env, video_folder=video_folder, episode_trigger=lambda x: True)

# Initialize list to track total rewards per episode
total_test_rewards = []

model = DQN(input_shape, n_actions)
target_model = DQN(input_shape, n_actions)
target_model.load_state_dict(model.state_dict())
target_model.eval()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Load or create the model
is_training = False
model, target_model, n_eval_episodes, file_loc = initialize_model(input_shape, n_actions, optimizer, is_training)

# Check if GPU is available, otherwise fallback to CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Inputs are moved to `device` below, so the weights must live there too.
model = model.to(device)

# Set the model to evaluation mode
model.eval()
target_model.eval()

steps = 0

for episode in range(n_eval_episodes):
    state, _ = env.reset()
    state = preprocess_frame(state)  # Preprocess the initial state
    stacked_state, frame_stack = stack_frames(None, state, stack_size=4)  # Stack the frames

    done = False
    total_reward = 0

    while not done:
        # Epsilon-greedy action selection, restricted to the trained actions
        if np.random.rand() < epsilon:
            action = int(np.random.choice(possible_actions))
        else:
            with torch.no_grad():  # Inference only: no autograd graph needed
                q_values = model(stacked_state.unsqueeze(0).to(device))
            q_values = q_values.cpu().numpy()
            action = int(np.argmax(q_values))

        # Take action in the environment
        next_state, reward, done, truncated, info = env.step(action)
        done = done or truncated

        # Preprocess and stack the next state
        next_state = preprocess_frame(next_state)
        stacked_next_state, frame_stack = stack_frames(frame_stack, next_state, stack_size=4)

        # Update the current state
        stacked_state = stacked_next_state
        total_reward += reward

    print(f"Episode {episode+1}/{n_eval_episodes}, Total Reward: {total_reward}")
    total_test_rewards.append(total_reward)
env.close()

# Compute and print evaluation metrics
average_reward = np.mean(total_test_rewards)
print(f"Average reward over {n_eval_episodes} episodes: {average_reward}")

  logger.warn(


Would you like to load a saved model? (yes/no): yes
Enter the number of episodes to train the model: 10
Enter the path to the saved model file: /content/drive/MyDrive/Colab Notebooks/Math5366 - Final Project/PACMAN_dqn_model.pth


  checkpoint = torch.load(model_path)  # Load the checkpoint here
  checkpoint = torch.load(checkpoint_path)


Loaded model with adjustments for 9 actions.
Model loaded successfully.
Loaded model from /content/drive/MyDrive/Colab Notebooks/Math5366 - Final Project/PACMAN_dqn_model.pth.
Episode 1/10, Total Reward: 90.0
Episode 2/10, Total Reward: 670.0
Episode 3/10, Total Reward: 240.0
Episode 4/10, Total Reward: 470.0
Episode 5/10, Total Reward: 620.0
Episode 6/10, Total Reward: 320.0
Episode 7/10, Total Reward: 390.0
Episode 8/10, Total Reward: 290.0
Episode 9/10, Total Reward: 370.0
Episode 10/10, Total Reward: 450.0
Average reward over 10 episodes: 391.0
