
Commit

Merge pull request #478 from Libensemble/debugging/aposmm_hard_kill_processes

removing old code that still hangs in an existing example
jmlarson1 committed Jun 16, 2020
2 parents 23627ce + 4c16d4c commit 05e6361
Showing 5 changed files with 153 additions and 172 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -102,6 +102,7 @@ install:
# Begin: Dependencies only for regression tests
- pip install DFO-LS
- pip install deap
- conda install psutil
- pip install mpmath
- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then
pip install scikit-build packaging Tasmanian --user;
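
psutil becomes a regression-test dependency because the reworked destroy() in aposmm_localopt_support.py (next file) uses it to walk and kill an optimizer's whole process tree instead of trying to coax the optimizer to exit. A minimal sketch of that kill pattern, assuming only the standard library plus psutil; the hanging_worker target here is illustrative and not part of libEnsemble:

import time
from multiprocessing import Process

import psutil


def hanging_worker():
    # Stand-in for a local optimizer process that never returns
    while True:
        time.sleep(1)


if __name__ == '__main__':
    p = Process(target=hanging_worker)
    p.start()

    # Kill the worker and anything it may have spawned, then reap it
    proc = psutil.Process(p.pid)
    for child in proc.children(recursive=True):
        child.kill()
    proc.kill()
    p.join()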
68 changes: 16 additions & 52 deletions libensemble/gen_funcs/aposmm_localopt_support.py
@@ -5,13 +5,12 @@
__all__ = ['LocalOptInterfacer', 'run_local_nlopt', 'run_local_tao',
'run_local_dfols', 'run_local_scipy_opt', 'run_external_localopt']

import os
import signal
import psutil
import numpy as np
from libensemble.message_numbers import STOP_TAG, EVAL_GEN_TAG # Only used to simulate receiving from manager
from multiprocessing import Event, Process, Queue

import libensemble.gen_funcs

optimizer_list = ['petsc', 'nlopt', 'dfols', 'scipy', 'external']
optimizers = libensemble.gen_funcs.rc.aposmm_optimizers

@@ -85,11 +84,10 @@ def __init__(self, user_specs, x0, f0, grad0=None):
immediately after creating the class.
"""
self.parent_can_read = Event()

self.parent_can_read = Event()
self.comm_queue = Queue()
self.child_can_read = Event()
self.hard_kill = False

self.x0 = x0.copy()
self.f0 = f0.copy()
@@ -104,7 +102,6 @@ def __init__(self, user_specs, x0, f0, grad0=None):
run_local_opt = run_local_nlopt
elif user_specs['localopt_method'] in ['pounders', 'blmvm', 'nm']:
run_local_opt = run_local_tao
self.hard_kill = True
elif user_specs['localopt_method'] in ['scipy_Nelder-Mead', 'scipy_COBYLA', 'scipy_BFGS']:
run_local_opt = run_local_scipy_opt
elif user_specs['localopt_method'] in ['dfols']:
@@ -137,6 +134,7 @@ def iterate(self, data):
:param grad: A numpy array of the function's gradient.
:param fvec: A numpy array of the function's component values.
"""

self.parent_can_read.clear()

if 'grad' in data.dtype.names:
@@ -153,62 +151,28 @@ def iterate(self, data):
if isinstance(x_new, ErrorMsg):
raise APOSMMException(x_new.x)
elif isinstance(x_new, ConvergedMsg):
self.process.join()
self.comm_queue.close()
self.comm_queue.join_thread()
self.is_running = False
self.close()
else:
x_new = np.atleast_2d(x_new)

return x_new

def destroy(self, previous_x):

count = 0
while not isinstance(previous_x, ConvergedMsg) and count <= 1000:
self.parent_can_read.clear()
if self.grad0 is None:
self.comm_queue.put((previous_x, 0*np.ones_like(self.f0),))
else:
self.comm_queue.put((previous_x, 0*np.ones_like(self.f0), np.zeros_like(self.grad0)))

self.child_can_read.set()
self.parent_can_read.wait()

previous_x = self.comm_queue.get()
count += 1

if not isinstance(previous_x, ConvergedMsg):
if self.hard_kill:
self.kill_process()
else:
self.process.terminate()
self.process.join(timeout=0.2)
if self.process.is_alive():
self.kill_process()

def destroy(self):
"""Recursively kill any optimizer processes still running"""
if self.process.is_alive():
process = psutil.Process(self.process.pid)
for child in process.children(recursive=True):
child.kill()
process.kill()
self.close()

def close(self):
"""Join process and close queue"""
self.process.join()
self.comm_queue.close()
self.comm_queue.join_thread()
self.is_running = False

def kill_process(self):
"""Kill with process.kill() or based on terminate in
https://github.com/python/cpython/blob/3.5/Lib/multiprocessing/popen_fork.py
"""

if hasattr(self.process, 'kill'):
self.process.kill()
else:
if self.process.is_alive():
try:
os.kill(self.process.pid, signal.SIGKILL)
except ProcessLookupError:
pass
except OSError:
if self.process.wait(timeout=0.1) is None:
raise


def run_local_nlopt(user_specs, comm_queue, x0, f0, child_can_read, parent_can_read):
"""
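
The net effect of this file's changes: the old destroy(previous_x) loop, which fed the child optimizer up to 1000 dummy evaluations and then fell back to terminate()/kill_process(), is replaced by an unconditional psutil-based kill, and the join/queue shutdown is factored into close(). A self-contained sketch of the queue/event handshake and the close() teardown it relies on; the names below are illustrative stand-ins, not libEnsemble's API:

from multiprocessing import Event, Process, Queue


def child_optimizer(comm_queue, child_can_read, parent_can_read):
    # Stand-in child: receive one point, echo it back, then exit
    child_can_read.wait()
    x = comm_queue.get()
    comm_queue.put(x)
    parent_can_read.set()


if __name__ == '__main__':
    comm_queue = Queue()
    child_can_read, parent_can_read = Event(), Event()
    p = Process(target=child_optimizer,
                args=(comm_queue, child_can_read, parent_can_read))
    p.start()

    comm_queue.put([0.5, 0.5])
    child_can_read.set()
    parent_can_read.wait()
    print(comm_queue.get())

    # The close() pattern from the diff: join the child, then shut down the queue's feeder thread
    p.join()
    comm_queue.close()
    comm_queue.join_thread()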
181 changes: 94 additions & 87 deletions libensemble/gen_funcs/persistent_aposmm.py
@@ -134,107 +134,113 @@ def aposmm(H, persis_info, gen_specs, libE_info):
persis_info['old_runs']: Sequence of indices of points in finished runs
"""
user_specs = gen_specs['user']

n, n_s, rk_const, ld, mu, nu, comm, local_H = initialize_APOSMM(H, user_specs, libE_info)
local_opters, sim_id_to_child_inds, run_order, run_pts, total_runs, fields_to_pass = initialize_children(user_specs)

if user_specs['initial_sample_size'] != 0:
# Send our initial sample. We don't need to check that n_s is large enough:
# the alloc_func only returns when the initial sample has function values.
persis_info = add_k_sample_points_to_local_H(user_specs['initial_sample_size'], user_specs,
persis_info, n, comm, local_H,
sim_id_to_child_inds)
if not user_specs.get('standalone'):
send_mgr_worker_msg(comm, local_H[-user_specs['initial_sample_size']:][[i[0] for i in gen_specs['out']]])
something_sent = True
else:
something_sent = False

tag = None
first_pass = True
while 1:
new_opt_inds_to_send_mgr = []
new_inds_to_send_mgr = []
try:
user_specs = gen_specs['user']
n, n_s, rk_const, ld, mu, nu, comm, local_H = initialize_APOSMM(H, user_specs, libE_info)
local_opters, sim_id_to_child_inds, run_order, run_pts, total_runs, fields_to_pass = initialize_children(user_specs)
if user_specs['initial_sample_size'] != 0:
# Send our initial sample. We don't need to check that n_s is large enough:
# the alloc_func only returns when the initial sample has function values.
persis_info = add_k_sample_points_to_local_H(user_specs['initial_sample_size'], user_specs,
persis_info, n, comm, local_H,
sim_id_to_child_inds)
if not user_specs.get('standalone'):
send_mgr_worker_msg(comm, local_H[-user_specs['initial_sample_size']:][[i[0] for i in gen_specs['out']]])
something_sent = True
else:
something_sent = False

if something_sent:
if user_specs.get('standalone'):
tag, Work, calc_in = simulate_recv_from_manager(local_H, gen_specs)
else:
tag, Work, calc_in = get_mgr_worker_msg(comm)

if tag in [STOP_TAG, PERSIS_STOP]:
clean_up_and_stop(local_H, local_opters, run_order)
persis_info['run_order'] = run_order
break

n_s, n_r = update_local_H_after_receiving(local_H, n, n_s, user_specs, Work, calc_in, fields_to_pass)

for row in calc_in:
if sim_id_to_child_inds.get(row['sim_id']):
# Point came from a child local opt run
for child_idx in sim_id_to_child_inds[row['sim_id']]:
x_new = local_opters[child_idx].iterate(row[fields_to_pass])
if isinstance(x_new, ConvergedMsg):
x_opt = x_new.x
opt_flag = x_new.opt_flag
opt_ind = update_history_optimal(x_opt, opt_flag, local_H, run_order[child_idx])
new_opt_inds_to_send_mgr.append(opt_ind)
local_opters.pop(child_idx)
else:
add_to_local_H(local_H, x_new, user_specs, local_flag=1, on_cube=True)
new_inds_to_send_mgr.append(len(local_H)-1)

run_order[child_idx].append(local_H[-1]['sim_id'])
run_pts[child_idx].append(x_new)
if local_H[-1]['sim_id'] in sim_id_to_child_inds:
sim_id_to_child_inds[local_H[-1]['sim_id']] += (child_idx, )
tag = None
first_pass = True
while 1:
new_opt_inds_to_send_mgr = []
new_inds_to_send_mgr = []

if something_sent:
if user_specs.get('standalone'):
tag, Work, calc_in = simulate_recv_from_manager(local_H, gen_specs)
else:
tag, Work, calc_in = get_mgr_worker_msg(comm)

if tag in [STOP_TAG, PERSIS_STOP]:
# clean_up_and_stop(local_H, local_opters)
clean_up_and_stop(local_opters)
persis_info['run_order'] = run_order
break

n_s, n_r = update_local_H_after_receiving(local_H, n, n_s, user_specs, Work, calc_in, fields_to_pass)

for row in calc_in:
if sim_id_to_child_inds.get(row['sim_id']):
# Point came from a child local opt run
for child_idx in sim_id_to_child_inds[row['sim_id']]:
x_new = local_opters[child_idx].iterate(row[fields_to_pass])
if isinstance(x_new, ConvergedMsg):
x_opt = x_new.x
opt_flag = x_new.opt_flag
opt_ind = update_history_optimal(x_opt, opt_flag, local_H, run_order[child_idx])
new_opt_inds_to_send_mgr.append(opt_ind)
local_opters.pop(child_idx)
else:
sim_id_to_child_inds[local_H[-1]['sim_id']] = (child_idx, )
add_to_local_H(local_H, x_new, user_specs, local_flag=1, on_cube=True)
new_inds_to_send_mgr.append(len(local_H)-1)

starting_inds = decide_where_to_start_localopt(local_H, n, n_s, rk_const, ld, mu, nu)
run_order[child_idx].append(local_H[-1]['sim_id'])
run_pts[child_idx].append(x_new)
if local_H[-1]['sim_id'] in sim_id_to_child_inds:
sim_id_to_child_inds[local_H[-1]['sim_id']] += (child_idx, )
else:
sim_id_to_child_inds[local_H[-1]['sim_id']] = (child_idx, )

for ind in starting_inds:
if len([p for p in local_opters.values() if p.is_running]) < user_specs.get('max_active_runs', np.inf):
local_H['started_run'][ind] = 1
starting_inds = decide_where_to_start_localopt(local_H, n, n_s, rk_const, ld, mu, nu)

# Initialize a local opt run
local_opter = LocalOptInterfacer(user_specs, local_H[ind]['x_on_cube'],
local_H[ind]['f'] if 'f' in fields_to_pass else local_H[ind]['fvec'],
local_H[ind]['grad'] if 'grad' in fields_to_pass else None)
for ind in starting_inds:
if len([p for p in local_opters.values() if p.is_running]) < user_specs.get('max_active_runs', np.inf):
local_H['started_run'][ind] = 1

local_opters[total_runs] = local_opter
# Initialize a local opt run
local_opter = LocalOptInterfacer(user_specs, local_H[ind]['x_on_cube'],
local_H[ind]['f'] if 'f' in fields_to_pass else local_H[ind]['fvec'],
local_H[ind]['grad'] if 'grad' in fields_to_pass else None)

x_new = local_opter.iterate(local_H[ind][fields_to_pass]) # Assuming the second point can't be ruled optimal
local_opters[total_runs] = local_opter

add_to_local_H(local_H, x_new, user_specs, local_flag=1, on_cube=True)
new_inds_to_send_mgr.append(len(local_H)-1)
x_new = local_opter.iterate(local_H[ind][fields_to_pass]) # Assuming the second point can't be ruled optimal

run_order[total_runs] = [ind, local_H[-1]['sim_id']]
run_pts[total_runs] = [local_H['x_on_cube'], x_new]
add_to_local_H(local_H, x_new, user_specs, local_flag=1, on_cube=True)
new_inds_to_send_mgr.append(len(local_H)-1)

if local_H[-1]['sim_id'] in sim_id_to_child_inds:
sim_id_to_child_inds[local_H[-1]['sim_id']] += (total_runs, )
else:
sim_id_to_child_inds[local_H[-1]['sim_id']] = (total_runs, )
run_order[total_runs] = [ind, local_H[-1]['sim_id']]
run_pts[total_runs] = [local_H['x_on_cube'], x_new]

total_runs += 1
if local_H[-1]['sim_id'] in sim_id_to_child_inds:
sim_id_to_child_inds[local_H[-1]['sim_id']] += (total_runs, )
else:
sim_id_to_child_inds[local_H[-1]['sim_id']] = (total_runs, )

if first_pass:
num_samples_needed = persis_info['nworkers'] - 1 - len(new_inds_to_send_mgr)
first_pass = False
else:
num_samples_needed = n_r-len(new_inds_to_send_mgr)
total_runs += 1

if first_pass:
num_samples_needed = persis_info['nworkers'] - 1 - len(new_inds_to_send_mgr)
first_pass = False
else:
num_samples_needed = n_r-len(new_inds_to_send_mgr)

if num_samples_needed > 0:
persis_info = add_k_sample_points_to_local_H(num_samples_needed, user_specs, persis_info, n, comm, local_H, sim_id_to_child_inds)
new_inds_to_send_mgr = new_inds_to_send_mgr + list(range(len(local_H)-num_samples_needed, len(local_H)))
if num_samples_needed > 0:
persis_info = add_k_sample_points_to_local_H(num_samples_needed, user_specs, persis_info, n, comm, local_H, sim_id_to_child_inds)
new_inds_to_send_mgr = new_inds_to_send_mgr + list(range(len(local_H)-num_samples_needed, len(local_H)))

if not user_specs.get('standalone'):
send_mgr_worker_msg(comm, local_H[new_inds_to_send_mgr + new_opt_inds_to_send_mgr][[i[0] for i in gen_specs['out']]])
something_sent = True
if not user_specs.get('standalone'):
send_mgr_worker_msg(comm, local_H[new_inds_to_send_mgr + new_opt_inds_to_send_mgr][[i[0] for i in gen_specs['out']]])
something_sent = True

return local_H, persis_info, FINISHED_PERSISTENT_GEN_TAG
return local_H, persis_info, FINISHED_PERSISTENT_GEN_TAG
finally:
try:
clean_up_and_stop(local_opters)
except NameError:
pass


def update_local_H_after_receiving(local_H, n, n_s, user_specs, Work, calc_in, fields_to_pass):
@@ -667,14 +673,15 @@ def add_k_sample_points_to_local_H(k, user_specs, persis_info, n, comm, local_H,
return persis_info


def clean_up_and_stop(local_H, local_opters, run_order):
# def clean_up_and_stop(local_H, local_opters):
def clean_up_and_stop(local_opters):
# FIXME: This has to be a clean exit.

# print('[Parent]: The optimal points and values are:\n',
# local_H[np.where(local_H['local_min'])][['x', 'f']], flush=True)

for i, p in local_opters.items():
p.destroy(local_H['x_on_cube'][run_order[i][-1]])
p.destroy()


# def display_exception(e):
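
The substantive change to persistent_aposmm.py is that the generator's whole body now runs inside try/finally, so clean_up_and_stop(local_opters), which now simply calls destroy() on every remaining child optimizer, executes even when the loop exits through an exception; the NameError guard covers a failure before local_opters was ever assigned. A minimal sketch of that guard pattern, using made-up names rather than the actual aposmm internals:

def persistent_gen(setup, work_loop):
    # Illustrative skeleton only: always tear down child optimizers on exit
    try:
        local_opters = setup()          # may itself raise
        return work_loop(local_opters)  # may raise or return normally
    finally:
        try:
            for opter in local_opters.values():
                opter.destroy()         # recursively kill any live child process
        except NameError:
            # setup() failed before local_opters existed; nothing to clean up
            pass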