# Integrantes de la prácticas

* Alejandro Cortijo Benito
* Alejandro García Mota

# Librerias utilizadas

In [26]:
import sys
import os

import random
import numpy as np

import gymnasium as gym
from gymnasium.wrappers import RecordVideo

from MLP import MLP
from tqdm import tqdm

from loky import get_reusable_executor

EXECUTOR = get_reusable_executor()

# Arquitectura del MLP

In [9]:
ARCHITECTURE = [8, 6, 4]
ARCHITECTURE

[8, 6, 4]

# Funciones utilizadas

## Algoritmos genéticos de cruces

Para el desarrollo de la práctica probamos diferentes estrategías para combinar los pesos de los "padres". En la siguiente celda, dejaremos algunas de las propuestas que tuvimos en cuenta. Cabe destacar que la que mejores resultados nos dió fue `simulated_binary_crossover()` y será la que usaremos en las siguientes secciones.

In [11]:
def simulated_binary_crossover(ind1, ind2, pcross, eta=2):
    ind1_copy, ind2_copy = [*ind1], [*ind2]
    for i in range(len(ind1)):
        if random.random() < pcross:
            u = random.random()
            beta = (2 * u) ** (1 / (eta + 1)) if u <= 0.5 else (1 / (2 * (1 - u))) ** (1 / (eta + 1))
            ind1_copy[i] = 0.5 * ((1 + beta) * ind1[i] + (1 - beta) * ind2[i])
            ind2_copy[i] = 0.5 * ((1 - beta) * ind1[i] + (1 + beta) * ind2[i])
    return ind1_copy, ind2_copy

def crossover_real_numbers (ind1, ind2, pcross):
    ind1_copy, ind2_copy = [*ind1], [*ind2]
    for i in range(len(ind1)):
        if random.random() > pcross:
            beta = random.uniform(1e-6, 1-1e-6)
            ind1_copy[i] = beta * ind1[i] + (1 - beta) * ind2[i]
            ind2_copy[i] = beta * ind2[i] + (1 - beta) * ind1[i]

    return ind1_copy, ind2_copy


def blend_crossover(ind1, ind2, pcross, alpha=0.5):
    ind1_copy, ind2_copy = [*ind1], [*ind2]
    for i in range(len(ind1)):
        if random.random() < pcross:
            gamma = (1 + 2 * alpha) * random.random() - alpha
            ind1_copy[i] = (1 - gamma) * ind1[i] + gamma * ind2[i]
            ind2_copy[i] = gamma * ind1[i] + (1 - gamma) * ind2[i]
    return ind1_copy, ind2_copy

## Algoritmos genéticos de mutación

De manera similar a los algoritmos de cruce, hemos probado varios algoritmos de mutaciones con el proposito de mejorar los resultados poco a poco. Finalmente, decidimos seleccionar una `random_mutation()` entre una mutación polinomial y una gaussiana. Cabe destacar que utilizar una `gaussian_mutation()` también da buenos resultados, pero consideramos que establecer un poco de aleatoriedad puede enriquecer aún más el proceso.

In [12]:
rang = (-1, 1)

def polynomial_mutation(ind, pmut, eta=2):
    ind_copy = [*ind]
    for i in range(len(ind)):
        if random.random() < pmut:
            u = random.random()
            delta = (2 * u) ** (1 / (eta + 1)) - 1 if u < 0.5 else 1 - (2 * (1 - u)) ** (1 / (eta + 1))
            ind_copy[i] += delta
    return ind_copy

def mutate_swap_real_numbers (ind, pmut): 
    ind_copy = [*ind]
    for i in range(len(ind)):
        if random.random() < pmut:
            ind_copy[i] = random.uniform(rang[0], rang[1])
    return ind_copy

def gaussian_mutation(ind, pmut, sigma=0.2):
    ind_copy = [*ind]
    for i in range(len(ind)):
        if random.random() < pmut:
            ind_copy[i] += random.gauss(0, sigma)
    return ind_copy

def random_mutation(ind, pmut):
    options = [polynomial_mutation, gaussian_mutation]

    return random.choice(options)(ind, pmut)

## Política de acción

Hemos seguido una política greedy para decidir que acción tomar. Al principio, probamos a explotar cada acción seleccionando el máximo pero no conseguimos buenos.   

In [13]:
def policy(model, observation, epsilon=0.01):
    s = model.forward(observation) 
    if np.random.rand() < epsilon:  
        action = np.random.randint(len(s))
    else: 
        action = np.argmax(s)
    return action

## Fitness 

Utilizando como referencia la función `run()` de la plantilla de la práctica, decidimos implementar nuestra propia función para evaluar qué tan buena es nuestra MLP. Para mejorar el refuerzo hemos establecido un bucle de $3$ iteraciones.

In [15]:
def fitness (ch):
    env = gym.make("LunarLander-v3", render_mode=None)

    rewards_list = []
    for _ in range(3):
        observation, _ = env.reset()
        racum = 0
        while True:
            model = MLP(ARCHITECTURE)
            model.from_chromosome(ch)
            action = policy(model, observation)
            observation, reward, terminated, truncated, _ = env.step(action)
            racum += reward

            if terminated or truncated:
                rewards_list.append(racum)
                break
    
    return sum(rewards_list) / len(rewards_list)

## Proceso de evolución

En este proceso además de usar las funcione definidas previamente, también hemos definido otras como: `select()` para devolver un individuo por torneo, `sort_pop()` pra ordenar la población en base al fitness obtenido, `show()` para ver durante la evolución el desempeño del mejor individuo a tiempo real y `evolution()` donde se lleva acabo el proceso de evolución y guardamos los pesos de los mejores individuos.

In [21]:
def show(ind):
    env = gym.make("LunarLander-v3", render_mode="human")

    observation, _ = env.reset()
    while True:
        model = MLP(ARCHITECTURE)
        model.from_chromosome(ind)
        action = policy(model, observation)
        observation, _, terminated, truncated, _ = env.step(action)

        if any([truncated, terminated]):
            observation, _ = env.reset()
            break

    env.close()

def select (pop, T): 
    selected = [random.randint(0, len(pop)-1) for _ in range(T)]
    return [*pop[min(selected)]]

def sort_pop (pop, fit): 
    fitness_list = EXECUTOR.map(fit, pop) # Loky
    sorted_pop_fitness = sorted(zip(pop, fitness_list), key=lambda x: x[1], reverse=True)
    return [x[1] for x in sorted_pop_fitness], [x[0] for x in sorted_pop_fitness]

def evolution (pop, fit, pmut, pcross=0.7, ngen=100, T=2):
    initial_pop = [*pop]
    historical_best = []
    best_fitness = sys.maxsize * -1
    pbar = tqdm(range(ngen), desc="Processing")
    for i in pbar:
        sorted_fitnesses, sorted_pop = sort_pop(initial_pop, fit)
        current_best = sorted_pop[0]
        selected_pop = [select(sorted_pop, T) for _ in range(len(initial_pop))]

        crossed_pop = []
        for j in range(0, len(selected_pop)-1, 2):
            crossed_pop.extend(simulated_binary_crossover(selected_pop[j], selected_pop[j+1], pcross))
        if len(selected_pop) % 2 != 0:
            crossed_pop.append(selected_pop[-1])
        
        mutated_pop = [random_mutation(ind, pmut) for ind in crossed_pop]
        
        if  sorted_fitnesses[0] > best_fitness:
            show(current_best)
            historical_best = current_best
            best_fitness = sorted_fitnesses[0]
            np.save("weight/current_best_chromosome.npy", historical_best)
            np.save("weight/current_best_architecture.npy", ARCHITECTURE)

        initial_pop = mutated_pop
        
        pbar.set_postfix(current_best=sorted_fitnesses[0], best_fitness=best_fitness)


    initial_pop.insert(0, historical_best)
    return initial_pop

## Experimentos

En la siguiente celda dejamos la configuración final:

* `pop`: Población del 100 individuos
* `ARCHITECTURE`: Definida al principio del cuaderno ([8, 6, 4])
* `pmut`: 0.1
* `pcross`: 0.9
* `ngen`: 750
* `T`: Tamaño del torneo igual a 8

In [22]:
population_size = 100

pop = [MLP(ARCHITECTURE).to_chromosome() for _ in range(population_size)]

# evolution (pop, fit, pmut, pcross, ngen, T)
pop = evolution(pop, fitness, 0.1, pcross=0.9, ngen=750, T=8)

  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
  return 1.0 / (1.0 + np.exp(-neta))
Processing: 100%|██████████| 750/750 [14:29<00:00,  1.16s/it, best_fitness=320, current_best=301]


In [39]:
np.save("best_chromosome.npy", pop[0])

env = gym.make("LunarLander-v3", render_mode="rgb_array")
env = RecordVideo(env, video_folder="./video", episode_trigger=lambda x: True)

observation, _ = env.reset()
iters = 0
while True:
    model = MLP(ARCHITECTURE)
    model.from_chromosome(pop[0])

    action = policy(model, observation)
    observation, _, terminated, truncated, _ = env.step(action)

    env.render()

    if any([truncated, terminated]):
        observation, _ = env.reset()
        iters += 1

    if iters == 10:
        break

env.close()

  logger.warn(


In [40]:
video_folder = "./video"
output_video = "./video/output_video.mp4"
file_list_path = os.path.join(video_folder, "file_list.txt")

video_files = sorted([f for f in os.listdir(video_folder) if f.endswith(".mp4")])

with open(file_list_path, "w") as file:
    for video in video_files:
        file.write(f"file '{video}'\n")

# ffmpeg para unir los videos (hay que descargarlo en local)
ffmpeg_command = [
    "ffmpeg",
    "-f", "concat",
    "-safe", "0",
    "-i", file_list_path,
    "-c", "copy",
    output_video
]

os.remove(file_list_path)

<video controls>
  <source src="./video/output_video.mp4" type="video/mp4">
  Tu navegador no soporta el elemento de video.
</video>