Skip to content

Commit

Permalink
Merge a38693c into 9c0a75f
Browse files Browse the repository at this point in the history
  • Loading branch information
shuds13 committed Apr 2, 2020
2 parents 9c0a75f + a38693c commit aad39be
Show file tree
Hide file tree
Showing 17 changed files with 87 additions and 49 deletions.
2 changes: 2 additions & 0 deletions docs/data_structures/worker_array.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ worker array
::

W: numpy structured array
'worker_id' [int]:
The worker ID
'active' [int]:
Is the worker active or not
'persis_state' [int]:
Expand Down
10 changes: 5 additions & 5 deletions libensemble/comms/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def __init__(self, inbox, outbox, copy_msg=False):
self._inbox = inbox
self._outbox = outbox
self._copy = copy_msg
self._pushback = None
self.recv_buffer = None
with QComm.lock:
QComm._ncomms.value += 1

Expand All @@ -136,8 +136,8 @@ def send(self, *args):

def recv(self, timeout=None):
"Return a message from the inbox queue or raise TimeoutError."
pb_result = self._pushback
self._pushback = None
pb_result = self.recv_buffer
self.recv_buffer = None
if pb_result is not None:
return pb_result
try:
Expand All @@ -148,8 +148,8 @@ def recv(self, timeout=None):
raise Timeout()

# TODO: This should go away once I have internal comms working
def push_back(self, *args):
self._pushback = args
def push_to_buffer(self, *args):
self.recv_buffer = args

def mail_flag(self):
"Check whether we know a message is ready for receipt."
Expand Down
16 changes: 8 additions & 8 deletions libensemble/comms/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ def __init__(self, mpi_comm, remote_rank=0):
self.remote_rank = remote_rank
self.status = MPI.Status()
self._outbox = []
self._pushed = None
self.recv_buffer = None

def __del__(self):
"Wait on anything pending if comm is killed."
for req in self._outbox:
req.Wait()

def mail_flag(self):
return (self._pushed is not None
return (self.recv_buffer is not None
or self.mpi_comm.Iprobe(source=self.remote_rank))

def kill_pending(self):
Expand All @@ -65,9 +65,9 @@ def send(self, *args):

def recv(self, timeout=None):
"Receive a message or raise TimeoutError."
if self._pushed is not None:
result = self._pushed
self._pushed = None
if self.recv_buffer is not None:
result = self.recv_buffer
self.recv_buffer = None
return result
if timeout is not None:
tfinal = time.time() + timeout
Expand All @@ -85,9 +85,9 @@ def process_incoming(self, msg, status):
"Convert an MPI message and tag to a local communicator format message."
return msg[0]

def push_back(self, *args):
assert self._pushed is None, "Cannot push back multiple messages"
self._pushed = args
def push_to_buffer(self, *args):
assert self.recv_buffer is None, "Cannot push back multiple messages"
self.recv_buffer = args

def get_num_workers(self):
return self.mpi_comm.Get_size() - 1
Expand Down
5 changes: 3 additions & 2 deletions libensemble/gen_funcs/persistent_aposmm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import nlopt
import dfols

from libensemble.message_numbers import STOP_TAG, PERSIS_STOP
from libensemble.message_numbers import STOP_TAG, PERSIS_STOP, FINISHED_PERSISTENT_GEN_TAG
from libensemble.tools.gen_support import send_mgr_worker_msg
from libensemble.tools.gen_support import get_mgr_worker_msg

Expand Down Expand Up @@ -217,6 +217,7 @@ def aposmm(H, persis_info, gen_specs, libE_info):

if tag in [STOP_TAG, PERSIS_STOP]:
clean_up_and_stop(local_H, local_opters, run_order)
persis_info['run_order'] = run_order
break

n_s, n_r = update_local_H_after_receiving(local_H, n, n_s, user_specs, Work, calc_in, fields_to_pass)
Expand Down Expand Up @@ -283,7 +284,7 @@ def aposmm(H, persis_info, gen_specs, libE_info):
send_mgr_worker_msg(comm, local_H[new_inds_to_send_mgr + new_opt_inds_to_send_mgr][[i[0] for i in gen_specs['out']]])
something_sent = True

return local_H, persis_info, tag
return [], persis_info, FINISHED_PERSISTENT_GEN_TAG


class LocalOptInterfacer(object):
Expand Down
9 changes: 5 additions & 4 deletions libensemble/gen_funcs/persistent_deap_nsga2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
import array

from libensemble.message_numbers import STOP_TAG, PERSIS_STOP
from libensemble.message_numbers import STOP_TAG, PERSIS_STOP, FINISHED_PERSISTENT_GEN_TAG
from libensemble.tools.gen_support import sendrecv_mgr_worker_msg


Expand Down Expand Up @@ -125,8 +125,9 @@ def deap_nsga2(H, persis_info, gen_specs, libE_info):
print('Finished evaluating population, doing selection now.')
# Running fitness calc on gens > 0
invalid_ind, tag = evaluate_pop(g, invalid_ind, Out, comm)
# Select the next generation population
pop = toolbox.select(pop + offspring, MU)
if tag not in [STOP_TAG, PERSIS_STOP]:
# Select the next generation population
pop = toolbox.select(pop + offspring, MU)
else:
print('There were no invalid indiviuals')
# Don't update population
Expand All @@ -140,4 +141,4 @@ def deap_nsga2(H, persis_info, gen_specs, libE_info):
print('Current minimum:', np.min(fits))
print('Sum of fit values at end of loop', sum(fits))

return Out, persis_info, tag
return Out, persis_info, FINISHED_PERSISTENT_GEN_TAG
3 changes: 1 addition & 2 deletions libensemble/gen_funcs/persistent_fd_param_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,10 @@ def fd_param_finder(H, persis_info, gen_specs, libE_info):
os.remove('fnoise.out')

if np.all(inform == 1):
tag = FINISHED_PERSISTENT_GEN_TAG
break

H0 = build_H0(x_f_pairs_new, gen_specs, noise_h_mat)
tag, Work, calc_in = sendrecv_mgr_worker_msg(comm, H0)

persis_info['Fnoise'] = Fnoise
return H0, persis_info, tag
return H0, persis_info, FINISHED_PERSISTENT_GEN_TAG
4 changes: 2 additions & 2 deletions libensemble/gen_funcs/persistent_inverse_bayes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import numpy as np

from libensemble.message_numbers import STOP_TAG, PERSIS_STOP
from libensemble.message_numbers import STOP_TAG, PERSIS_STOP, FINISHED_PERSISTENT_GEN_TAG
from libensemble.tools.gen_support import sendrecv_mgr_worker_msg


Expand Down Expand Up @@ -37,4 +37,4 @@ def persistent_updater_after_likelihood(H, persis_info, gen_specs, libE_info):
if calc_in is not None:
w = H_o['prior'] + calc_in['like'] - H_o['prop']

return H_o, persis_info, tag
return H_o, persis_info, FINISHED_PERSISTENT_GEN_TAG
3 changes: 1 addition & 2 deletions libensemble/gen_funcs/persistent_tasmanian.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,4 @@ def sparse_grid(H, persis_info, gen_specs, libE_info):

persis_info['aResult'][prec] = aResult

tag = FINISHED_PERSISTENT_GEN_TAG
return H0, persis_info, tag
return H0, persis_info, FINISHED_PERSISTENT_GEN_TAG
4 changes: 2 additions & 2 deletions libensemble/gen_funcs/persistent_uniform_sampling.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import numpy as np

from libensemble.message_numbers import STOP_TAG, PERSIS_STOP
from libensemble.message_numbers import STOP_TAG, PERSIS_STOP, FINISHED_PERSISTENT_GEN_TAG
from libensemble.tools.gen_support import sendrecv_mgr_worker_msg


Expand All @@ -24,4 +24,4 @@ def persistent_uniform(H, persis_info, gen_specs, libE_info):
H_o['x'] = persis_info['rand_stream'].uniform(lb, ub, (b, n))
tag, Work, calc_in = sendrecv_mgr_worker_msg(libE_info['comm'], H_o)

return H_o, persis_info, tag
return H_o, persis_info, FINISHED_PERSISTENT_GEN_TAG
7 changes: 2 additions & 5 deletions libensemble/gen_funcs/uniform_or_localopt.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def nlopt_obj_fun(x, grad):
gen_specs['user']['ub'], gen_specs['user']['lb'], local=True, active=True)
tag, Work, calc_in = sendrecv_mgr_worker_msg(comm, H_o)
if tag in [STOP_TAG, PERSIS_STOP]:
nlopt.forced_stop.message = 'tag=' + str(tag)
raise nlopt.forced_stop

# Return function value (and maybe gradient)
Expand Down Expand Up @@ -97,13 +96,11 @@ def nlopt_obj_fun(x, grad):
persis_info_updates = {'done': True}
if exit_code > 0 and exit_code < 5:
persis_info_updates['x_opt'] = x_opt
tag_out = FINISHED_PERSISTENT_GEN_TAG
except Exception as e: # Raised when manager sent PERSIS_STOP or STOP_TAG
except Exception: # Raised when manager sent PERSIS_STOP or STOP_TAG
x_opt = []
persis_info_updates = {}
tag_out = int(e.message.split('=')[-1])

return x_opt, persis_info_updates, tag_out
return x_opt, persis_info_updates, FINISHED_PERSISTENT_GEN_TAG


def add_to_Out(H_o, x, i, ub, lb, local=False, active=False):
Expand Down
37 changes: 29 additions & 8 deletions libensemble/libE_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from libensemble.message_numbers import \
EVAL_SIM_TAG, FINISHED_PERSISTENT_SIM_TAG, \
EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, \
STOP_TAG, UNSET_TAG, \
STOP_TAG, UNSET_TAG, PERSIS_STOP, \
WORKER_KILL, WORKER_KILL_ON_ERR, WORKER_KILL_ON_TIMEOUT, \
TASK_FAILED, WORKER_DONE, \
MAN_SIGNAL_FINISH, MAN_SIGNAL_KILL
Expand Down Expand Up @@ -128,6 +128,7 @@ def __init__(self, hist, libE_specs, alloc_specs,
self.elapsed = lambda: timer.elapsed
self.wcomms = wcomms
self.WorkerExc = False
self.persis_pending = []
self.W = np.zeros(len(self.wcomms), dtype=Manager.worker_dtype)
self.W['worker_id'] = np.arange(len(self.wcomms)) + 1
self.term_tests = \
Expand Down Expand Up @@ -277,6 +278,7 @@ def _check_received_calc(D_recv):
assert calc_status in [FINISHED_PERSISTENT_SIM_TAG,
FINISHED_PERSISTENT_GEN_TAG,
UNSET_TAG,
PERSIS_STOP,
MAN_SIGNAL_FINISH,
MAN_SIGNAL_KILL,
WORKER_KILL_ON_ERR,
Expand Down Expand Up @@ -314,10 +316,15 @@ def _update_state_on_worker_msg(self, persis_info, D_recv, w):
calc_status = D_recv['calc_status']
Manager._check_received_calc(D_recv)

self.W[w-1]['active'] = 0
if w not in self.persis_pending:
self.W[w-1]['active'] = 0

if calc_status in [FINISHED_PERSISTENT_SIM_TAG,
FINISHED_PERSISTENT_GEN_TAG]:
self.W[w-1]['persis_state'] = 0
if w in self.persis_pending:
self.persis_pending.remove(w)
self.W[w-1]['active'] = 0
else:
if calc_type == EVAL_SIM_TAG:
self.hist.update_history_f(D_recv)
Expand Down Expand Up @@ -371,14 +378,28 @@ def _final_receive_and_kill(self, persis_info):
nonblocking receive is posted (though the manager will not receive this
data) and a kill signal is sent.
"""

# Send a handshake signal to each persistent worker.
if any(self.W['persis_state']):
for w in self.W['worker_id'][self.W['persis_state'] > 0]:
logger.debug("Manager sending PERSIS_STOP to worker {}".format(w))
self.wcomms[w-1].send(PERSIS_STOP, MAN_SIGNAL_KILL)
if not self.W[w-1]['active']:
# Re-activate if necessary
self.W[w-1]['active'] = self.W[w-1]['persis_state']
self.persis_pending.append(w)

exit_flag = 0
while any(self.W['active']) and exit_flag == 0:
while (any(self.W['active']) or any(self.W['persis_state'])) and exit_flag == 0:
persis_info = self._receive_from_workers(persis_info)
if self.term_test(logged=False) == 2 and any(self.W['active']):
logger.manager_warning(_WALLCLOCK_MSG)
sys.stdout.flush()
sys.stderr.flush()
exit_flag = 2
if self.term_test(logged=False) == 2:
# Elapsed Wallclock has expired
if not any(self.W['persis_state']):
if any(self.W['active']):
logger.manager_warning(_WALLCLOCK_MSG)
sys.stdout.flush()
sys.stderr.flush()
exit_flag = 2
if self.WorkerExc:
exit_flag = 1

Expand Down
11 changes: 10 additions & 1 deletion libensemble/libE_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from libensemble.message_numbers import \
EVAL_SIM_TAG, EVAL_GEN_TAG, \
UNSET_TAG, STOP_TAG, CALC_EXCEPTION
UNSET_TAG, STOP_TAG, PERSIS_STOP, CALC_EXCEPTION
from libensemble.message_numbers import MAN_SIGNAL_FINISH
from libensemble.message_numbers import calc_type_strings, calc_status_strings

Expand Down Expand Up @@ -363,6 +363,14 @@ def _handle_calc(self, Work, calc_in):
"Calculation output must be at least two elements."

calc_status = out[2] if len(out) >= 3 else UNSET_TAG

# Check for buffered receive
if self.comm.recv_buffer:
tag, message = self.comm.recv()
if tag in [STOP_TAG, PERSIS_STOP]:
if message is MAN_SIGNAL_FINISH:
calc_status = MAN_SIGNAL_FINISH

return out[0], out[1], calc_status
except Exception:
logger.debug("Re-raising exception from calc")
Expand Down Expand Up @@ -441,6 +449,7 @@ def run(self):
logger.debug("Iteration {}".format(worker_iter))

mtag, Work = self.comm.recv()

if mtag == STOP_TAG:
break

Expand Down
17 changes: 12 additions & 5 deletions libensemble/message_numbers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
# --- Tags

UNSET_TAG = 0

# When received by a worker, tells worker to do a sim eval;
# When received by the manager, tells manager that worker is done with sim eval.
EVAL_SIM_TAG = 1

# When received by a worker, tells worker to do a gen eval;
# When received by the manager, tells manager that worker is done with sim eval.
EVAL_GEN_TAG = 2
STOP_TAG = 3
PERSIS_STOP = 4 # manager tells persistent worker to desist

STOP_TAG = 3 # Manager tells worker (or persistent calc) to stop
PERSIS_STOP = 4 # Manager tells persistent calculation to stop

# last_message_number_rst_tag

Expand All @@ -26,14 +33,14 @@
WORKER_KILL = 30 # Worker kills not covered by a more specific case
WORKER_KILL_ON_ERR = 31 # Worker killed due to an error in results
WORKER_KILL_ON_TIMEOUT = 32 # Worker killed on timeout
TASK_FAILED = 33 # Calc had tasks that failed
TASK_FAILED = 33 # Calc had tasks that failed
WORKER_DONE = 34 # Calculation was successful
# last_calc_status_rst_tag
CALC_EXCEPTION = 35 # Reserved: Automatically used if gen_f or sim_f raised an exception.

calc_status_strings = {
FINISHED_PERSISTENT_SIM_TAG: "Persis gen finished",
FINISHED_PERSISTENT_GEN_TAG: "Persis sim finished",
FINISHED_PERSISTENT_SIM_TAG: "Persis sim finished",
FINISHED_PERSISTENT_GEN_TAG: "Persis gen finished",
MAN_SIGNAL_FINISH: "Manager killed on finish",
MAN_SIGNAL_KILL: "Manager killed task",
WORKER_KILL_ON_ERR: " Worker killed task on Error",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
alloc_specs, libE_specs)

if is_master:
assert persis_info[1].get('run_order'), "Run_order should have been given back"
assert flag == 0
assert len(H) >= budget

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
alloc_specs, libE_specs)

if is_master:
assert persis_info[1].get('run_order'), "Run_order should have been given back"
min_ids = np.where(H['local_min'])

# The minima are known on this test problem. If the above [lb,ub] domain is
Expand Down
2 changes: 1 addition & 1 deletion libensemble/tests/unit_tests/test_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def test_qcomm():
assert outq.get() == ('a', 1) and outq.get() == ('b',) and outq.empty(), \
"Check send appropriately goes to output queue."

comm.push_back('b', 0)
comm.push_to_buffer('b', 0)
inq.put(('c', 3))
inq.put(('d',))
assert (comm.recv() == ('b', 0)
Expand Down
4 changes: 2 additions & 2 deletions libensemble/tools/gen_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def get_mgr_worker_msg(comm, status=None):
"""
tag, Work = comm.recv()
if tag in [STOP_TAG, PERSIS_STOP]:
comm.push_back(tag, Work)
return tag, None, None
comm.push_to_buffer(tag, Work)
return tag, Work, None
_, calc_in = comm.recv()
return tag, Work, calc_in

0 comments on commit aad39be

Please sign in to comment.