Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# LAB9

Write a local-search algorithm (eg. an EA) able to solve the *Problem* instances 1, 2, 5, and 10 on a 1000-loci genomes, using a minimum number of fitness calls. That's all.

### Deadlines:

* Submission: Sunday, December 3 ([CET](https://www.timeanddate.com/time/zones/cet))
* Reviews: Sunday, December 10 ([CET](https://www.timeanddate.com/time/zones/cet))

Notes:

* Reviews will be assigned  on Monday, December 4
* You need to commit in order to be selected as a reviewer (ie. better to commit an empty work than not to commit)

In [776]:
from random import choices, uniform
import random
from dataclasses import dataclass, field
from copy import deepcopy
from typing import Callable, List


import lab9.lab9_lib as lab9_lib

In [777]:
fitness = lab9_lib.make_problem(10)
for n in range(10):
    ind = choices([0, 1], k=50)
    print(f"{''.join(str(g) for g in ind)}: {fitness(ind):.2%}")

print(fitness.calls)

11000010111110111110010001111010110111111110010110: 9.11%
10010101101000000111101111110011111000111101001001: 9.13%
01111110001001110111010110101110111110000101001011: 9.11%
10110011000001011000110111110101100000010010100010: 17.56%
00110111000110001011010011111101111101111111011100: 39.33%
10110001101010111000001110000001110001000011011011: 9.16%
01111011010001001001011110000011111101100100001100: 15.33%
01011010101000000010111011101010010101011100100111: 15.34%
00011001000101111010101100010010001000111000100010: 7.33%
11100011001101101111011011000110101100000101110001: 15.33%
10


In [778]:
@dataclass
class Individual:
    genome : tuple[int]
    n_loci: int
    fitness: float
    fitness_func: Callable[[tuple[int]], float]
    
    def __init__(self, FITNESS_FUNC, n_loci, genome = None):
        self.n_loci = n_loci
        if genome is None:
            self.genome = random.choices([0, 1], k=n_loci)
        else:
            self.genome = genome
        self.fitness_func = FITNESS_FUNC
        self.fitness = self.fitness_func(self.genome)
    
    def __repr__(self):
        return f"{[str(val) for val in self.genome]}"
    
    @staticmethod
    def mutate(individual: "Individual"):
        points_to_mutate = int((1 - individual.fitness)*10)
        index_to_mutate = random.choices(range(individual.n_loci), k=points_to_mutate)
        new_genome = deepcopy(individual.genome)
        for i in index_to_mutate:
            new_genome[i] = 1 - new_genome[i]
        return Individual(individual.fitness_func, individual.n_loci, new_genome)
    
    @staticmethod
    def crossover_one_point(ind1: "Individual", ind2: "Individual"):
        index_to_cross = random.randrange(ind1.n_loci)
        return Individual(ind1.fitness_func, ind1.n_loci, ind1.genome[:index_to_cross] + ind2.genome[index_to_cross:])
        
    @staticmethod
    def crossover_n_point(ind1: "Individual", ind2: "Individual", n=40):
        index_list = sorted(random.sample(range(ind1.n_loci), n))
        child1_genome, child2_genome = ind1.genome[:], ind2.genome[:]
        for i in range(0, n, 2):
            start = index_list[i]
            end = index_list[i+1] if i+1 < n else len(ind1.genome)
            child1_genome[start:end] = child2_genome[start:end]
        return Individual(ind1.fitness_func, ind1.n_loci, child1_genome)
            
    @staticmethod
    def crossover_uniform(ind1: "Individual", ind2: "Individual"):
        index_list = [random.random() for _ in range(ind1.n_loci)]
        return Individual(ind1.fitness_func, ind1.n_loci, [i1 if index < .5 else i2 for i1, i2, index in zip(ind1.genome, ind2.genome, index_list)])
    
@dataclass
class Operators_agent:
    
    n_points_number: int
    crossover_success_rate : {}

    weight_vector_prob : []
    tot_crossover: int
    
    def __init__(self, n_points_number):
        self.n_points_number = n_points_number
        self.weight_vector_prob = [1, 1, 1]
        self.crossover_success_rate = {}
        self.tot_crossover = 0
        CROSSOVER_FUNCTION = ["crossover_one_point", "crossover_n_point", "crossover_uniform"]
        for func_name in CROSSOVER_FUNCTION:
            self.crossover_success_rate[func_name] = 0
    
    
    def mutate(self, individual: "Individual"):
        points_to_mutate = int((1 - individual.fitness)*(individual.n_loci)/50)
        index_to_mutate = random.choices(range(individual.n_loci), k=points_to_mutate)
        new_genome = deepcopy(individual.genome)
        for i in index_to_mutate:
            new_genome[i] = 1 - new_genome[i]
        return Individual(individual.fitness_func, individual.n_loci, new_genome)
    
    def crossover(self, ind1: "Individual", ind2: "Individual"):
        crossover_function = random.choices([Operators_agent.crossover_one_point, Operators_agent.crossover_n_point, Operators_agent.crossover_uniform], 
                                            weights=self.weight_vector_prob, 
                                            k=1)[0]
        child = crossover_function(ind1, ind2, self.n_points_number)
        self.tot_crossover += 1
        if child.fitness > ind1.fitness and child.fitness > ind2.fitness:
            self.crossover_success_rate[crossover_function.__name__] += 1
            if self.tot_crossover % 5 == 0:
                i = 0
                for value in self.crossover_success_rate.values():
                    self.weight_vector_prob[i] = value/self.tot_crossover
                    i += 1
        return child
    
    def crossover_one_point(ind1: "Individual", ind2: "Individual", n):
        index_to_cross = random.randrange(ind1.n_loci)
        child = Individual(ind1.fitness_func, ind1.n_loci, ind1.genome[:index_to_cross] + ind2.genome[index_to_cross:])
        return child
        
    def crossover_n_point(ind1: "Individual", ind2: "Individual", n):
        index_list = sorted(random.sample(range(ind1.n_loci), n))
        child1_genome, child2_genome = ind1.genome[:], ind2.genome[:]
        for i in range(0, n, 2):
            start = index_list[i]
            end = index_list[i+1] if i+1 < n else len(ind1.genome)
            child1_genome[start:end] = child2_genome[start:end]
        return Individual(ind1.fitness_func, ind1.n_loci, child1_genome)
                    
    def crossover_uniform(ind1: "Individual", ind2: "Individual", n):
        index_list = [random.random() for _ in range(ind1.n_loci)]
        return Individual(ind1.fitness_func, ind1.n_loci, [i1 if index < .5 else i2 for i1, i2, index in zip(ind1.genome, ind2.genome, index_list)])

# Mutation rate mapping
It is a function used to change dinamically the mutation rate, by progressively increase it as the average fitness scores increase

In [779]:
MUTATION_RATE_RANGE = [0.1, 0.2, 0.3, 0.4, 0.5]
def mutation_rate_selection(fitness_average: float):
    index = int((fitness_average) / 0.2)
    return MUTATION_RATE_RANGE[index]


# Parent selection function
This function compute a stochastic universal sampling for parent selection and use as parameter the reduction factor "parent_selection_rate" to indicates how much to reduce the initial populations
The resulting population will be the one where the tournament takes place

In [780]:
def stochastic_universal_sampling(population: List[Individual], parent_selection_rate: int) -> List[Individual]:
        
    total_fitness = sum(ind.fitness for ind in population)
    pointer_distance = total_fitness / len(population)
    num_selected_parents = int(len(population)/parent_selection_rate)
    start = uniform(0, pointer_distance)
    pointers = [start + i * pointer_distance for i in range(num_selected_parents)]

    new_population = []
    current_index = 0
    for pointer in pointers:
        while pointer > 0:
            pointer -= population[current_index].fitness
            current_index = (current_index + 1) % len(population)
        new_population.append(population[current_index])
        if len(new_population) == num_selected_parents:
            break 

    return new_population

# Tournament function
It implements a tournament selection to take a parent to use for recombination or reproduction

In [781]:
def select_parent(parents: list[Individual], size: int):
    pool = choices(parents, k=size)
    winner = max(pool, key=lambda ind: ind.fitness)
    return winner

# Offspring generation function
It generates the offspring by apply, using the tournament function to select the parent each iteration, a recombination or mutation depending on the mutation rate

In [782]:
def offspring_generation(parents: List[Individual], populations_number: int, offspring_rate: int, operator_agent: "Operators_agent", mutation_rate: int, tournament_size: int) -> List[Individual]:
    offspring = []
    offspring_length = int(populations_number*offspring_rate)
    
    for _ in range(offspring_length):
        p = select_parent(parents, tournament_size)
        if random.random() < mutation_rate:
            p2 = select_parent(parents, tournament_size)
            offspring.append(operator_agent.crossover(p, p2))
        else:
            offspring.append(Individual.mutate(p))
                
    return offspring

# Evolutionary algorithm
It implements all the evolutionary cycle
The principal steps are:
* Initialization
    * Initializes the fitness_function with the specified instance
    * Generates the initial population randomically
    * Initializes the mutation rate 
* For each generation:
    * It first applies elitism to select the best parent, to avoid that it will not be selected in the consequential stochastic selection
    * Then it uses a stochastic universal sampling to select a subset of the initial parents, based on the parent selection rate parameter, that will be used then
    * It generates the offspring population, with a size specifying by the offspring rate
    * It selects the best individual considering the union of both selected parents and offspring
    * It checks if the best individual has reached the fitness score goal
    * It changes the mutation rate by check the average fitness score
* In the end it returns both the best individual and the number of fitness calls 

In [783]:
import math
import numpy as np


def launch_es_cycle(problem_instance: int, populations_number: int, n_loci: int, generations: int, parent_selection_rate: int, offspring_rate: int, number_crossover_points: int, tournament_size: int, mutation_rate: int):
    
    fitness_func = lab9_lib.make_problem(problem_instance)
    population = [Individual(fitness_func, n_loci) for _ in range(populations_number)]
    operator_agent = Operators_agent(number_crossover_points)
    
    for gen in range(generations):
        best_parent = max(population, key=lambda p: p.fitness)
        population.remove(best_parent)
        selected_parents = stochastic_universal_sampling(population, parent_selection_rate)
        selected_parents.append(best_parent)
        offspring = offspring_generation(selected_parents, populations_number, offspring_rate, operator_agent, mutation_rate, tournament_size)
        population = sorted(selected_parents+offspring, key=lambda p: p.fitness, reverse=True)[:populations_number]
        if math.isclose(1, population[0].fitness):
            break
        standard_deviation = np.std([ind.fitness for ind in population])
        if standard_deviation < 0.1:
            mutation_rate += standard_deviation/2
        elif standard_deviation > 0.3:
            mutation_rate -= standard_deviation/2
    
    
    return population[0], fitness_func.calls

# Parameters tuning
This code has the purpose to tune some parameters, considering a small populations size and generations number, to find the best combination that will be used then to test the algorithm
The parameters are:
* OFFSPRING_RATE: It indicates a multiplicative factor, with respect to the population size, that indicates how big the offspring population will be (Es: with an offspring_rate of 6 and a population number of 100, the resulting offspring population will be composed by 6*100 individuals)
* RECOMBINATION_FUNCTIONS: It contains an array of 3 crossover function used for the recombination operator
* NUMBER_CROSSOVER_POINTS: It contains an array of integer that indicates the point in which the parents will be split in the crossover_n_point function
* PARENT_SELECTION_RATE: It contains a divisor factor, with respect to the population size, that indicates how much the parent population will be decimated (Es: with a parent_selection_rate of 3 and a population number of 300, the resulting population will be composed by 300/3
* TOURNAMENT_SIZE: It indicates how much is the size of the array of individuals selected to compete in the tournament selection

In [784]:
import itertools

LOCI = 1000
PROBLEM_INSTANCES = [1, 2, 5, 10]

GENERATIONS_TEST = 200
POPULATIONS_NUMBER_TEST = 50

OFFSPRING_RATE = 2

RECOMBINATION_FUNCTIONS = [Individual.crossover_one_point, Individual.crossover_n_point, Individual.crossover_uniform]
NUMBER_CROSSOVER_POINTS = [10, 30, 100]
PARENT_SELECTION_RATE = [1, 3, 5]
TOURNAMENT_SIZE = [5, 15, 30]
MUTATION_RATES = {1: 0.8, 2: 0.2, 5: 0.2, 10: 0.2}


parameter_combinations = list(itertools.product(PROBLEM_INSTANCES, NUMBER_CROSSOVER_POINTS, PARENT_SELECTION_RATE, TOURNAMENT_SIZE))

best_parameters = {}
best_individuals = {}

for instance, n_points_cr, p_rate, t_size in parameter_combinations:
    individual, calls = launch_es_cycle(instance, POPULATIONS_NUMBER_TEST, LOCI, GENERATIONS_TEST, p_rate, OFFSPRING_RATE, n_points_cr, t_size, MUTATION_RATES[instance])
    if instance not in best_parameters or individual.fitness > best_individuals[instance][0].fitness:
        best_individuals[instance] = [individual, calls]
        best_parameters[instance] = [n_points_cr, p_rate, t_size]

for instance in PROBLEM_INSTANCES:
    print(f"Instance {instance}\n "
          f"Best Individual: Score: {best_individuals[instance][0].fitness}\n "              
          f"Calls: {best_individuals[instance][1]}")
    print(f"Parameters:\n "
          f"Number of points for crossover: {best_parameters[instance][0]}\n"
          f"Parent selection rate:{best_parameters[instance][1]}\n"
          f"Tournament size:{best_parameters[instance][2]}\n")


Instance 1
 Best Individual: Score: 0.902
 Calls: 20050
Parameters:
 Number of points for crossover: 30
Parent selection rate:1
Tournament size:15

Instance 2
 Best Individual: Score: 0.608
 Calls: 20050
Parameters:
 Number of points for crossover: 100
Parent selection rate:1
Tournament size:5

Instance 5
 Best Individual: Score: 0.505
 Calls: 20050
Parameters:
 Number of points for crossover: 10
Parent selection rate:1
Tournament size:15

Instance 10
 Best Individual: Score: 0.36722699999999997
 Calls: 20050
Parameters:
 Number of points for crossover: 10
Parent selection rate:1
Tournament size:5


In [None]:
FINAL_POPULATIONS_SIZE = 30
FINAL_GENERATIONS = 1000
final_individual = {}
for instance in PROBLEM_INSTANCES:
    individual, calls = launch_es_cycle(instance, FINAL_POPULATIONS_SIZE, LOCI, FINAL_GENERATIONS, best_parameters[instance][1], OFFSPRING_RATE, best_parameters[instance][0], best_parameters[instance][2], MUTATION_RATES[instance])
    final_individual[instance] = (individual, calls)
    print(f"Instance {instance}: Final Score: {final_individual[instance][0].fitness} with {final_individual[instance][1]} calls\n")
    