Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# LAB9

Write a local-search algorithm (eg. an EA) able to solve the *Problem* instances 1, 2, 5, and 10 on a 1000-loci genomes, using a minimum number of fitness calls. That's all.

### Deadlines:

* Submission: Sunday, December 3 ([CET](https://www.timeanddate.com/time/zones/cet))
* Reviews: Sunday, December 10 ([CET](https://www.timeanddate.com/time/zones/cet))

Notes:

* Reviews will be assigned  on Monday, December 4
* You need to commit in order to be selected as a reviewer (ie. better to commit an empty work than not to commit)

In [2]:
from random import choices, choice, randint, random, shuffle
from copy import copy
from dataclasses import dataclass
from tqdm import tqdm
import numpy as np

import lab9_lib

In [57]:
fitness = lab9_lib.make_problem(2)
some = [0] * 1000
for i in range(1, 1000, 2):
    some[i] = 1
print(fitness(some))
# for n in range(10):
#     ind = choices([0, 1], k=1000)
#     print(f"{''.join(str(g) for g in ind)}: {fitness(ind):.2%}")
# fitnesses = sorted((some[s :: 2] for s in range(2)), reverse=True)
# print(fitnesses)
# print(fitness.calls)

0.5


In [19]:
POPULATION_SIZE = 50
OFFSPRING_SIZE = 20
TOURNAMENT_SIZE = 2
MUTATION_PROBABILITY = .20
INDIVIDUAL_SIZE = 1000
ACCEPTANCE = 0.0001

In [11]:

class Individual:
    def __init__(self, fitness, genotype: np.ndarray):
        self.fitness = fitness
        self.genotype = genotype

    @staticmethod
    def generate(size = INDIVIDUAL_SIZE):
        return Individual(fitness = None, genotype = np.random.choice(a = np.array([True, False]), size = size))

def select_parent(pop):
    pool = [choice(pop) for _ in range(TOURNAMENT_SIZE)]
    champion = max(pool, key=lambda i: i.fitness)
    return champion

def mutate(ind: Individual) -> Individual:
    offspring = Individual(None, ind.genotype.copy())
    pos = randint(0, INDIVIDUAL_SIZE-1)
    offspring.genotype[pos] = not offspring.genotype[pos]
    return offspring

def one_cut_xover(ind1: Individual, ind2: Individual) -> Individual:
    cut_point = randint(0, INDIVIDUAL_SIZE-1)
    offspring = Individual(fitness=None,
                           genotype=np.concatenate( (ind1.genotype[:cut_point], ind2.genotype[cut_point:]) ))
    assert offspring.genotype.shape[0] == INDIVIDUAL_SIZE
    return offspring

def uniform_xover(ind1: Individual, ind2: Individual) -> Individual:
    mask1 = np.random.choice([True, False], size = INDIVIDUAL_SIZE)
    mask2 = np.logical_not(mask1)
    assert np.logical_or(mask1, mask2).sum() == INDIVIDUAL_SIZE
    tmp1 = np.logical_and(ind1.genotype, mask1)
    tmp2 = np.logical_and(ind2.genotype, mask2)
    new_genotype = np.logical_or(tmp1, tmp2)
    return Individual(None, new_genotype)

def or_xover(ind1: Individual, ind2: Individual) -> Individual:
    new_genotype = np.logical_or(ind1.genotype, ind2.genotype)
    return Individual(None, new_genotype)

In [12]:
def generate_population(fitness_func, size = POPULATION_SIZE):
    population = [ Individual.generate() for _ in range(size)]
    for ind in population:
        ind.fitness = fitness_func(ind.genotype)
        # print(ind.fitness)
    return population

# 1 - Problem

In [18]:
fitness = lab9_lib.make_problem(2)
population = generate_population(fitness, size = POPULATION_SIZE)
cache = {tuple(ind.genotype): ind.fitness for ind in population} # memo object to avoid recomputing the fitness
best_fitness = 0.0

for generation in range(1000):
    offspring = list()
    for counter in range(OFFSPRING_SIZE):
        if random() < MUTATION_PROBABILITY:  # self-adapt mutation probability
            # mutation  # add more clever mutations
            p = select_parent(population)
            o = mutate(p)
        else:
            # xover # add more xovers
            p1 = select_parent(population)
            p2 = select_parent(population)
            # o = one_cut_xover(p1, p2)
            o = uniform_xover(p1, p2)
        offspring.append(o)

    for i in offspring:
        tmp = tuple(i.genotype)
        if tmp in cache:
            i.fitness = cache[tmp]
        else:
            i.fitness = fitness(i.genotype)
    population.extend(offspring)
    population.sort(key=lambda x: x.fitness, reverse=True)
    population = population[:POPULATION_SIZE]
    # every 10 generations test improvements
    if generation % 200 == 0:
        curr_best_fitness = population[0].fitness
        if abs(best_fitness - curr_best_fitness) < ACCEPTANCE:
            break
        best_fitness = max(best_fitness, curr_best_fitness)
    if generation % 200 == 0:
        print(f'Gen: {generation} --- fitness: {population[0].fitness}')

Gen: 0 --- fitness: 0.2511
Gen: 200 --- fitness: 0.4148
Gen: 400 --- fitness: 0.4575
Gen: 600 --- fitness: 0.4741
Gen: 800 --- fitness: 0.483


In [16]:
print(f"fitness Calls: {fitness.calls}")
best = population[0]
print(fitness(list(best.genotype)))
print(f'{fitness(list(best.genotype)):.2%}')

fitness Calls: 1324
0.53
53.00%


# 2 - Problem

In [20]:
POPULATION_SIZE = 50
OFFSPRING_SIZE = 20
TOURNAMENT_SIZE = 5
INDIVIDUAL_SIZE = 1000
ACCEPTANCE = 0.0001

In [23]:
fitness = lab9_lib.make_problem(2)
population = generate_population(fitness, size = POPULATION_SIZE)
cache = {tuple(ind.genotype): ind.fitness for ind in population} # memo object to avoid recomputing the fitness
best_fitness = 0.0
generation = 0
for generation in range(100_000):
    offspring = list()
    for counter in range(OFFSPRING_SIZE):
            # xover # add more xovers
            p1 = select_parent(population)
            p2 = select_parent(population)
            # o = one_cut_xover(p1, p2)
            o1 = mutate(p1)
            o2 = mutate(p2)
            o = uniform_xover(o1, o2)
            offspring.append(o)

    for i in offspring:
        tmp = tuple(i.genotype)
        if tmp in cache:
            i.fitness = cache[tmp]
        else:
            i.fitness = fitness(i.genotype)
    population.extend(offspring)
    population.sort(key=lambda x: x.fitness, reverse=True)
    population = population[:POPULATION_SIZE]
    # every 10 generations test improvements
    if generation % 200 == 0:
        curr_best_fitness = population[0].fitness
        if abs(best_fitness - curr_best_fitness) < ACCEPTANCE:
            break
        best_fitness = max(best_fitness, curr_best_fitness)
    if generation % 50 == 0:
        print(f'Gen: {generation} --- fitness: {population[0].fitness}')
        print(f'{len(population)}')
print(f'Generation: {generation}')

Gen: 0 --- fitness: 0.486
50
Gen: 50 --- fitness: 0.514
50
Gen: 100 --- fitness: 0.536
50
Gen: 150 --- fitness: 0.558
50
Gen: 200 --- fitness: 0.576
50
Gen: 250 --- fitness: 0.598
50
Gen: 300 --- fitness: 0.616
50
Gen: 350 --- fitness: 0.636
50
Gen: 400 --- fitness: 0.66
50
Gen: 450 --- fitness: 0.674
50
Gen: 500 --- fitness: 0.694
50
Gen: 550 --- fitness: 0.712
50
Gen: 600 --- fitness: 0.728
50
Gen: 650 --- fitness: 0.75
50
Gen: 700 --- fitness: 0.764
50
Gen: 750 --- fitness: 0.78
50
Gen: 800 --- fitness: 0.792
50
Gen: 850 --- fitness: 0.804
50
Gen: 900 --- fitness: 0.818
50
Gen: 950 --- fitness: 0.832
50
Gen: 1000 --- fitness: 0.844
50
Gen: 1050 --- fitness: 0.856
50
Gen: 1100 --- fitness: 0.86
50
Gen: 1150 --- fitness: 0.87
50
Gen: 1200 --- fitness: 0.88
50
Gen: 1250 --- fitness: 0.888
50
Gen: 1300 --- fitness: 0.896
50
Gen: 1350 --- fitness: 0.902
50
Gen: 1400 --- fitness: 0.91
50
Gen: 1450 --- fitness: 0.918
50
Gen: 1500 --- fitness: 0.926
50
Gen: 1550 --- fitness: 0.93
50
Gen: 16

In [24]:
print(f"fitness Calls: {fitness.calls}")
best = population[0]
print(fitness(list(best.genotype)))
print(f'{fitness(list(best.genotype)):.2%}')

fitness Calls: 44037
0.966
96.60%


# 5 - Problem

In [36]:
POPULATION_SIZE = 99
OFFSPRING_SIZE = 30
TOURNAMENT_SIZE = 3
INDIVIDUAL_SIZE = 1000
ACCEPTANCE = 0.0001
NICHES = 3
NICHES_SIZE = POPULATION_SIZE // NICHES
MIGRANTS = 10
RANDOM_EXPLORE = 3
SURVIVALS = POPULATION_SIZE * 2 // 3
EXINTS = POPULATION_SIZE - SURVIVALS

In [26]:
def create_niches(pop, n = 5):
    shuffle(pop)
    niches = [pop[i:i + len(pop)//n] for i in range(0, len(pop) + 1 - len(pop) // n, len(pop) // n)]
    return niches

In [27]:
def generate_offspring(population, cache):
    offspring = list()
    for counter in range(OFFSPRING_SIZE):
        # xover # add more xovers
        p1 = select_parent(population)
        p2 = select_parent(population)
        # o = one_cut_xover(p1, p2)
        o1 = mutate(p1)
        o2 = mutate(p2)
        o = uniform_xover(o1, o2)
        offspring.append(o)
    for i in offspring:
        tmp = tuple(i.genotype)
        if tmp in cache:
            i.fitness = cache[tmp]
        else:
            i.fitness = fitness(i.genotype)
    return offspring


In [38]:
fitness = lab9_lib.make_problem(5)
population = generate_population(fitness, size = POPULATION_SIZE)
niches = create_niches(population, n = NICHES)
cache = {tuple(ind.genotype): ind.fitness for ind in population} # memo object to avoid recomputing the fitness
best_fitness = 0.0

for generation in range(1000):
    # evolution in every niche
    for i in range(NICHES):
        offspring = generate_offspring(niches[i], cache)
        niches[i].extend(offspring)
        niches[i].sort(key=lambda x: x.fitness, reverse=True)
        niches[i] = niches[i][:NICHES_SIZE]
    # Periodically add random variability to favore exploration
    if generation % 20:
        for i in range(NICHES):
            num_mutations = choice(list(range(INDIVIDUAL_SIZE)))
            for ind in range(NICHES_SIZE - 1,NICHES_SIZE - 1 - RANDOM_EXPLORE, -1): # low fitness RANDOM EXPLORE ELEMENTS
                for _ in range(num_mutations):
                    niches[i][ind] = mutate(niches[i][ind])
                niches[i][ind].fitness = fitness(niches[i][ind].genotype)
    if generation % 50 == 0:
        curr_best_fitness = max([niche[0].fitness for niche in niches])
        if abs(best_fitness - curr_best_fitness) < ACCEPTANCE:
            # extinction
            whole_population = []
            for niche in niches:
                whole_population.extend(niche)
            whole_population.sort(key=lambda x: x.fitness, reverse=True)
            whole_population = whole_population[:SURVIVALS]
            new_ind = generate_population(fitness, size=EXINTS)
            whole_population.extend(new_ind)
            niches = create_niches(whole_population, n = NICHES)
        best_fitness = max(best_fitness, curr_best_fitness)
    # Migration
    if generation % 30 == 0:
        niches_indexes = set(range(NICHES))
        for _ in range(MIGRANTS):
            niche1 = choice(list(niches_indexes))
            niches_indexes.remove(niche1)
            niche2 = choice(list(niches_indexes))
            niches_indexes.add(niche1)
            to_swap = choices(list(range(NICHES_SIZE)), k = 2)
            niches[niche1][to_swap[0]], niches[niche2][to_swap[1]] = niches[niche2][to_swap[1]], niches[niche1][to_swap[0]]
        print(f"GENERATION: {generation}")
        for i in range(NICHES):
            print(f"NICHES_{i} --- fitness: {niches[i][0].fitness}")

GENERATION: 0
NICHES_0 --- fitness: 0.1054596
NICHES_1 --- fitness: 0.20246899999999998
NICHES_2 --- fitness: 0.20603
GENERATION: 30
NICHES_0 --- fitness: 0.32426
NICHES_1 --- fitness: 0.32412
NICHES_2 --- fitness: 0.44289999999999996
GENERATION: 60
NICHES_0 --- fitness: 0.52
NICHES_1 --- fitness: 0.4447
NICHES_2 --- fitness: 0.44480000000000003
GENERATION: 90
NICHES_0 --- fitness: 0.52
NICHES_1 --- fitness: 0.52
NICHES_2 --- fitness: 0.52
GENERATION: 120
NICHES_0 --- fitness: 0.52
NICHES_1 --- fitness: 0.52
NICHES_2 --- fitness: 0.52
GENERATION: 150
NICHES_0 --- fitness: 0.0943634
NICHES_1 --- fitness: 0.52
NICHES_2 --- fitness: 0.52
GENERATION: 180
NICHES_0 --- fitness: 0.52
NICHES_1 --- fitness: 0.52
NICHES_2 --- fitness: 0.52
GENERATION: 210
NICHES_0 --- fitness: 0.52
NICHES_1 --- fitness: 0.52
NICHES_2 --- fitness: 0.52
GENERATION: 240
NICHES_0 --- fitness: 0.52
NICHES_1 --- fitness: 0.52
NICHES_2 --- fitness: 0.52
GENERATION: 270
NICHES_0 --- fitness: 0.52
NICHES_1 --- fitness: 0

In [39]:
print(f"fitness Calls: {fitness.calls}")
curr_best_fitness = max([niche[0].fitness for niche in niches])
print(f'{curr_best_fitness:.2%}')

fitness Calls: 99243
52.00%


# Strange Approach

In [180]:
POPULATION_SIZE = 50
OFFSPRING_SIZE = 70
TOURNAMENT_SIZE = 3
INDIVIDUAL_SIZE = 1000
ACCEPTANCE = 0.001
NICHES = 5
NICHES_SIZE = POPULATION_SIZE // NICHES
MIGRANTS = 10
RANDOM_EXPLORE = 3
SURVIVALS = POPULATION_SIZE * 2 // 3
EXINTS = POPULATION_SIZE - SURVIVALS

In [169]:
class Individual:
    def __init__(self, fitness, genotype: np.ndarray):
        self.fitness = fitness
        self.genotype = genotype

    @staticmethod
    def generate(size = INDIVIDUAL_SIZE, density = 0.50):
        return Individual(fitness = None, genotype = np.random.choice(a = np.array([True, False]), size = size, p = np.array([density, 1.0 - density])))

def select_parent(pop):
    pool = [choice(pop) for _ in range(TOURNAMENT_SIZE)]
    champion = max(pool, key=lambda i: i.fitness)
    return champion

def mutate(ind: Individual,interval: tuple) -> Individual:
    offspring = Individual(None, ind.genotype.copy())
    pos = randint(interval[0], interval[1])
    offspring.genotype[pos] = not offspring.genotype[pos]
    return offspring

def one_cut_xover(ind1: Individual, ind2: Individual) -> Individual:
    cut_point = randint(0, INDIVIDUAL_SIZE-1)
    offspring = Individual(fitness=None,
                           genotype=np.concatenate( (ind1.genotype[:cut_point], ind2.genotype[cut_point:]) ))
    assert offspring.genotype.shape[0] == INDIVIDUAL_SIZE
    return offspring

def uniform_xover(ind1: Individual, ind2: Individual) -> Individual:
    mask1 = np.random.choice([True, False], size = INDIVIDUAL_SIZE)
    mask2 = np.logical_not(mask1)
    assert np.logical_or(mask1, mask2).sum() == INDIVIDUAL_SIZE
    tmp1 = np.logical_and(ind1.genotype, mask1)
    tmp2 = np.logical_and(ind2.genotype, mask2)
    new_genotype = np.logical_or(tmp1, tmp2)
    return Individual(None, new_genotype)

def or_xover(ind1: Individual, ind2: Individual) -> Individual:
    new_genotype = np.logical_or(ind1.genotype, ind2.genotype)
    return Individual(None, new_genotype)

In [170]:
def generate_offspring(population, interval, cache):
    offspring = list()
    for counter in range(OFFSPRING_SIZE):
        # xover # add more xovers
        p1 = select_parent(population)
        p2 = select_parent(population)
        # o = one_cut_xover(p1, p2)
        o1 = mutate(p1, interval)
        o2 = mutate(p2, interval)
        o = uniform_xover(o1, o2)
        offspring.append(o)
    for i in offspring:
        tmp = tuple(i.genotype)
        if tmp in cache:
            i.fitness = cache[tmp]
        else:
            i.fitness = fitness(i.genotype)
    return offspring


In [192]:
def generate_population(fitness_func, size = POPULATION_SIZE, density = 0.50):
    population = [ Individual.generate(density=density) for _ in range(size)]
    for ind in population:
        ind.fitness = fitness_func(ind.genotype)
        # print(ind.fitness)
    return population

In [None]:
DENSITY = 0.50
fitness = lab9_lib.make_problem(5)
population = generate_population(fitness, size = POPULATION_SIZE, density= DENSITY)
niches = create_niches(population, n = NICHES)
cache = {tuple(ind.genotype): ind.fitness for ind in population} # memo object to avoid recomputing the fitness
best_fitness = 0.0
stats = [0, 0]
for generation in range(1000):
    # evolution in every niche
    for i in range(NICHES):
        offspring = generate_offspring(niches[i], (i * NICHES_SIZE, i * NICHES_SIZE + NICHES_SIZE), cache)
        offspring.sort(key=lambda x: x.fitness, reverse=True)
        stats[0] = sum([a.fitness > b.fitness for a, b in zip(offspring[:NICHES_SIZE], niches[i])])
        stats[1] += NICHES_SIZE
        niches[i].extend(offspring)
        niches[i].sort(key=lambda x: x.fitness, reverse=True)
        niches[i] = niches[i][:NICHES_SIZE]

    if generation % 50 == 0:
        curr_best_fitness = max([niche[0].fitness for niche in niches])
        if abs(best_fitness - curr_best_fitness) < ACCEPTANCE:
            # EXTINCTION
            whole_population = []
            for niche in niches:
                whole_population.extend(niche)
            whole_population.sort(key=lambda x: x.fitness, reverse=True)
            whole_population = whole_population[:SURVIVALS]
            new_ind = generate_population(fitness, size=EXINTS, density=DENSITY)
            whole_population.extend(new_ind)
            niches = create_niches(whole_population, n = NICHES)
            if stats[0] / stats[1] < 1 / 5:
                DENSITY = min(1.0, DENSITY * 1.05)
            else:
                DENSITY = max(0, DENSITY / 1.05)
            print(DENSITY)
            stats = [0, 0]
        best_fitness = max(best_fitness, curr_best_fitness)
    # Migration
    if generation % 20 == 0:
        niches_indexes = set(range(NICHES))
        for _ in range(MIGRANTS):
            niche1 = choice(list(niches_indexes))
            niches_indexes.remove(niche1)
            niche2 = choice(list(niches_indexes))
            niches_indexes.add(niche1)
            to_swap = choices(list(range(NICHES_SIZE)), k = 2)
            niches[niche1][to_swap[0]], niches[niche2][to_swap[1]] = niches[niche2][to_swap[1]], niches[niche1][to_swap[0]]
        print(f"GENERATION: {generation}")
        for i in range(NICHES):
            print(f"NICHES_{i} --- fitness: {niches[i][0].fitness}")

In [196]:
print(fitness.calls)

350269
