diff --git a/libensemble/alloc_funcs/start_only_persistent.py b/libensemble/alloc_funcs/start_only_persistent.py index 5de788a8c..3353352ea 100644 --- a/libensemble/alloc_funcs/start_only_persistent.py +++ b/libensemble/alloc_funcs/start_only_persistent.py @@ -12,6 +12,7 @@ def only_persistent_gens(W, H, sim_specs, gen_specs, alloc_specs, persis_info): .. seealso:: `test_persistent_uniform_sampling.py `_ # noqa + `test_persistent_uniform_sampling_async.py `_ # noqa """ Work = {} @@ -21,18 +22,33 @@ def only_persistent_gens(W, H, sim_specs, gen_specs, alloc_specs, persis_info): # The one persistent worker is done. Exiting return Work, persis_info, 1 - # If i is in persistent mode, and all of its calculated values have - # returned, give them back to i. Otherwise, give nothing to i for i in avail_worker_ids(W, persistent=True): - gen_inds = (H['gen_worker'] == i) - if np.all(H['returned'][gen_inds]): - last_time_gen_gave_batch = np.max(H['gen_time'][gen_inds]) - inds_of_last_batch_from_gen = H['sim_id'][gen_inds][H['gen_time'][gen_inds] == last_time_gen_gave_batch] - gen_work(Work, i, - sim_specs['in'] + [n[0] for n in sim_specs['out']] + [('sim_id')], - np.atleast_1d(inds_of_last_batch_from_gen), persis_info[i], persistent=True) - - H['given_back'][inds_of_last_batch_from_gen] = True + if gen_specs['user'].get('async', False): + # If i is in persistent mode, asynchronous behavior is desired, and + # *any* of its calculated values have returned, give them back to i. + # Otherwise, give nothing to i + returned_but_not_given = np.logical_and.reduce((H['returned'], ~H['given_back'], H['gen_worker'] == i)) + if np.any(returned_but_not_given): + inds_to_give = np.where(returned_but_not_given)[0] + gen_work(Work, i, + sim_specs['in'] + [n[0] for n in sim_specs['out']] + [('sim_id')], + np.atleast_1d(inds_to_give), persis_info[i], persistent=True) + + H['given_back'][inds_to_give] = True + + else: + # If i is in persistent mode, batch behavior is desired, and + # *all* of its calculated values have returned, give them back to i. + # Otherwise, give nothing to i + gen_inds = (H['gen_worker'] == i) + if np.all(H['returned'][gen_inds]): + last_time_gen_gave_batch = np.max(H['gen_time'][gen_inds]) + inds_to_give = H['sim_id'][gen_inds][H['gen_time'][gen_inds] == last_time_gen_gave_batch] + gen_work(Work, i, + sim_specs['in'] + [n[0] for n in sim_specs['out']] + [('sim_id')], + np.atleast_1d(inds_to_give), persis_info[i], persistent=True) + + H['given_back'][inds_to_give] = True task_avail = ~H['given'] for i in avail_worker_ids(W, persistent=False): diff --git a/libensemble/gen_funcs/persistent_uniform_sampling.py b/libensemble/gen_funcs/persistent_uniform_sampling.py index d67fb1e9b..768f1b3dc 100644 --- a/libensemble/gen_funcs/persistent_uniform_sampling.py +++ b/libensemble/gen_funcs/persistent_uniform_sampling.py @@ -7,10 +7,14 @@ def persistent_uniform(H, persis_info, gen_specs, libE_info): """ This generation function always enters into persistent mode and returns - ``gen_specs['gen_batch_size']`` uniformly sampled points. + ``gen_specs['gen_batch_size']`` uniformly sampled points the first time it + is called. Afterwards, it returns the number of points given. This can be + used in either a batch or asynchronous mode by adjusting the allocation + function. .. seealso:: `test_persistent_uniform_sampling.py `_ # noqa + `test_persistent_uniform_sampling_async.py `_ # noqa """ ub = gen_specs['user']['ub'] lb = gen_specs['user']['lb'] @@ -23,5 +27,7 @@ def persistent_uniform(H, persis_info, gen_specs, libE_info): H_o = np.zeros(b, dtype=gen_specs['out']) H_o['x'] = persis_info['rand_stream'].uniform(lb, ub, (b, n)) tag, Work, calc_in = sendrecv_mgr_worker_msg(libE_info['comm'], H_o) + if calc_in is not None: + b = len(calc_in) return H_o, persis_info, FINISHED_PERSISTENT_GEN_TAG diff --git a/libensemble/tests/regression_tests/test_persistent_uniform_sampling_async.py b/libensemble/tests/regression_tests/test_persistent_uniform_sampling_async.py new file mode 100644 index 000000000..6fdf2d13a --- /dev/null +++ b/libensemble/tests/regression_tests/test_persistent_uniform_sampling_async.py @@ -0,0 +1,63 @@ +# """ +# Runs libEnsemble on the 6-hump camel problem. Documented here: +# https://www.sfu.ca/~ssurjano/camel6.html +# +# Execute via one of the following commands (e.g. 3 workers): +# mpiexec -np 4 python3 test_6-hump_camel_persistent_uniform_sampling_async.py +# python3 test_6-hump_camel_persistent_uniform_sampling_async.py --nworkers 3 --comms local +# python3 test_6-hump_camel_persistent_uniform_sampling_async.py --nworkers 3 --comms tcp +# +# The number of concurrent evaluations of the objective function will be 4-1=3. +# """ + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: mpi local tcp +# TESTSUITE_NPROCS: 4 + +import sys +import numpy as np + +# Import libEnsemble items for this test +from libensemble.libE import libE +from libensemble.sim_funcs.branin.branin_obj import call_branin as sim_f +from libensemble.gen_funcs.persistent_uniform_sampling import persistent_uniform as gen_f +from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f +from libensemble.tools import parse_args, save_libE_output, add_unique_random_streams + +nworkers, is_master, libE_specs, _ = parse_args() + +if nworkers < 2: + sys.exit("Cannot run with a persistent worker if only one worker -- aborting...") + +n = 2 +sim_specs = {'sim_f': sim_f, + 'in': ['x'], + 'out': [('f', float)], + 'user': {'uniform_random_pause_ub': 0.1} + } + +gen_specs = {'gen_f': gen_f, + 'in': [], + 'out': [('x', float, (n,))], + 'user': {'gen_batch_size': nworkers - 1, + 'async': True, + 'lb': np.array([-3, -2]), + 'ub': np.array([3, 2])} + } + +alloc_specs = {'alloc_f': alloc_f, 'out': [('given_back', bool)]} + +persis_info = add_unique_random_streams({}, nworkers + 1) + +exit_criteria = {'gen_max': 100, 'elapsed_wallclock_time': 300} + +# Perform the run +H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, + alloc_specs, libE_specs) + +if is_master: + [_, counts] = np.unique(H['gen_time'], return_counts=True) + assert counts[0] == nworkers - 1, "The first gen_time should be common among gen_batch_size number of points" + assert len(np.unique(counts)) > 1, "There is no variablitiy in the gen_times but there should be for the async case" + + save_libE_output(H, persis_info, __file__, nworkers)