# Introduction

## Goal
The goal of this lab is to familiarize yourself with the most important techniques for neuro-evolution. You will first explore the basic neuro-evolution techniques and then move on to more advanced algorithms such as Neural Evolution of Augmenting Topologies (NEAT).

This lab continues the use of the *inspyred* framework for the Python programming language seen in the previous labs. If you did not participate in the previous labs, you may want to look that over first and then start this lab's exercises. Furthermore, in this lab we will use another Python library, *neat-python*, that contains a complete implementation of **NEAT** ([link here](https://neat-python.readthedocs.io)).


Note that, unless otherwise specified, in this module's exercises we will use real-valued genotypes and that the aim of the algorithms will be to *minimize* the training error function of the evolved Neural Networks, i.e., lower values correspond to a better fitness!

In [None]:
import os
import sys

module_path = os.path.abspath(os.path.join(".."))
if module_path not in sys.path:
    sys.path.append(module_path)

In [None]:
import numpy as np
from numpy.typing import NDArray


class NeuralNet:
    def __init__(
        self,
        num_inputs: int,
        num_outputs: int,
        num_hidden: int = 0,
        recurrent: bool = False,
        batch_size: int = 1,
    ):
        self.weights: list[NDArray[np.float64]] = []
        self.num_inputs = num_inputs
        self.num_outputs = num_outputs
        self.num_hidden = num_hidden
        self.recurrent = recurrent

        # if there is a hidden layer, we feed all inputs to hidden and all
        # hidden to outputs

        # if recurrent, we feedback hidden activations to hidden
        # layer (or output to output layer, if there is no hidden layer)

        # here we create data structures for weights
        # NOTE: we consider the bias to be an extra input to each layer for
        # ease of calculation
        if num_hidden == 0:
            n = num_inputs + 1
            if recurrent:
                n += num_outputs
            m = num_outputs
            self.weights.append(np.zeros((n, m), dtype=float))

        else:
            # input to hidden matrix first
            n1 = num_inputs + 1
            if recurrent:
                n1 += num_hidden
            m1 = num_hidden
            self.weights.append(np.zeros((n1, m1), dtype=float))

            # and now hidden to output
            n2 = num_hidden + 1
            m2 = num_outputs

            self.weights.append(np.zeros((n2, m2), dtype=float))

        self.num_params = sum([w.size for w in self.weights])

        # create vector for neuron activations
        self.reset(batch_size)

    def reset(self, batch_size: int = 1):
        self.batch_size = batch_size
        # zero out all activations
        self.activations = np.zeros((batch_size, self.num_hidden + self.num_outputs))

    def set_params(self, params: list[float] | NDArray[np.float64]) -> None:
        if len(params) != self.num_params:
            raise Exception(
                "Incorrect number of params! Expected "
                + str(self.num_params)
                + ", but received "
                + str(len(params))
            )

        self.weights[0][:, :] = np.asarray(params[: self.weights[0].size]).reshape(
            self.weights[0].shape
        )
        if self.num_hidden > 0:
            self.weights[1][:, :] = np.asarray(params[self.weights[0].size :]).reshape(
                self.weights[1].shape
            )

    def step(self, inputs: NDArray[np.int64]) -> NDArray[np.float64]:  # type: ignore
        # step the network and return the current output activations
        inputs: NDArray[np.float64] = np.asarray(inputs, dtype=float)
        one_dimensional = len(inputs.shape) == 1

        # first, if just given a 1D input array, convert to a matrix
        if one_dimensional:
            inputs = inputs[None, :]  # adds a second dimension
        if inputs.shape[0] != self.batch_size:
            raise Exception(
                "Incorrect batch size! Should be "
                + str(self.batch_size)
                + ", but is "
                + str(inputs.shape[0])
            )
        if inputs.shape[1] != self.num_inputs:
            raise Exception(
                "Incorrect number of inputs! Should be "
                + str(self.num_inputs)
                + ", but is "
                + str(inputs.shape[1])
            )

        # second, add in biases and current hidden activations (if recurrent)
        if self.recurrent:
            if self.num_hidden > 0:
                input_values = np.hstack((
                    inputs,
                    self.activations[:, : self.num_hidden],
                    np.ones((self.batch_size, 1)),
                ))
            else:
                input_values = np.hstack((
                    inputs,
                    self.activations[:, :],
                    np.ones((self.batch_size, 1)),
                ))
        else:
            input_values = np.hstack((inputs, np.ones((self.batch_size, 1))))

        # now calculate new activations by taking dot product and applying sigmoid
        new_activations = 1.0 / (
            1 + np.exp(-1.0 * np.dot(input_values, self.weights[0]))
        )
        if self.num_hidden == 0:
            self.activations[:, :] = new_activations
        else:
            # use old hidden activations to compute outputs
            hiddens = np.hstack((
                self.activations[:, : self.num_hidden],
                np.ones((self.batch_size, 1)),
            ))
            self.activations[:, self.num_hidden :] = 1.0 / (
                1 + np.exp(-1.0 * np.dot(hiddens, self.weights[1]))
            )
            self.activations[:, : self.num_hidden] = new_activations

        # finally, convert back to 1D, if that's what we started with
        # otherwise return the matrix
        if one_dimensional:
            return self.activations[0, self.num_hidden :]
        else:
            return self.activations[:, self.num_hidden :]

In [None]:
from inspyred import ec
from inspyred.benchmarks import Benchmark
import numpy as np
from typing import Any, Callable
from numpy.typing import NDArray
from utils.inspyred_utils import NumpyRandomWrapper
from abc import abstractmethod

MAX_WEIGHT: int = 8

INPUTS: NDArray[np.int64] = np.zeros((4, 2), dtype=int)
for i in range(4):
    INPUTS[i, :] = np.array([1 if i < 2 else 0, 1 if i % 2 == 0 else 0])


class NeuralNetworkBenchmark(Benchmark):
    """Defines the base class for Neural Network Benchmark Problems.  Other
    neural net benchmarks should inherit from this
    """

    def __init__(self, net: NeuralNet):
        self.net = net
        super(NeuralNetworkBenchmark, self).__init__(self.net.num_params)
        self.bounder = ec.Bounder(
            [-MAX_WEIGHT] * self.dimensions, [MAX_WEIGHT] * self.dimensions
        )

    def evaluator(
        self, candidates: NDArray[np.float64], args: Any
    ) -> NDArray[np.float64]:
        return np.array(list(map(self.evaluate_single, candidates)))

    def generator(
        self, random: NumpyRandomWrapper, args: dict[str, Any]
    ) -> NDArray[np.float64]:
        return np.asarray([
            random.uniform(-MAX_WEIGHT, MAX_WEIGHT) for _ in range(self.net.num_params)
        ])

    @abstractmethod
    def evaluate_single(self, candidate: list[float]) -> int:
        pass


class BaseLogicBenchmark(NeuralNetworkBenchmark):
    """Defines the base class for single time step logic
    Neural Network Benchmark Problems. Other logic neural net benchmarks
    should inherit from this
    """

    def __init__(self, net: NeuralNet, logic_fn: Callable[[Any, Any], Any]):
        super(BaseLogicBenchmark, self).__init__(net)
        self.maximize = False
        self.targets = np.asarray(logic_fn(INPUTS[:, 0], INPUTS[:, 1]), dtype=float)[
            :, None
        ]

    def evaluate_single(self, candidate: list[float]) -> int:
        self.net.set_params(candidate)
        self.net.reset(4)
        # step once if no hidden
        outputs = self.net.step(INPUTS)
        # if hidden, step a second time so inputs reach outputs
        if self.net.num_hidden > 0:
            outputs = self.net.step(INPUTS)

        # feed the inputs to the net and calculate error
        return np.sum((outputs - self.targets) ** 2).item()


class Or(BaseLogicBenchmark):
    """Defines Or Benchmark function, using a neural net"""

    def __init__(self, num_hidden: int = 0, recurrent: bool = False):
        super(Or, self).__init__(NeuralNet(2, 1, num_hidden, recurrent), np.logical_or)


class And(BaseLogicBenchmark):
    """Defines OR Benchmark function, using a neural net"""

    def __init__(self, num_hidden: int = 0, recurrent: bool = False):
        super(And, self).__init__(
            NeuralNet(2, 1, num_hidden, recurrent), np.logical_and
        )


class Xor(BaseLogicBenchmark):
    """Defines Xor Benchmark function, using a neural net"""

    def __init__(self, num_hidden: int = 0, recurrent: bool = False):
        super(Xor, self).__init__(
            NeuralNet(2, 1, num_hidden, recurrent), np.logical_xor
        )


class TemporalLogicBenchmark(NeuralNetworkBenchmark):
    """Defines the base class for temporal logic
    Neural Network Benchmark Problems. Other temporal logic neural net
    benchmarks should inherit from this
    """

    def __init__(self, net: NeuralNet, logic_fn: Callable[[Any, Any], Any]):
        super(TemporalLogicBenchmark, self).__init__(net)
        self.maximize = False
        self.targets = np.asarray(logic_fn(INPUTS[:, 0], INPUTS[:, 1]), dtype=float)[
            :, None
        ]

    def evaluate_single(self, candidate: list[float]) -> int:
        self.net.set_params(candidate)
        self.net.reset(4)

        # for temporal xor we care about the output after seeing both inputs
        outputs: NDArray[np.float64] = np.zeros((4, 1), dtype=float)
        for i in range(2):
            # step once if no hidden
            outputs = self.net.step(INPUTS[:, i : i + 1])
            # if hidden, step a second time for each input so inputs
            # reach outputs
            if self.net.num_hidden > 0:
                outputs = self.net.step(INPUTS[:, i : i + 1])

        # feed the inputs to the net and calculate error
        return np.sum((outputs - self.targets) ** 2).item()


class TemporalOr(TemporalLogicBenchmark):
    """Defines Temporal Or Benchmark function, using a neural net"""

    def __init__(self, num_hidden: int = 0, recurrent: bool = False):
        super(TemporalOr, self).__init__(
            NeuralNet(1, 1, num_hidden, recurrent), np.logical_or
        )


class TemporalAnd(TemporalLogicBenchmark):
    """Defines OR Benchmark function, using a neural net"""

    def __init__(self, num_hidden: int = 0, recurrent: bool = False):
        super(TemporalAnd, self).__init__(
            NeuralNet(1, 1, num_hidden, recurrent), np.logical_and
        )


class TemporalXor(TemporalLogicBenchmark):
    """Defines Temporal Xor Benchmark function, using a neural net"""

    def __init__(self, num_hidden: int = 0, recurrent: bool = False):
        super(TemporalXor, self).__init__(
            NeuralNet(1, 1, num_hidden, recurrent), np.logical_xor
        )

In [None]:
from typing import Optional
import matplotlib.pyplot as plt
from matplotlib import colors
from matplotlib.path import Path
from matplotlib.axes import Axes

import matplotlib.patches as patches
import numpy as np
from numpy.typing import NDArray

from math import cos, sin, atan


class ANNPlotter(object):
    """
    Plot a neural network, positions determination based on code from:
    https://stackoverflow.com/questions/29888233/how-to-visualize-a-neural-network
    """

    def __init__(
        self,
        net: NeuralNet,
        axes: Optional[Axes] = None,
        vertical_distance_between_layers: int = 10,
        horizontal_distance_between_neurons: int = 10,
        neuron_radius: float = 0.9,
    ):
        self.net = net
        self.axes = plt.figure("ANN Plotter").gca() if axes is None else axes

        self.vertical_distance_between_layers = vertical_distance_between_layers
        self.horizontal_distance_between_neurons = horizontal_distance_between_neurons
        self.neuron_radius: float = neuron_radius

        self.number_of_neurons_in_widest_layer: int = np.max([
            net.num_inputs,
            net.num_hidden,
            net.num_outputs,
        ])

        # now calculate positions of all neurons
        self.neuron_positions: list[list[tuple[float, float]]] = [[]]
        x = self.__calculate_left_margin_so_layer_is_centered(self.net.num_inputs)
        y = 0.0
        for _ in range(self.net.num_inputs):
            self.neuron_positions[-1].append((x, y))
            x += self.horizontal_distance_between_neurons

        if self.net.num_hidden > 0:
            self.neuron_positions.append([])
            y += self.vertical_distance_between_layers
            x = self.__calculate_left_margin_so_layer_is_centered(self.net.num_hidden)
            for _ in range(self.net.num_hidden):
                self.neuron_positions[-1].append((x, y))
                x += horizontal_distance_between_neurons

        self.neuron_positions.append([])
        y += self.vertical_distance_between_layers
        x = self.__calculate_left_margin_so_layer_is_centered(self.net.num_outputs)
        for _ in range(self.net.num_outputs):
            self.neuron_positions[-1].append((x, y))
            x += horizontal_distance_between_neurons

        norm = colors.Normalize(vmin=-MAX_WEIGHT, vmax=MAX_WEIGHT)
        self.scalar_map = plt.cm.ScalarMappable(norm=norm, cmap="jet")
        self.weights = []

    def __calculate_left_margin_so_layer_is_centered(
        self, number_of_neurons: int
    ) -> float:
        return (
            self.horizontal_distance_between_neurons
            * (self.number_of_neurons_in_widest_layer - number_of_neurons)
        ) / 2.0

    def __draw_synapse(
        self,
        neuron1: tuple[float, float],
        neuron2: tuple[float, float],
        weight: NDArray[np.float64],
        recurrent: bool = False,
    ):
        color = self.scalar_map.to_rgba(weight)
        self.weights.append(weight)

        if not recurrent:
            angle = atan((neuron2[0] - neuron1[0]) / float(neuron2[1] - neuron1[1]))

            x_adjustment = self.neuron_radius * sin(angle)
            y_adjustment = self.neuron_radius * cos(angle)

            self.axes.arrow(
                neuron1[0],
                neuron1[1],
                neuron2[0] - x_adjustment - neuron1[0],
                neuron2[1] - y_adjustment - neuron1[1],
                head_width=0.5,
                head_length=0.5,
                fc=color,
                ec=color,
                length_includes_head=True,
                zorder=1,
            )

            # show weight 1/3 of way up connection line

            label_x = neuron1[0] + (neuron2[0] - neuron1[0]) / 3.0
            label_y = neuron1[1] + (neuron2[1] - neuron1[1]) / 3.0

        else:
            if neuron2[1] == neuron1[1]:  # same layer
                height_factor = 1.5
                width_factor = 2.0
                if neuron1[0] > neuron2[0]:
                    height_factor = 4.0
                    width_factor = 4.0
                verts = [
                    neuron1,
                    (
                        neuron1[0]
                        - self.horizontal_distance_between_neurons / width_factor,
                        neuron1[1]
                        + self.vertical_distance_between_layers / height_factor,
                    ),
                    (
                        neuron2[0]
                        + self.horizontal_distance_between_neurons / width_factor,
                        neuron2[1]
                        + self.vertical_distance_between_layers / height_factor,
                    ),
                    neuron2,
                ]
                codes = [Path.MOVETO, Path.CURVE4, Path.CURVE4, Path.CURVE4]
            else:  # going to prior layer (actually doesn't exist right now)
                factor = -2.0
                if neuron2[0] > neuron1[0]:  # left to right
                    factor = 2.0
                verts = [
                    neuron1,
                    (
                        neuron1[0] + self.horizontal_distance_between_neurons * factor,
                        (neuron1[1] + neuron2[1]) / 2.0,
                    ),
                    neuron2,
                ]
                codes = [Path.MOVETO, Path.CURVE4, Path.CURVE4]

            path = Path(verts, codes)

            patch = patches.PathPatch(path, ec=color, fc="none", zorder=1, aa=True)
            # patch = patches.FancyArrowPatch(path=path, ec=color, fc=color, zorder=1, aa=True, arrowstyle="-|>", mutation_scale=150**.5)
            patch_verts: NDArray[np.float64] = patch.get_path().vertices  # type: ignore
            self.axes.add_patch(patch)

            if neuron1 == neuron2:
                vert = patch_verts[int(len(patch_verts) / 2)]
            else:
                vert = patch_verts[int(len(patch_verts) / 3)]

            label_x = vert[0]
            label_y = vert[1]

            if patch_verts[-2][1] != neuron2[1]:
                angle = atan(
                    (patch_verts[-2][0] - neuron2[0])
                    / float(patch_verts[-2][1] - neuron2[1])
                )
                dx = 0.5 * sin(angle)
                dy = 0.5 * cos(angle)
                x_adjustment = self.neuron_radius * sin(angle)
                y_adjustment = self.neuron_radius * cos(angle)

                self.axes.arrow(
                    neuron2[0] + x_adjustment + dx,
                    neuron2[1] + y_adjustment + dy,
                    -dx,
                    -dy,
                    head_width=0.5,
                    head_length=0.5,
                    fc=color,
                    ec=color,
                    length_includes_head=True,
                    zorder=1,
                    aa=True,
                )

            else:
                if neuron1[0] > neuron2[0]:
                    dx = 0.5
                else:
                    dx = -0.5

                self.axes.arrow(
                    (neuron1[0] + neuron2[0]) / 2 - 0.1,
                    patch_verts[int(len(patch_verts) / 2 - 1)][1] + 0.05,
                    dx,
                    0,
                    head_width=0.5,
                    head_length=0.5,
                    fc=color,
                    ec=color,
                    length_includes_head=True,
                    zorder=1,
                    aa=True,
                )

        # use default box of white with white border
        bbox = dict(
            boxstyle="round",
            ec=(1.0, 1.0, 1.0),
            fc=(1.0, 1.0, 1.0),
        )
        self.axes.text(
            label_x,
            label_y,
            "{:.2f}".format(weight),
            color="black",
            size="x-small",
            verticalalignment="center",
            horizontalalignment="center",
            bbox=bbox,
            zorder=2,
        )

    def __draw_bias(self, layer: int) -> tuple[float, float]:
        position = (
            self.number_of_neurons_in_widest_layer
            * self.horizontal_distance_between_neurons,
            self.neuron_positions[layer][0][1]
            + self.vertical_distance_between_layers / 2.0,
        )
        text = self.axes.text(
            position[0],
            position[1],
            "Bias",
            color="black",
            horizontalalignment="center",
            size="large",
        )
        text.set_bbox(dict(edgecolor="black", facecolor="lightgray"))
        text.set_zorder(2)
        return position

    def draw(self):
        for layer in self.neuron_positions:
            for neuron_position in layer:
                circle = patches.Circle(
                    neuron_position,
                    radius=self.neuron_radius,
                    ec="black",
                    fc="lightgray",
                    aa=True,
                )
                circle.set_zorder(2)
                self.axes.add_patch(circle)

        bias_positions = []
        bias_positions.append(self.__draw_bias(0))
        if self.net.num_hidden > 0:
            bias_positions.append(self.__draw_bias(1))

        if self.net.num_hidden == 0:
            for j in range(self.net.num_outputs):
                for i in range(self.net.num_inputs):
                    self.__draw_synapse(
                        self.neuron_positions[0][i],
                        self.neuron_positions[1][j],
                        self.net.weights[0][i][j],
                    )
                self.__draw_synapse(
                    bias_positions[0],
                    self.neuron_positions[1][j],
                    self.net.weights[0][-1][j],
                )
            if self.net.recurrent:
                # for no hidden units, but recurrent, we feedback outputs to self
                for j in range(self.net.num_outputs):
                    for i in range(self.net.num_outputs):
                        self.__draw_synapse(
                            self.neuron_positions[1][i],
                            self.neuron_positions[1][j],
                            self.net.weights[0][self.net.num_inputs + i][j],
                            True,
                        )
        else:
            for j in range(self.net.num_hidden):
                for i in range(self.net.num_inputs):
                    self.__draw_synapse(
                        self.neuron_positions[0][i],
                        self.neuron_positions[1][j],
                        self.net.weights[0][i][j],
                    )
                self.__draw_synapse(
                    bias_positions[0],
                    self.neuron_positions[1][j],
                    self.net.weights[0][-1][j],
                )
            for j in range(self.net.num_outputs):
                for i in range(self.net.num_hidden):
                    self.__draw_synapse(
                        self.neuron_positions[1][i],
                        self.neuron_positions[2][j],
                        self.net.weights[1][i][j],
                    )
                self.__draw_synapse(
                    bias_positions[1],
                    self.neuron_positions[2][j],
                    self.net.weights[1][-1][j],
                )

            if self.net.recurrent:
                # for hidden + recurrent, we feedback hidden to hidden
                for j in range(self.net.num_hidden):
                    for i in range(self.net.num_hidden):
                        self.__draw_synapse(
                            self.neuron_positions[1][i],
                            self.neuron_positions[1][j],
                            self.net.weights[0][self.net.num_inputs + i][j],
                            True,
                        )

        self.axes.relim()
        self.axes.autoscale(tight=False)
        xlim = self.axes.get_xlim()
        self.axes.set_xlim(
            xlim[0] - self.neuron_radius,
            xlim[1] + self.horizontal_distance_between_neurons + self.neuron_radius,
        )
        self.axes.set_aspect("equal")
        # turn off all x axis ticks
        self.axes.get_xaxis().set_ticks([])
        self.axes.get_yaxis().set_ticks([
            layer[0][1] for layer in self.neuron_positions
        ])
        if self.net.num_hidden > 0:
            self.axes.get_yaxis().set_ticklabels(["Input", "Hidden", "Output"])
        else:
            self.axes.get_yaxis().set_ticklabels(["Input", "Output"])

        self.scalar_map.set_array(np.asarray(self.weights))
        plt.colorbar(self.scalar_map, ax=self.axes)

In [None]:
from utils.plot_utils import plot_results_1D, plot_results_2D, plot_observer
from inspyred import benchmarks
from inspyred.ec import (
    terminators,
    replacers,
    selectors,
    variators,
    EvolutionaryComputation,
    Individual,
)
import numpy as np
from typing import Any
from numpy.typing import NDArray
from random import Random
from utils.inspyred_utils import NumpyRandomWrapper


def generate_offspring(
    random: Random,
    x0: list[float] | NDArray[np.float64],  # type: ignore
    std_dev: float,
    num_offspring: int,
    display: bool,
    kwargs: Any,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    x0: NDArray[np.float64] = np.asarray(x0, dtype=np.float64)

    problem = benchmarks.Sphere(len(x0))

    parent_fitness: NDArray[np.float64] = problem.evaluator([x0], None)[0]

    algorithm = EvolutionaryComputation(random)
    algorithm.terminator = terminators.generation_termination
    algorithm.replacer = replacers.generational_replacement
    algorithm.variator = variators.gaussian_mutation  # type: ignore

    final_pop = algorithm.evolve(
        generator=(lambda random, args: x0.copy()),  # type: ignore
        evaluator=problem.evaluator,
        pop_size=num_offspring,
        maximize=False,
        max_generations=1,
        mutation_rate=1.0,
        gaussian_stdev=std_dev,
    )

    offspring_fitnesses = np.asarray([guy.fitness for guy in final_pop])
    offspring = np.asarray([guy.candidate for guy in final_pop])

    if display:
        # Plot the parent and the offspring on the fitness landscape
        # (only for 1D or 2D functions)
        if len(x0) == 1:
            plot_results_1D(
                problem,
                x0,
                parent_fitness,
                offspring,
                offspring_fitnesses,
                "Parent",
                "Offspring",
                kwargs,
            )

        elif len(x0) == 2:
            plot_results_2D(
                problem, np.asarray([x0]), offspring, "Parent", "Offspring", kwargs
            )

    return (parent_fitness, offspring_fitnesses)


def generator(random: Random, args: dict[str, Any]) -> NDArray[np.float64]:
    return np.asarray([
        random.uniform(args["pop_init_range"][0], args["pop_init_range"][1])
        for _ in range(args["num_vars"])
    ])


def initial_pop_observer(
    population: list[Individual],
    num_generations: int,
    num_evaluations: int,
    args: dict[str, Any],
) -> None:
    if num_generations == 0:
        args["initial_pop_storage"]["individuals"] = np.asarray([
            guy.candidate for guy in population
        ])
        args["initial_pop_storage"]["fitnesses"] = np.asarray([
            guy.fitness for guy in population
        ])


def run_ga(
    random: Random | NumpyRandomWrapper,
    display: bool = False,
    num_vars: int = 0,
    problem_class: Any = benchmarks.Sphere,
    maximize: bool = False,
    use_bounder: bool = True,
    **kwargs: Any,
) -> tuple[NDArray[np.float64], float, list[Individual]]:
    # create dictionaries to store data about initial population, and lines
    initial_pop_storage = {}

    algorithm = EvolutionaryComputation(random)
    algorithm.terminator = terminators.generation_termination
    algorithm.replacer = replacers.generational_replacement
    algorithm.variator = [  # type: ignore
        variators.uniform_crossover,
        variators.gaussian_mutation,
    ]
    algorithm.selector = selectors.tournament_selection

    if display:
        algorithm.observer = [plot_observer, initial_pop_observer]  # type: ignore
    else:
        algorithm.observer = initial_pop_observer

    kwargs["num_selected"] = kwargs["pop_size"]

    if issubclass(problem_class.__class__, NeuralNetworkBenchmark):
        if "num_hidden_units" not in kwargs:
            kwargs["num_hidden_units"] = 0
        if "recurrent" not in kwargs:
            kwargs["recurrent"] = False
        problem = problem_class(kwargs["num_hidden_units"], kwargs["recurrent"])  # type: ignore
        num_vars = problem.dimensions
        print("neural network benchmark")
    else:
        print("not neural network benchmark")
        problem = problem_class(num_vars)

    if use_bounder:
        kwargs["bounder"] = problem.bounder
    if "pop_init_range" in kwargs:
        kwargs["generator"] = generator
    else:
        kwargs["generator"] = problem.generator

    final_pop: list[Individual] = algorithm.evolve(
        evaluator=problem.evaluator,
        maximize=problem.maximize,
        initial_pop_storage=initial_pop_storage,
        num_vars=num_vars,
        **kwargs,
    )

    # best_guy = final_pop[0].candidate
    # best_fitness = final_pop[0].fitness
    final_pop_fitnesses: NDArray[np.float64] = np.asarray([
        guy.fitness for guy in final_pop
    ])
    final_pop_candidates: NDArray[np.float64] = np.asarray([
        guy.candidate for guy in final_pop
    ])

    sort_indexes = sorted(
        range(len(final_pop_fitnesses)), key=final_pop_fitnesses.__getitem__
    )
    final_pop_fitnesses = final_pop_fitnesses[sort_indexes]
    final_pop_candidates = final_pop_candidates[sort_indexes]

    best_guy: NDArray[np.float64] = final_pop_candidates[0]
    best_fitness: float = final_pop_fitnesses[0]

    if display:
        # Plot the parent and the offspring on the fitness landscape
        # (only for 1D or 2D functions)
        if num_vars == 1:
            plot_results_1D(
                problem,
                initial_pop_storage["individuals"],
                initial_pop_storage["fitnesses"],
                final_pop_candidates,
                final_pop_fitnesses,
                "Initial Population",
                "Final Population",
                kwargs,
            )

        elif num_vars == 2:
            plot_results_2D(
                problem,
                initial_pop_storage["individuals"],
                final_pop_candidates,
                "Initial Population",
                "Final Population",
                kwargs,
            )

    return best_guy, best_fitness, final_pop

## Exercise 1
In the first two exercises you will investigate running an EA to evolve the weights of an Artificial Neural Network (ANN). While there are other ways to learn the weights of an ANN (gradient based methods, such as backpropagation and alike), using evolution is an effective means in many circumstances. In this first exercise we will evolve the weights of a simple Feed Forward Neural Network (FFNN), while in the next one we will evolve the weights of more complex, recurrent neural nets. In both cases, we will assume a fixed topology with one input layer with as many nodes as the inputs, one output layer with a single node, and one (optional) hidden layer with a predefined number of nodes. Weights range in $[-8,8]$ (with real-valued encoding). All nodes of these Neural Networks use the logistic activation function (sigmoid):
$f(x) = \frac{1}{1+e^{-x}}$.

We begin by evolving the weights of a minimal Neural Network to solve the ``Or`` problem. That means we will use a Neural Network that has two inputs, and one output, which should produce the logical ``Or`` function of the two input values, see the truth table shown in table below.

| **Input 1** | **Input 2** | **Output** |
|:-----------:|:-----------:|:----------:|
| 0           | 0           | 0          |
| 0           | 1           | 1          |
| 1           | 0           | 1          |
| 1           | 1           | 1          |


Run the code below to evolve the weights for this ``Or`` network. The fitness here is the sum of squared errors between the network's output and the target output across each of the four input patterns. If you see the best fitness approach zero (e.g. go under 0.1) then you have found a network able to solve this problem. This most likely looks similar to what shown in the Figure below. 

![A graphical representation of the evolved Neural Network for the ``Or`` problem"](img/ann_or.png)

Here the Neural Network is depicted with its weights and biases shown by the corresponding color. If you were able to solve the ``Or`` problem, look at the weights of the Neural Network (note that, in the terminal, weights appear ordered by layer and, for each layer, by node, the bias weight being the last one) and think about / compute how it behaves when given different input patterns. Try to plug manually different couples of Input 1 and Input 2 into the network, and calculate the corresponding Output. It is important to think about this now, because it will be difficult to keep track of what our Neural Networks are doing once we start using more complex topologies.

If you were not able to solve the ``Or`` problem, try modifying some of the EA parameters (see the comments in the script), until you are able to do so.

Once you are able to solve ``Or``, try solving the ``And`` problem instead (change in the script ``problem_class=Or`` to ``problem_class=And``).

- Do the same EA parameters that you used for ``Or`` work for ``And`` as well? If not, modify them until you are able to solve ``And``.


Now that we can solve ``Or`` and ``And``, we will try something a little more challenging. Change the parameter ``problem_class`` to be ``Xor``, so that we are now trying to solve the ``Exclusive Or`` (``Xor``) function, see the truth table shown in the Table below.

| **Input 1** | **Input 2** | **Output**           |
|:-----------:|:-----------:|:--------------------:|
| 0           | 0           | 0                    |
| 0           | 1           | 1                    |
| 1           | 0           | 1                    |
| 1           | 1           |<span style="color:red">0</span>|


This function has one small, but crucial difference from ``Or`` (highlighted in red), as can be seen by comparing their truth tables.

Try running the code again after changing ``problem_class`` to ``Xor``.
- Can you solve it? If you are unable to solve it, why is that?

In this case, it is worth considering an additional parameter that can be tuned, that is the number of hidden nodes of the Neural Network (parameter ``args["num_hidden_units"]``. Try changing this parameter from 0 to 1 (this will add to the topology a hidden layer with one node).

- Does this allow you to solve the problem? What if you change this value to 2 or more?
- How many hidden nodes are required to solve this problem? Can you provide an explanation for why that is the case?

When you find a network that does compute ``Xor``, once again see if you can understand how the network does so. Try to plug manually different couples of Input 1 and Input 2 into the network, and calculate the corresponding Output.

In [None]:
from random import Random


args = {}

"""   
-------------------------------------------------------------------------
Edit this part to do the exercises
"""

# problem
problem_class = Xor  # And, Xor

# parameters for the GA
args["num_hidden_units"] = 2  # Number of hidden units of the neural network
args["gaussian_stdev"] = 1.0  # Standard deviation of the Gaussian mutations
args["crossover_rate"] = 0.8  # Crossover fraction
args["tournament_size"] = 2  # Tournament size
args["pop_size"] = 10  # Population size

args["num_elites"] = 1  # number of elite individuals to maintain in each gen
args["mutation_rate"] = 0.8  # fraction of loci to perform mutation on

# by default will use the problem's defined init_range
# uncomment the following line to use a specific range instead
# args["pop_init_range"] = [-500, 500] # Range for the initial population

args["use_bounder"] = True  # use the problem's bounder to restrict values
# comment out the previously line to run unbounded

args["max_generations"] = 100  # Number of generations of the GA
display = True  # Plot initial and final populations

"""
-------------------------------------------------------------------------
"""

args["fig_title"] = "GA"


rng = Random(x=41)

best_individual, best_fitness, final_pop = run_ga(
    rng, display=display, problem_class=problem_class, **args
)
print("Best Individual", best_individual)
print("Best Fitness", best_fitness)

if display:
    net = problem_class(args["num_hidden_units"]).net
    net.set_params(best_individual)

    ann_plotter = ANNPlotter(net)
    ann_plotter.draw()

    plt.ioff()
    plt.show()

## Exercise 2
So far we have used Neural Networks for solving tasks where the output depends *statically* on the input vector, i.e., the input-output dependency does not change over time. However, there are many tasks such as time-series forecast and some robotic applications where the input-output dependence is *dynamic*, i.e., the output of the system at time $t$ depends on the inputs at the same step, but also on the inputs at the previous time step(s).
First, we start with a modified version of the ``Or`` problem, that is called ``Temporal Or``. While the basic ``Or`` problem involved evolving a Neural Network that would give a single output when provided two simultaneous inputs, in ``Temporal Or``, there is only a single input node and the input values are provided in sequence. Therefore the network will have to remember the first input when seeing the second, in order to output the correct value.

Run the jupyter block code to solve ``Temporal Or``.
- Can you solve it? If you are unable to solve it, why is that?

In this case, notice that there is one new parameter that you can modify in the script: ``recurrent``. This parameter is a Boolean flag, that says whether the network is recurrent (in this case an Elman network, [link here](https://en.wikipedia.org/wiki/Recurrent\_neural\_network\#Elman\_networks\_and\_Jordan\_networks)) or not (in which case it is a FFNN).
- If you set ``recurrent`` to be ``True``, can you now evolve a successful network?
- Why might recurrence be important for solving a temporal problem such as this?
  
Once you have been able to evolve a network capable of solving ``Temporal Or``, you can change in the script the parameter ``problem_class`` to ``TemporalAnd``, to attempt solving a temporal version of ``And``, and repeat.
- Do the same EA parameters that solved ``Temporal Or`` also work for ``Temporal And``?
- Why, or why not?

Finally, change in the script the parameter ``problem_class`` to ``TemporalXor``, to attempt solving a temporal version of ``Xor``. Run the code again.
- Are you able to find a successful network?
- If not, think back to what you just saw in the previous exercise. What combination of recurrence and no. of hidden nodes is needed to solve ``Temporal Xor`` and why is that?

In [None]:
import matplotlib.pyplot as plt

args = {}

"""   
-------------------------------------------------------------------------
Edit this part to do the exercises
"""

# problem
problem_class = TemporalXor

# parameters for the GA
args["num_hidden_units"] = 2  # Number of hidden units of the neural network
args["recurrent"] = True  # Number of hidden units of the neural network
args["gaussian_stdev"] = 1.0  # Standard deviation of the Gaussian mutations
args["crossover_rate"] = 0.8  # Crossover fraction
args["tournament_size"] = 2  # Tournament size
args["pop_size"] = 100  # Population size

args["num_elites"] = 1  # number of elite individuals to maintain in each gen
args["mutation_rate"] = 0.5  # fraction of loci to perform mutation on

# by default will use the problem's defined init_range
# uncomment the following line to use a specific range instead
# args["pop_init_range"] = [-500, 500] # Range for the initial population

args["use_bounder"] = True  # use the problem's bounder to restrict values
# comment out the previously line to run unbounded

args["max_generations"] = 100  # Number of generations of the GA
display = True  # Plot initial and final populations

"""
-------------------------------------------------------------------------
"""

args["fig_title"] = "GA"

rng = Random(x=41)

best_individual, best_fitness, final_pop = run_ga(
    rng, display=display, problem_class=problem_class, **args
)
print("Best Individual", best_individual)
print("Best Fitness", best_fitness)

if display:
    net = problem_class(args["num_hidden_units"], args["recurrent"]).net
    net.set_params(best_individual)

    ann_plotter = ANNPlotter(net)
    ann_plotter.draw()

    plt.ioff()
    plt.show()

## Exercise 3

In this exercise we will use the Python implementation of Neural Evolution of Augmenting Topologies (NEAT) provided by *neat-python*, to solve the ``Xor`` problem we have seen in the first exercise. The main difference is that in this case we won't fix the network topology *a priori* and evolve its weights, but rather we will evolve the weights *and* the network topology itself. 

**NOTE**: In this case NEAT is configured to *maximize* the number of correct outputs in the $4$ input cases of the ``Xor`` truth table, therefore the optimal fitness value is $4$.

See the jupyter block code and try to understand its main steps. The *neat-python* library allows to configure all the algorithmic details of NEAT by means of an external configuration file. In this exercise two different configuration files **(inside folder utils/utils_08)** will be used, namely:
-  ``config-feedforward-2input-xor-noelitism.txt``
-  ``config-feedforward-2input-xor-elitism.txt``

Spend some time on one of the two files to get an idea about the main configurations that can be changed in NEAT ([see link](https://neat-python.readthedocs.io)). As you will see, in this case the two configuration files are pretty much the same, except for two parameters, namely ``species_elitism`` and ``elitism``. These represent respectively the no. of elite species (remember from the lecture that NEAT uses a **speciation** mechanism to allow mating only of *similar* networks) and elite individuals (i.e., networks) that are kept in the population. More specifically,``config-feedforward-2input-xor-noelitism.txt`` sets both parameters to zero, while ``config-feedforward-2input-xor-elitism.txt`` sets them to two, meaning that two elite species and two elite networks are kept.

The script can be configured either to run a single instance of NEAT (by setting ``num_runs=1``) on one of the two configurations, or multiple runs (by setting ``num_runs`` to values bigger than one) on one or both configurations. In the first case, you can choose one of the two configuration files (with/without elitism) and observe how a single run of each configuration performs. The script will log on the console the runtime details of the evolutionary process (to disable this feature, simply comment the line ``p.add_reporter(neat.StdOutReporter(True)``). At the end of the run, you should obtain two figures similar to those shown below. In the second case, the script will execute multiple runs (e.g. $10$) of both configurations, and then produce a boxplot comparing the best fitness obtained by each configuration at the end of the computational budget. By default, the stop condition is set to $100$ generations, see the parameter ``num_generations`` in the code. However, the algorithm has an additional stop criterion, i.e., it stops when it obtains a Neural Network whose fitness is higher than the parameter ``fitness_threshold`` in the corresponding configuration file (in this case, by default this parameter is set to $3.9$, sufficiently close to the optimal value of $4$ to be approximated by a sigmoid function).

Fitness trend for the ``Xor`` problem solved by NEAT |  Corresponding species evolution, where each stacked plot represents one species and its size over evolutionary time. Species can go extinct if their size goes to zero
- | - 
![alt](img/trend.png) | ![alt](img/species.png)

- First, run a single instance of each of the two configurations (with/without elitism, simply change ``config_files[0]`` to ``config_files[1]``). What do you observe? Is the algorithm without elitism able to converge to the optimal fitness value? What about the algorithm with elitism? What is the effect of elitism on convergence? What about the number of species and their dynamics?
- Change the parameter ``num_runs`` to $10$ or more. Does the boxplot confirm -in statistical terms- what you observed on a single run? (**NOTE**: it takes 1-2 minutes to execute 10 runs for both configurations.)

In [None]:
import os
from typing import Any
import neat
from utils.plot_utils import draw_net, plot_stats, plot_species
import matplotlib.pyplot as plt
from neat import Config

"""
2-input XOR
"""

# 2-input XOR inputs and expected outputs.
inputs = [(0.0, 0.0), (0.0, 1.0), (1.0, 0.0), (1.0, 1.0)]
outputs = [(0.0,), (1.0,), (1.0,), (0.0,)]

num_generations = 100
num_runs = 1

config_files = [
    "../../datasets/lab08/config-feedforward-2input-xor-noelitism.txt",
    "../../datasets/lab08/config-feedforward-2input-xor-elitism.txt",
]


def eval_genomes(genomes: Any, config: Config) -> None:
    for _, genome in genomes:
        genome.fitness = len(inputs)
        net = neat.nn.FeedForwardNetwork.create(genome, config)
        for xi, xo in zip(inputs, outputs):
            output = net.activate(xi)
            genome.fitness -= (output[0] - xo[0]) ** 2


local_dir = os.path.dirname("labs/lab08")

if num_runs == 1:
    # Load configuration.
    config_file = config_files[0]
    config = neat.Config(
        neat.DefaultGenome,
        neat.DefaultReproduction,
        neat.DefaultSpeciesSet,
        neat.DefaultStagnation,
        config_file,
    )

    # Create the population, which is the top-level object for a NEAT run.
    p = neat.Population(config)

    # Add a stdout reporter to show progress in the terminal.
    stats = neat.StatisticsReporter()
    p.add_reporter(neat.StdOutReporter(True))
    p.add_reporter(stats)

    # run NEAT for num_generations
    winner = p.run(eval_genomes, num_generations)

    # Display the winning genome.
    print("\nBest genome:\n{!s}".format(winner))

    # Show output of the most fit genome against training data.
    print("\nOutput:")
    winner_net = neat.nn.FeedForwardNetwork.create(winner, config)
    for xi, xo in zip(inputs, outputs):
        output = winner_net.activate(xi)
        print("input {!r}, expected output {!r}, got {!r}".format(xi, xo, output))

    node_names = {-1: "A", -2: "B", 0: "A XOR B"}
    draw_net(config, winner, filename="2-input OR", view=True, node_names=node_names)
    plot_stats(stats, ylog=False, view=True)
    plot_species(stats, view=True)

else:
    results = []
    for file in config_files:
        # Load configuration.
        config_file = file
        config = neat.Config(
            neat.DefaultGenome,
            neat.DefaultReproduction,
            neat.DefaultSpeciesSet,
            neat.DefaultStagnation,
            config_file,
        )

        best_fitnesses: list[float] = []
        for i in range(num_runs):
            print("{0}/{1}".format(i + 1, num_runs))
            p = neat.Population(config)
            winner = p.run(eval_genomes, num_generations)
            best_fitnesses.append(winner.fitness)  # type: ignore
        results.append(best_fitnesses)

    fig = plt.figure("NEAT")
    ax = fig.gca()
    ax.boxplot(results)
    ax.set_xticklabels(["Without elitism", "With elitism"])
    # ax.set_yscale('log')
    ax.set_xlabel("Condition")
    ax.set_ylabel("Best fitness")
    plt.show()

## Exercise 4
In this exercise we use again NEAT, this time to solve a 3-input Boolean function described by the truth table shown in the Table below. This function returns $1$ if and only if only one input is equal to $1$, otherwise it returns $0$.

| **Input 1** | **Input 2** | **Input 3** | **Output** |
|:-----------:|:-----------:|:-----------:|:----------:|
| 0           | 0           | 0           | 0          |
| 0           | 0           | 1           | 1          |
| 0           | 1           | 0           | 1          |
| 0           | 1           | 1           | 0          |
| 1           | 0           | 0           | 1          |
| 1           | 0           | 1           | 0          |
| 1           | 1           | 0           | 0          |
| 1           | 1           | 1           | 0          |


This script has the same structure that we have seen in the previous exercise. In this exercise two different configuration files will be used, namely:
- ``config-feedforward-3input-function-nohidden.txt``
- ``config-feedforward-3input-function-hidden.txt``

In this case the only difference between the two configurations (which both use elitism) is the number of hidden nodes to add to each genome in the initial population (parameter ``num_hidden``), which is set respectively to $0$ and $3$ (also, note that the parameter ``num_inputs`` is set to $3$ to allow the use of $3$ inputs, while in the previous exercise it was set to $2$). Note that as in this case the optimal fitness value is $8$ (i.e., the Neural Network output is correct in all 8 input cases), in both configuration files the parameter ``fitness_threshold`` is set to $7.9$.

- What do you observe in this case when you execute a single run of each configuration? What is the effect of using hidden nodes in the initial population?
- What happens when you configure the script to execute multiple runs? Does the boxplot confirm -in statistical terms- what you observed on a single run? (**NOTE**: it takes 1-2 minutes to execute 10 runs for both configurations.)

In [None]:
import neat
from utils.plot_utils import draw_net, plot_stats, plot_species
from neat import Config
from typing import Any
import matplotlib.pyplot as plt

# 3-input Boolean function inputs and expected outputs.
inputs = [
    (0.0, 0.0, 0.0),
    (0.0, 0.0, 1.0),
    (0.0, 1.0, 0.0),
    (0.0, 1.0, 1.0),
    (1.0, 0.0, 0.0),
    (1.0, 0.0, 1.0),
    (1.0, 1.0, 0.0),
    (1.0, 1.0, 1.0),
]
outputs = [(0.0,), (1.0,), (1.0,), (0.0,), (1.0,), (0.0,), (0.0,), (0.0,)]

num_generations = 100
num_runs = 1

config_files = [
    "../../datasets/lab08/config-feedforward-3input-function-nohidden.txt",
    "../../datasets/lab08/config-feedforward-3input-function-hidden.txt",
]


def eval_genomes(genomes: Any, config: Config) -> None:
    for _, genome in genomes:
        genome.fitness = len(inputs)
        net = neat.nn.FeedForwardNetwork.create(genome, config)
        for xi, xo in zip(inputs, outputs):
            output = net.activate(xi)
            genome.fitness -= (output[0] - xo[0]) ** 2


if num_runs == 1:
    # Load configuration.
    config_file = config_files[0]
    config = neat.Config(
        neat.DefaultGenome,
        neat.DefaultReproduction,
        neat.DefaultSpeciesSet,
        neat.DefaultStagnation,
        config_file,
    )

    # Create the population, which is the top-level object for a NEAT run.
    p = neat.Population(config)

    # Add a stdout reporter to show progress in the terminal.
    stats = neat.StatisticsReporter()
    p.add_reporter(neat.StdOutReporter(True))
    p.add_reporter(stats)

    # run NEAT for num_generations
    winner = p.run(eval_genomes, num_generations)

    # Display the winning genome.
    print("\nBest genome:\n{!s}".format(winner))

    # Show output of the most fit genome against training data.
    print("\nOutput:")
    winner_net = neat.nn.FeedForwardNetwork.create(winner, config)
    for xi, xo in zip(inputs, outputs):
        output = winner_net.activate(xi)
        print("input {!r}, expected output {!r}, got {!r}".format(xi, xo, output))

    node_names = {-1: "A", -2: "B", -3: "C", 0: "f(A,B,C)"}
    draw_net(
        config,
        winner,
        filename="3-input Bool function",
        view=True,
        node_names=node_names,
    )
    plot_stats(stats, ylog=False, view=True)
    plot_species(stats, view=True)

else:
    results = []
    for file in config_files:
        # Load configuration.
        config_file = file
        config = neat.Config(
            neat.DefaultGenome,
            neat.DefaultReproduction,
            neat.DefaultSpeciesSet,
            neat.DefaultStagnation,
            config_file,
        )

        best_fitnesses = []
        for i in range(num_runs):
            print("{0}/{1}".format(i + 1, num_runs))
            p = neat.Population(config)
            winner = p.run(eval_genomes, num_generations)
            best_fitnesses.append(winner.fitness)  # type: ignore
        results.append(best_fitnesses)

    fig = plt.figure("NEAT")
    ax = fig.gca()
    ax.boxplot(results)
    ax.set_xticklabels(["Without hidden nodes", "With hidden nodes"])
    # ax.set_yscale('log')
    ax.set_xlabel("Condition")
    ax.set_ylabel("Best fitness")
    plt.show()

## Instruction and questions
Concisely note down your observations from the previous exercises (follow the bullet points) and think about the following questions. 
- What is the genotype and what is the phenotype in the problems considered in this lab?
- Why are hidden nodes sometimes needed for a Neural Network to solve a given task? What is the defining feature of problems that networks without hidden nodes are unable to solve?
- Why are recurrent connections needed to solve certain problems? What is the defining feature of problems that networks without recurrent connections are unable to solve? Are there problems that require recurrent connections and multiple hidden nodes?