Skip to content

Commit

Permalink
Merge pull request #522 from Libensemble/feature/async_uniform_sampling
Browse files Browse the repository at this point in the history
Feature/async uniform sampling
  • Loading branch information
jmlarson1 committed Oct 18, 2020
2 parents e4cdbed + 6153d1e commit af5db3b
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 12 deletions.
38 changes: 27 additions & 11 deletions libensemble/alloc_funcs/start_only_persistent.py
Expand Up @@ -12,6 +12,7 @@ def only_persistent_gens(W, H, sim_specs, gen_specs, alloc_specs, persis_info):
.. seealso::
`test_persistent_uniform_sampling.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/regression_tests/test_persistent_uniform_sampling.py>`_ # noqa
`test_persistent_uniform_sampling_async.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/regression_tests/test_persistent_uniform_sampling_async.py>`_ # noqa
"""

Work = {}
Expand All @@ -21,18 +22,33 @@ def only_persistent_gens(W, H, sim_specs, gen_specs, alloc_specs, persis_info):
# The one persistent worker is done. Exiting
return Work, persis_info, 1

# If i is in persistent mode, and all of its calculated values have
# returned, give them back to i. Otherwise, give nothing to i
for i in avail_worker_ids(W, persistent=True):
gen_inds = (H['gen_worker'] == i)
if np.all(H['returned'][gen_inds]):
last_time_gen_gave_batch = np.max(H['gen_time'][gen_inds])
inds_of_last_batch_from_gen = H['sim_id'][gen_inds][H['gen_time'][gen_inds] == last_time_gen_gave_batch]
gen_work(Work, i,
sim_specs['in'] + [n[0] for n in sim_specs['out']] + [('sim_id')],
np.atleast_1d(inds_of_last_batch_from_gen), persis_info[i], persistent=True)

H['given_back'][inds_of_last_batch_from_gen] = True
if gen_specs['user'].get('async', False):
# If i is in persistent mode, asynchronous behavior is desired, and
# *any* of its calculated values have returned, give them back to i.
# Otherwise, give nothing to i
returned_but_not_given = np.logical_and.reduce((H['returned'], ~H['given_back'], H['gen_worker'] == i))
if np.any(returned_but_not_given):
inds_to_give = np.where(returned_but_not_given)[0]
gen_work(Work, i,
sim_specs['in'] + [n[0] for n in sim_specs['out']] + [('sim_id')],
np.atleast_1d(inds_to_give), persis_info[i], persistent=True)

H['given_back'][inds_to_give] = True

else:
# If i is in persistent mode, batch behavior is desired, and
# *all* of its calculated values have returned, give them back to i.
# Otherwise, give nothing to i
gen_inds = (H['gen_worker'] == i)
if np.all(H['returned'][gen_inds]):
last_time_gen_gave_batch = np.max(H['gen_time'][gen_inds])
inds_to_give = H['sim_id'][gen_inds][H['gen_time'][gen_inds] == last_time_gen_gave_batch]
gen_work(Work, i,
sim_specs['in'] + [n[0] for n in sim_specs['out']] + [('sim_id')],
np.atleast_1d(inds_to_give), persis_info[i], persistent=True)

H['given_back'][inds_to_give] = True

task_avail = ~H['given']
for i in avail_worker_ids(W, persistent=False):
Expand Down
8 changes: 7 additions & 1 deletion libensemble/gen_funcs/persistent_uniform_sampling.py
Expand Up @@ -7,10 +7,14 @@
def persistent_uniform(H, persis_info, gen_specs, libE_info):
"""
This generation function always enters into persistent mode and returns
``gen_specs['gen_batch_size']`` uniformly sampled points.
``gen_specs['gen_batch_size']`` uniformly sampled points the first time it
is called. Afterwards, it returns the number of points given. This can be
used in either a batch or asynchronous mode by adjusting the allocation
function.
.. seealso::
`test_persistent_uniform_sampling.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/regression_tests/test_persistent_uniform_sampling.py>`_ # noqa
`test_persistent_uniform_sampling_async.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/regression_tests/test_persistent_uniform_sampling_async.py>`_ # noqa
"""
ub = gen_specs['user']['ub']
lb = gen_specs['user']['lb']
Expand All @@ -23,5 +27,7 @@ def persistent_uniform(H, persis_info, gen_specs, libE_info):
H_o = np.zeros(b, dtype=gen_specs['out'])
H_o['x'] = persis_info['rand_stream'].uniform(lb, ub, (b, n))
tag, Work, calc_in = sendrecv_mgr_worker_msg(libE_info['comm'], H_o)
if calc_in is not None:
b = len(calc_in)

return H_o, persis_info, FINISHED_PERSISTENT_GEN_TAG
@@ -0,0 +1,63 @@
# """
# Runs libEnsemble on the 6-hump camel problem. Documented here:
# https://www.sfu.ca/~ssurjano/camel6.html
#
# Execute via one of the following commands (e.g. 3 workers):
# mpiexec -np 4 python3 test_6-hump_camel_persistent_uniform_sampling_async.py
# python3 test_6-hump_camel_persistent_uniform_sampling_async.py --nworkers 3 --comms local
# python3 test_6-hump_camel_persistent_uniform_sampling_async.py --nworkers 3 --comms tcp
#
# The number of concurrent evaluations of the objective function will be 4-1=3.
# """

# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: mpi local tcp
# TESTSUITE_NPROCS: 4

import sys
import numpy as np

# Import libEnsemble items for this test
from libensemble.libE import libE
from libensemble.sim_funcs.branin.branin_obj import call_branin as sim_f
from libensemble.gen_funcs.persistent_uniform_sampling import persistent_uniform as gen_f
from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f
from libensemble.tools import parse_args, save_libE_output, add_unique_random_streams

nworkers, is_master, libE_specs, _ = parse_args()

if nworkers < 2:
sys.exit("Cannot run with a persistent worker if only one worker -- aborting...")

n = 2
sim_specs = {'sim_f': sim_f,
'in': ['x'],
'out': [('f', float)],
'user': {'uniform_random_pause_ub': 0.1}
}

gen_specs = {'gen_f': gen_f,
'in': [],
'out': [('x', float, (n,))],
'user': {'gen_batch_size': nworkers - 1,
'async': True,
'lb': np.array([-3, -2]),
'ub': np.array([3, 2])}
}

alloc_specs = {'alloc_f': alloc_f, 'out': [('given_back', bool)]}

persis_info = add_unique_random_streams({}, nworkers + 1)

exit_criteria = {'gen_max': 100, 'elapsed_wallclock_time': 300}

# Perform the run
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info,
alloc_specs, libE_specs)

if is_master:
[_, counts] = np.unique(H['gen_time'], return_counts=True)
assert counts[0] == nworkers - 1, "The first gen_time should be common among gen_batch_size number of points"
assert len(np.unique(counts)) > 1, "There is no variablitiy in the gen_times but there should be for the async case"

save_libE_output(H, persis_info, __file__, nworkers)

0 comments on commit af5db3b

Please sign in to comment.