# Neuroevolution on EvoGym

## Getting started

Check that the following cell runs without errors. If it does, evogym is installed; otherwise, install it before continuing.

In [2]:
import copy
import json

import evogym.envs
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from evogym import sample_robot
from evogym.utils import get_full_connectivity
from matplotlib import animation
from tqdm import tqdm

## Agent

In [3]:
class Network(nn.Module):
    """Feed-forward network with two ReLU hidden layers and a linear output.

    Args:
        n_in: Size of the input vector.
        h_size: Width of both hidden layers.
        n_out: Size of the output vector.
    """

    def __init__(self, n_in, h_size, n_out):
        super().__init__()
        self.n_out = n_out

        # Registration order fixes the order of ``parameters()``.
        self.fc1 = nn.Linear(n_in, h_size)
        self.fc2 = nn.Linear(h_size, h_size)
        self.fc3 = nn.Linear(h_size, n_out)

    def reset(self):
        """Do nothing; the network keeps no state between calls."""

    def forward(self, x):
        """Return the raw (un-squashed) outputs of the last layer for ``x``."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

In [4]:
class Agent:
    """Wrap a policy network and expose its weights as a flat genome.

    Attributes:
        config: Dict providing at least ``"n_in"``, ``"h_size"``, ``"n_out"``.
        Net: Callable ``Net(n_in, h_size, n_out)`` returning a torch module.
        model: The instantiated network (double precision, on ``device``).
        fitness: Last assigned fitness, or ``None`` when not evaluated.
        device: CUDA device when available, otherwise CPU.
    """

    def __init__(self, Net, config, genes=None):
        self.config = config
        self.Net = Net
        self.model = None
        self.fitness = None

        self.device = torch.device(
            "cuda" if torch.cuda.is_available() else "cpu")

        self.make_network()
        if genes is not None:
            self.genes = genes

    def __repr__(self):  # pragma: no cover
        return f"Agent {self.model} > fitness={self.fitness}"

    def __str__(self):  # pragma: no cover
        return self.__repr__()

    def make_network(self):
        """Instantiate a fresh network from ``config`` and return ``self``."""
        n_in = self.config["n_in"]
        h_size = self.config["h_size"]
        n_out = self.config["n_out"]
        self.model = self.Net(n_in, h_size, n_out).to(self.device).double()
        return self

    @property
    def genes(self):
        """Return all network parameters as one flat float64 numpy vector.

        Returns ``None`` when no network has been created yet.
        """
        if self.model is None:
            return None
        with torch.no_grad():
            params = self.model.parameters()
            vec = torch.nn.utils.parameters_to_vector(params)
        return vec.cpu().double().numpy()

    @genes.setter
    def genes(self, params):
        """Load a flat parameter vector into the network and clear ``fitness``.

        Raises:
            AssertionError: If ``params`` has the wrong length.
            ValueError: If ``params`` contains NaN.
        """
        if self.model is None:
            self.make_network()
        assert len(params) == len(
            self.genes), "Genome size does not fit the network size"
        if np.isnan(params).any():
            raise ValueError("Genome contains NaN values")
        # Force float64 so that list inputs are not routed through float32.
        vec = torch.tensor(params, dtype=torch.double, device=self.device)
        torch.nn.utils.vector_to_parameters(vec, self.model.parameters())
        self.model = self.model.to(self.device).double()
        # The previous fitness no longer describes these weights.
        self.fitness = None

    def mutate_ga(self):
        """Return a mutated copy of the genome; the agent itself is unchanged.

        Each gene is independently replaced by a standard-normal sample with
        probability ``1/n`` (``n`` = genome length); all other genes are
        copied as-is.
        """
        genes = self.genes
        n = len(genes)
        # True -> keep the gene, False -> resample it.
        keep = np.random.choice([False, True], size=n, p=[1/n, 1-1/n])

        new_genes = np.empty(n)
        new_genes[keep] = genes[keep]
        new_genes[~keep] = np.random.randn(np.count_nonzero(~keep))
        return new_genes

    def act(self, obs):
        """Return the network output for one observation as a numpy array.

        A leading batch dimension of size 1 is added to ``obs`` before the
        forward pass and is kept in the returned array.
        """
        # continuous actions
        with torch.no_grad():
            x = torch.tensor(obs).double().unsqueeze(0).to(self.device)
            actions = self.model(x).cpu().numpy()
        return actions


## Environment

In [5]:
def make_env(env_name, seed=None, robot=None, **kwargs):
    """Create an environment, optionally with a custom robot body and a seed.

    Args:
        env_name: Environment id passed to ``gym.make``.
        seed: Optional seed. When given, the environment is reset once with
            this seed and the action space is seeded with it as well.
        robot: Optional body matrix; forwarded to ``gym.make`` as ``body``.
        **kwargs: Extra keyword arguments forwarded to ``gym.make``
            (for example ``render_mode``).

    Returns:
        The created environment, with the body stored as ``env.robot``.
    """
    if robot is not None:
        kwargs["body"] = robot
    env = gym.make(env_name, **kwargs)
    env.robot = robot
    if seed is not None:
        # Gymnasium environments are seeded through reset(), not env.seed().
        env.reset(seed=seed)
        env.action_space.seed(seed)

    return env

In [6]:
def evaluate(agent, env, max_steps=500, render=False):
    """Run one episode and return the summed reward.

    Args:
        agent: Object exposing ``model.reset()`` and ``act(obs)``.
        env: Environment following the Gymnasium API, i.e. ``reset()``
            returns ``(obs, info)`` and ``step()`` returns a 5-tuple.
        max_steps: Upper bound on the number of environment steps.
        render: When true, ``env.render()`` is called before every step.

    Returns:
        Sum of the rewards collected until the episode terminated, was
        truncated, or ``max_steps`` was reached.
    """
    obs, _ = env.reset()
    agent.model.reset()
    total_reward = 0
    for _ in range(max_steps):
        if render:
            env.render()
        obs, r, terminated, truncated, _ = env.step(agent.act(obs))
        total_reward += r
        # Stop on either end-of-episode signal; stepping past a truncation
        # is not valid under the Gymnasium API.
        if terminated or truncated:
            break
    return total_reward

In [7]:
def get_cfg(env_name, robot=None):
    """Return the network dimensions implied by an environment.

    A temporary environment is created for ``env_name`` with the given
    ``robot`` only to read its observation and action space sizes.

    Returns:
        Dict with ``"n_in"``, ``"h_size"`` (fixed to 32) and ``"n_out"``.
    """
    # Use the robot that was passed in, not a module-level global.
    env = make_env(env_name, robot=robot)
    try:
        cfg = {
            "n_in": env.observation_space.shape[0],
            "h_size": 32,
            "n_out": env.action_space.shape[0],
        }
    finally:
        env.close()
    return cfg

In [8]:
def mp_eval(a, cfg):
    """Evaluate agent ``a`` in a freshly created environment.

    The environment is built from ``cfg["env_name"]`` and ``cfg["robot"]``,
    run for at most ``cfg["max_steps"]`` steps, and always closed afterwards.

    Returns:
        The fitness returned by ``evaluate``.
    """
    env = make_env(cfg["env_name"], robot=cfg["robot"])
    try:
        return evaluate(a, env, max_steps=cfg["max_steps"])
    finally:
        # Release the environment even if the rollout raised.
        env.close()

In [9]:
def save_solution(a, cfg, name="solution.json", algo_name=None):
    """Write an agent and its run configuration to a JSON file.

    Args:
        a: Agent exposing ``genes`` (numpy vector) and ``fitness``.
        cfg: Config dict; must contain ``"env_name"``, ``"robot"``,
            ``"n_in"``, ``"h_size"`` and ``"n_out"``.
        name: Output file path.
        algo_name: Algorithm label to store. When ``None``, the value of
            ``cfg["algorithm"]`` (if any) is stored instead.

    Returns:
        The dict that was written to ``name``.

    Raises:
        AssertionError: If a required key is missing from ``cfg``.
    """
    save_cfg = {}
    for key in ("env_name", "robot", "n_in", "h_size", "n_out"):
        assert key in cfg, f"{key} not in config"
        save_cfg[key] = cfg[key]
    # asarray accepts arrays, nested lists and None alike.
    save_cfg["robot"] = np.asarray(cfg["robot"]).tolist()
    save_cfg["genes"] = a.genes.tolist()
    # An unevaluated agent has fitness None; store it as JSON null.
    save_cfg["fitness"] = None if a.fitness is None else float(a.fitness)

    # Extra run parameters
    save_cfg["generations"] = cfg.get("generations")
    save_cfg["max_steps"] = cfg.get("max_steps")
    save_cfg["lambda"] = cfg.get("lambda")
    save_cfg["algorithm"] = (
        algo_name if algo_name is not None else cfg.get("algorithm")
    )

    with open(name, "w", encoding="utf-8") as f:
        json.dump(save_cfg, f)
    return save_cfg

In [10]:
def load_solution(name="solution.json"):
    """Rebuild an agent from a JSON file at path ``name``.

    The stored ``"robot"`` and ``"genes"`` entries are converted to numpy
    arrays, the loaded dict is used as the agent's config, and the stored
    ``"fitness"`` is assigned to the agent.

    Returns:
        The reconstructed ``Agent``.
    """
    with open(name, "r") as fh:
        stored = json.load(fh)
    for key in ("robot", "genes"):
        stored[key] = np.array(stored[key])
    agent = Agent(Network, stored, genes=stored["genes"])
    # Assigning genes clears the fitness, so restore it afterwards.
    agent.fitness = stored["fitness"]
    return agent

## Algorithms

### Evolution Strategy

In [11]:
def ES(config):
    """Optimise a policy with a rank-weighted evolution strategy.

    Every generation samples ``lambda`` Gaussian perturbations of the
    current centre ``theta``, evaluates them one after another in a single
    environment, and moves ``theta`` towards the weighted mean of the best
    ``mu`` samples. The best individual seen so far is tracked separately,
    plotted against the number of evaluations, and saved to
    ``solution.json``.

    Args:
        config: Dict with ``"env_name"``, ``"robot"``, ``"generations"``,
            ``"lambda"``, ``"mu"``, ``"sigma"``, ``"lr"`` and ``"max_steps"``.

    Returns:
        The best ``Agent`` found.

    Raises:
        ValueError: If ``mu`` is larger than ``lambda``.
    """
    cfg = get_cfg(config["env_name"], robot=config["robot"])  # Get network dims
    cfg = {**config, **cfg}  # Merge configs; env-derived dims win on clashes

    mu = cfg["mu"]
    if mu > cfg["lambda"]:
        # Fail before any rollout instead of with an IndexError afterwards.
        raise ValueError("mu must not exceed lambda")

    # Recombination weights: log-rank based, normalised to sum to one
    w = np.array([np.log(mu + 0.5) - np.log(i)
                  for i in range(1, mu + 1)])
    w /= np.sum(w)

    env = make_env(cfg["env_name"], robot=cfg["robot"])
    try:
        # Center of the distribution
        elite = Agent(Network, cfg)
        elite.fitness = -np.inf
        theta = elite.genes
        d = len(theta)

        fits = []
        total_evals = []

        bar = tqdm(range(cfg["generations"]))
        for gen in bar:
            population = []
            for _ in range(cfg["lambda"]):
                genes = theta + np.random.randn(d) * cfg["sigma"]
                population.append(Agent(Network, cfg, genes=genes))

            # Evaluation is sequential and reuses the same environment.
            pop_fitness = [evaluate(a, env, max_steps=cfg["max_steps"])
                           for a in population]
            for ind, fit in zip(population, pop_fitness):
                ind.fitness = fit

            # indices from highest fitness to lowest
            idx = np.argsort([-f for f in pop_fitness])

            # weighted sum of the displacements of the mu best individuals
            step = np.zeros(d)
            for weight, j in zip(w, idx[:mu]):
                step = step + weight * (population[j].genes - theta)
            # update theta
            theta = theta + step * cfg["lr"]

            best = idx[0]
            if pop_fitness[best] > elite.fitness:
                # Order matters: assigning genes resets elite.fitness.
                elite.genes = population[best].genes
                elite.fitness = pop_fitness[best]

            fits.append(elite.fitness)
            total_evals.append(len(population) * (gen + 1))

            bar.set_description(f"Best: {elite.fitness}")
    finally:
        env.close()

    plt.plot(total_evals, fits)
    plt.xlabel("Evaluations")
    plt.ylabel("Fitness")
    plt.show()

    cfg["algorithm"] = "ES"
    save_solution(elite, cfg, name="solution.json")

    return elite

In [12]:
from cmaes import CMA

def CMAES(config):
    """Optimise a policy with CMA-ES from the ``cmaes`` package.

    The optimiser starts from the genome of a freshly initialised agent
    with ``sigma=0.5`` and its default population size. Fitness is negated
    before it is passed to the optimiser. The best agent seen is saved to
    ``solution.json``.

    Args:
        config: Dict with ``"env_name"``, ``"robot"``, ``"generations"`` and
            ``"max_steps"``. Its entries override the environment-derived
            network dimensions on key clashes.

    Returns:
        The best ``Agent`` found. If no generation is run, this is the
        initial agent with fitness ``-inf``.
    """
    cfg = get_cfg(config["env_name"], robot=config["robot"])
    cfg.update(config)

    agent = Agent(Network, cfg)

    optimizer = CMA(mean=agent.genes, sigma=0.5)

    best_fitness = -np.inf
    # Fall back to the initial agent so there is always something to save.
    best_agent = agent
    best_agent.fitness = best_fitness

    # One environment for the whole run instead of one per candidate.
    env = make_env(cfg["env_name"], robot=cfg["robot"])
    try:
        for generation in range(cfg["generations"]):
            solutions = []
            fitnesses = []

            for _ in range(optimizer.population_size):
                genes = optimizer.ask()
                candidate = Agent(Network, cfg, genes=genes)
                fitness = evaluate(candidate, env, max_steps=cfg["max_steps"])

                # the optimiser receives the negated fitness
                solutions.append((genes, -fitness))
                # keep the raw (positive) fitness for our own bookkeeping
                fitnesses.append(fitness)

            optimizer.tell(solutions)

            best_idx = int(np.argmax(fitnesses))
            max_fit = fitnesses[best_idx]
            print(f"Generation {generation}: Best fitness in generation = {max_fit}")

            if max_fit > best_fitness:
                best_fitness = max_fit
                best_agent = Agent(Network, cfg, genes=solutions[best_idx][0])
                best_agent.fitness = best_fitness
    finally:
        env.close()

    cfg["algorithm"] = "CMAES"
    save_solution(best_agent, cfg, name="solution.json")

    return best_agent

In [13]:
from ribs.archives import GridArchive
from ribs.emitters import EvolutionStrategyEmitter
from ribs.schedulers import Scheduler
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt

def CMA_ME(config):
    """Optimise a policy with CMA-ME using pyribs.

    Five ``EvolutionStrategyEmitter`` instances (batch size ``lambda`` each)
    feed a 50x50 ``GridArchive``. Every proposed genome is evaluated in one
    shared environment. The best solution seen is plotted against the number
    of evaluations and saved to ``solution.json``.

    Args:
        config: Dict with ``"env_name"``, ``"robot"``, ``"generations"``,
            ``"lambda"`` and ``"max_steps"``.

    Returns:
        An ``Agent`` carrying the best genome found and its fitness.
    """
    cfg = get_cfg(config["env_name"], robot=config["robot"])
    cfg = {**config, **cfg}
    dim = len(Agent(Network, cfg).genes)

    # Archive setup
    archive = GridArchive(
        solution_dim=dim,
        dims=[50, 50],  # 50x50 grid
        ranges=[(0, 10), (0, 2)],  # Example: distance, height
        qd_score_offset=-100
    )

    # Emitters
    emitters = [
        EvolutionStrategyEmitter(
            archive,
            x0=np.zeros(dim),
            sigma0=1.0,
            ranker="2imp",
            batch_size=cfg["lambda"]
        )
        for _ in range(5)  # Number of parallel emitters
    ]

    scheduler = Scheduler(archive, emitters)

    best_fitness = -np.inf
    best_solution = None

    fits = []
    total_evals = []

    # One environment for the whole run; creating one per solution without
    # closing it leaks simulator instances.
    env = make_env(cfg["env_name"], robot=cfg["robot"])
    try:
        bar = tqdm(range(cfg["generations"]))
        for gen in bar:
            solutions = scheduler.ask()
            objectives = []
            measures = []

            for sol in solutions:
                agent = Agent(Network, cfg, genes=sol)
                fitness = evaluate(agent, env, max_steps=cfg["max_steps"])

                # Example measures: average gene values (replace with meaningful measures!)
                # NOTE(review): these means can be negative, i.e. outside the
                # archive ranges above -- presumably such solutions are not
                # inserted; verify against the pyribs documentation.
                behavior_x = np.mean(sol[:dim // 2])
                behavior_y = np.mean(sol[dim // 2:])
                objectives.append(fitness)
                measures.append([behavior_x, behavior_y])

                if fitness > best_fitness:
                    best_fitness = fitness
                    # Copy so the stored genome does not alias the batch array.
                    best_solution = np.copy(sol)

            scheduler.tell(objectives, measures)

            fits.append(best_fitness)
            total_evals.append(len(solutions) * (gen + 1))
            bar.set_description(f"Best: {best_fitness}")
    finally:
        env.close()

    plt.plot(total_evals, fits)
    plt.xlabel("Evaluations")
    plt.ylabel("Fitness")
    plt.show()

    best_agent = Agent(Network, cfg, genes=best_solution)
    best_agent.fitness = best_fitness

    cfg["algorithm"] = "CMA_ME"
    save_solution(best_agent, cfg, name="solution.json")

    return best_agent

In [43]:
import json
# Driver cell: define the robot body, run CMA-ME, replay the result once and
# store the genome and the solution file.
thrower = np.array([
    [0, 0, 1, 0, 0],
    [0, 0, 1, 0, 0],
    [0, 4, 3, 4, 2],
    [4, 3, 0, 4, 2],
    [5, 5, 5, 5, 0],
])

np.save("thrower_matrix.npy", thrower)


config = {
    "env_name": "Thrower-v0",
    "robot": thrower,
    "generations": 100,  # increase for longer runs
    "lambda": 100,  # Population size
    "mu": 50,  # Parents pop size *ES*
    "sigma": 0.1,  # mutation std
    "lr": 1,  # Learning rate
    "max_steps": 500,  # Maximum steps per episode
}

a = CMA_ME(config)
a.fitness

cfg = get_cfg(config["env_name"], robot=config["robot"])  # Get network dims
cfg = {**config, **cfg}  # Merge configs
# CMA_ME tags only its own internal config, so record the algorithm here;
# otherwise the re-save below would store "algorithm": null.
cfg["algorithm"] = "CMA_ME"

env = make_env(config["env_name"], robot=config["robot"])
try:
    evaluate(a, env, render=False)
finally:
    env.close()

np.save("thrower.npy", a.genes)

save_solution(a, cfg, algo_name=cfg["algorithm"])

Best: 0.8575230706097303:   3%|▎         | 3/100 [20:19<10:57:11, 406.51s/it]


KeyboardInterrupt: 