# Asynchronous parallel evaluation

This example demonstrates an asynchronous evaluation model, known more formally as an asynchronous steady-state evolutionary algorithm (ASEA).  We will demonstrate two approaches.  The first is a detailed implementation suited to practitioners who want full control over their implementation.  The second is a more accessible approach that uses a single, monolithic function implementing a full ASEA.

In [1]:
import multiprocessing.popen_spawn_posix  # Python 3.9 workaround for Dask.  See https://github.com/dask/distributed/issues/4168
from distributed import Client, LocalCluster
import toolz

from leap_ec import Representation, context
from leap_ec import ops
from leap_ec.decoder import IdentityDecoder

from leap_ec.binary_rep.problems import MaxOnes
from leap_ec.binary_rep.initializers import create_binary_sequence
from leap_ec.binary_rep.ops import mutate_bitflip

from leap_ec.distrib import asynchronous
from leap_ec.distrib import evaluate
from leap_ec.distrib.individual import DistributedIndividual

First, let's set up `dask` to run on a local pretend "cluster" on your machine.

In [2]:
cluster = LocalCluster()
client = Client(cluster)

Now create an initial random population of five individuals that use a binary representation of four bits for solving the MAX ONES problem.

In [3]:
parents = DistributedIndividual.create_population(5,
                                            initialize=create_binary_sequence(4),
                                            decoder=IdentityDecoder(), problem=MaxOnes())
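MAX ONES scores a bit string by the number of 1s it contains, so with four bits the best possible fitness is 4. The sketch below is a plain-Python stand-in for what `create_binary_sequence(4)` and `MaxOnes()` do in this example; the names `random_binary_genome` and `max_ones` are hypothetical and are not part of LEAP's API.

```python
import random


def random_binary_genome(length, rng=random):
    """Hypothetical stand-in for create_binary_sequence: `length` random bits."""
    return [rng.randint(0, 1) for _ in range(length)]


def max_ones(genome):
    """MAX ONES fitness: the number of 1 bits in the genome."""
    return sum(genome)


# Five random four-bit individuals, as in the cell above.
rng = random.Random(42)
population = [random_binary_genome(4, rng) for _ in range(5)]
for genome in population:
    print(genome, max_ones(genome))
```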

To get things started, we send the entire randomly generated initial population to dask for asynchronous evaluation.  We do this by calling `asynchronous.eval_population()`, which returns a distributed dask `as_completed` iterator.  Each step of that iterator yields the future of the next individual to *finish* evaluating; calling `.result()` on that future returns the evaluated individual.

In [4]:
as_completed_iter = asynchronous.eval_population(parents, client=client)
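To see what "asynchronous" buys us, here is a stdlib-only analogy that uses `concurrent.futures` threads instead of dask: all individuals are submitted at once and then handled in the order their evaluations finish, not the order they were submitted. The `slow_max_ones` helper and its artificial delays are assumptions for illustration. Dask's `as_completed` lets you `add()` new futures while iterating (as the loop further below does), whereas the standard library's `as_completed()` works on a fixed set, so this sketch uses `wait(..., return_when=FIRST_COMPLETED)` instead.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def slow_max_ones(genome, delay):
    """Hypothetical evaluator: sleep to mimic a costly evaluation, then count 1s."""
    time.sleep(delay)
    return genome, sum(genome)


# Later submissions get shorter delays, so they tend to finish first.
jobs = [([1, 0, 1, 1], 0.3), ([1, 1, 0, 1], 0.2), ([1, 0, 0, 0], 0.1)]

completion_order = []
with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
    pending = {pool.submit(slow_max_ones, g, d) for g, d in jobs}
    while pending:
        # Block until at least one evaluation finishes; the rest stay pending.
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            genome, fitness = future.result()
            completion_order.append(genome)
            print('evaluated:', genome, fitness)
```

With these delays the last-submitted genome typically comes back first, which is why the `evaluated` lines in the loop below need not appear in submission order.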

We create a "bag" that will contain the evaluated individuals.  This bag will initially be an empty list.

In [5]:
bag = []

Then we fall into a loop where we insert individuals into a bag with an arbitrary capacity of three.  That means the first three individuals to finish evaluating are simply inserted into the bag, while every individual after that, including the offspring created inside the loop, has to fight it out for a spot.  We chose the greedy insertion function, `greedy_insert_into_pop()`, in which each newcomer competes against the current *weakest* individual in the bag; the alternative, `tournament_insert_into_pop()`, instead pits the newcomer against a randomly selected member of the bag.

To make things more interesting, we will create up to four *new* offspring from the bag. In later, more complex examples, we'll implement a proper births budget to limit the total number of evaluated individuals.
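The two insertion strategies described above can be sketched in a few lines of plain Python. This illustrates the described behavior and is not LEAP's implementation: individuals are modeled as hypothetical `(genome, fitness)` tuples, higher fitness is assumed to be better (as in MAX ONES), and ties are assumed to keep the incumbent.

```python
import random


def greedy_insert(individual, bag, capacity):
    """Fill the bag; once full, replace the weakest member if the newcomer beats it."""
    if len(bag) < capacity:
        bag.append(individual)
        return
    weakest = min(range(len(bag)), key=lambda i: bag[i][1])
    if individual[1] > bag[weakest][1]:
        bag[weakest] = individual


def tournament_insert(individual, bag, capacity, rng=random):
    """Fill the bag; once full, challenge a randomly chosen member instead."""
    if len(bag) < capacity:
        bag.append(individual)
        return
    opponent = rng.randrange(len(bag))
    if individual[1] > bag[opponent][1]:
        bag[opponent] = individual


bag = []
arrivals = [([1, 0, 1, 1], 3), ([1, 1, 0, 1], 3), ([1, 0, 1, 0], 2),
            ([1, 0, 0, 0], 1), ([1, 1, 1, 1], 4)]
for ind in arrivals:
    greedy_insert(ind, bag, 3)
print(bag)  # [([1, 0, 1, 1], 3), ([1, 1, 0, 1], 3), ([1, 1, 1, 1], 4)]
```

Greedy insertion admits a newcomer whenever it beats the worst member, so weak individuals are evicted quickly. Tournament insertion compares against a random member, so a newcomer that would have beaten the weakest can still be rejected and weak members may survive longer, which is what makes it less greedy.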

In [6]:
num_offspring = 0

for i, evaluated_future in enumerate(as_completed_iter):

    evaluated = evaluated_future.result()

    print(i, ', evaluated: ', evaluated.genome, evaluated.fitness)

    asynchronous.greedy_insert_into_pop(evaluated, bag, 3)

    if num_offspring < 4:
        # Only create offspring if we have the budget for one
        offspring = toolz.pipe(bag,
                               ops.random_selection,
                               ops.clone,
                               mutate_bitflip(expected_num_mutations=1),
                               ops.pool(size=1))
        print('created offspring:', offspring[0].genome)

        # Now asynchronously submit to dask
        as_completed_iter.add(client.submit(evaluate.evaluate(context=context), offspring[0]))

        num_offspring += 1

0 , evaluated:  [1, 0, 1, 1] 3
created offspring: [1, 0, 1, 1]
1 , evaluated:  [1, 1, 0, 1] 3
created offspring: [1, 0, 1, 0]
2 , evaluated:  [1, 0, 1, 0] 2
created offspring: [1, 0, 1, 1]
3 , evaluated:  [1, 0, 0, 0] 1
created offspring: [0, 1, 0, 0]
4 , evaluated:  [1, 0, 1, 0] 2
5 , evaluated:  [1, 0, 1, 1] 3
6 , evaluated:  [1, 0, 1, 0] 2
7 , evaluated:  [1, 0, 1, 1] 3
8 , evaluated:  [0, 1, 0, 0] 1


Now `bag` should contain the final population: the nine total evaluated individuals cooked down to the three best.  Note that there are nine total "evaluated" lines, corresponding to the original five randomly generated individuals plus the four new ones we added inside the loop.

In [7]:
for i, ind in enumerate(bag):
    print(i, ind.genome, ind.fitness)

0 [1, 0, 1, 1] 3
1 [1, 1, 0, 1] 3
2 [1, 0, 1, 1] 3

## Using the convenience function `steady_state`

However, if you are comfortable relinquishing control over implementation details, you might find it easier to use `leap_ec.distrib.asynchronous.steady_state()`.  Under the hood it does essentially everything above, plus a few other things, such as letting you decide whether non-viable individuals count towards the births budget.  You can also specify the strategy for inserting new individuals into the bag.
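To make "does essentially everything above" concrete, here is a hedged, stdlib-only sketch of a steady-state loop with a births budget, using `concurrent.futures` threads in place of dask. The function name `steady_state_sketch`, its parameters, the all-zeros failure rule, and the choice to charge births when an evaluation comes back are all assumptions for illustration; they are not the signature or internals of `leap_ec.distrib.asynchronous.steady_state()`. As in the run above, the budget includes the initial population (5 initial + 4 offspring = 9 births).

```python
import math
import random
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def evaluate(genome):
    """Hypothetical evaluator: MAX ONES, but an all-zero genome raises."""
    if not any(genome):
        raise ValueError('all-zero genome is treated as invalid')
    return sum(genome)


def safe_evaluate(genome):
    """Return (genome, fitness, is_viable); any exception means non-viable."""
    try:
        return genome, evaluate(genome), True
    except Exception:
        return genome, math.nan, False


def mutate(genome, rng):
    """Flip each bit with probability 1/len(genome), i.e. one expected flip."""
    p = 1.0 / len(genome)
    return [1 - bit if rng.random() < p else bit for bit in genome]


def steady_state_sketch(births, init_pop_size, pop_size, genome_len=4,
                        count_nonviable=False, seed=None):
    rng = random.Random(seed)
    bag = []         # viable evaluated individuals as (genome, fitness)
    counted = 0      # births charged against the budget
    nonviable = 0    # running total of failed evaluations
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Send the whole random initial population off for evaluation at once.
        pending = {pool.submit(safe_evaluate,
                               [rng.randint(0, 1) for _ in range(genome_len)])
                   for _ in range(init_pop_size)}
        in_flight = init_pop_size  # submitted but not yet processed
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                in_flight -= 1
                genome, fitness, viable = future.result()
                if not viable:
                    nonviable += 1
                if viable or count_nonviable:
                    counted += 1
                if viable:
                    # Greedy insertion: fill the bag, then replace the weakest.
                    if len(bag) < pop_size:
                        bag.append((genome, fitness))
                    else:
                        weakest = min(range(len(bag)), key=lambda i: bag[i][1])
                        if fitness > bag[weakest][1]:
                            bag[weakest] = (genome, fitness)
                # Breed one offspring while the budget still has room for it.
                if bag and counted + in_flight < births:
                    parent = rng.choice(bag)[0]
                    pending.add(pool.submit(safe_evaluate, mutate(parent, rng)))
                    in_flight += 1
    return bag, counted, nonviable


final_bag, births_used, nonviable = steady_state_sketch(
    births=9, init_pop_size=5, pop_size=3)
print(final_bag, births_used, nonviable)
```

Tracking `in_flight` separately from `pending` matters because `wait()` can hand back several finished futures at once; counting only `pending` would briefly forget those and could overshoot the budget. With `count_nonviable=False`, a failed evaluation frees its slot, so a replacement offspring is bred in its place.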

### A note about non-viable individuals

A non-viable individual is one that didn't get a chance to be evaluated because, say, an exception was thrown during the evaluation process.  For example, if you were tuning deep-learning (DL) hyper-parameters, the DL configuration that an individual represents might be invalid and cause PyTorch or TensorFlow to throw an exception.  That individual would be "non-viable" because the corresponding DL hyper-parameter set never got a chance to be trained.

Essentially, any exception thrown during an individual's evaluation will cause `leap_ec.distrib` to deem that individual non-viable.  It will set `is_viable` to `False`, set the fitness to `math.nan`, and set the attribute `exception` to the thrown exception.  This makes it easier to track such individuals and to diagnose how and when individuals are marked non-viable during a run.  `context['leap']['distrib']['non_viable']` (with `context` imported from `leap_ec`) is incremented to keep a running total of non-viable individuals during a run; you will need to manually reset this counter between runs.
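The bookkeeping described above can be sketched as a small wrapper. It mirrors the described behavior (set `is_viable` to `False`, set `fitness` to `math.nan`, store the exception, and bump a counter in a nested context dictionary), but it is a hypothetical stand-in rather than `leap_ec.distrib.evaluate` itself; the `Candidate` class, the `evaluate_with_bookkeeping` function, and the failing `train_model` evaluator are invented for illustration.

```python
import math
from dataclasses import dataclass
from typing import Optional

# Stand-in for the global context; the nested keys follow the structure
# that `context` prints later in this notebook.
context = {'leap': {'distrib': {'non_viable': 0}}}


@dataclass
class Candidate:
    genome: list
    fitness: float = math.nan
    is_viable: bool = True
    exception: Optional[BaseException] = None


def train_model(genome):
    """Hypothetical evaluator that rejects an invalid 'hyper-parameter' setting."""
    if genome[0] == 0:
        raise ValueError('layer count must be positive')
    return sum(genome)


def evaluate_with_bookkeeping(candidate, context=context):
    try:
        candidate.fitness = train_model(candidate.genome)
        candidate.is_viable = True
    except Exception as e:
        # Any exception marks the candidate non-viable instead of crashing the run.
        candidate.is_viable = False
        candidate.fitness = math.nan
        candidate.exception = e
        context['leap']['distrib']['non_viable'] += 1
    return candidate


results = [evaluate_with_bookkeeping(Candidate(g))
           for g in ([1, 0, 1], [0, 1, 1], [1, 1, 1])]
for c in results:
    print(c.genome, c.fitness, c.is_viable, repr(c.exception))
print(context['leap']['distrib']['non_viable'])  # 1

# The counter accumulates across runs, so reset it manually before the next one.
context['leap']['distrib']['non_viable'] = 0
```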

Both `leap_ec.distrib.synchronous` and `leap_ec.distrib.asynchronous` use `leap_ec.distrib.evaluate` to implement this functionality.

### `steady_state()` example

The following example uses `steady_state()` to do what we did above, though it relies on the default inserter, `tournament_insert_into_pop()`, which is less greedy than the one we used earlier.  Note that we explicitly pass `individual_cls=DistributedIndividual` to `Representation`; otherwise the default `Individual` class would be used.

In [8]:
final_pop = asynchronous.steady_state(client, births=9, init_pop_size=5, pop_size=3,
                                      representation=Representation(
                                                  decoder=IdentityDecoder(),
                                                  initialize=create_binary_sequence(4),
                                                  individual_cls=DistributedIndividual),
                                      problem=MaxOnes(),
                                      offspring_pipeline=[ops.random_selection,
                                                ops.clone,
                                                mutate_bitflip(expected_num_mutations=1),
                                                ops.pool(size=1)])

In [9]:
for i, ind in enumerate(final_pop):
    print(i, ind.genome, ind.fitness)

0 [1, 0, 0, 1] 2
1 [1, 0, 0, 1] 2
2 [1, 0, 0, 1] 2

In [10]:
context

{'leap': {'distrib': {'non_viable': 0}, 'births': 9}}

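Because initialization, selection, and mutation are all stochastic, running the same call a second time will generally produce a different final population.

In [11]: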
final_pop = asynchronous.steady_state(client, births=9, init_pop_size=5, pop_size=3,
                                      representation=Representation(
                                                  decoder=IdentityDecoder(),
                                                  initialize=create_binary_sequence(4),
                                                  individual_cls=DistributedIndividual),
                                      problem=MaxOnes(),
                                      offspring_pipeline=[ops.random_selection,
                                                ops.clone,
                                                mutate_bitflip(expected_num_mutations=1),
                                                ops.pool(size=1)])

In [12]:
for i, ind in enumerate(final_pop):
    print(i, ind.genome, ind.fitness)

0 [1, 1, 0, 1] 3
1 [1, 1, 1, 0] 3
2 [1, 1, 0, 1] 3

In [13]:
context

{'leap': {'distrib': {'non_viable': 0}, 'births': 9}}