Skip to content

Commit

Permalink
Merge branch 'develop' into feature/ufunc_field_decorators
Browse files Browse the repository at this point in the history
  • Loading branch information
jlnav committed Sep 6, 2023
2 parents 9136746 + 1b8ac02 commit 6719fd9
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 15 deletions.
23 changes: 18 additions & 5 deletions libensemble/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import yaml

from libensemble import logger
from libensemble.executors import Executor
from libensemble.libE import libE
from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs
from libensemble.tools import add_unique_random_streams
Expand Down Expand Up @@ -215,18 +216,23 @@ class Ensemble:
Tell libEnsemble when to stop a run
persis_info: :obj:`dict`, optional
libE_specs: :obj:`dict` or :class:`LibeSpecs<libensemble.specs.libeSpecs>`, optional
Persistent information to be passed between user function instances
:doc:`(example)<data_structures/persis_info>`
Specifications for libEnsemble
alloc_specs: :obj:`dict` or :class:`AllocSpecs<libensemble.specs.AllocSpecs>`, optional
Specifications for the allocation function
libE_specs: :obj:`dict` or :class:`LibeSpecs<libensemble.specs.libeSpecs>`, optional
Specifications for libEnsemble
persis_info: :obj:`dict`, optional
Persistent information to be passed between user function instances
:doc:`(example)<data_structures/persis_info>`
executor: :class:`Executor<libensemble.executors.executor.executor>`, optional
libEnsemble Executor instance for use within simulation or generator functions
H0: `NumPy structured array <https://docs.scipy.org/doc/numpy/user/basics.rec.html>`_, optional
Expand All @@ -248,6 +254,7 @@ def __init__(
libE_specs: Optional[LibeSpecs] = None,
alloc_specs: Optional[AllocSpecs] = AllocSpecs(),
persis_info: Optional[dict] = {},
executor: Optional[Executor] = None,
H0: Optional[npt.NDArray] = None,
parse_args: Optional[bool] = False,
):
Expand All @@ -257,6 +264,7 @@ def __init__(
self._libE_specs = libE_specs
self.alloc_specs = alloc_specs
self.persis_info = persis_info
self.executor = executor
self.H0 = H0

self._util_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -313,6 +321,9 @@ def libE_specs(self, new_specs):
else:
self._libE_specs.__dict__.update(**new_specs)

def _refresh_executor(self):
Executor.executor = self.executor or Executor.executor

def run(self) -> (npt.NDArray, dict, int):
"""
Initializes libEnsemble.
Expand Down Expand Up @@ -352,6 +363,8 @@ def run(self) -> (npt.NDArray, dict, int):
3 = Current process is not in libEnsemble MPI communicator
"""

self._refresh_executor()

self.H, self.persis_info, self.flag = libE(
self.sim_specs,
self.gen_specs,
Expand Down
5 changes: 2 additions & 3 deletions libensemble/sim_funcs/executor_hworld.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import numpy as np

from libensemble.executors.mpi_executor import MPIExecutor
from libensemble.message_numbers import (
MAN_SIGNAL_FINISH,
TASK_FAILED,
Expand Down Expand Up @@ -66,9 +65,9 @@ def custom_polling_loop(exctr, task, timeout_sec=5.0, delay=0.3):
return task, calc_status


def executor_hworld(H, _, sim_specs):
def executor_hworld(H, _, sim_specs, info):
"""Tests launching and polling task and exiting on task finish"""
exctr = MPIExecutor.executor
exctr = info["executor"]
cores = sim_specs["user"]["cores"]
ELAPSED_TIMEOUT = "elapsed_timeout" in sim_specs["user"]

Expand Down
13 changes: 6 additions & 7 deletions libensemble/sim_funcs/var_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import numpy as np

from libensemble.executors.executor import Executor
from libensemble.message_numbers import TASK_FAILED, UNSET_TAG, WORKER_DONE
from libensemble.resources.resources import Resources
from libensemble.sim_funcs.six_hump_camel import six_hump_camel_func
Expand All @@ -50,7 +49,7 @@ def gpu_variable_resources(H, persis_info, sim_specs, libE_info):
dry_run = sim_specs["user"].get("dry_run", False) # logs run lines instead of running
inpt = " ".join(map(str, x)) # Application input

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"]

# Launch application via system MPI runner, using assigned resources.
task = exctr.submit(
Expand Down Expand Up @@ -87,7 +86,7 @@ def gpu_variable_resources_from_gen(H, persis_info, sim_specs, libE_info):
dry_run = sim_specs["user"].get("dry_run", False) # logs run lines instead of running
inpt = " ".join(map(str, x)) # Application input

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor

# Launch application via system MPI runner, using assigned resources.
task = exctr.submit(
Expand Down Expand Up @@ -147,7 +146,7 @@ def gpu_variable_resources_subenv(H, persis_info, sim_specs, libE_info):
env_script_path = sim_specs["user"]["env_script"] # Script to run in subprocess
inpt = " ".join(map(str, x)) # Application input

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor

# Launch application via given MPI runner, using assigned resources.
_launch_with_env_and_mpi(exctr, inpt, dry_run, env_script_path, "openmpi")
Expand Down Expand Up @@ -181,7 +180,7 @@ def gpu_variable_resources_subenv(H, persis_info, sim_specs, libE_info):
return H_o, persis_info, calc_status


def multi_points_with_variable_resources(H, _, sim_specs):
def multi_points_with_variable_resources(H, _, sim_specs, libE_info):
"""
Evaluates either helloworld or six hump camel for a collection of points
given in ``H["x"]`` via the MPI executor, supporting variable sized
Expand All @@ -204,7 +203,7 @@ def multi_points_with_variable_resources(H, _, sim_specs):
set_cores_by_rsets = True # If True use rset count to set num procs, else use all available to this worker.
core_multiplier = 1 # Only used with set_cores_by_rsets as a multiplier.

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor
task_states = []
for i, x in enumerate(H["x"]):
nprocs = None # Will be as if argument is not present
Expand Down Expand Up @@ -288,7 +287,7 @@ def CUDA_variable_resources(H, _, sim_specs, libE_info):

# Create application input file
inpt = " ".join(map(str, x))
exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor

# Launch application via system MPI runner, using assigned resources.
task = exctr.submit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
# Ax uses a deprecated warn command.
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

from libensemble.gen_funcs.persistent_ax_multitask import persistent_gp_mt_ax_gen_f

Expand Down
1 change: 1 addition & 0 deletions libensemble/tools/persistent_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def send(self, output: npt.NDArray, calc_status: int = UNSET_TAG, keep_state=Fal
# Need to make copy before remove comm as original could be reused
libE_info = dict(self.libE_info)
libE_info.pop("comm")
libE_info.pop("executor")
else:
libE_info = self.libE_info

Expand Down
4 changes: 4 additions & 0 deletions libensemble/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ def _handle(self, Work: dict) -> dict:
libE_info["comm"] = self.comm
libE_info["workerID"] = self.workerID
libE_info["rset_team"] = libE_info.get("rset_team", [])
libE_info["executor"] = Executor.executor
Worker._set_rset_team(libE_info)

calc_out, persis_info, calc_status = self._handle_calc(Work, calc_in)
Expand All @@ -356,6 +357,9 @@ def _handle(self, Work: dict) -> dict:
if "comm" in libE_info:
del libE_info["comm"]

if "executor" in libE_info:
del libE_info["executor"]

# If there was a finish signal, bail
if calc_status == MAN_SIGNAL_FINISH:
return None
Expand Down

0 comments on commit 6719fd9

Please sign in to comment.