## Deep Neuroevolution Genetic Algorithm Approach to Super Mario Bros.

This version randomizes the world and stage order of the game while running each agent for multiple episodes each generation to specifically evolve for a better general Neural Mario.

In [1]:
# Boolean check
COLAB = 'google.colab' in str(get_ipython())

## Google Colab specific code

In [2]:
if COLAB:
    !pip install gym-super-mario-bros
    !mkdir Output
    !mkdir .video

In [3]:
if COLAB:
    # Connect Google Drive
    from google.colab import drive
    #drive.mount('/content/gdrive')
    drive.mount('/content/gdrive', force_remount=True)
    print('Google Drive connected.')

In [4]:
if COLAB:
    # Copy over the fittest NeuralMario
    !cp "gdrive/MyDrive/SMB Genetic Random/Output/fittestMario.pkl" -r "Output"

In [5]:
if COLAB:
    # Create a virtual display for video rendering on the headless server.
    !apt-get install -y xvfb python-opengl > /dev/null 2>&1
    !pip install gym pyvirtualdisplay > /dev/null 2>&1
    from pyvirtualdisplay import Display
    display = Display(visible=0, size=(400, 400))

## Regular Code

In [6]:
import numpy as np
import torch
import matplotlib.pyplot as plt
import time

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import math
import copy
import sys
import os
import pickle
import time

import gym_super_mario_bros
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from gym.wrappers import Monitor
action_count = 7 # SIMPLE_MOVEMENT
import cv2

import multiprocessing
from itertools import repeat

from mutate import mutate
import model

### Model Definitions

In [7]:
"""
# Model Definitions
class Flatten(torch.nn.Module):
    def forward(self, x):
        return x.view(x.size()[0], -1)

class NeuralMario(nn.Module):
        def __init__(self, action_count):
            super().__init__()
            self.cuda()
            self.fc = nn.Sequential(
                        nn.Conv2d(4, 8, 3, bias=True),
                        nn.ReLU(inplace=True),
                        nn.Conv2d(8, 6, 3, bias=True),
                        nn.ReLU(inplace=True),
                        Flatten(),
                        nn.Linear(4704, 32, bias=True),
                        nn.ReLU(inplace=True),
                        nn.Linear(32, action_count, bias=True),
                        nn.Softmax(dim=1)
                        )


        def forward(self, inputs):
            x = self.fc(inputs)
            return x

def init_weights(m):

        # nn.Conv2d weights are of shape [16, 1, 3, 3] i.e. # number of filters, 1, stride, stride
        # nn.Conv2d bias is of shape [16] i.e. # number of filters

        # nn.Linear weights are of shape [32, 24336] i.e. # number of input features, number of output features
        # nn.Linear bias is of shape [32] i.e. # number of output features

        if ((type(m) == nn.Linear) | (type(m) == nn.Conv2d)):
            torch.nn.init.xavier_uniform(m.weight)
            m.bias.data.fill_(0.00)
"""

'\n# Model Definitions\nclass Flatten(torch.nn.Module):\n    def forward(self, x):\n        return x.view(x.size()[0], -1)\n\nclass NeuralMario(nn.Module):\n        def __init__(self, action_count):\n            super().__init__()\n            self.cuda()\n            self.fc = nn.Sequential(\n                        nn.Conv2d(4, 8, 3, bias=True),\n                        nn.ReLU(inplace=True),\n                        nn.Conv2d(8, 6, 3, bias=True),\n                        nn.ReLU(inplace=True),\n                        Flatten(),\n                        nn.Linear(4704, 32, bias=True),\n                        nn.ReLU(inplace=True),\n                        nn.Linear(32, action_count, bias=True),\n                        nn.Softmax(dim=1)\n                        )\n\n\n        def forward(self, inputs):\n            x = self.fc(inputs)\n            return x\n\ndef init_weights(m):\n\n        # nn.Conv2d weights are of shape [16, 1, 3, 3] i.e. # number of filters, 1, stride, stride\n

### Environment Handling

In [8]:
# Environment Handling
def convert_image(input_image):
    image = cv2.cvtColor(input_image, cv2.COLOR_RGB2GRAY)
    return cv2.resize(image, (32,32))

def run_agent(agent, worlds, stages, agent_num, num_agents, rendering=False, monitoring=False, print_reward=False, episodes=2):
    
    global_reward = 0
    
    for i in range(episodes):
        episode_reward = 0
        
        y = str(worlds[i])
        z = str(stages[i])
        env = gym_super_mario_bros.make("SuperMarioBros-" + y + "-" + z + "-v0")
        
        env = JoypadSpace(env, SIMPLE_MOVEMENT)

        if monitoring:
            env = Monitor(env, './video', force=True)

        env.seed(42)

        agent.eval()
        
        state = env.reset()
        if rendering:
            env.render()

        #Conv2d without flatten()
        state = convert_image(state)#.flatten()
        state_list = [state, state, state, state]
        position = -1

        s=0
        
        while True:
            #Conv2d input
            input = torch.from_numpy(np.array(state_list)).type('torch.FloatTensor')\
                .unsqueeze(0)

            #Linear input
            #input = torch.tensor(state_list).type("torch.FloatTensor").view(1,-1)

            output_probabilities = agent(input).detach().numpy()[0]
            action = np.random.choice(range(action_count), 1, \
                p=output_probabilities).item()
            try:
                new_state, reward, done, info = env.step(action)
            except:
                break
            episode_reward += reward

            s=s+1
            if rendering:
                env.render()

            state_list.pop()
            #Conv2d without flatten()
            state_list.append(convert_image(new_state))#.flatten())

            # if mario gets stuck, it gets punished and the loop gets broken
            if position == info["x_pos"]:
                stuck += 1
                if stuck == 100:
                    episode_reward -= 100
                    break
            else:
                stuck = 0

            position = info["x_pos"]
            #env.render()
            #Mario died
            if info["life"] < 2:
                break
                
        print("Agent: " + str(agent_num+1) + "/" + str(num_agents) +\
              "\tEpisode: " + str(i+1) + "/" + str(episodes) + "\tReward: "\
              + str(episode_reward) + "\t", end = '\r')
        
        global_reward += episode_reward
    
    mean_reward = global_reward / episodes
    
    if print_reward:
        print("Total:" + str(global_reward) + "\tMean: " + str(mean_reward))

    return mean_reward


In [9]:
def run_agents_n_times(agents, runs, random=True):
    avg_score = []
    worlds = []
    stages = []
    num_agents = len(agents)
    
    if random:
        for i in range(runs):
            worlds.append(np.random.randint(1, 9))
            stages.append(np.random.randint(1, 5))
    else:
        for i in range(8):
            for j in range(4):
                worlds.append(i+1)
                stages.append(j+1)
        
    for i, agent in enumerate(agents):
        avg_score.append(run_agent(agent, episodes=runs, worlds=worlds, stages=stages, agent_num=i, num_agents=num_agents))
        
    return avg_score


def return_random_agents(num_agents):
    agents = []
    for _ in range(num_agents):

        agent = NeuralMario(action_count)

        for param in agent.parameters():
            param.requires_grad = False

        init_weights(agent)
        agents.append(agent)

    return agents

In [10]:
def test_agent(agent, rendering=True, monitoring=True, print_reward=True):

    env = gym_super_mario_bros.make("SuperMarioBros-v0")
    env = JoypadSpace(env, SIMPLE_MOVEMENT)

    if monitoring:
        env = Monitor(env, './video', force=True)

    env.seed(42)

    agent.eval()
        
    state = env.reset()
    if rendering:
        env.render()

    #Conv2d without flatten()
    state = convert_image(state)#.flatten()
    state_list = [state, state, state, state]
    position = -1
    
    global_reward = 0
    s=0
        
    for _ in range(30000):
        #Conv2d input
        input = torch.from_numpy(np.array(state_list)).type('torch.FloatTensor')\
                .unsqueeze(0)

        #Linear input
        #input = torch.tensor(state_list).type("torch.FloatTensor").view(1,-1)

        output_probabilities = agent(input).detach().numpy()[0]
        action = np.random.choice(range(action_count), 1, \
            p=output_probabilities).item()
        try:
            new_state, reward, done, info = env.step(action)
        except:
            break
        global_reward += reward

        s=s+1
        if rendering:
            env.render()

        state_list.pop()
        #Conv2d without flatten()
        state_list.append(convert_image(new_state))#.flatten())

        # if mario gets stuck, it gets punished and the loop gets broken
        if position == info["x_pos"]:
            stuck += 1
            if stuck == 100:
                global_reward -= 100
                break
        else:
            stuck = 0

        position = info["x_pos"]
        #env.render()
        #Mario died
        if info["life"] < 2:
            break

    return global_reward


In [11]:
'''
# Mutate, as a worker function in this version, must be imported for Windows
def mutate(agent, power=0.2, chance=0.02):
    # Simple method to add gaussian noise to the agents

    child_agent = copy.deepcopy(agent)
    
    """
    gpu = False
    if torch.cuda.is_available():
        gpu = True
        child_agent.to("cuda:0")
    """

    mutation_power = power #hyper-parameter, 0.2 set from https://arxiv.org/pdf/1712.06567.pdf
    
    # Current checks for mutation: 33189
    mutation_chance = chance
    
    for param in child_agent.parameters():
        
        if(len(param.shape)==4): #weights of Conv2D
            for i0 in range(param.shape[0]):
                for i1 in range(param.shape[1]):
                    for i2 in range(param.shape[2]):
                        if np.random.random(1)[0] <= mutation_chance:
                            for i3 in range(param.shape[3]):
                                param[i0][i1][i2][i3]+= mutation_power * np.random.randn()
                        else:
                            pass

        elif(len(param.shape)==2): #weights of linear layer
            for i0 in range(param.shape[0]):
                for i1 in range(param.shape[1]):
                    if np.random.random(1)[0] <= mutation_chance:
                        param[i0][i1]+= mutation_power * np.random.randn()
                    else:
                        pass

        elif(len(param.shape)==1): #biases of linear layer or conv layer
            for i0 in range(param.shape[0]):
                if np.random.random(1)[0] <= mutation_chance:
                    param[i0]+=mutation_power * np.random.randn()
                else:
                    pass
    
    """
    if gpu:
        child_agent.to("cpu")
    """
    
    return child_agent
'''

'\n# Mutate, as a worker function in this version, must be imported for Windows\ndef mutate(agent, power=0.2, chance=0.02):\n    # Simple method to add gaussian noise to the agents\n\n    child_agent = copy.deepcopy(agent)\n    \n    """\n    gpu = False\n    if torch.cuda.is_available():\n        gpu = True\n        child_agent.to("cuda:0")\n    """\n\n    mutation_power = power #hyper-parameter, 0.2 set from https://arxiv.org/pdf/1712.06567.pdf\n    \n    # Current checks for mutation: 33189\n    mutation_chance = chance\n    \n    for param in child_agent.parameters():\n        \n        if(len(param.shape)==4): #weights of Conv2D\n            for i0 in range(param.shape[0]):\n                for i1 in range(param.shape[1]):\n                    for i2 in range(param.shape[2]):\n                        if np.random.random(1)[0] <= mutation_chance:\n                            for i3 in range(param.shape[3]):\n                                param[i0][i1][i2][i3]+= mutation_power * n

In [12]:
def return_children(agents, sorted_parent_indexes, elite_count, mutation_power, mutation_chance, multiprocess=False):
    """ Returning [N-elite_count] mutated agents from sorted_parent_indexes and
        keeping the best [elite_count] agents unchanged
    """
    
    print("Mutating " + str(elite_count) + " elite agent(s) into the next generation.", end='\r')

    if __name__ ==  '__main__': 
        if multiprocess:
            print("Multiprocessing")

            iterable = zip(agents, repeat(mutation_power), repeat(mutation_chance))
            with multiprocessing.Pool() as pool:
                children_agents = pool.map(mutate(), iterable)
            
    else:
        children_agents = []
        for i in range(len(agents)-elite_count):
            selected_agent_index = sorted_parent_indexes[np.random.randint(len(sorted_parent_indexes))]
            children_agents.append(mutate(agents[selected_agent_index], mutation_power, mutation_chance))
    
    
    elite_children = []
    for i in range(elite_count):
        elite_children.append(agents[sorted_parent_indexes[i]])

    children_agents.extend(elite_children)

    return children_agents

def return_hot_start(elite_agent, num_agents, mutation_power, mutation_chance, multiprocess):
    """ Mutate from a single saved elite agent, or fittestMario, in order to
        start evolving a generation immediately without waiting for a single
        generation of elite copies to process.
        
        Only recommended when only a single elite agent is preserved across
        generations.
    """
    if multiprocess:
        print("Multiprocessing")
        
        agents = []
        
        for i in range(num_agents-1):
            agents.append(elite_agent)
        
        with multiprocessing.Pool() as pool:
            iterable = zip(agents, repeat(mutation_power), repeat(mutation_chance))
            children_agents = pool.map(mutate, iterable)
            
    else:
        children_agents = []
    
        for i in range(num_agents-1):
            children_agents.append(mutate(elite_agent, mutation_power, mutation_chance))

    children_agents.extend([elite_agent])
    
    return children_agents
    

In [13]:
def main(elite_agent, mutation_power, mutation_chance, episodes, agents=10, elites=1, show_top=2, generations=100, hot_start=False, random=True, multiprocess=False):
    torch.set_grad_enabled(False)
    num_agents = agents
    elite_count = elites
    top_limit_count = show_top
    n_episodes = episodes

    generation_count = generations

    if (elite_agent != None) and (not hot_start):
        print("Loaded " + str(num_agents) + " elite copies.")
        agents = [elite_agent]*num_agents
    elif (elite_agent != None) and hot_start:
        print("Hot start enabled. Mutating loaded elite " + str(num_agents-1) + " times.")
        agents = return_hot_start(elite_agent, num_agents, mutation_power, mutation_chance, multiprocess)
    elif (elite_agent == None) and hot_start:
        print("Hot start enabled without required elite agent.")
        print("Creating " + str(num_agents) + " random agents.")
        agents = return_random_agents(num_agents)
    else:
        print("Creating " + str(num_agents) + " random agents.")
        agents = return_random_agents(num_agents)
    
    for generation in range(generation_count):
        print("############## Generation {} ##############".format(generation+1))
        
        rewards = run_agents_n_times(agents, n_episodes, random)

        sorted_parent_indexes = np.argsort(rewards)[::-1][:top_limit_count]
        
        top_rewards = []
        for best_parent in sorted_parent_indexes:
            top_rewards.append(rewards[best_parent])

        print("Mean reward: {}\t\t| Mean of top 5: {}".format(\
            int(np.mean(rewards)), int(np.mean(top_rewards[:5]))))
        print("Top agents: {}\t| Reward: {}".format(sorted_parent_indexes, \
            top_rewards))
        print("###########################################\n")
        sys.stdout.flush()

        
        with open("Output/fittestMario.pkl", 'wb') as output:
            pickle.dump(agents[sorted_parent_indexes[0]], output, \
                pickle.HIGHEST_PROTOCOL)

        children_agents = return_children(agents, sorted_parent_indexes, elite_count, mutation_power, mutation_chance, multiprocess)

        agents = children_agents


### Model Loading and Training
The training function is defined here.
Control Arguments:
* <b>training:</b> If True, the genetic algorithm evolves the neural network(s), If False, the best agent is loaded and plays a game, with the gym monitor recording the video.

* <b>load_elite:</b> If True, the previous elite agent is loaded from file. If False, all previous trainings are ignored and overwritten.

* <b>agents:</b> The number of agents in the population.

* <b>elites:</b> The number of elite agents to carry over across generations, and to mutate from.

* <b>show_top:</b> The number of top agent rewards scores to display in the evolution output.

* <b>generations:</b> The number of generations to create and test.

* <b>mutation_power:</b> The Gaussian noise multiplier for the mutation function to use.

* <b>mutation_chance:</b> The percentage chance for a genotype to undergo mutation within a generation creation.

* <b>episodes:</b> The number of game runs to use when evaluating the performance of each agent within a generation.

* <b>hot_start:</b> If True, create the first generation via mutations of the single loaded elite agent from a previous session. If False, either create a generation of completely random agents or make copies of the loaded elite agent if available. <b>Not recommended if implementing multiple-agent elitism.</b>

* <b>random:</b> If True, randomly sample stages across all worlds for the evaluation of each agent each generation. If False, evaluate via every stage in the game sequentially.

* <b>multiprocess:</b> If True,

In [14]:
# Model Training
def train(training=True, load_elite=True, agents=10, elites=1, show_top=2, generations=100, mutation_power=0.2, mutation_chance=0.02, episodes=2, hot_start=False, random=True, multiprocess=False):
    if training:
        elite_agent = None
        if load_elite:
            with open("Output/fittestMario.pkl", 'rb') as input:
                elite_agent = pickle.load(input)
        
        # If not randomly sampling, all 32 stages are sequentially tested.
        if not random:
            print()
            episodes = 32
            
        main(elite_agent, mutation_power, mutation_chance, episodes, agents, elites, show_top, generations, hot_start, random, multiprocess)

    else:
        with open("Output/fittestMario.pkl", 'rb') as input:
            fittest_mario = pickle.load(input)
            test_agent(fittest_mario, True, True, True)


In [15]:
# train, load, agents, elite, display top, generations, mutation_power, mutation_chance, episodes per generation, hot_start, randomize stages, multiprocessing
train(True, True, 100, 1, 5, 50, 0.2, 1.0, 32, True, False, True)

AttributeError: Can't get attribute 'NeuralMario' on <module '__main__'>

In [None]:
if COLAB:
    !cp "Output" -r "gdrive/MyDrive/SMB Genetic Random/"

This will now run the best neural network one more time, wrapped in the monitor environment to record video of play.

In [None]:
if COLAB:
    try:
        display.start()
    except:
        pass
    
train(False)

In [None]:
if COLAB:
    !cp "video" -r "gdrive/MyDrive/SMB Genetic Random/"