## Installation Guide
Install the Super Mario Bros gym environment package (`gym-super-mario-bros`) along with its required dependencies:

In [None]:
%pip install gym-super-mario-bros

In [1]:
import gym
import pybullet as p
import matplotlib.pyplot as plt
from pyvirtualdisplay import Display
from IPython.display import HTML
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import numpy as np
import torch
import random
import math
import os
# NOTE(review): presumably tells pyvirtualdisplay not to use Xvfb's -displayfd
# option -- confirm against the pyvirtualdisplay documentation.
os.environ['PYVIRTUALDISPLAY_DISPLAYFD'] = '0' 

# Create and start a hidden (visible=0) 400x300 virtual display; the handle is
# kept in the module-level name `display`.
display = Display(visible=0, size=(400, 300))
display.start()

# Function to display the testing video of the agent in the Jupyter notebook
def display_video(frames, framerate=30):
  """Generates an inline HTML5 video from `frames`.

  Args:
    frames (ndarray): Array of shape (n_frames, height, width, 3).
    framerate (int): Frame rate in units of Hz.

  Returns:
    IPython `HTML` object wrapping the encoded video.

  Raises:
    IndexError: If `frames` is empty (the first frame is used for sizing).
  """
  height, width, _ = frames[0].shape
  dpi = 70
  orig_backend = matplotlib.get_backend()
  matplotlib.use('Agg')  # Switch to headless 'Agg' to inhibit figure rendering.
  try:
    fig, ax = plt.subplots(1, 1, figsize=(width / dpi, height / dpi), dpi=dpi)
  finally:
    # Always switch back, even if figure creation fails.
    matplotlib.use(orig_backend)

  try:
    # Make the image fill the whole canvas with no axes or padding.
    ax.set_axis_off()
    ax.set_aspect('equal')
    ax.set_position([0, 0, 1, 1])
    im = ax.imshow(frames[0])

    def update(frame):
      im.set_data(frame)
      return [im]

    interval = 1000 / framerate  # Delay between frames in milliseconds.
    anim = animation.FuncAnimation(fig=fig, func=update, frames=frames,
                                   interval=interval, blit=True, repeat=False)
    return HTML(anim.to_html5_video())
  finally:
    # Release the figure; otherwise every call leaves one open in pyplot.
    plt.close(fig)

pybullet build time: Nov 28 2023 23:51:11


## Hyperparameters

In [None]:
EPISODES = 50000                # Number of episodes to train the agent for
SAVE_INTERVAL = 5000            # Intended number of episodes between model saves (NOTE: not referenced by the visible training loop, which saves every 100 episodes)
MEM_SIZE = 100000               # Capacity of the replay buffer, in stored transitions
REPLAY_START_SIZE = 10000       # Number of transitions to collect before learning starts
EPSILON_START = 0.1             # Exploration rate when learning starts (actions are fully random until then)
EPSILON_END = 0.0001            # Exploration rate that the exponential decay approaches
EPSILON_DECAY = 4 * MEM_SIZE    # Time constant of the exponential epsilon decay, in learning steps
BATCH_SIZE = 32                 # Number of transitions sampled from the replay buffer per learning step
MEM_RETAIN = 0.1                # Fraction of the replay buffer (the earliest entries) that is never overwritten
LEARNING_RATE = 0.00025         # Learning rate of the Adam optimizer for the network weights
NETWORK_UPDATE_ITERS = 5000     # Number of learning steps between copies of the policy weights into the target network
GAMMA = 0.9                     # Discount factor for future rewards
DQN_DIM1 = 256                  # Number of neurons in DQN's first hidden layer
DQN_DIM2 = 256                  # Number of neurons in DQN's second hidden layer

## Neural Network

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

# Neural network class: fully-connected Q-network approximating Q-values for reinforcement learning
class NeuralNetwork(nn.Module):
    """Multi-layer perceptron mapping an observation to one Q-value per action.

    The observation is flattened before the first linear layer, so both
    vector observations and multi-dimensional (e.g. image) observations are
    accepted. The network owns its Adam optimizer and MSE loss.
    """

    # Constructor for Neural Network class
    def __init__(self, env):
        """Build the layers from the environment's spaces.

        Args:
            env: Environment exposing `observation_space.shape` and
                `action_space.n`.
        """
        super().__init__()  # Inheriting from torch.nn.Module constructor

        # Getting the input and output shapes for the neural network layers
        self.input_shape = env.observation_space.shape
        self.output_shape = env.action_space.n

        # Total number of scalar inputs once the observation is flattened
        in_features = int(np.prod(self.input_shape))

        # Defining the layers of the Neural Network
        self.layers = nn.Sequential(
            nn.Linear(in_features, DQN_DIM1),
            nn.ReLU(),
            nn.Linear(DQN_DIM1, DQN_DIM2),
            nn.ReLU(),
            nn.Linear(DQN_DIM2, self.output_shape)
        )

        self.optimizer = optim.Adam(self.parameters(), lr=LEARNING_RATE)
        self.loss = nn.MSELoss()  # Loss function

    # Forward pass through the layers of the Neural Network
    def forward(self, x):
        """Return the Q-values for `x`.

        Args:
            x: Tensor whose trailing dimensions match `self.input_shape`;
                any leading dimensions (e.g. a batch dimension) are kept.

        Returns:
            Tensor of Q-values with `self.output_shape` entries in the last
            dimension.
        """
        obs_dims = len(self.input_shape)
        if obs_dims > 1:
            # Collapse the observation dimensions into one feature vector,
            # preserving any leading batch dimensions.
            x = x.reshape(*x.shape[:x.dim() - obs_dims], -1)
        return self.layers(x)

## Replay Buffer

In [None]:
# Replay Buffer class for storing and retrieving sampled experiences
class ReplayBuffer:
    """Fixed-capacity experience replay memory backed by NumPy arrays.

    Holds up to MEM_SIZE transitions. Once full, new transitions overwrite
    only the region after the first MEM_RETAIN fraction of the buffer, so the
    earliest experiences are kept permanently.
    """

    # Constructor for Replay Buffer class
    def __init__(self, env):
        """Allocate the storage arrays.

        Args:
            env: Environment exposing `observation_space.shape`.
        """
        obs_shape = env.observation_space.shape

        # Initialising memory count and creating arrays to store experiences
        self.mem_count = 0
        self.states = np.zeros((MEM_SIZE, *obs_shape), dtype=np.float32)
        self.actions = np.zeros(MEM_SIZE, dtype=np.int64)
        self.rewards = np.zeros(MEM_SIZE, dtype=np.float32)
        self.states_ = np.zeros((MEM_SIZE, *obs_shape), dtype=np.float32)
        # np.bool_ rather than the `np.bool` alias, which NumPy 1.24 removed.
        # NOTE: despite its name this array stores the *not done* mask.
        self.dones = np.zeros(MEM_SIZE, dtype=np.bool_)

    # Function to add experiences to the memory buffer
    def add(self, state, action, reward, state_, done):
        """Store one transition.

        Args:
            state: Observation before the action.
            action: Index of the action taken.
            reward: Reward received.
            state_: Observation after the action.
            done: Whether the episode terminated on this transition.
        """
        if self.mem_count < MEM_SIZE:
            mem_index = self.mem_count  # Buffer not yet full: append
        else:
            # Avoiding catastrophic forgetting - retain the initial
            # MEM_RETAIN fraction of the buffer and cycle through the rest.
            # Integer arithmetic avoids float rounding in the index.
            retained = int(round(MEM_RETAIN * MEM_SIZE))
            mem_index = retained + self.mem_count % (MEM_SIZE - retained)

        # Adding the experience to the replay buffer memory
        self.states[mem_index] = state
        self.actions[mem_index] = action
        self.rewards[mem_index] = reward
        self.states_[mem_index] = state_
        self.dones[mem_index] = not done  # Mask is True while the episode continues
        self.mem_count += 1  # Incrementing memory count

    # Function to sample random batch of experiences
    def sample(self):
        """Draw BATCH_SIZE stored transitions uniformly, with replacement.

        Returns:
            Tuple `(states, actions, rewards, states_, dones)` of arrays with
            BATCH_SIZE rows, where `dones` is the not-done mask stored by
            `add`.
        """
        filled = min(self.mem_count, MEM_SIZE)  # Only sample written slots
        batch_indices = np.random.choice(filled, BATCH_SIZE, replace=True)

        states = self.states[batch_indices]
        actions = self.actions[batch_indices]
        rewards = self.rewards[batch_indices]
        states_ = self.states_[batch_indices]  # Next states (not rewards)
        dones = self.dones[batch_indices]

        # Returning the random sampled experiences
        return states, actions, rewards, states_, dones

## Reinforcement Learning

In [None]:
# Reinforcement Learning class
class ReinforcementLearning:
    """DQN agent: a replay buffer, a policy network (Q) and a target network (Q-hat)."""

    # Sampling priors over the 12 discrete actions used for random exploration
    _RANDOM_ACTION_PRIORS = (0.05, 0.1, 0.1, 0.1, 0.1, 0.05,
                             0.1, 0.1, 0.1, 0.1, 0.05, 0.05)

    # Constructor for Reinforcement Learning class
    def __init__(self, env):
        """Create the replay buffer and both networks for `env`."""
        self.memory = ReplayBuffer(env)  # Creating replay buffer
        self.policy_network = NeuralNetwork(env)  # Q
        self.target_network = NeuralNetwork(env)  # \hat{Q}
        self.target_network.load_state_dict(self.policy_network.state_dict())  # Initially set weights of Q to \hat{Q}
        self.learn_count = 0  # Tracking number of learning iterations

    # Epsilon-greedy policy
    def choose_action(self, observation):
        """Pick an action for `observation` using an epsilon-greedy policy.

        Args:
            observation: Current environment observation (array-like).

        Returns:
            Integer index of the chosen action.
        """
        # If we rolled a value lower than the epsilon sample a random action
        if random.random() < self.returning_epsilon():
            priors = self._RANDOM_ACTION_PRIORS
            return np.random.choice(len(priors), p=priors)  # Random action with set priors

        # Otherwise policy network (Q) chooses action with highest estimated Q value so far.
        # A contiguous float32 copy is made so arrays with unusual strides convert cleanly.
        state = torch.as_tensor(np.ascontiguousarray(observation, dtype=np.float32))
        state = state.unsqueeze(0)  # Add the batch dimension
        self.policy_network.eval()
        with torch.no_grad():
            q_values = self.policy_network(state)  # Get Q-values from policy network

        return torch.argmax(q_values).item()

    # Main training/learning step
    def learn(self):
        """Run one optimisation step of the policy network on a sampled batch.

        Every NETWORK_UPDATE_ITERS learning steps the target network is
        re-synchronised with the policy network.
        """
        # Sampling a random batch of experiences and converting them to tensors
        states, actions, rewards, states_, dones = self.memory.sample()
        states = torch.tensor(states, dtype=torch.float32)
        actions = torch.tensor(actions, dtype=torch.long)
        rewards = torch.tensor(rewards, dtype=torch.float32)
        states_ = torch.tensor(states_, dtype=torch.float32)
        # The buffer stores a not-done mask; as floats it zeroes the bootstrap term on terminal steps
        not_done = torch.tensor(dones, dtype=torch.float32)
        batch_indices = torch.arange(actions.shape[0])

        self.policy_network.train(True)  # Training the neural network
        q_values = self.policy_network(states)  # Getting predicted Q-values from neural network
        q_values = q_values[batch_indices, actions]  # Q-value of the action actually taken in each sample

        self.target_network.eval()
        with torch.no_grad():
            q_values_next = self.target_network(states_)  # Getting Q-values from target network

        q_values_next_max = torch.max(q_values_next, dim=1)[0]  # Getting max Q-values for next state
        q_target = rewards + GAMMA * q_values_next_max * not_done  # Getting target Q-values

        loss = self.policy_network.loss(q_values, q_target)  # Calculating the loss from target and pred Q-values

        # Computing the gradients and updating Q weights
        self.policy_network.optimizer.zero_grad()
        loss.backward()
        self.policy_network.optimizer.step()  # Updating Q weights
        self.learn_count += 1  # Incrementing learning count

        # Set target network weights to policy network weights every set increment of learning steps
        if self.learn_count % NETWORK_UPDATE_ITERS == NETWORK_UPDATE_ITERS - 1:
            print("Updating target network")
            self.update_target_network()

    # Function to synchronize the weights of the target network with the policy network
    def update_target_network(self):
        """Copy the policy network's weights into the target network."""
        self.target_network.load_state_dict(self.policy_network.state_dict())

    # Function to return the exploration rate (epsilon) of the agent
    def returning_epsilon(self):
        """Return the current exploration rate.

        Epsilon is 1.0 until the replay buffer holds more than
        REPLAY_START_SIZE transitions; after that it decays exponentially
        from EPSILON_START towards EPSILON_END as learning steps accumulate.
        """
        # Only start decaying the epsilon once we start learning
        if self.memory.mem_count > REPLAY_START_SIZE:
            return EPSILON_END + (EPSILON_START - EPSILON_END) * \
                math.exp(-1. * self.learn_count / EPSILON_DECAY)
        return 1.0

## Training

In [None]:
# Function to apply additional rewards that aren't in the environment already
def reward_shaping(prev_info, info):
    """Compute the extra reward earned between two consecutive `info` dicts.

    Each tracked key carries either a fixed bonus, granted when the key's
    value increased, or a function of (previous, current) whose result is
    always added. Keys missing from a dict are treated as 0.

    Args:
        prev_info: Info dict from the previous step.
        info: Info dict from the current step.

    Returns:
        The total additional reward.
    """
    def score_delta(before, after):
        # Change in score, which may be negative
        return after - before

    def powerup_bonus(before, after):
        # Flat bonus whenever the powerup value rises
        return 10 if after > before else 0

    # (key, rule) pairs, evaluated in this order
    bonus_rules = (
        ('coins', 1),
        ('score', score_delta),
        ('flag_get', 50),
        ('powerup', powerup_bonus),
    )

    total = 0
    for key, rule in bonus_rules:
        before = prev_info.get(key, 0)
        after = info.get(key, 0)

        if callable(rule):
            # Function rules are always applied
            total += rule(before, after)
            continue

        # Fixed bonuses only apply when the tracked value went up
        if after > before:
            total += rule

    return total

In [None]:
import gym_super_mario_bros
from gym_super_mario_bros.actions import COMPLEX_MOVEMENT
from nes_py.wrappers import JoypadSpace

# Checking if GPU is available
if torch.cuda.is_available():
    print("Using CUDA device:", torch.cuda.get_device_name(0))
else:
    print("CUDA is not available")

# Compatibility shim: NumPy 1.24 removed the `np.bool` alias. It has to be in
# place before the agent (and its replay buffer) is constructed below.
np.bool = np.bool_

# Loading the Super Mario Bros gym environment and initialising joypad type
env = gym_super_mario_bros.make('SuperMarioBros-v0', render_mode='rgb', apply_api_compatibility=True)
env = JoypadSpace(env, COMPLEX_MOVEMENT)
env.reset()  # Resetting/Activating the environment
agent = ReinforcementLearning(env)
plt.clf()  # Clearing previous plot

# Metrics for displaying training status
step_count = 0
best_reward = 0
average_reward = 0
episode_reward = 0
episode_batch_score = 0
batch_episode_count = 0  # Episodes accumulated into episode_batch_score
episode_history = []
episode_reward_history = []

# Looping through the episodes to train the model
for episode in range(EPISODES):
    done = False   # Episode terminated
    trunc = False  # Episode truncated
    state, info = env.reset()  # Resetting environment and getting state

    # Running the episode until it terminates or is truncated
    while not (done or trunc):
        # Choosing an action (epsilon-greedy) and adding the transition to the replay buffer.
        # NOTE(review): the raw environment reward is stored; reward_shaping() is not applied here.
        action = agent.choose_action(state)
        state_, reward, done, trunc, info = env.step(action)
        agent.memory.add(state, action, reward, state_, done)  # Add experience to replay buffer

        # Only start learning once replay memory has reached set number of samples
        if agent.memory.mem_count > REPLAY_START_SIZE:
            agent.learn()

        state = state_  # Updating current state
        episode_batch_score += reward  # Updating batch reward
        episode_reward += reward  # Updating episode reward

    # Appending episode and associated reward to history
    episode_history.append(episode)
    episode_reward_history.append(episode_reward)
    episode_reward = 0  # Resetting episode reward
    batch_episode_count += 1

    # Saving the model and reporting every 100 episodes once learning has started
    if episode % 100 == 0 and agent.memory.mem_count > REPLAY_START_SIZE:
        save_path = os.path.join(os.getcwd(), "policy_network.pkl")
        torch.save(agent.policy_network.state_dict(), save_path)
        # Average over the episodes actually accumulated, which can be fewer than 100
        print("average total reward per episode batch since episode ", episode, ": ", episode_batch_score / batch_episode_count)
        episode_batch_score = 0
        batch_episode_count = 0
    elif agent.memory.mem_count < REPLAY_START_SIZE:
        print("waiting for buffer to fill...")
        episode_batch_score = 0
        batch_episode_count = 0

# Plotting the episode history and reward history
plt.plot(episode_history, episode_reward_history)
plt.show()

## Testing