Skip to content

Commit

Permalink
Merge pull request #989 from Libensemble/refactor/discussion_fixes
Browse files Browse the repository at this point in the history
Don't error on COMM_NULL, restore splitcomm/subcomm tests
  • Loading branch information
jlnav committed Apr 6, 2023
2 parents a6a73d7 + 90e2df0 commit 659b936
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 20 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,6 @@ jobs:
run: |
mv libensemble/tests/unit_tests/mpich-only_test_api.py libensemble/tests/unit_tests/test_alt_api.py
- name: Experiment with running multiple pytest workers
run: |
pip install pytest-xdist
cd libensemble/tests/unit_tests; pytest -n auto --runextra .
cd -
- name: Remove ytopt-heffte test on Python 3.11 (easy way)
if: matrix.python-version == '3.11'
run: |
Expand Down
15 changes: 15 additions & 0 deletions libensemble/libE.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,24 @@ def comms_abort(mpi_comm):
mpi_comm.Abort(1) # Exit code 1 to represent an abort


def libE_mpi_defaults(libE_specs):
    """Populate MPI-related defaults in ``libE_specs``.

    When the caller has not supplied an ``"mpi_comm"`` entry,
    ``MPI.COMM_WORLD`` is stored under that key. ``libE_specs`` is modified
    in place; an entry that is already present is left untouched.

    Returns a 2-tuple: the same ``libE_specs`` object and ``MPI.COMM_NULL``,
    so the caller can compare its communicator against the null communicator
    without importing mpi4py itself.
    """
    # Function-scope import: mpi4py is only needed once this function is called.
    from mpi4py import MPI

    # Only fills the key when it is missing.
    libE_specs.setdefault("mpi_comm", MPI.COMM_WORLD)  # Will be duplicated immediately

    return libE_specs, MPI.COMM_NULL


def libE_mpi(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0):
"""MPI version of the libE main routine"""

libE_specs, mpi_comm_null = libE_mpi_defaults(libE_specs)

if libE_specs["mpi_comm"] == mpi_comm_null:
return [], persis_info, 3 # Process not in mpi_comm

with DupComm(libE_specs["mpi_comm"]) as mpi_comm:
is_manager = mpi_comm.Get_rank() == 0

Expand Down
4 changes: 2 additions & 2 deletions libensemble/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,9 @@ def check_any_workers_and_disable_rm_if_tcp(cls, values: Dict[str, Any]) -> Dict
@root_validator
def set_defaults_on_mpi(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Default ``mpi_comm`` to ``MPI.COMM_WORLD`` when MPI comms are selected.

    Only acts when ``values["comms"] == "mpi"``. The default is applied only
    if ``mpi_comm`` is absent or ``None``; any communicator object the user
    supplied is kept as-is.

    Returns the (possibly updated) ``values`` dict.
    """
    if values.get("comms") == "mpi":
        from mpi4py import MPI

        # Deliberately ``is None`` rather than a truthiness test: a supplied
        # communicator that evaluates as false must not be silently replaced
        # by COMM_WORLD.
        # NOTE(review): mpi4py appears to treat MPI.COMM_NULL as falsy, which
        # is why ``not values.get("mpi_comm")`` was unsuitable - confirm.
        if values.get("mpi_comm") is None:
            values["mpi_comm"] = MPI.COMM_WORLD
    return values

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"""

# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: mpi local tcp
# TESTSUITE_COMMS: mpi local
# TESTSUITE_NPROCS: 2 4
# TESTSUITE_EXTRA: true

Expand Down
63 changes: 63 additions & 0 deletions libensemble/tests/functionality_tests/test_1d_splitcomm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""
Runs libEnsemble with Latin hypercube sampling on a simple 1D problem, with
each process using the sub-communicator returned by mpi_comm_split.

This test runs with MPI comms only. Execute via, e.g.:
   mpiexec -np 4 python test_1d_splitcomm.py

Within each sub-communicator, rank 0 acts as the manager, so the number of
concurrent evaluations of the objective function per libEnsemble instance is
the size of its sub-communicator minus one.
"""

# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: mpi
# TESTSUITE_NPROCS: 4

import numpy as np

from libensemble.gen_funcs.sampling import latin_hypercube_sample as gen_f

# Import libEnsemble items for this test
from libensemble.libE import libE
from libensemble.sim_funcs.one_d_func import one_d_example as sim_f
from libensemble.tests.regression_tests.common import mpi_comm_split
from libensemble.tools import add_unique_random_streams, parse_args, save_libE_output

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":
    num_comms = 2  # Must have at least num_comms*2 processors

    # The manager flag reported by parse_args() is discarded: manager status
    # is determined below from the rank within this process's sub-communicator.
    nworkers, _, libE_specs, _ = parse_args()

    # NOTE(review): mpi_comm_split is defined in regression_tests/common;
    # presumably it splits the world communicator into num_comms parts - confirm.
    sub_comm, sub_comm_number = mpi_comm_split(num_comms)
    libE_specs["mpi_comm"] = sub_comm
    is_manager = sub_comm.Get_rank() == 0

    libE_specs.update(
        save_every_k_gens=300,
        safe_mode=False,
        disable_log_files=True,
    )

    sim_specs = {"sim_f": sim_f, "in": ["x"], "out": [("f", float)]}

    # Sampling bounds for the 1D problem.
    sampling_bounds = {"lb": np.array([-3]), "ub": np.array([3])}
    gen_specs = {
        "gen_f": gen_f,
        "out": [("x", float, (1,))],
        "user": {"gen_batch_size": 500, **sampling_bounds},
    }

    exit_criteria = {"gen_max": 501}
    persis_info = add_unique_random_streams({}, nworkers + 1, seed=1234)

    # Perform the run
    H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs)

    # Only the manager of each sub-communicator checks and saves the results.
    if is_manager:
        assert len(H) >= 501
        print("\nlibEnsemble with random sampling has generated enough points")
        save_libE_output(H, persis_info, __file__, nworkers)
73 changes: 73 additions & 0 deletions libensemble/tests/functionality_tests/test_1d_subcomm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
Runs libEnsemble with Latin hypercube sampling on a simple 1D problem, using
the communicator returned by mpi_comm_excl. A process for which that
communicator equals the null communicator is excluded from the run, and libE
is expected to return flag 3 on it.

This test runs with MPI comms only. Execute via, e.g.:
   mpiexec -np 4 python test_1d_subcomm.py

Rank 0 of the communicator acts as the manager, so the number of concurrent
evaluations of the objective function is the communicator size minus one.
"""

# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: mpi
# TESTSUITE_NPROCS: 4

import numpy as np

from libensemble.gen_funcs.sampling import latin_hypercube_sample as gen_f

# Import libEnsemble items for this test
from libensemble.libE import libE
from libensemble.sim_funcs.one_d_func import one_d_example as sim_f
from libensemble.tests.regression_tests.common import mpi_comm_excl
from libensemble.tools import add_unique_random_streams, parse_args, save_libE_output

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":
    # The manager flag reported by parse_args() is discarded: manager status
    # is determined below from the communicator actually handed to libE.
    nworkers, _, libE_specs, _ = parse_args()

    comm, mpi_comm_null = mpi_comm_excl()
    libE_specs["mpi_comm"] = comm

    # A process whose communicator is the null communicator takes no part in
    # the run; Get_rank() is only queried on a non-null communicator.
    is_excluded = comm == mpi_comm_null
    is_manager = not is_excluded and comm.Get_rank() == 0

    libE_specs.update(
        save_every_k_gens=300,
        safe_mode=False,
        disable_log_files=True,
    )

    sim_specs = {"sim_f": sim_f, "in": ["x"], "out": [("f", float)]}

    # Sampling bounds for the 1D problem.
    sampling_bounds = {"lb": np.array([-3]), "ub": np.array([3])}
    gen_specs = {
        "gen_f": gen_f,
        "out": [("x", float, (1,))],
        "user": {"gen_batch_size": 500, **sampling_bounds},
    }

    exit_criteria = {"gen_max": 501}
    persis_info = add_unique_random_streams({}, nworkers + 1, seed=1234)

    # Perform the run
    H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs)

    if is_manager:
        # Normal completion on the manager: enough points and a zero flag.
        assert len(H) >= 501
        assert flag == 0
        print("\nlibEnsemble with random sampling has generated enough points")
        save_libE_output(H, persis_info, __file__, nworkers)
    elif is_excluded:
        # Excluded processes should return immediately with flag 3.
        assert flag == 3
19 changes: 10 additions & 9 deletions libensemble/tests/unit_tests/test_libE_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from libensemble.libE import libE
from libensemble.manager import LoggedException
from libensemble.resources.resources import Resources
from libensemble.tests.regression_tests.common import mpi_comm_excl


class MPIAbortException(Exception):
Expand Down Expand Up @@ -141,14 +142,14 @@ def test_exception_raising_check_inputs():
pytest.fail("Expected ValidationError exception")


# def test_proc_not_in_communicator():
# """Checking proc not in communicator returns exit status of 3"""
# libE_specs = {}
# libE_specs["mpi_comm"], mpi_comm_null = mpi_comm_excl()
# H, _, flag = libE(
# {"in": ["x"], "out": [("f", float)]}, {"out": [("x", float)]}, {"sim_max": 1}, libE_specs=libE_specs
# )
# assert flag == 3, "libE return flag should be 3. Returned: " + str(flag)
def test_proc_not_in_communicator():
    """Check that libE returns exit flag 3 for a process not in the communicator.

    The communicator comes from ``mpi_comm_excl`` (regression_tests/common).
    NOTE(review): presumably it yields the null communicator for the calling
    process in this test - confirm against that helper.
    """
    comm, mpi_comm_null = mpi_comm_excl()
    libE_specs = {"mpi_comm": comm}

    # Minimal specs: libE is expected to return before using them.
    sim_specs = {"in": ["x"], "out": [("f", float)]}
    gen_specs = {"out": [("x", float)]}
    exit_criteria = {"sim_max": 1}

    _, _, flag = libE(sim_specs, gen_specs, exit_criteria, libE_specs=libE_specs)
    assert flag == 3, f"libE return flag should be 3. Returned: {flag!s}"


# def test_exception_raising_worker():
Expand Down Expand Up @@ -187,5 +188,5 @@ def test_logging_disabling():
test_exception_raising_manager_with_abort()
test_exception_raising_manager_no_abort()
test_exception_raising_check_inputs()
# test_proc_not_in_communicator()
test_proc_not_in_communicator()
test_logging_disabling()
10 changes: 8 additions & 2 deletions libensemble/utils/specs_checkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
Reference the models in that file.
"""

import logging

import numpy as np

from libensemble.tools.fields_keys import libE_fields

logger = logging.getLogger(__name__)


def _check_exit_criteria(values: dict) -> dict:
if "stop_val" in values.get("exit_criteria"):
Expand Down Expand Up @@ -101,6 +105,8 @@ def __get_validators__(cls):
def validate(cls, comm: "MPIComm") -> "MPIComm":  # noqa: F821
    """Validate a user-supplied MPI communicator.

    A null communicator (``MPI.COMM_NULL``) is accepted with a warning, not
    rejected. Any other communicator must contain at least two MPI tasks
    (one manager and at least one worker).

    Returns ``comm`` unchanged.

    Raises ``AssertionError`` if a non-null communicator has fewer than two
    tasks.
    """
    from mpi4py import MPI

    if comm == MPI.COMM_NULL:
        # Do not fail validation here: libE_mpi checks for the null
        # communicator itself and returns exit flag 3 for such processes.
        # The size check is skipped because it only applies to a real
        # communicator.
        # NOTE(review): manager_warning is not a standard logging.Logger
        # method; presumably installed by libEnsemble's logging setup - confirm.
        logger.manager_warning("*WARNING* libEnsemble detected a NULL communicator")
    else:
        assert comm.Get_size() > 1, "Manager only - must be at least one worker (2 MPI tasks)"
    return comm

0 comments on commit 659b936

Please sign in to comment.