# NEAT Algorithm Implementation

The below notebook implements the originial NEAT algorithm explained in the original paper published by Stanley et. al in 2002. The below implementation takes a functional approach with each genome being functionally represented by a `networkx` graph.

In [None]:
import networkx as nx
import numpy as np
import pandas as pd

%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.image as img

import math
import statistics
import random
from typing import List, Tuple, Any, Union, Set
import json
import itertools
from copy import deepcopy
import time

## Innovation Database

The innovation database class handles the innovation number system that was introduced in the NEAT paper. The class holds a dictionary and the inverse dictionary that allows for lookup of the innovation number for the nodes as well as the nodes for the innovation number.

In [None]:
class InnovationDatabase:

    def __init__(self):
        self.i2n = {}
        self.n2i = {}
        self.innovation_number = 1

    def __setitem__(self, key: Union[Tuple[int, int], int], value: Union[Tuple[int, int], int]):
        if key in self.i2n.keys() or key in self.n2i.keys():
            raise Exception(f"Not Updating Database. Key {key} already exists.")
            return
        if type(key) is int:
            self.i2n[key] = value
            self.n2i[value] = key
        else:
            self.n2i[key] = value
            self.i2n[value] = key

    def __getitem__(self, key: Union[Tuple[int, int], int]):
        if type(key) is int:
            return self.i2n[key]
        else:
            return self.n2i[key]

    def __str__(self):
        return str(self.i2n)

    def innovation(self, edge: Tuple[int, int]) -> int:
        if edge not in self.n2i.keys():
            self[edge] = self.innovation_number
            self.innovation_number += 1
        return self[edge]

    def reverse_innovation(self, innovation_number: int) -> Union[None, Tuple[int, int]]:
        if innovation_number not in self.i2n.keys():
            return None
        return self.i2n[innovation_number]


database = InnovationDatabase()
database[(1, 2)] = 3
database[4] = (3, 4)
print(database[3])
print(database[(3, 4)])
print(database)

## Iris Flowers Dataset

In [None]:
class IrisFlowers:
    def __init__(self, filename):
        """
        Provide a data loader with random sampling and batched
        sampling for the Iris Flowers dataset.
        """
        self.file = filename
        df = pd.read_csv(self.file)
        dataset = df.to_dict(orient="list")
        self.variety_to_index = {}
        self.index_to_variety = {}

        for index, word in enumerate(set(dataset["variety"])):
            self.variety_to_index[word] = index
            self.index_to_variety[index] = word

        df["variety"] = df["variety"].map(self.variety_to_index)
        df = df.sample(frac=1)

        self.df = df
        self.df_np = self.df.to_numpy()
        self.length = len(df)

    def single_sample(self) -> np.ndarray:
        """
        Return a numpy array with the last row being the labels.
        """
        random_selection = random.randint(0, self.length - 1)
        return self.df_np[random_selection]

    def batched_sample(self, batch_size: int) -> np.ndarray:
        """
        Return a batched sample as a numpy array.
        """
        random_shuffled_df = np.copy(self.df_np)
        np.random.shuffle(random_shuffled_df)
        return random_shuffled_df[:batch_size]

## Topologically Evolving Artificial Neural Network (TWEANN) Construction

Graph sturcture for encoding the TWEANN:

- Each node on the graph represents one of the nodes and each edge represents a gene in the genome.
- Each node has a `bias` attribute that is added to the function similar to a perceptron where $\sigma$ is the activation function.
$$
y= \sigma(b + \sum x_i w_i)
$$

In [None]:
def print_genome(genome: nx.DiGraph):
    dict_genome = nx.to_dict_of_dicts(genome)
    print(json.dumps(dict_genome, indent=4))


def get_inputs_and_outputs(genome: nx.DiGraph) -> Tuple[Set[int], Set[int]]:
    inputs = []
    outputs = []
    for node in genome.nodes:
        if genome.nodes[node]["node"] == "input":
            inputs.append(node)
        elif genome.nodes[node]["node"] == "output":
            outputs.append(node)
        else:
            break
    return set(inputs), set(outputs)


def plot_genome(genome: nx.DiGraph, with_weights: bool = False) -> None:
    # Define the node colors.
    node_colors = {"hidden": "blue", "input": "green", "output": "red"}
    node_colors = [node_colors[genome.nodes[i]["node"]] for i in genome.nodes]
    # Define the edge colors.
    edge_colors = ["green" if genome.edges[edge]["enabled"] else "red" for edge in genome.edges]
    # Define the layout of the drawn genome.
    pos = nx.circular_layout(genome)

    nx.draw(genome, pos=pos, with_labels=True, node_color=node_colors, edge_color=edge_colors)
    if with_weights:
        edge_labels = {edge: round(genome.edges[edge]["w"], 2) for edge in genome.edges}
        nx.draw_networkx_edge_labels(genome, pos=pos, edge_labels=edge_labels, font_color="green")
    plt.show()

### Creation Of Genomes

In [None]:
def create_genome(input_len: int, output_len: int, database: InnovationDatabase):
    """
    Create a fully connected network as the base genome with the specified
    number of inputs and outputs.
    """
    inputs = list(range(1, input_len + 1))
    outputs = list(range(input_len + 1, input_len + output_len + 1))

    genome = nx.DiGraph()
    genome.add_nodes_from([*[(i, {"bias": np.random.normal(), "node": "input"}) for i in inputs], *[(o, {"bias": np.random.normal(), "node": "output"}) for o in outputs]])
    for i in inputs:
        for o in outputs:
            edge_attr = {"w": np.random.normal(), "enabled": True, "i_n": database.innovation((i, o))}
            genome.add_edge(i, o, **edge_attr)

    return genome

database = InnovationDatabase()
genome = create_genome(4, 3, database)

### Genome Mutations

The genome can be mutated in 3 different ways.

1. Weight Change: The weight change occurs every iteration with a very high probability and adds a random normal variable scaled to a certain value to the current weight/bias of the genome. The `p = weight_mutation_probability` variable controls whether the weight is mutated if $U(0, 1) < p$, then the weight is mutated.
2. Vertex: A random enabled gene is chosen and the gene is disabled and 2 new genes are added with a new node is added between the nodes of the disabled gene and the new node.
3. Connection: A new connection is created between 2 nodes that does not already exist. If the connection exists but is disabled, the node is enabled.

In [None]:
def mutate_weight(genome: nx.DiGraph, weight_mutation_probability: float, scale: float) -> nx.DiGraph:
    genome = deepcopy(genome)
    # Mutate the weights.
    for edge in genome.edges:
        if np.random.rand() < weight_mutation_probability:
            genome.edges[edge]["w"] += np.random.normal() * scale
    # Mutate the biases
    for node in genome.nodes:
        if np.random.rand() < weight_mutation_probability:
            genome.nodes[node]["bias"] += np.random.normal() * scale
    return genome

def mutate_vertex(genome: nx.DiGraph, database: InnovationDatabase) -> nx.DiGraph:
    genome = deepcopy(genome)
    # Pick a random enabled edge:
    edge = random.choice([edge for edge in genome.edges if genome.edges[edge]["enabled"]])
    genome.edges[edge]["enabled"] = False
    in_node, out_node = edge[0], edge[1]
    # Get the integer for the new node that can be added.
    new_node = max(genome.nodes) + 1
    genome.add_node(new_node, bias=np.random.normal(), node="hidden")
    genome.add_edges_from([
        (in_node, new_node, {"w": np.random.normal(),
                             "enabled": True,
                             "i_n": database.innovation((in_node, new_node))}
         ),
        (new_node, out_node, {"w": np.random.normal(),
                             "enabled": True,
                             "i_n": database.innovation((new_node, out_node))})])
    return genome

def mutate_connection(genome: nx.DiGraph, database: InnovationDatabase) -> nx.DiGraph:
    genome = deepcopy(genome)
    inputs, outputs = get_inputs_and_outputs(genome)
    # Get the list of all possible genes that can be made and remove the genes that
    # are already in the genome that are enabled.
    possible_genes = set(itertools.product(set(genome.nodes) - outputs,
                                           set(genome.nodes) - inputs))
    possible_genes = {edge for edge in possible_genes if edge[0] != edge[1]}
    already_used_genes = set([edge for edge in genome.edges if genome.edges[edge]["enabled"]])
    possible_genes = possible_genes - already_used_genes
    possible_genes = [edge for edge in possible_genes if edge[1] not in set(nx.ancestors(genome, edge[0]))]
    if len(possible_genes) == 0:
        return genome
    random_gene = random.choice(possible_genes)
    if random_gene in genome.edges:
        genome.edges[random_gene]["enabled"] = True
    else:
        genome.add_edge(*random_gene, **{"w": np.random.normal(), "enabled": True, "i_n": database.innovation(random_gene)})
    return genome

genome = create_genome(4, 3, database)

# Test vertex mutations
for _ in range(10):
    genome = mutate_vertex(genome, database)

# Test connection mutations
for _ in range(10):
    genome = mutate_connection(genome, database)

plot_genome(genome)

### Genome Evaluation

The evaluation procedure for a genome, $G$, for a given input $x$ and an evaluation stack.

1. Keep track of a value dictionary. The dictionary is updated for each node only once and upon updating the node, the values cannot be changed.
2. For each node, if all the parents of the node are in the value dictionary, then calculate the output and set to the value dictionary. If not check if there are any parents that must be added to the evaluation stack and add them. Re-add the node to the stack.
3. For each node that has been evaluated, add the successors to the node into the stack if they are not already in the evaluation stack.

In [None]:
def softmax(x):
    x -= np.max(x, axis=-1, keepdims=True)
    exp_x = np.exp(x)
    return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

def evaluate(genome: nx.DiGraph, x: np.ndarray) -> np.ndarray:
    inputs, outputs = get_inputs_and_outputs(genome)
    values = {i: x[:, idx] for idx, i in enumerate(inputs)}
    evaluation_stack = []
    evaluated_stack = []
    # Add all the successors of the inputs to the evaluation stack.
    for i in inputs:
        for s in genome.successors(i):
            if genome.edges[(i, s)]["enabled"] and s not in evaluation_stack:
                evaluation_stack.append(s)
    # Keep going through the evaluation stack until
    while len(evaluation_stack) > 0:
        node = evaluation_stack.pop(0)
        # Don't evaluate the node if it has already been evaluated.
        if node in values.keys():
            continue
        # Get the enabled parents and successors of the node.
        parents = {parent for parent in set(nx.all_neighbors(genome, node)) - set(genome.successors(node)) if genome.edges[(parent, node)]["enabled"]}
        successors = {i for i in set(genome.successors(node)) if genome.edges[(node, i)]["enabled"]} - set(evaluation_stack)
        if len(parents - set(values.keys())) == 0:
            value = 0
            for parent in parents:
                value += genome.edges[(parent, node)]["w"] * values[parent]
            value += genome.nodes[node]["bias"]
            values[node] = value * (value > 0)
            # Add successors that are not in the evaluation stack.
            evaluation_stack.extend(successors)
        else:
            # Add parents that have yet to be evaluated to the evaluation stack.
            evaluation_stack.extend(parents.union({node}) - set(evaluation_stack) - set(values.keys()))

    output = np.zeros((x.shape[0], len(outputs)))
    for idx, i in enumerate(outputs):
        output[:, idx] = values[i]
    return output

x = np.random.rand(16, 4)
y_hat = evaluate(genome, x)
y_hat.shape

### Genome Crossover

Genome crossover as defined according to the paper where the matching genes are chosen randomly but the disjoint genes and excess genes are chosen from the more fit parent.

In [None]:
ENABLE_CHANCE = 0.2

def crossover(genome1: nx.DiGraph, genome2: nx.DiGraph) -> nx.DiGraph:
    genome1 = deepcopy(genome1)
    innovation_numbers = [edge[2]["i_n"] for edge in genome1.edges(data=True)]
    # Crossover between the weights.
    for edge in genome2.edges:
        if edge in genome1.edges:
            if np.random.rand() >= 0.5:
                continue
            genome1.edges[edge]["w"] = genome2.edges[edge]["w"]
            genome1.edges[edge]["enabled"] = genome2.edges[edge]["enabled"]

            if np.random.rand() < ENABLE_CHANCE:
                genome1.edges[edge]["enabled"] = True

    # Crossover between the bias.
    for node in genome1.nodes:
        if node in genome2.nodes:
            genome1.nodes[node]["bias"] = genome2.nodes[node]["bias"]
    return genome1

genome1 = create_genome(4, 3, database)
# Test vertex mutations
for _ in range(10):
    genome1 = mutate_vertex(genome, database)
# Test connection mutations
for _ in range(10):
    genome1 = mutate_connection(genome, database)

genome2 = create_genome(4, 3, database)
# Test vertex mutations
for _ in range(10):
    genome2 = mutate_vertex(genome, database)
# Test connection mutations
for _ in range(10):
    genome2 = mutate_connection(genome, database)

genome = crossover(genome1, genome2)

## NEAT Generation Algorithm

The generation process algorithm is for the steps taken in each single individual generation. This takes into account the concept of speciation introduced in the paper to reduce the possibility of dominance of a specific genome architecture. The helper functions are highlighted below:

- `score_genome`: Calculate the fitness of an individual genome given a sample $x, y$.
- `split_to_species`: Use the compatability threshold and the similarity between the weights and structure of all the genomes. $c_1, c_2, c_3$ are the scaling factors, $D$ is the number of disjoint genes, $E$ is the number of excess genes, $N$ is the total number of genes, and $\mu_w$ is the average weight difference.
$$
c_1 \times \frac{D}{N} + c_2 \times \frac{E}{N} + c_3 \times \mu_w
$$
- `speciated_fitness`: Calculates the fitness of the individuals in the population taking into account the number of species in the population. Given that $f$ is the fitness score of a genome $x$. The more the number of species, the lower the score. The function in the denominator sums the number of individuals in the species.
$$
f'_i = \frac{f_i}{\sum_{j = 1}^n \text{sh}(\delta(i, j))}
$$

In [None]:
N = 100
OFFSPRING = int(N * 1.0)
ITERATIONS = 1000
BATCH_SIZE = 64
INPUTS = 4
OUTPUTS = 3
DELTA_T = 2
C1, C2, C3 = 1, 1, 1
PR_MUTATE_WEIGHT = 1
PR_MUTATE_CONN = 0.5
PR_MUTATE_VERTEX = 0.3

In [None]:
def score_genome(genome: nx.DiGraph, x: np.ndarray, y: np.ndarray) -> float:
    y_hat = evaluate(genome, x)
    y_hat = softmax(y_hat)

    # Calculate the accuracy
    y_ints = np.max(y_hat, axis=-1).astype(np.int32)
    accuracy = np.sum(y_ints == y)

    return accuracy

def score_genome_entropy(genome: nx.DiGraph, x: np.ndarray, y: np.ndarray) -> float:
    y = y.astype(np.int32)
    y = np.eye(np.max(y) + 1)[y]

    y_hat = evaluate(genome, x)
    y_hat = np.clip(y_hat, 1e-15, 1 - 1e-15)

    cross_entropy = -y * np.log(y_hat) - (1 - y) * np.log(1 - y_hat)
    loss = np.mean(np.sum(cross_entropy, axis=-1))

    return -loss

def calculate_compatability(genome1: nx.DiGraph, genome2: nx.DiGraph) -> Tuple[float, Tuple[float, float, float]]:
    in_g1 = set([genome1.edges[edge]["i_n"] for edge in genome1.edges])
    in_g2 = set([genome2.edges[edge]["i_n"] for edge in genome2.edges])

    n = max(len(in_g1), len(in_g2))
    max_i_n = max(max(in_g1), max(in_g2))

    # Average weight difference.
    same_in = in_g1.intersection(in_g2)
    w_diff = np.array([genome1.edges[edge]["w"] - genome2.edges[edge]["w"]
                       for edge in genome1.edges
                       if genome1.edges[edge]["i_n"] in same_in])
    w_diff = np.mean(np.abs(w_diff))

    # Disjoint and Excess Genes
    disjoint = {i for i in in_g1.union(in_g2) if i <= max_i_n}
    excess = (in_g1.union(in_g2) - in_g1.intersection(in_g2)) - disjoint

    p1 = C1 * len(disjoint) / n
    p2 = C2 * len(excess) / n
    p3 = float(C3 * w_diff)
    return p1 + p2 + p3, (p1, p2, p3)

def split_to_species(population: List[nx.DiGraph]) -> Tuple[List[List[nx.DiGraph]], List[float]]:
    speciated_population = []
    deltas = []

    for idx, individual in enumerate(population):
        if idx == 0:
            speciated_population.append([individual])
        else:
            added = False
            for species in speciated_population:
                delta, split_values = calculate_compatability(species[0], individual)
                if delta < DELTA_T:
                    species.append(individual)
                    added = True
                    break
                deltas.append((delta, split_values))
            if not added:
                speciated_population.append([individual])

    return speciated_population, deltas

def speciated_fitness(population: List[nx.DiGraph]) -> Tuple[dict[nx.DiGraph, float], dict[nx.DiGraph, float]]:
    fitness_dict = {}
    sfitness_dict = {}

    speciated_population, _ = split_to_species(population)

    for species in speciated_population:
        num_species = len(species)

        for individual in species:
            individual_score = score_genome(individual, x, y)
            fitness_dict[individual] = individual_score
            score_species = individual_score / num_species
            sfitness_dict[individual] = score_species

    return sfitness_dict, fitness_dict

def mutate(genome: nx.DiGraph) -> nx.DiGraph:
    genome = mutate_weight(genome, PR_MUTATE_WEIGHT, 1e-3)
    if np.random.rand() < PR_MUTATE_VERTEX:
        genome = mutate_vertex(genome, database)
    if np.random.rand() < PR_MUTATE_CONN:
        genome = mutate_connection(genome, database)
    return genome


database = InnovationDatabase()
iris = IrisFlowers("data/iris.csv")

# Create the population
population = []
for _ in range(N):
    individual = create_genome(INPUTS, OUTPUTS, database)
    population.append(individual)

sample = iris.batched_sample(16)
x, y = sample[:, :4], sample[:, 4]

len(split_to_species(population)[0])

score_genome(population[0], x, y)

In [None]:
database = InnovationDatabase()
iris = IrisFlowers("data/iris.csv")

# Create the population
population = []
for _ in range(N):
    individual = create_genome(INPUTS, OUTPUTS, database)
    population.append(individual)

# -------------
# TRAINING STEP
# -------------

for iter in range(ITERATIONS):
    # Sample from the dataset.
    sample = iris.batched_sample(BATCH_SIZE)
    x, y = sample[:, :4], sample[:, 4]

    offsprings = []
    random_genome_evaluations = {}
    for _ in range(OFFSPRING):
        # Select the first genome randomly.
        # For efficiency purposes, save the genome score so that the score can be
        # recovered later without computation.
        random_genome = random.choice(population)
        if random_genome not in random_genome_evaluations.keys():
            fitness_score = score_genome(random_genome, x, y)
            random_genome_evaluations[random_genome] = fitness_score
        first_score = random_genome_evaluations[random_genome]

        # Calculate the other genome as well, make sure it's not the same genome
        # as the first one.
        another_random = random.choice(population)
        while another_random == random_genome:
            another_random = random.choice(population)
        if another_random not in random_genome_evaluations.keys():
            fitness_score = score_genome(another_random, x, y)
            random_genome_evaluations[another_random] = fitness_score
        second_score = random_genome_evaluations[another_random]

        # Crossover and mutation of each genome.
        offspring = crossover(random_genome, another_random) if first_score > second_score else crossover(another_random, random_genome)
        offspring = mutate(offspring)

        offsprings.append(offspring)

    sfitness_offsprings, fitness_offsprings = speciated_fitness(offsprings)
    offsprings = sorted(sfitness_offsprings.items(), key=lambda item: item[1], reverse=True)

    sfitness_pop, fitness_pop = speciated_fitness(population)
    population = sorted(sfitness_pop.items(), key=lambda item: item[1], reverse=True)

    pop_idx = off_idx = 0
    num_replaced = 0
    new_population = []
    new_pop_fitness = []


    while len(new_population) < N:
        pop_fitness = population[pop_idx][1]

        if off_idx >= len(offsprings):
            new_population.append(population[pop_idx][0])
            pop_idx += 1
            new_pop_fitness.append(pop_fitness)
            continue

        off_fitness = offsprings[off_idx][1]

        if pop_fitness > off_fitness:
            new_population.append(population[pop_idx][0])
            pop_idx += 1
            new_pop_fitness.append(pop_fitness)
        else:
            new_population.append(offsprings[off_idx][0])
            off_idx += 1
            num_replaced += 1
            new_pop_fitness.append(off_fitness)

    if iter % 10 == 0:
        original_pop_fitness = [i[1] for i in population]
        print(f"Iteration: {iter}")
        print(f"Replaced: {num_replaced}")
        print(f"Population Fitness: {statistics.mean(original_pop_fitness)}, {max(original_pop_fitness)}")
        print(f"New Population Fitness: {statistics.mean(new_pop_fitness)}, {max(new_pop_fitness)}")

    population = new_population

sample = iris.batched_sample(BATCH_SIZE)
x, y = sample[:, :4], sample[:, 4]
print(f"Accuracy: {score_genome_accuracy(genome, x, y)}")

In [None]:
for i in population:
    sample = iris.batched_sample(32)
    x, y = sample[:, :4], sample[:, 4]
    print(f"Accuracy: {score_genome_accuracy(population[0], x, y)}")