In [1]:
# prueba lunar lander por humano

import gymnasium as gym

env = gym.make("LunarLander-v3", render_mode="rgb_array")

import numpy as np
import pygame
import gymnasium.utils.play

lunar_lander_keys = {
    (pygame.K_UP,): 2,
    (pygame.K_LEFT,): 1,
    (pygame.K_RIGHT,): 3,
}
# gymnasium.utils.play.play(env, zoom=3, keys_to_action=lunar_lander_keys, noop=0)

## Prueba 1. AG - Evolución generacional

### Tareas
1. Probar nuevas politicas
2. Nuevos operadores: crossover, mutacion
3. Cambio de arquitectura [8, X, 4]. 2,6,20
4. Graficas: 
    - mejor fitness en funcion de generaciones
    - fitness medio (de poblacion) en funcion de generaciones
    - Precision (numero de aterrizajes correctos en cada iteracion) // hay que obtener "algun indicativo de que aterrice bien" 

In [2]:
from MLP import MLP
import random

def policy_base (observation, model):
    s = model.forward(observation)
    action = np.argmax(s)
    return action

def policy_epsGreedy(observation, model):
    epsilon = 0.10
    s = model.forward(observation)
    if np.random.rand() < epsilon:
        action = np.random.randint(len(s))
    else:
        action = np.argmax(s)
    return action

def run (model):
    #observation, info = env.reset(seed=42)
    observation, info = env.reset()
    ite = 0
    racum = 0
    while True:
        action = policy_epsGreedy(observation, model)
        observation, reward, terminated, truncated, info = env.step(action)
        
        racum += reward

        if terminated or truncated:
            #r = (racum+200) / 500
            #print(racum, r)
            return racum


def run_multiple_games(ch, arquitecture, N_games):

    model = MLP(arquitecture)
    model.from_chromosome(ch)

    r = 0

    for _ in range(N_games):
        r += run(model)
    
    return r/N_games # devuelve el refuerzo medio
    

In [None]:
# Define operadores de números reales
import loky
from loky import get_reusable_executor
import itertools

rang = (-1, 1) # al no hacerlo con clases, debemos definir el rango como variable global


def select(pop, T, fitness_array): 
    tournament = random.sample(range(len(pop)), T)  
    # Busca el índice del mejor individuo (mayor fitness porque es acumulativo en LunarLander)
    best_index = max(tournament, key=lambda i: fitness_array[i])  
    return pop[best_index].copy()  


def create(arquitecture, N=100): 
    pop = []

    cromosoma_length = (arquitecture[0] * arquitecture[1]) + arquitecture[1] + (arquitecture[1] * arquitecture[2]) + arquitecture[2]

    for _ in range(N):
        values = [random.uniform(-5, 5) for _ in range(cromosoma_length)]  
        pop.append(values)

    return pop


def sort_pop (pop, fitness): 
    pop_with_fitness = [(indiv, fit) for indiv, fit in zip(pop, fitness)]
    sorted_pop = sorted(pop_with_fitness, key=lambda x: x[1], reverse=True)  # Mayor fitness primero
    return [indiv for indiv, _ in sorted_pop], [fit for _, fit in sorted_pop]


def crossover (ind1, ind2, pcross, arquitecture): # devuelve el cruce (emparejamiento) de dos individuos, considerando todos los genes
    if (random.random() > pcross):
        return ind1.copy(), ind2.copy()
    child1 = []
    child2 = []

    for gene1, gene2 in zip(ind1, ind2):
        beta = random.uniform(0, 1)
        c1 = beta * gene1 + (1 - beta) * gene2
        c2 = (1 - beta) * gene1 + beta * gene2
        
        child1.append(c1)
        child2.append(c2)
    
    return child1, child2
    

def mutate(ind, pmut):
    if random.random() < pmut:
        idx = random.randint(0, len(ind) - 1)
        ind[idx] = random.uniform(rang[0], rang[1])
    return ind.copy()



def evolve_gen(pop, pmut, arquitecture = [8,6,4], generations = 6000, T=2, trace=100, pcross=0.7, elitism=False, N_games=2):
    
    generation = 0
    executor = get_reusable_executor(max_workers=8, timeout=2)

    
    while generation < generations:
        new_poblacion = []

        fitness_array = list(executor.map(run_multiple_games, pop, itertools.repeat(arquitecture), itertools.repeat(N_games)))
        generation += 1
        
        pop, fitness = sort_pop(pop, fitness_array)

        if trace > 0 and generation % trace == 0:
            print(f"Generacion: {generation}, Mejor fitness: {fitness[0]}")

        if elitism:
            new_poblacion.append(pop[0].copy())

        if generation >= generations:
            break


        while len(new_poblacion) < len(pop):
            parent_1 = select(pop, T, fitness)
            parent_2 = select(pop, T, fitness)

            child_1, child_2 = crossover(parent_1, parent_2, pcross, arquitecture)

            child_1 = mutate(child_1, pmut)
            child_2 = mutate(child_2, pmut)

            new_poblacion.extend([child_1, child_2])

        pop = new_poblacion[:len(pop)].copy()

    print(f"Generacion: {generation}, Mejor fitness: {fitness[0]}")
    return pop, fitness


def evolve_evals(pop, pmut, arquitecture = [8,6,4], neval = 3500, T=2, trace=100, pcross=0.7, elitism=False, N_games=2):
    """
    Algoritmo evolutivo con traza basada en el número de evaluaciones.
    """
    evaluaciones = 0
    executor = get_reusable_executor(max_workers=1, timeout=2)
    
    
    while evaluaciones < neval:

        new_poblacion = []

        fitness_array = list(executor.map(run_multiple_games, pop, itertools.repeat(arquitecture), itertools.repeat(N_games)))
        evaluaciones += N_games*len(pop)

        pop, fitness = sort_pop(pop, fitness_array)

        if trace > 0 and evaluaciones % trace == 0:
            print(f"Evaluaciones: {evaluaciones}, Mejor fitness: {fitness[0]}")

        if elitism:
            new_poblacion.append(pop[0].copy())

        if evaluaciones >= neval:
            break

        while len(new_poblacion) < len(pop):
            parent_1 = select(pop, T, fitness)
            parent_2 = select(pop, T, fitness)

            child_1, child_2 = crossover(parent_1, parent_2, pcross, arquitecture)

            child_1 = mutate(child_1, pmut)
            child_2 = mutate(child_2, pmut)

            new_poblacion.extend([child_1, child_2])

        # Actualiza nueva poblacion
        pop = new_poblacion[:len(pop)].copy()

    print(f"Evaluaciones: {evaluaciones}, Mejor fitness: {fitness[0]}")
    return pop, fitness


In [5]:
import multiprocessing
print(multiprocessing.cpu_count())

8


In [None]:
# crea y evoluiona
best_individuals = []
himmelblau_values = []
fitness_values = []

for i in range(1,2):
    print(f"Ejecución {i}")
    pop = create(arquitecture=[8,6,4]) 
    # CAMBIO DE T a 4 y n_games a 1
    pop, fitness = evolve_gen(pop, pmut=0.1, arquitecture=[8,6,4], generations=500, T=4, trace=50, pcross=0.7, elitism=False, N_games=1)
    # 1000 gen = 1000 * 100 * 2 = 
    #pop, fitness = evolve_evals(pop, pmut=0.1, arquitecture=[8,6,4], neval=2000, T=4, trace=1, pcross=0.7, elitism=True, N_games=2)
    best_individual = pop[0]  
    fitness_value = fitness[0]

    # Almacenar resultados
    best_individuals.append(best_individual)
    fitness_values.append(fitness_value)

Ejecución 1
Generacion: 50, Mejor fitness: -256.04323938154914
Generacion: 100, Mejor fitness: -227.83518193034425
Generacion: 150, Mejor fitness: -298.1898802799158


KeyboardInterrupt: 

> Plot

---

In [47]:
import time

def run_lunar_lander(model, chromosome):

    env = gym.make("LunarLander-v3", render_mode="human")  
    observation, _ = env.reset() 
    model.from_chromosome(chromosome)  
    
    total_reward = 0
    done = False

    while not done:
        env.render()  
        
        action_values = model.forward(observation)  
        action = np.argmax(action_values)  
        
        observation, reward, done, _, _ = env.step(action)  
        total_reward += reward
        
        time.sleep(0.05)  
    
    env.close()
    print(f"Total reward: {total_reward}")

arquitecture = [8, 6, 4] 
model = MLP(arquitecture)


# 🔹 Ejecutar el modelo en el entorno
run_lunar_lander(model, best_individuals[0])


Total reward: -33.465313386058796


---

In [None]:
import numpy as np

fitness_mean = np.mean(fitness_values)
fitness_std = np.std(fitness_values)


best_ind_index = np.argmax(fitness_values)
best_ind = best_individuals[best_ind_index]
best_fitness = fitness_values[best_ind_index]

In [None]:
print(f"Media de fitness: {fitness_mean}")
print(f"Desviación típica de fitness: {fitness_std}")

print ("-----")

print(f"El mejor individuo es {best_ind}")
print(f"Fitness en ese individuo: {best_fitness:.10f}")

Media de fitness: 0.9425973798932168
Desviación típica de fitness: 0.0832002599174057
-----
Media de himmelblau: 0.07014512890659286
Desviación típica de himmelblau: 0.10485796294344779
-----
El mejor individuo es [3.000423765860462, 2.0033795362969467]
Fitness en ese individuo: 0.9997702835
Himmelblau de la mejor solución: 0.0002297693


In [None]:
# crea y evoluiona
pop = create()
pop, fitness = evolve(pop, arquitecture = [8,6,4], pmut=10/100, ngen=100, T=4, trace=25, pcross=0.7, elitism=False)

Generacion 0: mejor fitness [0.2952568997016495]
Generacion 25: mejor fitness [0.838982892859534]
Generacion 50: mejor fitness [0.838982892859524]
Generacion 75: mejor fitness [0.8389828928595396]
Generacion 100: mejor fitness [0.8389828928595421]


In [None]:
# Mejor individuo, valor en la función y su fitness
best_individual = pop[0]
fitness_best = fitness[0]
  

print(f"El mejor individuo es {best_individual}")
print(f"Fitness de la mejor solución: {fitness_best}")


El mejor individuo es [3.531729179447823, -1.77507053290786]
Valor de la función de Himmelblau en ese individuo: 0.1919194164
Fitness de la mejor solución: 0.8389828928595421
