# Algorytm genetyczny - implementacja NEAT

### Dodane pozostałe operatory mutacji, funkcja ewaluacyjna oraz krzyżowanie osobników (numery sekwencyjne już były).

## Aleksandra Wichrowska, 291140

"Potwierzam samodzielność poniższej pracy i niekorzystanie przeze mnie z niedozwolonych źródeł." 

~ Aleksandra Wichrowska

In [11]:
import numpy as np
from copy import deepcopy

### Node gene class

In [12]:
class Node:
    
    def __init__ (self, node_id, node_type):
        """
        node_id [int]: global ID of node
        node_type [str]: one of {'input','output','hidden'}
        """
        self.node_id = node_id 
        self.node_type = node_type

In [13]:
def equal_nodes(node1, node2):
    if node1.node_id == node2.node_id:
        return True
    return False

### Connection gene class

In [26]:
class Connection:
    
    def __init__(self, node_in, node_out, weight, enabled, innov):
        """
        node_in [Node]: first node in connections
        node_out [Node]: second node in connections
        weight [float]: connection weight
        enabled [boolean]: True if connections is enabled
        innov [int]: gloabl ID of connection
        """
        self.node_in = node_in
        self.node_out = node_out
        self.weight = weight
        self.enabled = enabled
        self.innov = innov
    
    def get_connection_nodes_ids(self):
        """
        returns tuple of connection nodes ids
        """
        return (self.node_in.node_id, self.node_out.node_id)

In [27]:
def equal_connections(connection1, connection2):
    if connection1.get_connection_nodes_ids() == connection2.get_connection_nodes_ids():
        return True
    return False

### Individual class

In [67]:
class Individual:
    
    def __init__ (self, nodes, connections):
        """
        nodes [list]: list of nodes
        connections [list]: list of connections
        """
        self.nodes = nodes
        self.connections = connections
        
    def get_connections_list(self):
        """
        returns list of individual's connections, 
        each connections is represented as a tuple of nodes ids
        """
        connections_list = []
        for connection in self.connections:
            connections_list.append(connection.get_connection_nodes_ids())
        return connections_list
    
    def get_enabled_connections(self):
        """
        returns list of individual's enabled connections 
        """
        connections_list = []
        for connection in self.connections:
            if connection.enabled: connections_list.append(connection)
        return connections_list
    
    def get_inputs(self):
        inputs = []
        for node in self.nodes:
            if node.node_type == 'input': inputs.append(node)
        return inputs
    
    def get_outputs(self):
        outputs = []
        for node in self.nodes:
            if node.node_type == 'output': outputs.append(node)
        return outputs    
    
    def n_nodes_in(self):
        n_nodes = {}
        for node in self.nodes:
            n_nodes[node.node_id] = len([x for x in  self.get_connections_list () if x[1] == node.node_id])    
        return n_nodes
    
    def connections_from_node(self, node):
        connections_from_node = []
        for connection in self.connections:
            if connection.get_connection_nodes_ids()[0] == node.node_id:
                connections_from_node.append(connection)
        return connections_from_node 
                
    def get_neuron(self, node_id):
        return [node for node in  self.nodes if node.node_id == node_id][0]

### Genetic algorithm (NEAT) class

In [181]:
class GeneticAlgorithm:
    
    def __init__ (self, size, inputs, outputs):
        """
        size [int]: size of population (number of individuals)
        n_in [int]: number of input nodes
        n_out [int]: number of output nodes
        """
    
        self.size = size 
        self.inputs = inputs
        self.outputs = outputs
        self.n_in = len(inputs)
        self.n_out = len(outputs)
        self.connections = {} # dictionary with tuples of connections nodes ids as keys
        self.n_nodes = 0 # counter of nodes
        self.population = self.generate_population()
        
    def generate_population(self):
        """
        return generated population,
        """
        population = []
        for i in range(self.size):
            population.append(self.generate_simple_individual())
        self.n_nodes += (self.n_in+self.n_out)
        return population
        
    def generate_simple_individual(self):
        """
        return generated individual for start population,
        individual is fully connected graph with only inputs and outputs nodes 
        """
        nodes = []
        connections = []
        max_id = 0
        for i in range(self.n_in):
            nodes.append(Node(max_id + 1, 'input'))
            max_id += 1
        for i in range(self.n_out):
            nodes.append(Node(max_id + 1, 'output'))
            max_id += 1 
        for i in range(self.n_in):
            for j in range(self.n_out):
                connection_nodes = (nodes[i].node_id, nodes[self.n_in+j].node_id)
                if not connection_nodes in self.connections.keys():
                    self.connections[connection_nodes] = {'innov': len(self.connections)+1, 'node_between':None}
                connection = Connection(nodes[i], nodes[self.n_in+j], np.random.normal(), True, 
                                       self.connections[connection_nodes]['innov'])
                connections.append(connection)
            
        new_individual = Individual(nodes, connections)
        return new_individual
        
    def evaluate(self):
        return [self.fitness(individual) for individual in self.population]
    
    def eval_function(self, individual):
        inputs = individual.get_inputs()
        nodes_sum = {}
        for i in inputs:
            nodes_sum[i.node_id] = 1
            
        nodes_enter = individual.n_nodes_in()
        nodes_entered = {}

        queue = [i for i in inputs]

        for node in queue:
            #print(node.node_id, [x.get_connection_nodes_ids() for x in individual.connections_from_node(node)])
            for connection in individual.connections_from_node(node):
                id = connection.node_out.node_id
                #print(node.node_id, connection.node_in.node_id, id)
                if not id in nodes_entered.keys():
                    nodes_entered[id] = 0
                nodes_entered[id] += 1
                # cur_sum = self.activation(nodes_sum[connection.node_in])
                cur_sum = nodes_sum[connection.node_in.node_id]
                if not id in nodes_sum.keys():
                    nodes_sum[id] = 0
                nodes_sum[id] += cur_sum * connection.weight
                if nodes_entered[id] == nodes_enter[id]:
                    queue.append(connection.node_out)
                    
        return [nodes_sum[node.node_id] for node in individual.get_outputs()]
            
    def fitness(self, individual):
        return sum((np.array(self.eval_function(individual))-np.array(self.outputs))**2)
        
    ##################################
    ############ MUTATION ############
    ##################################    
    
    def mutate_population(self):
        individual = self.population[np.random.choice(len(self.population))]
        self.add_connection_mutation(individual)
        individual = self.population[np.random.choice(len(self.population))]
        self.add_node_mutation(individual)
        individual = self.population[np.random.choice(len(self.population))]
        self.change_weight_mutation(individual)
        individual = self.population[np.random.choice(len(self.population))]
        self.disable_connection_mutation(individual)
    
                                     
    def add_connection_mutation(self, individual):
        """
        add new connections between existing nodes
        
        individual [Individual]: individual selected for mutation
        """
        for trial in range(50):
            n1, n2 = np.random.choice(individual.nodes,2, False)
            if n1.node_type == 'input' and n2.node_type == 'input':
                continue
            elif n1.node_type == 'output' and n2.node_type == 'output':
                continue
            elif (n1.node_id,n2.node_id) in individual.get_connections_list():
                continue
            else:    
                if not (n1.node_id, n2.node_id) in self.connections:
                    self.connections[(n1.node_id,n2.node_id)] = {'innov': len(self.connections)+1, 'node_between':None}
                innov = self.connections[(n1.node_id,n2.node_id)]['innov']
                connection = Connection(n1,n2,np.random.normal(), True, innov)
                individual.connections.append(connection)
                return
        print('Failed connection adding')
        return
    
    def add_node_mutation(self, individual):
        """
        add new node on existing connection
        
        individual [Individual]: individual selected for mutation
        """
        
        # choose enabled connection on which will be added new node
        connection = np.random.choice(individual.get_enabled_connections(),1)[0]
        connection.enabled = False        
        
        # check if on this connections in other individuals exists node
        # if yes - get this node id and connections (to and from it) innov
        if self.connections[connection.get_connection_nodes_ids()]['node_between'] != None:
            new_node_id = self.connections[connection.get_connection_nodes_ids()][1]
            connection1_innov = self.connections[(connection.node_in.node_id, new_node_id)]['innov']
            connection2_innov = self.connections[(new_node_id,connection.node_out.node_id)]['innov']
            
        # if no - create new global node and two new global connections
        else:
            self.n_nodes += 1
            new_node_id = self.n_nodes
            self.connections[connection.get_connection_nodes_ids()]['node_between'] = new_node_id
            connection1_innov = len(self.connections) + 1
            self.connections[(connection.node_in.node_id, new_node_id)] = {'innov': connection1_innov, 'node_between':None}
            connection2_innov = len(self.connections) + 1
            self.connections[(new_node_id, connection.node_out.node_id)] = {'innov': connection2_innov, 'node_between':None}
    
        new_node = Node(new_node_id, 'hidden')
        individual.nodes.append(new_node)
        
        new_connection1 = Connection(connection.node_in, new_node, 1, True, connection1_innov)
        individual.connections.append(new_connection1)
        
        new_connection2 = Connection(new_node, connection.node_out, connection.weight, True, connection2_innov)
        individual.connections.append(new_connection2)
    
    def change_weight_mutation(self, individual):
        """
        change one randomly selected connection
        
        individual [Individual]: individual selected for mutation
        """        
        connection = np.random.choice(individual.get_enabled_connections(),1)[0]
        rand = np.random.uniform()
        
        # with probability 10% set new weight
        if rand <= 0.1:
            connection.weight = np.random.normal()
        
        # with probability 90% perturbate weight
        else:
            connection.weight += 0.1*np.random.normal()

    def disable_connection_mutation(self, individual):
        """
        disable one randomly selected connection
        
        individual [Individual]: individual selected for mutation
        """        
        connection = np.random.choice(individual.get_enabled_connections,1)[0]
        connection.enabled = False
    
    ##################################
    ########### CROSS OVER ###########
    ##################################
    
    def cross_over_population(self):
        index1, index2 = np.random.choice(range(len(self.population)), 2, replace=False)
        individual1 = self.population[index1]
        individual2 = self.population[index2]                                     

        return individual1, individual2
    
    def cross_over_individuals(self, individual1, individual2):
        
        new_individual_nodes = deepcopy(individual1.nodes)
        for node2 in individual2.nodes:
            if not any([equal_nodes(node1, node2) for node1 in individual1.nodes]):
                new_individual_nodes.append(node2)    

        new_individual_connections = deepcopy(individual1.connections)
        for connection2 in individual2.connections:
            boolean = 1
            for connection1 in new_individual_connections:
                if equal_connections(connection1, connection2): 
                    r = np.random.choice(2)
                    if r == 1:
                        connection1.weight = connection2.weight
                    boolean = 0
            if boolean == 1:
                new_individual_connections.append(connection2)

        new_individual = Individual(new_individual_nodes, new_individual_connections)
        return new_individual
        
    ##################################
    ############ TRAINING ############
    ##################################
        
    def train_one_epoch(self):
        self.cross_over_population()
        self.mutate_population()
        
    def train_population(self, epochs=20):
        results = [min(self.evaluate())]
        for i in range(epochs):
            self.train_one_epoch()
            results.append(min(self.evaluate()))
        return results 

In [182]:
ga = GeneticAlgorithm(10,[1,1,1],[10,5])

In [183]:
ga.evaluate()

[92.69824186116561,
 139.52678570691074,
 176.07552343244902,
 101.5404290294521,
 130.27233821969847,
 152.2622735048724,
 111.91880817190179,
 92.17816802215,
 45.600459932485755,
 116.83629616596212]