# Research Mode

In this tutorial IPython Notebook, we will cover the more advanced functionalities of Gaggle. In particular, this is for users that are familiar with Pytorch and are looking for research-grade custom capabilities.

To do so, we will go over the different modules of gaggle, their purpose and how they should be interacted with. In the introductory tutorial, we covered the supervisor, which is essentially a wrapper for most of gaggle's functionalities for beginners. Now we'll first look at a basic training script and the different arguments classes that are used to implement smooth and easy config file support.

Below is an overview of the topic covered
- [Arguments and Training Script](#args)
- [Config File](#config_file)
- [Problem](#problem)
- [Classification Problem](#problem)
- [Reinforcement Learning](#rl)
- [Leap Problems](#leap)
- [GA](#ga)
- [Population Manager](#population_manager)
- [Selection](#selection)
- [Crossover](#crossover)
- [Mutation](#mutation)
- [Registering Custom Operators](#registering_custom_operators)


## Arguments and Training Script <a class="anchor" id="args"></a>

Below is a basic example training script that can also be found in examples folder of the gaggle github repository as train.py.

In [19]:
from gaggle.ga import GA
from gaggle.ga.ga_factory import GAFactory
from gaggle.arguments.config_args import parse_args


def train():
    """ Train a model from scratch on a data. """
    outdir_args, sys_args, individual_args, problem_args, ga_args, config_args = parse_args()
    if config_args.exists():
        outdir_args, sys_args, individual_args, problem_args, ga_args = config_args.get_args()

    trainer: GA = GAFactory.from_ga_args(ga_args=ga_args, problem_args=problem_args, sys_args=sys_args,
                                         outdir_args=outdir_args, individual_args=individual_args)
    trainer.train()


Let's first focus on line 8. 'outdir_args, sys_args, individual_args, problem_args, ga_args, config_args = parse_args()'.
This line does two things, it generates the different argument objects our framework uses, you can find a more thorough description of them in the documentation. It also reads whatever command line arguments that were set when running the script and updates the relevant arguments with the values mentioned in te command line.
This allows to run the code from the command line without a config file like you would a standard python script.

Then for line 9 and 10, it checks if the config_path argument was set in the command line. If that's the case, it overwrites the arguments with whatever was set in the config file. Beware, this means that any other command line argument that was passed also gets overwritten and therefore needs to be specified in the config file instead.

Finally, in lines 12 and 14, we initialize the GA by passing it the generated argument objects. The GA will take care of setting up the entire pipeline according to the given arguments. Line 14 tells the GA to start training.

### Config File <a id='config_file'></a>

We'll now see how to structure a standard experiment config file for our framework. Below is a simple example config file to train a population of models on the mnist dataset. This should be stored in a .yml file. In this case it can be train_mnist_lenet.yml for example.

<code>
individual_args:
  model_name: lenet

problem_args:
  problem_name: MNIST
  batch_size: 50000
  eval_batch_size: 10000

ga_args:
  population_size: 200
  num_parents: 200
  crossover: uniform
  selection: weighted
  mutation: normal
  mutation_std: 0.02
  mutation_chance: 0.01
  parent_survival_rate: 0.5
  elitism: 0.1
  generations: 1000
  eval_every_generation: 500

sys_args:
  device: cuda

output_dir:
  root: ../experiments   # creates this root directory for the outputs
  name: mnist            # the name of the experiment (optional)
</code>

We can now run the original training script with the above config by just running the following:

<pre><code>python3 train.py --config_path ./train_mnist_lenet.yml </code></pre>

Now that we've covered how to setup a basic training script and a configuration file for it, we'll dive deeper into how gaggle works on how to customize it for your own research/experimentation needs.
To that purpose, we'll first cover the Problem module/class.



## Problem
<a class="anchor" id='problem'></a>

The problem module and class encompass everything that represents the objective/fitness function that needs to be solved. From the simple Rastrigin benchmark function (as seen in the introductory notebook) to more complex Image Classification tasks like MNIST or CIFAR10.

All in all, Gaggle's problems can be summarized in 4 different types:
- classification
- rl (reinforcement learning)
- leap (other GA framework whose problems we also support)
- custom

We'll therefore first cover how to create and register one of each problem in gaggle's framework.

### Classification Problem

Below, I'll put the classification problem code (as of version 0.0.1) so we can go through it together and see what does what and what needs to be modified to implement a custom problem.

In [2]:
from gaggle.problem.problem import Problem
from gaggle.population import Individual, PopulationManager
from gaggle.arguments import ProblemArgs, SysArgs
from gaggle.problem.dataset import DatasetFactory
from gaggle.utils.smooth_value import SmoothedValue
from gaggle.utils.metrics import accuracy
import torch


class ClassificationProblem(Problem):
    """A Problem that represents a standard Machine Learning classification problem. It stores the associated
    training and validation dataset. Population evaluation optimized for GPU by default to speed up training.
    To create a classification problem with a custom dataset, register said dataset in the DatasetFactory.

    """
    def __init__(self, problem_args: ProblemArgs = None, sys_args: SysArgs = None):
        super(ClassificationProblem, self).__init__(problem_args, sys_args)
        self.train_dataset = DatasetFactory.from_problem_args(problem_args, train=True)
        self.train_data, self.train_transforms = self.train_dataset.get_data_and_transform()
        if self.problem_args.batch_size == -1:
            # this means use the entire dataset without batching
            self.problem_args.batch_size = self.train_data[0].size(0)

        if self.problem_args.batch_size == self.train_data[0].size(0):
            # we move everything to the gpu and let it live on the gpu
            print(f"Batching is not necessary, will store the entire data on device: {sys_args.device}")
            self.train_data = (self.train_data[0].to(self.sys_args.device), self.train_data[1].to(
                self.sys_args.device))

        self.eval_dataset = DatasetFactory.from_problem_args(problem_args, train=False)
        self.eval_data, self.eval_transforms = self.eval_dataset.get_data_and_transform()
        if self.problem_args.eval_batch_size == -1:
            self.problem_args.eval_batch_size = self.eval_data[0].size(0)

        self.current_batch = None
        self.fitness_function = accuracy

    @torch.no_grad()
    def evaluate_population(self, population_manager: PopulationManager,
                            use_freshness: bool = True, update_manager: bool = True, train: bool = True,
                            *args, **kwargs) -> dict[int: float]:
        """Population evaluation optimized for GPU by default to speed up training. Should only be modified if
        specific custom behavior is desired. It is usually not recommend to modify this function.

        Args:
            population_manager:
            use_freshness:
            update_manager:
            train:
            *args:
            **kwargs:

        Returns:
            The dictionary of individual fitnesses
        """
        all_data = self.train_data if train else self.eval_data
        transforms = self.train_transforms if train else self.eval_transforms
        batch_size = self.problem_args.batch_size if train else self.problem_args.eval_batch_size
        num_inputs = all_data[0].size(0)

        fitness = {}
        for i in range(population_manager.population_size):
            if population_manager.is_fresh(i) and use_freshness:
                fitness[i] = SmoothedValue()
            elif not use_freshness:
                fitness[i] = SmoothedValue()

        num_batches = num_inputs // batch_size
        rest = num_inputs % batch_size
        for j in range(num_batches):
            data = all_data[0][j * batch_size:(j + 1) * batch_size].to(self.sys_args.device)
            data = transforms(data)
            targets = all_data[1][j * batch_size:(j + 1) * batch_size].to(self.sys_args.device)
            self.current_batch = (data, targets)
            for k in list(fitness.keys()):
                fitness[k].update(self.evaluate(population_manager.get_individual(k), *args, **kwargs), n=batch_size)

        if rest > 0:
            data = transforms(all_data[0][-rest:].to(self.sys_args.device))
            targets = all_data[1][-rest:].to(self.sys_args.device)
            self.current_batch = (data, targets)
            for l in list(fitness.keys()):
                fitness[l].update(self.evaluate(population_manager.get_individual(l), *args, **kwargs), n=batch_size)

        for m in list(fitness.keys()):
            fitness[m] = fitness[m].global_avg
            if update_manager:
                population_manager.set_individual_fitness(m, fitness[m])
                if use_freshness:
                    population_manager.set_freshness(m, False)

        if train and use_freshness:
            return population_manager.get_fitness()

        return fitness

    @torch.no_grad()
    def evaluate(self, individual: Individual, train: bool = True, *args, **kwargs) -> float:
        """Evaluates an individual on the current batch of data.

        Args:
            individual:
            train: whether we are currently training or performing an inference.
            *args:
            **kwargs:

        Returns:

        """
        if train:
            individual.train()
        else:
            individual.eval()
        x, y = self.current_batch
        x, y = x.to(self.sys_args.device), y.to(self.sys_args.device)
        y_pred = individual(x)
        return self.fitness_function(y_pred, y).cpu().item()


A ClassificationProblem stores the datasets that are used to perform classification and it gets them from the DatasetFactory as seen in line 18 and 30.
By default, the fitness_function for the classification is the accuracy metric, but this can be easily changed as we'll see below.

First let's say we have a custom dataset, and we want to create a ClassificationProblem out of it for a model to evolve to solve.

In [3]:
from gaggle.problem.dataset import DataWrapper, Dataset, DatasetFactory

def build_dataset(train: bool = True):
    if train:
        # Train dataset
        train_data = torch.rand((50, 3, 32, 32))
        train_target = torch.randint(low=0, high=10, size=(50, 10)).to(torch.float)  # generating labels

        # we wrap the data and targets to create a dataset like object, for users familiar with pytorch, this is similar to a TensorDataset
        train_dataset = DataWrapper(data=train_data, targets=train_target)
        return train_dataset
    else:
        # Test dataset
        test_data = torch.rand((100, 3, 32, 32))
        test_target = torch.randint(low=0, high=10, size=(500, 10)).to(torch.float)  # generating labels

        test_dataset = DataWrapper(data=test_data, targets=test_target)
        return test_dataset

# We can now create our custom dataset
# any dataset always needs at least a problem args object, whether it is train or test and a sys args object.
# all the default functions we need will already be setup since we inherit from our Dataset class. So all we need to do is specify what the dataset attribute is.
class CustomDataset(Dataset):
    def __init__(self, problem_args: ProblemArgs = None, train: bool = True, sys_args: SysArgs = None):
        super(CustomDataset, self).__init__(problem_args, train, sys_args)
        self.dataset = build_dataset(train)


# now that our custom dataset is finally built, all we need to do is register it with our DatasetFactory class so that it can be built automatically.
# to register it, we need to give it a unique name, we can see what are the current registered datasets:
print(DatasetFactory.get_keys())

# for now we'll set its name to custom
key_name = "custom_dataset"

DatasetFactory.update(key=key_name, dataset=CustomDataset)  # make sure to pass the uninitialized class as the dataset gets re-initialized at runtime for both the testing and training datasets.

print(DatasetFactory.get_keys())


['CIFAR10', 'MNIST']
['CIFAR10', 'MNIST', 'custom_dataset']


The code above allows us to introduce a new custom dataset to our framework, now what if we want to change how the ClassificationProblem behaves. For example, we want to change the fitness function, although what I will showcase applies to any other significant change we want to make to the class' behavior.

So first, we want to create a new problem class. We want to inherit the behavior of the ClassificationProblem.

Now, let's change the fitness function to a new function. Let's take for example Pytorch's CrossEntropyLoss function.

In [4]:
import torch.nn as nn

class CustomClassificationProblem(ClassificationProblem):
    def __init__(self, problem_args: ProblemArgs = None, sys_args: SysArgs = None):
        super(CustomClassificationProblem, self).__init__(problem_args, sys_args)
        self.fitness_function = nn.CrossEntropyLoss()


Now we want to register this new problem to our framework, so we can use it with our original training script.
This will also allow us to explore a second type of problem at the same time, custom problems.

The main function we'll use for this is:
<code>ProblemFactory.register_problem</code>

It takes in 3 arguments:
- problem_type: (str) the type of problem to register. It has to be one of the types within <pre><code>ProblemFactory.registrable_problems</code></pre>
- problem_name: (str) the name that we want to give our problem. It has to be unique and not taken already by any of the other problems in any of the other problem types. This also includes custom dataset names registered through the DatasetFactory as the ProblemFactory scans the DatasetFactory to generate the classification problems.
- problem: (Callable - Ideally unintialized Problem object) the actual problem class that will get instantiated when problem_name is requested
- If any initialization time arguments are required to run your problem, they can be passed as *args and or **kwargs to the register_problem function.

We can now register the problem

In [5]:
from gaggle.problem import ProblemFactory

# we cannot call it custom_dataset as this was already registered when we registered the custom Dataset
print(ProblemFactory.problems)
ProblemFactory.register_problem("custom", "custom_dataset_problem", CustomClassificationProblem)
print(ProblemFactory.problems)


{'classification': {'CIFAR10': <class 'gaggle.problem.dataset.base_datasets.cifar10.CIFAR10'>, 'MNIST': <class 'gaggle.problem.dataset.base_datasets.mnist.MNIST'>, 'custom_dataset': <class '__main__.CustomDataset'>}, 'rl': {'cartpole': <gaggle.problem.environment.environment_factory.GymWrapper object at 0x7faacb63f730>}, 'leap': {}, 'custom': {}}
{'classification': {'CIFAR10': <class 'gaggle.problem.dataset.base_datasets.cifar10.CIFAR10'>, 'MNIST': <class 'gaggle.problem.dataset.base_datasets.mnist.MNIST'>, 'custom_dataset': <class '__main__.CustomDataset'>}, 'rl': {'cartpole': <gaggle.problem.environment.environment_factory.GymWrapper object at 0x7faacb63f730>}, 'leap': {}, 'custom': {'custom_dataset_problem': (<class '__main__.CustomClassificationProblem'>, (), {})}}


Now we should be able to run
<pre><code>python3 train.py --problem_name custom_dataset_problem</code></pre>
(with the other additional other necessary parameters).

### Reinforcement Learning
<a id='rl'></a>

For reinforcement learning problems, in particular OpenAI gym environment or any object whose class implements the Open AI environment API (reset, step, render etc...), we can use a process similar to custom datasets to register them

In [6]:
from gaggle.problem.environment import GymWrapper, EnvironmentFactory

# the gym wrapper is a simple class that takes in the name of the gym environment and has its __call__ return the initialized gym environment.
# If you are using your own environment that implements OpenAI gym environment's API, then you don't need to use the GymWrapper, just pass in the uninitialized class instead

env = GymWrapper("Pendulum-v1")

print(ProblemFactory.problems)

EnvironmentFactory.update(key="pendulum", environment=env)

print(ProblemFactory.problems)


{'classification': {'CIFAR10': <class 'gaggle.problem.dataset.base_datasets.cifar10.CIFAR10'>, 'MNIST': <class 'gaggle.problem.dataset.base_datasets.mnist.MNIST'>, 'custom_dataset': <class '__main__.CustomDataset'>}, 'rl': {'cartpole': <gaggle.problem.environment.environment_factory.GymWrapper object at 0x7faacb63f730>}, 'leap': {}, 'custom': {'custom_dataset_problem': (<class '__main__.CustomClassificationProblem'>, (), {})}}
{'classification': {'CIFAR10': <class 'gaggle.problem.dataset.base_datasets.cifar10.CIFAR10'>, 'MNIST': <class 'gaggle.problem.dataset.base_datasets.mnist.MNIST'>, 'custom_dataset': <class '__main__.CustomDataset'>}, 'rl': {'cartpole': <gaggle.problem.environment.environment_factory.GymWrapper object at 0x7faacb63f730>, 'pendulum': <gaggle.problem.environment.environment_factory.GymWrapper object at 0x7faacb4ab550>}, 'leap': {}, 'custom': {'custom_dataset_problem': (<class '__main__.CustomClassificationProblem'>, (), {})}}


We can now get the environment by setting <code>problem_name</code> to <code>pendulum</code>


# LEAP Problems
<a id='leap'></a>

LEAP (or leap_ec) is another python library for Genetic Algorithms that provides a wide library of existing problems.
In order to make it easy for people that are familiar to LEAP to use any one of their problems directly into our framework, we developed a conversion system that converts a LEAP problem into a Gaggle problem.

We take for example below a fixed version of the RastriginProblem from the leap_ec library

In [7]:
# first the necessary leap code, so you don't have to download the leap package just for this simple example
from abc import ABC, abstractmethod
from math import isclose, isnan
import random


class Problem(ABC):
    """
        Abstract Base Class used to define problem definitions.

        A `Problem` is in charge of two major parts of an EA's behavior:

         1. Fitness evaluation (the `evaluate()` method)

         2. Fitness comparision (the `worse_than()` and `equivalent()` methods)
    """

    def __init__(self):
        super().__init__()

    @abstractmethod
    def evaluate(self, phenome, *args, **kwargs):
        """
        Evaluate the given individual based on its decoded phenome.

        Practitioners *must* over-ride this member function.

        Note that by default the individual comparison operators assume a
        maximization problem; if this is a minimization problem, then just
        negate the value when returning the fitness.

        :param phenome:
        :return: fitness
        """
        raise NotImplementedError

    def evaluate_multiple(self, phenomes):
        """Evaluate multiple individuals all at once, returning a list of fitness
        values.

        By default this just calls `self.evaluate()` multiple times.  Override this
        if you need to, say, send a group of individuals off to parallel """
        return [ self.evaluate(p) for p in phenomes ]

    @abstractmethod
    def worse_than(self, first_fitness, second_fitness):
        raise NotImplementedError

    @abstractmethod
    def equivalent(self, first_fitness, second_fitness):
        raise NotImplementedError


##############################
# Class ScalarProblem
##############################
class ScalarProblem(Problem):
    def __init__(self, maximize):
        super().__init__()
        self.maximize = maximize

    def worse_than(self, first_fitness, second_fitness):
        """
            Used in Individual.__lt__().

            By default returns first_fitness < second_fitness if a maximization
            problem, else first_fitness > second_fitness if a minimization
            problem.  Please over-ride if this does not hold for your problem.

            :return: true if the first individual is less fit than the second
        """
        # NaN is assigned if the individual is non-viable, which can happen if
        # an exception is thrown during evaluation. We consider NaN fitnesses to
        # always be the worse possible with regards to ordering.
        if isnan(first_fitness):
            if isnan(second_fitness):
                # both are nan, so to reduce bias flip a coin to arbitrarily
                # select one that is worst.
                return random.choice([True, False])
            # Doesn't matter how awful second_fitness is, nan will already be
            # considered worse.
            return True
        elif isnan(second_fitness):
            # No matter how awful the first_fitness is, if it's not a NaN the
            # NaN will always be worse
            return False

        # TODO If we accidentally pass an Individual in as first_ or second_fitness,
        # TODO then this can result in an infinite loop.  Add some error
        # handling for this.
        if self.maximize:
            return first_fitness < second_fitness
        else:
            return first_fitness > second_fitness

    def equivalent(self, first_fitness, second_fitness):
        """
            Used in Individual.__eq__().

            By default returns first.fitness== second.fitness.  Please
            over-ride if this does not hold for your problem.

            :return: true if the first individual is equal to the second
        """

        # Since we're comparing two real values, we need to be a little
        # smarter about that.  This will return true if the difference
        # between the two is within a small tolerance. This also handles
        # NaNs, inf, and -inf.
        if type(first_fitness) == float and type(second_fitness) == float:
            return isclose(first_fitness, second_fitness)
        else: # fallback if one or more are not floats
            return first_fitness == second_fitness



In [8]:
import numpy as np


class RastriginProblem(ScalarProblem):
    """ Modified to include negative fittness (LEAP had a bug)
    """
    """ Standard bounds."""
    bounds = (-5.12, 5.12)
    #NOTE we changed maximize to true
    def __init__(self, a=10.0, maximize=True):
        super().__init__(maximize)
        self.a = a
    def evaluate(self, phenome):
        """
        Computes the function value from a real-valued list phenome:

        >>> phenome = [1.0/12, 0]
        >>> RastriginProblem().evaluate(phenome) # doctest: +ELLIPSIS
        0.1409190406...

        :param phenome: real-valued vector to be evaluated
        :returns: its fitness
        """
        #NOTE: We made negative as it is wrong
        if isinstance(phenome, np.ndarray):
            return - (self.a * len(phenome) + \
                np.sum(phenome ** 2 - self.a * np.cos(2 * np.pi * phenome)))
        return self.a * \
            len(phenome) + sum([x ** 2 - self.a *
                                np.cos(2 * np.pi * x) for x in phenome])

    def worse_than(self, first_fitness, second_fitness):
        return super().worse_than(first_fitness, second_fitness)

    def __str__(self):
        return RastriginProblem.__name__

Now that we've defined the Leap Problem, we can import it into our framework

In [9]:
print(ProblemFactory.problems)

ProblemFactory.convert_and_register_leap_problem(problem_name="rastrigin", leap_problem=RastriginProblem)

print(ProblemFactory.problems)


{'classification': {'CIFAR10': <class 'gaggle.problem.dataset.base_datasets.cifar10.CIFAR10'>, 'MNIST': <class 'gaggle.problem.dataset.base_datasets.mnist.MNIST'>, 'custom_dataset': <class '__main__.CustomDataset'>}, 'rl': {'cartpole': <gaggle.problem.environment.environment_factory.GymWrapper object at 0x7faacb63f730>, 'pendulum': <gaggle.problem.environment.environment_factory.GymWrapper object at 0x7faacb4ab550>}, 'leap': {}, 'custom': {'custom_dataset_problem': (<class '__main__.CustomClassificationProblem'>, (), {})}}
{'classification': {'CIFAR10': <class 'gaggle.problem.dataset.base_datasets.cifar10.CIFAR10'>, 'MNIST': <class 'gaggle.problem.dataset.base_datasets.mnist.MNIST'>, 'custom_dataset': <class '__main__.CustomDataset'>}, 'rl': {'cartpole': <gaggle.problem.environment.environment_factory.GymWrapper object at 0x7faacb63f730>, 'pendulum': <gaggle.problem.environment.environment_factory.GymWrapper object at 0x7faacb4ab550>}, 'leap': {'rastrigin': (<gaggle.problem.leap.leap_p

And that's it!
Now you can use that problem with Gaggle by specifying
<code>problem_name</code> as <code>rastrigin</code>

## GA
<a id='ga'></a>

Now that we've covered the Problem module/class we can move on to solving said problems! To do so, we'll look at the GA module/class.
The GA class controls your overall evolution algorithm, how and when each operator is called as well as gathering and storing training metrics.
Any custom implemented GA needs to inherit from the GA class (gaggle.ga.GA). There is only one abstract method to fill in: train.

We'll go over a basic re-implementation of the SimpleGA with only its core components to see how things are pieced together:

In [10]:
from typing import Callable, List
import time

import torch

from gaggle.arguments import GAArgs, SysArgs, IndividualArgs, ProblemArgs
from gaggle.arguments.outdir_args import OutdirArgs
from gaggle.population.population_manager import PopulationManager
from gaggle.utils.special_print import print_dict_highlighted
from gaggle.operators import Crossover, Mutation, Selection
from gaggle.ga import GA
from gaggle.problem import Problem


class SimpleGA(GA):
    r"""Implements a Simple Genetic Algorithm following Mitchell.

    """
    def __init__(self, population_manager: PopulationManager = None, ga_args: GAArgs = None,
                 selection: Selection = None, crossover: Crossover = None, mutation: Mutation = None,
                 problem_args: ProblemArgs = None, sys_args: SysArgs = None, outdir_args: OutdirArgs = None,
                 individual_args: IndividualArgs = None, problem: Problem = None):
        super(SimpleGA, self).__init__(population_manager, ga_args, selection, crossover, mutation, problem_args,
                                       sys_args, outdir_args, individual_args, problem)
        self.best = 0

    def train_one_generation(self):
        """
        Standard one generation GA pipeline
        """
        self.population_manager.train()
        train_fitness = self.problem.evaluate_population(self.population_manager,
                                                         use_freshness=self.ga_args.use_freshness, update_manager=True,
                                                         train=True)
        self.population_manager = self.selection_fn.select_all(self.population_manager,
                                                          self.crossover_fn.mates_per_crossover,
                                                          self.crossover_fn.children_per_crossover)
        self.population_manager = self.crossover_fn.crossover_pop(self.population_manager)
        self.population_manager = self.mutation_fn.mutate_pop(self.population_manager)
        return train_fitness

    def train(self, test: bool = True,
              callbacks: List[Callable] = None,
              display_train_metrics: bool = True,
              display_test_metrics: bool = True):
        """Call to begin the training process of the population using the arguments stored in this SimpleGA object.

        Args:
            test:
            callbacks:
            display_train_metrics:
            display_test_metrics:

        Returns:

        """
        print(f"Genome size: {self.population_manager.get_gene_count():.3e} params")

        print_dict_highlighted(vars(self.ga_args))

        if callbacks is None:
            callbacks = []

        for generation in range(self.ga_args.generations):
            train_fitness = self.train_one_generation()
            print(f"Generation {generation}: fitness = {train_fitness}")
            for callback in callbacks:
                callback(generation)


In the <code>train</code> method, we have a simple loop over the generations where for each generation we call another method <code>train_one_generation</code>. <code>train_one_generation</code> implements the standard one generation GA pipeline.
As you can notice, the different operators are called with the <code>population_manager</code> as an argument. In Gaggle's structure, the population manager holds and manages the population of individuals to evolve. In a way, it's a fancy datastructure with a couple of additional functionalities.

So the standard simple ga pipeline goes like this:
- get the population's fitness by evaluating the population
- perform selection
- perform crossover
- perform mutation
- return the fitness

When implementing you own GA, you can obviously play around and change the order of the operators, ignore some operators, add some extra ones etc...

Whenever you implement a new GA, you can register it in the GAFactory as shown below.


In [11]:
from gaggle.ga import GAFactory

print(GAFactory.gas)
GAFactory.update("custom_simple_ga", SimpleGA)
print(GAFactory.gas)


{'simple': <class 'gaggle.ga.simple_ga.SimpleGA'>}
{'simple': <class 'gaggle.ga.simple_ga.SimpleGA'>, 'custom_simple_ga': <class '__main__.SimpleGA'>}


We'll now go over population manager as it'll be helpful to understand how the operators operate on the population.

### Population Manager
<a id='population_manager'></a>

The Population Manager is a data structure that holds 3 main attributes and allows for the operators to communicate with each other while maintaining a simple API.
The 3 main attributes are:
- <code>population</code>: a python dictionary that stores the population of individuals where the key is the individual's unique id. Ids are typically <code>[0, 1, 2, ..., ga_args.population_size - 1]</code>. So we have <code>{id: Individual}</code>.
- <code>fitness</code>: a dictionary that stores key-value pairs of individual id and their fitness.
- <code>fresh</code>: a dictionary that stores key-value pairs of individual id and their freshness.

Freshness represents whether the fitness has already been computed since the model was last changed or not. We say a model is fresh if its fitness has not yet been updated and we say it is not fresh otherwise.
This is used when evaluating fitness to save time, as only fresh models have their fitness computed. This is relevant when things like elitism are used, since some models survive unchanged and therefore don't need to have their fitness recomputed. Freshness is optional and can be turned off by setting <code>use_freshness</code> to <code>False</code>.

Then we have the different buffers that the operators can use to communicate with one another:
- <code>protected</code>: a list that stores ids of individuals to be protected. In this case, protected means that the individuals are to survive this generation unscathed. This is what would be used to implement elitism, where the top performers are added to the list of protected models and therefore always make it to the next generation unmodified.
- <code>parents</code>: stores the parents selected during the selection operator. It stores it as a list of unique id (even if the parents appears multiple times during the selection process). This should be used to check if an individual is a parent but not be used to actually perform the crossovers. Instead, <code>mating_tuples</code> should be used for that.
- <code>mating_tuples</code>: a list that stores tuples of individual ids. Selection should generate this list of tuples of ids where each tuple represents the parents for a single breeding. This list of tuples is then used by the crossover operator to generate the new population where each individual is bread from a tuple (excluding the protected members).
- <code>to_mutate</code>: list of individual ids that stores which individuals should be affected by the mutation operator. This should be set usually by the crossover operator. Depending on whether <code>ga_args.mutate_protected</code> is set to True or not, the protected models can also be mutated.

We will now go over each type of operator and how they should work with/update those buffers and attributes. But first we need to specify that in case a custom operator requires an extra buffer for communication across operators, there are <code>create_buffer</code>, <code>get_buffer</code> and <code>update_buffer</code> methods in the population manager that allow for custom buffers.

### Selection
<a id='selection'></a>

Since it usually happens at the start of the pipeline, we'll start with selection! Below I copy-pasted the code for the base Selection object and I'll explain after.

In [12]:
import copy
from abc import abstractmethod

from gaggle.arguments.ga_args import GAArgs
from gaggle.population.population_manager import PopulationManager


class Selection:
    r""" The parent class for any Selection Operator.
    It gives functions to select the protected (elitism) set and to select the parents for crossover.

    """
    def __init__(self, ga_args: GAArgs = None):
        self.ga_args = ga_args if ga_args is not None else GAArgs()
        self.num_parents = ga_args.num_parents

    def select_protected(self, manager: PopulationManager) -> PopulationManager:
        """ By default, the select protected is elitism, to turn off elitism, set elitism to 0. in ga_args."""
        elitism = self.ga_args.elitism
        num_to_protect = int(elitism * manager.population_size)

        # we now do a topk selection process
        fitness = copy.deepcopy(manager.get_fitness())

        topk_indices = []
        for i in range(num_to_protect):
            best_idx = max(fitness, key=fitness.get)
            topk_indices.append(best_idx)
            fitness.pop(best_idx)

        manager.update_protected(new_protected=topk_indices)

        return manager

    @abstractmethod
    def select_parents(self, manager: PopulationManager, mates_per_crossover: int, children_per_crossover: int) -> PopulationManager:
        """ Should select both the parents and the mating tuples """
        raise NotImplementedError

    def select_all(self, manager: PopulationManager, mates_per_crossover: int, children_per_crossover: int) -> PopulationManager:
        """Calls both the protoected and parent selection fucntions"""
        manager = self.select_protected(manager)
        manager = self.select_parents(manager, mates_per_crossover, children_per_crossover)
        return manager


A selection operator is always needs at least the <code>GAArgs</code> to be initialized. Although if none is given, it will take the default values for it.

It has three methods, one of which is abstract and needs to be implemented for specific Selections. The other two can be modified if necessary to induce custom behavior, however their default behavior should work well enough for most applications.
I will go over them in order.

- <code>select_protected</code> has implements fitness-based elitism by default. It uses <code>ga_args.elitism</code> as the percentage of the best performing individuals to protect for the next generation. This can be changed to implement other kinds of elitisms or protection-based behaviors. If using the default implementation, all that needs to be done to turn off elitism is to set <code>ga_args.elitism</code> to 0.
- <code>select_all</code> is just a wrapper that calls both <code>select_protected</code> and <code>select_parents</code> to give a one-line interface for the GA.
- <code>select_parents</code> is where we set the <code>manager.mating_tuples</code> and <code>manager.parents</code> lists. This lets the crossover operator know what to crossover over. Below is an example of an implementation for a custom selection where we implement a standard roulette wheel selection where each individual's likehood of being selected is proportional to their fitness. This selection is also available by default with Gaggle as "weighted" for <code>ga_args.selection</code>

In [13]:
from numpy.random import choice


class WeightedSelection(Selection):
    r"""Standard Roulette Wheel selection
    Probability of selection is fittness/total fittness
    If negative fittness all fittness values are shifted to make positive

    """
    def __init__(self, ga_args: GAArgs = None):
        super(WeightedSelection, self).__init__(ga_args=ga_args)

    def select_parents(self, manager: PopulationManager, mates_per_crossover: int, children_per_crossover: int) -> PopulationManager:
        fitness = copy.deepcopy(manager.get_fitness())
        min_fit = min(fitness.items(), key=lambda x: x[1])[1]
        if min_fit < 0:
            offset = np.abs(min_fit)
            for key in fitness.keys():
                fitness[key] += offset
        p = []
        fitness_sum = 0.
        for key in fitness.keys():
            fitness_sum += fitness[key]

        ids = list(range(manager.population_size))
        for key in ids:
            fitness[key] /= fitness_sum
            p.append(fitness[key])

        protected_models = manager.get_protected()
        num_protected = len(protected_models)
        num_matings = (self.ga_args.population_size - num_protected) // children_per_crossover

        mating_tuples = []
        parents = []
        for j in range(num_matings):
            mating_tuple = tuple(choice(ids, size=(mates_per_crossover,), replace=False, p=p))
            mating_tuples.append(mating_tuple)
            parents.extend(mating_tuple)

        parents = list(np.unique(parents))

        manager.update_parents(new_parents=parents)
        manager.update_mating_tuples(mating_tuples)

        return manager

As previously mentioned, here we make sure to update the parents and the mating tuples in the manager by calling <code>manager.update_parents(new_parents=parents)</code> and <code>manager.update_mating_tuples(mating_tuples)</code>.

To compute the selection, we get the models' fitnesses by calling <code>manager.get_fitness()</code>

It's also worth noting that we assume the selection will get the number of mates per crossover and the number of childrens per crossover from the GA (we'll see where we get those values soon when we look at Crossovers). This allows us to compute how many mating tuples we need to generate while also taking into account how many protected models there also are.
<code>
        protected_models = manager.get_protected()
        num_protected = len(protected_models)
        num_matings = (self.ga_args.population_size - num_protected) // children_per_crossover
</code>

### Crossover
<a id='crossover'></a>

Similarly to Selection, I'll go over the basic Crossover class. I'll explain each of the methods and then we'll look at a specific implementation to see how to implement your own Crossover.

Below is the code for the basic Crossover class.

In [14]:
import copy
import random
from abc import abstractmethod

from gaggle.population.individual import Individual
from gaggle.arguments.ga_args import GAArgs
from gaggle.population.population_manager import PopulationManager


class Crossover:
    r""" The parent class for any Crossover Operator.
    It gives a basic function to crossover a whole population once the function for crossing over a single pair of parents is specified

    """
    mates_per_crossover = 0
    children_per_crossover = 0

    def __init__(self, ga_args: GAArgs = None):
        self.ga_args = ga_args if ga_args is not None else GAArgs()


    @abstractmethod
    def crossover_individual(self, individuals: list[Individual]) -> list[Individual]:
        r"""Speficies how to create children from parents
        Args:
            individuals: a list of parents to crossover (typically 2)
        Returns:
            A list of children created from the parents (typically 2)"""
        raise NotImplementedError

    def crossover_pop(self, manager: PopulationManager) -> PopulationManager:
        r""" Calls the crossover indivual operator over the whole popualtion
        while maintaining the protected parents
        For each pair of parents, crossover is called with probability ga_args.parent_survival_rate
        Args:
            manager: PopulationManager object holding the current population
        Returns:
            Modified PopulationManager object"""
        parents = manager.get_parents()
        num_parents = len(parents)
        population = manager.get_population()
        freshness = manager.get_freshness()

        new_population = {}
        new_freshness = {}

        free_indices = list(range(self.ga_args.population_size))
        to_mutate = []

        # first we get the protected indices and we add them to the list
        protected_models = manager.get_protected()

        # adding the protected
        for p in protected_models:
            new_population[p] = population[p]
            new_freshness[p] = freshness[p]
            free_indices.remove(p)
            if self.ga_args.mutate_protected:
                to_mutate.append(p)

        mating_tuples = manager.get_mating_tuples()
        surviving_parents = []
        to_mate = []
        # now we count how many parents to keep
        for mating_tuple in mating_tuples:
            # probability to keep the parents rather than the children
            keep_parents = random.random() < self.ga_args.parent_survival_rate
            if keep_parents:
                surviving_parents.extend(mating_tuple)
            else:
                to_mate.append(mating_tuple)

        # we now fill the lucky parents that survived
        for idx in surviving_parents:
            if idx in free_indices:
                new_population[idx] = copy.deepcopy(population[idx])  # we might have put that model already in so we don't want a double reference
                new_freshness[idx] = freshness[idx]
                free_indices.remove(idx)
                to_mutate.append(idx)
            else:
                for new_idx in free_indices:
                    if new_idx in surviving_parents:
                        continue
                    new_population[new_idx] = copy.deepcopy(population[idx])  # we might have put that model already in so we don't want a double reference
                    new_freshness[new_idx] = freshness[idx]
                    free_indices.remove(new_idx)
                    to_mutate.append(new_idx)
                    break

        # now we can do the actual crossover
        for mating_tuple in to_mate:
            party = [manager.population[idx] for idx in mating_tuple]
            children = self.crossover_individual(copy.deepcopy(party))
            for child in children:
                new_idx = free_indices.pop(0)
                to_mutate.append(new_idx)
                new_population[new_idx] = child
                new_freshness[new_idx] = True

        # if we are missing some, we fill the population
        for i in free_indices:
            idx = parents[random.randint(0, num_parents - 1)]
            new_population[i] = copy.deepcopy(population[idx])  # we might have put that model already in so we don't want a double reference
            new_freshness[i] = freshness[idx]
            free_indices.remove(i)
            to_mutate.append(i)

        # we finally update the manager
        manager.update_population(new_population, new_freshness)
        manager.update_to_mutate(new_to_mutate=to_mutate)
        return manager


As you can see, this one is a bit more complex than the basic selection. What's nice is you'll most likely won't have to touch the complicated <code>crossover_pop</code> method as all it does is fill the available population spots in order of priority.
We can summarize its behavior as follows:
- It first takes the protected models and adds them to the new population. That guarantees that the protected models always have space to be put into the population. The protected members also get to keep their previous id.
- Then, from the mating tuples it gets from <code>manager.get_mating_tuples()</code> (the ones generated by the selection), it sees using <code>ga_args.parent_survival_rate</code> whether each tuple of parents mates or survives instead of their children.
- It then adds the surviving parents to the population (while trying to preserve the ids as much as possible).
- Now, we can do the crossover, and we fill in the spots using the tuples of parents that didn't get chosen to survive instead of their children
- Finally, in case of some user mis-inputs or mis-calculations with <code>mates_per_crossover</code> or <code>children_per_crossover</code>, we fill in potential missing spots in the population. This avoids having indices in the population with no individuals since we want a fixed population size. To do so we just heuristically select one of the parents at random per spot left. This code usually is never run but is there as a safety net.
- Finally, we update the manager with the new population, freshness and the list of models to mutate using:

<code>
manager.update_population(new_population, new_freshness)

manager.update_to_mutate(new_to_mutate=to_mutate)
</code>

Notice how I <code>mates_per_crossover</code> and <code>children_per_crossover</code> are stored as class variables. This is because they are supposed to be fixed for each different Crossover and they are inherent to their particular Crossover.
The GA can get the value from the class and give it to the Selection so that the Selection can generate the appropriate number of mating tuples.

Now in practice, all you have to do to implement your own Crossover is implement the <code>crossover_individual</code> method. It takes in a list of individuals and returns the list of children individuals generated by the crossover. Below is an example using the k_point crossover.

In [15]:
from gaggle.population.individual import Individual
from gaggle.operators.crossover.crossover import Crossover
from gaggle.arguments.ga_args import GAArgs
from gaggle.utils.individual_helper import from_gene_pool, from_tensor

import torch
from numpy.random import default_rng
import numpy as np


class KPointCrossover(Crossover):
    r"""Generalization of single point crossover to k points
    See the following tutorial for a more in depth description
    https://www.tutorialspoint.com/genetic_algorithms/genetic_algorithms_crossover.htm
    Generates two children from two parents
    """
    mates_per_crossover = 2
    children_per_crossover = 2

    def __init__(self, ga_args: GAArgs = None):
        super(KPointCrossover, self).__init__(ga_args=ga_args)

    def crossover_individual(self, individuals: list[Individual]) ->list[Individual]:
        assert len(individuals) == self.mates_per_crossover
        individual_1, individual_2 = individuals
        assert individual_1.get_genome_size() == individual_2.get_genome_size()
        assert individual_1.sys_args.device == individual_2.sys_args.device
        genome_size = individual_1.get_genome_size()

        genome_1 = individual_1.get_gene_pool()
        genome_2 = individual_2.get_gene_pool()

        # convert to a singular tensor
        tensor_1, metadata_1 = from_gene_pool(genome_1)
        tensor_2, metadata_2 = from_gene_pool(genome_2)

        k = self.ga_args.k_point

        # select a set of k random indices without repeats
        rng = default_rng()
        cut_indices = np.sort(rng.choice(genome_size+1, size=k, replace=False))
        flip = True
        last_cut = 0
        for cut_idx in cut_indices:
            if flip:
                data_1 = tensor_1[last_cut:cut_idx].clone().detach()
                tensor_1[last_cut: cut_idx] = tensor_2[last_cut:cut_idx].clone().detach()
                tensor_2[last_cut: cut_idx] = data_1

            last_cut = cut_idx
            flip = not flip

        # don't forget the last one
        if flip:
            data_1 = tensor_1[last_cut:].clone().detach()
            tensor_1[last_cut:] = tensor_2[last_cut:].clone().detach()
            tensor_2[last_cut:] = data_1

        from_tensor(gene_pool=genome_1, tensor=tensor_1, metadata=metadata_1)
        from_tensor(gene_pool=genome_2, tensor=tensor_2, metadata=metadata_2)

        return [individual_1, individual_2]

No need to interact with the manager here since it's done in <code>crossover_pop</code>.

### Mutation
<a id='mutation'></a>

Now that we've covered Crossovers, we can finally look at mutations. Similarly to the other operators, I'll give the code for the base implementation of the Mutation class and we'll cover what needs to be done to create your own Mutation.


In [16]:
from abc import abstractmethod

from gaggle.population.individual import Individual
from gaggle.arguments.ga_args import GAArgs
from gaggle.population.population_manager import PopulationManager


class Mutation:
    r""" The parent class for any Mutation Operator.
    It gives a basic function to mutate a whole population once the function for mutating a single individual is specified

    """
    def __init__(self, ga_args: GAArgs = None):
        self.ga_args = ga_args if ga_args is not None else GAArgs()

    @abstractmethod
    def mutate_individual(self, individual: Individual) -> Individual:
        r"""Speficies how to mutate a single individual
        Args:
            individuals: a single individual to mutate
        Returns:
            A single individual after mutation"""
        raise NotImplementedError

    def mutate_pop(self, manager: PopulationManager) -> PopulationManager:
        r""" Calls the mutate_individual function for each member of the population
        Args:
            manager: PopulationManager object holding the current population
        Returns:
            Modified PopulationManager object"""
        population = manager.get_population()
        new_freshness = manager.get_freshness()

        to_mutate = manager.get_to_mutate()
        for individual_idx in to_mutate:
            population[individual_idx] = self.mutate_individual(population[individual_idx])
            new_freshness[individual_idx] = True

        manager.update_population(population, new_freshness)
        # in case we need to enforce parameter value bounds on the freshly mutated samples. Since it only applies to
        # mutated samples we don't have to update the freshness
        manager.apply_bounds(to_mutate)
        return manager


Similarly to crossover, we have <code>mutate_pop</code> and <code>mutate_individual</code>. Here in this case <code>mutate_pop</code> just iterates through the list of indices to be mutated that it gets from the manager (using <code> manager.get_to_mutate()</code>) and that should have been set by the Crossover. We then call <code>mutate_individual</code> on each of the individuals in a loop and update the population with the mutated population in the manager using <code>manager.update_population(population, new_freshness)</code>

We finally need to make another call to the manager to apply weight bounds restrictions. In cases where we want limits on the possible gene values, this ensures that the mutated population stays within the bounds and therefore should be also called whenever you implement your own <code>mutate_pop</code>. It is in general not recommended to modify <code>mutate_pop</code> unless absolutely necessary.

Most Mutations can be implemented by just implementing your own <code>mutate_individual</code> that should return the mutated individual. Below is an example with the uniform mutation.

In [17]:
import torch

from gaggle.population.base_individuals.nn_individual import Individual
from gaggle.operators.mutation.mutation import Mutation
from gaggle.arguments.ga_args import GAArgs


class UniformMutation(Mutation):
    r"""For real valued chromosomes
    Adds noise from a Uniform distribution within the range specified by:
    ga_args.uniform_mutation_min_val and ga_args.uniform_mutation_max_val
    Noise is only added to each gene with probability specified by ga_args.mutation_chance
    """
    def __init__(self, ga_args: GAArgs = None):
        super(UniformMutation, self).__init__(ga_args)
        self.uniform_mutation_min_val = self.ga_args.uniform_mutation_min_val
        self.uniform_mutation_max_val = self.ga_args.uniform_mutation_max_val

    def mutate_individual(self, individual: Individual) -> Individual:
        genome = individual.get_gene_pool()
        num_chromosomes = len(genome.keys())
        for i in range(num_chromosomes):
            # generate the random mask
            mask = torch.rand(genome[i]["param"].data.size(), dtype=torch.float,
                                device=genome[i]["param"].data.device) < self.ga_args.mutation_chance
            indices = torch.nonzero(mask, as_tuple=True)
            scaled_mutation = (torch.rand(size=genome[i]["param"].data[indices].size(), device=genome[i]["param"].data.device) *
                       (self.uniform_mutation_max_val - self.uniform_mutation_min_val)) - self.uniform_mutation_min_val
            genome[i]["param"].data[indices] += scaled_mutation
        return individual


### Registering Custom Operators
<a id='registering_custom_operators'></a>

Finally, I'll go over how to register custom operators to their respective factories so that they can be used with your basic original training script and your configuration files.

Similarly to how problems are registered in the problem, dataset and environment factories; operators have to be registered in the respective factories to be fully incorporated in our framework as if they were one of the base operators.

Below will be examples on how to register a custom operator for each of the operators

In [18]:
from gaggle.operators import SelectionFactory, CrossoverFactory, MutationFactory

# registering the Selection
print(SelectionFactory.selections)
SelectionFactory.update("custom_weighted_selection", WeightedSelection)
print(SelectionFactory.selections)

# registering the Crossover
print(CrossoverFactory.crossovers)
CrossoverFactory.update("custom_k_point_crossover", KPointCrossover)
print(CrossoverFactory.crossovers)

# registering the Mutation
print(MutationFactory.mutations)
MutationFactory.update("custom_uniform_mutation", UniformMutation)
print(MutationFactory.mutations)


{'truncation': <class 'gaggle.operators.selection.base_selections.truncation_selection.TruncationSelection'>, 'weighted': <class 'gaggle.operators.selection.base_selections.weighted_selection.WeightedSelection'>, 'relative_weighted': <class 'gaggle.operators.selection.base_selections.relative_weighted_selection.RelativeWeightedSelection'>, 'probabilistic_tournament': <class 'gaggle.operators.selection.base_selections.probabilistic_tournament_selection.ProbabilisticTournamentSelection'>, 'simple_tournament': <class 'gaggle.operators.selection.base_selections.simple_tournament_selection.SimpleTournamentSelection'>}
{'truncation': <class 'gaggle.operators.selection.base_selections.truncation_selection.TruncationSelection'>, 'weighted': <class 'gaggle.operators.selection.base_selections.weighted_selection.WeightedSelection'>, 'relative_weighted': <class 'gaggle.operators.selection.base_selections.relative_weighted_selection.RelativeWeightedSelection'>, 'probabilistic_tournament': <class 'g