In [2]:
# Imports
import random
import math

## Helper functions

In [4]:
def ReLU(x):
    """Rectified linear unit: negative inputs become 0, anything else passes through unchanged."""
    return 0 if x < 0 else x

def sigmoid(x):
    """Logistic function 1 / (1 + e^(-x)), evaluated without overflowing.

    Args:
        x: a real number.

    Returns:
        A float in [0, 1].
    """
    if x >= 0:
        # exp(-x) <= 1 here, so the textbook form is safe.
        return 1 / (1 + math.exp(-x))
    # For very negative x, exp(-x) overflows (math.exp raises OverflowError),
    # so use the algebraically equivalent form e^x / (1 + e^x) instead.
    exp_x = math.exp(x)
    return exp_x / (1 + exp_x)

## NEAT Algorithm

Original paper: https://nn.cs.utexas.edu/downloads/papers/stanley.ec02.pdf


#### Network Encoding

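In the NEAT algorithm, each network (genome) is encoded as a list of node genes and a list of connection genes. Each connection gene stores its in-node, its out-node, the connection weight, an enabled flag and an innovation number: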

![image](assets/genotype.png)

*Source: [Evolving Neural Networks through Augmenting Topologies](https://nn.cs.utexas.edu/downloads/papers/stanley.ec02.pdf)*


#### Mutation

There are 2 types of structural mutation:
- Add connection
- Add node

![image](assets/mutation.png)

*Source: [Evolving Neural Networks through Augmenting Topologies](https://nn.cs.utexas.edu/downloads/papers/stanley.ec02.pdf)*


#### Crossover

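During crossover, the connection genes of both parents are lined up by innovation number: matching genes are inherited at random from either parent, while disjoint and excess genes are inherited from the fitter parent.

![image](assets/mutation.png)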

*Source: [Evolving Neural Networks through Augmenting Topologies](https://nn.cs.utexas.edu/downloads/papers/stanley.ec02.pdf)*


In [9]:
class InnovationNumber:
    """Counter that hands out consecutive innovation numbers, starting at 0."""

    def __init__(self):
        self.value = 0  # Next innovation number to be handed out

    def pop(self):
        """ Hands out the current innovation number and advances the counter by one """
        current = self.value
        self.value = current + 1
        return current

class NodeGene:
    """Node gene: one neuron of the network.

    Attributes:
        layer: label of the layer the node belongs to (this notebook uses
            "input", "hid" and "output").
        activation: activation function (a callable).
        bias: bias value of the node.
    """

    def __init__(self, layer, activation, bias):
        self.layer, self.activation, self.bias = layer, activation, bias

class ConnectionGene:
    """Connection gene: a weighted, directed link between two node genes.

    Attributes:
        in_node: node the connection starts from.
        out_node: node the connection leads to.
        weight: connection weight.
        enabled: whether the connection is enabled or not.
        innov: innovation number described in the paper.
    """

    def __init__(self, in_node: NodeGene, out_node: NodeGene, weight: float,  innov: int, enabled: bool = True):
        self.in_node = in_node
        self.out_node = out_node
        self.weight = weight
        self.enabled = enabled  # Whether the connection is enabled or not
        self.innov = innov  # Innovation number described in the paper

    def copy(self):
        """Return a new ConnectionGene with the same field values.

        The copy references the same in/out node objects (they are not
        duplicated); only the gene itself is a fresh object.
        """
        # Keyword arguments keep `innov` and `enabled` from being swapped:
        # the constructor takes `innov` before `enabled`.
        return ConnectionGene(
            self.in_node,
            self.out_node,
            self.weight,
            innov=self.innov,
            enabled=self.enabled,
        )

class Genome:
    """A NEAT genome: a list of node genes plus the connection genes wiring them.

    Attributes:
        nodes: list of node genes. Order matters: `evaluate` pairs the input
            values with the "input" nodes in list order and returns the values
            of the "output" nodes in list order.
        connections: list of connection genes.
    """

    def __init__(self, nodes, connections):
        self.nodes = nodes
        self.connections = connections

    def _creates_cycle(self, source, target):
        """Return True if adding the connection source -> target would close a cycle.

        That is the case exactly when `source` can already be reached from
        `target`. All connections are considered, enabled or not.
        """
        successors = {}
        for c in self.connections:
            successors.setdefault(c.in_node, []).append(c.out_node)

        seen = set()
        stack = [target]
        while stack:
            current = stack.pop()
            if current == source:
                return True
            if current in seen:
                continue
            seen.add(current)
            stack.extend(successors.get(current, []))
        return False

    def mutate_add_connection(self, innov: InnovationNumber):
        """Try to connect two randomly chosen nodes with a new random-weight connection.

        The mutation is silently skipped (nothing changes and no innovation
        number is consumed) when:
        - the genome has fewer than two nodes,
        - the chosen target is an input node (`evaluate` never reads the
          incoming connections of input nodes),
        - the same connection already exists, or
        - the connection would create a cycle, which `evaluate` cannot handle.

        Args:
            innov: innovation counter; `pop()` is called once if a connection is added.
        """
        if len(self.nodes) < 2:
            return  # random.sample would raise ValueError

        node1, node2 = random.sample(self.nodes, 2)
        if node2.layer == "input":
            return
        for c in self.connections:
            if c.in_node == node1 and c.out_node == node2:
                return
        if self._creates_cycle(node1, node2):
            return

        new_conn = ConnectionGene(node1, node2, random.uniform(-1, 1), innov.pop(), True)
        self.connections.append(new_conn)

    def mutate_add_node(self, innov: InnovationNumber):
        """Split a randomly chosen enabled connection by inserting a new hidden node.

        The chosen connection is disabled and replaced by two new connections:
        in -> new node with weight 1, and new node -> out with the old weight.
        Does nothing if the genome has no enabled connection.

        Args:
            innov: innovation counter; `pop()` is called twice when a node is added.
        """
        candidates = [c for c in self.connections if c.enabled]
        if not candidates:
            return

        connection = random.choice(candidates)
        connection.enabled = False  # Disable the connection

        new_node = NodeGene("hid", ReLU, random.uniform(-1, 1))

        conn1 = ConnectionGene(connection.in_node, new_node, 1, innov.pop(), True)
        conn2 = ConnectionGene(new_node, connection.out_node, connection.weight, innov.pop(), True)

        self.nodes.append(new_node)
        self.connections.extend([conn1, conn2])

    def __topological_sort(self, edges):
        """Order the keys of `edges` so that every node comes before its successors.

        Args:
            edges: dict mapping each node to the list of nodes it feeds into.

        Returns:
            List of nodes in reverse depth-first post-order (a topological
            order when the graph is acyclic).
        """
        visited = set()
        order = []

        def visit(n):
            if n in visited:
                return
            visited.add(n)
            for m in edges[n]:
                visit(m)
            order.append(n)  # Appended only after all successors

        for node in edges:
            visit(node)

        return order[::-1]

    def evaluate(self, input_values: list[float]):
        """Feed `input_values` through the network and return the output values.

        Only enabled connections are used. The enabled connections are assumed
        to form a DAG; with a cycle, a node may be reached before one of its
        inputs has a value, which surfaces as a KeyError.

        Args:
            input_values: one value per "input" node, in the order those nodes
                appear in `self.nodes`.

        Returns:
            List with the value of each "output" node, in `self.nodes` order.

        Raises:
            ValueError: if the number of values differs from the number of input nodes.
        """
        node_values = {}
        node_inputs = {n: [] for n in self.nodes}

        input_nodes = [n for n in self.nodes if n.layer == "input"]
        output_nodes = [n for n in self.nodes if n.layer == "output"]

        # Verify that the number of input values matches the number of input nodes
        if len(input_nodes) != len(input_values):
            raise ValueError("Number of inputs doesn't match number of input nodes")

        # Assign input values (input nodes bypass bias and activation)
        for node, val in zip(input_nodes, input_values):
            node_values[node] = val

        # Adjacency list (for ordering) and incoming connections (for summing)
        edges = {n: [] for n in self.nodes}
        for conn in self.connections:
            if conn.enabled:
                edges[conn.in_node].append(conn.out_node)
                node_inputs[conn.out_node].append(conn)

        sorted_nodes = self.__topological_sort(edges)

        for node in sorted_nodes:
            if node in node_values:
                continue  # Input node, value already assigned

            incoming = node_inputs[node]
            total_input = sum(
                node_values[c.in_node] * c.weight for c in incoming
            ) + node.bias

            node_values[node] = node.activation(total_input)

        return [node_values[n] for n in output_nodes]


def crossover(parent1: Genome, parent2: Genome) -> Genome:
    """ Crossover assuming parent1 is the fittest parent

    Connection genes are aligned by innovation number:
    - matching genes are copied from a randomly chosen parent,
    - genes present only in parent1 (disjoint/excess) are copied from parent1,
    - genes present only in parent2 are not inherited.

    The offspring's node list starts with parent1's nodes in their original
    order, so the input/output ordering used by `Genome.evaluate` is preserved
    and nodes without any connection are kept. Endpoints of inherited genes
    that are not in that list are appended afterwards. Node objects are shared
    with the parents, not copied.

    Args:
        parent1: the fitter parent.
        parent2: the less fit parent.

    Returns:
        A new Genome built from copies of the inherited connection genes.
    """
    # Build maps of genes keyed by innovation number
    genes1 = {g.innov: g for g in parent1.connections}
    genes2 = {g.innov: g for g in parent2.connections}

    offspring_connections = []
    # dict.fromkeys removes duplicates while keeping the original order
    offspring_nodes = list(dict.fromkeys(parent1.nodes))
    known_nodes = set(offspring_nodes)

    # Only parent1's innovations can be inherited, so iterating over them
    # skips the genes that exist only in the less fit parent.
    for innov in sorted(genes1):
        gene1 = genes1[innov]
        gene2 = genes2.get(innov)

        if gene2 is not None:  # Matching genes
            selected = random.choice([gene1, gene2])
        else:  # Disjoint or excess gene (from the fittest parent)
            selected = gene1

        gene_copy = selected.copy()
        offspring_connections.append(gene_copy)

        for node in (gene_copy.in_node, gene_copy.out_node):
            if node not in known_nodes:
                known_nodes.add(node)
                offspring_nodes.append(node)

    return Genome(offspring_nodes, offspring_connections)


def distance(genome1: Genome, genome2: Genome, c1=1.0, c2=1.0, c3=0.4):
    """Compatibility distance between two genomes.

    Computes delta = c1 * E / N + c2 * D / N + c3 * W, where E is the number
    of excess genes, D the number of disjoint genes, W the average weight
    difference of matching genes, and N the size of the larger genome
    (replaced by 1 when that size is below 20).

    Args:
        genome1, genome2: genomes whose `connections` carry `innov` and `weight`.
        c1: coefficient of the excess-gene term.
        c2: coefficient of the disjoint-gene term.
        c3: coefficient of the average weight difference.

    Returns:
        The distance as a number; 0 for two genomes without any connections.
    """
    genes1 = {g.innov: g for g in genome1.connections}
    genes2 = {g.innov: g for g in genome2.connections}

    innovations1 = set(genes1)
    innovations2 = set(genes2)

    matching = innovations1 & innovations2
    non_matching = innovations1 ^ innovations2

    # Highest innovation number of each genome. An empty genome gets -1 rather
    # than 0, because 0 is itself a valid innovation number: against an empty
    # genome, every gene of the other genome must count as excess.
    max_innov1 = max(innovations1, default=-1)
    max_innov2 = max(innovations2, default=-1)
    max_innov = min(max_innov1, max_innov2)

    # Separate excess from disjoint: excess genes lie beyond the range of
    # innovation numbers covered by the other genome.
    excess = {innov for innov in non_matching if innov > max_innov}
    disjoint = non_matching - excess

    # Weight difference of matching genes
    if matching:
        weight_diff = sum(
            abs(genes1[i].weight - genes2[i].weight) for i in matching
        )
        avg_weight_diff = weight_diff / len(matching)
    else:
        avg_weight_diff = 0

    # Normalize by N
    N = max(len(genome1.connections), len(genome2.connections))
    if N < 20:  # NEAT typically uses 1 if N < 20
        N = 1

    delta = (c1 * len(excess)) / N + (c2 * len(disjoint)) / N + c3 * avg_weight_diff
    return delta




In [14]:
class Species:
    """A group of genomes built around one representative genome.

    Attributes:
        representative: the genome the species was created with.
        members: genomes currently in the species (starts with the representative).
        adjusted_fitness: value last computed by `compute_adjusted_fitness`.
        best_fitness: initialised to negative infinity.
        stagnant_generations: initialised to 0.
    """

    def __init__(self, representative: Genome):
        self.representative = representative
        self.members = [representative]
        self.adjusted_fitness = 0
        self.best_fitness = float("-inf")
        self.stagnant_generations = 0

    def add_member(self, genome: Genome):
        """Append `genome` to the member list."""
        self.members.append(genome)

    def clear_members(self):
        """Replace the member list with a new empty list (the representative is kept)."""
        self.members = []

    def compute_adjusted_fitness(self, fitnesses: dict):
        """Store the sum of each member's fitness divided by the species size.

        Args:
            fitnesses: mapping from genome to its fitness; must contain every member.
        """
        size = len(self.members)
        self.adjusted_fitness = 0
        for member in self.members:
            share = fitnesses[member] / size
            self.adjusted_fitness += share


class Speciator:
    """Keeps the list of species and assigns genomes to them by distance.

    Attributes:
        species: list of current Species objects.
        compatibility_threshold: a genome joins a species when its distance to
            that species' representative is strictly below this value.
    """

    def __init__(self, compatibility_threshold=3.0):
        self.species = []
        self.compatibility_threshold = compatibility_threshold

    def speciate(self, population: list[Genome], fitnesses: dict):
        """ Group genomes into species based on distance """
        # Start the new generation with empty member lists
        for existing in self.species:
            existing.clear_members()

        for genome in population:
            # First species (in list order) whose representative is close enough
            home = next(
                (
                    candidate
                    for candidate in self.species
                    if distance(genome, candidate.representative) < self.compatibility_threshold
                ),
                None,
            )
            if home is None:
                # No compatible species: the genome founds a new one
                self.species.append(Species(representative=genome))
            else:
                home.add_member(genome)

        # Drop species that received no member this generation
        self.species = [group for group in self.species if group.members]

        # Refresh the adjusted fitness of every surviving species
        for group in self.species:
            group.compute_adjusted_fitness(fitnesses)

    def get_species(self):
        """Return the current list of species."""
        return self.species






In [12]:
# Smoke test: hand-built network with 2 inputs, 1 hidden node and 1 output.
# Input nodes get their values assigned directly by evaluate().
input1 = NodeGene("input", activation=lambda x: x, bias=0)
input2 = NodeGene("input", activation=lambda x: x, bias=0)
hidden = NodeGene("hid", activation=sigmoid, bias=0.1)
output = NodeGene("output", activation=sigmoid, bias=0.2)

# Both inputs feed the hidden node, which feeds the output node.
conn1 = ConnectionGene(input1, hidden, weight=0.5, innov=1)
conn2 = ConnectionGene(input2, hidden, weight=0.5, innov=2)
conn3 = ConnectionGene(hidden, output, weight=1.0, innov=3)

# Input values are matched to input nodes by their position in `nodes`.
nodes = [input1, input2, hidden, output]
connections = [conn1, conn2, conn3]
genome = Genome(nodes, connections)

# hidden = sigmoid(0.5*1.0 + 0.5*(-1.0) + 0.1); output = sigmoid(1.0*hidden + 0.2)
result = genome.evaluate([1.0, -1.0])
print("Genome output:", result)


Genome output: [0.6737025243079816]
