# LunarLander-v3 feat. Reinforcemente Learning 🚀

By
- Miguel González
- Javier Quesada

## Info

### Documentación

Problemas interesantes para Aprendizaje por refuerzo
 * Gymnasium: https://gymnasium.farama.org/environments/box2d/

### Instalación

%pip install gymnasium  
%pip install gymnasium[box2d] 

### Acciones adicionales

Pueden ser necesarias *antes* de instalar gymnasium[box2d].

#### En macos

pip uninstall swig  
xcode-select -—install (instala las herramientas de desarrollador si no se tienen ya)  
pip install swig  / sudo port install swig-python
pip install 'gymnasium[box2d]' # en zsh hay que poner las comillas  

#### en Windows

Si da error, se debe a la falta de la versión correcta de Microsoft C++ Build Tools, que es una dependencia de Box2D. Para solucionar este problema, puede seguir los siguientes pasos:
 * Descargar Microsoft C++ Build Tools desde https://visualstudio.microsoft.com/visual-cpp-build-tools/.
 * Dentro del instalador, seleccione la opción "Desarrollo para el escritorio con C++"
 * Reinicie su sesión en Jupyter Notebook o en Visual Studio.
 * Ejecute nuevamente el comando !pip install gymnasium[box2d] en la línea de comandos de su notebook.

#### En linux (colab)
  * pip install swig

## Dependencias 📦

In [1]:
%pip install swig -q
%pip install gymnasium[box2d] -q
%pip install loky -q

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import gymnasium as gym
import numpy as np
import pygame
import gymnasium.utils.play
from MLP import MLP
import copy
from loky import get_reusable_executor
import random
import matplotlib.pyplot as plt

## AutoPlay - humano 🕹️

In [None]:
# prueba lunar lander por humano
env_human = gym.make("LunarLander-v3", render_mode="rgb_array")

lunar_lander_keys = {
    (pygame.K_UP,): 2,
    (pygame.K_LEFT,): 1,
    (pygame.K_RIGHT,): 3,
}
gymnasium.utils.play.play(env_human, zoom=1, keys_to_action=lunar_lander_keys, noop=0)

## Desarrollo de funciones 🧩

### Modelo 🧠

In [3]:
# construir modelo
LAYERS = [8, 6, 4]
model =  MLP(LAYERS)

### Política 🎯

In [4]:
def policy(observation, model, epsilon=0.1):
    """
    Toma una observación del entorno y devuelve una acción basada en el modelo MLP.
    
    Parámetros:
    - observation (array): Estado actual del entorno.
    - model (MLP): Red neuronal que estima los valores de las acciones.
    - epsilon (float): Probabilidad de exploración (acción aleatoria).

    Retorna:
    - int: Acción elegida por el modelo.
    """
    num_actions = model.layers[-1]  # Número de acciones basado en la última capa de MLP
    
    if np.random.rand() < epsilon:  # Exploración (acción aleatoria con probabilidad epsilon)
        return np.random.randint(num_actions)
    
    action_values = model.forward(observation)  # Predicción de la red neuronal
    return np.argmax(action_values)  # Selecciona la acción con el mayor valor

### Entorno de exploración y función de ejecución 🌍

In [5]:
env = gym.make("LunarLander-v3")

def run(model):
    """
    Genera un escenario de juego y ejecuta la política definida en la función policy.
    Al acabar el juego (cuando se estrella o aterriza), devuelve la recompensa acumulada.
    
    Parámetros:
    - model (MLP): La red neuronal que se usará para decidir las acciones.
    
    Retorna:
    - float: La recompensa total obtenida en el episodio.
    """
    observation, info = env.reset()  # Se abre un escenario de juego
    racum = 0  # Recompensa acumulada

    while True:
        action = policy(observation, model)  # Se pasa el modelo a policy()
        observation, reward, terminated, truncated, info = env.step(action)
        racum += reward  # Acumula la recompensa obtenida

        if terminated or truncated:
            return racum  # Devuelve la recompensa final


### Funciones bioinspiradas 🧬

In [6]:
def sbx_crossover(parent1, parent2, eta_c=15):
    """
    Simulated Binary Crossover (SBX) para variables continuas en [0,1].   
    eta_c: distribution, controla la 'intensidad' del cruce.
    """
    
    # if np.random.rand() >= pc:
    #     return np.copy(parent1), np.copy(parent2)

    n_var = len(parent1)
    child1 = np.zeros(n_var)
    child2 = np.zeros(n_var)

    for i in range(n_var):
        x1 = parent1[i]
        x2 = parent2[i]

        if x1 > x2:
            x1, x2 = x2, x1

        if abs(x1 - x2) < 1e-14:
            # Si son prácticamente iguales, se copia
            child1[i] = x1
            child2[i] = x2
        else:
            # Beta determina el grado de cruce
            rand = np.random.rand()
            beta = 1.0 + (2.0 * (x1 - 0.0) / (x2 - x1))
            alpha = 2.0 - beta ** (-(eta_c + 1.0))
            if rand <= 1.0 / alpha:
                betaq = (rand * alpha) ** (1.0 / (eta_c + 1.0))
            else:
                betaq = (1.0 / (2.0 - rand * alpha)) ** (1.0 / (eta_c + 1.0))

            c1 = 0.5 * ((x1 + x2) - betaq * (x2 - x1))

            beta = 1.0 + (2.0 * (1.0 - x2) / (x2 - x1))
            alpha = 2.0 - beta ** (-(eta_c + 1.0))
            if rand <= 1.0 / alpha:
                betaq = (rand * alpha) ** (1.0 / (eta_c + 1.0))
            else:
                betaq = (1.0 / (2.0 - rand * alpha)) ** (1.0 / (eta_c + 1.0))

            c2 = 0.5 * ((x1 + x2) + betaq * (x2 - x1))

            c1 = min(max(c1, 0.0), 1.0)
            c2 = min(max(c2, 0.0), 1.0)

            # Aleatoriamente asignamos quién obtiene c1 y c2
            if np.random.rand() < 0.5:
                child1[i] = c2
                child2[i] = c1
            else:
                child1[i] = c1
                child2[i] = c2

    return child1, child2

In [7]:
import random

def blx_crossover(parent1: list, parent2: list, pcross: float, alpha: float = 0.7) -> tuple[list, list]:
    """
    Realiza crossover BLX-alpha entre dos padres.
    
    Parámetros:
    - parent1 (list): Primer padre (lista de genes).
    - parent2 (list): Segundo padre (lista de genes).
    - pcross (float): Probabilidad de cruce (0-1).
    - alpha (float, opcional): Factor de exploración. Por defecto 0.5.
    
    Retorna:
    - tuple[list, list]: Dos listas representando los hijos generados.
    """
    if random.random() < pcross:
        child1 = []
        child2 = []
        for gene1, gene2 in zip(parent1, parent2):
            x_min = min(gene1, gene2)
            x_max = max(gene1, gene2)
            I = x_max - x_min  # Intervalo
            lower_bound = x_min - alpha * I  # Expansión inferior
            upper_bound = x_max + alpha * I  # Expansión superior
            child1.append(random.uniform(lower_bound, upper_bound))
            child2.append(random.uniform(lower_bound, upper_bound))
        return child1, child2
    else:
        return parent1.copy(), parent2.copy()  # Evita modificar los padres originales


In [8]:
def evaluate_fitness(model, ind, num_eval_games):
    """
    Evalúa el fitness de un individuo en múltiples partidas.

    Parámetros:
    - model (MLP): Instancia de la red neuronal que se va a evaluar.
    - ind (list): Cromosoma que representa los pesos del modelo.
    - num_eval_games (int): Número de partidas a jugar para calcular el fitness.

    Retorna:
    - float: Fitness promedio del individuo (recompensa media en las partidas).
    """
    total_reward = 0
    model.from_chromosome(ind)  # Carga los pesos del individuo en el modelo
    
    for _ in range(num_eval_games):
        total_reward += run(model)  # Se pasa el modelo a `run()`

    return total_reward / num_eval_games  # Retorna el fitness promedio


In [9]:
def parallel_evaluation(population, model, num_eval_games, fitness_func):
    """
    Evalúa en paralelo el fitness de la población usando loky.

    Parámetros:
    - population (list): Lista de individuos (listas de pesos para el MLP).
    - model_template (MLP): Modelo base para crear copias y evaluar individuos.
    - num_eval_games (int): Número de partidas para evaluar el fitness.
    - fitness_func (func): Función de evaluación del fitness.

    Retorna:
    - list: Lista de valores de fitness para cada individuo en la población.
    """
    with get_reusable_executor() as executor:
        # Creamos un modelo independiente para cada individuo
        models = [copy.deepcopy(model) for _ in range(len(population))]
        
        # Ejecutamos fitness_func en paralelo, pasando el modelo, el individuo y num_eval_games
        fitness_scores = list(executor.map(fitness_func, models, population, [num_eval_games] * len(population)))
    
    return fitness_scores

In [10]:
def sort_pop(population, fitness_scores):
    """
    Ordena la población según el fitness obtenido (de mayor a menor).

    Parámetros:
    - population (list): Lista de individuos.
    - fitness_scores (list): Lista de valores de fitness correspondientes.

    Retorna:
    - sorted_population (list): Población ordenada de mayor a menor fitness.
    - sorted_scores (list): Valores de fitness ordenados de mayor a menor.
    """
    sorted_indices = np.argsort(fitness_scores)[::-1]  # Índices ordenados de mayor a menor fitness
    sorted_population = [population[i] for i in sorted_indices]
    sorted_scores = [fitness_scores[i] for i in sorted_indices]

    return sorted_population, sorted_scores

In [11]:
import random

def select(population_with_fitness, T: int):
    """
    Selección por torneo con población ordenada de mayor a menor fitness.

    Parámetros:
    - population_with_fitness (list): Lista de tuplas (individuo, fitness), ORDENADA de mayor a menor.
    - T (int): Tamaño del torneo.

    Retorna:
    - Individuo seleccionado (lista de pesos).
    """
    tournament = random.sample(population_with_fitness[:T], k=T)  # Selecciona dentro de los mejores T
    return tournament[0][0]  # Retorna solo el individuo

In [None]:
def mutate(individual, pmut, sigma=0.05):
    """
    Aplica mutación gaussiana normal a un individuo.

    Parámetros:
    - individual (list): Cromosoma del individuo (pesos de la red).
    - pmut (float): Probabilidad de mutar cada gen.
    - sigma (float): Desviación estándar de la mutación gaussiana.

    Retorna:
    - mutated_ind (list): Nuevo cromosoma mutado.
    """
    mutated_ind = []

    for gene in individual:
        if random.random() < pmut:  # Aplicamos mutación con probabilidad pmut
            mutated_gene = gene + np.random.normal(0, sigma)  # Mutación gaussiana
        else:
            mutated_gene = gene  # No hay mutación, el gen permanece igual
        
        mutated_ind.append(mutated_gene)

    return mutated_ind


In [22]:
import numpy as np
import random

def evolve(population, model, num_eval_games=3, pmut=0.2, pcross=0.8, ngen=1000, T=8, trace=100):
    """
    Ejecuta un Algoritmo Genético para optimizar redes MLP.

    Parámetros:
    - population (list): Lista de individuos (listas de pesos para el MLP).
    - model (MLP): Modelo base para evaluar individuos.
    - num_eval_games (int): Número de partidas para evaluar fitness.
    - pmut (float): Probabilidad de mutación.
    - pcross (float): Probabilidad de crossover.
    - ngen (int): Número de generaciones.
    - T (int): Tamaño del torneo en la selección.
    - trace (int): Frecuencia de impresión de estadísticas.
    - elitism (bool): Si True, el mejor individuo se mantiene en la siguiente generación.

    Retorna:
    - final_population (list): Población evolucionada.
    - best_fitness (list): Historial del mejor fitness en cada generación.
    - mean_fitness (list): Historial del fitness promedio en cada generación.
    """

    best_fitness = []
    mean_fitness = []

    for gen in range(ngen):
        # Evaluar fitness en paralelo
        fitness_scores = parallel_evaluation(population, model, num_eval_games, evaluate_fitness)

        # Ordenar la población por fitness
        sorted_population, sorted_scores = sort_pop(population, fitness_scores)

        # Guardar estadísticas
        best_fitness.append(sorted_scores[0])  # Mejor fitness de la generación
        mean_fitness.append(np.mean(sorted_scores))  # Fitness promedio

        # Convertimos la población en una lista de tuplas (individuo, fitness)
        population_with_fitness = list(zip(sorted_population, sorted_scores))

        # Generar nueva población
        new_population = []

        while len(new_population) < len(population):
            # Seleccionar padres con torneo
            parent1 = select(population_with_fitness, T)
            parent2 = select(population_with_fitness, T)

            # Aplicar crossover BLX-α
            child1, child2 = blx_crossover(parent1, parent2, pcross)

            # Aplicar mutación gaussiana adaptativa
            mutated1 = mutate(child1, pmut)
            mutated2 = mutate(child2, pmut)

            # Agregar hijos a la nueva población
            new_population.append(mutated1)
            if len(new_population) < len(population):  # Evitar exceder el tamaño
                new_population.append(mutated2)

        # Reemplazar la población con la nueva
        population = new_population

        # Imprimir progreso cada "trace" generaciones
        if gen % trace == 0:
            print(f"Generation {gen}: Best Fitness = {sorted_scores[0]:.2f}, Mean Fitness = {np.mean(sorted_scores):.2f}")

    print(f"Final Best Fitness: {sorted_scores[0]:.2f}")

    return population, best_fitness, mean_fitness

## Neuroevolución 🔬🦾

In [None]:
POP_SIZE = 100
GENS = 1000
EVAL_GAMES = 3  # Número de partidas por individuo para calcular fitness
# Crear población inicial con equilibrio entre exploración y estabilidad
population = [
    np.array(model.to_chromosome()) + np.random.uniform(-0.5, 0.5, len(model.to_chromosome()))
    for _ in range(POP_SIZE)
]

# Convertimos la población a listas (ya que usamos NumPy antes)
population = [ind.tolist() for ind in population]


In [None]:
pmut = 1.0 / len(population[0])
population, best_fitness, worst_fitness, mean_fitness = evolve(
    population, 
    model,  # Ahora pasamos el modelo base correctamente
    num_eval_games=EVAL_GAMES, 
    ngen=GENS, 
    T=8, 
    trace=50, 
    pmut=0.1, 
    pcross=0.7
)

Generation 0: Best Fitness = -62.62, Mean Fitness = -446.66
Generation 50: Best Fitness = -0.45, Mean Fitness = -147.08
Generation 100: Best Fitness = 228.49, Mean Fitness = -133.40
Generation 150: Best Fitness = 61.92, Mean Fitness = -139.94


KeyboardInterrupt: 

### Visualización de métricas de la evolución 📊📈

In [None]:
plt.plot(best_fitness)
plt.plot(worst_fitness)
plt.plot(mean_fitness)
plt.xlabel('Generations')
plt.ylabel('Fitness')
plt.legend(['Best', 'Worst', 'Mean'])
plt.show()

## Prueba del mejor individuo 🏆🚀

In [None]:
env_test = gym.make("LunarLander-v3", render_mode="human")

model_test = MLP(LAYERS)
model_test.from_chromosome(population[0])
def run_test():
    """
    Esta función genera un escenario de juego y ejecuta la política definida en la función policy.
    Al acabar el juego (cuando se estrella o aterriza), devuelve la recompensa acumulada.
    """
    observation, info = env_test.reset() # se abre un escenario de juego
    racum = 0
    while True:
        action = policy(observation)
        observation, reward, terminated, truncated, info = env_test.step(action)
        racum += reward
        if terminated or truncated:
            return racum
for e in range(10):
    run_test()

#### ¿No has tenido bastante?

Prueba a controlar el flappy bird https://github.com/markub3327/flappy-bird-gymnasium

pip install flappy-bird-gymnasium

import flappy_bird_gymnasium  
env = gym.make("FlappyBird-v0")

Estado (12 variables):
  * the last pipe's horizontal position
  * the last top pipe's vertical position
  * the last bottom pipe's vertical position
  * the next pipe's horizontal position
  * the next top pipe's vertical position
  * he next bottom pipe's vertical position
  * the next next pipe's horizontal position
  * the next next top pipe's vertical position
  * the next next bottom pipe's vertical position
  * player's vertical position
  * player's vertical velocity
  * player's rotation

  Acciones:
  * 0 -> no hacer nada
  * 1 -> volar