# Installing necessities

In [None]:
# Install the packages this notebook needs.

!pip install ale-py # The Arcade Learning Environment (ALE) package, imported below as `ale_py`; used to develop AI agents for Atari games.
!pip install gymnasium # The Gymnasium package, imported below as `gym`.
!pip install --upgrade keras # Keras package. NOTE(review): the code below imports PyTorch (`torch`) and never imports Keras - confirm this install is still needed.

# Setting up the environment

In [None]:
import gymnasium as gym
import ale_py
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from PIL import Image
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.optim.lr_scheduler import CosineAnnealingLR

# Source #1: https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html#dqn-algorithm

# Setting up our environment via ALE and Gymnasium. We use PyTorch to implement Deep Q-Learning, an approach for training reinforcement learning agents that is well suited to games.
# Matplotlib is also set up here, as we'll generate graphs at the end to demonstrate the learning process.

# Make the Atari environments known to Gymnasium, then create the Pacman (RAM observation) one.
gym.register_envs(ale_py)
env = gym.make('ALE/Pacman-ram-v5', render_mode="rgb_array")

# Matplotlib: detect an inline (notebook) backend and switch on interactive mode.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# Pick the compute device: CUDA first, then Apple MPS, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

In [None]:
# At this point we set up something called "Replay Memory". This approach stores each transition (transition = the agent's interaction with the environment) so that the agent can sample it later, allowing it to reuse the data.
# Moreover, this approach improves the learning procedure significantly.

# Two classes are required:
  ## Transition - represents a single transition in the environment. It maps (state, action) pairs to their corresponding result (next state, reward).
    ### State - current state of the game (current state of the board). In Pacman it's the current position of the player, ghosts, food, etc.
    ### Action - the decision the agent makes that affects the current game state. In Pacman it could be moving left, right, up, down, etc.
    ### next_state - State after the action has taken place. It represents the update to the grid (game board). This informs the agent about the consequences of its action(s).
    ### Reward - A numerical value which is given to the agent after each action. It reflects how good/bad the action was, i.e. how good/bad its impact on the game was.
  ## ReplayMemory - This is a buffer that holds the transitions observed recently.


# One recorded interaction step: where the agent was, what it did,
# where it ended up, and the reward it received.
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'next_state', 'reward'],
)


class ReplayMemory:
    """Bounded buffer of transitions used for experience replay.

    Entries live in a ``deque`` created with ``maxlen=capacity``, so once
    the buffer is full each new entry pushes the oldest one out.
    """

    def __init__(self, capacity):
        """Create an empty buffer holding at most ``capacity`` entries."""
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Pack ``args`` into a ``Transition`` and append it to the buffer."""
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` stored entries drawn at random without replacement."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Return the number of entries currently stored."""
        return len(self.memory)


# Setting up Deep Q Learning network

In [None]:
# Q-Learning and Q-values
## In Deep Q-Learning our goal is to approximate the Q-values for each action in every state. During training the neural network takes a state as an input and outputs Q-values for all possible actions (action evaluation).
## The Q-value of a state-action pair represents the expected cumulative reward the agent receives by taking this action.
## Based on those Q-values the agent performs "optimal action-selection".


# Q-Network
## In our implementation of the Q-network we use a "feed-forward network" whose primary goal is to approximate the aforementioned Q-values for each possible action in a given state.
## The network takes the current game state (grid state) as an input and "observes" changes and differences between the current state and the previous one.
## Subsequently, the network calculates and returns Q-values for each possible action, where each value represents the reward that the agent expects to get if it decides to take this action.

## Input [current_state] -> Q-Network -> Q(current_state, action)



class DQN(nn.Module):

    def __init__(self, n_observations, n_actions): # n_observations - number of observations (states) in the environment, n_actions - number of actions that agent can take.
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 256) 
        self.layer2 = nn.Linear(256, 256)
        self.layer3 = nn.Linear(256, n_actions) # The output of the network is a vector of n_actions elements, each element represents Q-value for each action.



    # The function below is called "Activation function". Activation fucntion is applied on network's output at each layer and server for introduction of non-linearity thus allow the network to learn complex patterns.
    # Varous activation functions exists in the example below ReLU is used. ReLU returns output directly in case the input is positive otherwise returns zero.

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = x.flatten(start_dim=1) # Flattening the input tensor to 1D tensor.
        x = F.relu(self.layer1(x)) # Applying ReLU activation function on the output of the first layer.
        x = F.relu(self.layer2(x)) 
        return self.layer3(x) # Final layer produces n_actions ( Q(current_state, action) )

# Preparing for training

In [None]:
 # Training

## Here we instantiate our model and its optimizer plus some utilities.
### select_action - sometimes our model picks the next action and sometimes one is picked uniformly at random; the probability of a random pick starts at EPS_START and decays exponentially (controlled by EPS_DECAY) towards EPS_END
### plot_durations - for plotting the duration of episodes


# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor applied to the estimated value of the next state
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR_START is the initial learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 256
GAMMA = 0.95
EPS_START = 1.0 # Start with full exploration
EPS_END = 0.05 # Minimal exploration toward the end
EPS_DECAY = 50000 # Exploration vs exploitation
TAU = 0.005
LR_START = 1e-3

# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
print(state.shape)
# Total number of elements after flattening. np.prod returns a NumPy scalar,
# so cast to a plain int before using it as a layer size.
n_observations = int(np.prod(state.shape))

# The target network starts as an exact copy of the policy network.
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR_START, amsgrad=True, weight_decay=0.006) # Optimizer for the network.
# Learning-rate scheduler. The deprecated `verbose` argument is left out so
# construction also works on PyTorch releases that no longer accept it.
# NOTE(review): nothing in the visible code calls scheduler.step(...), so the
# learning rate is never actually reduced - confirm whether that is intended.
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.7, patience=10)
memory = ReplayMemory(100000) # Store previous transitions

# Number of actions selected so far; drives the epsilon decay in select_action.
steps_done = 0

def select_action(state):
    """Choose an action for ``state`` with an epsilon-greedy rule.

    Epsilon decays exponentially from ``EPS_START`` towards ``EPS_END`` as
    the global ``steps_done`` counter grows; the counter is incremented on
    every call.

    Args:
        state: Input tensor passed to ``policy_net``.

    Returns:
        A ``(1, 1)`` long tensor holding the chosen action index.
    """
    global steps_done
    roll = random.random()
    decay = math.exp(-1. * steps_done / EPS_DECAY)
    eps_threshold = EPS_END + (EPS_START - EPS_END) * decay
    steps_done += 1

    # Explore: draw a random action from the environment's action space.
    if roll <= eps_threshold:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)

    # Exploit: take the action with the highest predicted Q-value.
    with torch.no_grad():
        q_values = policy_net(state)
        return q_values.max(1).indices.view(1, 1)

# Optimizer

In [None]:
# Training Loop

def optimize_model():
    """Run one gradient step on ``policy_net`` using a random replay batch.

    Samples ``BATCH_SIZE`` transitions from ``memory``, builds the one-step
    target ``reward + GAMMA * max_a Q_target(next_state, a)`` (the second
    term is zero for terminal transitions) and minimises the Huber loss
    between that target and ``policy_net``'s value for the action taken.

    Returns:
        The loss as a Python float, or ``None`` when ``memory`` holds fewer
        than ``BATCH_SIZE`` transitions and no step was taken.
    """
    if len(memory) < BATCH_SIZE:
        return None
    transitions = memory.sample(BATCH_SIZE) # Sample a random batch of transitions
    # Transpose: a list of Transitions becomes one Transition of tuples.
    batch = Transition(*zip(*transitions))

    # Mask of non-final transitions (a final state is stored as None, i.e.
    # the one after which the simulation ended).
    non_final_mask = torch.tensor([s is not None for s in batch.next_state],
                                  device=device, dtype=torch.bool)
    non_final_next_states = [s for s in batch.next_state if s is not None]
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a): the model computes Q(s_t) for every action, then
    # gather picks the column of the action that was actually taken.
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states using the "older" target_net.
    # Entries for final states keep their initial value of 0.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    # torch.cat raises on an empty list, so skip the target-network pass in
    # the (rare) case where every sampled transition is terminal.
    if non_final_next_states:
        with torch.no_grad():
            next_q_values = target_net(torch.cat(non_final_next_states))
            next_state_values[non_final_mask] = next_q_values.max(1).values
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

    return loss.item()

# Evaluation

In [None]:
# Evaluation Function
def evaluate_agent(env, agent, device, episode, render=False,
                   warmup_steps=30, warmup_action=2):
    """Play one episode greedily with ``agent`` and return the summed reward.

    Args:
        env: Environment providing ``reset()`` and ``step(action)``. When
            ``render`` is true it must also provide
            ``start_recording(video_name=...)`` and ``render()``.
        agent: Callable that maps the state tensor (one leading batch
            dimension of size 1) to per-action scores.
        device: Torch device on which the state tensors are created.
        episode: Zero-based episode index; only used to name the recording
            ``episode_<episode + 1>``.
        render: When true, start a recording and call ``env.render()``
            after every step.
        warmup_steps: Number of initial steps during which ``warmup_action``
            is forced instead of asking the agent (the agent sometimes gets
            stuck at the start of an episode).
        warmup_action: Action index forced during the warm-up.
            NOTE(review): the original comment described index 2 as "move
            up" - confirm against the environment's action meanings.

    Returns:
        Sum of the rewards received until the episode terminated or was
        truncated.
    """
    state, _ = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0) # Convert state to tensor
    total_reward = 0
    if render:
        env.start_recording(video_name=f"episode_{episode + 1}")

    for t in count():
        if t < warmup_steps:
            action = warmup_action
        else:
            with torch.no_grad():
                # Hand the environment a plain int, as the training loop does
                # with action.item(), instead of a (1, 1) tensor.
                action = agent(state).max(1).indices.item()

        observation, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward

        if render:
            env.render()

        if terminated or truncated:
            break

        state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0) # Update state

    return total_reward

# Logging Function
def log_metrics(total_rewards):
    """Print the mean, maximum, minimum and standard deviation of ``total_rewards``."""
    summaries = (
        ("Mean Reward", np.mean),
        ("Max Reward", np.max),
        ("Min Reward", np.min),
        ("Standard Deviation", np.std),
    )
    # Compute and print one statistic at a time, in the order listed above.
    for label, statistic in summaries:
        print(f"{label}: {statistic(total_rewards)}")

# Model training

In [None]:
def show_frame(env):
    """Display whatever ``env.render()`` returns as an image without axes."""
    plt.imshow(env.render())
    plt.axis("off")
    plt.show()

In [None]:
# Per-episode training history: total reward, mean optimisation loss and
# episode length (in steps).
rewards, episode_avg_losses, episode_durations = [], [], []

In [None]:
num_episodes = 12000
evaluation_interval = 20  # Evaluate every 20 episodes
evaluation_episodes = 5   # Number of episodes to run during each evaluation

for i_episode in range(num_episodes):

    # Save a checkpoint every 1000 episodes (episode 0 is skipped)
    if i_episode % 1000 == 0 and i_episode > 0:
        torch.save(policy_net.state_dict(), f"policy_net_{i_episode}.pth")
        torch.save(target_net.state_dict(), f"target_net_{i_episode}.pth")
        print(f"Checkpoint saved at episode {i_episode}")

    # Initialize the environment and get its state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    total_reward = 0
    episode_loss = []

    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item()) # Perform the action and observe the next state and reward
        # Explicit float32 keeps the reward on the same dtype as the state
        # tensors regardless of the numeric type the environment returns.
        reward = torch.tensor([reward], device=device, dtype=torch.float32)
        # total_reward becomes a one-element tensor after the first addition;
        # plot_rewards converts the stored values back to numbers.
        total_reward += reward
        done = terminated or truncated

        # A terminal transition is stored with next_state = None so that
        # optimize_model treats its future value as zero.
        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        step_loss = optimize_model()
        if step_loss is not None:
            episode_loss.append(step_loss)

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        # Build each state dict once per step; the tensors in them share
        # storage with the networks, so the in-place copy updates target_net.
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key].data.copy_(
                TAU * policy_net_state_dict[key].data
                + (1 - TAU) * target_net_state_dict[key].data
            )

        if done:
            episode_durations.append(t + 1)
            rewards.append(total_reward)
            # Mean loss over the optimisation steps of this episode, or 0
            # when no step ran (replay memory still below BATCH_SIZE).
            avg_loss = sum(episode_loss) / len(episode_loss) if episode_loss else 0
            episode_avg_losses.append(avg_loss)
            break

    # Evaluation phase
    if i_episode % evaluation_interval == 0:
        round_rewards = [
            evaluate_agent(env, policy_net, device, episode)
            for episode in range(evaluation_episodes)
        ]
        avg_reward = sum(round_rewards) / len(round_rewards)
        print(f"Trained episode {i_episode: >5}. Avg reward in gameplay: {avg_reward} points")
        # An average evaluation reward above 500 points counts as solved;
        # training stops early at that point.
        if avg_reward > 500:
            print(f"Environment solved in {i_episode} episodes!")
            break

print('Complete')

# Plot training results

In [None]:
def plot_rewards(rewards):
    """Plot the reward collected in each training episode as a line chart.

    Args:
        rewards: Sequence of per-episode rewards. Each entry may be a
            one-element tensor (as stored by the training loop, on any
            device) or a plain number.
    """
    # float() handles one-element tensors as well as plain numbers, so the
    # function no longer fails when given non-tensor rewards.
    rewards = [float(reward) for reward in rewards]
    plt.figure(figsize=(10, 5))
    plt.plot(rewards, label="Reward per Epoch")
    plt.xlabel("Epoch")
    plt.ylabel("Reward")
    plt.title("Training Progress: Reward Per Epoch")
    plt.legend()
    plt.grid(True)
    plt.show()

def plot_loss(avg_losses):
    """Plot the average loss of each episode as a line chart and show it."""
    plt.figure(figsize=(10, 5))
    plt.plot(avg_losses, label="Average Loss per Episode")
    # Axis labels and title, applied in the same order as before.
    annotations = (
        (plt.xlabel, "Episode"),
        (plt.ylabel, "Loss"),
        (plt.title, "Training Progress: Loss Per Episode"),
    )
    for annotate, text in annotations:
        annotate(text)
    plt.legend()
    plt.grid(True)
    plt.show()

def plot_durations():
    """Plot the global ``episode_durations`` and, once available, their 100-episode mean."""
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    plt.title('Result')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())

    # The running mean needs at least one full window of 100 episodes.
    if len(durations_t) < 100:
        return

    # Sliding windows of 100 episodes, averaged; the first 99 positions
    # (which have no full window) are filled with zeros.
    windows = durations_t.unfold(0, 100, 1)
    rolling_mean = windows.mean(1).view(-1)
    padded = torch.cat((torch.zeros(99), rolling_mean))
    plt.plot(padded.numpy())

# Plot the episode durations, per-episode rewards and average losses gathered during training
plot_durations()
plot_rewards(rewards)
plot_loss(episode_avg_losses)

# Saving model

In [None]:
# Save the state dictionaries (weights) of both the policy network and the
# target network to files in the current working directory.
torch.save(policy_net.state_dict(), "policy_net_v10_2840e.pth")
torch.save(target_net.state_dict(), "target_net_v10_2840e.pth")

# Download and upload to avoid constant training

In [None]:
from google.colab import files

# Download the checkpoint files. The names match the files written by the
# "Saving model" cell; the previous names ("policy_net_v4_200e.pth",
# "target_net.pth") are not created anywhere in this notebook.
files.download("policy_net_v10_2840e.pth")
files.download("target_net_v10_2840e.pth")

In [None]:
from google.colab import files

# Ask the user to upload the model file; it is saved in the Colab workspace.
uploaded = files.upload()

# Take the name of the first uploaded file and report it.
filename = list(uploaded)[0]
print(f"Uploaded file: {filename}")

# Evaluation

In [None]:
import ale_py
import gymnasium as gym
import torch
import os
from itertools import count
import numpy as np
import matplotlib.pyplot as plt

# Create the environment and wrap it so that episodes are recorded into "videos/"
gym.register_envs(ale_py)
env = gym.make('ALE/Pacman-ram-v5', render_mode="rgb_array")
env = gym.wrappers.RecordVideo(env, "videos", episode_trigger=lambda t: True)

# Initialize agent
agent = DQN(n_observations, n_actions).to(device)
# Load the checkpoint written by the "Saving model" cell (the name previously
# lacked the trailing "e" and matched no saved file). map_location makes a
# checkpoint saved on one device loadable on whichever device is active now.
agent.load_state_dict(torch.load("policy_net_v10_2840e.pth", map_location=device))
agent.eval()

# Plotting Function
def plot_rewards(rewards):
    """Scatter-plot the reward of each evaluation episode with a dashed mean line."""
    episode_numbers = range(1, len(rewards) + 1)
    plt.figure(figsize=(10, 6))
    plt.scatter(episode_numbers, rewards, color='blue', label='Episode Reward')
    plt.xlabel("Episode", fontsize=14)
    plt.ylabel("Reward", fontsize=14)
    plt.title("Agent Evaluation: Rewards Per Episode", fontsize=16)
    plt.grid(True, linestyle='--', alpha=0.7)
    # Horizontal reference line at the mean reward.
    mean_reward = np.mean(rewards)
    plt.axhline(mean_reward, color='red', linestyle='--', label='Mean Reward')
    plt.legend(fontsize=12)
    plt.show()

# Evaluate the loaded agent over several recorded episodes, then summarise the results
n_episodes = 10
rewards = []  # Rebinds `rewards`: from here on it holds the plain evaluation returns

for episode in range(n_episodes):
    reward = evaluate_agent(env, agent, device, episode, render=True) # Play one episode; render=True makes evaluate_agent start a recording named after the episode
    env.close() # NOTE(review): presumably called to finalize the episode's video file - confirm against the RecordVideo wrapper; the same env is reset again on the next iteration
    rewards.append(reward)
    print(f"Episode {episode + 1}/{n_episodes}: Reward = {reward}")

log_metrics(rewards)
plot_rewards(rewards)

env.reset() # NOTE(review): resets the environment once more after the last close(); purpose is not evident from this cell - confirm it is needed

# Display video

In [None]:
# List the contents of the "videos" directory used by the RecordVideo wrapper
!ls videos

In [None]:
from IPython.display import HTML
from base64 import b64encode
# Read the recording inside a context manager so the file handle is closed
# as soon as the bytes are in memory (the bare open(...).read() leaked it).
with open('videos/episode_7.mp4', 'rb') as video_file:
    mp4 = video_file.read()
# Embed the video in the page as a base64 data URL.
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=400 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)