# Lab9 - Black Box EA

Wrote a local-search algorithm (eg. an EA) able to solve the Problem instances 1, 2, 5, and 10 on a 1000-loci genomes, using a minimum number of fitness calls. That's all.


In [113]:
import os 
if "lab9_lib.py" not in os.listdir("."):
    !curl https://raw.githubusercontent.com/squillero/computational-intelligence/master/2023-24/lab9_lib.py > lab9_lib.py
    

In [140]:
from lab9_lib import make_problem
from tqdm.autonotebook import tqdm, trange
from collections import namedtuple
from copy import deepcopy
from dataclasses import dataclass, field
import random
from typing import Literal, Union, Callable
import numpy as np
import math

In [115]:
LOCI = 1000
Gene = Literal[0,1]
Genome = tuple[Gene]

@dataclass(frozen=True, repr=False)
class Individual:

    genome: tuple[Gene] = field(default_factory=lambda: list(random.choices([0, 1], k = LOCI)), repr=False)
    _fitness: float = field(default=None, init=False, compare=False)

    def mutate(it: "Individual") -> "Individual":
        gene_to_mutate = 10
        mutated_genome = [*it.genome]
        for _ in range(gene_to_mutate):
            ind = random.randrange(LOCI)
            mutated_genome[ind] = 1-mutated_genome[ind]
        return Individual(mutated_genome)

    def crossover(it: "Individual", other: "Individual") -> "Individual":
        return Individual(
            [i if r < .5 else o for i, o, r in zip(it.genome, other.genome, [random.random() for _ in range(LOCI)])]
        )

    def evaluate(self: "Individual", fitness_fn: Callable[[Genome], float]) -> float:
        """Wrapped evaluation inside individual to allow some kind of caching

        Args:
            self (Individual): Individual
            fitness_fn (Callable[[Genome], float]): Fitness function

        Returns:
            float: fitness
        """
        if self._fitness is None:
            fitness = fitness_fn(self.genome)
            object.__setattr__(self, "_fitness", fitness)
        return self._fitness

    @property
    def fitness(self) -> float:
        assert self._fitness is not None, "Fitness has not been evaluated yet"
        return self._fitness

    def __repr__(self: "Individual"):
        return f"Individual(Zeros={sum([1 for it in self.genome if it == 0])}, Ones={sum([it for it in self.genome])})"
        
    def __str__(self: "Individual"):
        return self.__repr__()
    
    @property
    def phenotype(self) -> str:
        return "".join(str(bit) for bit in self.genome)

In [141]:
EPOCHS = 10
POP_SIZE = 40
OFFSPRING_SIZE = 20
CROSSOVER_OVER_MUTATION_PROB = .1
CROSSOVER_AND_MUTATION_PROB = .8
SURVIVAL_RATE = .15
TOURNAMENT_SIZE = 2
PROB_SIZE = 1

In [142]:
def train(*, problem_size: int = None, pop_size: int = None, offspring_size: int = None, 
          epochs: int = None, tournament_size: int = None, crossover_over_mutation_prob: float = None, 
          crossover_and_mutation_prob: float = None,
          extinction: bool = False, survival_rate: float = None):
    if problem_size is None:
        problem_size = PROB_SIZE
    if pop_size is None:
        pop_size = POP_SIZE
    if offspring_size is None:
        offspring_size = OFFSPRING_SIZE
    if epochs is None:
        epochs = EPOCHS
    if tournament_size is None: 
        tournament_size = TOURNAMENT_SIZE
    if crossover_over_mutation_prob is None: 
            crossover_over_mutation_prob = CROSSOVER_OVER_MUTATION_PROB
    if crossover_and_mutation_prob is None:
        crossover_and_mutation_prob = CROSSOVER_AND_MUTATION_PROB
    if survival_rate is None:
        survival_rate = SURVIVAL_RATE

    problem = make_problem(problem_size)
    parents = [Individual() for _ in range(pop_size)]
    for i in parents:
        i.evaluate(problem)
    max_fitness: Callable[[list["Individual"]], float] = lambda x: max([i.fitness for i in x])
    best_in_list: Callable[[list["Individual"]], "Individual"] = lambda x: [c for c in x if c.fitness == max_fitness(x)][0]
    tournament_selection: Callable[[list["Individual"]], "Individual"] = lambda l: best_in_list(random.choices(l, k=tournament_size))
    epoch_bar = trange(0, epochs, unit="epoch")
    update_epoch_bar = lambda: epoch_bar.set_description(f"Fitness {max_fitness(parents):.2%} - #Calls: {problem.calls}")
    best = None
    for epoch in epoch_bar:
        update_epoch_bar()
        if math.isclose(1, best_in_list(parents).fitness):
            break
        offspring = []
        for i in range(offspring_size):
            new_ind: "Individual"
            if random.random() < crossover_over_mutation_prob:
                new_ind = tournament_selection(parents).crossover(tournament_selection(parents))
                if random.random() < crossover_and_mutation_prob:
                    new_ind = new_ind.mutate()
            else:
                new_ind = tournament_selection(parents).mutate()
            new_ind.evaluate(problem)
            offspring.append(new_ind)

        parents = sorted([*parents, *offspring], key=lambda i:i.fitness, reverse=True)[:pop_size]
        if extinction:
            convergenceness_threshold = 0.005
            convergenceness = [i.fitness for i in parents]
            if np.std(convergenceness) < convergenceness_threshold:
                to_purge = int(len(parents) * SURVIVAL_RATE)
                parents = random.choices(parents, k=to_purge)
                # parents.extend(random.choices(offspring, k=pop_size//4))
                for _ in range(to_purge):
                    ind = Individual()
                    ind.evaluate(problem)
                    parents.append(ind)
        if best is None or best.fitness < max_fitness(parents):
            best = best_in_list(parents)
                
    return best_in_list(parents), best

In [147]:
ind, best = train(problem_size=2, epochs=3000)
ind, best

  0%|          | 0/3000 [00:00<?, ?epoch/s]

(Individual(Zeros=174, Ones=826), Individual(Zeros=174, Ones=826))