Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# LAB3

Wrote a local-search algorithm (eg. an EA) able to solve the *Problem* instances 1, 2, 5, and 10 on a 1000-loci genomes, using a minimum number of fitness calls. That's all.

### Deadlines:

* Submission: Sunday, November 26 ([CET](https://www.timeanddate.com/time/zones/cet))
* Reviews: Sunday, December 3 ([CET](https://www.timeanddate.com/time/zones/cet))

Notes:

* Reviews will be assigned  on Monday, November 27
* You need to commit in order to be selected as a reviewer (ie. better to commit an empty work than not to commit)

In [150]:
from random import choices
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
from itertools import product
import pickle
import math
import numpy as np
from enum import Enum
import lab3_lib
from lab3_lib import AbstractProblem
from itertools import chain


In [151]:
class Agent():

    class Crossover(Enum):
        RANDOM = 1
        MULTI_CUT = 2
        SCRAMBLE = 3
        CYCLE = 4

    class AgentType(Enum):
        BINARY = 1
        PATTERN = 2

    def __init__(self,genome,mutation_rate : float = None) -> None:
        self.genome = genome
        self.mutation_rate = mutation_rate
        self._id = random.randint(0, 1000000000)
        pass

    def set_mutation_rate(self, mutation_rate : float) -> None:
        self.mutation_rate = mutation_rate

    def set_genome(self, genome) -> None:
        self.genome = genome

    def compute_fitness(self) -> None:
        pass        

    def load_agent(self, path):
        with open(path, 'rb') as f:
            self.genome = pickle.load(f)
        return self
    
    def save_agent(self, path):
        with open(path, 'wb') as f:
            pickle.dump(self.genome, f)

In [152]:
class AgentGA(Agent):

    def __init__(self) -> None:
        super().__init__(None,None)

    def set_genome(self,genome_size : int,genome : list = None) -> Agent:
        if genome is None:
            genome = [random.uniform(0, 1) < 0.5 for _ in range(genome_size)]
        super().set_genome(genome)
        return self

    def set_mutation_rate(self,mutation_rate : float) -> Agent:
        super().set_mutation_rate(mutation_rate)
        return self

    def mutation(self,mutation_rate : float = None) -> Agent:
        if mutation_rate is None and self.mutation_rate is None:
            raise ValueError("Mutation rate not set")
        mask = [random.uniform(0, 1) < (mutation_rate if mutation_rate is not None else self.mutation_rate) for _ in range(len(self.genome))]
        self.genome = [a ^ b for a, b in zip(self.genome, mask)]
        if self.mutation_rate is not None:
            self.mutation_rate *= [0.999,1.001][random.randint(0,1)]
        return self

    def __repr__(self) -> str:
        return f"AgentGA({self._id},{self.fitness})"

    def crossover(self,other : Agent,crossover_type : Agent.Crossover = Agent.Crossover.RANDOM,other_weight = 0.5) -> Agent:
        match crossover_type:
            case Agent.Crossover.RANDOM:
                return self.random_crossover(other,other_weight)
            case Agent.Crossover.MULTI_CUT:
                return self.multi_cut_crossover(other)
            case Agent.Crossover.SCRAMBLE:
                return self.scramble_crossover(other)
            case Agent.Crossover.CYCLE:
                return self.cycle_crossover(other)
            case _:
                raise ValueError("Unknown crossover type")

    def random_crossover(self,other : Agent,other_weight = 0.5) -> Agent:
        #print("crossover")
        for (i,_) in enumerate(self.genome):
            self.genome[i] = random.choices([self.genome[i],other.genome[i]],weights=[1-other_weight,other_weight])[0]
        return self
    
    def multi_cut_crossover(self,other : Agent) -> Agent:
        #print("multi_cut_crossover")
        n = random.randint(1, len(self.genome)//2-1)
        size = len(self.genome)//n
        for i in range(n):
            start = i * size
            end = (i + 1) * size - 1
            n = random.randint(start, end)
            m = random.randint(start, end)
            if n > m:
                n,m = m,n
            self.genome[n:m] = other.genome[n:m]
        return self

    def scramble_crossover(self,other : Agent) -> Agent: #TODO rewrite and rename as One Point Crossover
        #print("scramble_mutation")
        n = random.randint(0, len(self.genome)-1)
        self.genome[n:] = other.genome[n:]
        return self
    
    def cycle_crossover(self,other : Agent) -> Agent:
        #print("cycle_crossover")
        n = random.randint(0, len(self.genome)-1)
        m = random.randint(0, len(self.genome)-1)
        if n > m:
            n,m = m,n
        self.genome[n:m] = other.genome[n:m]
        return self

    def compute_fitness(self,fitness_function : AbstractProblem) -> None:
        #print(f"compute_fitness : {fitness_function.calls}")
        self._fitness = fitness_function(self.genome)
    
    @property
    def fitness(self):
        return self._fitness

In [153]:
class AgentTraining():

    class trainingType(Enum):
        DEFAULT = 0
        LAMBDA = 1
        TOURNAMENT = 2

    def __init__(self, pop_size : int,problem  : AbstractProblem,training_type : trainingType = 0,k : int = 50,fast_finish : bool = False) -> None:
        self.population = []
        self.state = problem
        self.ration = 0.9
        self.temperature = 0.9
        self.lam = 10
        self.fast_finish = fast_finish
        
        for _ in range(pop_size):
            temp = AgentGA(choices([0, 1], k = k),self.state)
            self.population.append(temp)
        match training_type:
            case self.trainingType.DEFAULT:
                self.generation = self.generation_default
            case self.trainingType.LAMBDA:
                self.generation = self.generation_lambda
            case self.trainingType.TOURNAMENT:
                self.generation = self.generation_tournament

    def generation_default(self) -> None:
        for _ in range(len(self.population)):
            parent_1 , parent_2 = deepcopy(random.choices(self.population,weights=[x.fitness for x in self.population],k=2))
            if random.random() > self.ration :
                parent_1.crossover(parent_2,AgentGA.Crossover.RANDOM)
            else:
                parent_1.mutation(self.ration/2)
            parent_1.compute_fitness(self.state)
            self.population.append(parent_1)    
        self.population.sort(key=lambda x : x.fitness , reverse=True)
        self.population = self.population[:len(self.population)//2]
       

    def generation_lambda(self) -> None:
        parents = []
        # (mu,lambda) approach with lambda = mu for semplicity
        for _ in range(self.lam): # Selecting parents randomly
            #parents.append(self.population.pop(random.randint(0,len(self.population)-1)) )
            parents.append(self.population.pop(0) )
        for parent in parents:
            temp_population = []
            temp_population.append(deepcopy(parent))
            for _ in range(self.lam):
                temp : Agent = deepcopy(parent).mutation(self.ration)
                temp.compute_fitness(self.state)
                temp_population.append(temp)  
            temp_population.sort(key=lambda x : x.fitness , reverse=True)
            self.population.append(temp_population[0])  
        self.population.sort(key=lambda x : x.fitness , reverse=True)
        
    def train(self,epoch = 10):
        fitness = []
        avg_fitness = []
        print("Starting Training")
        for _ in range(epoch):
            print("Generation : ",_)
            self.generation()
            print(f"Best fitness : {self.population[0].fitness} , individual : {self.population[0].genome}")
            fitness.append(self.population[0].fitness)
            avg_fitness.append(sum([x.fitness for x in self.population])/len(self.population))
            if self.fast_finish and self.population[0].fitness == 1.0:
                break
            self.ration = self.temperature * self.ration
        return self.population[0] , fitness , avg_fitness

In [154]:
class PatternBasedAgent(Agent):

    def __init__(self) -> None:
        #self.pattern_size = random.randint(1,genome_size//10)
        super().__init__(None,None)
        
    def set_genome(self,genome_size : int,genome : list = None) -> Agent:
        self.genome_size = genome_size
        self.pattern_size = random.randint(max(1,genome_size//50),genome_size//10)
        self.pattern = [random.uniform(0, 1) < 0.5 for _ in range(self.pattern_size)]
        self.generate_genome()
        return self
    
    def set_mutation_rate(self,mutation_rate : float) -> Agent:
        super().set_mutation_rate(mutation_rate)
        return self

    def generate_genome(self):
        self.genome = list(chain.from_iterable([self.pattern for _ in range(self.genome_size//self.pattern_size + 1)])) # overshoot the size to be sure to have enough
        self.genome = self.genome[:self.genome_size]
        #print(f"size : {self.pattern_size} with pattern : {self.pattern} , genome : {self.genome}")

    def mutation(self,mutation_rate : float = None) -> Agent:
        if mutation_rate is None and self.mutation_rate is None:
            raise ValueError("Mutation rate not set")
        
        if random.random() < 0.1 :
            adding = random.choices([-1, 1], weights=[1, 2])[0]
            if adding == -1 and self.pattern_size > 1:
                self.pattern_size -= 1
                self.pattern = self.pattern[:-1]
            else:
                temp = random.choices([0, 1], k = adding)
                self.pattern_size += adding
                self.pattern += temp
        else:
            for (i,state) in enumerate(self.pattern):
                if random.random() < (mutation_rate if mutation_rate is not None else self.mutation_rate) :
                    self.pattern[i] = 1-state
        self.generate_genome()
        return self

    def __repr__(self) -> str:
        return f"AgentGA({self._id},{self.fitness}) with pattern {self.pattern}"

    def crossover(self,other : Agent,crossover_type : Agent.Crossover = 1,other_weight = 0.5) -> Agent:
        match crossover_type:
            case Agent.Crossover.RANDOM:
                return self.random_crossover(other,other_weight)
            case Agent.Crossover.MULTI_CUT:
                return self.multi_cut_crossover(other)
            case Agent.Crossover.SCRAMBLE:
                return self.one_cut_crossover(other)
            case Agent.Crossover.CYCLE:
                return self.cycle_crossover(other)
            case _:
                raise ValueError("Unknown crossover type")

    def random_crossover(self,other : Agent,other_weight = 0.5) -> Agent:
        #print("crossover")
        offset = self.pattern_size - other.pattern_size
        if offset > 0:
            temp = random.randint(0,offset)
            for (i,_) in enumerate(other.pattern):
                self.pattern[i + temp] = random.choices([self.pattern[i + temp],other.pattern[i]],weights=[1-other_weight,other_weight])[0]
        else:
            temp = random.randint(0,abs(offset))
            for (i,_) in enumerate(self.pattern):
                self.pattern[i] = random.choices([self.genome[i],other.genome[i + temp]],weights=[1-other_weight,other_weight])[0]
        self.generate_genome()
        return self
    
    def multi_cut_crossover(self,other : Agent) -> Agent:
        #print("multi_cut_crossover")
        for _ in range(random.randint(0, (min(self.pattern_size , other.pattern_size) - 1)//2)):
            n = random.randint(0, min(self.pattern_size , other.pattern_size) - 1)
            m = random.randint(0, min(self.pattern_size , other.pattern_size) - 1)
            if n > m:
                n,m = m,n
            self.pattern[n:m] = other.pattern[n:m]
        self.generate_genome()
        return self

    def one_cut_crossover(self,other : Agent) -> Agent:
        #print("scramble_mutation")
        n = random.randint(0, min(self.pattern_size , other.pattern_size) - 1)
        self.pattern[n:] = other.pattern[n:]
        self.generate_genome()
        return self
    
    def cycle_crossover(self,other : Agent) -> Agent:
        #print("cycle_crossover")
        RuntimeError("Not yet implemented")
        n = random.randint(0, len(self.genome)-1)
        m = random.randint(0, len(self.genome)-1)
        if n > m:
            n,m = m,n
        self.genome[n:m] = other.genome[n:m]
        return self

    def compute_fitness(self,fitness_function : AbstractProblem) -> None:
        #print(f"compute_fitness : {fitness_function.calls}")
        self._fitness = fitness_function(self.genome)
    
    @property
    def fitness(self):
        return self._fitness

In [155]:
def calculate_entropy(agent_list):
    # Calcola la probabilità di ciascun agente rispetto al totale
    total_agents = len(agent_list)
    probabilities = [agent.fitness / total_agents for agent in agent_list]

    # Calcola l'entropia utilizzando la formula di Shannon
    entropy = -sum(p * math.log2(p) if p > 0 else 0 for p in probabilities)

    return entropy

In [156]:
def average_rule(agent_list : list):
    # Calcola la media delle regole di tutti gli agenti
    total_agents = len(agent_list)
    total_rules = len(agent_list[0].genome)
    average = [0 for _ in range(total_rules)]

    for agent in agent_list:
        for i in range(total_rules):
            average[i] += agent.genome[i]

    average = [x / total_agents for x in average]

    return average

In [157]:
import concurrent.futures

class AgentIslandsTraining():

    def __init__(self, pop_size : int,problem  : AbstractProblem,k : int = 50,island_number : int = 10,agent_type : Agent.AgentType = Agent.AgentType.BINARY) -> None:
        self.state = problem
        self.starting_mutation_rate = 0.008
        self.ration = 0.8
        self.mutation_rate = self.starting_mutation_rate
        self.old_fitness = None
        self.static_epochs = 0
        self.islands = [ [] for _ in range(island_number)]
        print("Creating Islands")
        print(f"Islands : {self.islands}")

        match agent_type:
            case Agent.AgentType.BINARY:
                CurrentAgent = AgentGA
            case Agent.AgentType.PATTERN:
                CurrentAgent = PatternBasedAgent

        for island in self.islands:
            for _ in range(pop_size // island_number):
                temp : Agent = CurrentAgent().set_genome(genome_size=k).set_mutation_rate(random.uniform(0.001, 0.006))
                temp.compute_fitness(self.state)
                island.append(temp)
            print(f"Islands : {[len(x) for x in self.islands]}")

    def process_island(self,args):
        i, island, size, ration, state = args
        for _ in range(size):
            parent, other = random.choices(island, k=2)
            parent = deepcopy(parent)
            if random.random() > ration:
                parent.crossover(other, Agent.Crossover.MULTI_CUT)
            parent.mutation()
            parent.compute_fitness(state)
            island.append(parent)
        island.sort(key=lambda x: x.fitness, reverse=True)
        return island[:size]

    def parallel_generation(self):
        args_list = [(i, island, len(island), self.ration, self.state) for i, island in enumerate(self.islands)]

        with concurrent.futures.ThreadPoolExecutor() as executor:
            new_islands = list(executor.map(self.process_island, args_list))

        self.islands = new_islands

    def generation(self) -> None:
        for i , island in enumerate(self.islands):      
            #print(f"Island : {i} with {len(island)} pop")
            size = len(island)
            for _ in range(size):
                #parent , other = random.choices(island,weights=[x.fitness for x in island],k=2)
                parent , other = random.choices(island,k=2)
                #print(f"parent : {parent} , other : {other}")
                parent = deepcopy(parent)
                if random.random() > self.ration :
                    parent.crossover(other,Agent.Crossover.MULTI_CUT)
                #parent.mutation(self.mutation_rate / 2)
                parent.mutation()
                parent.compute_fitness(self.state)
                island.append(parent)    
            island.sort(key=lambda x : x.fitness , reverse=True)
            self.islands[i] = island[:size]
            #print(f"Islands : {[len(x) for x in self.islands]} at iteration {i}")
            #print(f"Island : {i} with {len(island)} pop")
    
    def generation_tournament(self) -> None:
        for i , island in enumerate(self.islands):      
            current_member = 0
            mating_pool = []
            while current_member < self.lam:
                tournament = random.choices(island,k=self.tournament_size)
                tournament.sort(key=lambda x : x.fitness , reverse=True)
                mating_pool.append(tournament[0])
                current_member += 1
            temp_population = []
            for parent in mating_pool:
                for _ in range(self.lam):
                    temp : Agent = deepcopy(parent).mutation(self.ration)
                    temp.compute_fitness(self.state)
                    temp_population.append(temp)  
            temp_population.sort(key=lambda x : x.fitness , reverse=True)
            self.islands[i][-self.lam:] = temp_population[:self.lam]

        #define what to do if num_islands is odd
    def migration(self , N : int = 1):
        if len(self.islands) == 1:
            return
        for i in range(len(self.islands)-1) :
            for _ in range(N):
                pos_1 = random.randint(0,len(self.islands[i]) - 1)
                pos_2 = random.randint(0,len(self.islands[(i+1)%len(self.islands)]) - 1)
                self.islands[i][pos_1] , self.islands[(i+1)%len(self.islands)][pos_2] =  self.islands[(i+1)%len(self.islands)][pos_2] , self.islands[i][pos_1]
               

    def train(self,generations = 10):
        best_fitness = None
        print("Starting Training")
        for gen in range(generations):
            #print(f"Islands : {[len(x) for x in self.islands]} at gen {gen}")
            self.generation()
            #print(f"Islands : {[len(x) for x in self.islands]} at gen {gen}")
            if (gen + 1 ) % 10 == 0:
                best_fitness = max([agent for island in self.islands for agent in island], key=lambda agent: agent.fitness)
                self.migration(30)
                if self.old_fitness == best_fitness:
                    self.static_epochs += 1
                else:
                    self.static_epochs = 0
                    self.old_fitness = best_fitness
                if self.static_epochs > 2:
                    self.mutation_rate = min(self.starting_mutation_rate,1.05 * self.mutation_rate)
                    #self.ration = max(0.6,1.1 * self.ration)
                else:
                    self.mutation_rate = max(self.starting_mutation_rate/10,0.95 * self.mutation_rate)
                    self.ration = max(0.4,0.99 * self.ration)
                print(f"Best fitness : {best_fitness.fitness} at gen {gen+1} with criterion : {self.ration} , mutation : {best_fitness.mutation_rate} , static_epochs : {self.static_epochs}")
                temp = average_rule([agent for island in self.islands for agent in island])
                print(f"Average sum : {sum(temp) / 1000} , rule weight : {temp}")
                #print(f"Best fitness : {best_fitness.fitness} at gen {gen} with total population : {([calculate_entropy(x) for x in self.islands])} , criterion : {self.ration}")
            #self.islands.sort(key=lambda x : x[0].fitness , reverse=True)
            if best_fitness is not None and best_fitness.fitness == 1.0:
                break
        print([island for island in self.islands])
        return best_fitness

In [159]:
fitness = lab3_lib.make_problem(10)
trainer = AgentIslandsTraining(100,fitness,k=1000,island_number=5,agent_type=Agent.AgentType.PATTERN)
best_agent = trainer.train(5000)
fitness.calls

Creating Islands
Islands : [[], [], [], [], []]
Islands : [20, 0, 0, 0, 0]
Islands : [20, 20, 0, 0, 0]
Islands : [20, 20, 20, 0, 0]
Islands : [20, 20, 20, 20, 0]
Islands : [20, 20, 20, 20, 20]
Starting Training
Best fitness : 0.4718 at gen 10 with criterion : 0.792 , mutation : 0.005694549915852335 , static_epochs : 0
Average sum : 0.5259900000000001 , rule weight : [0.3, 0.6, 0.29, 0.72, 0.44, 0.61, 0.61, 0.7, 0.72, 0.51, 0.49, 0.36, 0.66, 0.45, 0.49, 0.29, 0.42, 0.79, 0.3, 0.13, 0.99, 0.62, 0.8, 0.8, 0.2, 0.5, 0.29, 0.16, 0.7, 0.4, 0.72, 0.58, 0.52, 0.3, 0.5, 0.55, 0.42, 0.6, 0.6, 0.28, 0.52, 0.56, 0.58, 0.41, 0.62, 0.74, 0.6, 0.62, 0.78, 0.61, 0.3, 0.47, 0.62, 0.57, 0.47, 0.42, 0.33, 0.39, 0.72, 0.64, 0.63, 0.38, 0.71, 0.56, 0.66, 0.71, 0.57, 0.8, 0.31, 0.22, 0.8, 0.62, 0.38, 0.8, 0.28, 0.34, 0.4, 0.33, 0.41, 0.4, 0.52, 0.18, 0.8, 0.37, 0.82, 0.67, 0.7, 0.7, 0.6, 0.4, 0.08, 0.42, 0.76, 0.5, 0.22, 0.8, 0.24, 0.22, 0.8, 0.37, 0.6, 0.6, 0.62, 0.8, 0.69, 0.75, 0.6, 0.48, 0.32, 0.53, 0.8

15100

#### A dumb approach to the problem
Trying to optimize the problem by trying to maximize the fitness bit-by-bit, converge almost instantly for n = 1, but can't find anything with 2,5,10 problem size

In [160]:
def dummy_solution(fitness):
    k = 1000
    agent_slighly_smarter = [random.uniform(0,1) < 0.5 for _ in range(k)]
    agent_fitness = fitness(agent_slighly_smarter)
    while agent_fitness < 1.0:
        for i in range(k):
            temp = deepcopy(agent_slighly_smarter)
            temp[i] = not temp[i]
            temp_fitness = fitness(temp)
            if temp_fitness > agent_fitness:
                agent_slighly_smarter = temp
                agent_fitness = temp_fitness
        print(agent_fitness)
    #print(fitness(agent_slighly_smarter))
    #print(fitness.calls)
    return agent_slighly_smarter
fitness = lab3_lib.make_problem(1)
print(fitness(dummy_solution(fitness)))

1.0
1.0


In [161]:
def hill_climbing(k : int,fitness : AbstractProblem,lam):
    agent = AgentGA([random.uniform(0,1) < 0.5 for _ in range(k)],fitness)
    gen = 0
    gene_modifier = 2
    mutation_rate = gene_modifier / k
    while agent.fitness < 1.0:
        offspring = []
        for _ in range(lam):
            agent_temp = deepcopy(agent)
            agent_temp.mutation(mutation_rate)
            agent_temp.compute_fitness(fitness)
            offspring.append(agent_temp)
        offspring.append(agent)
        offspring.sort(key=lambda x : x.fitness , reverse=True)
        agent = offspring[0]
        gen += 1
        if gen % 10 == 0:
            print(f"gen {gen} : Best fitness : {agent.fitness}")

In [162]:
fitness = lab3_lib.make_problem(2)
hill_climbing(1000,fitness,40)
fitness.calls

TypeError: AgentGA.__init__() takes 1 positional argument but 3 were given