In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision 
from tqdm import tqdm

import IPython
from IPython.display import display, HTML

import os
import neat
#import neat.visualize as visualize

In [2]:
device = torch.device('cpu')
device

device(type='cpu')

### Model

In [3]:
#class

### Training

In [4]:
#Train

### Run

In [5]:
#main

In [6]:
import warnings

import graphviz
import matplotlib.pyplot as plt
import numpy as np


def plot_stats(statistics, ylog=False, view=False, filename='avg_fitness.svg'):
    """ Plots the population's average and best fitness. """
    if plt is None:
        warnings.warn("This display is not available due to a missing optional dependency (matplotlib)")
        return

    generation = range(len(statistics.most_fit_genomes))
    best_fitness = [c.fitness for c in statistics.most_fit_genomes]
    avg_fitness = np.array(statistics.get_fitness_mean())
    stdev_fitness = np.array(statistics.get_fitness_stdev())

    plt.plot(generation, avg_fitness, 'b-', label="average")
    plt.plot(generation, avg_fitness - stdev_fitness, 'g-.', label="-1 sd")
    plt.plot(generation, avg_fitness + stdev_fitness, 'g-.', label="+1 sd")
    plt.plot(generation, best_fitness, 'r-', label="best")

    plt.title("Population's average and best fitness")
    plt.xlabel("Generations")
    plt.ylabel("Fitness")
    plt.grid()
    plt.legend(loc="best")
    if ylog:
        plt.gca().set_yscale('symlog')

    plt.savefig(filename)
    if view:
        plt.show()

    plt.close()


def plot_spikes(spikes, view=False, filename=None, title=None):
    """ Plots the trains for a single spiking neuron. """
    t_values = [t for t, I, v, u, f in spikes]
    v_values = [v for t, I, v, u, f in spikes]
    u_values = [u for t, I, v, u, f in spikes]
    I_values = [I for t, I, v, u, f in spikes]
    f_values = [f for t, I, v, u, f in spikes]

    fig = plt.figure()
    plt.subplot(4, 1, 1)
    plt.ylabel("Potential (mv)")
    plt.xlabel("Time (in ms)")
    plt.grid()
    plt.plot(t_values, v_values, "g-")

    if title is None:
        plt.title("Izhikevich's spiking neuron model")
    else:
        plt.title("Izhikevich's spiking neuron model ({0!s})".format(title))

    plt.subplot(4, 1, 2)
    plt.ylabel("Fired")
    plt.xlabel("Time (in ms)")
    plt.grid()
    plt.plot(t_values, f_values, "r-")

    plt.subplot(4, 1, 3)
    plt.ylabel("Recovery (u)")
    plt.xlabel("Time (in ms)")
    plt.grid()
    plt.plot(t_values, u_values, "r-")

    plt.subplot(4, 1, 4)
    plt.ylabel("Current (I)")
    plt.xlabel("Time (in ms)")
    plt.grid()
    plt.plot(t_values, I_values, "r-o")

    if filename is not None:
        plt.savefig(filename)

    if view:
        plt.show()
        plt.close()
        fig = None

    return fig


def plot_species(statistics, view=False, filename='speciation.svg'):
    """ Visualizes speciation throughout evolution. """
    if plt is None:
        warnings.warn("This display is not available due to a missing optional dependency (matplotlib)")
        return

    species_sizes = statistics.get_species_sizes()
    num_generations = len(species_sizes)
    curves = np.array(species_sizes).T

    fig, ax = plt.subplots()
    ax.stackplot(range(num_generations), *curves)

    plt.title("Speciation")
    plt.ylabel("Size per Species")
    plt.xlabel("Generations")

    plt.savefig(filename)

    if view:
        plt.show()

    plt.close()


def draw_net(config, genome, view=False, filename=None, node_names=None, show_disabled=True, prune_unused=False,
             node_colors=None, fmt='svg'):
    """ Receives a genome and draws a neural network with arbitrary topology. """
    # Attributes for network nodes.
    if graphviz is None:
        warnings.warn("This display is not available due to a missing optional dependency (graphviz)")
        return

    # If requested, use a copy of the genome which omits all components that won't affect the output.
    if prune_unused:
        genome = genome.get_pruned_copy(config.genome_config)

    if node_names is None:
        node_names = {}

    assert type(node_names) is dict

    if node_colors is None:
        node_colors = {}

    assert type(node_colors) is dict

    node_attrs = {
        'shape': 'circle',
        'fontsize': '9',
        'height': '0.2',
        'width': '0.2'}

    dot = graphviz.Digraph(format=fmt, node_attr=node_attrs)

    inputs = set()
    for k in config.genome_config.input_keys:
        inputs.add(k)
        name = node_names.get(k, str(k))
        input_attrs = {'style': 'filled', 'shape': 'box', 'fillcolor': node_colors.get(k, 'lightgray')}
        dot.node(name, _attributes=input_attrs)

    outputs = set()
    for k in config.genome_config.output_keys:
        outputs.add(k)
        name = node_names.get(k, str(k))
        node_attrs = {'style': 'filled', 'fillcolor': node_colors.get(k, 'lightblue')}

        dot.node(name, _attributes=node_attrs)

    used_nodes = set(genome.nodes.keys())
    for n in used_nodes:
        if n in inputs or n in outputs:
            continue

        attrs = {'style': 'filled',
                 'fillcolor': node_colors.get(n, 'white')}
        dot.node(str(n), _attributes=attrs)

    for cg in genome.connections.values():
        if cg.enabled or show_disabled:
            # if cg.input not in used_nodes or cg.output not in used_nodes:
            #    continue
            input, output = cg.key
            a = node_names.get(input, str(input))
            b = node_names.get(output, str(output))
            style = 'solid' if cg.enabled else 'dotted'
            color = 'green' if cg.weight > 0 else 'red'
            width = str(0.1 + abs(cg.weight / 5.0))
            dot.edge(a, b, _attributes={'style': style, 'color': color, 'penwidth': width})

    dot.render(filename, view=view)

    return dot

In [None]:
# 2-input XOR inputs and expected outputs.
xor_inputs = [(0.0, 0.0), (0.0, 1.0), (1.0, 0.0), (1.0, 1.0)]
xor_outputs = [(0.0,), (1.0,), (1.0,), (0.0,)]


def eval_genomes(genomes, config):
    for genome_id, genome in genomes:
        genome.fitness = 4.0
        net = neat.nn.FeedForwardNetwork.create(genome, config)
        for xi, xo in zip(xor_inputs, xor_outputs):
            output = net.activate(xi)
            genome.fitness -= (output[0] - xo[0]) ** 2


def run(config_file):
    # Load configuration.
    config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                         neat.DefaultSpeciesSet, neat.DefaultStagnation,
                         config_file)

    # Create the population, which is the top-level object for a NEAT run.
    p = neat.Population(config)

    # Add a stdout reporter to show progress in the terminal.
    p.add_reporter(neat.StdOutReporter(True))
    stats = neat.StatisticsReporter()
    p.add_reporter(stats)
    p.add_reporter(neat.Checkpointer(10))

    # Run for up to 300 generations.
    winner = p.run(eval_genomes, 100)

    # Display the winning genome.
    print('\nBest genome:\n{!s}'.format(winner))

    # Show output of the most fit genome against training data.
    print('\nOutput:')
    winner_net = neat.nn.FeedForwardNetwork.create(winner, config)
    for xi, xo in zip(xor_inputs, xor_outputs):
        output = winner_net.activate(xi)
        print("input {!r}, expected output {!r}, got {!r}".format(xi, xo, output))

    node_names = {-1: 'A', -2: 'B', 0: 'A XOR B'}
    draw_net(config, winner, True, node_names=node_names)
    #draw_net(config, winner, True, node_names=node_names, prune_unused=True)
    plot_stats(stats, ylog=False, view=True)
    plot_species(stats, view=True)

    p = neat.Checkpointer.restore_checkpoint('neat-checkpoint-4')
    p.run(eval_genomes, 10)

In [None]:
# Determine path to configuration file. This path manipulation is
# here so that the script will run successfully regardless of the
# current working directory.
local_dir = os.path.dirname('./')
config_path = os.path.join(local_dir, 'config-feedforward')
run(config_path)

### Testing

In [7]:
import numpy as np
import torch
import math

# 2-input XOR inputs and expected outputs.
xor_inputs = [(0.0, 0.0), (0.0, 1.0), (1.0, 0.0), (1.0, 1.0)]
xor_outputs = [(0.0,), (1.0,), (1.0,), (0.0,)]
device = torch.device('cpu')
alive = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)

scent_conv_weights = torch.tensor([[[
    [0.0, 0.125, 0.25, 0.125, 0.0],
    [0.125, 0.25, 0.5, 0.25, 0.125],
    [0.25, 0.5, 1, 0.5, 0.25],
    [0.125, 0.25, 0.5, 0.25, 0.125],
    [0.0, 0.125, 0.25, 0.125, 0.0]
]]], device=device)

def get_zeros():
    zeros = np.zeros(shape=[17, 17], dtype=np.float32)
    return zeros

def get_ca():
    zeros = get_zeros()
    center_ca = np.array([
        [0, 0, 1, 0, 0],
        [0, 1, 1, 1, 0],
        [1, 1, 1, 1, 1],
        [0, 1, 1, 1, 0],
        [0, 0, 1, 0, 0]
    ])
    ca = zeros
    ca[6:11, 6:11] = center_ca
    return ca

def get_food():
    zeros = get_zeros()
    food = zeros
    food[12, 12] = 1
    return food

def get_input():
    ca = get_ca()
    food = get_food()
    zeros = get_zeros()
    input = np.stack([ca, zeros, zeros, food])
    return input

def alive_filter(x):
    x = x.view(1, 4, 17, 17)
    x = alive(x[:, 0:1, :, :]) > 0.1
    return x.view(1, 17, 17)

def alive_count(x):
    live_count = x.sum()
    return live_count

def live_count_above(x, threshold):
    return (x > threshold).to(torch.float32).sum()

def perceive_scent(food):
    food = food.view(1,1,17,17)
    x = F.conv2d(food, scent_conv_weights, padding=2)[:, 0]
    return x

def perceive_cell_surrounding(x):
    def _perceive_with(x, conv_weights):
        conv_weights = conv_weights.view(1,1,3,3).repeat(4, 1, 1, 1)
        return F.conv2d(x.view(1,4, 17, 17), conv_weights, padding=1, groups=4)

    dx = torch.outer(torch.tensor([1,2,1], device=device), torch.tensor([-1, 0, 1], device=device)) / 8.0 # Sobel filter
    dy = torch.transpose(dx, 0, 1)

    y1 = _perceive_with(x, dx)
    y2 = _perceive_with(x, dy)
    y = torch.cat((x.view(1,4,17,17),y1,y2), dim=1) 
    #y = torch.relu(self.conv2(x)) #perceive neighbor cell state
    return y
    
def average_placement(x):
    size = x.shape[0]
    d1 = 0
    d2 = 0
    total = x.sum() / size
    if total == 0:
        return(100, 100)
    
    def to_int(x):
        if math.isnan(x):
            return 0
        else:
            return int(x)

    for i in range(size):
        d1 += x[i].mean() * i / total
        d2 += x[:, i].mean() * i / total
    return (to_int(d1), to_int(d2))
    
def eval_genomes(genomes, config):
    input = torch.tensor(get_input(), device=device)
    time_steps = np.random.randint(10, 50)

    living_count_before = alive_count(input[0:1])
    living_above_before = live_count_above(input[0:1], 0.1)

    food_placement = average_placement(input[3])

    for genome_id, genome in genomes: #for each genome
        #genome.fitness = 4.0
        net = neat.nn.FeedForwardNetwork.create(genome, config)
        cell = input

        for _ in range(time_steps):
            x = cell
            x[3] = perceive_scent(input[3]).view(1, 17, 17)
            prelife_mask = alive_filter(x)

            x = perceive_cell_surrounding(x).view(12, 17, 17)
            x = x.transpose(0, 2)
            x = x.detach().cpu().numpy()
            #TODO the apply_along_axis does not do what assumed....!!!!! - look at how I've implemented something similar in NeuroEvolution.ipynb
            x = np.apply_along_axis(net.activate, 2, x)
            #x = net.activate(x)
            x = torch.tensor(x).to(device=device).to(torch.float32)
            x = x.transpose(0, 2)

            x = cell + x

            postlife_mask = alive_filter(x)
            life_mask = torch.bitwise_and(prelife_mask, postlife_mask).to(torch.float32)
            x = x*life_mask

            x = torch.clamp(x, -10.0, 10.0)
            cell = x

        living_count = alive_count(x[0:1]) #total pixel val 
        living_above = live_count_above(x[0:1], 0.1) #pixels above ...

        #TODO make methods for measuring the distance from food....
        #calculate average distance between food and CA cells


        living_count_diff = (living_count_before - living_count) ** 2
        living_above_diff = (living_above_before - living_above) ** 2

        ca_placement = average_placement(x[0])
        pythagoras_distance = ((ca_placement[0]-food_placement[0]) ** 2 + (ca_placement[1]-food_placement[1]) ** 2) ** 0.5

        genome.fitness = (-(living_count_diff/3 + living_above_diff/100 + pythagoras_distance/10)).detach().cpu().item()

        #for xi, xo in zip(xor_inputs, xor_outputs):
        #    output = net.activate(xi)
        #    genome.fitness -= (output[0] - xo[0]) ** 2

    print('one generation done')


def run(config_file):
    # Load configuration.
    config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                         neat.DefaultSpeciesSet, neat.DefaultStagnation,
                         config_file)

    p = neat.Population(config)  #Create population (top-level object for NEAT run)

    # Add a stdout reporter to show progress in the terminal.
    p.add_reporter(neat.StdOutReporter(True))
    stats = neat.StatisticsReporter()
    p.add_reporter(stats)
    p.add_reporter(neat.Checkpointer(20))

    winner = p.run(eval_genomes, 1000) #run up to generations

    print('\nBest genome:\n{!s}'.format(winner)) #display winning genome
    print('\nOutput:') #output most fit genome against training data
    winner_net = neat.nn.FeedForwardNetwork.create(winner, config)

    #node_names = {-1: 'A', -2: 'B', 0: 'A XOR B'}
    #draw_net(config, winner, True, node_names=node_names)
    #draw_net(config, winner, True, node_names=node_names, prune_unused=True)
    #plot_stats(stats, ylog=False, view=True)
    #plot_species(stats, view=True)

    p = neat.Checkpointer.restore_checkpoint('neat-checkpoint-9') #restore checkpoint
    p.run(eval_genomes, 10) #run training data with that checkpoint

In [8]:
# Determine path to configuration file. This path manipulation is
# here so that the script will run successfully regardless of the
# current working directory.
local_dir = os.path.dirname('./')
config_path = os.path.join(local_dir, 'config-feedforward')
run(config_path)


 ****** Running generation 0 ****** 

one generation done
Population's average fitness: -1257035.26982 stdev: 1275287.69488
Best fitness: -0.56569 - size: (16, 192) - species 14 - id 14
Average adjusted fitness: 0.545
Mean genetic distance 3.603, standard deviation 0.279
Population of 300 members in 150 species:
   ID   age  size  fitness  adj fit  stag
     1    0     2  -2741430.8    0.007     0
     2    0     2  -189920.2    0.931     0
     3    0     2  -145583.6    0.947     0
     4    0     2  -1701325.2    0.384     0
     5    0     2  -2539705.8    0.080     0
     6    0     2  -1138524.2    0.587     0
     7    0     2  -440711.4    0.840     0
     8    0     2  -1081496.9    0.608     0
     9    0     2  -2759805.2    0.000     0
    10    0     2  -684684.3    0.752     0
    11    0     2  -140876.7    0.949     0
    12    0     2  -339460.8    0.877     0
    13    0     2  -2371614.0    0.141     0
    14    0     2     -0.6    1.000     0
    15    0     2     

KeyboardInterrupt: 