Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 49 additions & 12 deletions libensemble/gen_funcs/persistent_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

__all__ = [
"persistent_uniform",
"persistent_uniform_final_update",
"persistent_request_shutdown",
"uniform_nonblocking",
"batched_history_matching",
Expand Down Expand Up @@ -51,38 +52,74 @@ def persistent_uniform(_, persis_info, gen_specs, libE_info):
return H_o, persis_info, FINISHED_PERSISTENT_GEN_TAG


# SH TODO Do we set f_est as we go -> if so, need to keep all H rows here - init by e.g. persis_info('max_rows')
def persistent_uniform_final_update(_, persis_info, gen_specs, libE_info):
"""
This generation function is similar in structure to persistent_uniform,
but it sets the sim_ids for the manager, and returns a final update
of the value ``f_est``.
Assuming the value ``"f"`` returned from sim_f is stochastic, this
generation is updating an estimated mean ``"f_est"`` of the sim_f output at
each of the corners of the domain.

.. seealso::
`test_persistent_uniform_sampling_adv.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_adv.py>`_
`test_persistent_uniform_sampling_running_mean.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_running_mean.py>`_
""" # noqa

b, n, lb, ub = _get_user_params(gen_specs["user"])
ps = PersistentSupport(libE_info, EVAL_GEN_TAG)
next_id = 0

# Send batches until manager sends stop tag
def generate_corners(x, y):
    """Return all 2**n corner points of the box with bound vectors x and y.

    Corner k picks x[i] where bit i of k is set and y[i] where it is clear,
    so corner 0 is all-y and corner 2**n - 1 is all-x.
    """
    dim = len(x)
    return [
        [x[axis] if mask & (1 << axis) else y[axis] for axis in range(dim)]
        for mask in range(2**dim)
    ]

def sample_corners_with_probability(corners, p, b):
    """Draw b corners (with replacement) according to probability vector p.

    Returns the sampled corner points and their indices into ``corners``.
    Uses the global NumPy random state via ``np.random.choice``.
    """
    chosen = np.random.choice(len(corners), size=b, p=p)
    return [corners[idx] for idx in chosen], chosen

corners = generate_corners(lb, ub)

# Start with equal probabilities
p = np.ones(2**n) / 2**n

running_total = np.nan * np.ones(2**n)
number_of_samples = np.zeros(2**n)
sent = np.array([], dtype=int)

# Send batches of `b` points until manager sends stop tag
tag = None
next_id = 0
while tag not in [STOP_TAG, PERSIS_STOP]:
H_o = np.zeros(b, dtype=gen_specs["out"])
H_o["sim_id"] = range(next_id, next_id + b)
next_id += b
H_o["x"] = persis_info["rand_stream"].uniform(lb, ub, (b, n))
H_o["f_est"] = 0

sampled_corners, corner_ids = sample_corners_with_probability(corners, p, b)

H_o["corner_id"] = corner_ids
H_o["x"] = sampled_corners
sent = np.append(sent, corner_ids)

tag, Work, calc_in = ps.send_recv(H_o)
if hasattr(calc_in, "__len__"):
b = len(calc_in)
for row in calc_in:
number_of_samples[row["corner_id"]] += 1
if np.isnan(running_total[row["corner_id"]]):
running_total[row["corner_id"]] = row["f"]
else:
running_total[row["corner_id"]] += row["f"]

# Having received a PERSIS_STOP, update f_est field for all points and return
# For manager to honor final H_o return, must have set libE_specs["use_persis_return_gen"] = True
H_o = np.zeros(next_id, dtype=[("sim_id", int), ("f_est", float)])
H_o["sim_id"] = range(next_id)
H_o["f_est"] = -1.23
f_est = running_total / number_of_samples
H_o = np.zeros(len(sent), dtype=[("sim_id", int), ("corner_id", int), ("f_est", float)])
for count, i in enumerate(sent):
H_o["sim_id"][count] = count
H_o["corner_id"][count] = i
H_o["f_est"][count] = f_est[i]

return H_o, persis_info, FINISHED_PERSISTENT_GEN_TAG

Expand Down
7 changes: 5 additions & 2 deletions libensemble/sim_funcs/six_hump_camel.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ def six_hump_camel_simple(x, _, sim_specs):

H_o = np.zeros(1, dtype=sim_specs["out"])

H_o["f"] = six_hump_camel_func(x[0][0])
H_o["f"] = six_hump_camel_func(x[0][0][:2]) # Ignore more than 2 entries of x

if "pause_time" in sim_specs["user"]:
if sim_specs["user"].get("pause_time"):
time.sleep(sim_specs["user"]["pause_time"])

if sim_specs["user"].get("rand"):
H_o["f"] += np.random.normal(0, 1)

return H_o


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@

base_libE_specs = libE_specs.copy()
for run in range(5):

# reset
libE_specs = base_libE_specs.copy()
persis_info = add_unique_random_streams({}, nworkers + 1)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Tests the ability of libEnsemble to
- give back history entries from the a shutting-down persistent gen

Execute via one of the following commands (e.g. 3 workers):
mpiexec -np 4 python test_persistent_uniform_sampling_adv.py
python test_persistent_uniform_sampling_running_mean.py --nworkers 3 --comms local
python test_persistent_uniform_sampling_running_mean.py --nworkers 3 --comms tcp

When running with the above commands, the number of concurrent evaluations of
the objective function will be 2, as one of the three workers will be the
persistent generator.
"""

# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: mpi local
# TESTSUITE_NPROCS: 3 4

import sys

import numpy as np

from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f
from libensemble.gen_funcs.persistent_sampling import persistent_uniform_final_update as gen_f
from libensemble.libE import libE
from libensemble.sim_funcs.six_hump_camel import six_hump_camel_simple as sim_f
from libensemble.tools import add_unique_random_streams, parse_args, save_libE_output

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":
nworkers, is_manager, libE_specs, _ = parse_args()

libE_specs["use_persis_return_gen"] = True

if nworkers < 2:
sys.exit("Cannot run with a persistent worker if only one worker -- aborting...")

n = 3
sim_specs = {
"sim_f": sim_f,
"in": ["x"],
"out": [("f", float)],
"user": {"rand": True},
}

gen_specs = {
"gen_f": gen_f,
"persis_in": ["f", "x", "corner_id", "sim_id"],
"out": [("sim_id", int), ("corner_id", int), ("x", float, (n,)), ("f_est", float)],
"user": {
"initial_batch_size": 20,
"lb": np.array([-3, -2, -1]),
"ub": np.array([3, 2, 1]),
},
}

alloc_specs = {"alloc_f": alloc_f}

sim_max = 120
exit_criteria = {"sim_max": sim_max}
libE_specs["final_gen_send"] = True

persis_info = add_unique_random_streams({}, nworkers + 1)
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs)

if is_manager:
assert np.all(H["f_est"][0:sim_max] != 0), "The persistent gen should have set these at shutdown"
assert np.all(H["gen_informed"][0:sim_max]), "Need to mark the gen having been informed."
save_libE_output(H, persis_info, __file__, nworkers)
1 change: 0 additions & 1 deletion libensemble/tests/functionality_tests/test_workflow_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
stats_lens = []

for i in range(2):

libE_specs["workflow_dir_path"] = (
"./test_workflow" + str(i) + "_nworkers" + str(nworkers) + "_comms-" + libE_specs["comms"]
)
Expand Down
1 change: 0 additions & 1 deletion libensemble/tests/regression_tests/test_1d_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":

sampling = Ensemble(parse_args=True)
sampling.libE_specs = LibeSpecs(save_every_k_gens=300, safe_mode=False, disable_log_files=True)
sampling.sim_specs = SimSpecs(sim_f=sim_f, inputs=["x"], outputs=[("f", float)])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":

# Get paths for applications to run
six_hump_camel_app = six_hump_camel.__file__
exctr = MPIExecutor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":

# Get paths for applications to run
six_hump_camel_app = six_hump_camel.__file__
exctr = MPIExecutor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":

n_samp = 1000
H0 = np.zeros(n_samp, dtype=[("x", float, 8), ("sim_id", int), ("sim_started", bool)])
np.random.seed(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":

samp = 1000
n = 8

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":

x0 = np.array([1.23, 4.56]) # point about which we are calculating finite difference parameters
f0 = noisy_function(x0)
n = len(x0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
from libensemble.tools import add_unique_random_streams

if __name__ == "__main__":

n_init_thetas = 15 # Initial batch of thetas
n_x = 25 # No. of x values
nparams = 4 # No. of theta params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,28 @@
# TESTSUITE_NPROCS: 4
# TESTSUITE_EXTRA: true

import sys
import multiprocessing
import sys

import numpy as np

from libensemble.alloc_funcs.persistent_aposmm_alloc import persistent_aposmm_alloc as alloc_f
from libensemble.executors import MPIExecutor
from libensemble.gen_funcs.persistent_aposmm import aposmm as gen_f

# Import libEnsemble items for this test
from libensemble.libE import libE
from libensemble.sim_funcs import six_hump_camel
from libensemble.sim_funcs.var_resources import multi_points_with_variable_resources as sim_f
from libensemble.tools import add_unique_random_streams, parse_args

# For OpenMPI must not have these lines, allowing PETSc to import in function.
# import libensemble.gen_funcs
# libensemble.gen_funcs.rc.aposmm_optimizers = "petsc"

from libensemble.gen_funcs.persistent_aposmm import aposmm as gen_f
from libensemble.sim_funcs.var_resources import multi_points_with_variable_resources as sim_f
from libensemble.alloc_funcs.persistent_aposmm_alloc import persistent_aposmm_alloc as alloc_f
from libensemble.tools import parse_args, add_unique_random_streams
from libensemble.executors import MPIExecutor
from libensemble.sim_funcs import six_hump_camel

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":

multiprocessing.set_start_method("fork", force=True)

nworkers, is_manager, libE_specs, _ = parse_args()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
from libensemble.gen_funcs.persistent_sampling import persistent_uniform as gen_f
from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs


if __name__ == "__main__":

# Initialize MPI Executor
exctr = MPIExecutor()
sim_app = os.path.join(os.getcwd(), "../forces_app/forces.x")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
from libensemble.gen_funcs.persistent_sampling_var_resources import uniform_sample_with_var_gpus as gen_f
from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs


if __name__ == "__main__":

# Initialize MPI Executor
exctr = MPIExecutor()
sim_app = os.path.join(os.getcwd(), "../forces_app/forces.x")
Expand Down
Loading