Merge branch 'develop' into feature/add_gpu_env_fallback
shuds13 committed Aug 2, 2023
2 parents b172181 + 42ad8e7 commit 09d1169
Showing 17 changed files with 122 additions and 119 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.rst
@@ -63,7 +63,7 @@ New capabilities:
* Allow users to specify their own system configurations.
* These changes remove a number of tweaks that were needed for particular platforms.

* Resource management supports GPU and non-GPU simulations in the same ensemble. #993
* Resource management supports GPU and non-GPU simulations in the same ensemble. #993
* Users can specify `num_procs` and `num_gpus` in the generator for each evaluation.

* Pydantic models are used for validating major libE input (input can be provided as classes or dictionaries). #878
36 changes: 19 additions & 17 deletions docs/platforms/platforms_index.rst
@@ -146,13 +146,13 @@ When using the MPI Executor, it is possible to override the detected information

.. _funcx_ref:

funcX - Remote User functions
-----------------------------
Globus Compute - Remote User functions
--------------------------------------

*Alternatively to much of the above*, if libEnsemble is running on some resource with
internet access (laptops, login nodes, other servers, etc.), workers can be instructed to
launch generator or simulator user function instances to separate resources from
themselves via funcX_, a distributed, high-performance function-as-a-service platform:
themselves via `Globus Compute`_, a distributed, high-performance function-as-a-service platform:

.. image:: ../images/funcxmodel.png
:alt: running_with_funcx
@@ -162,17 +162,17 @@ themselves via funcX_, a distributed, high-performance function-as-a-service pla
This is useful for running ensembles across machines and heterogeneous resources, but
comes with several caveats:

1. User functions registered with funcX must be *non-persistent*, since
1. User functions registered with Globus Compute must be *non-persistent*, since
manager-worker communicators can't be serialized or used by a remote resource.

2. Likewise, the ``Executor.manager_poll()`` capability is disabled. The only
available control over remote functions by workers is processing return values
or exceptions when they complete.

3. funcX imposes a `handful of task-rate and data limits`_ on submitted functions.
3. Globus Compute imposes a `handful of task-rate and data limits`_ on submitted functions.

4. Users are responsible for authenticating via Globus_ and maintaining their
`funcX endpoints`_ on their target systems.
`Globus Compute endpoints`_ on their target systems.

Users can still define Executor instances within their user functions and submit
MPI applications normally, as long as libEnsemble and the target application are
@@ -184,15 +184,17 @@ accessible on the remote system::
exctr.register_app(full_path="/home/user/forces.x", app_name="forces")
task = exctr.submit(app_name="forces", num_procs=64)

Specify a funcX endpoint in either :class:`sim_specs<libensemble.specs.SimSpecs>` or :class:`gen_specs<libensemble.specs.GenSpecs>` via the ``funcx_endpoint``
key. For example::
Specify a Globus Compute endpoint in either :class:`sim_specs<libensemble.specs.SimSpecs>` or :class:`gen_specs<libensemble.specs.GenSpecs>` via the ``globus_compute_endpoint``
argument. For example::

from libensemble.specs import SimSpecs

sim_specs = {
"sim_f": sim_f,
"in": ["x"],
"out": [("f", float)],
"funcx_endpoint": "3af6dc24-3f27-4c49-8d11-e301ade15353",
}
sim_specs = SimSpecs(
sim_f = sim_f,
inputs = ["x"],
out = [("f", float)],
globus_compute_endpoint = "3af6dc24-3f27-4c49-8d11-e301ade15353",
)

See the ``libensemble/tests/scaling_tests/funcx_forces`` directory for a complete
remote-simulation example.
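
For calling scripts that still define their specs as dictionaries with the old key, the rename in this commit amounts to moving the value from ``funcx_endpoint`` to ``globus_compute_endpoint``. A minimal sketch, assuming a hypothetical helper ``rename_endpoint_key`` that is not part of libEnsemble:

```python
def rename_endpoint_key(specs: dict) -> dict:
    """Return a copy of specs with funcx_endpoint renamed (hypothetical helper)."""
    updated = dict(specs)
    if "funcx_endpoint" in updated:
        # Keep a new-style key if one is already set; otherwise carry the old value over.
        updated.setdefault("globus_compute_endpoint", updated["funcx_endpoint"])
        del updated["funcx_endpoint"]
    return updated


# Dictionary-style specs using the pre-rename key (endpoint ID taken from the docs example).
old_sim_specs = {
    "in": ["x"],
    "out": [("f", float)],
    "funcx_endpoint": "3af6dc24-3f27-4c49-8d11-e301ade15353",
}
new_sim_specs = rename_endpoint_key(old_sim_specs)
```

The helper copies the dictionary rather than mutating it, so the original specs remain available for comparison while migrating.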
@@ -219,7 +221,7 @@ libEnsemble on specific HPC systems.

.. _Balsam: https://balsam.readthedocs.io/en/latest/
.. _Cooley: https://www.alcf.anl.gov/support-center/cooley
.. _funcX: https://funcx.org/
.. _`funcX endpoints`: https://funcx.readthedocs.io/en/latest/endpoints.html
.. _`Globus Compute`: https://www.globus.org/compute
.. _`Globus Compute endpoints`: https://globus-compute.readthedocs.io/en/latest/endpoints.html
.. _Globus: https://www.globus.org/
.. _`handful of task-rate and data limits`: https://funcx.readthedocs.io/en/latest/limits.html
.. _`handful of task-rate and data limits`: https://globus-compute.readthedocs.io/en/latest/limits.html
2 changes: 1 addition & 1 deletion docs/requirements.txt
@@ -1,4 +1,4 @@
sphinx
sphinx<7
sphinxcontrib-bibtex
autodoc_pydantic
sphinx-design
2 changes: 1 addition & 1 deletion docs/running_libE.rst
@@ -99,7 +99,7 @@ Reverse-ssh interface
By specifying ``--comms ssh`` on the command line, libEnsemble workers can
be launched to remote ssh-accessible systems without needing to specify ``"port"`` or ``"authkey"``. This allows users
to colocate workers, simulation, or generator functions, and any applications they submit on the same machine. Such user
functions can also be persistent, unlike when launching remote functions via :ref:`funcX<funcx_ref>`.
functions can also be persistent, unlike when launching remote functions via :ref:`Globus Compute<funcx_ref>`.

The working directory and Python to run on the remote system need to be specified. Running a calling script may resemble::

2 changes: 1 addition & 1 deletion install/misc_feature_requirements.txt
@@ -1,3 +1,3 @@
PyYAML==6.0
tomli==2.0.1
funcx==1.0.4
globus-compute-sdk==2.2.4
2 changes: 1 addition & 1 deletion libensemble/ensemble.py
@@ -394,7 +394,7 @@ def _parse_spec(self, loaded_spec):
"inputs": self._get_normal,
"persis_in": self._get_normal,
"out": self._get_outputs,
"funcx_endpoint": self._get_normal,
"globus_compute_endpoint": self._get_normal,
"user": self._get_normal,
}

6 changes: 1 addition & 5 deletions libensemble/manager.py
@@ -8,7 +8,6 @@
import logging
import os
import platform
import pstats
import socket
import sys
import time
@@ -132,10 +131,7 @@ def manager_main(
if libE_specs.get("profile"):
pr.disable()
profile_stats_fname = "manager.prof"

with open(profile_stats_fname, "w") as f:
ps = pstats.Stats(pr, stream=f).sort_stats("cumulative")
ps.print_stats()
pr.dump_stats(profile_stats_fname)

return result

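With this change ``manager.prof`` is written by ``dump_stats`` in binary form rather than as a text report, so it can no longer be read with a plain ``open(...).read()``. A minimal sketch of recovering the cumulative-time report the old code produced, using only the standard library; ``busy_work`` and the file name ``example.prof`` are illustrative assumptions:

```python
import cProfile
import io
import pstats


def busy_work(n):
    """Stand-in workload (hypothetical); gives the profiler something to record."""
    return sum(i * i for i in range(n))


pr = cProfile.Profile()
pr.enable()
busy_work(10_000)
pr.disable()

# Same call the manager now makes: write the stats in binary form.
pr.dump_stats("example.prof")

# Load the binary file and render the text report into a string buffer.
stream = io.StringIO()
stats = pstats.Stats("example.prof", stream=stream)
stats.sort_stats("cumulative").print_stats()
report = stream.getvalue()
```

Keeping the file in binary form leaves the choice of sort order and filtering to whoever inspects it later, for example with ``python -m pstats example.prof``.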
2 changes: 1 addition & 1 deletion libensemble/resources/scheduler.py
@@ -345,7 +345,7 @@ def calc_rsets_even_grps(self, rsets_req, max_grpsize, max_groups, extend):
return rsets_req, num_groups_req, rsets_per_group

def check_params(self, user_params, ngroups):
"""Retrun True if all user params divide by number of groups, else False"""
"""Return True if all user params divide by number of groups, else False"""
for param in user_params:
if param % ngroups != 0:
return False
14 changes: 8 additions & 6 deletions libensemble/specs.py
@@ -68,10 +68,11 @@ class SimSpecs(BaseModel):
Also used to construct the complete dtype for libEnsemble's history array
"""

funcx_endpoint: Optional[str] = ""
globus_compute_endpoint: Optional[str] = ""
"""
A funcX (https://funcx.org/) ID corresponding to an active endpoint on a remote system. libEnsemble's workers
will submit simulator function instances to this endpoint to be executed, instead of calling them locally
A Globus Compute (https://www.globus.org/compute) ID corresponding to an active endpoint on a remote system.
libEnsemble's workers will submit simulator function instances to this endpoint to be executed, instead of
calling them locally
"""

user: Optional[dict] = {}
@@ -128,10 +129,11 @@ class GenSpecs(BaseModel):
history array
"""

funcx_endpoint: Optional[str] = ""
globus_compute_endpoint: Optional[str] = ""
"""
A funcX (https://funcx.org/) ID corresponding to an active endpoint on a remote system. libEnsemble's workers
will submit generator function instances to this endpoint to be executed, instead of being called in-place
A Globus Compute (https://www.globus.org/compute) ID corresponding to an active endpoint on a remote system.
libEnsemble's workers will submit generator function instances to this endpoint to be executed, instead of
calling them locally
"""

user: Optional[dict] = {}
Original file line number Diff line number Diff line change
@@ -29,6 +29,8 @@
nworkers, is_manager, libE_specs, _ = parse_args()

libE_specs["profile"] = True
libE_specs["safe_mode"] = False
libE_specs["kill_canceled_sims"] = False

sim_specs = {
"sim_f": sim_f,
@@ -48,7 +50,7 @@

persis_info = add_unique_random_streams({}, nworkers + 1)

exit_criteria = {"gen_max": 501}
exit_criteria = {"sim_max": 1e5}

# Perform the run
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs)
@@ -58,7 +60,7 @@
print("\nlibEnsemble with random sampling has generated enough points")

assert "manager.prof" in os.listdir(), "Expected manager profile not found after run"
os.remove("manager.prof")
# os.remove("manager.prof")

prof_files = [f"worker_{i+1}.prof" for i in range(nworkers)]

@@ -67,11 +69,11 @@

for file in prof_files:
assert file in os.listdir(), f"Expected profile {file} not found after run"
with open(file, "r") as f:
data = f.read().split()
num_worker_funcs_profiled = sum(["worker" in i for i in data])
assert num_worker_funcs_profiled >= 8, (
"Insufficient number of " + "worker functions profiled: " + str(num_worker_funcs_profiled)
)

os.remove(file)
# with open(file, "r") as f:
# data = f.read().split()
# num_worker_funcs_profiled = sum(["worker" in i for i in data])
# assert num_worker_funcs_profiled >= 8, (
# "Insufficient number of " + "worker functions profiled: " + str(num_worker_funcs_profiled)
# )

# os.remove(file)
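
The text-based check is commented out above, presumably because the profile files are now binary. One way such a check could be restored is to load each file with ``pstats`` and count the profiled functions whose name or source file mentions ``worker``. This is a sketch under assumptions, not the project's test: ``count_profiled`` and ``worker_task`` are hypothetical, and ``get_stats_profile`` requires Python 3.9 or later.

```python
import cProfile
import os
import pstats
import tempfile


def count_profiled(prof_file, needle):
    """Count profiled functions whose name or file contains needle (hypothetical helper)."""
    profile = pstats.Stats(prof_file).get_stats_profile()
    return sum(
        needle in name or needle in func.file_name
        for name, func in profile.func_profiles.items()
    )


def worker_task():
    """Stand-in for a worker function; used only to populate the profile."""
    return sorted(range(100), reverse=True)


with tempfile.TemporaryDirectory() as tmp:
    prof_file = os.path.join(tmp, "worker_1.prof")
    pr = cProfile.Profile()
    pr.runcall(worker_task)
    pr.dump_stats(prof_file)  # binary output, as the manager now writes
    num_worker_funcs_profiled = count_profiled(prof_file, "worker")
```

Because ``func_profiles`` is keyed by function name, same-named functions in different files collapse into one entry, so any threshold carried over from the old text-based assertion would need to be re-tuned.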
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
def run_forces_funcx(H, persis_info, sim_specs, libE_info):
def run_forces_globus_compute(H, persis_info, sim_specs, libE_info):
import os
import secrets
import time
Original file line number Diff line number Diff line change
@@ -6,13 +6,13 @@ exit_criteria:
sim_max: 8

sim_specs:
sim_f: libensemble.tests.scaling_tests.forces.funcx_forces.forces_simf.run_forces_funcx
sim_f: libensemble.tests.scaling_tests.forces.funcx_forces.forces_simf.run_forces_globus_compute
inputs:
- x
out:
energy:
type: float
funcx_endpoint: 0bb32388-b7f0-4a10-b329-a84a39c4218e
globus_compute_endpoint: ca766d22-49df-466a-8b51-cd0190c58bb0
user:
keys:
- seed
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@

if __name__ == "__main__":
forces = Ensemble()
forces.from_yaml("funcx_forces.yaml")
forces.from_yaml("globus_compute_forces.yaml")

forces.sim_specs.user["remote_ensemble_dir"] += secrets.token_hex(nbytes=3)

2 changes: 1 addition & 1 deletion libensemble/tests/unit_tests/test_models.py
@@ -50,7 +50,7 @@ def test_sim_gen_alloc_exit_specs_invalid():
"sim_f": "path.to.module", # sim: _UFUNC_INVALID_ERR, gen: _UNRECOGNIZED_ERR
"in": [("f", float), ("fvec", float, 3)], # _IN_INVALID_ERR
"out": ["x_on_cube"], # _OUT_DTYPE_ERR
"funcx_endpoint": 1234,
"globus_compute_endpoint": 1234,
"user": np.zeros(100), # 'value is not a valid dict'
}

60 changes: 30 additions & 30 deletions libensemble/tests/unit_tests/test_ufunc_runners.py
@@ -34,8 +34,8 @@ def test_normal_runners():

runners = Runners(sim_specs, gen_specs)
assert (
not runners.has_funcx_sim and not runners.has_funcx_gen
), "funcX use should not be detected without setting endpoint fields"
not runners.has_globus_compute_sim and not runners.has_globus_compute_gen
), "Globus Compute use should not be detected without setting endpoint fields"

ro = runners.make_runners()
assert all(
@@ -54,61 +54,61 @@ def test_normal_no_gen():


@pytest.mark.extra
def test_funcx_runner_init():
def test_globus_compute_runner_init():
calc_in, sim_specs, gen_specs = get_ufunc_args()

sim_specs["funcx_endpoint"] = "1234"
sim_specs["globus_compute_endpoint"] = "1234"

with mock.patch("funcx.FuncXExecutor"):
with mock.patch("globus_compute_sdk.Executor"):
runners = Runners(sim_specs, gen_specs)

assert (
runners.sim_funcx_executor is not None
), "FuncXExecutor should have been instantiated when funcx_endpoint found in specs"
runners.sim_globus_compute_executor is not None
), "Globus ComputeExecutor should have been instantiated when globus_compute_endpoint found in specs"


@pytest.mark.extra
def test_funcx_runner_pass():
def test_globus_compute_runner_pass():
calc_in, sim_specs, gen_specs = get_ufunc_args()

sim_specs["funcx_endpoint"] = "1234"
sim_specs["globus_compute_endpoint"] = "1234"

with mock.patch("funcx.FuncXExecutor"):
with mock.patch("globus_compute_sdk.Executor"):
runners = Runners(sim_specs, gen_specs)

# Creating Mock funcXExecutor and funcX future object - no exception
funcx_mock = mock.Mock()
funcx_future = mock.Mock()
funcx_mock.submit_to_registered_function.return_value = funcx_future
funcx_future.exception.return_value = None
funcx_future.result.return_value = (True, True)
# Creating Mock Globus ComputeExecutor and Globus Compute future object - no exception
globus_compute_mock = mock.Mock()
globus_compute_future = mock.Mock()
globus_compute_mock.submit_to_registered_function.return_value = globus_compute_future
globus_compute_future.exception.return_value = None
globus_compute_future.result.return_value = (True, True)

runners.sim_funcx_executor = funcx_mock
runners.sim_globus_compute_executor = globus_compute_mock
ro = runners.make_runners()

libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"}

out, persis_info = ro[1](calc_in, {"libE_info": libE_info, "persis_info": {}, "tag": 1})

assert all([out, persis_info]), "funcX runner correctly returned results"
assert all([out, persis_info]), "Globus Compute runner correctly returned results"


@pytest.mark.extra
def test_funcx_runner_fail():
def test_globus_compute_runner_fail():
calc_in, sim_specs, gen_specs = get_ufunc_args()

gen_specs["funcx_endpoint"] = "4321"
gen_specs["globus_compute_endpoint"] = "4321"

with mock.patch("funcx.FuncXExecutor"):
with mock.patch("globus_compute_sdk.Executor"):
runners = Runners(sim_specs, gen_specs)

# Creating Mock funcXExecutor and funcX future object - yes exception
funcx_mock = mock.Mock()
funcx_future = mock.Mock()
funcx_mock.submit_to_registered_function.return_value = funcx_future
funcx_future.exception.return_value = Exception
# Creating Mock Globus ComputeExecutor and Globus Compute future object - yes exception
globus_compute_mock = mock.Mock()
globus_compute_future = mock.Mock()
globus_compute_mock.submit_to_registered_function.return_value = globus_compute_future
globus_compute_future.exception.return_value = Exception

runners.gen_funcx_executor = funcx_mock
runners.gen_globus_compute_executor = globus_compute_mock
ro = runners.make_runners()

libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"}
@@ -121,6 +121,6 @@ def test_funcx_runner_fail():
if __name__ == "__main__":
test_normal_runners()
test_normal_no_gen()
test_funcx_runner_init()
test_funcx_runner_pass()
test_funcx_runner_fail()
test_globus_compute_runner_init()
test_globus_compute_runner_pass()
test_globus_compute_runner_fail()
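
The mocking pattern these tests rely on can be illustrated without libEnsemble or the Globus Compute SDK installed. In the sketch below, ``run_remote`` is a hypothetical stand-in for a runner, not libEnsemble's implementation; because the executor is a ``mock.Mock``, any call signature is accepted and nothing is assumed about the real ``submit_to_registered_function`` arguments.

```python
from unittest import mock


def run_remote(executor, func_id, *args):
    """Submit work and return its result, re-raising remote errors (hypothetical stand-in)."""
    future = executor.submit_to_registered_function(func_id, args)
    exc = future.exception()
    if exc is not None:
        raise exc
    return future.result()


# Success path: the mocked future reports no exception and returns a result.
executor = mock.Mock()
future = mock.Mock()
executor.submit_to_registered_function.return_value = future
future.exception.return_value = None
future.result.return_value = (True, True)
ok_result = run_remote(executor, "fid", 1)

# Failure path: the mocked future now reports an exception, which is re-raised.
future.exception.return_value = RuntimeError("remote failure")
try:
    run_remote(executor, "fid", 1)
    failed = False
except RuntimeError:
    failed = True
```

Setting ``return_value`` on the mocked methods is what lets a single ``Mock`` object stand in for both the executor and the future it hands back.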
