# Solve simple games using NEAT

#### Loading required libraries

In [1]:
# Tested with python 3.9.6
!pip3 install numpy"==1.26.4" gym"==0.26.2" pygame"==2.5.2" setuptools"==69.5.1" tensorflow"==2.16.1" neat-python"==0.92" joblib"==1.4.2" graphviz"==0.20.3"
import gym
import tensorflow as tf
import neat
import pickle
import pygame
# from joblib import Parallel, delayed

Defaulting to user installation because normal site-packages is not writeable
[0m

2024-05-23 10:16:42.165223: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Report the TensorFlow version and how many physical GPU devices it lists.
print(f"Num GPUs Available fo version {tf.__version__}: {len(tf.config.list_physical_devices('GPU'))}")

Num GPUs Available fo version 2.16.1: 0


#### Analysing our environment

In [3]:
def create_env(show, max_episode_steps = 10000):
    """Build a 'CartPole-v1' environment through ``gym.make``.

    Args:
        show: when truthy the environment is created with the "human"
            render mode; otherwise ``render_mode`` is passed as ``None``.
        max_episode_steps: forwarded unchanged to ``gym.make``.

    Returns:
        The object returned by ``gym.make``.
    """
    render_mode = "human" if show else None
    return gym.make('CartPole-v1', max_episode_steps = max_episode_steps, render_mode = render_mode)

# Build a non-rendered environment only to inspect its spaces, then release it.
env = create_env(show = False)
# First dimension of the observation space's shape, reported as the number of states.
print(f"{env.observation_space.shape[0]} states")
# `n` attribute of the action space, reported as the number of actions.
print(f"{env.action_space.n} actions")
env.close()

4 states
2 actions


In [4]:
def play_with_random_actions():
    """Play one rendered episode with actions sampled from the action space.

    The accumulated reward is printed once the episode reports
    ``terminated`` or ``truncated``. Returns ``None``.

    The environment is closed in a ``finally`` block so that an exception or
    a kernel interrupt during the episode still releases it.
    """
    env = create_env(show = True)

    rewards = 0
    try:
        env.reset()
        done = False
        while not done:
            # Explicit render call kept from the original loop.
            # NOTE(review): with render_mode="human" gym may already render
            # on every step -- confirm whether this call is still needed.
            env.render()

            action = env.action_space.sample()
            _, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            rewards += reward

        print(f"Reward is {rewards}")
    finally:
        env.close()

# play_with_random_actions()

#### Visualization

In [5]:
import warnings
import graphviz
import matplotlib.pyplot as plt
import numpy as np


def plot_stats(statistics, ylog=False, view=False, filename='avg_fitness.svg'):
    """Plot average, +/- one standard deviation and best fitness per generation.

    Args:
        statistics: object providing ``most_fit_genomes`` (items with a
            ``fitness`` attribute), ``get_fitness_mean()`` and
            ``get_fitness_stdev()``.
        ylog: when truthy the y axis is switched to the 'symlog' scale.
        view: when truthy the figure is shown after being saved.
        filename: path handed to ``plt.savefig``.

    Returns:
        None. If ``plt`` is ``None`` a warning is emitted and nothing is drawn.
    """
    if plt is None:
        warnings.warn("This display is not available due to a missing optional dependency (matplotlib)")
        return

    champions = statistics.most_fit_genomes
    xs = range(len(champions))
    best = [champion.fitness for champion in champions]
    mean = np.array(statistics.get_fitness_mean())
    spread = np.array(statistics.get_fitness_stdev())

    # (values, line format, legend label), drawn in this order.
    series = (
        (mean, 'b-', "average"),
        (mean - spread, 'g-.', "-1 sd"),
        (mean + spread, 'g-.', "+1 sd"),
        (best, 'r-', "best"),
    )
    for ys, line_format, label in series:
        plt.plot(xs, ys, line_format, label=label)

    plt.title("Population's average and best fitness")
    plt.xlabel("Generations")
    plt.ylabel("Fitness")
    plt.grid()
    plt.legend(loc="best")
    if ylog:
        axes = plt.gca()
        axes.set_yscale('symlog')

    # The figure is always written to disk; showing it is optional.
    plt.savefig(filename)
    if view:
        plt.show()

    plt.close()


def plot_spikes(spikes, view=False, filename=None, title=None):
    """Plot the traces recorded for a single spiking neuron in four stacked panels.

    Args:
        spikes: iterable of 5-tuples unpacked as ``(t, I, v, u, f)``.
        view: when truthy the figure is shown and then closed.
        filename: if not ``None``, path handed to ``plt.savefig``.
        title: optional suffix appended in parentheses to the figure title.

    Returns:
        The figure returned by ``plt.figure()``, or ``None`` when ``view`` is
        truthy (the figure has been closed in that case).
    """
    times = [t for t, _i, _v, _u, _f in spikes]
    potentials = [v for _t, _i, v, _u, _f in spikes]
    recoveries = [u for _t, _i, _v, u, _f in spikes]
    currents = [i for _t, i, _v, _u, _f in spikes]
    fired = [f for _t, _i, _v, _u, f in spikes]

    if title is None:
        heading = "Izhikevich's spiking neuron model"
    else:
        heading = "Izhikevich's spiking neuron model ({0!s})".format(title)

    # (y label, values, line format) for each panel, top to bottom.
    panels = (
        ("Potential (mv)", potentials, "g-"),
        ("Fired", fired, "r-"),
        ("Recovery (u)", recoveries, "r-"),
        ("Current (I)", currents, "r-o"),
    )

    fig = plt.figure()
    for row, (y_label, values, line_format) in enumerate(panels, start=1):
        plt.subplot(4, 1, row)
        plt.ylabel(y_label)
        plt.xlabel("Time (in ms)")
        plt.grid()
        plt.plot(times, values, line_format)
        if row == 1:
            # The title is attached to the top panel only.
            plt.title(heading)

    if filename is not None:
        plt.savefig(filename)

    if not view:
        return fig

    plt.show()
    plt.close()
    return None


def plot_species(statistics, view=False, filename='speciation.svg'):
    """Draw a stacked-area chart of species sizes across generations.

    Args:
        statistics: object whose ``get_species_sizes()`` returns one sequence
            of sizes per generation.
        view: when truthy the figure is shown after being saved.
        filename: path handed to ``plt.savefig``.

    Returns:
        None. If ``plt`` is ``None`` a warning is emitted and nothing is drawn.
    """
    if plt is None:
        warnings.warn("This display is not available due to a missing optional dependency (matplotlib)")
        return

    sizes_by_generation = statistics.get_species_sizes()
    generations = range(len(sizes_by_generation))
    # Transpose so that each row holds one curve over all generations.
    per_species = np.array(sizes_by_generation).T

    _, axes = plt.subplots()
    axes.stackplot(generations, *per_species)

    plt.title("Speciation")
    plt.ylabel("Size per Species")
    plt.xlabel("Generations")

    plt.savefig(filename)

    if view:
        plt.show()

    plt.close()


def draw_net(config, genome, view=False, filename=None, node_names=None, show_disabled=True, prune_unused=False,
             node_colors=None, fmt='svg'):
    """Render a genome as a graphviz directed graph of arbitrary topology.

    Args:
        config: object whose ``genome_config`` exposes ``input_keys`` and
            ``output_keys``.
        genome: object exposing ``nodes`` (mapping keyed by node id) and
            ``connections`` (mapping whose values carry ``key``, ``enabled``
            and ``weight``).
        view: forwarded to ``dot.render``.
        filename: forwarded to ``dot.render``.
        node_names: optional dict mapping node ids to display names; ids
            without an entry are labelled with ``str(id)``.
        show_disabled: when falsy, disabled connections are not drawn.
        prune_unused: when truthy, draw ``genome.get_pruned_copy(...)``
            instead of the genome itself.
        node_colors: optional dict mapping node ids to fill colours.
        fmt: output format passed to ``graphviz.Digraph``.

    Returns:
        The ``graphviz.Digraph`` that was rendered, or ``None`` (after a
        warning) when ``graphviz`` is ``None``.
    """
    if graphviz is None:
        warnings.warn("This display is not available due to a missing optional dependency (graphviz)")
        return

    # If requested, use a copy of the genome which omits all components that won't affect the output.
    if prune_unused:
        genome = genome.get_pruned_copy(config.genome_config)

    if node_names is None:
        node_names = {}

    assert type(node_names) is dict

    if node_colors is None:
        node_colors = {}

    assert type(node_colors) is dict

    def label(key):
        # Single source of truth for a node's graph name: declarations and
        # edge endpoints must agree or graphviz draws two separate nodes.
        return node_names.get(key, str(key))

    # Defaults applied to every node of the graph.
    default_node_attrs = {
        'shape': 'circle',
        'fontsize': '9',
        'height': '0.2',
        'width': '0.2'}

    dot = graphviz.Digraph(format=fmt, node_attr=default_node_attrs)

    inputs = set()
    for k in config.genome_config.input_keys:
        inputs.add(k)
        input_attrs = {'style': 'filled', 'shape': 'box', 'fillcolor': node_colors.get(k, 'lightgray')}
        dot.node(label(k), _attributes=input_attrs)

    outputs = set()
    for k in config.genome_config.output_keys:
        outputs.add(k)
        output_attrs = {'style': 'filled', 'fillcolor': node_colors.get(k, 'lightblue')}
        dot.node(label(k), _attributes=output_attrs)

    # Remaining genome nodes (neither input nor output) are drawn in white.
    for n in genome.nodes.keys():
        if n in inputs or n in outputs:
            continue

        hidden_attrs = {'style': 'filled',
                        'fillcolor': node_colors.get(n, 'white')}
        # Declared through label() so a renamed hidden node stays attached to
        # its edges (previously it was declared as str(n)).
        dot.node(label(n), _attributes=hidden_attrs)

    for cg in genome.connections.values():
        if not (cg.enabled or show_disabled):
            continue

        source, target = cg.key
        style = 'solid' if cg.enabled else 'dotted'
        color = 'green' if cg.weight > 0 else 'red'
        # Line thickness grows with the magnitude of the weight.
        width = str(0.1 + abs(cg.weight / 5.0))
        dot.edge(label(source), label(target),
                 _attributes={'style': style, 'color': color, 'penwidth': width})

    dot.render(filename, view=view)

    return dot

#### Training

In [6]:
def load_config(config_path = 'config-feedforward'):
    """Build the NEAT configuration from a config file.

    Args:
        config_path: path of the configuration file handed to
            ``neat.Config``. Defaults to 'config-feedforward', the file name
            that was previously hard-coded here.

    Returns:
        The ``neat.Config`` instance built from the default genome,
        reproduction, species-set and stagnation classes.
    """
    return neat.Config(
        neat.DefaultGenome,
        neat.DefaultReproduction,
        neat.DefaultSpeciesSet,
        neat.DefaultStagnation,
        config_path
    )

In [7]:
def load_population(config, checkpoint = None):
    """Return the population to evolve, either fresh or restored.

    Args:
        config: configuration used to create a new ``neat.Population``. It is
            not used when a checkpoint is restored.
        checkpoint: checkpoint to restore from; any falsy value starts a new
            run instead.

    Returns:
        The value of ``neat.Checkpointer.restore_checkpoint(checkpoint)`` or
        a new ``neat.Population(config)``.
    """
    if not checkpoint:
        print("Starting run from scratch")
        # The population is the top-level object for a NEAT run.
        return neat.Population(config)

    print("Resuming from checkpoint: {}".format(checkpoint))
    return neat.Checkpointer.restore_checkpoint(checkpoint)
        

In [8]:
def eval_genome(genome, config):
    """Score one genome by playing a single non-rendered episode.

    A feed-forward network is built from ``genome``; at every step the index
    of its largest output is used as the action. ``genome.fitness`` is reset
    to 0 and then increased by each step's reward until the episode reports
    ``terminated`` or ``truncated``.

    Args:
        genome: genome to evaluate; its ``fitness`` attribute is overwritten.
        config: configuration passed to ``FeedForwardNetwork.create``.

    Returns:
        None. The result is stored on ``genome.fitness``.
    """
    # Build the network before opening the environment so that a failure here
    # leaves nothing to clean up.
    net = neat.nn.FeedForwardNetwork.create(genome, config)
    genome.fitness = 0

    env = create_env(show = False)
    try:
        state, _ = env.reset()

        done = False
        while not done:
            action = np.argmax(net.activate(state))
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            genome.fitness += reward
    finally:
        # Always release the environment, even if a step raised.
        env.close()

In [9]:
def eval_genomes(genomes, config, num_workers = 5):
    """Evaluate every genome of a generation, one after another.

    Args:
        genomes: iterable of 2-item entries whose second item is the genome.
        config: configuration forwarded to ``eval_genome``.
        num_workers: accepted for compatibility; the sequential loop below
            does not use it.
    """
    for entry in genomes:
        _genome_id, candidate = entry
        eval_genome(candidate, config)

In [10]:
def save_genome(genome, path):
    """Pickle ``genome`` to ``path``.

    The parent directory of ``path`` is created when it does not exist yet,
    so saving into a fresh output folder no longer fails.

    Args:
        genome: object to serialise with ``pickle.dump``.
        path: destination file; an existing file is overwritten.
    """
    import os  # local import: the notebook's import cell does not provide os

    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)

    with open(path, "wb") as f:
        pickle.dump(genome, f)

In [11]:
def load_genome(path):
    """Load a pickled genome and build its feed-forward network.

    Args:
        path: file containing a pickled genome.

    Returns:
        A ``(genome, network)`` tuple, where the network is created from the
        genome and the configuration returned by ``load_config()``.
    """
    with open(path, "rb") as handle:
        # SECURITY: unpickling can run arbitrary code; only load trusted files.
        genome = pickle.load(handle)

    network = neat.nn.FeedForwardNetwork.create(genome, load_config())
    return (genome, network)

In [12]:
def train(checkpoint, generations_nb):
    """Run a NEAT training session and save its artefacts under ``results/``.

    Args:
        checkpoint: checkpoint to resume from, passed to ``load_population``;
            a falsy value starts a new population.
        generations_nb: forwarded as the second argument of
            ``population.run``.

    Returns:
        The genome returned by ``population.run``.

    Side effects:
        Writes checkpoints with the prefix "results/net_", the pickled winner,
        the network drawing and the fitness and speciation plots. It also
        prints progress through the standard-output reporter.
    """
    import os  # local import: the notebook's import cell does not provide os

    # Every output below targets "results/"; make sure it exists so that the
    # first checkpoint write does not fail on a fresh checkout.
    os.makedirs("results", exist_ok=True)

    # Load config
    config = load_config()

    # Load a previously created population or a new one
    population = load_population(config, checkpoint)

    # Add reporters to show progress, collect statistics and checkpoint every generation
    population.add_reporter(neat.StdOutReporter(True))
    stats = neat.StatisticsReporter()
    population.add_reporter(stats)
    population.add_reporter(neat.Checkpointer(generation_interval = 1, filename_prefix="results/net_"))

    # Run for up to n generations
    winner = population.run(eval_genomes, generations_nb)
    print('\nBest genome:\n{!s}'.format(winner))

    # Save & draw results
    save_genome(winner, "results/winner_neat.neat")
    draw_net(config, winner, view=False, node_names=None, filename="results/winner_net")
    plot_stats(stats, ylog=False, view=True, filename="results/fitness.svg")
    plot_species(stats, view=True, filename="results/speciation.svg")

    return winner

# best_genome = train(checkpoint = None, generations_nb = None)

#### Play with our trained AI

In [13]:
# Load the pickled winner and the feed-forward network that load_genome builds from it.
(best_genome, best_genome_net) = load_genome("results/winner_neat.neat")

In [14]:
def play_with_neat(net, max_reward = 1000, report_every = 250):
    """Play one rendered episode driven by a trained network.

    At every step the index of the largest value returned by
    ``net.activate(state)`` is used as the action.

    Args:
        net: object with an ``activate(state)`` method returning one score
            per action.
        max_reward: the episode is abandoned once the accumulated reward
            exceeds this value (default 1000, the previous constant).
        report_every: a progress line is printed whenever the accumulated
            reward is an exact multiple of this value (default 250, the
            previous constant). Must be non-zero.

    Returns:
        None.

    The environment is closed in a ``finally`` block so that an exception or
    a kernel interrupt during the episode still releases it.
    """
    env = create_env(show = True)

    total_reward = 0
    try:
        state, _ = env.reset()

        done = False
        while not done:
            # Explicit render call kept from the original loop.
            # NOTE(review): with render_mode="human" gym may already render
            # on every step -- confirm whether this call is still needed.
            env.render()

            action = np.argmax(net.activate(state))
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            total_reward += reward

            if total_reward % report_every == 0:
                print(f"Reward > {total_reward}")

            if total_reward > max_reward:
                break
    finally:
        env.close()

# Watch the network loaded above play one rendered episode.
play_with_neat(best_genome_net)

  if not isinstance(terminated, (bool, np.bool8)):


Reward > 250.0
Reward > 500.0
Reward > 750.0
Reward > 1000.0
Reward > 1250.0
Reward > 1500.0
Reward > 1750.0
Reward > 2000.0


In [16]:
# Shut pygame down once the demo is over.
# NOTE(review): presumably needed to dispose of the render window left by the "human" render mode -- confirm.
pygame.quit()

: 