Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libensemble/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import numpy as np

from libensemble.utils.timer import Timer
from libensemble.utils.misc import extract_H_ranges

from libensemble.message_numbers import (
EVAL_SIM_TAG,
Expand Down Expand Up @@ -341,7 +342,7 @@ def _send_work_order(self, Work, w):
work_name = calc_type_strings[Work["tag"]]
logger.debug(
"Manager sending {} work to worker {}. Rows {}".format(
work_name, w, EnsembleDirectory.extract_H_ranges(Work) or None
work_name, w, extract_H_ranges(Work) or None
)
)
if len(work_rows):
Expand Down
22 changes: 2 additions & 20 deletions libensemble/output_directory.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import os
import re
import shutil
from itertools import groupby
from operator import itemgetter

from libensemble.utils.loc_stack import LocationStack
from libensemble.utils.misc import extract_H_ranges
from libensemble.tools.fields_keys import libE_spec_sim_dir_keys, libE_spec_gen_dir_keys, libE_spec_calc_dir_misc
from libensemble.message_numbers import EVAL_SIM_TAG, calc_type_strings

Expand Down Expand Up @@ -84,23 +83,6 @@ def use_calc_dirs(self, type):
else:
return self.gen_use

@staticmethod
def extract_H_ranges(Work):
"""Convert received H_rows into ranges for labeling"""
work_H_rows = Work["libE_info"]["H_rows"]
if len(work_H_rows) == 1:
return str(work_H_rows[0])
else:
# From https://stackoverflow.com/a/30336492
ranges = []
for diff, group in groupby(enumerate(work_H_rows.tolist()), lambda x: x[0] - x[1]):
group = list(map(itemgetter(1), group))
if len(group) > 1:
ranges.append(str(group[0]) + "-" + str(group[-1]))
else:
ranges.append(str(group[0]))
return "_".join(ranges)

def _make_calc_dir(self, workerID, H_rows, calc_str, locs):
"""Create calc dirs and intermediate dirs, copy inputs, based on libE_specs"""

Expand Down Expand Up @@ -182,7 +164,7 @@ def prep_calc_dir(self, Work, calc_iter, workerID, calc_type):
self.loc_stack = LocationStack()

if calc_type == EVAL_SIM_TAG:
H_rows = self.extract_H_ranges(Work)
H_rows = extract_H_ranges(Work)
else:
H_rows = str(calc_iter[calc_type])

Expand Down
1 change: 1 addition & 0 deletions libensemble/tests/.coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ omit =
*/legacy_balsam_executor.py
*/forkable_pdb.py
*/parse_args.py
*/runners.py
*/gen_funcs/old_aposmm.py
*/alloc_funcs/fast_alloc_to_aposmm.py
exclude_lines =
Expand Down
9 changes: 5 additions & 4 deletions libensemble/tests/unit_tests/test_sim_dir_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@
import numpy as np
from libensemble.output_directory import EnsembleDirectory
from libensemble.utils.loc_stack import LocationStack
from libensemble.utils.misc import extract_H_ranges


def test_range_single_element():
"""Single H_row labeling"""

work = {"H_fields": ["x", "num_nodes", "procs_per_node"], "libE_info": {"H_rows": np.array([5]), "workerID": 1}}
assert EnsembleDirectory.extract_H_ranges(work) == "5", "Failed to correctly parse single H row"
assert extract_H_ranges(work) == "5", "Failed to correctly parse single H row"


def test_range_two_separate_elements():
"""Multiple H_rows, non-sequential"""

work = {"H_fields": ["x", "num_nodes", "procs_per_node"], "libE_info": {"H_rows": np.array([2, 8]), "workerID": 1}}
assert EnsembleDirectory.extract_H_ranges(work) == "2_8", "Failed to correctly parse nonsequential H rows"
assert extract_H_ranges(work) == "2_8", "Failed to correctly parse nonsequential H rows"


def test_range_two_ranges():
Expand All @@ -26,7 +27,7 @@ def test_range_two_ranges():
"H_fields": ["x", "num_nodes", "procs_per_node"],
"libE_info": {"H_rows": np.array([0, 1, 2, 3, 7, 8]), "workerID": 1},
}
assert EnsembleDirectory.extract_H_ranges(work) == "0-3_7-8", "Failed to correctly parse multiple H ranges"
assert extract_H_ranges(work) == "0-3_7-8", "Failed to correctly parse multiple H ranges"


def test_range_mixes():
Expand All @@ -37,7 +38,7 @@ def test_range_mixes():
"libE_info": {"H_rows": np.array([2, 3, 4, 6, 8, 9, 11, 14]), "workerID": 1},
}
assert (
EnsembleDirectory.extract_H_ranges(work) == "2-4_6_8-9_11_14"
extract_H_ranges(work) == "2-4_6_8-9_11_14"
), "Failed to correctly parse H row single elements and ranges."


Expand Down
6 changes: 3 additions & 3 deletions libensemble/tools/alloc_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from libensemble.message_numbers import EVAL_SIM_TAG, EVAL_GEN_TAG
from libensemble.resources.resources import Resources
from libensemble.resources.scheduler import ResourceScheduler, InsufficientFreeResources # noqa: F401
from libensemble.output_directory import EnsembleDirectory
from libensemble.utils.misc import extract_H_ranges

logger = logging.getLogger(__name__)
# For debug messages - uncomment
Expand Down Expand Up @@ -180,7 +180,7 @@ def sim_work(self, wid, H, H_fields, H_rows, persis_info, **libE_info):

logger.debug(
"Alloc func packing SIM work for worker {}. Packing sim_ids: {}".format(
wid, EnsembleDirectory.extract_H_ranges(work) or None
wid, extract_H_ranges(work) or None
)
)
return work
Expand Down Expand Up @@ -224,7 +224,7 @@ def gen_work(self, wid, H_fields, H_rows, persis_info, **libE_info):

logger.debug(
"Alloc func packing GEN work for worker {}. Packing sim_ids: {}".format(
wid, EnsembleDirectory.extract_H_ranges(work) or None
wid, extract_H_ranges(work) or None
)
)
return work
Expand Down
23 changes: 23 additions & 0 deletions libensemble/utils/misc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""
Misc internal functions
"""

from itertools import groupby
from operator import itemgetter


def extract_H_ranges(Work):
"""Convert received H_rows into ranges for labeling"""
work_H_rows = Work["libE_info"]["H_rows"]
if len(work_H_rows) == 1:
return str(work_H_rows[0])
else:
# From https://stackoverflow.com/a/30336492
ranges = []
for diff, group in groupby(enumerate(work_H_rows.tolist()), lambda x: x[0] - x[1]):
group = list(map(itemgetter(1), group))
if len(group) > 1:
ranges.append(str(group[0]) + "-" + str(group[-1]))
else:
ranges.append(str(group[0]))
return "_".join(ranges)
68 changes: 68 additions & 0 deletions libensemble/utils/runners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import logging
import logging.handlers

from libensemble.message_numbers import EVAL_SIM_TAG, EVAL_GEN_TAG

logger = logging.getLogger(__name__)


def _funcx_result(funcx_exctr, user_f, calc_in, persis_info, specs, libE_info):
from libensemble.worker import Worker
libE_info["comm"] = None # 'comm' object not pickle-able
Worker._set_executor(0, None) # ditto for executor

future = funcx_exctr.submit(user_f, calc_in, persis_info, specs, libE_info, endpoint_id=specs["funcx_endpoint"])
remote_exc = future.exception() # blocks until exception or None
if remote_exc is None:
return future.result()
else:
raise remote_exc


def _get_funcx_exctr(sim_specs, gen_specs):
funcx_sim = len(sim_specs.get("funcx_endpoint", "")) > 0
funcx_gen = len(gen_specs.get("funcx_endpoint", "")) > 0

if any([funcx_sim, funcx_gen]):
try:
from funcx import FuncXClient
from funcx.sdk.executor import FuncXExecutor

return FuncXExecutor(FuncXClient()), funcx_sim, funcx_gen
except ModuleNotFoundError:
logger.warning("funcX use detected but funcX not importable. Is it installed?")
return None, False, False
except Exception:
return None, False, False
else:
return None, False, False


def make_runners(sim_specs, gen_specs):
"""Creates functions to run a sim or gen. These functions are either
called directly by the worker or submitted to a funcX endpoint."""

funcx_exctr, funcx_sim, funcx_gen = _get_funcx_exctr(sim_specs, gen_specs)
sim_f = sim_specs["sim_f"]

def run_sim(calc_in, persis_info, libE_info):
"""Calls or submits the sim func."""
if funcx_sim and funcx_exctr:
return _funcx_result(funcx_exctr, sim_f, calc_in, persis_info, sim_specs, libE_info)
else:
return sim_f(calc_in, persis_info, sim_specs, libE_info)

if gen_specs:
gen_f = gen_specs["gen_f"]

def run_gen(calc_in, persis_info, libE_info):
"""Calls or submits the gen func."""
if funcx_gen and funcx_exctr:
return _funcx_result(funcx_exctr, gen_f, calc_in, persis_info, gen_specs, libE_info)
else:
return gen_f(calc_in, persis_info, gen_specs, libE_info)

else:
run_gen = []

return {EVAL_SIM_TAG: run_sim, EVAL_GEN_TAG: run_gen}
67 changes: 4 additions & 63 deletions libensemble/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from libensemble.message_numbers import calc_type_strings, calc_status_strings
from libensemble.output_directory import EnsembleDirectory

from libensemble.utils.misc import extract_H_ranges
from libensemble.utils.timer import Timer
from libensemble.utils.runners import make_runners
from libensemble.executors.executor import Executor
from libensemble.resources.resources import Resources
from libensemble.comms.logs import worker_logging_config
Expand Down Expand Up @@ -134,72 +136,11 @@ def __init__(self, comm, dtypes, workerID, sim_specs, gen_specs, libE_specs):
self.stats_fmt = libE_specs.get("stats_fmt", {})

self.calc_iter = {EVAL_SIM_TAG: 0, EVAL_GEN_TAG: 0}
self._run_calc = Worker._make_runners(sim_specs, gen_specs)
self._run_calc = make_runners(sim_specs, gen_specs)
Worker._set_executor(self.workerID, self.comm)
Worker._set_resources(self.workerID, self.comm)
self.EnsembleDirectory = EnsembleDirectory(libE_specs=libE_specs)

@staticmethod
def _funcx_result(funcx_exctr, user_f, calc_in, persis_info, specs, libE_info):
libE_info["comm"] = None # 'comm' object not pickle-able
Worker._set_executor(0, None) # ditto for executor

future = funcx_exctr.submit(user_f, calc_in, persis_info, specs, libE_info, endpoint_id=specs["funcx_endpoint"])
remote_exc = future.exception() # blocks until exception or None
if remote_exc is None:
return future.result()
else:
raise remote_exc

@staticmethod
def _get_funcx_exctr(sim_specs, gen_specs):
funcx_sim = len(sim_specs.get("funcx_endpoint", "")) > 0
funcx_gen = len(gen_specs.get("funcx_endpoint", "")) > 0

if any([funcx_sim, funcx_gen]):
try:
from funcx import FuncXClient
from funcx.sdk.executor import FuncXExecutor

return FuncXExecutor(FuncXClient()), funcx_sim, funcx_gen
except ModuleNotFoundError:
logger.warning("funcX use detected but funcX not importable. Is it installed?")
return None, False, False
except Exception:
return None, False, False
else:
return None, False, False

@staticmethod
def _make_runners(sim_specs, gen_specs):
"""Creates functions to run a sim or gen. These functions are either
called directly by the worker or submitted to a funcX endpoint."""

funcx_exctr, funcx_sim, funcx_gen = Worker._get_funcx_exctr(sim_specs, gen_specs)
sim_f = sim_specs["sim_f"]

def run_sim(calc_in, persis_info, libE_info):
"""Calls or submits the sim func."""
if funcx_sim and funcx_exctr:
return Worker._funcx_result(funcx_exctr, sim_f, calc_in, persis_info, sim_specs, libE_info)
else:
return sim_f(calc_in, persis_info, sim_specs, libE_info)

if gen_specs:
gen_f = gen_specs["gen_f"]

def run_gen(calc_in, persis_info, libE_info):
"""Calls or submits the gen func."""
if funcx_gen and funcx_exctr:
return Worker._funcx_result(funcx_exctr, gen_f, calc_in, persis_info, gen_specs, libE_info)
else:
return gen_f(calc_in, persis_info, gen_specs, libE_info)

else:
run_gen = []

return {EVAL_SIM_TAG: run_sim, EVAL_GEN_TAG: run_gen}

@staticmethod
def _set_rset_team(rset_team):
"""Pass new rset_team to worker resources"""
Expand Down Expand Up @@ -257,7 +198,7 @@ def _handle_calc(self, Work, calc_in):
# from output_directory.py
if calc_type == EVAL_SIM_TAG:
enum_desc = "sim_id"
calc_id = EnsembleDirectory.extract_H_ranges(Work)
calc_id = extract_H_ranges(Work)
else:
enum_desc = "Gen no"
# Use global gen count if available
Expand Down