# AIIR Project - AI Mario
This Jupyter notebook applies the neural network and reinforcement learning algorithms learnt from the tutorials to simulate Mario completing a variety of levels in a Gym environment.


## Mario Environment
We use a Super Mario Bros environment (https://pypi.org/project/gym-super-mario-bros/) with a continuous state space and discrete action space. The goal of this activity is to complete Mario levels as fast as possible. Episodes end when Mario reaches the end of the level, when Mario dies, or when a certain time has elapsed.

### Action Space
- 0: No Movement
- 1: Move Right
- 2: Move Right + Jump
- 3: Move Right + Speed Up
- 4: Move Right + Jump + Speed Up
- 5: Jump
- 6: Move Left
- 7: Move Left + Jump
- 8: Move Left + Speed Up
- 9: Move Left + Jump + Speed Up
- 10: Down
- 11: Up

### Observation Space
Each observation is an RGB frame of the game screen. In addition, the `info` dictionary returned by `env.step` contains the following:

| Key | Type | Description |
| --- | ---- | ----------- |
| coins | int | Number of collected coins |
| flag_get | bool | True if Mario reached a flag |
| life | int | Number of lives left |
| score | int | Cumulative in-game score |
| stage | int | Current stage |
| status | str | Mario's status/power |
| time | int | Time left on the clock |
| world | int | Current world |
| x_pos | int | Mario's x position in the stage |
| y_pos | int | Mario's y position in the stage |
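
As a small illustration of how these keys can be read, the sketch below formats a one-line progress summary from an `info` dictionary. The helper name `summarise_info` and the sample values are hypothetical and only serve the example; in the notebook the dictionary would come from `env.step`.

```python
def summarise_info(info):
    """Build a one-line progress summary from the `info` dictionary."""
    outcome = "cleared" if info["flag_get"] else "in progress"
    return (f"World {info['world']}-{info['stage']}: x={info['x_pos']}, "
            f"time left={info['time']}, lives={info['life']}, {outcome}")


# Hypothetical sample values, only for illustration
sample_info = {"coins": 3, "flag_get": False, "life": 2, "score": 1200, "stage": 1,
               "status": "small", "time": 351, "world": 1, "x_pos": 594, "y_pos": 79}
print(summarise_info(sample_info))
```

A check on `info["flag_get"]` like this is one way to tell a cleared level apart from an episode that ended through death or a timeout.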

### Rewards
| Feature | Description | Meaning when positive | Meaning when negative | Meaning when zero |
|---------|-------------|---------------------|---------------------|------------------|
| Difference in agent x values between states | Controls agent's movement | Moving right | Moving left | Not moving |
| Time difference in the game clock between frames | Prevents agent from staying still | - | Clock ticks | Clock doesn't tick |
| Death Penalty | Discourages agent from death | - | Agent dead | Agent alive |
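
The package README describes the per-step reward as the sum of the three terms in the table, clipped to a fixed range. The sketch below restates that rule in plain Python. The function and argument names are hypothetical, and the death penalty of -15 and the clip bound of 15 are assumptions taken from that description rather than from this notebook.

```python
def mario_reward(x_prev, x_now, clock_prev, clock_now, died,
                 death_penalty=-15, clip=15):
    """Sketch of the per-step reward: movement + clock + death, then clipped."""
    v = x_now - x_prev                  # > 0 when moving right, < 0 when moving left
    c = clock_now - clock_prev          # the clock counts down, so this is <= 0
    d = death_penalty if died else 0    # assumed penalty value
    return max(-clip, min(clip, v + c + d))


# Three pixels to the right with no clock tick
print(mario_reward(x_prev=40, x_now=43, clock_prev=400, clock_now=400, died=False))
```

Because standing still earns nothing from movement and loses reward on every clock tick, the agent is pushed toward moving right.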

## Installation Guide
To install the Super Mario Bros Gym environment package, run the following command in a Python 3.8 kernel:

In [8]:
%pip install gym-super-mario-bros

Note: you may need to restart the kernel to use updated packages.


Import the necessary packages and define the following helper function to display video runs within the Jupyter notebook:

In [9]:
%pip install pybullet
%pip install pyvirtualdisplay
import os
os.environ['PYVIRTUALDISPLAY_DISPLAYFD'] = '0' 

import gym
import pybullet as p
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from pyvirtualdisplay import Display
from IPython.display import HTML
import numpy as np
import torch
import random

display = Display(visible=0, size=(400, 300))
display.start()

def display_video(frames, framerate=30):
  """Generates video from `frames`.

  Args:
    frames (ndarray): Array of shape (n_frames, height, width, 3).
    framerate (int): Frame rate in units of Hz.

  Returns:
    Display object.
  """
  height, width, _ = frames[0].shape
  dpi = 70
  orig_backend = matplotlib.get_backend()
  matplotlib.use('Agg')  # Switch to headless 'Agg' to inhibit figure rendering.
  fig, ax = plt.subplots(1, 1, figsize=(width / dpi, height / dpi), dpi=dpi)
  matplotlib.use(orig_backend)  # Switch back to the original backend.
  ax.set_axis_off()
  ax.set_aspect('equal')
  ax.set_position([0, 0, 1, 1])
  im = ax.imshow(frames[0])
  def update(frame):
    im.set_data(frame)
    return [im]
  interval = 1000/framerate
  anim = animation.FuncAnimation(fig=fig, func=update, frames=frames,
                                  interval=interval, blit=True, repeat=False)
  return HTML(anim.to_html5_video())



FileNotFoundError: [WinError 2] The system cannot find the file specified

## Hyperparameters

In [10]:
# Hyperparameters
EPISODES = 2500                  # Number of episodes to run for training
LEARNING_RATE = 0.0005           # Learning rate for optimizing neural network weights
MEM_SIZE = 50000                 # Maximum size of replay memory
REPLAY_START_SIZE = 10000        # Amount of samples to fill replay memory before training
BATCH_SIZE = 32                  # Number of samples to draw from replay memory for training
GAMMA = 0.99                     # Discount factor for future rewards
EPSILON_START = 0.1              # Starting exploration rate
EPSILON_END = 0.0001             # Final exploration rate
EPSILON_DECAY = 4 * MEM_SIZE     # Decay rate for exploration rate
MEM_RETAIN = 0.1                 # Fraction of the replay memory (the oldest samples) that is never overwritten
NETWORK_UPDATE_ITERS = 5000      # Number of learning steps between target network updates

FC1_DIMS = 128                   # Number of neurons in our MLP's first hidden layer
FC2_DIMS = 128                   # Number of neurons in our MLP's second hidden layer

# Metrics for displaying training status
best_reward = 0
average_reward = 0
episode_history = []
episode_reward_history = []
np.bool = np.bool_               # Restore the np.bool alias (removed in NumPy 1.24), which the replay buffer below uses
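
To get a feel for how the three epsilon settings interact, the sketch below evaluates the exponential decay that `choose_action` applies once learning has started. The helper name `epsilon_at` is hypothetical. The constants are copied from the cell above.

```python
import math

# Values copied from the hyperparameter cell
EPSILON_START = 0.1
EPSILON_END = 0.0001
MEM_SIZE = 50000
EPSILON_DECAY = 4 * MEM_SIZE


def epsilon_at(learn_count):
    """Exploration rate after `learn_count` learning steps (exponential decay)."""
    return EPSILON_END + (EPSILON_START - EPSILON_END) * math.exp(-learn_count / EPSILON_DECAY)


# Print the exploration rate at the start and after one and five decay constants
for steps in (0, EPSILON_DECAY, 5 * EPSILON_DECAY):
    print(steps, epsilon_at(steps))
```

With these values the gap between the current rate and `EPSILON_END` shrinks by a factor of e roughly every 200,000 learning steps.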

## Neural Network
Below is the class definition for a neural network used to approximate Q-values within a reinforcement learning framework.

In [11]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

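# Neural network class used to approximate Q-values within the Mario environment
class Network(torch.nn.Module):
    def __init__(self, env):
        super(Network, self).__init__()  # Initialise the torch.nn.Module base class

        self.input_shape = env.observation_space.shape    # Shape of one observation, e.g. (240, 256, 3) for an RGB frame
        self.input_size = int(np.prod(self.input_shape))  # Number of values per observation once flattened
        self.action_space = env.action_space.n            # Number of actions in the action space

        # Defining the layers of the neural network (a three-layer MLP)
        self.layers = torch.nn.Sequential(
            torch.nn.Linear(self.input_size, FC1_DIMS),     # Flattened observation -> first hidden layer
            torch.nn.ReLU(),                                # Non-linearity
            torch.nn.Linear(FC1_DIMS, FC2_DIMS),            # First hidden layer -> second hidden layer
            torch.nn.ReLU(),                                # Non-linearity
            torch.nn.Linear(FC2_DIMS, self.action_space)    # Second hidden layer -> one Q-value per action
        )

        self.optimizer = optim.Adam(self.parameters(), lr=LEARNING_RATE)  # Optimizer
        self.loss = nn.MSELoss()  # Loss Function

    def forward(self, x):
        # Flatten each observation to a vector so image-shaped inputs fit the first Linear layer
        return self.layers(x.reshape(-1, self.input_size))  # Forward pass through the network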
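
A linear layer takes a single input size, so an image observation has to be flattened to a vector before it reaches the first layer. The arithmetic below is a sketch that assumes a 240 x 256 RGB frame and the 12 actions listed earlier, with the layer widths from the hyperparameter cell. The helper `linear_params` is hypothetical.

```python
import math

obs_shape = (240, 256, 3)   # assumed frame shape: height, width, colour channels
n_actions = 12              # size of the action list above
FC1_DIMS = 128
FC2_DIMS = 128

n_inputs = math.prod(obs_shape)   # values per observation once flattened


def linear_params(n_in, n_out):
    """Weights plus biases of one fully connected layer."""
    return n_in * n_out + n_out


total = (linear_params(n_inputs, FC1_DIMS)
         + linear_params(FC1_DIMS, FC2_DIMS)
         + linear_params(FC2_DIMS, n_actions))
print(n_inputs, total)
```

Almost all of the parameters sit in the first layer, which is why raw frames are often downscaled or converted to grayscale before being passed to an MLP.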

## Reinforcement Learning Framework
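
The replay buffer in the cell below keeps the first 10% of its slots permanently and recycles the remaining 90% once it is full. The sketch below restates that index rule with integer arithmetic so it can be checked in isolation. The function name `write_index` is hypothetical. The two constants are copied from the hyperparameter cell.

```python
MEM_SIZE = 50000
MEM_RETAIN = 0.1


def write_index(mem_count, mem_size=MEM_SIZE, retain=MEM_RETAIN):
    """Slot that the experience numbered `mem_count` (counting from 0) is written to."""
    if mem_count < mem_size:
        return mem_count                      # buffer not full yet: append
    protected = int(retain * mem_size)        # slots 0..protected-1 are never overwritten
    recyclable = mem_size - protected         # remaining slots are reused in a ring
    return mem_count % recyclable + protected


# The first few writes after the buffer fills up
print([write_index(n) for n in (49999, 50000, 50001, 90000)])
```

Every write after the buffer is full lands in a slot between 5000 and 49999, so the earliest 5000 experiences stay available for sampling throughout training.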

In [12]:
# handles the storing and retrieval of sampled experiences
class ReplayBuffer:
    def __init__(self, env):
        self.mem_count = 0
        self.states = np.zeros((MEM_SIZE, *env.observation_space.shape),dtype=np.float32)
        self.actions = np.zeros(MEM_SIZE, dtype=np.int64)
        self.rewards = np.zeros(MEM_SIZE, dtype=np.float32)
        self.states_ = np.zeros((MEM_SIZE, *env.observation_space.shape),dtype=np.float32)
        self.dones = np.zeros(MEM_SIZE, dtype=np.bool)

    def add(self, state, action, reward, state_, done):
        # if memory count is higher than the max memory size then overwrite previous values
        if self.mem_count < MEM_SIZE:
            mem_index = self.mem_count
        else:
            ############ avoid catastrophic forgetting - retain initial 10% of the replay buffer ##############
            mem_index = int(self.mem_count % ((1-MEM_RETAIN) * MEM_SIZE) + (MEM_RETAIN * MEM_SIZE))
            ##################################################################################################

        self.states[mem_index]  = state    # expects the observation array itself, not the (obs, info) tuple from reset()
        self.actions[mem_index] = action
        self.rewards[mem_index] = reward
        self.states_[mem_index] = state_
        self.dones[mem_index] =  1 - done  # stored as a "not done" mask: 0 for terminal transitions, 1 otherwise

        self.mem_count += 1

    # returns random samples from the replay buffer, number is equal to BATCH_SIZE
    def sample(self):
        MEM_MAX = min(self.mem_count, MEM_SIZE)
        batch_indices = np.random.choice(MEM_MAX, BATCH_SIZE, replace=True)

        states  = self.states[batch_indices]
        actions = self.actions[batch_indices]
        rewards = self.rewards[batch_indices]
        states_ = self.states_[batch_indices]
        dones   = self.dones[batch_indices]

        return states, actions, rewards, states_, dones

class DQN_Solver:
    def __init__(self, env):
        self.memory = ReplayBuffer(env)
        self.policy_network = Network(env)  # Q
        self.target_network = Network(env)  # \hat{Q}
        self.target_network.load_state_dict(self.policy_network.state_dict())  # initially set weights of Q to \hat{Q}
        self.learn_count = 0    # keep track of the number of iterations we have learnt for

    # epsilon greedy
    def choose_action(self, observation):
        # only start decaying epsilon once we actually start learning, i.e. once the replay memory holds more than REPLAY_START_SIZE samples
        if self.memory.mem_count > REPLAY_START_SIZE:
            eps_threshold = EPSILON_END + (EPSILON_START - EPSILON_END) * \
                np.exp(-1. * self.learn_count / EPSILON_DECAY)
        else:
            eps_threshold = 1.0
        # if we rolled a value lower than epsilon sample a random action
        if random.random() < eps_threshold:
            # sample a random action with set priors: the last four actions (the two left + speed-up variants, down and up) get half the weight of the others
            return np.random.choice(np.array(range(12)), p=[0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.05,0.05,0.05,0.05])

        # otherwise policy network, Q, chooses action with highest estimated Q-value so far
        state = torch.tensor(np.array(observation, dtype=np.float32))  # copy the frame into a contiguous float32 array before converting
        state = state.unsqueeze(0)
        self.policy_network.eval()  # only need forward pass
        with torch.no_grad():       # so we don't compute gradients - save memory and computation
            ################ retrieve q-values from policy network, Q ################################
            q_values = self.policy_network(state)
            ##########################################################################################
        return torch.argmax(q_values).item()

    # main training loop
    def learn(self):
        states, actions, rewards, states_, dones = self.memory.sample()  # retrieve random batch of samples from replay memory
        states = torch.tensor(states , dtype=torch.float32)
        actions = torch.tensor(actions, dtype=torch.long)
        rewards = torch.tensor(rewards, dtype=torch.float32)
        states_ = torch.tensor(states_, dtype=torch.float32)
        dones = torch.tensor(dones, dtype=torch.bool)
        batch_indices = np.arange(BATCH_SIZE, dtype=np.int64)

        self.policy_network.train(True)
        q_values = self.policy_network(states)                # get current q-value estimates (all actions) from policy network, Q
        q_values = q_values[batch_indices, actions]           # q values for sampled actions only

        self.target_network.eval()                            # only need forward pass
        with torch.no_grad():                                 # so we don't compute gradients - save memory and computation
            ###### get q-values of states_ from target network, \hat{q}, for computation of the target q-values ###############
            q_values_next = self.target_network(states_)
            ###################################################################################################################

        q_values_next_max = torch.max(q_values_next, dim=1)[0]  # max q values for next state

        q_target = rewards + GAMMA * q_values_next_max * dones  # our target q-value; dones is the "not done" mask, so terminal transitions keep only the reward

        ###### compute loss between target (from target network, \hat{Q}) and estimated q-values (from policy network, Q) #########
        loss = self.policy_network.loss(q_values, q_target)
        ###########################################################################################################################

        #compute gradients and update policy network Q weights
        self.policy_network.optimizer.zero_grad()
        loss.backward()
        self.policy_network.optimizer.step()
        self.learn_count += 1

        # set target network \hat{Q}'s weights to policy network Q's weights every C steps
        if  self.learn_count % NETWORK_UPDATE_ITERS == NETWORK_UPDATE_ITERS - 1:
            print("updating target network")
            self.update_target_network()

    def update_target_network(self):
        self.target_network.load_state_dict(self.policy_network.state_dict())

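    def returning_epsilon(self):
        # current exploration rate, using the same schedule as choose_action
        if self.memory.mem_count > REPLAY_START_SIZE:
            return EPSILON_END + (EPSILON_START - EPSILON_END) * np.exp(-1. * self.learn_count / EPSILON_DECAY)
        return 1.0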
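
The target that `learn` regresses the policy network toward can be worked through by hand on a tiny batch. The sketch below uses plain Python lists instead of tensors. The function name `td_targets` and the sample numbers are hypothetical. `GAMMA` is copied from the hyperparameter cell, and `not_done` plays the role of the mask stored in the replay buffer.

```python
GAMMA = 0.99


def td_targets(rewards, next_q_values, not_done):
    """One target per transition: r + GAMMA * max over next-state Q-values * not_done."""
    return [r + GAMMA * max(q_next) * mask
            for r, q_next, mask in zip(rewards, next_q_values, not_done)]


rewards = [1.0, -15.0]                      # second transition ends the episode
next_q = [[0.5, 2.0, 1.0], [3.0, 4.0, 5.0]] # target-network Q-values for each next state
not_done = [1, 0]
targets = td_targets(rewards, next_q, not_done)
print(targets)
```

For the terminal transition the mask is 0, so the bootstrapped term disappears and the target is just the final reward.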

## Training the AI
The following code uses the above Neural Network and Reinforcement Learning framework to train the AI to play Super Mario Bros on a variety of its levels.

In [13]:
from nes_py.wrappers import JoypadSpace
import gym_super_mario_bros
from gym_super_mario_bros.actions import COMPLEX_MOVEMENT
import gym

env = gym_super_mario_bros.make('SuperMarioBrosRandomStages-v0', apply_api_compatibility=True, render_mode="rgb_array")
env = JoypadSpace(env, COMPLEX_MOVEMENT)

env.action_space.seed(0)
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
episode_batch_score = 0
episode_reward = 0
agent = DQN_Solver(env)  # create DQN agent
plt.clf()
state = env.reset()

for i in range(EPISODES):
    state, _ = env.reset()  # reset() returns (observation, info); keep only the observation
    print("########################episode no:", i)
    done = False
    steps = 0
    while not done and steps < 200:  # cap each episode at 200 steps
        # sampling loop - sample random actions and add them to the replay buffer
        action = agent.choose_action(state)
        state_, reward, done, _,info = env.step(action)
        agent.memory.add(state, action, reward, state_, done)

        # only start learning once replay memory reaches REPLAY_START_SIZE
        if agent.memory.mem_count > REPLAY_START_SIZE:
            print('--------------LEARNING')
            agent.learn()

        state = state_
        episode_batch_score += reward
        episode_reward += reward
        steps += 1

        if done:
            break
    
    episode_history.append(i)
    episode_reward_history.append(episode_reward)
    episode_reward = 0.0

    # save our model every 100 episodes so we can load it later. (note: you can interrupt the training at any time and load the latest saved model when testing)
    if i % 100 == 0 and agent.memory.mem_count > REPLAY_START_SIZE:
        torch.save(agent.policy_network.state_dict(), "C:/Users/61420/Documents/GitHub/MARIO/AIIR-Project-main/policy_network.pkl")
        print("average total reward per episode batch since episode ", i, ": ", episode_batch_score/ float(100))
        episode_batch_score = 0
    elif agent.memory.mem_count < REPLAY_START_SIZE:
        print("waiting for buffer to fill...")
        print("@@@@@@@@@@@@@@@@memory_storage = ", agent.memory.mem_count)
        episode_batch_score = 0

plt.plot(episode_history, episode_reward_history)
plt.show()





AssertionError: Torch not compiled with CUDA enabled

In [None]:
from nes_py.wrappers import JoypadSpace
import gym_super_mario_bros
from gym_super_mario_bros.actions import COMPLEX_MOVEMENT
import gym


TRAIN = True  # assumption: this flag is not defined elsewhere in the notebook; set it to False to skip training

if TRAIN:
    env = gym_super_mario_bros.make('SuperMarioBrosRandomStages-v0', apply_api_compatibility=True, render_mode="rgb_array")
    env = JoypadSpace(env, COMPLEX_MOVEMENT)
    # set manual seeds so we get the same behaviour every time - so that when you change your hyperparameters you can attribute the effect to those changes
    env.action_space.seed(0)
    random.seed(0)
    np.random.seed(0)
    torch.manual_seed(0)
    episode_batch_score = 0
    episode_reward = 0
    agent = DQN_Solver(env)  # create DQN agent
    plt.clf()

    for i in range(EPISODES):
        state, _ = env.reset()  # must be called before sending any actions; returns (observation, info), keep only the observation
        while True:
            # sampling loop - sample random actions and add them to the replay buffer
            action = agent.choose_action(state)
            state_, reward, done,_, info = env.step(action)
            

            ####### add sampled experience to replay buffer ##########
            agent.memory.add(state, action, reward, state_, done)
            ##########################################################

            # only start learning once replay memory reaches REPLAY_START_SIZE
            if agent.memory.mem_count > REPLAY_START_SIZE:
                agent.learn()

            state = state_
            episode_batch_score += reward
            episode_reward += reward

            if done:
                break

        episode_history.append(i)
        episode_reward_history.append(episode_reward)
        episode_reward = 0.0

        # save our model every 100 episodes so we can load it later. (note: you can interrupt the training at any time and load the latest saved model when testing)
        if i % 100 == 0 and agent.memory.mem_count > REPLAY_START_SIZE:
            torch.save(agent.policy_network.state_dict(), "C:/Users/61420/Documents/GitHub/MARIO/AIIR-Project-main/policy_network.pkl")
            print("average total reward per episode batch since episode ", i, ": ", episode_batch_score/ float(100))
            episode_batch_score = 0
        elif agent.memory.mem_count < REPLAY_START_SIZE:
            print("waiting for buffer to fill...")
            episode_batch_score = 0

    plt.plot(episode_history, episode_reward_history)
    plt.show()

## Testing the AI
The following code tests the AI policy generated during training in random Super Mario Bros levels.

In [None]:
plt.plot(episode_history, episode_reward_history)
plt.show()

agent = DQN_Solver(env)
agent.policy_network.load_state_dict(torch.load("C:/Users/61420/Documents/GitHub/MARIO/AIIR-Project-main/policy_network.pkl"))
frames = []
state, _ = env.reset()  # reset() returns (observation, info); keep only the observation
agent.policy_network.eval()

while True:
    with torch.no_grad():
        # copy the frame to float32 and add a batch dimension before the forward pass
        q_values = agent.policy_network(torch.tensor(np.array(state, dtype=np.float32)).unsqueeze(0))
    action = torch.argmax(q_values).item() # select action with highest predicted q-value
    state, reward, done, _, info = env.step(action)  # step returns five values, as in the training loop
    frames.append(np.array(env.render()))  # render mode was set when the env was created; copy the frame so later steps cannot overwrite it
    if done:
        break

env.close()
display_video(frames)

