# Problema QAP

### Parte 1. Clase QAP y operadores

In [78]:
# https://goo.gl/p1UZlp

import numpy as np 
import matplotlib.pyplot as plt 

class QAP:

    # constructor de la clase
    def __init__(self, n, distances, flows):
        self.n = n                      
        self.distances = distances
        self.flows = flows
        self.best_fit_record = []
        self.best_fit =np.Inf
        self.best_sol = []

    # Función objetivo
    def __call__(self, solution):
        # cada solucion es array de números enteros no consecutivos en el rango [0, n-1]
        # el valor solution[i] es la ubicación donde se contruira la instalacion[i]
        perm_dist = self.distances[solution,:][:,solution]
        fit = int(np.multiply(self.flows, perm_dist).sum())

        # la mejor solucion se evalua por cada llamada
        if fit < self.best_fit:
            self.best_fit = fit
            self.best_sol = solution.tolist()

        self.best_fit_record.append(self.best_fit) # record de las mejores soluciones

        return fit

    # construye una instancia de la clase desde un fichero
    def build_from_file(path):
        with open(path, "r") as f:
            n = int(f.readline().strip())
            distances, flows = np.zeros((n,n), dtype=int), np.zeros((n,n), dtype=int)
            _=f.readline()

            for i in range(n):
                distances[i,:] = (list(map(int, f.readline().split())))
            for j in range(n):
                flows[j,:] = (list(map(int, f.readline().split())))

        return QAP(n=n,distances=distances,flows=flows)

    # reinicia las soluciones encontradas
    def reset(self):
        self.best_fit_record = []
        self.best_fit = np.Inf
        self.best_sol = []     

### Operadores

In [79]:
from difflib import SequenceMatcher

# # OPERADOR SELECCION
# # :pop_fitness  -> lista con los fitness de la población
# # :prop_tourn     -> proporción de los individuos
# # :return       -> los índices de los individuos seleccionados
def seleccion_por_torneo(pop_fitness, prop_tourn:float=0.2):

    prop_tourn = max(0.1, prop_tourn)  # definimos los limites de esta variable
    prop_tourn = min(1., prop_tourn)
    
    p_size = len(pop_fitness)                           # tamaño de la población (número de soluciones)
    tourn_size = int(np.ceil(prop_tourn * p_size))      # cantidad de soluciones participantes

    sub_sample_1 = np.random.permutation(p_size)[range(tourn_size)] # submuestra aleatoria compuesta por los índices de los individuos
    best1_indx = np.argmin(pop_fitness[sub_sample_1])               # seleccionamos la mejor muestra, el menor valor
    orig_best1_indx = sub_sample_1[best1_indx]                      # guardamos el índice del mejor individuo de la población original

    orig_best2_indx = orig_best1_indx
    while(orig_best1_indx ==  orig_best2_indx):
        sub_sample_2 = np.random.permutation(p_size)[range(tourn_size)]     # escogemos otra submuestra
        best2_indx = np.argmin(pop_fitness[sub_sample_2])                   # seleccionamos el mejor
        orig_best2_indx = sub_sample_2[best2_indx]                          # guardamos su índice

    return orig_best1_indx, orig_best2_indx

# # OPERADOR CRUCE
# # :parent1, parent2     -> individuos que representan soluciones al problema
# # :low_rate, upp_rate   -> proporcion de individuos que se van a cruzar
# # :return               -> descendientes
def operador_cruce(parent1, parent2, low_rate=0.3, upp_rate=0.7):
    
    l = len(parent1)    # longitud de los padres
    par_size = int(np.ceil( l * np.random.uniform(low=low_rate, high=upp_rate, size=1))[0]) # tamaño de la sección que se va a cruzar
    
    start_point = np.random.randint(l)                          # punto de inicio aleatorio
    cop_points = (start_point + np.array(range(par_size))) % l  # calcula los puntos de cruce
    end_point = (start_point + par_size) % l                    # punto final
    rep_points_all = (end_point + np.array(range(l))) % l       # calcula los puntos que se van a reemplazar
    rep_points = rep_points_all[0:(l - par_size)]               # selecciona los puntos que deben ser reemplazados

    off1 = np.zeros(l, dtype=int)               # decendientes
    off2 = np.zeros(l, dtype=int)
    off1[cop_points] = parent1[cop_points]
    off2[cop_points] = parent2[cop_points]

    parent2_cont = np.setdiff1d(parent2[rep_points_all], off1[cop_points], assume_unique=True)
    parent1_cont = np.setdiff1d(parent1[rep_points_all], off2[cop_points], assume_unique=True)

    off1[rep_points] = parent2_cont[0:(l - par_size)]
    off2[rep_points] = parent1_cont[0:(l - par_size)]

    return off1, off2

# # OPERADOR MUTACION
# # :individual     -> el individuo a mutar
# # :mutation_rate  -> proporcion de mutacion
def operador_mutacion(individual, mutation_rate:float, inplace=True):

    # Se decide si se hará sobre el individuo, o una copia
    if inplace:
        mutated_invidual = individual
    else:
        mutated_invidual = individual.copy()

    dimension = len(individual)                                                         # tamaño del individuo
    mutation_size = int(np.ceil(np.random.uniform(0, mutation_rate) * dimension))       # tamaño de la mutacion
    random_positions = np.random.permutation(dimension)[0:mutation_size]                # posiciones random que serán intercambiadas
    swapped_positions = np.random.permutation(random_positions)                         # las desordenamos
    mutated_invidual[random_positions] = mutated_invidual[swapped_positions]            # y las intercambiamos
    
    return mutated_invidual

# Reemplazo basado en fitness
def reemplazo_ajuste(pop_fitness, sel_size:int):
    return np.argsort(pop_fitness)[range(sel_size)]

# Reemplazo basado en edad
def reemplazo_edad(pop_fitness, sel_size:int):
    return range(sel_size, len(pop_fitness))

# Diversidad de la población
def diversidad_poblacion(population) -> float:
    pop_size, _ = np.shape(population)
    result_matrix = np.zeros(shape=(pop_size, pop_size))

    for i in range(pop_size):
        ind = population[i,:].tolist()
        matches = np.apply_along_axis(lambda x: SequenceMatcher(None, a=ind, b=x.tolist()).ratio(), 1, population)
        result_matrix[i, :] = matches

    return np.tril(result_matrix, k=-1).sum()/((pop_size**2 - pop_size)/2)

# Diversidad promedio de la población
def promedio_diversidad_poblacion(population) -> float:
    mean_individual = np.mean(population, axis=0)
    diff_matrix = np.abs(population - mean_individual)
    return round(np.mean(diff_matrix)/255., 3)



# Parte 2. Algoritmo Tradicional

In [80]:
from tqdm.notebook import tqdm
     
class TraditionalGA:
    def __init__(self, qa_problem:QAP, pop_size:int=50, crossover_rate:float=0.8, mutation_rate:float=0.05, replace_fn=reemplazo_ajuste) -> None:
        self.qa_problem = qa_problem
        self.pop_size = pop_size
        self.cross_rate = crossover_rate
        self.mutation_rate = mutation_rate
        self.population = np.empty((self.pop_size, self.qa_problem.n),  dtype=int)
        self.fitness = np.zeros(self.pop_size, dtype=int)
        self.replace = replace_fn
        self.sorted_fit_indxs = []
        self.name = "TraditionalGA"

    def initialize(self):
        for i in range(self.pop_size):
            x = np.random.permutation(self.qa_problem.n)
            f = self.qa_problem(x)
            self.population[i,:] = x
            self.fitness[i] = f
        
        self.sorted_fit_indxs = np.argsort(self.fitness)

    # seleccion
    def seleccion(self):
        par1_idx, par2_idx = seleccion_por_torneo(self.fitness, prop_tourn=0.3)
        par1 = self.population[par1_idx]
        par2 = self.population[par2_idx]
        return par1, par2
    
    # cruce
    def cruce(self, par1, par2):
        # Crossover
        if(np.random.random() < self.cross_rate):
            off1, off2 = operador_cruce(par1, par2)
        else:
            off1, off2 = par1.copy(), par2.copy()
        return off1, off2

    # mutacion
    def mutacion(self, off1, off2):
        # Mutation
        operador_mutacion(off1, self.mutation_rate)
        operador_mutacion(off2, self.mutation_rate)
        return off1, off2

    # evaluar
    def evaluar(self, individual):
        return self.qa_problem(individual)

    # evolucionar
    def evolucionar(self):
        par1, par2 = self.seleccion()                      # seleccionamos los mejores individuos
        off1, off2 = self.cruce(par1, par2)         # obtenemos su descendencia
        if np.random.random() < self.mutation_rate:
            off1, off2 = self.mutacion(off1, off2)        # se aplica una mutacion aleatoria

        # Offspring fitness evaluation
        fit1 = self.evaluar(off1)                      # se evalua la descendencia
        fit2 = self.evaluar(off2)

        return off1, fit1, off2, fit2           # retorna la descendencia y sus ajustes

    def evolucionar_poblacion(self):
        offspring = []
        off_fitness = []
        for i in range(self.pop_size//2):       # se añaden nuevos individuos a la lista de descendientes

            off1, fit1, off2, fit2 = self.evolucionar()
            offspring.append(off1)
            offspring.append(off2)
            off_fitness.append(fit1)
            off_fitness.append(fit2)

        return offspring, off_fitness
    
    def __call__(self, max_gens):
        
        self.initialize() # Esto cuenta como una iteración

        pbar = tqdm(range(1, max_gens))
        
        for i in pbar:
            
            offspring, off_fitness = self.evolucionar_poblacion()

            # Create an extender pool of individuals (population + offspring)
            all_fits = np.append(self.fitness, off_fitness)
            all_pop = np.append(self.population, offspring, axis=0)
            
            # Replacement: select the top p_size fittest individuals            
            rep_indxs = self.replace(all_fits, self.pop_size)
            
            self.population = all_pop[rep_indxs]
            self.fitness = all_fits[rep_indxs]

            self.sorted_fit_indxs = np.argsort(self.fitness)

            nota = round(max(5 - 100 * (self.qa_problem.best_fit - 44759294)/44759294, 0),1)

            pop_div = promedio_diversidad_poblacion(self.population)

            pbar.set_description(f"{self.name} --> Best fitness: {self.qa_problem.best_fit} Pop.Div.: {pop_div} Nota: {nota}")

# Parte 3. Algoritmos Genéticos con búsqueda local

### 2-opt

In [81]:
# def: funcion que invierte los elementos de una solucion en el rango indicado
# param: solution -> solucion
# param: init_pos -> posicion inicial
# param: end_pos -> posicion final
# return: nueva solucion
def swap_and_reverse(solution, init_pos:int, end_pos:int) -> list:
    changed_solution = solution.copy()
    changed_solution[init_pos:(end_pos+1)] = list(reversed(changed_solution[init_pos:(end_pos+1)]))
    return changed_solution

# def: funcion que implementa el algoritmo de optimizacion local 2-opt
# param: problem -> problema QAP
# param: solution -> solucion actual
# param: solution_fitness -> el ajuste de la solucion actual
# param: swap_size -> tamaño del intercambio
# return: mejor solucion encontrada y su ajuste
def two_opt(problem:QAP, solution, solution_fitness, swap_size):

    best_fitness = solution_fitness
    best_solution = solution.copy()

    swap_init = np.random.randint(problem.n - swap_size)
    swap_end = swap_init + swap_size

    for swap_first in range(swap_init, swap_end - 1):
        for swap_last in range(swap_first + 1, swap_end):

            new_solution = swap_and_reverse(best_solution, init_pos=swap_first, end_pos=swap_last)

            new_fitness = problem(new_solution)

            if new_fitness < best_fitness:
                best_fitness = new_fitness
                best_solution = new_solution.copy()
                
    return best_solution, best_fitness

### Variante **Baldwiniana**

In [82]:
class BaldwinianGA(TraditionalGA):
    
    def __init__(self, qa_problem: QAP, pop_size: int = 50, crossover_rate: float = 0.8, mutation_rate: float = 0.05, replace_fn=reemplazo_ajuste, swap_size=10) -> None:
        super().__init__(qa_problem, pop_size, crossover_rate, mutation_rate, replace_fn)
        self.swap_size = swap_size
        self.name = "BaldwinianGA"

    def evaluar(self, individual):
        best_sol = individual.copy()
        best_fit = super().evaluar(individual)

        _, newfit = two_opt(problem=self.qa_problem, solution=best_sol, solution_fitness=best_fit, swap_size=self.swap_size)

        return newfit
    

### Variante Lamarckiana

In [83]:
class LamarckianGA(TraditionalGA):
    
    def __init__(self, qa_problem: QAP, pop_size: int = 50, crossover_rate: float = 0.7, mutation_rate: float = 0.01, replace_fn=reemplazo_ajuste, swap_size=10) -> None:
        super().__init__(qa_problem, pop_size, crossover_rate, mutation_rate, replace_fn)
        self.swap_size = swap_size
        self.name = "LamarckianGA"

    def evaluar(self, individual):
        best_sol = individual[:]
        best_fit = super().evaluar(individual)

        new_ind, new_fit = two_opt(problem=self.qa_problem, solution=best_sol, solution_fitness=best_fit, swap_size=self.swap_size)

        if new_fit < best_fit:
            individual[:] = new_ind.copy()

        return new_fit

# Parte 4. Test

In [84]:
np.random.seed(12345)

problem = QAP.build_from_file("tai256c.dat")

max_gens = 10000
poblacion = 300
c_rate = 0.7
m_rate = 0.1
f_reemplazo = reemplazo_ajuste
tam_intercambio = 10

problem.reset()
tradicional = TraditionalGA(problem, poblacion, c_rate, m_rate, f_reemplazo)
tradicional(max_gens)
print(problem.best_fit)
print(problem.best_sol)
plt.plot(problem.best_fit_record)
plt.show()

# Descomentar para probar la variante balwiniana
# problem.reset()
# baldwin = BaldwinianGA(problem, poblacion, c_rate, m_rate, f_reemplazo, tam_intercambio)
# baldwin(max_gens) 
# print(problem.best_fit)
# print(problem.best_sol)
# plt.plot(problem.best_fit_record)
# plt.show()

# Descomentar para probar la variante lamarckiana
# problem.reset()
# lamarck = LamarckianGA(problem, poblacion, c_rate, m_rate, f_reemplazo, tam_intercambio)
# lamarck(max_gens) 
# print(problem.best_fit)
# print(problem.best_sol)
# plt.plot(problem.best_fit_record)
# plt.show()

# Descomentar para probar con otras semillas
# for seed in [10000, 11111, 13333]:
#     np.random.seed(seed)
#     problem.reset()
#     tradicional = TraditionalGA(problem, poblacion, c_rate, m_rate, f_reemplazo)
#     tradicional(max_gens)
#     print(seed)
#     print(problem.best_fit)
#     print(problem.best_sol)
#     plt.plot(problem.best_fit_record)
#     plt.show()





  0%|          | 0/9999 [00:00<?, ?it/s]

KeyboardInterrupt: 