# Práctica Final:
## Resolución de un problema de regresión lineal mediante algoritmos genéticos
### Computación 2023 Carlos Jimeno Miguel

## -----------------------------------------------------------------------------

# <span style="color:#C97828"> Desarrollo de nuestro código </span>

##### Librerias utilizadas

In [215]:
import numpy as np
import pandas as pd
import time as tm

##### Carga de datos y conversión a numpy arrays

In [216]:
def carga_datos():
    diabetes_df = pd.read_csv('../datasets/diabetes_normalized.dat')
    laser_df = pd.read_csv('../datasets/laser_normalized.dat')
    quake_df = pd.read_csv('../datasets/quake_normalized.dat')

    diabetes_nparray = diabetes_df.to_numpy()
    laser_nparray = laser_df.to_numpy()
    quake_nparray = quake_df.to_numpy()

    return diabetes_nparray, laser_nparray, quake_nparray

##### **1. Solución al modelo de regresión:** Calculemos la solución de la recta de regresión del conjunto de datos

In [217]:
def obten_solucion(dataset):
    row_length = len(dataset[0])
    model_vars = dataset[:, 1:row_length - 1]           # Variables predictoras del modelo
    pred_value = dataset[:, row_length - 1]             # Valor predictor de las variables
    beta_arr = np.zeros(row_length - 2)                 # Coeficientes de regresion (extraemos la variable dependiente)

    # Calculamos la solucion del modelo
    # Calcularemos los coeficientes de regresion como sigue:
    # beta_n = sum((x_i - x_medio) * (y_i - y_medio)) / sum((x_i - x_medio)^2)
    independent_mean = np.array([np.mean(dataset[:, x + 1]) for x in range(row_length - 2)])
    dependent_mean = np.mean(dataset[:, row_length - 1])

    for i in range(row_length - 2):
        beta_arr[i] = sum((model_vars[:, i] - independent_mean[i]) * (pred_value - dependent_mean)) / sum((model_vars[:, i] - independent_mean[i])**2)

    beta_zero = dependent_mean
    for i in range(row_length - 2):
        beta_zero -= beta_arr[i] * independent_mean[i]

    model_solution = np.concatenate(([beta_zero], beta_arr))

    return model_solution


##### **2. Función de fitness:** Usaremos el error cuadrático medio entre los valores predichos y la solución exacta del modelo <span style="color:red">MINIMIZAR</span>

In [218]:
def fitness(actual_cromosome, model_sol):
    return np.square(np.subtract(actual_cromosome, model_sol)).mean()

##### **3. Inicialización de la población:** Codificaremos los cromosomas con los valores de los diferentes coeficientes de regresión, y los inicializaremos aleatoriamente
$$
C_i = (\beta_0, \beta_1, ..., \beta_n)
$$

<span style="color:green">Nota:</span> Al estar los valores normalizados entre 0 y 1 los valores de los cromosomas estarán en el rango [0, 1]

In [219]:
def ini_poblacion(cromosome_length, pob_size):
    return np.random.rand(pob_size, cromosome_length)

##### **4. Asignación de probabilidades:** Asignaremos probabilidades a los diferentes cromosomas para el método de selección proporcional al ajuste (método de la ruleta)

In [220]:
def asign_probabilidad(pob_size, fit_array):
    prob_array = np.zeros(pob_size)
    fit_sum = np.sum(fit_array)
    for i in range(pob_size):
        prob_array[i] = fit_array[i] / fit_sum
    return prob_array


##### **5. Asignación de subintervalos en función de las probabilidades:** Repartimos las probabilidades anteriores para el método de la ruleta

In [221]:
def divide_subintervalos(prob_arr):
    subintervals = list()
    ini = 0
    fin = prob_arr[0]
    subintervals.append([ini, fin])
    for i in range(len(prob_arr) - 1):
        ini = fin
        fin = sum(prob_arr[:i + 1])
        subintervals.append([ini, fin])
    subintervals[-1][1] = 1
    return subintervals


##### **6. Implementación del método estocástico (sin reemplazamiento) del torneo:** Una vez seleccionados _k_ individuos les asignamos probabilidad en función de sus fitness 

In [222]:
def torneo_estocastico(actual_poblation, num_contestants, exact_sol):
    fitness_arr = np.zeros(num_contestants)
    idx = np.random.choice(len(actual_poblation), num_contestants, replace=False)
    tourney_elems = actual_poblation[idx, :]
    for i in range(num_contestants):
        fitness_arr[i] = fitness(tourney_elems[i], exact_sol)
    _, sorted_tourney_elems = zip(*sorted(zip(fitness_arr, tourney_elems), key=lambda x: x[0]))

    p_prob = np.random.rand()
    tourney_probs = np.zeros(num_contestants)
    for k in range(num_contestants):
        tourney_probs[k] = p_prob * ((1 - p_prob)**k)

    tourney_probs /= tourney_probs.sum()    # Normalizamos las probabilidades para que puedan sumar 1

    return sorted_tourney_elems[np.random.choice(len(sorted_tourney_elems), p=tourney_probs)]
    

##### **7a. Selección de progenitores:** Por el método de la ruleta

In [223]:
def selecciona_prog_ruleta(poblation, model_result, num_prog):
    fit_vals = np.zeros(len(poblation))
    for i in range(len(fit_vals)):
        fit_vals[i] = fitness(poblation[i], model_result)
    poblation_probabilities = asign_probabilidad(len(poblation), fit_vals)
    intervals = divide_subintervalos(poblation_probabilities)

    parents = list()
    for i in range(num_prog):
        select_prob = np.random.rand()
        for interval in intervals:
            if (select_prob >= interval[0] and select_prob < interval[1]):
                parents.append(poblation[intervals.index(interval)])
                break

    return parents


##### **7b. Selección de progenitores:** Por el método del torneo estocástico

In [224]:
def selecciona_prog_torneo(poblation, model_result, num_prog):
    parents = list()
    for i in range(num_prog):
        parents.append(torneo_estocastico(poblation, num_prog, model_result))
    return parents

##### **8a. Cruzamiento:** Utilizaremos el cruzamiento aritmético total:
$$
Cnew_i = \alpha * x_i + (1 - \alpha) * y_i \qquad con \ \alpha \in [0, 1] \quad x_i \in C_x \quad e \quad y_i \in C_y
$$

In [225]:
def cruzamiento_total(parent_a, parent_b, alpha):
    child = np.zeros(len(parent_a))
    for i in range(len(parent_a)):
        child[i] = (alpha * parent_a[i]) + ((1.0 - alpha) * parent_b[i])
    return child

##### **8b. Cruzamiento:** Utilizaremos el cruzamiento simple
$$
Cnew_i = (x_i + y_i) * \alpha \qquad con \ \alpha \in [0, 1] \quad x_i \in C_x \quad e \quad y_i \in C_y
$$

In [226]:
def cruzamiento_simple(parent_a, parent_b, alpha):
    pos = np.random.randint(0, len(parent_a))
    child = np.copy(parent_a)

    for i in range(pos, len(parent_a)):
        child[i] = (parent_a[i] + parent_b[i]) * alpha
    
    return child

##### **9. Mutación:** Utilizaremos la mutación no uniforme según una distribución fija (una distribución normal con media = 0 y desviación estandar pasada como parámetro)

In [227]:
def muta_no_uniforme(cromosome, deviation):
    probability = 0.70
    for i in range(len(cromosome)):
        if (np.random.uniform(0, 1) < probability):
            cromosome[i] += np.random.normal(0, 1) * deviation
            if (cromosome[i] < 0):
                cromosome[i] = 0
            elif (cromosome[i] > 1):
                cromosome[i] = 1
    return cromosome

##### **10. Selección de supervivientes:** Utilizaremos un enfoque elitista, si para una población $z$ generamos otros $z$ descendientes, de los $2z$ individuos nos quedaremos con los $z$ mejores

In [228]:
def select_supers(poblation_fit, childs_fit, population):
    total_fit = np.concatenate((poblation_fit, childs_fit))
    _, sorted_population = zip(*sorted(zip(total_fit, population), key=lambda x: x[0]))
    return sorted_population[0:len(population) - len(childs_fit)]
    

# <span style="color:#C97828"> Ejecución del programa principal </span>

### _Ejecución dataset diabetes_ Para poder ejecutar un tipo de selección de progenitores y/o cruzamiento, comentar la linea encargada de ello y descomentar el otro método

In [229]:
def main_diabetes(tolerance, max_iters, std_dev, alfa, prob_cruce, num_prog):
    start = tm.time()

    # Cargamos los arrays correspondientes al conjunto de datos
    diabetes_nparr, _, _ = carga_datos()

    # Obtenemos la solucion real para el dataset
    exact_sol_diab = obten_solucion(diabetes_nparr)

    # Creemos la poblacion inicial
    ini_diabetes_pob = ini_poblacion(len(exact_sol_diab), 20)

    actual_poblation_diabetes = ini_diabetes_pob

    cont = 0
    while (cont < max_iters):
        # Evaluemos la poblacion
        fit_values_diabetes = np.zeros(len(actual_poblation_diabetes))

        for i in range(len(ini_diabetes_pob)):
            fit_values_diabetes[i] = fitness(actual_poblation_diabetes[i], exact_sol_diab)
            if (fit_values_diabetes[i] < tolerance):
                print("DATASET DIABETES:")
                print(f'Numero iteraciones: {cont} Aproximacion: {actual_poblation_diabetes[i]}')
                print(f'Fitness: {fit_values_diabetes[i]} Resultado exacto: {exact_sol_diab}')
                print(f'Tiempo: {tm.time() - start}s')
                print("--------------------")
                return

        # Seleccionemos los progenitores
        parents_r_diabetes = selecciona_prog_ruleta(actual_poblation_diabetes, exact_sol_diab, num_prog)
        # parents_r_diabetes = selecciona_prog_torneo(actual_poblation_diabetes, exact_sol_diab, num_prog)

        # Crucemos los progenitores
        childs = list()
        parents = list(parents_r_diabetes)
        for i in range(len(parents_r_diabetes)):
            idx_a = np.random.randint(0, len(parents))
            parent_a = np.array(parents[idx_a])
            if (np.random.uniform(0, 1) < prob_cruce):
                idx_b = idx_a
                while (idx_b == idx_a):
                    idx_b = np.random.randint(0, len(parents))
                parent_b = np.array(parents[idx_b])
                #childs.append(cruzamiento_total(parent_a, parent_b, alfa))
                childs.append(cruzamiento_simple(parent_a, parent_b, alfa))
            else:
                childs.append(parent_a)

        # Mutemos los hijos
        for cromosome in childs:
            cromosome = muta_no_uniforme(cromosome, std_dev)

        # Evaluemos los hijos
        fit_child = list()
        for i in range(len(childs)):
            fit_child.append(fitness(childs[i], exact_sol_diab))
            
        # Seleccionemos los supervivientes
        pob_total_diab = np.concatenate((actual_poblation_diabetes, np.array(childs)))
        actual_poblation_diabetes = select_supers(fit_values_diabetes, fit_child, pob_total_diab)
        actual_poblation_diabetes = np.asarray(actual_poblation_diabetes)

        cont += 1
    
    print("No converge")
    print(f'Tiempo: {tm.time() - start}s')
    print("--------------------")

### _Ejecución dataset laser_ Para poder ejecutar un tipo de selección de progenitores y/o cruzamiento, comentar la linea encargada de ello y descomentar el otro método

In [230]:
def main_laser(tolerance, max_iters, std_dev, alfa, prob_cruce, num_prog):
    start = tm.time()

    # Cargamos los arrays correspondientes al conjunto de datos
    _, laser_nparr, _ = carga_datos()
        
    # Obtenemos la solucion real para el dataset
    exact_sol_laser = obten_solucion(laser_nparr)

    # Creemos la poblacion inicial
    ini_laser_pob = ini_poblacion(len(exact_sol_laser), 500)

    actual_poblation_laser = ini_laser_pob

    cont = 0
    while (cont < max_iters):
        # Evaluemos la poblacion
        fit_values_laser = np.zeros(len(actual_poblation_laser))
            
        for i in range(len(ini_laser_pob)):
            fit_values_laser[i] = fitness(actual_poblation_laser[i], exact_sol_laser)
            if (fit_values_laser[i] < tolerance):
                print("DATASET LASER:")
                print(f'Numero iteraciones: {cont} Aproximacion: {actual_poblation_laser[i]}')
                print(f'Fitness: {fit_values_laser[i]} Resultado exacto: {exact_sol_laser}')
                print(f'Tiempo: {tm.time() - start}s')
                print("--------------------")
                return

        # Seleccionemos los progenitores
        # parents_r_laser = selecciona_prog_ruleta(actual_poblation_laser, exact_sol_laser, num_prog)
        parents_r_laser = selecciona_prog_torneo(actual_poblation_laser, exact_sol_laser, 50)

        # Crucemos los progenitores
        childs = list()
        parents = list(parents_r_laser)
        for i in range(len(parents_r_laser)):
            idx_a = np.random.randint(0, len(parents))
            parent_a = np.array(parents[idx_a])
            if (np.random.uniform(0, 1) < prob_cruce):
                idx_b = idx_a
                while (idx_b == idx_a):
                    idx_b = np.random.randint(0, len(parents))
                parent_b = np.array(parents[idx_b])
                #childs.append(cruzamiento_total(parent_a, parent_b, alfa))
                childs.append(cruzamiento_simple(parent_a, parent_b, alfa))
            else:
                childs.append(parent_a)

        # Mutemos los hijos
        for cromosome in childs:
            cromosome = muta_no_uniforme(cromosome, std_dev)

        # Evaluemos los hijos
        fit_child = list()
        for i in range(len(childs)):
            fit_child.append(fitness(childs[i], exact_sol_laser))
            
        # Seleccionemos los supervivientes
        pob_total_laser = np.concatenate((actual_poblation_laser, np.array(childs)))
        actual_poblation_laser = select_supers(fit_values_laser, fit_child, pob_total_laser)
        actual_poblation_laser = np.asarray(actual_poblation_laser)

        cont += 1
        
    print("No converge")
    print(f'Tiempo: {tm.time() - start}s')
    print("--------------------")

### _Ejecución dataset quake_ Para poder ejecutar un tipo de selección de progenitores y/o cruzamiento, comentar la linea encargada de ello y descomentar el otro método

In [231]:
def main_quake(tolerance, max_iters, std_dev, alfa, prob_cruce, num_prog):
    start = tm.time()

    # Cargamos los arrays correspondientes al conjunto de datos
    _, _, quake_nparr = carga_datos()

    # Obtenemos la solucion real para el dataset
    exact_sol_quake = obten_solucion(quake_nparr)

    # Creemos la poblacion inicial
    ini_quake_pob = ini_poblacion(len(exact_sol_quake), 2000)

    actual_poblation_quake = ini_quake_pob

    cont = 0
    while (cont < max_iters):
        # Evaluemos la poblacion
        fit_values_quake = np.zeros(len(actual_poblation_quake))

        for i in range(len(ini_quake_pob)):
            fit_values_quake[i] = fitness(actual_poblation_quake[i], exact_sol_quake)
            if (fit_values_quake[i] < tolerance):
                print("DATASET QUAKE:")
                print(f'Numero iteraciones: {cont} Aproximacion: {actual_poblation_quake[i]}')
                print(f'Fitness: {fit_values_quake[i]} Resultado exacto: {exact_sol_quake}')
                print(f'Tiempo: {tm.time() - start}s')
                print("--------------------")
                return

        # Seleccionemos los progenitores
        # parents_r_quake = selecciona_prog_torneo(actual_poblation_quake, exact_sol_quake, num_prog)
        parents_r_quake = selecciona_prog_ruleta(actual_poblation_quake, exact_sol_quake, num_prog)

        # Crucemos los progenitores
        childs = list()
        parents = list(parents_r_quake)
        for i in range(len(parents_r_quake)):
            idx_a = np.random.randint(0, len(parents))
            parent_a = np.array(parents[idx_a])
            if (np.random.uniform(0, 1) < prob_cruce):
                idx_b = idx_a
                while (idx_b == idx_a):
                    idx_b = np.random.randint(0, len(parents))
                parent_b = np.array(parents[idx_b])
                # childs.append(cruzamiento_total(parent_a, parent_b, alfa))
                childs.append(cruzamiento_simple(parent_a, parent_b, alfa))
            else:
                childs.append(parent_a)

        # Mutemos los hijos
        for cromosome in childs:
            cromosome = muta_no_uniforme(cromosome, std_dev)

        # Evaluemos los hijos
        fit_child = list()
        for i in range(len(childs)):
            fit_child.append(fitness(childs[i], exact_sol_quake))
            
        # Seleccionemos los supervivientes
        pob_total_quake = np.concatenate((actual_poblation_quake, np.array(childs)))
        actual_poblation_quake = select_supers(fit_values_quake, fit_child, pob_total_quake)
        actual_poblation_quake = np.asarray(actual_poblation_quake)

        cont += 1

    print("No converge")
    print(f'Tiempo: {tm.time() - start}s')
    print("--------------------")

## _Ejecución general_

In [234]:
if __name__ == "__main__":
    # Definimos los parametros para nuestro algoritmo
    tolerance = 1e-4
    max_iters = 1e3
    std_dev = 1.5
    alfa = 0.25
    prob_cruce = 0.65
    num_prog = 20
    main_diabetes(tolerance, max_iters, std_dev, alfa, prob_cruce, num_prog)
    main_laser(tolerance*1000, max_iters, std_dev, alfa, prob_cruce, num_prog*10)
    main_quake(tolerance, max_iters, std_dev, alfa, prob_cruce, num_prog*100)  

DATASET DIABETES:
Numero iteraciones: 526 Aproximacion: [0.         0.34266209 0.39254869]
Fitness: 4.790321947231048e-05 Resultado exacto: [0.01111626 0.33926845 0.39548496]
Tiempo: 0.8430745601654053s
--------------------
DATASET LASER:
Numero iteraciones: 2 Aproximacion: [0.09136055 0.94313531 0.         0.         0.63789854]
Fitness: 0.09564604474932989 Resultado exacto: [ 0.12638563  0.77068876 -0.63004538 -0.19529967  0.52759662]
Tiempo: 0.09442973136901855s
--------------------
DATASET QUAKE:
Numero iteraciones: 6 Aproximacion: [0.13534717 0.         0.04917508 0.        ]
Fitness: 7.29213434147197e-05 Resultado exacto: [ 0.13941657 -0.00984116  0.05748688 -0.01044945]
Tiempo: 5.175872087478638s
--------------------
