parallel function evaluation #31

Closed
AStupidBear opened this issue Sep 7, 2017 · 9 comments

@AStupidBear

For some real-world applications, each function evaluation is expensive to run, so it is necessary to parallelize the fitness evaluation. I can easily do that in Julia, but my version of CMA-ES is not as full-featured as this package. Do you have any interest in implementing that in Python?

@nikohansen
Contributor

The interface for parallel evaluations is exposed in the cma package via the cma.CMAEvolutionStrategy.ask method, which returns a list of solutions to be evaluated. This leaves us with the desire for a wrapper making parallel calls to the objective function with these solutions. That is, we want to parallelise the line

f_values = [f(x) for x in X]

where X is a list of numpy arrays and f is the objective function (considered to be CPU-expensive).

I haven't seen anything like a three-liner that does this in Python yet, and I currently cannot provide any ready-to-go code. If someone can help out, that would be great. I am looking forward to any contribution to this end!
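
One possible way to parallelise that line with only the standard library is sketched here, assuming the `concurrent.futures` executor interface. The helper name `evaluate_all` and the toy `sphere` function are illustrative and not part of `cma`. The demo uses threads only to stay self-contained; for a CPU-bound pure-Python `f`, a `ProcessPoolExecutor` would normally be passed instead, which requires `f` to be picklable and, on platforms that spawn worker processes, the calling code to sit under `if __name__ == "__main__":`.

```python
from concurrent.futures import ThreadPoolExecutor


def evaluate_all(executor, f, X, args=()):
    """Return [f(x, *args) for x in X], submitting each call to `executor`."""
    futures = [executor.submit(f, x, *args) for x in X]
    # Collect results in submission order, so values line up with X.
    return [future.result() for future in futures]


def sphere(x):
    """Toy objective (assumption for illustration only)."""
    return sum(xi ** 2 for xi in x)


# Any concurrent.futures executor works here; threads keep the demo simple.
with ThreadPoolExecutor(max_workers=4) as executor:
    f_values = evaluate_all(executor, sphere, [[1, 2], [3, 4]])
```

Because results are gathered in submission order, `f_values` can be passed directly to `es.tell(X, f_values)`.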

@drallensmith

Well, an example to do it in Python for NEAT is here. It's not a 3-liner, but should help.

@drallensmith

Note, BTW, that threading is not the way to go for most implementations of Python, due to the Global Interpreter Lock (GIL), although there is code for doing it in NEAT-Python.

@nikohansen
Contributor

nikohansen commented Sep 8, 2017

Well then, for a start, a 10-or-so-liner with a speedup of about 10 on a function that takes 0.1 seconds to evaluate:

import time
from neat.parallel import ParallelEvaluator  # uses multiprocessing.Pool
import cma

def fitness(x):
    time.sleep(0.1)
    return sum(x**2)

# serial evaluation of all solutions
def serial_evals(X, f=fitness, args=()):
    return [f(x, *args) for x in X]

# parallel evaluation of all solutions
def _evaluate2(self, X, *args):
    """redefine evaluate without the dependencies on neat-internal data structures
    """
    jobs = [self.pool.apply_async(self.eval_function, (x, ) + args) for x in X]
    return [job.get(timeout=self.timeout) for job in jobs]
ParallelEvaluator.evaluate2 = _evaluate2
parallel_eval = ParallelEvaluator(12, fitness)

# time both
for eval_all in [serial_evals, parallel_eval.evaluate2]:
    es = cma.CMAEvolutionStrategy(10 * [1], 1, {'maxiter': 50})
    es.disp_annotation()
    while not es.stop():
        X = es.ask()
        es.tell(X, eval_all(X))
    es.disp()

Output (serial run first, then parallel run):
(5_w,10)-aCMA-ES (mu_w=3.2,w_1=45%) in dimension 10 (seed=940512, Fri Sep  8 17:43:25 2017)
Iterat #Fevals   function value  axis ratio  sigma  min&max std  t[m:s]
   50    500 2.838045498955964e-03 1.8e+00 2.66e-02  2e-02  2e-02 0:52.0
(5_w,10)-aCMA-ES (mu_w=3.2,w_1=45%) in dimension 10 (seed=960184, Fri Sep  8 17:44:17 2017)
Iterat #Fevals   function value  axis ratio  sigma  min&max std  t[m:s]
   50    500 4.426200851808875e-03 1.9e+00 3.67e-02  2e-02  3e-02 0:05.1
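
The two timings are close to what a simple count predicts. As a back-of-the-envelope sketch (the helper `ideal_wall_time` is hypothetical and ignores all process and communication overhead): with population size 10 and 50 iterations, the serial run performs 500 evaluations of 0.1 s each, while 12 workers can finish each generation of 10 evaluations in a single round of 0.1 s.

```python
import math


def ideal_wall_time(iterations, popsize, eval_seconds, workers):
    """Overhead-free estimate: each generation needs ceil(popsize / workers) rounds."""
    rounds_per_iteration = math.ceil(popsize / workers)
    return iterations * rounds_per_iteration * eval_seconds


serial = ideal_wall_time(50, 10, 0.1, workers=1)     # about 50 seconds
parallel = ideal_wall_time(50, 10, 0.1, workers=12)  # about 5 seconds
speedup = serial / parallel
```

The measured 0:52.0 and 0:05.1 are consistent with this estimate. In this model, workers beyond the tenth cannot reduce the time per generation, because the population size is 10.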

@nikohansen
Contributor

nikohansen commented Sep 10, 2017

I have been prototyping (not yet published) a class EvalParallel (in module cma.fitness_transformations). This becomes in effect a one-liner, to be used like:

import time
import cma

def fitness(x):
    time.sleep(0.1)
    return sum(x**2)

with cma.fitness_transformations.EvalParallel(12) as eval_all:
    es = cma.CMAEvolutionStrategy(10 * [1], 1)
    while not es.stop():
        X = es.ask()
        es.tell(X, eval_all(fitness, X))  
        # was: es.tell(X, [fitness(x) for x in X])
        es.disp()

@nikohansen
Contributor

Solved with class cma.fitness_transformations.EvalParallel in commit 9ea9716

@nikohansen
Contributor

An alternative implementation (in beta) with a different interface, which does not require passing the objective function on every call of the parallel version:

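# assumed import: `ProcessingPool` is not defined in the snippet as posted
from multiprocessing import Pool as ProcessingPool

class EvalParallel2(object):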
    """A class and context manager for parallel evaluations.

    The interface changed such that the fitness function can also be
    given once in the constructor. Hence the function has become
    optional and is now the second argument of `__call__`.

    TODO: What is the best order of constructor arguments?

    To be used with the `with` statement (otherwise `terminate` needs to
    be called to free resources)::

        with EvalParallel2() as eval_all:
            fvals = eval_all(solutions, fitness)

    assigns a callable `EvalParallel2` class instance to ``eval_all``.
    The instance can be called with a `list` (or `tuple` or any
    sequence) of solutions and returns their fitness values. That is::

        eval_all(solutions, fitness) == [fitness(x) for x in solutions]

    `EvalParallel2.__call__` may take two additional optional arguments,
    namely `args` passed to ``fitness`` and `timeout` passed to the
    `multiprocessing.pool.ApplyResult.get` method, which raises
    `multiprocessing.TimeoutError` if the timeout is exceeded.

    Examples:

    >>> import cma
    >>> from cma.fitness_transformations import EvalParallel2
    >>> # class usage, don't forget to call terminate
    >>> ep = EvalParallel2(4, cma.fitness_functions.elli)
    >>> ep([[1,2], [3,4], [4, 5]])  # doctest:+ELLIPSIS
    [4000000.944...
    >>> ep.terminate()
    ...
    >>> # use with `with` statement (context manager)
    >>> es = cma.CMAEvolutionStrategy(3 * [1], 1, dict(verbose=-9))
    >>> with EvalParallel2(12) as eval_all:
    ...     while not es.stop():
    ...         X = es.ask()
    ...         es.tell(X, eval_all(X, cma.fitness_functions.elli))
    >>> assert es.result[1] < 1e-13 and es.result[2] < 1500

    Parameters: the `EvalParallel2` constructor takes the number of
    processes as optional input argument, which is by default
    ``multiprocessing.cpu_count()``.

    Limitations: The `multiprocessing` module (on which this class is
    based) does not work with class instance method calls.

    In some cases the execution may be considerably slowed down,
    as for example with test suites from coco/bbob.

    """
    def __init__(self, number_of_processes=None, fitness_function=None):
        self.processes = number_of_processes
        self.fitness_function = fitness_function
        self.pool = ProcessingPool(self.processes)

    def __call__(self, solutions, fitness_function=None, args=(), timeout=None):
        """evaluate a list/sequence of solution-"vectors", return a list
        of corresponding f-values.

        Raises `multiprocessing.TimeoutError` if `timeout` is given and
        exceeded.
        """
        fitness_function = fitness_function or self.fitness_function
        if fitness_function is None:
            raise ValueError("`fitness_function` was never given, must be"
                             " passed in `__init__` or `__call__`")
        warning_str = ("WARNING: `fitness_function` must be a function,"
                       " not an instancemethod, in order to work with"
                       " `multiprocessing`")
        if isinstance(fitness_function, type(self.__init__)):
            print(warning_str)
        jobs = [self.pool.apply_async(fitness_function, (x,) + args)
                for x in solutions]
        try:
            return [job.get(timeout) for job in jobs]
        except Exception:  # do not intercept KeyboardInterrupt or SystemExit
            print(warning_str)
            raise

    def terminate(self):
        """free allocated processing pool"""
        # self.pool.close()  # would wait for job termination
        self.pool.terminate()  # terminate jobs regardless
        self.pool.join()  # end spawning

    def __enter__(self):
        # we could assign self.pool here, but then `EvalParallel2` would
        # *only* work when using the `with` statement
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.terminate()

    def __del__(self):
        """though generally not recommended `__del__` should be OK here"""
        self.terminate()
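
The warning in `__call__` concerns picklability: `multiprocessing` sends the callable and its arguments to the worker processes via `pickle`, so a callable that cannot be pickled cannot be evaluated in the pool. A small pre-flight check can catch this before any jobs are submitted. This is only a sketch; the helper name `can_send_to_worker` is hypothetical and not part of `cma`.

```python
import math
import pickle
from functools import partial


def can_send_to_worker(func):
    """Return True if `func` can be pickled, as a process pool requires."""
    try:
        pickle.dumps(func)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# Callables that can be looked up by module and name are pickled by reference.
print(can_send_to_worker(math.sqrt))
# A partial of such a callable is a way to fix extra parameters.
print(can_send_to_worker(partial(math.pow, 2)))
# A lambda has no importable name, so pickling it fails.
print(can_send_to_worker(lambda x: x ** 2))
```

Module-level functions, optionally wrapped in `functools.partial` to fix extra parameters, pass this check; lambdas and functions defined inside other functions do not.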

@lazyoracle

Is this alternative implementation a part of the current master branch?

@nikohansen
Contributor

No, sorry, it's not (yet) in the master branch.


4 participants