# Lab. 7: Bio-Inspired Optimization

## Introduction

#### <u>The goal of this lab is to study different Bio-Inspired Optimization techniques.</u>

**Disclamer**
- When studying the effect of the parameters is extremely important to vary just one parameter at a time. Therefore, you are suggested to study one parameter by fixing all the others, and then moving to the next.

- When comparing different algorithms, is very important to run each of them several times (e.g., 30) by using different initial random seeds.

In [None]:
import functools
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum
from functools import reduce
from random import Random
from typing import Any, Callable, Optional

import benchmark_functions as bf
import inspyred
import numpy as np
import pylab as pl
from inspyred.ec import (
    Bounder,
    EvolutionaryComputation,
    Individual,
    replacers,
    selectors,
    terminators,
    variators,
)
from inspyred.swarm import topologies
from matplotlib import pyplot as plt
from numpy.typing import NDArray

### Helper functions

In [None]:
GLOBAL = "Global"
INDIVIDUAL = "Individual"
CORRELATED = "Correlated"

STAR = "star"
RING = "ring"


class OptFun:
    def __init__(self, wf: bf.BenchmarkFunction):
        self.f = wf
        self.history: list[list[float]] = []
        self.__name__ = f"OptFun({wf.__class__})"

    def __call__(
        self, candidates: list[list[float]], *args: Any, **kwargs: Any
    ) -> list[float]:
        """
        Evaluate the objective function for a list of candidates.
        """
        y: list[float] = []
        for x0 in candidates:
            self.history.append(deepcopy(x0))
            y.append(self.f(x0))  # type: ignore
        return y

    def minima(self) -> list[bf.fil.Optimum]:
        return self.f.minima()

    def bounder(self) -> Callable[[list[float], tuple[float, ...]], list[float]]:
        def fcn(candidate: list[float], *args: tuple[float, ...]) -> list[float]:
            bounds: tuple[list[float], list[float]] = self.f.suggested_bounds()

            for i, (m, M) in enumerate(zip(*bounds)):
                if candidate[i] < m:
                    candidate[i] = m
                if candidate[i] > M:
                    candidate[i] = M
            return candidate

        return fcn

    def bounds(self) -> list[tuple[float, float]]:
        """
        Return the bounds of the objective function.
        """
        return self._convert_bounds(self.f.suggested_bounds())

    def plot(self, filename: Optional[str] = None):
        """
        Plot the history of the objective function values.
        """
        plt.clf()
        f, ax = plt.subplots(1, 2, figsize=(16, 6))
        f.suptitle(self.f.name())
        # heatmap
        resolution = 50
        bounds_lower, bounds_upper = self.f.suggested_bounds()
        x = np.linspace(bounds_lower[0], bounds_upper[0], resolution)
        if self.f.n_dimensions() > 1:
            y = np.linspace(bounds_lower[1], bounds_upper[1], resolution)
            X, Y = np.meshgrid(x, y)
            Z = np.asarray(
                [
                    [self.f((X[i][j], Y[i][j])) for j in range(len(X[i]))]
                    for i in range(len(X))
                ]
            )
        else:
            raise ValueError("Function has only one dimension.")

        ax[0].contour(x, y, Z, 15, linewidths=0.5, colors="k")  # hight lines
        contour = ax[0].contourf(
            x, y, Z, 15, cmap="viridis", vmin=Z.min(), vmax=Z.max()
        )
        ax[0].set_xlabel("x")
        ax[0].set_ylabel("y")
        cbar = plt.colorbar(contour, ax=ax[0])
        cbar.set_label("z")
        if len(self.history) > 0:  # plot points
            xdata = [x[0] for x in self.history]
            ydata = [x[1] for x in self.history]
            ax[0].plot(xdata, ydata, "or-", markersize=3, linewidth=1)

        # plot evaluations
        values = [self.f(v) for v in self.history]
        min_value: float = self.minima()[0].score  # type: ignore
        ax[1].plot(values)
        ax[1].axhline(min_value, color="r", label="optimum")
        ax[1].legend(loc="upper right")

        if filename is not None:
            plt.savefig(filename, dpi=400)

        plt.show()

    def current_calls(self) -> int:
        """
        Return the number of calls to the objective function.
        """
        return len(self.history)

    def clear(self):
        self.history = []

    def _convert_bounds(
        self, bounds: tuple[list[float], list[float]]
    ) -> list[tuple[float, float]]:
        """
        Convert bounds to a list of tuples.
        """
        return [(bounds[0][i], bounds[1][i]) for i in range(len(bounds[0]))]

In [None]:
class NumpyRandomWrapper(pl.RandomState):
    def __init__(self, seed: Optional[int] = None):
        super(NumpyRandomWrapper, self).__init__(seed)

    def sample(self, population: list[int] | int, k: int):
        if isinstance(population, int):
            population = list(range(population))

        return pl.asarray(
            [
                population[i]
                for i in self._choice_without_replacement(len(population), k)
            ]
        )

    def random(self) -> float:  # type: ignore
        return self.random_sample()

    def gauss(self, mu: float, sigma: float) -> float:
        return self.normal(mu, sigma)

    def _choice_without_replacement(self, n: int, size: int) -> set[int]:
        result = set()
        while len(result) < size:
            result.add(self.randint(0, n))
        return result


def initial_pop_observer(
    population: list[Individual],
    num_generations: int,
    num_evaluations: int,
    args: dict[str, Any],
) -> None:
    if num_generations == 0:
        args["initial_pop_storage"]["individuals"] = pl.asarray(
            [guy.candidate for guy in population]
        )
        args["initial_pop_storage"]["fitnesses"] = pl.asarray(
            [guy.fitness for guy in population]
        )


def generator(random: Random, args: dict[str, Any]) -> NDArray[pl.float64]:
    return pl.asarray(
        [
            random.uniform(args["pop_init_range"][0], args["pop_init_range"][1])
            for _ in range(args["num_vars"])
        ]
    )


def generator_wrapper(
    func: Callable[[Random, dict[str, Any]], NDArray[pl.float64]]
) -> Callable[[Random, dict[str, Any]], NDArray[pl.float64]]:
    @functools.wraps(func)
    def _generator(random: Random, args: dict[str, Any]):
        return pl.asarray(func(random, args))

    return _generator


# helper function used to store the various populations at each generation
def my_archiver(
    random: Random,
    population: list[Individual],
    archive: list[list[Individual]],
    args: dict[str, Any],
) -> list[list[Individual]]:
    archive.append(population)
    return archive


def plot_population_evolution(
    populations: list[list[NDArray[pl.float64]]],
    objective_fun: OptFun,
    generation_step: int = 1,
):
    plt.clf()
    resolution = 50
    fig, ax = plt.subplots(2, 3)
    ax = ax.flatten()
    fig.set_figwidth(10)
    fig.set_figheight(7)
    fig.suptitle(objective_fun.f.name())

    bounds_lower, bounds_upper = objective_fun.f.suggested_bounds()
    x = np.linspace(bounds_lower[0], bounds_upper[0], resolution)
    if objective_fun.f.n_dimensions() > 1:
        y = np.linspace(bounds_lower[1], bounds_upper[1], resolution)
        X, Y = np.meshgrid(x, y)
        Z = np.asarray(
            [
                [objective_fun.f((X[i][j], Y[i][j])) for j in range(len(X[i]))]
                for i in range(len(X))
            ]
        )
    else:
        raise ValueError("Function has only one dimension.")

    for i in range(min(len(populations), 6)):
        if i * generation_step < len(populations):
            ax[i].contour(x, y, Z, 15, linewidths=0.5, colors="k")
            ax[i].contourf(
                x, y, Z, 15, cmap="viridis", vmin=Z.min(), vmax=Z.max()
            )  # heat map
            ax[i].set_xlabel("x")
            ax[i].set_ylabel("y")
            ax[i].title.set_text("Generation " + str(i * generation_step))

            # scatter plot of the population
            current_pop = populations[i * generation_step]
            xdata = [x[0] for x in current_pop]
            ydata = [x[1] for x in current_pop]
            ax[i].scatter(xdata, ydata, color="r", zorder=2, label="population")

            # plot minimum
            min_value = objective_fun.minima()[0].position
            ax[i].scatter(
                min_value[0],
                min_value[1],
                color="g",
                marker="x",
                zorder=2,
                label="optimum",
            )

    handles, labels = ax[0].get_legend_handles_labels()
    fig.legend(handles, labels, loc="upper right")
    plt.tight_layout()
    plt.show()

In [None]:
def run_ga(
    random: Random | NumpyRandomWrapper,
    func: OptFun,
    num_vars: int = 0,
    maximize: bool = False,
    **kwargs: Any,
) -> tuple[
    NDArray[pl.float64], float, list[Individual], list[list[NDArray[pl.float64]]]
]:
    # create dictionaries to store data about initial population, and lines
    initial_pop_storage = {}

    algorithm = EvolutionaryComputation(random)
    algorithm.terminator = terminators.generation_termination
    algorithm.replacer = replacers.generational_replacement
    algorithm.variator = [  # type: ignore
        variators.uniform_crossover,
        variators.gaussian_mutation,
    ]
    algorithm.selector = selectors.tournament_selection
    algorithm.archiver = my_archiver
    algorithm.observer = initial_pop_observer

    kwargs["num_selected"] = kwargs["pop_size"]

    kwargs["bounder"] = func.bounder()
    kwargs["generator"] = generator

    final_pop: list[Individual] = algorithm.evolve(
        evaluator=func,
        maximize=False,
        initial_pop_storage=initial_pop_storage,
        num_vars=num_vars,
        **kwargs,
    )

    all_populations: list[list[Individual]] = algorithm.archive  # type: ignore
    all_candidates: list[list[NDArray[pl.float64]]] = [[] for _ in all_populations]
    for i in range(len(all_populations)):
        all_candidates[i] = [elem.candidate for elem in all_populations[i]]  # type: ignore

    # best_guy = final_pop[0].candidate
    # best_fitness = final_pop[0].fitness
    final_pop_fitnesses: NDArray[pl.float64] = pl.asarray(
        [guy.fitness for guy in final_pop]
    )
    final_pop_candidates: NDArray[pl.float64] = pl.asarray(
        [guy.candidate for guy in final_pop]
    )

    sort_indexes = sorted(
        range(len(final_pop_fitnesses)), key=final_pop_fitnesses.__getitem__
    )
    final_pop_fitnesses = final_pop_fitnesses[sort_indexes]
    final_pop_candidates = final_pop_candidates[sort_indexes]

    best_guy: NDArray[pl.float64] = final_pop_candidates[0]
    best_fitness: float = final_pop_fitnesses[0]

    return best_guy, best_fitness, final_pop, all_candidates

In [None]:
class ES(EvolutionaryComputation):
    """Evolution Strategy EC

    This class is a stub for you to implement progressively more
    sophisticated evolution strategies.

    Optional keyword arguments in ``evolve`` args parameter that you
    will need add support for:

    - *strategy_mode* -- One of {None, 'global', 'individual', 'correlated'}
    - *epsilon* -- the minimum allowed strategy parameter (default 0.00001)
    - *tau* -- a global proportionality constant (default None)
    - *tau_i* -- an individual proportionality constant (default None)
    - *num_offspring* -- number of offspring to generate at each iteration
                        (``\\lambda``) should be a multiple of \\mu
    - *mixing_number* -- mixing number (``\\rho``) (number of parents
                        involved in producing each offspring),
                        default 1 (no-mixing)

    If *tau* is ``None``, it will be set to ``1 / sqrt(2*n)``, where
    ``n`` is the length of a candidate. If *tau_i* is ``None``, it will be
    set to ``1 / sqrt(2*sqrt(n))``.

    """

    def __init__(self, random: Random | NumpyRandomWrapper):
        EvolutionaryComputation.__init__(self, random)
        self.selector = selectors.default_selection
        self.variator = self._internal_variation
        # self.replacer = replacers.comma_replacement
        self.replacer = replacers.plus_replacement
        self.bounder = Bounder()

    def elementary_rotation(
        self, p: int, q: int, alphas: list[float]
    ) -> NDArray[pl.float64]:
        num_vars: int = self.num_vars  # type: ignore
        R = pl.ones((num_vars, num_vars))

        # taken from Schwefel et al "Contemporary Evolution Strategies"
        k = int(0.5 * (2 * num_vars - p - 1) * (p + 2) - 2 * num_vars + q)
        cos_alpha = pl.cos(alphas[k])
        sin_alpha = pl.sin(alphas[k])
        R[p][p] = cos_alpha
        R[q][q] = cos_alpha
        R[p][q] = -sin_alpha
        R[q][p] = sin_alpha
        return R

    def _internal_variation(
        self,
        random: Random | NumpyRandomWrapper,
        candidates: list[list[float]],
        args: dict[str, Any],
    ):
        tau = args.setdefault("tau", None)
        tau_i = args.setdefault("tau", None)
        epsilon = args.setdefault("epsilon", 0.00001)

        # num_offspring (\\lambda)
        num_offspring = args.setdefault("num_offspring", len(candidates))

        mixing_number = args.setdefault("mixing_number", 1)

        if num_offspring % len(candidates) != 0:
            raise Exception(
                "num_offspring (\\lambda) should be a multiple " + "of pop_size (\\mu)"
            )

        mutants: list[NDArray[pl.float64]] = []
        num_vars: int = self.num_vars  # type: ignore

        if tau is None:
            tau = 1.0 / pl.sqrt(2 * num_vars)
        if tau_i is None:
            tau_i = 1.0 / pl.sqrt(2 * pl.sqrt(num_vars))

        while len(mutants) < num_offspring:
            parent_family_indices = random.sample(
                [*range(len(candidates))], mixing_number
            )
            parent_family = pl.asarray([candidates[i] for i in parent_family_indices])
            parent = parent_family.mean(0)

            cand = parent[: self.num_vars].copy()
            if self.strategy_mode is None:
                strat = []
                sigma = args.setdefault("sigma", 1.0)
                if isinstance(random, NumpyRandomWrapper):
                    cand += random.normal(0, sigma, cand.shape)
                else:
                    for i, c in enumerate(cand):
                        cand[i] = c + random.gauss(0, sigma)

            else:
                strat = parent[self.num_vars :].copy()

                if self.strategy_mode is GLOBAL:
                    sigmas = strat
                else:
                    sigmas = strat[: self.num_vars]  # view into strat

                e_global = tau * random.gauss(0, 1)

                # more efficient with numpy
                if isinstance(random, NumpyRandomWrapper):
                    sigmas *= pl.exp(
                        e_global + tau_i * random.normal(0, 1, sigmas.shape)
                    )
                    sigmas = pl.maximum(sigmas, epsilon)
                else:
                    for i, s in enumerate(sigmas):
                        sigmas[i] = s * pl.exp(e_global + tau_i * random.gauss(0, 1))
                        sigmas[i] = max(strat[i], epsilon)

                if self.strategy_mode is CORRELATED:
                    alphas = strat[self.num_vars :]  # another view into strat
                    beta_squared = (5.0 * pl.pi / 180) ** 2  # 5 deg squared
                    if isinstance(random, NumpyRandomWrapper):
                        alphas += random.normal(0, beta_squared, alphas.shape) + pl.pi
                        alphas %= 2 * pl.pi
                        alphas -= pl.pi
                    else:
                        for j, a in enumerate(alphas):
                            alphas[j] = (
                                (a + random.gauss(0, beta_squared) + pl.pi)
                                % (2 * pl.pi)
                            ) - pl.pi

                if self.strategy_mode is GLOBAL:
                    sigma = sigmas[0]
                    if isinstance(random, NumpyRandomWrapper):
                        cand = cand + random.normal(0, sigma, cand.shape)
                    else:
                        for i, c in enumerate(cand):
                            cand[i] = c + random.gauss(0, sigma)
                elif self.strategy_mode is INDIVIDUAL:
                    if isinstance(random, NumpyRandomWrapper):
                        cand += random.multivariate_normal(
                            pl.zeros(num_vars), pl.diag(sigmas**2)
                        )
                    else:
                        for i, (c, s) in enumerate(zip(cand, sigmas)):
                            cand[i] = c + random.gauss(0, s)
                elif self.strategy_mode is CORRELATED:
                    # build correlation matrix
                    T = reduce(
                        pl.dot,
                        [
                            reduce(
                                pl.dot,
                                [
                                    self.elementary_rotation(p, q, alphas)  # type: ignore
                                    for q in range(p + 1, num_vars)
                                ],
                            )
                            for p in range(num_vars - 1)
                        ],
                    )

                    if isinstance(random, NumpyRandomWrapper):
                        cand += random.multivariate_normal(
                            pl.zeros(num_vars), pl.dot(T, pl.diag(sigmas**2))
                        )
                    else:
                        raise Exception(
                            "NumpyRandomWrapper required" + " for correlated mutations"
                        )
                else:
                    raise Exception("Unknown strategy mode")

            cand = self.bounder(cand, args)
            cand: NDArray[pl.float64] = np.concatenate((cand, strat))
            mutants.append(cand)

        return mutants

    def _internal_evaluator(
        self, func: OptFun
    ) -> Callable[[list[list[NDArray[pl.float64]]], dict[str, Any]], list[float]]:
        @functools.wraps(func)
        def evaluator(
            candidates: list[list[NDArray[pl.float64]]], args: dict[str, Any]
        ):
            # convert candidates to array and then back to list
            # makes slicing easier
            return func(list(pl.asarray(candidates)[:, 0 : self.num_vars]), args)

        return evaluator

    def strategize(
        self, generator: Callable[[Random, dict[str, Any]], NDArray[pl.float64]]
    ):
        """Add strategy parameters to candidates created by a generator.

        This function decorator is used to provide a means of adding strategy
        parameters to candidates created by a generator. The generator function
        is modifed to extend the candidate with strategy parameters based on
        the strategy_mode argument passed to evolve.

        Each strategy parameter is initialized to a random value:
        in [0, 1] for ``\\sigma_i`` and in [-pi,pi] for ``\\alpha_i``

        """

        @functools.wraps(generator)
        def strategy_generator(random: Random, args: Any) -> NDArray[pl.float64]:
            candidate = generator(random, args)
            num_vars: int = self.num_vars  # type: ignore
            if self.strategy_mode is None:
                return candidate
            elif self.strategy_mode is GLOBAL:
                return np.concatenate((candidate, [random.random()]))
            else:
                sigmas = [random.random() for _ in range(num_vars)]
                if self.strategy_mode is INDIVIDUAL:
                    return np.concatenate((candidate, sigmas))
                elif self.strategy_mode is CORRELATED:
                    # since have python random, do it like this... would be
                    # better with numpy
                    alphas = [
                        random.uniform(-pl.pi, pl.pi)
                        for _ in range((num_vars**2 - num_vars) // 2)
                    ]
                    return np.concatenate((candidate, alphas, sigmas))
                else:
                    raise Exception("Unknown strategy mode")

        return strategy_generator

    def evolve(
        self,
        generator: Callable[[Random, dict[str, Any]], NDArray[pl.float64]],
        evaluator: OptFun,
        pop_size: int = 100,
        seeds: Optional[list[int]] = None,
        maximize: bool = False,
        bounder: Optional[
            Callable[[list[float], tuple[float, ...]], list[float]]
        ] = None,
        strategy_mode: Optional[str] = None,
        num_vars: Optional[int] = None,
        **args: dict[str, Any],
    ) -> list[Individual]:
        self.strategy_mode = strategy_mode
        self.num_vars = num_vars

        generator = self.strategize(generator)
        evaluator = self._internal_evaluator(evaluator)  # type: ignore
        return EvolutionaryComputation.evolve(
            self,
            generator,
            evaluator,
            pop_size,
            seeds,
            maximize=maximize,
            num_vars=num_vars,
            bounder=bounder,
            **args,
        )


def run_es(
    random: Random | NumpyRandomWrapper,
    func: OptFun,
    num_vars: int = 0,
    maximize: bool = False,
    **kwargs: Any,
) -> tuple[
    NDArray[pl.float64], float, list[Individual], list[list[NDArray[pl.float64]]]
]:
    # create dictionaries to store data about initial population, and lines
    initial_pop_storage = {}

    algorithm = ES(random)
    algorithm.terminator = terminators.generation_termination

    algorithm.observer = initial_pop_observer
    algorithm.archiver = my_archiver

    kwargs["num_selected"] = kwargs["pop_size"]
    kwargs["bounder"] = func.bounder()
    kwargs["generator"] = generator_wrapper(generator)

    final_pop = algorithm.evolve(
        evaluator=func,
        maximize=maximize,
        initial_pop_storage=initial_pop_storage,
        num_vars=num_vars,
        **kwargs,
    )
    all_populations: list[list[Individual]] = algorithm.archive  # type: ignore
    all_candidates: list[list[NDArray[pl.float64]]] = [[] for _ in all_populations]
    for i in range(len(all_populations)):
        all_candidates[i] = [elem.candidate for elem in all_populations[i]]  # type: ignore

    # best_guy = final_pop[0].candidate[0:num_vars]
    # best_fitness = final_pop[0].fitness
    final_pop_fitnesses = pl.asarray([guy.fitness for guy in final_pop])
    final_pop_candidates = pl.asarray([guy.candidate[0:num_vars] for guy in final_pop])  # type: ignore

    sort_indexes = sorted(
        range(len(final_pop_fitnesses)), key=final_pop_fitnesses.__getitem__
    )
    final_pop_fitnesses = final_pop_fitnesses[sort_indexes]
    final_pop_candidates = final_pop_candidates[sort_indexes]

    best_guy = final_pop_candidates[0]
    best_fitness = final_pop_fitnesses[0]

    return best_guy, best_fitness, final_pop, all_candidates

In [None]:
# This class inherit from the original inspyred.swarm.PSO class to allow archive of all the populations at each generation
class PSO(inspyred.swarm.PSO):
    def __init__(self, random: Random | NumpyRandomWrapper):
        inspyred.swarm.PSO.__init__(self, random)
        self.swarm_archive = []

    def _swarm_archiver(
        self,
        random: Random,
        population: list[Individual],
        archive: list[Individual],
        args: dict[str, Any],
    ) -> list[Individual]:
        self.swarm_archive.append(population)
        if len(archive) == 0:
            return population[:]
        else:
            new_archive: list[Individual] = []
            for p, a in zip(population[:], archive[:]):
                if p < a:
                    new_archive.append(a)
                else:
                    new_archive.append(p)
            return new_archive


def run_pso(
    random: Random | NumpyRandomWrapper,
    func: OptFun,
    num_vars: int = 0,
    maximize: bool = False,
    **kwargs: Any,
) -> tuple[
    NDArray[pl.float64], float, list[Individual], list[list[NDArray[pl.float64]]]
]:
    # create dictionaries to store data about initial population, and lines
    initial_pop_storage = {}

    algorithm = PSO(random)
    algorithm.swarm_archive = []
    algorithm.topology = topologies.star_topology
    algorithm.terminator = terminators.generation_termination

    algorithm.observer = initial_pop_observer

    if "topology" in kwargs:
        if kwargs["topology"] is STAR:
            algorithm.topology = topologies.star_topology
        elif kwargs["topology"] is RING:
            algorithm.topology = topologies.ring_topology

    kwargs["num_selected"] = kwargs["pop_size"]
    kwargs["bounder"] = func.bounder()
    kwargs["generator"] = generator

    final_pop = algorithm.evolve(
        evaluator=func,
        maximize=maximize,
        initial_pop_storage=initial_pop_storage,
        num_vars=num_vars,
        **kwargs,
    )

    all_populations: list[list[NDArray[pl.float64]]] = []
    for i in range(len(algorithm.swarm_archive)):
        all_populations.append([elem.candidate for elem in algorithm.swarm_archive[i]])

    final_pop_fitnesses: NDArray[pl.float64] = pl.asarray(
        [guy.fitness for guy in final_pop]
    )
    final_pop_candidates: NDArray[pl.float64] = pl.asarray(
        [guy.candidate for guy in final_pop]
    )

    sort_indexes = sorted(
        range(len(final_pop_fitnesses)), key=final_pop_fitnesses.__getitem__
    )
    final_pop_fitnesses = final_pop_fitnesses[sort_indexes]
    final_pop_candidates = final_pop_candidates[sort_indexes]

    best_guy = final_pop_candidates[0]
    best_fitness = final_pop_fitnesses[0]

    return best_guy, best_fitness, final_pop, all_populations

In [None]:
@dataclass(frozen=True)
class Result:
    mean_best_fitness: float
    std_best_fitness: float
    mean_best_individual: list[float]
    std_best_individual: list[float]

    def __repr__(self) -> str:
        return (
            f"Mean best fitness: {self.mean_best_fitness}\n"
            f"Std best fitness: {self.std_best_fitness}\n"
            f"Mean best individual: {self.mean_best_individual}\n"
            f"Std best individual: {self.std_best_individual}\n"
        )

    def __str__(self) -> str:
        return self.__repr__()


class Method(Enum):
    GA = "GA"
    ES = "ES"
    PSO = "PSO"


def run_simulation(
    func: OptFun,
    optimization_method: Method,
    num_simulations: int,
    args: Any,
    print_plots: bool,
) -> Result:
    seeds = [pl.randint(0, 1000) for _ in range(num_simulations)]
    best_fitnesses: list[float] = []
    best_individuals = []
    best_fitness = None
    best_all_populations = []
    best_func = func

    for i in range(num_simulations):
        func.clear()
        function = deepcopy(func)
        match optimization_method.value:
            case Method.GA.value:
                b_guy, b_fitness, _, all_populations = run_ga(
                    NumpyRandomWrapper(seeds[i]), function, **args
                )
            case Method.ES.value:
                b_guy, b_fitness, _, all_populations = run_es(
                    NumpyRandomWrapper(seeds[i]), function, **args
                )
            case Method.PSO.value:
                b_guy, b_fitness, _, all_populations = run_pso(
                    NumpyRandomWrapper(seeds[i]), function, **args
                )
            case _:  # type: ignore
                raise ValueError("Invalid optimization method")
        best_fitnesses.append(b_fitness)
        best_individuals.append(b_guy)
        if best_fitness is None or b_fitness < best_fitness:
            best_fitness = b_fitness
            best_all_populations = all_populations
            best_func = deepcopy(function)

    mean_best_fitness: float = pl.mean(best_fitnesses).item()
    std_best_fitness: float = pl.std(best_fitnesses).item()
    mean_best_individual: list[float] = pl.mean(best_individuals, axis=0)
    std_best_individual: list[float] = pl.std(best_individuals, axis=0)

    if print_plots:
        plot_population_evolution(best_all_populations, best_func, generation_step=2)
        best_func.plot()

    return Result(
        mean_best_fitness, std_best_fitness, mean_best_individual, std_best_individual
    )

### Exercises

#### Exercise 1/3: Genetic algorithms

1. Compare the results of mutation-only vs crossover-only
2. Fixing the mutation probability, compare different values for the crossover
3. How does the selective pressure (i.e., tournament size) affect the search?
4. Compare the search process on different benchmark functions

In [None]:
# change mutation rate
func = OptFun(bf.Ackley(2))
args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["gaussian_stdev"] = 1.0  # Standard deviation of the Gaussian mutations
args["num_elites"] = 1  # number of elite individuals to maintain in each gen
args["pop_size"] = 50  # population size
args["max_generations"] = 50  # Number of generations of the GA
args["crossover_rate"] = 0.7
args["mutation_rate"] = 0.1
args["tournament_size"] = 2
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population

results: dict[str, Result] = {}
mutations = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
for mutation in mutations:
    args["mutation_rate"] = mutation
    results[f"mutation_rate_{mutation}"] = run_simulation(
        func, Method.GA, 10, args, False
    )
for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")

In [None]:
# Mutation only
func = OptFun(bf.Ackley(2))
args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["gaussian_stdev"] = 1.0  # Standard deviation of the Gaussian mutations
args["num_elites"] = 1  # number of elite individuals to maintain in each gen
args["pop_size"] = 50  # population size
args["max_generations"] = 50  # Number of generations of the GA
args["crossover_rate"] = 0.0
args["mutation_rate"] = 0.1
args["tournament_size"] = 2
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population

results: dict[str, Result] = {}
mutations = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
for mutation in mutations:
    args["mutation_rate"] = mutation
    results[f"mutation_rate_{mutation}"] = run_simulation(
        func, Method.GA, 10, args, False
    )
for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")

In [None]:
# Crossover only
func = OptFun(bf.Ackley(2))
args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["gaussian_stdev"] = 1.0  # Standard deviation of the Gaussian mutations
args["num_elites"] = 1  # number of elite individuals to maintain in each gen
args["pop_size"] = 50  # population size
args["max_generations"] = 50  # Number of generations of the GA
args["crossover_rate"] = 0.0
args["mutation_rate"] = 0.0
args["tournament_size"] = 2
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population

results: dict[str, Result] = {}
crossovers = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
for crossover in crossovers:
    args["crossover_rate"] = crossover
    results[f"crossover_rate_{crossover}"] = run_simulation(
        func, Method.GA, 10, args, False
    )
for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")

In [None]:
# Fix the mutation rate and change crossover rate
func = OptFun(bf.Ackley(2))
args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["gaussian_stdev"] = 1.0  # Standard deviation of the Gaussian mutations
args["num_elites"] = 1  # number of elite individuals to maintain in each gen
args["pop_size"] = 50  # population size
args["max_generations"] = 50  # Number of generations of the GA
args["crossover_rate"] = 0.0
args["mutation_rate"] = 0.2
args["tournament_size"] = 2
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population

results: dict[str, Result] = {}
crossovers = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
for crossover in crossovers:
    args["crossover_rate"] = crossover
    results[f"crossover_rate_{crossover}"] = run_simulation(
        func, Method.GA, 10, args, False
    )
for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")

In [None]:
# Change tournament size
func = OptFun(bf.Rastrigin(2))
pop_size = 50
args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["gaussian_stdev"] = 1.0  # Standard deviation of the Gaussian mutations
args["num_elites"] = 1  # number of elite individuals to maintain in each gen
args["pop_size"] = pop_size  # population size
args["max_generations"] = 50  # Number of generations of the GA
args["crossover_rate"] = 0.7
args["mutation_rate"] = 0.2
args["tournament_size"] = 2
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population

results: dict[str, Result] = {}
for size in [1, 5, 10, 20, 30, 40, 50]:
    args["tournament_size"] = size
    results[f"tournament_size_{size}"] = run_simulation(
        func, Method.GA, 10, args, False
    )
for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")

In [None]:
# Change benchmark function
args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["gaussian_stdev"] = 0.5  # Standard deviation of the Gaussian mutations
args["num_elites"] = 1  # number of elite individuals to maintain in each gen
args["pop_size"] = 50  # population size
args["max_generations"] = 50  # Number of generations of the GA
args["crossover_rate"] = 0.7
args["mutation_rate"] = 0.1
args["tournament_size"] = 2

results: dict[str, Result] = {}
functions: list[bf.BenchmarkFunction] = [
    bf.Ackley(2),
    bf.Rastrigin(2),
    bf.Hypersphere(2),
    bf.Rosenbrock(2),
    bf.Griewank(2),
]
for wf in functions:
    func = OptFun(wf)
    args["pop_init_range"] = func.bounds()[0]
    results[f"{func.f.name()}"] = run_simulation(func, Method.GA, 10, args, True)

for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")

#### Exercise 2/3: Evolutionary Strategies

1. How does the number of offspring  λ  parameter affect the search?
2. How does the mixing number  ρ  affect the search?
3. Describe the impact of the different strategies (None/GLOBAL/INDIVIDUAL) on the search process.

In [None]:
func = OptFun(bf.Ackley(2))

args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["max_generations"] = 50
args["sigma"] = 1.0  # default standard deviation
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population
args["pop_size"] = 50  # mu
args["num_offspring"] = 100  # lambda
args["mixing_number"] = 5  # rho
args["strategy_mode"] = None

best_guy, best_fitness, final_pop, all_populations = run_es(
    Random(0), func, **args  # Seeded random number generator
)

plot_population_evolution(all_populations, func, generation_step=2)
func.plot()

In [None]:
# Change number of offspring
func = OptFun(bf.Ackley(2))
num_simulations = 10
pop_size = 50

args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["max_generations"] = 50
args["sigma"] = 1.0  # default standard deviation
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population
args["pop_size"] = pop_size  # mu
args["num_offspring"] = 100  # lambda
args["mixing_number"] = 5  # rho
args["strategy_mode"] = None


results: dict[str, Result] = {}
num_offsprings = range(pop_size, pop_size * 11, pop_size)
for offspring in num_offsprings:
    args["num_offspring"] = offspring
    results[f"num_offspring_{offspring}"] = run_simulation(
        func, Method.ES, num_simulations, args, False
    )
for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")

In [None]:
# Change mixing number (parent used for crossover)
func = OptFun(bf.Ackley(2))
num_simulations = 10

args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["max_generations"] = 50
args["sigma"] = 1.0  # default standard deviation
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population
args["pop_size"] = 50  # mu
args["num_offspring"] = 100  # lambda
args["mixing_number"] = 5  # rho
args["strategy_mode"] = None


results: dict[str, Result] = {}
mixing_numbers = [1, 5, 10, 20, 30, 40, 50]
for mix_number in mixing_numbers:
    args["mixing_number"] = mix_number
    results[f"mixing_number_{mix_number}"] = run_simulation(
        func, Method.ES, num_simulations, args, False
    )
for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")

In [None]:
# Change strategy mode
func = OptFun(bf.Ackley(2))
num_simulations = 10

args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["max_generations"] = 50
args["sigma"] = 1.0  # default standard deviation
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population
args["pop_size"] = 50  # mu
args["num_offspring"] = 100  # lambda
args["mixing_number"] = 5  # rho
args["strategy_mode"] = None


results: dict[str, Result] = {}
strategies = [None, GLOBAL, INDIVIDUAL, CORRELATED]
for strategy in strategies:
    args["strategy_mode"] = strategy
    results[f"strategy_{strategy}"] = run_simulation(
        func, Method.ES, num_simulations, args, False
    )
for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")

#### Exercise 3/3: Particle Swarm Optimization

1. How does PSO compare to GA on different benchmark functions?
2. How does PSO compare to ES on different benchmark functions?
3. Vary the pop_size and the n. of generations in such a way that their product is always constant, and compare the outcome of the search (over multiple runs). What is better? To have higher pop_size or higher number of generations?

In [None]:
func = OptFun(bf.Ackley(2))

args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population

args["pop_size"] = 50  # mu
args["topology"] = STAR  # RING, STAR
args["neighborhood_size"] = 2  # used only for the ring topology
args["inertia"] = 0.3
args["cognitive_rate"] = 0.5
args["social_rate"] = 2.1
args["max_generations"] = 50

best_guy, best_fitness, final_pop, all_populations = run_pso(
    NumpyRandomWrapper(0), func, **args  # Seeded random number generator
)

plot_population_evolution(all_populations, func, generation_step=2)
func.plot()

In [None]:
# constant pop_size * num_generations
func = OptFun(bf.Ackley(2))

args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population

args["pop_size"] = 50  # mu
args["topology"] = STAR  # RING, STAR
args["neighborhood_size"] = 2  # used only for the ring topology
args["inertia"] = 0.3
args["cognitive_rate"] = 0.5
args["social_rate"] = 2.1
args["max_generations"] = 50

results: dict[str, Result] = {}
product = 50 * 50
pop_sizes = [20, 40, 60, 80, 100]
for size in pop_sizes:
    args["pop_size"] = size
    generations = product // size
    args["max_generations"] = generations
    results[f"size_{size}_generations_{generations}"] = run_simulation(
        func, Method.PSO, num_simulations, args, False
    )
for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")

In [None]:
# change topology
func = OptFun(bf.Ackley(2))

args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population

args["pop_size"] = 50  # mu
args["topology"] = STAR  # RING, STAR
args["neighborhood_size"] = 2  # used only for the ring topology
args["inertia"] = 0.3
args["cognitive_rate"] = 0.5
args["social_rate"] = 2.1
args["max_generations"] = 50

results: dict[str, Result] = {}
for topology in [RING, STAR]:
    args["topology"] = topology
    results[f"topology_{topology}"] = run_simulation(
        func, Method.PSO, num_simulations, args, False
    )
for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")

In [None]:
# change neighborhood size
func = OptFun(bf.Ackley(2))
pop_size = 50
args = {}
args["num_vars"] = 2  # Number of dimensions of the search space
args["pop_init_range"] = func.bounds()[0]  # Range for the initial population

args["pop_size"] = pop_size  # mu
args["topology"] = RING  # RING, STAR
args["neighborhood_size"] = 2  # used only for the ring topology
args["inertia"] = 0.3
args["cognitive_rate"] = 0.5
args["social_rate"] = 2.1
args["max_generations"] = 50

results: dict[str, Result] = {}
for size in [2, 5, 10, 20, 30, 40, 50]:
    args["neighborhood_size"] = size
    results[f"neighborhood_size_{size}"] = run_simulation(
        func, Method.PSO, num_simulations, args, False
    )
for key, result in results.items():
    print(key)
    print(result)
    print("--------------------")