In [3]:
# New approach
import typing
import copy
import math
import random

"""
XOR using neat
"""

""" spotkanie 9 listopad
    1) dokończyć swoją implementację
    2) gymnasium + NEAT
    3) zastąpić sumowanie filtrem kalmana
"""
import random

def relu(x):
    if x > 0:
        return x
    else:
        return 0

WEIGHT_MUTATION_CHANCE = 0.8
NEW_CONNECTION_CHANCE = 0.05
NEW_NODE_CHANCE = 0.03
ACTIVATION_FUNC = relu

C1 = 1.0
C2 = 1.0
C3 = 0.4
SPECIES_THRESHOLD = 3.0  # Threshold for determining if two genomes belong to the same species


In [4]:

class Node:
    def __init__(self, node_id, node_type):
        self.node_id = node_id
        self.node_type = node_type  #  'input' | 'hidden' | 'output'

    def __repr__(self):
        return f"{self.node_type} Node {self.node_id} "


class Connection:
    def __init__(self, in_node, out_node, weight, enabled=True, innov_number=None):
        self.in_node = in_node
        self.out_node = out_node
        self.weight = weight
        self.enabled = enabled
        self.innov_number = innov_number  # crossover

    def __repr__(self):
        return f"Conn from {self.in_node} to {self.out_node}, weight:{self.weight}, en:{self.enabled}, innv:{self.innov_number}"


In [6]:

class Species:
    def __init__(self, representative):
        self.members = [representative]
        self.representative = representative

    def add_member(self, genome):
        self.members.append(genome)

    def clear_members(self):
        self.members = []

    def adjust_fitnesses(self):
        total_fitness = sum([genome.fitness for genome in self.members])
        for genome in self.members:
            genome.fitness /= total_fitness

    def set_representative(self):
        self.representative = random.choice(self.members)


def speciate(genomes, species):
    for spec in species:
        spec.clear_members()

    for genome in genomes:
        found_species = False
        for spec in species:
            if genome.compatibility_distance(spec.representative) < SPECIES_THRESHOLD:
                spec.add_member(genome)
                found_species = True
                break
        if not found_species:
            new_species = Species(genome)
            species.append(new_species)

    # Remove empty species
    species = [spec for spec in species if spec.members]

    return species


In [73]:

class Genome:
    def __init__(self):
        self.nodes = []
        self.connections = []
        
        self.next_node_id = 0
        self.next_innov_number = 0
    
    @staticmethod
    def create_from_shape(shape:tuple[int,int]):
        inputs,outputs = shape
        g = Genome()

        for i in range(inputs):
            g.add_node('input')

        for o in range(outputs):
            g.add_node('output')

        ins = list(filter(lambda x:x.node_type == 'input', g.nodes))
        outs = list(filter(lambda x:x.node_type == 'output', g.nodes))
        for i in ins:
            for o in outs:
                g.add_connection(i.node_id, o.node_id, 1.0)
        return g

    def add_node(self, node_type):
        node = Node(self.next_node_id, node_type)
        self.nodes.append(node)
        self.next_node_id += 1

    def evaluate(self, input):
        ff_in = {}
        for (i, n) in enumerate(input):
            ff_in[i] = n
        return feed_forward(self, ff_in, ACTIVATION_FUNC)

    def add_connection(self, in_node, out_node, weight):
        connection = Connection(in_node, out_node, weight, innov_number=self.next_innov_number)
        self.connections.append(connection)
        self.next_innov_number += 1

    def mutate(self):
        for connection in self.connections:
            if random.random() < WEIGHT_MUTATION_CHANCE:
                connection.weight += random.uniform(-1, 1) 

        if random.random() < NEW_CONNECTION_CHANCE:
            node1 = random.choice(self.nodes)
            node2 = random.choice(self.nodes)
            if node1.node_type != 'output' and node2.node_type != 'input' and node1 != node2:
                self.add_connection(node1.node_id, node2.node_id, random.uniform(-1, 1))

        if random.random() < NEW_NODE_CHANCE:
            connection = random.choice(self.connections)
            connection.enabled = False

            new_node = self.add_node('hidden')
            self.add_connection(connection.in_node, new_node.node_id, 1.0)
            self.add_connection(new_node.node_id, connection.out_node, connection.weight)

    def compatibility_distance(self, other_genome):
        matching_weight_diffs = []
        disjoint_count = 0
        excess_count = 0

        self_genes = {c.innov_number: c for c in self.connections}
        other_genes = {c.innov_number: c for c in other_genome.connections}

        for innov in set(self_genes.keys()).union(other_genes.keys()):
            if innov in self_genes and innov in other_genes:
                matching_weight_diffs.append(abs(self_genes[innov].weight - other_genes[innov].weight))
            elif innov in self_genes:
                if innov > max(other_genes.keys()):
                    excess_count += 1
                else:
                    disjoint_count += 1
            elif innov in other_genes:
                if innov > max(self_genes.keys()):
                    excess_count += 1
                else:
                    disjoint_count += 1

        avg_weight_diff = sum(matching_weight_diffs) / len(matching_weight_diffs) if matching_weight_diffs else 0.0
        N = max(len(self.connections), len(other_genome.connections))
        N = max(N, 1)

        distance = (C1 * disjoint_count + C2 * excess_count) / N + C3 * avg_weight_diff
        return distance



In [42]:

import numpy as np
def feed_forward(genome:Genome, input_values:dict[int,float], activation_func):
    """
    O boze o kurwa
    To moze mieć rekurencyjne połączenia
    TODO zaimplementować to. Albo chociaz przekminić sposoby
    """
    all_output_nodes = set([n for n in genome.nodes if n.node_type == 'output'])
    valid_connections = [ c for c in genome.connections if c.enabled] 
    dependency_tree = {}
    values = input_values 

    for c in valid_connections:
        if dependency_tree.get(c.out_node) is not None:
            dependency_tree[c.out_node].add(c)
        else:
            dependency_tree[c.out_node] = set([c])
    print('after init, ', dependency_tree)

    def eval_node(node_id):
        if(values.get(node_id) is not None):
            return values.get(node_id)
        else:
            deps = []
            dependants = dependency_tree.get(node_id)
            assert(dependants is not None) 
            for conn in dependants:
                deps.append(eval_node(conn.in_node) * conn.weight)
            v = activation_func(sum(deps))
            values[node_id] = v
            return v
    
    return [eval_node(o.node_id) for o in all_output_nodes]


In [62]:
INPUT = [[0,0], [0,1], [1,0], [1,1]]
OUTPUT = [0, 1, 1, 0]
import copy
def eval(genome):
    """
    Evaluation function as specified in the paper. TODO: research why exactly is it like that.
    """
    result = [feed_forward(genome, input_values= dict(enumerate(iv)), activation_func=ACTIVATION_FUNC)[0] for iv in INPUT]
    assert(len(OUTPUT) == len(result))
    error = sum([ r - o for (r,o) in zip(result, OUTPUT)])
    y = (4 - error) ** 2
    return y


In [71]:

class Population:
    """
    Toplevel object
    """
    population = 40
    max_iters = 100
    def __init__(self, shape):
        self.generation = 0
        self.genomes = []
        default = Genome.create_from_shape(shape)
        for _ in range(self.population):
            self.genomes.append(copy.deepcopy(default))
        self.species = [Species(self.genomes[0])]

    def run(self):
        self.generation += 1
        results = map(eval, self.genomes)
        top_results = sorted(results, reverse=True)
        print(f"generation {self.generation}, top 5 results are {top_results[0:5]}\n")
        for (g,r) in zip(self.genomes, results): 

            g.fitness = r 
        speciate(self.genomes, self.species)


    def run_all(self):
        while self.generation < self.max_iters: 
            self.run()


In [74]:
p = Population((2,1))
p.run()

generation 1, top 5 results are [4.0, 4.0, 4.0, 4.0, 4.0]



In [85]:
p.run()

generation 9, top 5 results are [4.0, 4.0, 4.0, 4.0, 4.0]



In [87]:
len(p.species[0].members)

40

In [None]:
class Problem:
    def __init__(self, inputs, outputs, compute_fitness):
        if len(inputs) != len(outputs): raise Exception("Number of inputs and outputs dont match")


In [None]:
# One of the first tests, checking if structures were building correctly
gen = Genome()
gen.add_node('input') #     0
gen.add_node('input') #     1
gen.add_node('hidden')#     2
gen.add_node('output')#     3
gen.add_node('output')#     4
gen.add_connection(0,4, weight=0.8)
gen.add_connection(1,2, weight=0.8)
gen.add_connection(0,3, weight=0.8)
gen.add_connection(1,4, weight=0.8)
gen.add_connection(2,3, weight=0.8)
result = feed_forward(gen, {0:0.25, 1:0.5}, relu)
print("RESULT:", result)
