Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# LAB9

Write a local-search algorithm (e.g., an EA) able to solve the *Problem* instances 1, 2, 5, and 10 on 1000-loci genomes, using a minimum number of fitness calls. That's all.

### Deadlines:

* Submission: Sunday, December 3 ([CET](https://www.timeanddate.com/time/zones/cet))
* Reviews: Sunday, December 10 ([CET](https://www.timeanddate.com/time/zones/cet))

Notes:

* Reviews will be assigned on Monday, December 4
* You need to commit in order to be selected as a reviewer (ie. better to commit an empty work than not to commit)

In [1047]:
import lab9_lib

import random as random
import math as math
from copy import deepcopy
from itertools import product
from dataclasses import dataclass
from typing import Optional, List

In [1048]:
# Create the problem to solve: the argument picks the instance (this lab targets 1, 2, 5 and 10).
# The returned object is called as fitness(genotype) to score a genotype; the code below also
# reads its evaluation counter through `fitness._calls` / `fitness.calls`.
fitness = lab9_lib.make_problem(5)

# Variables

In [1049]:
# GA and Island model variables
GENERATIONS_SIZE = 100_000  # upper bound of the `for generation in range(...)` loops
POPULATION_SIZE = 400  # individuals kept per population (or per island) after truncation
OFFSPRING_SIZE = 200  # reproduction steps per generation (1 child per mutation, 2 per crossover)
GENOTYPE_SIZE = 1000  # number of loci in every generated genotype
TOURNAMENT_SIZE = 2  # individuals drawn (with replacement) for each tournament
MUTATION_PROBABILITY = .15  # chance that a reproduction step mutates instead of recombining
MUTATION_VALUE = 0.01  # fraction of the genotype length used to size the scramble/clone/shift mutations
EXTINTION_VALUE = 20  # consecutive checks with an unchanged best fitness that trigger an extinction
EXTINTION_QUANTITY = 0.8  # fraction of POPULATION_SIZE removed by an extinction


# ONLY island Model Parameters
NUM_ISLANDS = 200  # islands created by segregation_model_quantity (island_model_mix derives its own count)
MIGRATION_INTERVAL = 5  # island_model_mix migrates every this many generations
MIGRATION_SIZE = 10  # top individuals taken from each island when migrating / merging

# My functions

In [1050]:

#Individual class and methods
@dataclass
class Individual:
    """A candidate solution handled by the evolutionary algorithms.

    Attributes:
        fitness: Score returned by the problem's fitness function, or ``None``
            while the individual has not been evaluated yet.
        distance: Number of loci that differ from a reference genotype, or
            ``None`` while it has not been computed.
        genotype: The list of loci (0/1 values), or ``None`` when not set.
    """

    fitness: Optional[float] = None
    distance: Optional[float] = None
    # The default is None, so the annotation has to be Optional as well.
    genotype: Optional[List[int]] = None


In [1051]:
#Distances methods

#for euclidan aproch
def euclidean_distance(vet_1, vet_2):
    """Return the Euclidean distance between two equally long numeric vectors.

    Raises:
        AssertionError: if the two vectors do not have the same length.
    """
    # Both vectors must have the same number of components.
    assert len(vet_1) == len(vet_2)

    squared_differences = [(a - b) ** 2 for a, b in zip(vet_1, vet_2)]
    return math.sqrt(sum(squared_differences))

def euclidean_sort(population, ind_ref: Individual):
    """Return a new list with ``population`` ordered by Euclidean distance from ``ind_ref``.

    The closest individuals come first; individuals at the same distance keep
    their original relative order. ``population`` itself is not modified.
    """
    return sorted(
        population,
        key=lambda member: euclidean_distance(member.genotype, ind_ref.genotype),
    )


def levenshtein_distance(v1, v2):
    """Count the positions at which ``v1`` and ``v2`` hold different values.

    NOTE: despite the name, this is a position-by-position mismatch count;
    insertions and deletions are not considered. Only the first ``len(v1)``
    positions are examined, so ``v2`` must be at least as long as ``v1``
    (a shorter ``v2`` raises ``IndexError``).
    """
    return sum(1 for position in range(len(v1)) if v1[position] != v2[position])

def levenshtein_sort(population, ind_ref):
    """Return a new list with ``population`` ordered by ``levenshtein_distance`` from ``ind_ref``.

    The closest individuals come first; individuals at the same distance keep
    their original relative order. ``population`` itself is not modified.
    """
    return sorted(
        population,
        key=lambda member: levenshtein_distance(member.genotype, ind_ref.genotype),
    )

#order by euclidean
#euclidean_pop = euclidean_sort(pop, pop[0])

In [1052]:
# Selection functions
def elitism_selection(population, num_elites=1):
    """Select the fittest individual(s) of ``population``.

    Returns:
        A list with the ``num_elites`` fittest individuals (best first) when
        ``num_elites`` is greater than 1, otherwise the single fittest
        individual itself (not wrapped in a list).
    """
    # Rank by fitness, highest first, without touching the caller's list.
    ranked = sorted(population, key=lambda member: member.fitness, reverse=True)
    best = ranked[:num_elites]
    return best if num_elites > 1 else best[0]

def roulette_wheel_selection(population):
    """Pick one individual with probability proportional to its fitness.

    Returns:
        The selected individual, or ``None`` when no individual is reached
        by the spin (e.g. for an empty population).
    """
    # Running totals of the fitness values, one entry per individual.
    # The accumulation is done by hand so that the last threshold is exactly
    # the total used as the upper bound of the spin.
    thresholds = []
    running_total = 0
    for member in population:
        running_total += member.fitness
        thresholds.append(running_total)

    # Random point on the wheel, between 0 and the total fitness.
    spin = random.uniform(0, running_total)

    # The winner is the first individual whose running total reaches the spin.
    for member, threshold in zip(population, thresholds):
        if threshold >= spin:
            return member
    return None

def tournament_selection(pop):
    """Draw ``TOURNAMENT_SIZE`` individuals at random (with replacement) and return the fittest."""
    contestants = (random.choice(pop) for _ in range(TOURNAMENT_SIZE))
    return max(contestants, key=lambda contestant: contestant.fitness)

def distance_selection(pop):
    """Return the individual of ``pop`` with the largest ``distance`` attribute.

    When several individuals share the largest distance, the first one in
    ``pop`` is returned. A single linear scan is used instead of sorting the
    whole population just to take its first element.

    Raises:
        ValueError: if ``pop`` is empty.
    """
    return max(pop, key=lambda member: member.distance)

# Parent-selection operators available to the island-based algorithms.
# distance_selection is not part of this list; base_mode calls it directly.
operator_functions = [elitism_selection, roulette_wheel_selection, tournament_selection]

In [1053]:
# Mutation functions
def base_mutation(ind: Individual) -> Individual:
    """Return a copy of ``ind`` with one randomly chosen locus flipped (0 <-> 1).

    ``ind`` is left untouched. The copy's fitness is reset to ``None`` because
    the value inherited from the parent no longer describes the new genotype.
    """
    offspring = deepcopy(ind)
    # Use the individual's own length (as the other mutations do) instead of
    # the global GENOTYPE_SIZE, so the operator works for any genotype size.
    pos = random.randint(0, len(offspring.genotype) - 1)
    offspring.genotype[pos] = 1 - offspring.genotype[pos]
    offspring.fitness = None
    return offspring

def swap_mutation(ind: Individual) -> Individual:
    """Return a copy of ``ind`` in which the values of two distinct random loci are exchanged.

    ``ind`` is left untouched; the copy keeps every other attribute of ``ind``.
    """
    child = deepcopy(ind)
    size = len(ind.genotype)

    # First locus anywhere; the second is a non-zero offset away (wrapping
    # around), which guarantees that the two loci are different.
    first = random.randint(0, size - 1)
    offset = random.randint(0, size - 2) + 1
    second = (first + offset) % size

    child.genotype[first], child.genotype[second] = ind.genotype[second], ind.genotype[first]
    return child

def scramble_mutation(ind: Individual) -> Individual:
    """Return a copy of ``ind`` with the values of some random loci rotated among themselves.

    ``round(len(genotype) * MUTATION_VALUE)`` distinct loci are sampled and
    sorted; each sampled locus receives the value of the previous sampled
    locus, and the first one receives the value of the last. When fewer than
    two loci would be involved, ``base_mutation`` is applied instead.
    """
    size = len(ind.genotype)
    how_many = round(size * MUTATION_VALUE)
    # Too few loci to rotate: fall back to the basic mutation.
    if how_many < 2:
        return base_mutation(ind)

    positions = sorted(random.sample(range(0, size), how_many))
    # Source of each destination: the previous sampled position, wrapping around.
    sources = [positions[-1]] + positions[:-1]

    child = deepcopy(ind)
    for destination, source in zip(positions, sources):
        child.genotype[destination] = ind.genotype[source]
    return child

def insert_mutation(ind: Individual) -> Individual:
    """Return a copy of ``ind`` where one locus is moved right after an earlier one.

    Two positions ``anchor < source`` are drawn; the value found at ``source``
    is placed at ``anchor + 1`` and the values in between slide one step to
    the right. ``ind`` is left untouched.
    """
    genes = ind.genotype
    child = deepcopy(ind)

    # The anchor must leave room for at least two loci after it.
    anchor = random.randint(0, len(genes) - 3)
    source = random.randint(anchor + 1, len(genes) - 1)

    child.genotype[anchor + 1] = genes[source]
    # Values are read from the untouched parent, so the fill order is irrelevant.
    for target in range(source, anchor + 1, -1):
        child.genotype[target] = genes[target - 1]

    return child

def inversion_mutation(ind: Individual) -> Individual:
    """Return a copy of ``ind`` with a random segment (at least two loci long) reversed.

    ``ind`` is left untouched.
    """
    last = len(ind.genotype) - 1
    # Segment boundaries, both included; the start leaves room for one more locus.
    start = random.randint(0, last - 1)
    stop = random.randint(start + 1, last)

    child = deepcopy(ind)
    segment = child.genotype[start:stop + 1]
    child.genotype[start:stop + 1] = segment[::-1]
    return child

def mirror_mutation(ind: Individual) -> Individual:
    """Return a copy of ``ind`` whose genotype is read back to front.

    ``ind`` is left untouched; no randomness is involved.
    """
    mirrored = deepcopy(ind)
    genes = mirrored.genotype
    mirrored.genotype = genes[::-1]
    return mirrored

def clone_mutation(ind: Individual, num_clone: Optional[int] = None) -> Individual:
    """Return a copy of ``ind`` where the value of one random locus is copied onto other loci.

    The value of a randomly chosen locus is written onto ``num_clone`` distinct
    loci that currently hold a different value. When fewer such loci exist,
    all of them are overwritten.

    The previous implementation kept drawing random positions until it had
    changed ``num_clone`` loci, which never terminated on a genotype with too
    few differing loci (for instance a genotype made of identical values).

    Args:
        ind: Individual to mutate; it is left untouched.
        num_clone: Number of loci to overwrite. Defaults to
            ``round(len(ind.genotype) * MUTATION_VALUE)``.

    Raises:
        ValueError: if the genotype is empty.
    """
    offspring = deepcopy(ind)
    size = len(ind.genotype)

    if num_clone is None:
        num_clone = round(size * MUTATION_VALUE)

    # Value to spread, taken from a random locus of the parent.
    value = ind.genotype[random.randrange(size)]

    # Only loci holding a different value can actually be changed.
    candidates = [pos for pos, gene in enumerate(ind.genotype) if gene != value]
    how_many = max(0, min(num_clone, len(candidates)))
    for pos in random.sample(candidates, how_many):
        offspring.genotype[pos] = value

    return offspring

def shift_mutation(ind: Individual) -> Individual:
    """Return a copy of ``ind`` whose genotype is rotated by a small random amount.

    The amount is drawn between 0 and ``round(GENOTYPE_SIZE * MUTATION_VALUE)``
    and given a random sign; a positive amount rotates the genotype to the
    left, a negative one to the right. ``ind`` is left untouched.
    """
    offspring = deepcopy(ind)

    max_shift = round(GENOTYPE_SIZE * MUTATION_VALUE)
    direction = random.choice((-1, 1))
    amount = random.randint(0, max_shift) * direction

    # Bring the (possibly negative) amount inside the genotype length.
    cut = amount % len(ind.genotype)

    # Rotate: everything from `cut` onwards comes first, the head goes last.
    offspring.genotype = ind.genotype[cut:] + ind.genotype[:cut]

    return offspring

# Mutation operators the algorithms choose from.
# NOTE(review): shift_mutation is defined above but is not part of this list — confirm whether that is intended.
mutation_functions = [base_mutation, swap_mutation, scramble_mutation, insert_mutation, inversion_mutation, clone_mutation, mirror_mutation]

In [1054]:
# Xover functions
def one_cut_xover(ind1: Individual, ind2: Individual) -> (Individual, Individual):
    """One-point crossover: swap the tails of the two parents after a random cut.

    The cut is derived from the parents' own length instead of the global
    ``GENOTYPE_SIZE`` and is never placed at position 0: a cut at 0 simply
    returns copies of the two parents, which wastes two fitness evaluations.

    Returns:
        Two new, not yet evaluated individuals (``fitness`` is ``None``).

    Raises:
        AssertionError: if the parents' genotypes differ in length.
    """
    size = len(ind1.genotype)
    assert len(ind2.genotype) == size

    # A genotype shorter than two loci has no internal cut point.
    cut_point = random.randint(1, size - 1) if size > 1 else 0

    offspring_1 = Individual(fitness=None,
                             genotype=ind1.genotype[:cut_point] + ind2.genotype[cut_point:])
    offspring_2 = Individual(fitness=None,
                             genotype=ind2.genotype[:cut_point] + ind1.genotype[cut_point:])
    assert (len(offspring_1.genotype) == size) and (len(offspring_2.genotype) == size)
    return offspring_1, offspring_2

def two_cut_xover(ind1: Individual, ind2: Individual) -> (Individual, Individual):
    """Two-point crossover: exchange the segment between two random cuts.

    Returns:
        Two new, not yet evaluated individuals (``fitness`` is ``None``).

    Raises:
        AssertionError: if a parent has fewer than 3 loci, or if a child's
            length differs from ``GENOTYPE_SIZE``.
    """
    # Both genotypes need at least 3 loci to host two cuts.
    assert len(ind1.genotype) > 2 and len(ind2.genotype) > 2

    size = len(ind2.genotype)
    # The first cut leaves at least one locus before it and room for the second cut.
    first_cut = random.randint(1, size - 2)
    # The second cut falls strictly after the first and before the end.
    second_cut = random.randint(first_cut + 1, size - 1)

    def splice(outer, inner):
        # Head and tail from `outer`, middle segment from `inner`.
        return Individual(fitness=None,
                          genotype=outer[:first_cut] + inner[first_cut:second_cut] + outer[second_cut:])

    offspring_1 = splice(ind1.genotype, ind2.genotype)
    offspring_2 = splice(ind2.genotype, ind1.genotype)

    assert (len(offspring_1.genotype) == GENOTYPE_SIZE) and (len(offspring_2.genotype) == GENOTYPE_SIZE)
    return offspring_1, offspring_2

def three_cut_xover(ind1: Individual, ind2: Individual) -> (Individual, Individual):
    """Three-point crossover: the children alternate parents at three random cuts.

    Returns:
        Two new, not yet evaluated individuals (``fitness`` is ``None``).

    Raises:
        AssertionError: if a parent has fewer than 4 loci, or if a child's
            length differs from ``GENOTYPE_SIZE``.
    """
    # Both genotypes need at least 4 loci to host three cuts.
    assert len(ind1.genotype) > 3 and len(ind2.genotype) > 3

    size = len(ind2.genotype)
    # Each cut falls strictly after the previous one and leaves room for the next.
    first_cut = random.randint(1, size - 3)
    second_cut = random.randint(first_cut + 1, size - 2)
    third_cut = random.randint(second_cut + 1, size - 1)

    def weave(leading, other):
        # Segments alternate: leading, other, leading, other.
        return Individual(fitness=None,
                          genotype=leading[:first_cut] + other[first_cut:second_cut] +
                                   leading[second_cut:third_cut] + other[third_cut:])

    offspring_1 = weave(ind1.genotype, ind2.genotype)
    offspring_2 = weave(ind2.genotype, ind1.genotype)

    assert (len(offspring_1.genotype) == GENOTYPE_SIZE) and (len(offspring_2.genotype) == GENOTYPE_SIZE)
    return offspring_1, offspring_2

def uniform_xover(ind1: Individual, ind2: Individual) -> (Individual, Individual):
    """Uniform crossover: every locus is assigned to one child or the other by a coin flip.

    For each locus the first child receives the value of one parent and the
    second child the value of the other. The loop runs over the parents' own
    loci instead of the global ``GENOTYPE_SIZE``, so the operator works for
    any pair of equally long genotypes.

    Returns:
        Two new, not yet evaluated individuals (``fitness`` is ``None``).

    Raises:
        AssertionError: if the parents' genotypes differ in length.
    """
    # The two genotypes must have the same length.
    assert len(ind1.genotype) == len(ind2.genotype)

    genes_1 = []
    genes_2 = []
    for gene_1, gene_2 in zip(ind1.genotype, ind2.genotype):
        if random.choice((0, 1)) == 1:
            genes_1.append(gene_1)
            genes_2.append(gene_2)
        else:
            genes_1.append(gene_2)
            genes_2.append(gene_1)

    offspring_1 = Individual(fitness=None, genotype=genes_1)
    offspring_2 = Individual(fitness=None, genotype=genes_2)
    return offspring_1, offspring_2

# Crossover operators the algorithms choose from; each takes two parents and returns two children.
xover_functions = [one_cut_xover, two_cut_xover, three_cut_xover, uniform_xover]

In [1055]:
#generate population
def generate_population(popolation_dim):
    """Create ``popolation_dim`` random individuals, already evaluated.

    Every genotype is a list of ``GENOTYPE_SIZE`` random 0/1 values. Each
    individual gets its ``fitness`` from the problem and its ``distance``
    measured from the first individual of the new population.
    """
    population = []
    for _ in range(popolation_dim):
        genes = random.choices([0, 1], k=GENOTYPE_SIZE)
        population.append(Individual(genotype=genes, fitness=None))

    # Evaluate every individual.
    for member in population:
        member.fitness = fitness(member.genotype)

    # Measure every individual against the first one.
    for member in population:
        member.distance = levenshtein_distance(member.genotype, population[0].genotype)

    return population

In [1056]:
#Extintion code
# Shared state used by the extinction functions to detect a stationary best fitness.
# Element 0 is the best fitness seen at the previous check; element 1 is the number of consecutive checks in which that same value has been observed.
extintion_values = [0, 0]

def selected_extinction(population):
    """Replace the tail of a stagnating population with fresh random individuals.

    The fitness of ``population[0]`` is compared with the value stored at the
    previous call. After ``EXTINTION_VALUE`` consecutive calls with the same
    value, only the first ``1 - EXTINTION_QUANTITY`` share of
    ``POPULATION_SIZE`` is kept and the rest is regenerated.

    Returns:
        ``population`` itself when no extinction takes place, otherwise a new
        list made of the survivors followed by the new individuals.
    """
    best_fitness = population[0].fitness

    # Update the stagnation counter kept in the module-level state.
    if best_fitness == extintion_values[0]:
        extintion_values[1] += 1
    else:
        extintion_values[1] = 1
    extintion_values[0] = best_fitness

    # Not stagnating for long enough: nothing to do.
    if extintion_values[1] < EXTINTION_VALUE:
        return population

    survivors = population[:round(POPULATION_SIZE*(1-EXTINTION_QUANTITY))]
    survivors.extend(generate_population(round(POPULATION_SIZE*EXTINTION_QUANTITY)))
    extintion_values[1] = 0
    return survivors

def random_extinction(population):
    """Remove randomly chosen individuals from a stagnating population.

    The fitness of ``population[0]`` is compared with the value stored at the
    previous check. After ``EXTINTION_VALUE`` consecutive checks with the same
    value, ``round(EXTINTION_QUANTITY * POPULATION_SIZE)`` individuals (at
    most the whole population) are removed at random positions. The removed
    individuals are not replaced here.

    As in ``selected_extinction``, the stagnation counter is reset once an
    extinction has happened; previously it was left untouched, so every later
    stagnating check triggered a new extinction.

    Returns:
        ``population`` itself when no extinction takes place, otherwise a new
        list with the survivors in their original order.
    """
    best_fitness = population[0].fitness

    # Update the stagnation counter kept in the module-level state.
    if best_fitness == extintion_values[0]:
        extintion_values[1] += 1
    else:
        extintion_values[1] = 1
    extintion_values[0] = best_fitness

    if extintion_values[1] < EXTINTION_VALUE:
        return population

    # Never ask for more positions than the population holds.
    num_ind = min(round(EXTINTION_QUANTITY*POPULATION_SIZE), len(population))

    # Unique random positions to remove; a set keeps the membership test cheap.
    doomed = set(random.sample(range(len(population)), num_ind))

    extintion_values[1] = 0
    return [member for position, member in enumerate(population) if position not in doomed]

def reset_extition():
    """Reset the module-level extinction state to its initial value ``[0, 0]``."""
    # Without `global` the assignment would only bind a throw-away local name
    # and the state read by the extinction functions would never change.
    global extintion_values
    extintion_values = [0, 0]
    

# Basic approach with a lot of variation

In [1057]:
#First algorithm
def base_mode(population=None):
    """Run the basic evolutionary algorithm, mixing several selection and variation operators.

    Each generation performs ``OFFSPRING_SIZE`` reproduction steps. With
    probability ``MUTATION_PROBABILITY`` a step mutates one parent with a
    randomly chosen mutation, otherwise it recombines two parents with a
    randomly chosen crossover. Offspring are evaluated, merged with the
    population, and the best ``POPULATION_SIZE`` individuals survive. The loop
    stops after ``GENERATIONS_SIZE`` generations or as soon as the best
    fitness equals 1.

    Changes with respect to the previous version:
      * the ``None`` check uses identity (``is None``);
      * the list received from the caller is copied, not extended in place;
      * after every generation the distance of all survivors is measured
        against the current first individual (before, only the offspring were
        updated and older individuals kept a distance from a former reference);
      * the final population is returned.

    Args:
        population: Optional starting population of evaluated individuals
            that already carry a ``distance``. A random population of
            ``POPULATION_SIZE`` individuals is generated when omitted.

    Returns:
        The population as left by the last generation; its first element is
        the individual whose fitness was printed last.
    """
    if population is None:
        population = generate_population(POPULATION_SIZE)
    else:
        # Work on a shallow copy so the caller's list is not modified.
        population = list(population)

    for generation in range(GENERATIONS_SIZE):
        offspring = []
        for _ in range(OFFSPRING_SIZE):
            if random.random() < MUTATION_PROBABILITY:
                # Parent for mutation: farthest individual or fitness-proportional pick.
                if random.random() < 0.5:
                    parent = distance_selection(population)
                else:
                    parent = roulette_wheel_selection(population)
                mutate = random.choice(mutation_functions)
                offspring.append(mutate(parent))
            else:
                # Parents for crossover.
                parent_1 = roulette_wheel_selection(population)
                if random.random() < 0.5:
                    parent_2 = distance_selection(population)
                else:
                    parent_2 = roulette_wheel_selection(population)
                # A random crossover operator is used to increase diversity.
                crossover = random.choice(xover_functions)
                offspring.extend(crossover(parent_1, parent_2))

        for child in offspring:
            child.fitness = fitness(child.genotype)

        # Survivor selection: keep the fittest POPULATION_SIZE individuals.
        population.extend(offspring)
        population.sort(key=lambda member: member.fitness, reverse=True)
        population = population[:POPULATION_SIZE]

        population = selected_extinction(population)

        # Refresh the distance of every survivor against the current first individual.
        reference = population[0].genotype
        for member in population:
            member.distance = levenshtein_distance(member.genotype, reference)

        print("calls:",fitness._calls, "\tfitness: ",population[0].fitness )

        if population[0].fitness == 1:
            break

    return population



# Island Model (mix all type)

In [1058]:
def island_model_mix():
    """Run an island model with one island per (selection, mutation, crossover) combination.

    Every island evolves on its own with its fixed triple of operators. Every
    ``MIGRATION_INTERVAL`` generations (except generation 0) each island
    sends its first ``MIGRATION_SIZE`` individuals to all the other islands.
    At the end the best fitness found on any island and the number of fitness
    calls are printed.

    Fix with respect to the previous version: the per-island truncation used
    to rebind a local name (``island_population = island_population[:N]``),
    so the list stored in ``islands`` was never shortened and kept growing.
    The truncation is now done in place.

    Returns:
        The best fitness found across all islands.
    """
    # One island per combination; the crossover operator varies fastest.
    combinations = list(product(operator_functions, mutation_functions, xover_functions))
    num_islands = len(combinations)

    # Each island starts from its own random population.
    islands = [generate_population(POPULATION_SIZE) for _ in range(num_islands)]

    for generation in range(GENERATIONS_SIZE):
        for island_population, (select, mutate, crossover) in zip(islands, combinations):
            offspring = []

            for _ in range(OFFSPRING_SIZE):
                if random.random() < MUTATION_PROBABILITY:
                    # Mutation of one selected parent.
                    offspring.append(mutate(select(island_population)))
                else:
                    # Crossover of two selected parents.
                    parent_1 = select(island_population)
                    parent_2 = select(island_population)
                    offspring.extend(crossover(parent_1, parent_2))

            # Evaluate the new children.
            for child in offspring:
                child.fitness = fitness(child.genotype)

            # Keep the fittest POPULATION_SIZE individuals, truncating IN PLACE
            # so that the list stored in `islands` is the one being shortened.
            island_population.extend(offspring)
            island_population.sort(key=lambda member: member.fitness, reverse=True)
            del island_population[POPULATION_SIZE:]

        # Migration between islands at the configured interval.
        if generation % MIGRATION_INTERVAL == 0 and generation > 0:
            # Snapshot the emigrants first so migrants that just arrived are not forwarded.
            emigrants = [island[:MIGRATION_SIZE] for island in islands]
            for destination_index, destination in enumerate(islands):
                for source_index, group in enumerate(emigrants):
                    if source_index != destination_index:
                        destination.extend(group)

    best_fitness = 0
    for population_island in islands:
        population_island.sort(key=lambda member: member.fitness, reverse=True)
        if population_island[0].fitness > best_fitness:
            best_fitness = population_island[0].fitness
    print("best island fitness:", best_fitness)

    print("call =>", fitness.calls)

    return best_fitness

# Segregation Model (big quantity type)

In [1059]:
def segregation_model_quantity():
    """Evolve ``NUM_ISLANDS`` isolated islands, then continue with ``base_mode`` on their best individuals.

    Every island evolves for ``GENERATIONS_SIZE`` generations, choosing the
    selection, mutation and crossover operators at random for each
    reproduction step. Afterwards the first ``MIGRATION_SIZE`` individuals of
    every island (sorted by fitness) are merged into one population, their
    distance is measured against the best individual of their own island, and
    ``base_mode`` is run on the merged population.

    Fix with respect to the previous version: the per-island truncation used
    to rebind a local name, so the list stored in ``islands`` was never
    shortened and kept growing. The truncation is now done in place. The
    unused generation/island counters were removed.

    Returns:
        Whatever ``base_mode`` returns for the merged population.
    """
    # Each island starts from its own random population.
    islands = [generate_population(POPULATION_SIZE) for _ in range(NUM_ISLANDS)]

    for generation in range(GENERATIONS_SIZE):
        for island_population in islands:
            offspring = []

            for _ in range(OFFSPRING_SIZE):
                if random.random() < MUTATION_PROBABILITY:
                    # Mutation: random selection operator, then random mutation operator.
                    select = random.choice(operator_functions)
                    parent = select(island_population)
                    mutate = random.choice(mutation_functions)
                    offspring.append(mutate(parent))
                else:
                    # Crossover: each parent comes from a randomly chosen selection operator.
                    parent_1 = random.choice(operator_functions)(island_population)
                    parent_2 = random.choice(operator_functions)(island_population)
                    crossover = random.choice(xover_functions)
                    offspring.extend(crossover(parent_1, parent_2))

            # Evaluate the new children.
            for child in offspring:
                child.fitness = fitness(child.genotype)

            # Keep the fittest POPULATION_SIZE individuals, truncating IN PLACE
            # so that the list stored in `islands` is the one being shortened.
            island_population.extend(offspring)
            island_population.sort(key=lambda member: member.fitness, reverse=True)
            del island_population[POPULATION_SIZE:]

    new_population = []
    for population_island in islands:
        population_island.sort(key=lambda member: member.fitness, reverse=True)

        elite = population_island[:MIGRATION_SIZE]
        # Distance of every elite individual from the best of its own island.
        for member in elite:
            member.distance = levenshtein_distance(member.genotype, elite[0].genotype)
        new_population.extend(elite)

    # Continue with the basic algorithm on the best individuals of all islands.
    result = base_mode(new_population)

    print("call =>", fitness.calls)

    return result


In [None]:
# Entry point: exactly one of the three algorithms is run; the other two calls are left commented out.
# The trailing comment on each line lists the parameter values suggested for that algorithm.
base_mode() # I suggest GENERATIONS_SIZE = 10000 POPULATION_SIZE = 400 OFFSPRING_SIZE = 200 
#island_model_mix() # I suggest GENERATIONS_SIZE = 100 POPULATION_SIZE = 40 OFFSPRING_SIZE = 20 NUM_ISLANDS = 200
#segregation_model_quantity() # I suggest GENERATIONS_SIZE = 100 POPULATION_SIZE = 40 OFFSPRING_SIZE = 20 NUM_ISLANDS = 200