Skip to content

Commit

Permalink
Merge branch 'develop' into experimental/warpX
Browse files Browse the repository at this point in the history
  • Loading branch information
jmlarson1 committed Apr 17, 2020
2 parents 1377972 + 41198d6 commit b58c58e
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 47 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ install:
# Begin: Dependencies only for regression tests
- pip install DFO-LS
- pip install deap
- pip install scikit-build packaging Tasmanian --user
- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then
pip install scikit-build packaging Tasmanian --user;
fi
# End: dependencies only for regression tests
#
- pip install flake8
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ Resources
.. after_resources_rst_tag
.. _Balsam: https://www.alcf.anl.gov/balsam
.. _Balsam: https://www.alcf.anl.gov/support-center/theta/balsam
.. _Coveralls: https://coveralls.io/github/Libensemble/libensemble?branch=master
.. _DFO-LS: https://github.com/numericalalgorithmsgroup/dfols
.. _GitHub: https://github.com/Libensemble/libensemble
Expand Down
13 changes: 6 additions & 7 deletions docs/executor/mpi_executor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ Class-specific attributes can be set directly to alter the behavior of the MPI
executor. However, they should be used with caution, because they may not
be implemented in other executors.

:max_submit_attempts: (int) Maximum number of submission attempts for a given
:max_launch_attempts: (int) Maximum number of launch attempts for a given
task. *Default: 5*.
:fail_time: (int) *Only if wait_on_run is set.* Maximum run time to failure in
:fail_time: (int or float) *Only if wait_on_run is set.* Maximum run time to failure in
seconds that results in relaunch. *Default: 2*.
:retry_delay_incr: (int or float) Delay increment between launch attempts in seconds.
*Default: 5*. (For example, the first retry waits 5 seconds, the second 10 seconds, the third 15 seconds, and so on.)

Example. To increase resilience against launch failures::

taskctrl = MPIExecutor()
taskctrl.max_launch_attempts = 10
taskctrl.max_launch_attempts = 8
taskctrl.fail_time = 5

Note that the retry delay on launches starts at 5 seconds and increments by
5 seconds for each retry. So the 4th retry will wait for 20 seconds before
relaunching.
taskctrl.retry_delay_incr = 10
2 changes: 1 addition & 1 deletion docs/introduction_latex.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ We now present further information on running and testing libEnsemble.
:start-after: before_dependencies_rst_tag
:end-before: after_resources_rst_tag

.. _Balsam: https://www.alcf.anl.gov/balsam
.. _Balsam: https://www.alcf.anl.gov/support-center/theta/balsam
.. _Coveralls: https://coveralls.io/github/Libensemble/libensemble?branch=master
.. _DFO-LS: https://github.com/numericalalgorithmsgroup/dfols
.. _GitHub: https://github.com/Libensemble/libensemble
Expand Down
8 changes: 4 additions & 4 deletions docs/platforms/theta.rst
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,15 @@ queue interactively, use ::
Additional Information
----------------------

See the ALCF guides_ on XC40 systems for more information about Theta.
See the ALCF `Support Center`_ for more information about Theta.

Read the documentation for Balsam here_.

.. _ALCF: https://www.alcf.anl.gov/
.. _Theta: https://www.alcf.anl.gov/theta
.. _Balsam: https://www.alcf.anl.gov/balsam
.. _Cobalt: https://www.alcf.anl.gov/cobalt-scheduler
.. _guides: https://www.alcf.anl.gov/user-guides/computational-systems
.. _Balsam: https://www.alcf.anl.gov/support-center/theta/balsam
.. _Cobalt: https://www.alcf.anl.gov/support-center/theta/submit-job-theta
.. _`Support Center`: https://www.alcf.anl.gov/support-center/theta
.. _here: https://balsam.readthedocs.io/en/latest/
.. _Miniconda: https://docs.conda.io/en/latest/miniconda.html
.. _conda: https://conda.io/en/latest/
Expand Down
1 change: 1 addition & 0 deletions libensemble/alloc_funcs/persistent_aposmm_alloc.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def persistent_aposmm_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info
elif persis_info.get('gen_started') is None:
# Finally, call a persistent generator as there is nothing else to do.
persis_info['gen_started'] = True
persis_info[i]['nworkers'] = len(W)

gen_work(Work, i, gen_specs['in'], range(len(H)), persis_info[i],
persistent=True)
Expand Down
3 changes: 2 additions & 1 deletion libensemble/executors/mpi_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def __init__(self, auto_resources=True,
# MPI launch settings
self.max_launch_attempts = 5
self.fail_time = 2
self.retry_delay_incr = 5 # Incremented wait after each launch attempt

mpi_commands = {
'mpich': ['mpirun', '--env {env}', '-machinefile {machinefile}',
Expand Down Expand Up @@ -204,7 +205,7 @@ def _launch_with_retries(self, task, runline, wait_on_run):

if retry and retry_count < self.max_launch_attempts:
logger.debug('Retry number {} for task {}')
time.sleep(retry_count*5)
time.sleep(retry_count*self.retry_delay_incr)
task.reset() # Some cases may require user cleanup - currently not supported (could use callback)
else:
break
Expand Down
22 changes: 21 additions & 1 deletion libensemble/gen_funcs/persistent_aposmm.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ def aposmm(H, persis_info, gen_specs, libE_info):
total_runs += 1

if first_pass:
num_samples_needed = user_specs.get('num_pts_first_pass', 1) - len(new_inds_to_send_mgr)
num_samples_needed = persis_info['nworkers'] - 1 - len(new_inds_to_send_mgr)
first_pass = False
else:
num_samples_needed = n_r-len(new_inds_to_send_mgr)
Expand Down Expand Up @@ -858,6 +858,24 @@ def update_history_dist(H, n):
# if not H['local_pt'][new_ind]:
# H['worse_within_rk'][H['dist_to_all'] > r_k] = False

if np.any(~H['local_pt']) and not np.any(np.isinf(H['dist_to_better_s'][~H['local_pt']])):
# Our best sample point was not identified because the min was not unique.
min_inds = H['f'][~H['local_pt']] == np.min(H['f'][~H['local_pt']])
assert len(min_inds) >= 2, "Check this"
# Take the first point with this value to be the best sample point
best_samp = H['sim_id'][~H['local_pt']][min_inds][0]
H['dist_to_better_s'][best_samp] = np.inf
H['ind_of_better_s'][best_samp] = -1

# if np.any(H['local_pt']) and not np.any(np.isinf(H['dist_to_better_l'][H['local_pt']])):
# # Our best sample point was not identified because the min was not unique.
# min_inds = H['f'][H['local_pt']] == np.min(H['f'][H['local_pt']])
# assert len(min_inds) >= 2, "Check this"
# # Take the first point with this value to be the best sample point
# best_local = H['sim_id'][H['local_pt']][min_inds][0]
# H['dist_to_better_l'][best_local] = np.inf
# H['ind_of_better_l'][best_local] = -1


def update_history_optimal(x_opt, H, run_inds):
"""
Expand Down Expand Up @@ -1086,6 +1104,8 @@ def initialize_APOSMM(H, user_specs, libE_info):

n_s = np.sum(~local_H['local_pt'])

assert n_s > 0 or user_specs['initial_sample_size'] > 0, "APOSMM requires a positive initial_sample_size, or some existing points in order to determine where to start local optimization runs."

return n, n_s, rk_c, ld, mu, nu, comm, local_H


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
'xtol_rel': 1e-6,
'ftol_rel': 1e-6,
'max_active_runs': 6,
'num_pts_first_pass': nworkers-1,
'lb': np.array([-3, -2]),
'ub': np.array([3, 2])}
}
Expand All @@ -74,6 +73,8 @@
('x_on_cube', float, n), ('returned', bool),
('f', float), ('given_back', bool), ('given', bool)])

# Two points in the following sample have the same best function value, which
# tests the corner case for some APOSMM logic
H0['x'] = np.round(minima, 1)
H0['x_on_cube'] = (H0['x']-gen_specs['user']['lb']) / (gen_specs['user']['ub']-gen_specs['user']['lb'])
H0['sim_id'] = range(sample_size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: local
# TESTSUITE_NPROCS: 4
# TESTSUITE_OS_SKIP: OSX

import sys
import numpy as np
Expand Down
2 changes: 1 addition & 1 deletion libensemble/tests/scaling_tests/forces/cleanup.sh
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rm -r ensemble sim_* *.npy *.pickle ensemble.log lib*.txt
rm -r ensemble *.npy *.pickle ensemble.log lib*.txt
27 changes: 16 additions & 11 deletions libensemble/tests/scaling_tests/forces/forces_simf.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
MAX_SEED = 32767


class ForcesException(Exception):
    """Exception raised deliberately by the forces sim function.

    ``run_forces`` raises this when ``sim_specs['user']['fail_on_sim']`` is
    truthy, so that the calling script can check how an exception raised
    inside a sim function is reported.
    """


def perturb(particles, seed, max_fraction):
"""Modify particle count"""
seed_fraction = seed/MAX_SEED
Expand Down Expand Up @@ -42,11 +46,12 @@ def run_forces(H, persis_info, sim_specs, libE_info):
# keys = variable names
# x = variable values
# output = what will be returned to libE
if sim_specs['user']['fail_on_sim']:
raise ForcesException

calc_status = 0 # Returns to worker

x = H['x']
# simdir_basename = sim_specs['user']['simdir_basename']
# keys = sim_specs['user']['keys']
sim_particles = sim_specs['user']['sim_particles']
sim_timesteps = sim_specs['user']['sim_timesteps']
Expand All @@ -65,22 +70,24 @@ def run_forces(H, persis_info, sim_specs, libE_info):
print('seed: {} particles: {}'.format(seed, sim_particles))

# At this point you will be in the sim directory (really worker dir) for this worker (eg. sim_1).
# The simdir below is created for each task for this worker.
# Any input needs to be copied into this directory. Currently there is none.
# simdir = simdir_basename + '_' + keys[0] + '_' + str(seed)
# simdir = make_unique_simdir(simdir)
# os.mkdir(simdir)
# os.chdir(simdir)
exctr = Executor.executor # Get Executor

args = str(int(sim_particles)) + ' ' + str(sim_timesteps) + ' ' + str(seed) + ' ' + str(kill_rate)
# task = exctr.submit(calc_type='sim', num_procs=cores, app_args=args, stdout='out.txt', stderr='err.txt')

machinefile = None
if sim_specs['user']['fail_on_submit']:
machinefile = 'fail'

# Machinefile only used here for exception testing
if cores:
task = exctr.submit(calc_type='sim', num_procs=cores, app_args=args,
stdout='out.txt', stderr='err.txt', wait_on_run=True)
stdout='out.txt', stderr='err.txt', wait_on_run=True,
machinefile=machinefile)
else:
task = exctr.submit(calc_type='sim', app_args=args, stdout='out.txt',
stderr='err.txt', wait_on_run=True) # Auto-partition
stderr='err.txt', wait_on_run=True, hyperthreads=True,
machinefile=machinefile) # Auto-partition

# Stat file to check for bad runs
statfile = 'forces.stat'
Expand Down Expand Up @@ -115,8 +122,6 @@ def run_forces(H, persis_info, sim_specs, libE_info):
else:
print("Warning: Task {} in unknown state {}. Error code {}".format(task.name, task.state, task.errcode))

# os.chdir('../')

time.sleep(0.2)
try:
data = np.loadtxt(filepath)
Expand Down
50 changes: 32 additions & 18 deletions libensemble/tests/scaling_tests/forces/run_libe_forces.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

# Import libEnsemble modules
from libensemble.libE import libE
from libensemble.libE_manager import ManagerException
from libensemble.tools import parse_args, save_libE_output, add_unique_random_streams
from libensemble import libE_logger

Expand All @@ -18,6 +19,14 @@
from libensemble.gen_funcs.sampling import uniform_random_sample as gen_f
from libensemble.alloc_funcs.give_sim_work_first import give_sim_work_first as alloc_f


def test_libe_stats(status):
    """Assert that every sim entry in ``libE_stats.txt`` ends with *status*.

    Reads ``libE_stats.txt`` from the current working directory and checks
    each line that contains the substring ``'sim'``.

    Args:
        status: Expected line ending. Lines keep their trailing newline, so
            the value must include it as well.

    Raises:
        AssertionError: If any line containing ``'sim'`` does not end with
            ``status``.
    """
    with open('libE_stats.txt', 'r') as ls:
        # Iterating the file yields the same newline-terminated lines as
        # readlines(); only the sim entries are kept.
        sim_lines = [line for line in ls if 'sim' in line]

    # NOTE(review): this passes vacuously when the file has no 'sim' lines.
    assert all(line.endswith(status) for line in sim_lines), \
        "Deliberate error status not logged or raised for all sim instances."


libE_logger.set_level('INFO') # INFO is now default

nworkers, is_master, libE_specs, _ = parse_args()
Expand Down Expand Up @@ -52,33 +61,28 @@
sim_specs = {'sim_f': run_forces, # Function whose output is being minimized
'in': ['x'], # Name of input for sim_f
'out': [('energy', float)], # Name, type of output from sim_f
'user': {'simdir_basename': 'forces',
'keys': ['seed'],
'user': {'keys': ['seed'],
'cores': 2,
'sim_particles': 1e3,
'sim_timesteps': 5,
'sim_kill_minutes': 10.0,
'particle_variance': 0.2,
'kill_rate': 0.5}
'kill_rate': 0.5,
'fail_on_sim': False,
'fail_on_submit': False} # Won't occur if 'fail_on_sim' True
}
# end_sim_specs_rst_tag

# State the generating function, its arguments, output, and necessary parameters.
gen_specs = {'gen_f': gen_f, # Generator function
'in': ['sim_id'], # Generator input
'in': [], # Generator input
'out': [('x', float, (1,))], # Name, type and size of data produced (must match sim_specs 'in')
'user': {'lb': np.array([0]), # Lower bound for random sample array (1D)
'ub': np.array([32767]), # Upper bound for random sample array (1D)
'gen_batch_size': 1000, # How many random samples to generate in one call
}
}

alloc_specs = {'alloc_f': alloc_f,
'out': [('allocated', bool)],
'user': {'batch_mode': True, # If true wait for all sims to process before generate more
'num_active_gens': 1} # Only one active generator at a time
}

if PERSIS_GEN:
alloc_specs = {'alloc_f': alloc_f, 'out': [('given_back', bool)]}
else:
Expand All @@ -101,11 +105,21 @@
persis_info = {}
persis_info = add_unique_random_streams(persis_info, nworkers + 1)

H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria,
persis_info=persis_info,
alloc_specs=alloc_specs,
libE_specs=libE_specs)

# Save results to numpy file
if is_master:
save_libE_output(H, persis_info, __file__, nworkers)
try:
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria,
persis_info=persis_info,
alloc_specs=alloc_specs,
libE_specs=libE_specs)

except ManagerException:
if is_master and sim_specs['user']['fail_on_sim']:
with open('ensemble.log', 'r') as el:
out = el.readlines()
assert 'forces_simf.ForcesException\n' in out, \
"ForcesException not received by manager or logged."
test_libe_stats('Exception occurred\n')
else:
if is_master:
save_libE_output(H, persis_info, __file__, nworkers)
if sim_specs['user']['fail_on_submit']:
test_libe_stats('Task Failed\n')

0 comments on commit b58c58e

Please sign in to comment.