Merge branch 'develop' into docs/prev1_propose_refactor
jlnav committed Sep 6, 2023
2 parents 1ba09a9 + 1b8ac02 commit 2752917
Showing 17 changed files with 366 additions and 62 deletions.
1 change: 1 addition & 0 deletions .spell
@@ -4,3 +4,4 @@ apoints
numer
hist
inout
slac
10 changes: 10 additions & 0 deletions docs/data_structures/libE_specs.rst
@@ -206,6 +206,16 @@ libEnsemble is primarily customized by setting options within a ``LibeSpecs`` cl
By default resources will be divided by workers (excluding
``zero_resource_workers``).

"gen_num_procs" [int] = ``0``:
The default number of processors (MPI ranks) required by generators. Unless
overridden by the equivalent `persis_info` settings, generators will be allocated
this many processors for applications launched via the MPIExecutor.

"gen_num_gpus" [int] = ``0``:
The default number of GPUs required by generators. Unless overridden by
the equivalent `persis_info` settings, generators will be allocated this
many GPUs.

"enforce_worker_core_bounds" [bool] = ``False``:
Permit submission of tasks with a
higher processor count than the CPUs available to the worker.
15 changes: 11 additions & 4 deletions docs/resource_manager/overview.rst
@@ -193,10 +193,17 @@ if ``split2fit`` is *False*, as this could otherwise never be scheduled.
Varying generator resources
^^^^^^^^^^^^^^^^^^^^^^^^^^^

For all supporting allocation functions, setting the ``persis_info["gen_resources"]``
to an integer value will provide resource sets to generators when they are started,
with the default to provide no resources. This could be set in the calling script
or inside the allocation function.
By default, generators are not allocated resources in dynamic mode. Fixed resources
for the generator can be set using the *libE_specs* options
``gen_num_procs`` and ``gen_num_gpus``, which take integer values.
If only ``gen_num_gpus`` is set, then the number of processors will match.

To vary generator resources, ``persis_info`` settings can be used in allocation
functions before calling the ``gen_work`` support function. ``persis_info`` takes the
same options (``gen_num_procs`` and ``gen_num_gpus``).

Alternatively, ``persis_info["gen_resources"]`` can be set to
a number of resource sets.

Note that persistent workers maintain their resources until coming out of a
persistent state.
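
As a rough illustration of the precedence described above, the sketch below resolves generator resources from plain dictionaries. The helper ``resolve_gen_resources`` is hypothetical and is not a libEnsemble function; it only mirrors the documented rules, assuming that a ``persis_info`` value replaces the *libE_specs* value for the same key and that unset options default to zero.

```python
def resolve_gen_resources(libE_specs, persis_info):
    """Hypothetical helper (not part of libEnsemble) mirroring the documented rules."""

    def lookup(key):
        # Assumption: persis_info settings take precedence over libE_specs;
        # the documented default for both options is 0.
        if key in persis_info:
            return persis_info[key]
        return libE_specs.get(key, 0)

    num_procs = lookup("gen_num_procs")
    num_gpus = lookup("gen_num_gpus")
    if not num_procs and num_gpus:
        # Only GPUs were requested: the processor count matches the GPU count.
        num_procs = num_gpus
    return num_procs, num_gpus


libE_specs = {"gen_num_gpus": 2}

# Fixed resources from libE_specs only
fixed = resolve_gen_resources(libE_specs, {})

# An allocation function could override both values through persis_info
varied = resolve_gen_resources(libE_specs, {"gen_num_procs": 4, "gen_num_gpus": 1})

print(fixed)   # (2, 2)
print(varied)  # (4, 1)
```

How libEnsemble combines the two sources internally may differ; the sketch is only meant to make the stated defaults and overrides concrete.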
14 changes: 11 additions & 3 deletions docs/resource_manager/zero_resource_workers.rst
@@ -50,9 +50,17 @@ worker for the persistent generator - a common use-case.

In general, the number of resource sets should be set to enable the maximum
concurrency desired by the ensemble, taking into account generators and simulators.
Users can set generator resources by setting ``persis_info["gen_resources"]``
to an integer value, representing the number of resource sets to give to the
generator. The default is zero.

Users can set generator resources using the *libE_specs* options
``gen_num_procs`` and/or ``gen_num_gpus``, which take integer values.
If only ``gen_num_gpus`` is set, then the number of processors will match.

To vary generator resources, ``persis_info`` settings can be used in allocation
functions before calling the ``gen_work`` support function. ``persis_info`` takes the
same options (``gen_num_procs`` and ``gen_num_gpus``).

Alternatively, ``persis_info["gen_resources"]`` can be set to
a number of resource sets.

The available nodes are always divided by the number of resource sets, and there
may be multiple nodes or a partition of a node in each resource set. If the split
23 changes: 18 additions & 5 deletions libensemble/ensemble.py
@@ -8,6 +8,7 @@
import yaml

from libensemble import logger
from libensemble.executors import Executor
from libensemble.libE import libE
from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs
from libensemble.tools import add_unique_random_streams
@@ -228,18 +229,23 @@ class Ensemble:
Tell libEnsemble when to stop a run
persis_info: :obj:`dict`, optional
libE_specs: :obj:`dict` or :class:`LibeSpecs<libensemble.specs.libeSpecs>`, optional
Persistent information to be passed between user function instances
:doc:`(example)<data_structures/persis_info>`
Specifications for libEnsemble
alloc_specs: :obj:`dict` or :class:`AllocSpecs<libensemble.specs.AllocSpecs>`, optional
Specifications for the allocation function
libE_specs: :obj:`dict` or :class:`LibeSpecs<libensemble.specs.libeSpecs>`, optional
Specifications for libEnsemble
persis_info: :obj:`dict`, optional
Persistent information to be passed between user function instances
:doc:`(example)<data_structures/persis_info>`
executor: :class:`Executor<libensemble.executors.executor.executor>`, optional
libEnsemble Executor instance for use within simulation or generator functions
H0: `NumPy structured array <https://docs.scipy.org/doc/numpy/user/basics.rec.html>`_, optional
@@ -262,6 +268,7 @@ def __init__(
libE_specs: Optional[LibeSpecs] = None,
alloc_specs: Optional[AllocSpecs] = AllocSpecs(),
persis_info: Optional[dict] = {},
executor: Optional[Executor] = None,
H0: Optional[npt.NDArray] = None,
parse_args: Optional[bool] = False,
):
@@ -271,6 +278,7 @@ def __init__(
self._libE_specs = libE_specs
self.alloc_specs = alloc_specs
self.persis_info = persis_info
self.executor = executor
self.H0 = H0

self._util_logger = logging.getLogger(__name__)
@@ -327,6 +335,9 @@ def libE_specs(self, new_specs):
else:
self._libE_specs.__dict__.update(**new_specs)

def _refresh_executor(self):
Executor.executor = self.executor or Executor.executor

def run(self) -> (npt.NDArray, dict, int):
"""
Initializes libEnsemble.
@@ -366,6 +377,8 @@ def run(self) -> (npt.NDArray, dict, int):
3 = Current process is not in libEnsemble MPI communicator
"""

self._refresh_executor()

self.H, self.persis_info, self.flag = libE(
self.sim_specs,
self.gen_specs,
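
The ``_refresh_executor`` step in the hunks above can be pictured with the following minimal sketch. ``StubExecutor`` and ``StubEnsemble`` are stand-ins invented for illustration; they model only the class-level ``executor`` attribute and the ``or`` fallback, not the real libEnsemble classes.

```python
class StubExecutor:
    """Stand-in for the Executor class; only the class-level slot is modeled."""

    executor = None


class StubEnsemble:
    """Stand-in for Ensemble; keeps an optional executor passed at construction."""

    def __init__(self, executor=None):
        self.executor = executor

    def _refresh_executor(self):
        # Prefer the instance given to the ensemble; otherwise keep whatever
        # is already stored on the class.
        StubExecutor.executor = self.executor or StubExecutor.executor


# Case 1: no executor passed, so the previously stored one is kept
StubExecutor.executor = "previously registered"
StubEnsemble()._refresh_executor()
kept = StubExecutor.executor

# Case 2: an executor passed to the ensemble replaces the stored one
StubEnsemble(executor="passed to Ensemble")._refresh_executor()
replaced = StubExecutor.executor
```

With this pattern, a script that never passes ``executor`` behaves as before, while one that does pass it has that instance in place before the run starts.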
51 changes: 51 additions & 0 deletions libensemble/gen_funcs/persistent_sampling_var_resources.py
@@ -6,14 +6,17 @@

import numpy as np

from libensemble.executors.executor import Executor
from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG
from libensemble.tools.persistent_support import PersistentSupport
from libensemble.tools.test_support import check_gpu_setting

__all__ = [
"uniform_sample",
"uniform_sample_with_procs_gpus",
"uniform_sample_with_var_priorities",
"uniform_sample_diff_simulations",
"uniform_sample_with_sim_gen_resources",
]


@@ -145,3 +148,51 @@ def uniform_sample_diff_simulations(_, persis_info, gen_specs, libE_info):
b = len(calc_in)

return H_o, persis_info, FINISHED_PERSISTENT_GEN_TAG


def uniform_sample_with_sim_gen_resources(_, persis_info, gen_specs, libE_info):
    """
    Randomly requests a different number of processors and gpus to be used in the
    evaluation of the generated points.

    .. seealso::
        `test_GPU_variable_resources.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/regression_tests/test_GPU_variable_resources.py>`_
    """  # noqa

    b, n, lb, ub = _get_user_params(gen_specs["user"])
    rng = persis_info["rand_stream"]
    ps = PersistentSupport(libE_info, EVAL_GEN_TAG)
    tag = None

    dry_run = gen_specs["user"].get("dry_run", False)  # logs run lines instead of running

    while tag not in [STOP_TAG, PERSIS_STOP]:
        H_o = np.zeros(b, dtype=gen_specs["out"])
        H_o["x"] = rng.uniform(lb, ub, (b, n))

        # Run an app using resources given by libE_specs or persis_info (test purposes only)
        task = Executor.executor.submit(
            app_name="six_hump_camel",
            app_args="-0.99 -0.19",
            stdout="out.txt",
            stderr="err.txt",
            dry_run=dry_run,
        )

        if not dry_run:
            task.wait()  # Wait for run to complete

        # Asserts GPU set correctly (for known MPI runners)
        check_gpu_setting(task, print_setting=True)

        # Set resources for sims
        nprocs = rng.integers(1, gen_specs["user"]["max_procs"] + 1, b)
        H_o["num_procs"] = nprocs  # This would get matched to GPUs anyway, if no other config given
        H_o["num_gpus"] = nprocs
        print(f"GEN created {b} sims requiring {nprocs} procs. One GPU per proc", flush=True)

        tag, Work, calc_in = ps.send_recv(H_o)
        if hasattr(calc_in, "__len__"):
            b = len(calc_in)

    return H_o, persis_info, FINISHED_PERSISTENT_GEN_TAG
4 changes: 4 additions & 0 deletions libensemble/manager.py
@@ -196,6 +196,8 @@ def __init__(
dyn_keys = ("resource_sets", "num_procs", "num_gpus")
dyn_keys_in_H = any(k in self.hist.H.dtype.names for k in dyn_keys)
self.use_resource_sets = dyn_keys_in_H or self.libE_specs.get("num_resource_sets")
self.gen_num_procs = libE_specs.get("gen_num_procs", 0)
self.gen_num_gpus = libE_specs.get("gen_num_gpus", 0)

self.W = np.zeros(len(self.wcomms), dtype=Manager.worker_dtype)
self.W["worker_id"] = np.arange(len(self.wcomms)) + 1
@@ -571,6 +573,8 @@ def _get_alloc_libE_info(self) -> dict:
"sim_ended_count": self.hist.sim_ended_count,
"sim_max_given": self._sim_max_given(),
"use_resource_sets": self.use_resource_sets,
"gen_num_procs": self.gen_num_procs,
"gen_num_gpus": self.gen_num_gpus,
}

def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict:
5 changes: 2 additions & 3 deletions libensemble/sim_funcs/executor_hworld.py
@@ -1,6 +1,5 @@
import numpy as np

from libensemble.executors.mpi_executor import MPIExecutor
from libensemble.message_numbers import (
MAN_SIGNAL_FINISH,
TASK_FAILED,
@@ -66,9 +65,9 @@ def custom_polling_loop(exctr, task, timeout_sec=5.0, delay=0.3):
return task, calc_status


def executor_hworld(H, _, sim_specs):
def executor_hworld(H, _, sim_specs, info):
"""Tests launching and polling task and exiting on task finish"""
exctr = MPIExecutor.executor
exctr = info["executor"]
cores = sim_specs["user"]["cores"]
ELAPSED_TIMEOUT = "elapsed_timeout" in sim_specs["user"]
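
The change in this hunk moves from importing an executor class to reading the executor from the fourth argument of the simulation function. A minimal sketch of that calling pattern follows; ``RecordingExecutor`` and ``sim_f`` are hypothetical stand-ins, and the app name is a placeholder, so nothing is launched.

```python
class RecordingExecutor:
    """Hypothetical stand-in that records submissions instead of launching tasks."""

    def __init__(self):
        self.submitted = []

    def submit(self, **kwargs):
        # Store the keyword arguments so the caller can inspect what was requested
        self.submitted.append(kwargs)
        return kwargs


def sim_f(H, persis_info, sim_specs, libE_info):
    # The executor arrives with the call, so the module needs no executor import
    exctr = libE_info["executor"]
    return exctr.submit(app_name="helloworld", dry_run=True)


exctr = RecordingExecutor()
task = sim_f(None, {}, {}, {"executor": exctr})
```

One consequence of this design is that a simulation function can be exercised in isolation by passing any object with a compatible ``submit`` method.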

13 changes: 6 additions & 7 deletions libensemble/sim_funcs/var_resources.py
@@ -27,7 +27,6 @@

import numpy as np

from libensemble.executors.executor import Executor
from libensemble.message_numbers import TASK_FAILED, UNSET_TAG, WORKER_DONE
from libensemble.resources.resources import Resources
from libensemble.sim_funcs.six_hump_camel import six_hump_camel_func
@@ -50,7 +49,7 @@ def gpu_variable_resources(H, persis_info, sim_specs, libE_info):
dry_run = sim_specs["user"].get("dry_run", False) # logs run lines instead of running
inpt = " ".join(map(str, x)) # Application input

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"]

# Launch application via system MPI runner, using assigned resources.
task = exctr.submit(
@@ -87,7 +86,7 @@ def gpu_variable_resources_from_gen(H, persis_info, sim_specs, libE_info):
dry_run = sim_specs["user"].get("dry_run", False) # logs run lines instead of running
inpt = " ".join(map(str, x)) # Application input

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor

# Launch application via system MPI runner, using assigned resources.
task = exctr.submit(
@@ -147,7 +146,7 @@ def gpu_variable_resources_subenv(H, persis_info, sim_specs, libE_info):
env_script_path = sim_specs["user"]["env_script"] # Script to run in subprocess
inpt = " ".join(map(str, x)) # Application input

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor

# Launch application via given MPI runner, using assigned resources.
_launch_with_env_and_mpi(exctr, inpt, dry_run, env_script_path, "openmpi")
@@ -181,7 +180,7 @@ def gpu_variable_resources_subenv(H, persis_info, sim_specs, libE_info):
return H_o, persis_info, calc_status


def multi_points_with_variable_resources(H, _, sim_specs):
def multi_points_with_variable_resources(H, _, sim_specs, libE_info):
"""
Evaluates either helloworld or six hump camel for a collection of points
given in ``H["x"]`` via the MPI executor, supporting variable sized
@@ -204,7 +203,7 @@ def multi_points_with_variable_resources(H, _, sim_specs):
set_cores_by_rsets = True # If True use rset count to set num procs, else use all available to this worker.
core_multiplier = 1 # Only used with set_cores_by_rsets as a multiplier.

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor
task_states = []
for i, x in enumerate(H["x"]):
nprocs = None # Will be as if argument is not present
@@ -288,7 +287,7 @@ def CUDA_variable_resources(H, _, sim_specs, libE_info):

# Create application input file
inpt = " ".join(map(str, x))
exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor

# Launch application via system MPI runner, using assigned resources.
task = exctr.submit(
14 changes: 14 additions & 0 deletions libensemble/specs.py
@@ -447,6 +447,20 @@ class LibeSpecs(BaseModel):
If not set, resources will be divided evenly (excluding zero_resource_workers).
"""

gen_num_procs: Optional[int]
"""
The default number of processors (MPI ranks) required by generators. Unless
overridden by the equivalent `persis_info` settings, generators will be
allocated this many processors for applications launched via the MPIExecutor.
"""

gen_num_gpus: Optional[int]
"""
The default number of GPUs required by generators. Unless overridden by
the equivalent `persis_info` settings, generators will be allocated this
many GPUs.
"""

enforce_worker_core_bounds: Optional[bool] = False
"""
If ``False``, the Executor will permit submission of tasks with a