diff --git a/libensemble/manager.py b/libensemble/manager.py index 6cd5d1d853..c0f7e18ebf 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -12,6 +12,7 @@ import numpy as np from libensemble.utils.timer import Timer +from libensemble.utils.misc import extract_H_ranges from libensemble.message_numbers import ( EVAL_SIM_TAG, @@ -341,7 +342,7 @@ def _send_work_order(self, Work, w): work_name = calc_type_strings[Work["tag"]] logger.debug( "Manager sending {} work to worker {}. Rows {}".format( - work_name, w, EnsembleDirectory.extract_H_ranges(Work) or None + work_name, w, extract_H_ranges(Work) or None ) ) if len(work_rows): diff --git a/libensemble/output_directory.py b/libensemble/output_directory.py index da8a46bbc2..df52d688c2 100644 --- a/libensemble/output_directory.py +++ b/libensemble/output_directory.py @@ -1,10 +1,9 @@ import os import re import shutil -from itertools import groupby -from operator import itemgetter from libensemble.utils.loc_stack import LocationStack +from libensemble.utils.misc import extract_H_ranges from libensemble.tools.fields_keys import libE_spec_sim_dir_keys, libE_spec_gen_dir_keys, libE_spec_calc_dir_misc from libensemble.message_numbers import EVAL_SIM_TAG, calc_type_strings @@ -84,23 +83,6 @@ def use_calc_dirs(self, type): else: return self.gen_use - @staticmethod - def extract_H_ranges(Work): - """Convert received H_rows into ranges for labeling""" - work_H_rows = Work["libE_info"]["H_rows"] - if len(work_H_rows) == 1: - return str(work_H_rows[0]) - else: - # From https://stackoverflow.com/a/30336492 - ranges = [] - for diff, group in groupby(enumerate(work_H_rows.tolist()), lambda x: x[0] - x[1]): - group = list(map(itemgetter(1), group)) - if len(group) > 1: - ranges.append(str(group[0]) + "-" + str(group[-1])) - else: - ranges.append(str(group[0])) - return "_".join(ranges) - def _make_calc_dir(self, workerID, H_rows, calc_str, locs): """Create calc dirs and intermediate dirs, copy inputs, based on 
libE_specs""" @@ -182,7 +164,7 @@ def prep_calc_dir(self, Work, calc_iter, workerID, calc_type): self.loc_stack = LocationStack() if calc_type == EVAL_SIM_TAG: - H_rows = self.extract_H_ranges(Work) + H_rows = extract_H_ranges(Work) else: H_rows = str(calc_iter[calc_type]) diff --git a/libensemble/tests/.coveragerc b/libensemble/tests/.coveragerc index 6156169def..3f24d55bbd 100644 --- a/libensemble/tests/.coveragerc +++ b/libensemble/tests/.coveragerc @@ -24,6 +24,7 @@ omit = */legacy_balsam_executor.py */forkable_pdb.py */parse_args.py + */runners.py */gen_funcs/old_aposmm.py */alloc_funcs/fast_alloc_to_aposmm.py exclude_lines = diff --git a/libensemble/tests/unit_tests/test_sim_dir_properties.py b/libensemble/tests/unit_tests/test_sim_dir_properties.py index 71fc5759b7..e84e8f7ca0 100644 --- a/libensemble/tests/unit_tests/test_sim_dir_properties.py +++ b/libensemble/tests/unit_tests/test_sim_dir_properties.py @@ -3,20 +3,21 @@ import numpy as np from libensemble.output_directory import EnsembleDirectory from libensemble.utils.loc_stack import LocationStack +from libensemble.utils.misc import extract_H_ranges def test_range_single_element(): """Single H_row labeling""" work = {"H_fields": ["x", "num_nodes", "procs_per_node"], "libE_info": {"H_rows": np.array([5]), "workerID": 1}} - assert EnsembleDirectory.extract_H_ranges(work) == "5", "Failed to correctly parse single H row" + assert extract_H_ranges(work) == "5", "Failed to correctly parse single H row" def test_range_two_separate_elements(): """Multiple H_rows, non-sequential""" work = {"H_fields": ["x", "num_nodes", "procs_per_node"], "libE_info": {"H_rows": np.array([2, 8]), "workerID": 1}} - assert EnsembleDirectory.extract_H_ranges(work) == "2_8", "Failed to correctly parse nonsequential H rows" + assert extract_H_ranges(work) == "2_8", "Failed to correctly parse nonsequential H rows" def test_range_two_ranges(): @@ -26,7 +27,7 @@ def test_range_two_ranges(): "H_fields": ["x", "num_nodes", 
"procs_per_node"], "libE_info": {"H_rows": np.array([0, 1, 2, 3, 7, 8]), "workerID": 1}, } - assert EnsembleDirectory.extract_H_ranges(work) == "0-3_7-8", "Failed to correctly parse multiple H ranges" + assert extract_H_ranges(work) == "0-3_7-8", "Failed to correctly parse multiple H ranges" def test_range_mixes(): @@ -37,7 +38,7 @@ def test_range_mixes(): "libE_info": {"H_rows": np.array([2, 3, 4, 6, 8, 9, 11, 14]), "workerID": 1}, } assert ( - EnsembleDirectory.extract_H_ranges(work) == "2-4_6_8-9_11_14" + extract_H_ranges(work) == "2-4_6_8-9_11_14" ), "Failed to correctly parse H row single elements and ranges." diff --git a/libensemble/tools/alloc_support.py b/libensemble/tools/alloc_support.py index e6d3edfb18..fee6304181 100644 --- a/libensemble/tools/alloc_support.py +++ b/libensemble/tools/alloc_support.py @@ -3,7 +3,7 @@ from libensemble.message_numbers import EVAL_SIM_TAG, EVAL_GEN_TAG from libensemble.resources.resources import Resources from libensemble.resources.scheduler import ResourceScheduler, InsufficientFreeResources # noqa: F401 -from libensemble.output_directory import EnsembleDirectory +from libensemble.utils.misc import extract_H_ranges logger = logging.getLogger(__name__) # For debug messages - uncomment @@ -180,7 +180,7 @@ def sim_work(self, wid, H, H_fields, H_rows, persis_info, **libE_info): logger.debug( "Alloc func packing SIM work for worker {}. Packing sim_ids: {}".format( - wid, EnsembleDirectory.extract_H_ranges(work) or None + wid, extract_H_ranges(work) or None ) ) return work @@ -224,7 +224,7 @@ def gen_work(self, wid, H_fields, H_rows, persis_info, **libE_info): logger.debug( "Alloc func packing GEN work for worker {}. 
"""
Misc internal functions
"""

from itertools import groupby
from operator import itemgetter


def extract_H_ranges(Work):
    """Convert received H_rows into a compact range string for labeling.

    Runs of consecutive row indices are collapsed to ``first-last`` and the
    resulting pieces are joined with underscores, e.g. rows
    ``[2, 3, 4, 6, 8, 9]`` become ``"2-4_6_8-9"``.

    Parameters
    ----------
    Work : dict
        Work order; only ``Work["libE_info"]["H_rows"]`` is read. The rows may
        be a NumPy array (anything exposing ``tolist``) or a plain sequence
        of integers.

    Returns
    -------
    str
        The range label. A single row gives just that number; an empty set
        of rows gives the empty string.
    """
    work_H_rows = Work["libE_info"]["H_rows"]

    # Arrays are converted through ``tolist`` so their elements become plain
    # Python ints; lists and tuples are accepted as they are.
    if hasattr(work_H_rows, "tolist"):
        rows = work_H_rows.tolist()
    else:
        rows = list(work_H_rows)

    # Within a run of consecutive values, (position - value) is constant, so
    # grouping on that offset splits the rows into consecutive runs.
    # From https://stackoverflow.com/a/30336492
    ranges = []
    for _, group in groupby(enumerate(rows), lambda pair: pair[0] - pair[1]):
        members = list(map(itemgetter(1), group))
        if len(members) > 1:
            ranges.append(str(members[0]) + "-" + str(members[-1]))
        else:
            ranges.append(str(members[0]))
    return "_".join(ranges)
def make_runners(sim_specs, gen_specs):
    """Build the callables used to run a sim or a gen.

    Each callable either invokes the user function directly or, when a funcX
    executor was obtained and the matching specs request it, submits the user
    function through ``_funcx_result``.

    Returns a dict keyed by ``EVAL_SIM_TAG`` and ``EVAL_GEN_TAG``. When
    ``gen_specs`` is falsy, the ``EVAL_GEN_TAG`` entry is an empty list
    instead of a callable.
    """

    executor, sim_is_remote, gen_is_remote = _get_funcx_exctr(sim_specs, gen_specs)
    user_sim = sim_specs["sim_f"]

    def run_sim(calc_in, persis_info, libE_info):
        """Calls or submits the sim func."""
        if not (sim_is_remote and executor):
            return user_sim(calc_in, persis_info, sim_specs, libE_info)
        return _funcx_result(executor, user_sim, calc_in, persis_info, sim_specs, libE_info)

    runners = {EVAL_SIM_TAG: run_sim, EVAL_GEN_TAG: []}
    if not gen_specs:
        # No generator configured: keep the empty-list placeholder.
        return runners

    user_gen = gen_specs["gen_f"]

    def run_gen(calc_in, persis_info, libE_info):
        """Calls or submits the gen func."""
        if not (gen_is_remote and executor):
            return user_gen(calc_in, persis_info, gen_specs, libE_info)
        return _funcx_result(executor, user_gen, calc_in, persis_info, gen_specs, libE_info)

    runners[EVAL_GEN_TAG] = run_gen
    return runners
dtypes, workerID, sim_specs, gen_specs, libE_specs): self.stats_fmt = libE_specs.get("stats_fmt", {}) self.calc_iter = {EVAL_SIM_TAG: 0, EVAL_GEN_TAG: 0} - self._run_calc = Worker._make_runners(sim_specs, gen_specs) + self._run_calc = make_runners(sim_specs, gen_specs) Worker._set_executor(self.workerID, self.comm) Worker._set_resources(self.workerID, self.comm) self.EnsembleDirectory = EnsembleDirectory(libE_specs=libE_specs) - @staticmethod - def _funcx_result(funcx_exctr, user_f, calc_in, persis_info, specs, libE_info): - libE_info["comm"] = None # 'comm' object not pickle-able - Worker._set_executor(0, None) # ditto for executor - - future = funcx_exctr.submit(user_f, calc_in, persis_info, specs, libE_info, endpoint_id=specs["funcx_endpoint"]) - remote_exc = future.exception() # blocks until exception or None - if remote_exc is None: - return future.result() - else: - raise remote_exc - - @staticmethod - def _get_funcx_exctr(sim_specs, gen_specs): - funcx_sim = len(sim_specs.get("funcx_endpoint", "")) > 0 - funcx_gen = len(gen_specs.get("funcx_endpoint", "")) > 0 - - if any([funcx_sim, funcx_gen]): - try: - from funcx import FuncXClient - from funcx.sdk.executor import FuncXExecutor - - return FuncXExecutor(FuncXClient()), funcx_sim, funcx_gen - except ModuleNotFoundError: - logger.warning("funcX use detected but funcX not importable. Is it installed?") - return None, False, False - except Exception: - return None, False, False - else: - return None, False, False - - @staticmethod - def _make_runners(sim_specs, gen_specs): - """Creates functions to run a sim or gen. 
These functions are either - called directly by the worker or submitted to a funcX endpoint.""" - - funcx_exctr, funcx_sim, funcx_gen = Worker._get_funcx_exctr(sim_specs, gen_specs) - sim_f = sim_specs["sim_f"] - - def run_sim(calc_in, persis_info, libE_info): - """Calls or submits the sim func.""" - if funcx_sim and funcx_exctr: - return Worker._funcx_result(funcx_exctr, sim_f, calc_in, persis_info, sim_specs, libE_info) - else: - return sim_f(calc_in, persis_info, sim_specs, libE_info) - - if gen_specs: - gen_f = gen_specs["gen_f"] - - def run_gen(calc_in, persis_info, libE_info): - """Calls or submits the gen func.""" - if funcx_gen and funcx_exctr: - return Worker._funcx_result(funcx_exctr, gen_f, calc_in, persis_info, gen_specs, libE_info) - else: - return gen_f(calc_in, persis_info, gen_specs, libE_info) - - else: - run_gen = [] - - return {EVAL_SIM_TAG: run_sim, EVAL_GEN_TAG: run_gen} - @staticmethod def _set_rset_team(rset_team): """Pass new rset_team to worker resources""" @@ -257,7 +198,7 @@ def _handle_calc(self, Work, calc_in): # from output_directory.py if calc_type == EVAL_SIM_TAG: enum_desc = "sim_id" - calc_id = EnsembleDirectory.extract_H_ranges(Work) + calc_id = extract_H_ranges(Work) else: enum_desc = "Gen no" # Use global gen count if available