diff --git a/README.rst b/README.rst index ebe080989..f78457651 100644 --- a/README.rst +++ b/README.rst @@ -82,10 +82,9 @@ Optional dependencies: * Balsam_ -If running on the the compute nodes of three-tier systems -like OLCF's Summit_ or ALCF's Theta_, libEnsemble's workers may use the Balsam service -to schedule and launch MPI applications. Otherwise, libEnsemble can be run with -multiprocessing on the intermediate launch nodes. +As of v0.8.0+dev, libEnsemble features an updated `Balsam Executor`_ +for workers to schedule and launch applications *anywhere* with a running +Balsam site, including on remote machines. * pyyaml_ @@ -296,7 +295,8 @@ See a complete list of `example user scripts`_. .. _across: https://libensemble.readthedocs.io/en/develop/platforms/platforms_index.html#funcx-remote-user-functions .. _APOSMM: https://link.springer.com/article/10.1007/s12532-017-0131-4 .. _AWA: https://link.springer.com/article/10.1007/s12532-017-0131-4 -.. _Balsam: https://www.alcf.anl.gov/support-center/theta/balsam +.. _Balsam: https://balsam.readthedocs.io/en/latest/ +.. _Balsam Executor: https://libensemble.readthedocs.io/en/develop/executor/balsam_2_executor.html .. _Community Examples repository: https://github.com/Libensemble/libe-community-examples .. _Conda: https://docs.conda.io/en/latest/ .. _conda-forge: https://conda-forge.org/ diff --git a/docs/executor/balsam_2_executor.rst b/docs/executor/balsam_2_executor.rst new file mode 100644 index 000000000..e4fb86eed --- /dev/null +++ b/docs/executor/balsam_2_executor.rst @@ -0,0 +1,14 @@ +Balsam Executor - Remote apps +============================= + +.. automodule:: balsam_executor + :no-undoc-members: + +.. autoclass:: BalsamExecutor + :show-inheritance: + :members: __init__, register_app, submit_allocation, revoke_allocation, submit + +.. 
autoclass:: BalsamTask + :show-inheritance: + :member-order: bysource + :members: poll, wait, kill diff --git a/docs/executor/balsam_executor.rst b/docs/executor/balsam_executor.rst deleted file mode 100644 index 83b7a0482..000000000 --- a/docs/executor/balsam_executor.rst +++ /dev/null @@ -1,17 +0,0 @@ -Balsam MPI Executor -=================== - -.. automodule:: balsam_executor - :no-undoc-members: - -.. autoclass:: BalsamMPIExecutor - :show-inheritance: -.. :inherited-members: -.. :member-order: bysource -.. :members: __init__, submit, poll, manager_poll, kill, set_kill_mode - -.. autoclass:: BalsamTask - :show-inheritance: - :member-order: bysource -.. :members: workdir_exists, file_exists_in_workdir, read_file_in_workdir, stdout_exists, read_stdout -.. :inherited-members: diff --git a/docs/executor/ex_index.rst b/docs/executor/ex_index.rst index ded4f9061..b1967804f 100644 --- a/docs/executor/ex_index.rst +++ b/docs/executor/ex_index.rst @@ -1,16 +1,18 @@ .. _executor_index: -Executor -======== +Executors +========= -libEnsemble's Executor can be used within the simulator (and, potentially, the generator) -functions to provide a simple, portable interface for running and managing user -applications. +libEnsemble's Executors can be used within user functions to provide a simple, +portable interface for running and managing user applications. .. toctree:: :maxdepth: 2 :titlesonly: - :caption: libEnsemble Executor: + :caption: libEnsemble Executors: overview executor + mpi_executor + legacy_balsam_executor + balsam_2_executor diff --git a/docs/executor/executor.rst b/docs/executor/executor.rst index 94ee71e59..ddfbbc96f 100644 --- a/docs/executor/executor.rst +++ b/docs/executor/executor.rst @@ -1,5 +1,5 @@ -Executor Modules -================ +Base Executor - Local apps +========================== .. automodule:: executor :no-undoc-members: @@ -13,7 +13,8 @@ See the Executor APIs for optional arguments. 
:caption: Alternative Executors: mpi_executor - balsam_executor + legacy_balsam_executor + balsam_2_executor Executor Class --------------- diff --git a/docs/executor/legacy_balsam_executor.rst b/docs/executor/legacy_balsam_executor.rst new file mode 100644 index 000000000..18bd11cc7 --- /dev/null +++ b/docs/executor/legacy_balsam_executor.rst @@ -0,0 +1,17 @@ +Legacy Balsam MPI Executor +========================== + +.. automodule:: legacy_balsam_executor + :no-undoc-members: + +.. autoclass:: LegacyBalsamMPIExecutor + :show-inheritance: + :inherited-members: + :member-order: bysource + :members: __init__, submit, poll, manager_poll, kill, set_kill_mode + +.. autoclass:: LegacyBalsamTask + :show-inheritance: + :member-order: bysource + :members: workdir_exists, file_exists_in_workdir, read_file_in_workdir, stdout_exists, read_stdout + :inherited-members: diff --git a/docs/executor/mpi_executor.rst b/docs/executor/mpi_executor.rst index bf95c9884..60fc1cc78 100644 --- a/docs/executor/mpi_executor.rst +++ b/docs/executor/mpi_executor.rst @@ -1,5 +1,5 @@ -MPI Executor -============ +MPI Executor - MPI apps +======================= .. automodule:: mpi_executor :no-undoc-members: diff --git a/docs/executor/overview.rst b/docs/executor/overview.rst index c0943c9e0..42ca0542f 100644 --- a/docs/executor/overview.rst +++ b/docs/executor/overview.rst @@ -23,9 +23,9 @@ to an application instance instead of a callable. They feature the ``cancel()``, from the standard. The main ``Executor`` class is an abstract class, inherited by the ``MPIExecutor`` -for direct running of MPI applications, and the ``BalsamMPIExecutor`` -for submitting MPI run requests from a worker running on a compute node to a -Balsam service running on a launch node. This second approach is suitable for +for direct running of MPI applications, and the ``BalsamExecutor`` +for submitting MPI run requests from a worker running on a compute node to the +Balsam service. 
This second approach is suitable for systems that don't allow submitting MPI applications from compute nodes. Typically, users choose and parameterize their ``Executor`` objects in their @@ -46,8 +46,8 @@ In calling script:: USE_BALSAM = False if USE_BALSAM: - from libensemble.executors.balsam_executor import BalsamMPIExecutor - exctr = BalsamMPIExecutor() + from libensemble.executors.balsam_executor import LegacyBalsamMPIExecutor + exctr = LegacyBalsamMPIExecutor() else: from libensemble.executors.mpi_executor import MPIExecutor exctr = MPIExecutor() diff --git a/docs/introduction_latex.rst b/docs/introduction_latex.rst index 6eee1c949..f6b2c42da 100644 --- a/docs/introduction_latex.rst +++ b/docs/introduction_latex.rst @@ -25,7 +25,8 @@ We now present further information on running and testing libEnsemble. .. _across: https://libensemble.readthedocs.io/en/develop/platforms/platforms_index.html#funcx-remote-user-functions .. _APOSMM: https://link.springer.com/article/10.1007/s12532-017-0131-4 .. _AWA: https://link.springer.com/article/10.1007/s12532-017-0131-4 -.. _Balsam: https://www.alcf.anl.gov/support-center/theta/balsam +.. _Balsam: https://balsam.readthedocs.io/en/latest/ +.. _Balsam Executor: https://libensemble.readthedocs.io/en/develop/executor/balsam_2_executor.html .. _Community Examples repository: https://github.com/Libensemble/libe-community-examples .. _Conda: https://docs.conda.io/en/latest/ .. _conda-forge: https://conda-forge.org/ diff --git a/docs/overview_usecases.rst b/docs/overview_usecases.rst index 588fe718f..069434cb2 100644 --- a/docs/overview_usecases.rst +++ b/docs/overview_usecases.rst @@ -111,7 +111,7 @@ its capabilities. * **Executor**: The executor can be used within user functions to provide a simple, portable interface for running and managing user tasks (applications). - There are multiple executors including the ``MPIExecutor`` and ``BalsamMPIExecutor``. 
+ There are multiple executors including the ``MPIExecutor`` and ``LegacyBalsamMPIExecutor``. The base ``Executor`` class allows local sub-processing of serial tasks. * **Submit**: Enqueue or indicate that one or more jobs or tasks needs to be diff --git a/docs/platforms/platforms_index.rst b/docs/platforms/platforms_index.rst index 45476d331..1de4d76a1 100644 --- a/docs/platforms/platforms_index.rst +++ b/docs/platforms/platforms_index.rst @@ -84,26 +84,46 @@ Systems with Launch/MOM nodes Some large systems have a 3-tier node setup. That is, they have a separate set of launch nodes (known as MOM nodes on Cray Systems). User batch jobs or interactive sessions run on a launch node. Most such systems supply a special MPI runner which has some application-level scheduling -capability (eg. aprun, jsrun). MPI applications can only be submitted from these nodes. Examples +capability (eg. ``aprun``, ``jsrun``). MPI applications can only be submitted from these nodes. Examples of these systems include: Summit, Sierra and Theta. There are two ways of running libEnsemble on these kind of systems. The first, and simplest, -is to run libEnsemble on the launch nodes. This is often sufficient if the worker's sim or -gen scripts are not doing too much work (other than launching applications). This approach +is to run libEnsemble on the launch nodes. This is often sufficient if the worker's simulation +or generation functions are not doing much work (other than launching applications). This approach is inherently centralized. The entire node allocation is available for the worker-launched tasks. -To run libEnsemble on the compute nodes of these systems requires an alternative Executor, -such as :doc:`Balsam<../executor/balsam_executor>`, which runs on the -launch nodes and launches tasks submitted by workers. 
Running libEnsemble on the compute -nodes is potentially more scalable and will better manage ``sim_f`` and ``gen_f`` functions -that contain considerable computational work or I/O. +However, running libEnsemble on the compute nodes is potentially more scalable and +will better manage simulation and generation functions that contain considerable +computational work or I/O. Therefore, the second option is to use proxy task-execution +services like Balsam_. - .. image:: ../images/centralized_new_detailed_balsam.png +Balsam - Externally managed applications +---------------------------------------- + +Running libEnsemble on the compute nodes while still submitting additional applications +requires alternative Executors that connect to external services like Balsam_. Balsam +can take tasks submitted by workers and execute them on the remaining compute nodes, +or, if using Balsam 2, *on entirely different systems*. + + .. figure:: ../images/centralized_new_detailed_balsam.png :alt: central_balsam :scale: 30 :align: center + Single-System: libEnsemble + LegacyBalsamMPIExecutor + + .. figure:: ../images/balsam2.png + :alt: balsam2 + :scale: 40 + :align: center + + (New) Multi-System: libEnsemble + BalsamExecutor + +As of v0.8.0+dev, libEnsemble supports both "legacy" Balsam via the +:doc:`LegacyBalsamMPIExecutor<../executor/legacy_balsam_executor>` +and Balsam 2 via the :doc:`BalsamExecutor<../executor/balsam_2_executor>`. + Submission scripts for running on launch/MOM nodes and for using Balsam can be found in the :doc:`examples`. @@ -178,7 +198,7 @@ key. 
For example:: 'sim_f': sim_f, 'in': ['x'], 'out': [('f', float)], - 'funcx_endpoint': 3af6dc24-3f27-4c49-8d11-e301ade15353, + 'funcx_endpoint': '3af6dc24-3f27-4c49-8d11-e301ade15353', } See the ``libensemble/tests/scaling_tests/funcx_forces`` directory for a complete diff --git a/docs/running_libE.rst b/docs/running_libE.rst index fc57a5c4c..6a9851467 100644 --- a/docs/running_libE.rst +++ b/docs/running_libE.rst @@ -41,7 +41,7 @@ Limitations of MPI mode If you are launching MPI applications from workers, then MPI is being nested. This is not supported with Open MPI. This can be overcome by using a proxy launcher -(see :doc:`Balsam`). This nesting does work, however, +(see :doc:`Balsam`). This nesting does work, however, with MPICH and its derivative MPI implementations. It is also unsuitable to use this mode when running on the **launch** nodes of three-tier diff --git a/docs/tutorials/executor_forces_tutorial.rst b/docs/tutorials/executor_forces_tutorial.rst index b742a98bd..6027e713f 100644 --- a/docs/tutorials/executor_forces_tutorial.rst +++ b/docs/tutorials/executor_forces_tutorial.rst @@ -241,7 +241,7 @@ and evaluated in a variety of helpful ways. For now, we're satisfied with waitin for the task to complete via ``task.wait()``. We can assume that afterward, any results are now available to parse. Our application -produces a ``forces[particles].stat`` file that contains either energy +produces a ``forces.stat`` file that contains either energy computations for every time-step or a "kill" message if particles were lost, which indicates a failed simulation. @@ -254,7 +254,7 @@ to ``WORKER_DONE``. 
Otherwise, send back ``NAN`` and a ``TASK_FAILED`` status: :linenos: # Stat file to check for bad runs - statfile = "forces{}.stat".format(particles) + statfile = "forces.stat" # Try loading final energy reading, set the sim's status try: diff --git a/examples/calling_scripts/run_libe_forces.py b/examples/calling_scripts/run_libe_forces.py index ec19d93cb..d60d20cba 120000 --- a/examples/calling_scripts/run_libe_forces.py +++ b/examples/calling_scripts/run_libe_forces.py @@ -1 +1 @@ -../../libensemble/tests/scaling_tests/forces/run_libe_forces.py \ No newline at end of file +../../libensemble/tests/scaling_tests/forces/forces_adv/run_libe_forces.py \ No newline at end of file diff --git a/examples/calling_scripts/run_libe_forces_from_yaml.py b/examples/calling_scripts/run_libe_forces_from_yaml.py index 3f4d7b4b0..bc210f22d 120000 --- a/examples/calling_scripts/run_libe_forces_from_yaml.py +++ b/examples/calling_scripts/run_libe_forces_from_yaml.py @@ -1 +1 @@ -../../libensemble/tests/scaling_tests/forces/run_libe_forces_from_yaml.py \ No newline at end of file +../../libensemble/tests/scaling_tests/forces/forces_adv/run_libe_forces_from_yaml.py \ No newline at end of file diff --git a/examples/tutorials/forces_with_executor/build_forces.sh b/examples/tutorials/forces_with_executor/build_forces.sh index 713b765de..b2f0ad557 120000 --- a/examples/tutorials/forces_with_executor/build_forces.sh +++ b/examples/tutorials/forces_with_executor/build_forces.sh @@ -1 +1 @@ -../../../libensemble/tests/scaling_tests/forces/build_forces.sh \ No newline at end of file +../../../libensemble/tests/scaling_tests/forces/forces_app/build_forces.sh \ No newline at end of file diff --git a/examples/tutorials/forces_with_executor/cleanup.sh b/examples/tutorials/forces_with_executor/cleanup.sh deleted file mode 100755 index 6c02df691..000000000 --- a/examples/tutorials/forces_with_executor/cleanup.sh +++ /dev/null @@ -1 +0,0 @@ -rm *.stat libE_stats.txt *.err *.out forces.x ensemble.log 
diff --git a/examples/tutorials/forces_with_executor/forces.c b/examples/tutorials/forces_with_executor/forces.c index 944d7b790..6179f7edf 120000 --- a/examples/tutorials/forces_with_executor/forces.c +++ b/examples/tutorials/forces_with_executor/forces.c @@ -1 +1 @@ -../../../libensemble/tests/scaling_tests/forces/forces.c \ No newline at end of file +../../../libensemble/tests/scaling_tests/forces/forces_app/forces.c \ No newline at end of file diff --git a/examples/tutorials/forces_with_executor/forces_simf_simple.py b/examples/tutorials/forces_with_executor/forces_simf_simple.py new file mode 120000 index 000000000..08e54c77e --- /dev/null +++ b/examples/tutorials/forces_with_executor/forces_simf_simple.py @@ -0,0 +1 @@ +../../../libensemble/tests/scaling_tests/forces/forces_simple/forces_simf_simple.py \ No newline at end of file diff --git a/examples/tutorials/forces_with_executor/run_libe_forces_simple.py b/examples/tutorials/forces_with_executor/run_libe_forces_simple.py new file mode 120000 index 000000000..8ce57fdd3 --- /dev/null +++ b/examples/tutorials/forces_with_executor/run_libe_forces_simple.py @@ -0,0 +1 @@ +../../../libensemble/tests/scaling_tests/forces/forces_simple/run_libe_forces_simple.py \ No newline at end of file diff --git a/libensemble/executors/__init__.py b/libensemble/executors/__init__.py index 45584a513..85cdcf430 100644 --- a/libensemble/executors/__init__.py +++ b/libensemble/executors/__init__.py @@ -1,9 +1,18 @@ from libensemble.executors.executor import Executor from libensemble.executors.mpi_executor import MPIExecutor -import os -import sys -if 'BALSAM_DB_PATH' in os.environ and int(sys.version[2]) >= 6: - from libensemble.executors.balsam_executor import BalsamMPIExecutor +import pkg_resources -__all__ = ['BalsamMPIExecutor', 'Executor', 'MPIExecutor'] +try: + if pkg_resources.get_distribution("balsam"): # Balsam 0.7.0 onward (Balsam 2) + from libensemble.executors.balsam_executor import BalsamExecutor + +except 
(ModuleNotFoundError, ImportError, pkg_resources.DistributionNotFound): + try: + if pkg_resources.get_distribution("balsam-flow"): # Balsam up through 0.5.0 + from libensemble.executors.legacy_balsam_executor import LegacyBalsamMPIExecutor + except (ModuleNotFoundError, ImportError, pkg_resources.DistributionNotFound): + pass + + +__all__ = ["LegacyBalsamMPIExecutor", "Executor", "MPIExecutor", "BalsamExecutor"] diff --git a/libensemble/executors/balsam_executor.py b/libensemble/executors/balsam_executor.py index 6bfa6e2a1..1df98a464 100644 --- a/libensemble/executors/balsam_executor.py +++ b/libensemble/executors/balsam_executor.py @@ -1,23 +1,71 @@ """ -This module launches and controls the running of tasks with Balsam_ 0.5.0. Balsam -is especially useful when running libEnsemble on three-tier systems with intermediate -launch nodes. Typically on such systems, MPI processes are themselves unable -to submit further MPI tasks to the batch scheduler. Therefore when libEnsemble's -workers have been launched in a distributed fashion via MPI, they must communicate -with an intermediate service like Balsam running on the launch nodes. The Balsam -service then reserves compute resources and launches tasks from libEnsemble's workers -that are using the Balsam MPI Executor. +This module launches and controls the running of tasks with Balsam_, and most +notably can submit tasks from any machine, to any machine running a Balsam site_. -In order to create a Balsam executor, the calling script should contain :: +.. image:: ../images/balsam2.png + :alt: central_balsam + :scale: 40 + :align: center - exctr = BalsamMPIExecutor() +At this time, access to Balsam is limited to those with valid organizational logins +authenticated through Globus_. -The Balsam executor inherits from the MPI executor. See the -:doc:`MPIExecutor` for shared API. Any differences are -shown below. +In order to initiate a Balsam executor, the calling script should contain :: -.. 
_Balsam: https://balsam.readthedocs.io/en/master/ + from libensemble.executors import BalsamExecutor + exctr = BalsamExecutor() +A key difference between this executor and libEnsemble's others is that +Balsam ``ApplicationDefinition`` instances are registered instead of application paths, and task +submissions will not run until Balsam reserves compute resources at a site. + +This process may resemble:: + + from libensemble.executors import BalsamExecutor + from balsam.api import ApplicationDefinition + + class HelloApp(ApplicationDefinition): + site = "my-balsam-site" + command_template = "/path/to/hello.app {{ my_name }}" + + exctr = BalsamExecutor() + exctr.register_app(HelloApp, app_name="hello") + + exctr.submit_allocation( + site_id=999, # corresponds to "my-balsam-site", found via ``balsam site ls`` + num_nodes=4, # Total number of nodes requested for *all jobs* + wall_time_min=30, + queue="debug-queue", + project="my-project", + ) + +Task submissions of registered apps aren't too different from the other executors, +except that Balsam expects application arguments in dictionary form. Note that these fields +must match the templating syntax in each ``ApplicationDefinition``'s ``command_template`` +field:: + + args = {"my_name": "World"} + + task = exctr.submit( + app_name="hello", + app_args=args, + num_procs=4, + num_nodes=1, + procs_per_node=4, + ) + +Application instances submitted by the executor to the Balsam service will get +scheduled within the reserved resource allocation. **Each Balsam app can only be +submitted to the site specified in its class definition.** Output files will appear +in the Balsam site's ``data`` directory, but can be automatically `transferred back`_ +via Globus. + +**Reading Balsam's documentation is highly recommended.** + +.. _site: https://balsam.readthedocs.io/en/latest/user-guide/site-config/ +.. _Balsam: https://balsam.readthedocs.io/en/latest/ +.. _`transferred back`: https://balsam.readthedocs.io/en/latest/user-guide/transfer/ +.. 
_Globus: https://www.globus.org/ """ import os @@ -25,13 +73,17 @@ import time import datetime -from libensemble.resources import mpi_resources -from libensemble.executors.executor import \ - Application, Task, ExecutorException, TimeoutExpired, jassert, STATES -from libensemble.executors.mpi_executor import MPIExecutor +from libensemble.executors.executor import ( + Application, + Task, + ExecutorException, + TimeoutExpired, + jassert, + STATES, +) +from libensemble.executors import Executor -import balsam.launcher.dag as dag -from balsam.core import models +from balsam.api import Job, BatchJob, EventLog logger = logging.getLogger(__name__) # To change logging level for just this module @@ -39,40 +91,40 @@ class BalsamTask(Task): - """Wraps a Balsam Task from the Balsam service + """Wraps a Balsam ``Job`` from the Balsam service. - The same attributes and query routines are implemented. + The same attributes and query routines are implemented. Use ``task.process`` + to refer to the matching Balsam ``Job`` initialized by the ``BalsamExecutor``, + with every Balsam ``Job`` method invocable on it. Otherwise, libEnsemble task methods + like ``poll()`` can be used directly. """ - def __init__(self, app=None, app_args=None, workdir=None, - stdout=None, stderr=None, workerid=None): - """Instantiate a new BalsamTask instance. - - A new BalsamTask object is created with an id, status and + def __init__( + self, + app=None, + app_args=None, + workdir=None, + stdout=None, + stderr=None, + workerid=None, + ): + """Instantiate a new ``BalsamTask`` instance. + + A new ``BalsamTask`` object is created with an id, status and configuration attributes. This will normally be created by the executor on a submission. 
""" # May want to override workdir with Balsam value when it exists Task.__init__(self, app, app_args, workdir, stdout, stderr, workerid) - def read_file_in_workdir(self, filename): - return self.process.read_file_in_workdir(filename) - - def read_stdout(self): - return self.process.read_file_in_workdir(self.stdout) - - def read_stderr(self): - return self.process.read_file_in_workdir(self.stderr) - def _get_time_since_balsam_submit(self): - """Return time since balsam task entered RUNNING state""" + """Return time since balsam task entered ``RUNNING`` state""" - # If wait_on_start then can could calculate runtime same a base executor - # but otherwise that will return time from task submission. Get from Balsam. - - # self.runtime = self.process.runtime_seconds # Only reports at end of run currently - balsam_launch_datetime = self.process.get_state_times().get('RUNNING', None) + event_query = EventLog.objects.filter(job_id=self.process.id, to_state="RUNNING") + if not len(event_query): + return 0 + balsam_launch_datetime = event_query[0].timestamp current_datetime = datetime.datetime.now() if balsam_launch_datetime: return (current_datetime - balsam_launch_datetime).total_seconds() @@ -97,28 +149,30 @@ def _set_complete(self, dry_run=False): self.finished = True if dry_run: self.success = True - self.state = 'FINISHED' + self.state = "FINISHED" else: balsam_state = self.process.state self.workdir = self.workdir or self.process.working_directory self.calc_task_timing() - self.success = (balsam_state == 'JOB_FINISHED') - if balsam_state == 'JOB_FINISHED': - self.state = 'FINISHED' - elif balsam_state == 'PARENT_KILLED': # Not currently used - self.state = 'USER_KILLED' + if balsam_state in [ + "RUN_DONE", + "POSTPROCESSED", + "STAGED_OUT", + "JOB_FINISHED", + ]: + self.success = True + self.state = "FINISHED" elif balsam_state in STATES: # In my states self.state = balsam_state else: - logger.warning("Task finished, but in unrecognized " - "Balsam state 
{}".format(balsam_state)) - self.state = 'UNKNOWN' + logger.warning("Task finished, but in unrecognized " "Balsam state {}".format(balsam_state)) + self.state = "UNKNOWN" - logger.info("Task {} ended with state {}". - format(self.name, self.state)) + logger.info("Task {} ended with state {}".format(self.name, self.state)) def poll(self): - """Polls and updates the status attributes of the supplied task""" + """Polls and updates the status attributes of the supplied task. Requests + Job information from Balsam service.""" if self.dry_run: return @@ -130,32 +184,41 @@ def poll(self): balsam_state = self.process.state self.runtime = self._get_time_since_balsam_submit() - if balsam_state in models.END_STATES: + if balsam_state in ["RUN_DONE", "POSTPROCESSED", "STAGED_OUT", "JOB_FINISHED"]: self._set_complete() - elif balsam_state in models.ACTIVE_STATES: - self.state = 'RUNNING' + elif balsam_state in ["RUNNING"]: + self.state = "RUNNING" self.workdir = self.workdir or self.process.working_directory - elif (balsam_state in models.PROCESSABLE_STATES or - balsam_state in models.RUNNABLE_STATES): - self.state = 'WAITING' + elif balsam_state in [ + "CREATED", + "AWAITING_PARENTS", + "READY", + "STAGED_IN", + "PREPROCESSED", + ]: + self.state = "WAITING" + + elif balsam_state in ["RUN_ERROR", "RUN_TIMEOUT", "FAILED"]: + self.state = "FAILED" else: raise ExecutorException( "Task state returned from Balsam is not in known list of " - "Balsam states. Task state is {}".format(balsam_state)) + "Balsam states. Task state is {}".format(balsam_state) + ) def wait(self, timeout=None): - """Waits on completion of the task or raises TimeoutExpired exception + """Waits on completion of the task or raises ``TimeoutExpired``. Status attributes of task are updated on completion. 
Parameters ---------- - timeout: - Time in seconds after which a TimeoutExpired exception is raised""" + timeout: float + Time in seconds after which a ``TimeoutExpired`` exception is raised""" if self.dry_run: return @@ -166,7 +229,12 @@ def wait(self, timeout=None): # Wait on the task start = time.time() self.process.refresh_from_db() - while self.process.state not in models.END_STATES: + while self.process.state not in [ + "RUN_DONE", + "POSTPROCESSED", + "STAGED_OUT", + "JOB_FINISHED", + ]: time.sleep(0.2) self.process.refresh_from_db() if timeout and time.time() - start > timeout: @@ -176,100 +244,260 @@ def wait(self, timeout=None): self.runtime = self._get_time_since_balsam_submit() self._set_complete() - def kill(self, wait_time=None): - """ Kills or cancels the supplied task """ - - dag.kill(self.process) + def kill(self): + """Cancels the supplied task. Killing is unsupported at this time.""" - # Could have Wait here and check with Balsam its killed - - # but not implemented yet. + self.process.delete() logger.info("Killing task {}".format(self.name)) - self.state = 'USER_KILLED' + self.state = "USER_KILLED" self.finished = True self.calc_task_timing() -class BalsamMPIExecutor(MPIExecutor): - """Inherits from MPIExecutor and wraps the Balsam task management service +class BalsamExecutor(Executor): + """Inherits from ``Executor`` and wraps the Balsam service. Via this Executor, + Balsam ``Jobs`` can be submitted to Balsam sites, either local or on remote machines. .. note:: Task kills are not configurable in the Balsam executor. """ - def __init__(self, custom_info={}): - """Instantiate a new BalsamMPIExecutor instance. 
- - A new BalsamMPIExecutor object is created with an application - registry and configuration attributes - """ - if custom_info: - logger.warning("The Balsam executor does not support custom_info - ignoring") + def __init__(self): + """Instantiate a new ``BalsamExecutor`` instance.""" - super().__init__(custom_info) + super().__init__() self.workflow_name = "libe_workflow" + self.allocations = [] def serial_setup(self): - """Balsam serial setup includes empyting database and adding applications""" - BalsamMPIExecutor.del_apps() - BalsamMPIExecutor.del_tasks() - - for app in self.apps.values(): - calc_name = app.gname - desc = app.desc - full_path = app.full_path - self.add_app(calc_name, full_path, desc) - - @staticmethod - def del_apps(): - """Deletes all Balsam apps in the libe_app namespace""" - AppDef = models.ApplicationDefinition - - # Some error handling on deletes.... is it internal - for app_type in [Application.prefix]: - deletion_objs = AppDef.objects.filter(name__contains=app_type) - if deletion_objs: - for del_app in deletion_objs.iterator(): - logger.debug("Deleting app {}".format(del_app.name)) - deletion_objs.delete() - - @staticmethod - def del_tasks(): - """Deletes all Balsam tasks """ - for app_type in [Task.prefix]: - deletion_objs = models.BalsamJob.objects.filter( - name__contains=app_type) - if deletion_objs: - for del_task in deletion_objs.iterator(): - logger.debug("Deleting task {}".format(del_task.name)) - deletion_objs.delete() - - @staticmethod - def add_app(name, exepath, desc): - """ Add application to Balsam database """ - AppDef = models.ApplicationDefinition - app = AppDef() - app.name = name - app.executable = exepath - app.description = desc - # app.default_preprocess = '' # optional - # app.default_postprocess = '' # optional - app.save() - logger.debug("Added App {}".format(app.name)) + """Balsam serial setup includes emptying database and adding applications""" + pass + + def add_app(self, name, site, exepath, desc): + 
"""Sync application with Balsam service""" + pass + + def register_app(self, BalsamApp, app_name, calc_type=None, desc=None): + """Registers a Balsam ``ApplicationDefinition`` to libEnsemble. This class + instance *must* have a ``site`` and ``command_template`` specified. See + the Balsam docs for information on other optional fields. + + Parameters + ---------- + + BalsamApp: ``ApplicationDefinition`` object + A Balsam ``ApplicationDefinition`` instance. + + app_name: String, optional + Name to identify this application. + + calc_type: String, optional + Calculation type: Set this application as the default ``'sim'`` + or ``'gen'`` function. + + desc: String, optional + Description of this application + + """ + if not app_name: + app_name = BalsamApp.command_template.split(" ")[0] + self.apps[app_name] = Application(" ", app_name, calc_type, desc, BalsamApp) + + # Default sim/gen apps will be deprecated. Just use names. + if calc_type is not None: + jassert( + calc_type in self.default_apps, + "Unrecognized calculation type", + calc_type, + ) + self.default_apps[calc_type] = self.apps[app_name] + + def submit_allocation( + self, + site_id, + num_nodes, + wall_time_min, + job_mode="mpi", + queue="local", + project="local", + optional_params={}, + filter_tags={}, + partitions=[], + ): + """ + Submits a Balsam ``BatchJob`` machine allocation request to Balsam. + Corresponding Balsam applications with a matching site can be submitted to + this allocation. Effectively a wrapper for ``BatchJob.objects.create()``. + + Parameters + ---------- + + site_id: int + The corresponding ``site_id`` for a Balsam site. Retrieve via ``balsam site ls`` + + num_nodes: int + The number of nodes to request from a machine with a running Balsam site + + wall_time_min: int + The number of walltime minutes to request for the ``BatchJob`` allocation + + job_mode: String, optional + Either ``"serial"`` or ``"mpi"``. 
Default: ``"mpi"`` + + queue: String, optional + Specifies the queue from which the ``BatchJob`` should request nodes. Default: ``"local"`` + + project: String, optional + Specifies the project that should be charged for the requested machine time. Default: ``"local"`` + + optional_params: dict, optional + Additional system-specific parameters to set, based on fields in Balsam's ``job-template.sh`` + + filter_tags: dict, optional + Directs the resultant ``BatchJob`` to only run Jobs with matching tags. + + partitions: list of dicts, optional + Divides the allocation into multiple launcher partitions, with differing + ``job_mode``, ``num_nodes``, ``filter_tags``, etc. See the Balsam docs. + + Returns + ------- + + The corresponding ``BatchJob`` object. + """ + + allocation = BatchJob.objects.create( + site_id=site_id, + num_nodes=num_nodes, + wall_time_min=wall_time_min, + job_mode=job_mode, + queue=queue, + project=project, + optional_params=optional_params, + filter_tags=filter_tags, + partitions=partitions, + ) + + self.allocations.append(allocation) + + logger.info( + "Submitted Batch allocation to site {}: " + "nodes {} queue {} project {}".format(site_id, num_nodes, queue, project) + ) + + return allocation + + def revoke_allocation(self, allocation): + """ + Terminates a Balsam ``BatchJob`` machine allocation remotely. Balsam apps should + no longer be submitted to this allocation. This is best run after libEnsemble + completes, or once this ``BatchJob`` is no longer needed, to save machine time. + + Parameters + ---------- + + allocation: ``BatchJob`` object + a ``BatchJob`` with a corresponding machine allocation that should be cancelled. 
+ """ + allocation.refresh_from_db() + + while not allocation.scheduler_id: + time.sleep(1) + allocation.refresh_from_db() + + batchjob = BatchJob.objects.get(scheduler_id=allocation.scheduler_id) + batchjob.state = "pending_deletion" + batchjob.save() def set_resources(self, resources): self.resources = resources - def submit(self, calc_type=None, app_name=None, num_procs=None, - num_nodes=None, procs_per_node=None, machinefile=None, - app_args=None, stdout=None, stderr=None, stage_inout=None, - hyperthreads=False, dry_run=False, wait_on_start=False, - extra_args=''): - """Creates a new task, and either executes or schedules to execute - in the executor + def submit( + self, + calc_type=None, + app_name=None, + app_args=None, + num_procs=None, + num_nodes=None, + procs_per_node=None, + max_tasks_per_node=None, + machinefile=None, + gpus_per_rank=0, + transfers={}, + workdir="", + dry_run=False, + wait_on_start=False, + extra_args={}, + tags={}, + ): + """Initializes and submits a Balsam ``Job`` based on a registered ``ApplicationDefinition`` + and requested resources. A corresponding libEnsemble ``Task`` object is returned. + + calc_type: String, optional + The calculation type: ``'sim'`` or ``'gen'`` + Only used if ``app_name`` is not supplied. Uses default sim or gen application. + + app_name: String, optional + The application name. Must be supplied if ``calc_type`` is not. + + app_args: dict + A dictionary of options that correspond to fields to template in the + ApplicationDefinition's ``command_template`` field. + + num_procs: int, optional + The total number of MPI ranks on which to submit the task + + num_nodes: int, optional + The number of nodes on which to submit the task + + procs_per_node: int, optional + The processes per node for this task + + max_tasks_per_node: int + Instructs Balsam to schedule at most this many Jobs per node. + + machinefile: string, optional + Name of a machinefile for this task to use. 
Unused by Balsam.
+
+        gpus_per_rank: int, optional
+            Number of GPUs to reserve for each MPI rank.
+
+        transfers: dict, optional
+            A Job-specific Balsam transfers dictionary that corresponds with an
+            ``ApplicationDefinition`` ``transfers`` field. See the Balsam docs for
+            more information.
+
+        workdir: String, optional
+            Specifies a name for the Job's output directory within the Balsam site's
+            data directory. Default: ``libe_workflow``
+
+        dry_run: boolean, optional
+            Whether this is a dry run - no task will be launched; instead
+            the runline is printed to the logger (at ``INFO`` level).
+
+        wait_on_start: boolean, optional
+            Whether to block and wait for the task to be polled as ``RUNNING`` (or another
+            active/end state) before continuing.
+
+        extra_args: dict, optional
+            Additional arguments to supply to the MPI runner.
+
+        tags: dict, optional
+            Additional tags to organize the ``Job`` or restrict which ``BatchJobs`` run it.
+
+        Returns
+        -------
+
+        task: obj: Task
+            The launched task object.
+
+        Note that since Balsam Jobs are often sent to entirely different machines
+        than the one where libEnsemble is running, the way libEnsemble's resource manager
+        has divided local resources among workers does not restrict the resources
+        that can be requested for a Balsam ``Job``.
-        The created task object is returned.
""" if app_name is not None: @@ -279,60 +507,56 @@ def submit(self, calc_type=None, app_name=None, num_procs=None, else: raise ExecutorException("Either app_name or calc_type must be set") - # Specific to this class + if len(workdir): + workdir = os.path.join(self.workflow_name, workdir) + else: + workdir = self.workflow_name + if machinefile is not None: logger.warning("machinefile arg ignored - not supported in Balsam") - jassert(num_procs or num_nodes or procs_per_node, - "No procs/nodes provided - aborting") - - num_procs, num_nodes, procs_per_node = \ - mpi_resources.task_partition(num_procs, num_nodes, procs_per_node) - - if stdout is not None or stderr is not None: - logger.warning("Balsam does not currently accept a stdout " - "or stderr name - ignoring") - stdout = None - stderr = None - - # Will be possible to override with arg when implemented - # (or can have option to let Balsam assign) - default_workdir = os.getcwd() - task = BalsamTask(app, app_args, default_workdir, - stdout, stderr, self.workerID) - - add_task_args = {'name': task.name, - 'workflow': self.workflow_name, - 'user_workdir': default_workdir, - 'application': app.gname, - 'args': task.app_args, - 'num_nodes': num_nodes, - 'procs_per_node': procs_per_node, - 'mpi_flags': extra_args} - - if stage_inout is not None: - # For now hardcode staging - for testing - add_task_args['stage_in_url'] = "local:" + stage_inout + "/*" - add_task_args['stage_out_url'] = "local:" + stage_inout - add_task_args['stage_out_files'] = "*.out" + jassert( + num_procs or num_nodes or procs_per_node, + "No procs/nodes provided - aborting", + ) + + task = BalsamTask(app, app_args, workdir, None, None, self.workerID) if dry_run: task.dry_run = True - logger.info('Test (No submit) Runline: {}'.format(' '.join(add_task_args))) + logger.info("Test (No submit) Balsam app {}".format(app_name)) task._set_complete(dry_run=True) else: - task.process = dag.add_job(**add_task_args) - - if (wait_on_start): + App = app.pyobj + 
+            try:
+                App.sync()  # if App source code is available, send it to the Balsam service
+            except OSError:
+                pass  # App retrieved from Balsam service; assume no access to source code
+
+            task.process = Job(
+                app_id=App,
+                workdir=workdir,
+                parameters=app_args,
+                num_nodes=num_nodes,
+                ranks_per_node=procs_per_node,
+                launch_params=extra_args,
+                gpus_per_rank=gpus_per_rank,
+                node_packing_count=max_tasks_per_node,
+                transfers=transfers,
+            )
+
+            task.process.save()
+
+            if wait_on_start:
+                self._wait_on_start(task)
+
+            if not task.timer.timing:
+                task.timer.start()
+                task.submit_time = task.timer.tstart  # Time not date - may not need if using timer.
+
+            logger.info(
+                "Submitted Balsam App to site {}: "
+                "nodes {} ppn {}".format(App.site, num_nodes, procs_per_node)
+            )

-        # task.workdir = task.process.working_directory  # Might not be set yet!
         self.list_of_tasks.append(task)
         return task
diff --git a/libensemble/executors/executor.py b/libensemble/executors/executor.py
index 76a1a98ba..70c44cabe 100644
--- a/libensemble/executors/executor.py
+++ b/libensemble/executors/executor.py
@@ -28,7 +28,7 @@
 logger = logging.getLogger(__name__)
 # To change logging level for just this module
 # logger.setLevel(logging.DEBUG)

 STATES = """
 UNKNOWN
@@ -77,7 +77,7 @@ class Application:
     prefix = 'libe_app'

-    def __init__(self, full_path, name=None, calc_type='sim', desc=None):
+    def __init__(self, full_path, name=None, calc_type='sim', desc=None, pyobj=None):
         """Instantiates a new Application instance."""
         self.full_path = full_path
         self.calc_type = calc_type
@@ -86,6 +86,7 @@
         if self.exe.endswith('.py'):
             self.full_path = ' '.join((sys.executable, full_path))
         self.name = name or self.exe
+        self.pyobj = pyobj
         self.desc = desc or (self.exe + ' app')
         self.gname = '_'.join([Application.prefix,
self.name]) diff --git a/libensemble/executors/legacy_balsam_executor.py b/libensemble/executors/legacy_balsam_executor.py new file mode 100644 index 000000000..cdfcf6271 --- /dev/null +++ b/libensemble/executors/legacy_balsam_executor.py @@ -0,0 +1,338 @@ +""" +This module launches and controls the running of tasks with Balsam_ versions up to 0.5.0. Balsam +is especially useful when running libEnsemble on three-tier systems with intermediate +launch nodes. Typically on such systems, MPI processes are themselves unable +to submit further MPI tasks to the batch scheduler. Therefore when libEnsemble's +workers have been launched in a distributed fashion via MPI, they must communicate +with an intermediate service like Balsam running on the launch nodes. The Balsam +service then reserves compute resources and launches tasks from libEnsemble's workers +that are using the Balsam MPI Executor. + +In order to create a Balsam executor, the calling script should contain :: + + exctr = LegacyBalsamMPIExecutor() + +The Balsam executor inherits from the MPI executor. See the +:doc:`MPIExecutor` for shared API. Any differences are +shown below. + +.. _Balsam: https://balsam.readthedocs.io/en/master/ + +""" + +import os +import logging +import time +import datetime + +from libensemble.resources import mpi_resources +from libensemble.executors.executor import \ + Application, Task, ExecutorException, TimeoutExpired, jassert, STATES +from libensemble.executors.mpi_executor import MPIExecutor + +import balsam.launcher.dag as dag +from balsam.core import models + +logger = logging.getLogger(__name__) +# To change logging level for just this module +# logger.setLevel(logging.DEBUG) + + +class LegacyBalsamTask(Task): + """Wraps a Balsam Task from the Balsam service + + The same attributes and query routines are implemented. + + """ + + def __init__(self, app=None, app_args=None, workdir=None, + stdout=None, stderr=None, workerid=None): + """Instantiate a new LegacyBalsamTask instance. 
+
+        A new LegacyBalsamTask object is created with an id, status, and
+        configuration attributes. This will normally be created by the
+        executor on a submission.
+        """
+        # May want to override workdir with Balsam value when it exists
+        Task.__init__(self, app, app_args, workdir, stdout, stderr, workerid)
+
+    def read_file_in_workdir(self, filename):
+        return self.process.read_file_in_workdir(filename)
+
+    def read_stdout(self):
+        return self.process.read_file_in_workdir(self.stdout)
+
+    def read_stderr(self):
+        return self.process.read_file_in_workdir(self.stderr)
+
+    def _get_time_since_balsam_submit(self):
+        """Return time since the Balsam task entered the RUNNING state"""
+
+        # If wait_on_start, the runtime could be calculated as in the base executor,
+        # but otherwise that would give the time since task submission. Get it from Balsam.
+
+        # self.runtime = self.process.runtime_seconds  # Only reports at end of run currently
+        balsam_launch_datetime = self.process.get_state_times().get('RUNNING', None)
+        current_datetime = datetime.datetime.now()
+        if balsam_launch_datetime:
+            return (current_datetime - balsam_launch_datetime).total_seconds()
+        else:
+            return 0
+
+    def calc_task_timing(self):
+        """Calculate timing information for this task"""
+
+        # Get runtime from Balsam
+        self.runtime = self._get_time_since_balsam_submit()
+
+        if self.submit_time is None:
+            logger.warning("Cannot calc task total_time - submit time not set")
+            return
+
+        if self.total_time is None:
+            self.total_time = time.time() - self.submit_time
+
+    def _set_complete(self, dry_run=False):
+        """Set task as complete"""
+        self.finished = True
+        if dry_run:
+            self.success = True
+            self.state = 'FINISHED'
+        else:
+            balsam_state = self.process.state
+            self.workdir = self.workdir or self.process.working_directory
+            self.calc_task_timing()
+            self.success = (balsam_state == 'JOB_FINISHED')
+            if balsam_state == 'JOB_FINISHED':
+                self.state = 'FINISHED'
+            elif balsam_state == 'PARENT_KILLED':  # Not currently used
self.state = 'USER_KILLED' + elif balsam_state in STATES: # In my states + self.state = balsam_state + else: + logger.warning("Task finished, but in unrecognized " + "Balsam state {}".format(balsam_state)) + self.state = 'UNKNOWN' + + logger.info("Task {} ended with state {}". + format(self.name, self.state)) + + def poll(self): + """Polls and updates the status attributes of the supplied task""" + if self.dry_run: + return + + if not self._check_poll(): + return + + # Get current state of tasks from Balsam database + self.process.refresh_from_db() + balsam_state = self.process.state + self.runtime = self._get_time_since_balsam_submit() + + if balsam_state in models.END_STATES: + self._set_complete() + + elif balsam_state in models.ACTIVE_STATES: + self.state = 'RUNNING' + self.workdir = self.workdir or self.process.working_directory + + elif (balsam_state in models.PROCESSABLE_STATES or + balsam_state in models.RUNNABLE_STATES): + self.state = 'WAITING' + + else: + raise ExecutorException( + "Task state returned from Balsam is not in known list of " + "Balsam states. Task state is {}".format(balsam_state)) + + def wait(self, timeout=None): + """Waits on completion of the task or raises TimeoutExpired exception + + Status attributes of task are updated on completion. 
+
+        Parameters
+        ----------
+
+        timeout: int or float, optional
+            Time in seconds after which a TimeoutExpired exception is raised"""
+
+        if self.dry_run:
+            return
+
+        if not self._check_poll():
+            return
+
+        # Wait on the task
+        start = time.time()
+        self.process.refresh_from_db()
+        while self.process.state not in models.END_STATES:
+            time.sleep(0.2)
+            self.process.refresh_from_db()
+            if timeout and time.time() - start > timeout:
+                self.runtime = self._get_time_since_balsam_submit()
+                raise TimeoutExpired(self.name, timeout)
+
+        self.runtime = self._get_time_since_balsam_submit()
+        self._set_complete()
+
+    def kill(self, wait_time=None):
+        """Kills or cancels the supplied task"""
+
+        dag.kill(self.process)
+
+        # Could wait here and check with Balsam that it's killed -
+        # but not implemented yet.
+
+        logger.info("Killing task {}".format(self.name))
+        self.state = 'USER_KILLED'
+        self.finished = True
+        self.calc_task_timing()
+
+
+class LegacyBalsamMPIExecutor(MPIExecutor):
+    """Inherits from MPIExecutor and wraps the Balsam task management service
+
+    .. note:: Task kills are not configurable in the Balsam executor.
+
+    """
+    def __init__(self, custom_info={}):
+        """Instantiate a new LegacyBalsamMPIExecutor instance.
+
+        A new LegacyBalsamMPIExecutor object is created with an application
+        registry and configuration attributes.
+        """
+
+        if custom_info:
+            logger.warning("The Balsam executor does not support custom_info - ignoring")
+
+        super().__init__(custom_info)
+
+        self.workflow_name = "libe_workflow"
+
+    def serial_setup(self):
+        """Balsam serial setup includes emptying the database and adding applications"""
+        LegacyBalsamMPIExecutor.del_apps()
+        LegacyBalsamMPIExecutor.del_tasks()
+
+        for app in self.apps.values():
+            calc_name = app.gname
+            desc = app.desc
+            full_path = app.full_path
+            self.add_app(calc_name, full_path, desc)
+
+    @staticmethod
+    def del_apps():
+        """Deletes all Balsam apps in the libe_app namespace"""
+        AppDef = models.ApplicationDefinition
+
+        # Some error handling on deletes... is it internal?
+        for app_type in [Application.prefix]:
+            deletion_objs = AppDef.objects.filter(name__contains=app_type)
+            if deletion_objs:
+                for del_app in deletion_objs.iterator():
+                    logger.debug("Deleting app {}".format(del_app.name))
+                deletion_objs.delete()
+
+    @staticmethod
+    def del_tasks():
+        """Deletes all Balsam tasks"""
+        for app_type in [Task.prefix]:
+            deletion_objs = models.BalsamJob.objects.filter(
+                name__contains=app_type)
+            if deletion_objs:
+                for del_task in deletion_objs.iterator():
+                    logger.debug("Deleting task {}".format(del_task.name))
+                deletion_objs.delete()
+
+    @staticmethod
+    def add_app(name, exepath, desc):
+        """Add application to Balsam database"""
+        AppDef = models.ApplicationDefinition
+        app = AppDef()
+        app.name = name
+        app.executable = exepath
+        app.description = desc
+        # app.default_preprocess = ''  # optional
+        # app.default_postprocess = ''  # optional
+        app.save()
+        logger.debug("Added App {}".format(app.name))
+
+    def set_resources(self, resources):
+        self.resources = resources
+
+    def submit(self, calc_type=None, app_name=None, num_procs=None,
+               num_nodes=None, procs_per_node=None, machinefile=None,
+               app_args=None, stdout=None, stderr=None,
stage_inout=None, + hyperthreads=False, dry_run=False, wait_on_start=False, + extra_args=''): + """Creates a new task, and either executes or schedules to execute + in the executor + + The created task object is returned. + """ + + if app_name is not None: + app = self.get_app(app_name) + elif calc_type is not None: + app = self.default_app(calc_type) + else: + raise ExecutorException("Either app_name or calc_type must be set") + + # Specific to this class + if machinefile is not None: + logger.warning("machinefile arg ignored - not supported in Balsam") + jassert(num_procs or num_nodes or procs_per_node, + "No procs/nodes provided - aborting") + + num_procs, num_nodes, procs_per_node = \ + mpi_resources.task_partition(num_procs, num_nodes, procs_per_node) + + if stdout is not None or stderr is not None: + logger.warning("Balsam does not currently accept a stdout " + "or stderr name - ignoring") + stdout = None + stderr = None + + # Will be possible to override with arg when implemented + # (or can have option to let Balsam assign) + default_workdir = os.getcwd() + task = LegacyBalsamTask(app, app_args, default_workdir, + stdout, stderr, self.workerID) + + add_task_args = {'name': task.name, + 'workflow': self.workflow_name, + 'user_workdir': default_workdir, + 'application': app.gname, + 'args': task.app_args, + 'num_nodes': num_nodes, + 'procs_per_node': procs_per_node, + 'mpi_flags': extra_args} + + if stage_inout is not None: + # For now hardcode staging - for testing + add_task_args['stage_in_url'] = "local:" + stage_inout + "/*" + add_task_args['stage_out_url'] = "local:" + stage_inout + add_task_args['stage_out_files'] = "*.out" + + if dry_run: + task.dry_run = True + logger.info('Test (No submit) Runline: {}'.format(' '.join(add_task_args))) + task._set_complete(dry_run=True) + else: + task.process = dag.add_job(**add_task_args) + + if (wait_on_start): + self._wait_on_start(task) + + if not task.timer.timing: + task.timer.start() + task.submit_time = 
task.timer.tstart # Time not date - may not need if using timer. + + logger.info("Added task to Balsam database {}: " + "nodes {} ppn {}". + format(task.name, num_nodes, procs_per_node)) + + # task.workdir = task.process.working_directory # Might not be set yet! + self.list_of_tasks.append(task) + return task diff --git a/libensemble/tests/balsam_tests/reset_balsam_tests.py b/libensemble/tests/balsam_tests/reset_balsam_tests.py deleted file mode 100755 index 80e34fc29..000000000 --- a/libensemble/tests/balsam_tests/reset_balsam_tests.py +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env python - -import balsam.launcher.dag as dag - -dag.BalsamTask.objects.filter(name__contains='outfile').delete() - -for job in dag.BalsamTask.objects.filter(name__contains='job_test_balsam'): - job.update_state('CREATED') - job.save() diff --git a/libensemble/tests/balsam_tests/bash_scripts/setup_balsam_tests.sh b/libensemble/tests/deprecated_tests/balsam_tests/bash_scripts/setup_balsam_tests.sh similarity index 100% rename from libensemble/tests/balsam_tests/bash_scripts/setup_balsam_tests.sh rename to libensemble/tests/deprecated_tests/balsam_tests/bash_scripts/setup_balsam_tests.sh diff --git a/libensemble/tests/balsam_tests/env_setup_theta.sh b/libensemble/tests/deprecated_tests/balsam_tests/env_setup_theta.sh similarity index 100% rename from libensemble/tests/balsam_tests/env_setup_theta.sh rename to libensemble/tests/deprecated_tests/balsam_tests/env_setup_theta.sh diff --git a/libensemble/tests/balsam_tests/readme.balsam_tests.txt b/libensemble/tests/deprecated_tests/balsam_tests/readme.balsam_tests.txt similarity index 100% rename from libensemble/tests/balsam_tests/readme.balsam_tests.txt rename to libensemble/tests/deprecated_tests/balsam_tests/readme.balsam_tests.txt diff --git a/libensemble/tests/balsam_tests/readme.rst b/libensemble/tests/deprecated_tests/balsam_tests/readme.rst similarity index 100% rename from libensemble/tests/balsam_tests/readme.rst rename to 
libensemble/tests/deprecated_tests/balsam_tests/readme.rst
diff --git a/libensemble/tests/deprecated_tests/balsam_tests/reset_balsam_tests.py b/libensemble/tests/deprecated_tests/balsam_tests/reset_balsam_tests.py
new file mode 100755
index 000000000..01965fc15
--- /dev/null
+++ b/libensemble/tests/deprecated_tests/balsam_tests/reset_balsam_tests.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python
+
+import balsam.launcher.dag as dag
+
+dag.BalsamTask.objects.filter(name__contains='outfile').delete()
+
+for job in dag.BalsamTask.objects.filter(name__contains='job_test_balsam'):
+    job.update_state('CREATED')
+    job.save()
diff --git a/libensemble/tests/balsam_tests/setup_balsam_tests.py b/libensemble/tests/deprecated_tests/balsam_tests/setup_balsam_tests.py
similarity index 100%
rename from libensemble/tests/balsam_tests/setup_balsam_tests.py
rename to libensemble/tests/deprecated_tests/balsam_tests/setup_balsam_tests.py
diff --git a/libensemble/tests/balsam_tests/test_balsam_1__runjobs.py b/libensemble/tests/deprecated_tests/balsam_tests/test_balsam_1__runjobs.py
similarity index 100%
rename from libensemble/tests/balsam_tests/test_balsam_1__runjobs.py
rename to libensemble/tests/deprecated_tests/balsam_tests/test_balsam_1__runjobs.py
diff --git a/libensemble/tests/balsam_tests/test_balsam_2__workerkill.py b/libensemble/tests/deprecated_tests/balsam_tests/test_balsam_2__workerkill.py
similarity index 100%
rename from libensemble/tests/balsam_tests/test_balsam_2__workerkill.py
rename to libensemble/tests/deprecated_tests/balsam_tests/test_balsam_2__workerkill.py
diff --git a/libensemble/tests/balsam_tests/test_balsam_3__managerkill.py b/libensemble/tests/deprecated_tests/balsam_tests/test_balsam_3__managerkill.py
similarity index 100%
rename from libensemble/tests/balsam_tests/test_balsam_3__managerkill.py
rename to libensemble/tests/deprecated_tests/balsam_tests/test_balsam_3__managerkill.py
diff --git
a/libensemble/tests/standalone_executor_tests/create_balsam_job.py b/libensemble/tests/deprecated_tests/standalone_executor_tests/create_balsam_job.py similarity index 100% rename from libensemble/tests/standalone_executor_tests/create_balsam_job.py rename to libensemble/tests/deprecated_tests/standalone_executor_tests/create_balsam_job.py diff --git a/libensemble/tests/standalone_executor_tests/readme.txt b/libensemble/tests/deprecated_tests/standalone_executor_tests/readme.txt similarity index 100% rename from libensemble/tests/standalone_executor_tests/readme.txt rename to libensemble/tests/deprecated_tests/standalone_executor_tests/readme.txt diff --git a/libensemble/tests/standalone_executor_tests/set.balsam.database.sh b/libensemble/tests/deprecated_tests/standalone_executor_tests/set.balsam.database.sh similarity index 100% rename from libensemble/tests/standalone_executor_tests/set.balsam.database.sh rename to libensemble/tests/deprecated_tests/standalone_executor_tests/set.balsam.database.sh diff --git a/libensemble/tests/standalone_executor_tests/simdir/my_simtask.c b/libensemble/tests/deprecated_tests/standalone_executor_tests/simdir/my_simtask.c similarity index 100% rename from libensemble/tests/standalone_executor_tests/simdir/my_simtask.c rename to libensemble/tests/deprecated_tests/standalone_executor_tests/simdir/my_simtask.c diff --git a/libensemble/tests/standalone_executor_tests/simdir/my_simtask.f90 b/libensemble/tests/deprecated_tests/standalone_executor_tests/simdir/my_simtask.f90 similarity index 100% rename from libensemble/tests/standalone_executor_tests/simdir/my_simtask.f90 rename to libensemble/tests/deprecated_tests/standalone_executor_tests/simdir/my_simtask.f90 diff --git a/libensemble/tests/standalone_executor_tests/test_executor.py b/libensemble/tests/deprecated_tests/standalone_executor_tests/test_executor.py similarity index 96% rename from libensemble/tests/standalone_executor_tests/test_executor.py rename to 
libensemble/tests/deprecated_tests/standalone_executor_tests/test_executor.py
index 92d06cc89..9f54d8395 100644
--- a/libensemble/tests/standalone_executor_tests/test_executor.py
+++ b/libensemble/tests/deprecated_tests/standalone_executor_tests/test_executor.py
@@ -32,9 +32,8 @@ def build_simfunc():
 # Create and add exes to registry
 if USE_BALSAM:
-    from libensemble.executors.balsam_executor import BalsamMPIExecutor
-
-    exctr = BalsamMPIExecutor()
+    from libensemble.executors.legacy_balsam_executor import LegacyBalsamMPIExecutor
+    exctr = LegacyBalsamMPIExecutor()
 else:
     from libensemble.executors.mpi_executor import MPIExecutor
diff --git a/libensemble/tests/standalone_executor_tests/test_executor_manager_poll.py b/libensemble/tests/deprecated_tests/standalone_executor_tests/test_executor_manager_poll.py
similarity index 96%
rename from libensemble/tests/standalone_executor_tests/test_executor_manager_poll.py
rename to libensemble/tests/deprecated_tests/standalone_executor_tests/test_executor_manager_poll.py
index a4c57efd3..724b14a3d 100644
--- a/libensemble/tests/standalone_executor_tests/test_executor_manager_poll.py
+++ b/libensemble/tests/deprecated_tests/standalone_executor_tests/test_executor_manager_poll.py
@@ -40,9 +40,8 @@ def build_simfunc():
 # Create and add exes to registry
 if USE_BALSAM:
-    from libensemble.executors.balsam_executor import BalsamMPIExecutor
-
-    exctr = BalsamMPIExecutor()
+    from libensemble.executors.legacy_balsam_executor import LegacyBalsamMPIExecutor
+    exctr = LegacyBalsamMPIExecutor()
 else:
     from libensemble.executors.mpi_executor import MPIExecutor
diff --git a/libensemble/tests/standalone_executor_tests/test_executor_multi.py b/libensemble/tests/deprecated_tests/standalone_executor_tests/test_executor_multi.py
similarity index 97%
rename from libensemble/tests/standalone_executor_tests/test_executor_multi.py
rename to libensemble/tests/deprecated_tests/standalone_executor_tests/test_executor_multi.py
index 7f733e487..6718be48c 100644
---
a/libensemble/tests/standalone_executor_tests/test_executor_multi.py
+++ b/libensemble/tests/deprecated_tests/standalone_executor_tests/test_executor_multi.py
@@ -34,9 +34,8 @@ def build_simfunc():
 # Create and add exes to registry
 if USE_BALSAM:
-    from libensemble.baslam_executor import BalsamMPIExecutor
-
-    exctr = BalsamMPIExecutor()
+    from libensemble.executors.legacy_balsam_executor import LegacyBalsamMPIExecutor
+    exctr = LegacyBalsamMPIExecutor()
 else:
     from libensemble.executors.mpi_executor import MPIExecutor
diff --git a/libensemble/tests/deprecated_tests/test_nan_func_old_aposmm.py b/libensemble/tests/deprecated_tests/test_nan_func_old_aposmm.py
index 7a024bb5c..235d6da18 100644
--- a/libensemble/tests/deprecated_tests/test_nan_func_old_aposmm.py
+++ b/libensemble/tests/deprecated_tests/test_nan_func_old_aposmm.py
@@ -28,7 +28,10 @@
 sim_specs = {
     'sim_f': sim_f,
     'in': ['x'],
-    'out': [('f', float), ('f_i', float)],
+    'out': [
+        ('f', float),
+        ('f_i', float),
+    ],
 }
 gen_out += [('x', float, n), ('x_on_cube', float, n), ('obj_component', int)]
diff --git a/libensemble/tests/regression_tests/script_test_balsam_hworld.py b/libensemble/tests/regression_tests/script_test_balsam_hworld.py
index 39ccf36f9..80f84ab6a 100644
--- a/libensemble/tests/regression_tests/script_test_balsam_hworld.py
+++ b/libensemble/tests/regression_tests/script_test_balsam_hworld.py
@@ -7,7 +7,7 @@
 import mpi4py
 from mpi4py import MPI
-from libensemble.executors.balsam_executor import BalsamMPIExecutor
+from libensemble.executors.legacy_balsam_executor import LegacyBalsamMPIExecutor
 from libensemble.message_numbers import WORKER_DONE, WORKER_KILL_ON_ERR, WORKER_KILL_ON_TIMEOUT, TASK_FAILED
 from libensemble.libE import libE
 from libensemble.sim_funcs.executor_hworld import executor_hworld
@@ -32,7 +32,7 @@
 sim_app = './my_simtask.x'
 sim_app2 = six_hump_camel.__file__
-exctr = BalsamMPIExecutor()
+exctr = LegacyBalsamMPIExecutor()
 exctr.register_app(full_path=sim_app, calc_type='sim')  # Default 'sim'
app - backward compatible exctr.register_app(full_path=sim_app2, app_name='six_hump_camel') # Named app exctr.register_app(full_path=sim_app2, app_name='sim_hump_camel_dry_run') diff --git a/libensemble/tests/scaling_tests/forces/balsam_forces/cleanup.sh b/libensemble/tests/scaling_tests/forces/balsam_forces/cleanup.sh new file mode 100755 index 000000000..6f5720f96 --- /dev/null +++ b/libensemble/tests/scaling_tests/forces/balsam_forces/cleanup.sh @@ -0,0 +1 @@ +rm -r ensemble* *.npy *.pickle ensemble.log lib*.txt *.stat diff --git a/libensemble/tests/scaling_tests/forces/balsam_forces/define_apps.py b/libensemble/tests/scaling_tests/forces/balsam_forces/define_apps.py new file mode 100644 index 000000000..c232fdb06 --- /dev/null +++ b/libensemble/tests/scaling_tests/forces/balsam_forces/define_apps.py @@ -0,0 +1,61 @@ +from balsam.api import ApplicationDefinition + +""" +This script uses the Balsam API to define and sync two types of Balsam apps: +a libEnsemble app, and a Forces app: + + - The libEnsemble app runs the calling script ``run_libe_forces_balsam.py``. + An input transfer is also specified, but parameterized in + ``submit_libe_forces_balsam.py`` as part of the Job specification process. + + - The Forces app is defined and synced with Balsam. The libEnsemble app + will submit instances of the Forces app to the Balsam service for scheduling + on a running batch session at its site. An optional output transfer is defined; + forces.stat files are transferred back to the Globus endpoint defined in + run_libe_forces_balsam.py + +Unless changes are made to these Apps, this should only need to be run once to +register each of these apps with the Balsam service. 
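Balsam fills the `{{...}}` placeholders in each `command_template` from the parameters supplied when a Job is created. As a rough illustration of that substitution (a hypothetical `render_command` helper shown for explanation only, not Balsam's actual implementation, which uses full Jinja templating):

```python
import re

def render_command(command_template, parameters):
    """Substitute each {{ name }} placeholder with its value from parameters."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(parameters[match.group(1)]),
        command_template,
    )

# Mirrors the shape of the RemoteForces command_template above
template = "forces.x {{sim_particles}} {{sim_timesteps}} {{seed}} > out.txt 2>&1"
print(render_command(template, {"sim_particles": 1000, "sim_timesteps": 10, "seed": 1000}))
# → forces.x 1000 10 1000 > out.txt 2>&1
```

The keys of the `app_args` dictionary passed to `BalsamExecutor.submit` correspond to these placeholder names.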
+ +If not running libEnsemble remotely, feel free to comment-out ``RemoteLibensembleApp.sync()`` +""" + + +class RemoteLibensembleApp(ApplicationDefinition): + site = "jln_theta" + command_template = ( + "/home/jnavarro/.conda/envs/again/bin/python /home/jnavarro" + + "/libensemble/libensemble/tests/scaling_tests/forces/balsam_forces/run_libe_forces_balsam.py" + + " > libe_out.txt 2>&1" + ) + + +print("Defined RemoteLibensembleApp Balsam ApplicationDefinition.") + + +class RemoteForces(ApplicationDefinition): + site = "jln_theta" + command_template = ( + "/home/jnavarro" + + "/libensemble/libensemble/tests/scaling_tests/forces/forces_app/forces.x" + + " {{sim_particles}} {{sim_timesteps}} {{seed}}" + + " > out.txt 2>&1" + ) + + transfers = { + "result": { + "required": True, + "direction": "out", + "local_path": "forces.stat", + "description": "Forces stat file", + "recursive": False, + } + } + + +print("Defined RemoteForces Balsam ApplicationDefinition.") + +RemoteLibensembleApp.sync() +RemoteForces.sync() + +print("Synced each app with the Balsam service.") diff --git a/libensemble/tests/scaling_tests/forces/balsam_forces/forces_simf.py b/libensemble/tests/scaling_tests/forces/balsam_forces/forces_simf.py new file mode 100644 index 000000000..97da5e28d --- /dev/null +++ b/libensemble/tests/scaling_tests/forces/balsam_forces/forces_simf.py @@ -0,0 +1,80 @@ +import os +import time +import numpy as np + +from libensemble.executors.executor import Executor +from libensemble.message_numbers import WORKER_DONE, TASK_FAILED + + +def run_forces_balsam(H, persis_info, sim_specs, libE_info): + + calc_status = 0 + + particles = str(int(H["x"][0][0])) + + exctr = Executor.executor + + GLOBUS_ENDPOINT = sim_specs["user"]["globus_endpoint"] + GLOBUS_DEST_DIR = sim_specs["user"]["globus_dest_dir"] + THIS_SCRIPT_ON_THETA = sim_specs["user"]["this_script_on_theta"] + + args = { + "sim_particles": particles, + "sim_timesteps": str(10), + "seed": particles, + } + + workdir = ( + 
"sim" + str(libE_info["H_rows"][0]) + "_worker" + str(libE_info["workerID"]) + ) + + statfile = "forces{}.stat".format(particles) + + if THIS_SCRIPT_ON_THETA: + transfer_statfile_path = GLOBUS_DEST_DIR + statfile + local_statfile_path = ( + "../" + workdir + "/" + transfer_statfile_path.split("/")[-1] + ) + else: + transfer_statfile_path = os.getcwd() + "/" + statfile + local_statfile_path = transfer_statfile_path + + transfer = {"result": GLOBUS_ENDPOINT + ":" + transfer_statfile_path} + + task = exctr.submit( + app_name="forces", + app_args=args, + num_procs=4, + num_nodes=1, + procs_per_node=4, + max_tasks_per_node=1, + transfers=transfer, + workdir=workdir, + ) + + task.wait(timeout=300) + task.poll() + + print("Task {} polled. state: {}.".format(task.name, task.state)) + + while True: + time.sleep(1) + if ( + os.path.isfile(local_statfile_path) + and os.path.getsize(local_statfile_path) > 0 + ): + break + + try: + data = np.loadtxt(local_statfile_path) + final_energy = data[-1] + calc_status = WORKER_DONE + except Exception: + final_energy = np.nan + calc_status = TASK_FAILED + + outspecs = sim_specs["out"] + output = np.zeros(1, dtype=outspecs) + output["energy"][0] = final_energy + + return output, persis_info, calc_status diff --git a/libensemble/tests/scaling_tests/forces/balsam_forces/readme.md b/libensemble/tests/scaling_tests/forces/balsam_forces/readme.md new file mode 100644 index 000000000..364ccf6dc --- /dev/null +++ b/libensemble/tests/scaling_tests/forces/balsam_forces/readme.md @@ -0,0 +1,177 @@ +## Running test run_libe_forces_balsam.py + +Naive Electrostatics Code Test + +This is a synthetic, highly configurable simulation function. Its primary use +is to test libEnsemble's capability to submit application instances via the Balsam service, +including to separate machines from libEnsemble's processes. 
This means that although
+this is typically an HPC scaling test, it can be run on a laptop with the `forces.x`
+simulation submitted to the remote machine, and the resulting data files transferred
+back to the machine that runs the libEnsemble calling script.
+
+Note that this test currently requires active ALCF credentials to authenticate with
+the Balsam service.
+
+### Forces Mini-App
+
+A system of charged particles is initialized and simulated over a number of time-steps.
+
+Particles' position and charge are initiated using a random stream.
+Particles are replicated on all ranks.
+**Each rank** computes forces for a subset of particles (`O(N^2)` operations).
+Particle force arrays are `allreduced` across ranks.
+Particles are moved (replicated on each rank).
+Total energy is appended to the forces.stat file.
+
+To run forces as a standalone executable on `N` procs:
+
+    mpirun -np N ./forces.x
+
+**This application will need to be compiled on the remote machine:**
+
+    cd libensemble/libensemble/tests/scaling_tests/forces/forces_app
+    ./build_forces.sh
+
+### Configuring Balsam
+
+On the remote machine (in a conda or other virtual environment):
+
+    pip install balsam
+    balsam login
+    balsam site init ./my-site
+    cd my-site; balsam site start
+
+You may be asked to log in and authenticate with the Balsam service. Do so with
+your ALCF credentials.
+
+On any machine where you've installed Balsam and logged in, you can run `balsam site ls`
+to list your sites and `balsam job rm --all` to remove extraneous jobs between runs.
+
+### Configuring data transfer via Balsam and Globus
+
+Although the raw results of forces runs are available in Balsam sites, the
+simulation function still needs local access to each stat file in order to
+evaluate results and determine the final status of an app run executed
+on another machine.
+
+Balsam can coordinate data transfers via Globus between Globus endpoints.
Assuming
+this test is being run on a personal device, do the following to configure Globus,
+then configure Balsam to use Globus.
+
+- Log in to [Globus](https://www.globus.org/) using ALCF or other approved organization credentials.
+- Download and run [Globus Connect Personal](https://app.globus.org/file-manager/gcp) to register your device as a Globus endpoint. Note the initialized collection name, e.g. ``test_collection``.
+- Once a Globus collection has been initialized in Globus Connect Personal, log in to Globus and click "Endpoints" on the left.
+- Click the collection that was created on your personal device. Copy the string after "Endpoint UUID".
+- Log in to the remote machine, switch to your Balsam site directory, and run ``balsam site globus-login``.
+- Modify ``settings.yml`` to contain a new transfer_location that matches your device, with the copied endpoint UUID, e.g. ``test_collection: globus://19036a15-570a-12f8-bef8-22060b9b458d``
+- Run ``balsam site sync`` within the site directory to save these changes.
+- Locally, in the calling script (``run_libe_forces_balsam.py``), set ``GLOBUS_ENDPOINT`` to the collection name for the previously-defined transfer_location.
+
+This should be sufficient for ``forces.stat`` files from remote Balsam app runs
+to be transferred back to your personal device after every app run. The
+simulation function will wait for Balsam to transfer back a stat file, then determine
+the calc status based on the received output.
+
+*To transfer files to/from Theta*, you will need to log in to Globus and activate
+Theta's Managed Public Endpoint:
+
+- Log in to Globus and click "Endpoints" on the left.
+- Search for ``alcf#dtn_theta``, click on the result.
+- On the right, click "Activate", then "Continue". Authenticate with ALCF.
+
+### Configuring libEnsemble
+
+There are several scripts that each need to be adjusted. To explain each:
+
+1.
``define_apps.py``:
+
+   About:
+
+   This script defines and syncs each of our Balsam apps with the Balsam service. A Balsam
+   app is an ``ApplicationDefinition`` class with ``site`` and
+   ``command_template`` fields. ``site`` specifies to Balsam on which Balsam site
+   the app should be run, and ``command_template`` specifies the command (as a Jinja2
+   string template) that should be executed. This script contains two apps, ``RemoteLibensembleApp``
+   and ``RemoteForces``. If you're running libEnsemble on your personal machine and
+   only submitting the Forces app via Balsam, only ``RemoteForces`` needs adjusting.
+
+   Configuring:
+
+   Adjust the ``site`` field in each ``ApplicationDefinition`` to match your remote
+   Balsam site. Adjust the various paths in the ``command_template`` fields to match
+   your home directory and/or Python paths **on the remote machine**. If running
+   libEnsemble on your personal machine, feel free to comment out ``RemoteLibensembleApp.sync()``.
+
+   **Run this script each time you edit it,** since changes to each
+   ``ApplicationDefinition`` need to be synced with the Balsam service.
+
+2. ``run_libe_forces_balsam.py``:
+
+   About:
+
+   This is a typical libEnsemble plus Executor calling script, but instead of
+   registering paths to apps as with the MPI Executor, this script loads the
+   ``RemoteForces`` app synced with the Balsam service in ``define_apps.py``
+   and registers it with libEnsemble's Balsam Executor. If running this
+   script on your personal machine, it also uses the Balsam Executor to reserve
+   resources at a Balsam site.
+
+   Configuring:
+
+   See the Globus instructions above for setting up Globus transfers within this script.
+
+   Adjust the ``BALSAM_SITE`` field
+   to match your remote Balsam site, and the fields in the
+   ``batch = exctr.submit_allocation()`` block further down. For ``site_id``,
+   retrieve the corresponding field with ``balsam site ls``.
+
+3.
(optional) ``submit_libe_forces_remotely.py``:
+
+   About:
+
+   This Python script is effectively a batch submission script. It uses the Balsam API
+   to check out resources (a ``BatchJob``) at a Balsam site, and submits libEnsemble as
+   a Balsam Job onto those resources. If transferring statfiles back to your
+   personal machine, it also waits until they are all returned and cancels
+   the remote ``BatchJob``. *Probably only needed if running libEnsemble remotely.*
+
+   Configuring:
+
+   Every field in UPPER_CASE can be adjusted. ``BALSAM_SITE``, ``PROJECT``,
+   and ``QUEUE`` among others will probably need adjusting. ``LIBE_NODES`` and ``LIBE_RANKS``
+   specify a subset of resources specifically for libEnsemble out of ``BATCH_NUM_NODES``.
+
+### Running libEnsemble locally
+
+First make sure that all Balsam apps are synced with the Balsam service:
+
+    python define_apps.py
+
+Then run libEnsemble with multiprocessing comms, with one manager and `N` workers:
+
+    python run_libe_forces_balsam.py --comms local --nworkers N
+
+Or, run with MPI comms using one manager and `N-1` workers:
+
+    mpirun -np N python run_libe_forces_balsam.py
+
+To remove output before the next run, use:
+
+    ./cleanup.sh
+
+**This runs libEnsemble itself in-place, with only Forces submitted to a Balsam site.**
+
+### (Optional) Running libEnsemble remotely
+
+Running libEnsemble in-place may be insufficient when using many workers or when
+the simulation/generation functions are computationally expensive.
+
+To run both libEnsemble and the Forces app on the compute nodes at a Balsam site, use:
+
+    python define_apps.py
+    python submit_libe_forces_remotely.py
+
+This routine will wait for the corresponding statfiles to be transferred back from
+the remote machine, then cancel the allocation.
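For reference, the wait-then-cancel behavior described above can be sketched in plain Python. The helper names (`all_statfiles_returned`, `wait_for_statfiles`) and the `timeout` parameter are illustrative and not part of the test scripts:

```python
import glob
import os
import time


def all_statfiles_returned(dest_dir, sim_max):
    # True once every expected *.stat file has been transferred into dest_dir
    return len(glob.glob(os.path.join(os.path.abspath(dest_dir), "*.stat"))) >= sim_max


def wait_for_statfiles(dest_dir, sim_max, poll_seconds=3, timeout=None):
    # Poll dest_dir until sim_max stat files are present, then return True.
    # Returns False if the optional timeout (in seconds) elapses first.
    os.makedirs(dest_dir, exist_ok=True)
    start = time.time()
    while not all_statfiles_returned(dest_dir, sim_max):
        if timeout is not None and time.time() - start > timeout:
            return False
        time.sleep(poll_seconds)
    return True
```

Once this returns `True`, the remote allocation can be cancelled the way the Balsam API script does it: set the ``BatchJob``'s state to ``"pending_deletion"`` and call ``save()``.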
diff --git a/libensemble/tests/scaling_tests/forces/balsam_forces/run_libe_forces_balsam.py b/libensemble/tests/scaling_tests/forces/balsam_forces/run_libe_forces_balsam.py new file mode 100644 index 000000000..b6f836b8b --- /dev/null +++ b/libensemble/tests/scaling_tests/forces/balsam_forces/run_libe_forces_balsam.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +import os +import socket +import numpy as np + +from libensemble.libE import libE +from libensemble.gen_funcs.sampling import uniform_random_sample +from forces_simf import run_forces_balsam +from libensemble.executors import BalsamExecutor +from libensemble.tools import parse_args, add_unique_random_streams + +from balsam.api import ApplicationDefinition + +BALSAM_SITE = "jln_theta" + +# Is this running on a personal machine, or a compute node? +THIS_SCRIPT_ON_THETA = any([i in socket.gethostname() for i in ["theta", "nid0"]]) + +# Use Globus to transfer output forces.stat files back +GLOBUS_ENDPOINT = "jln_laptop" + +if not THIS_SCRIPT_ON_THETA: + GLOBUS_DEST_DIR_PREFIX = os.getcwd() + "/ensemble" +else: + GLOBUS_DEST_DIR_PREFIX = "/path/to/remote/ensemble/directory" + +# Parse number of workers, comms type, etc. 
from arguments +nworkers, is_manager, libE_specs, _ = parse_args() + +# State the sim_f, inputs, outputs +sim_specs = { + "sim_f": run_forces_balsam, # sim_f, imported above + "in": ["x"], # Name of input for sim_f + "out": [("energy", float)], # Name, type of output from sim_f + "user": { + "globus_endpoint": GLOBUS_ENDPOINT, + "globus_dest_dir": GLOBUS_DEST_DIR_PREFIX, + "this_script_on_theta": THIS_SCRIPT_ON_THETA, + }, +} + +# State the gen_f, inputs, outputs, additional parameters +gen_specs = { + "gen_f": uniform_random_sample, # Generator function + "out": [("x", float, (1,))], # Name, type and size of data from gen_f + "user": { + "lb": np.array([1000]), # User parameters for the gen_f + "ub": np.array([3000]), + "gen_batch_size": 8, + }, +} + +# Create and work inside separate per-simulation directories +libE_specs["sim_dirs_make"] = True + +# Instruct libEnsemble to exit after this many simulations +exit_criteria = {"sim_max": 8} + +persis_info = add_unique_random_streams({}, nworkers + 1) + +apps = ApplicationDefinition.load_by_site(BALSAM_SITE) +RemoteForces = apps["RemoteForces"] + +exctr = BalsamExecutor() +exctr.register_app(RemoteForces, app_name="forces") + +if not THIS_SCRIPT_ON_THETA: + batch = exctr.submit_allocation( + site_id=246, # Check if matches BALSAM_SITE with `balsam site ls` + num_nodes=4, + wall_time_min=30, + queue="debug-flat-quad", + project="CSC250STMS07", + ) + +# Launch libEnsemble +H, persis_info, flag = libE( + sim_specs, gen_specs, exit_criteria, persis_info=persis_info, libE_specs=libE_specs +) + +if not THIS_SCRIPT_ON_THETA: + exctr.revoke_allocation(batch) diff --git a/libensemble/tests/scaling_tests/forces/balsam_forces/submit_libe_forces_remotely.py b/libensemble/tests/scaling_tests/forces/balsam_forces/submit_libe_forces_remotely.py new file mode 100644 index 000000000..3e4a52573 --- /dev/null +++ b/libensemble/tests/scaling_tests/forces/balsam_forces/submit_libe_forces_remotely.py @@ -0,0 +1,73 @@ +import os +import 
time +import glob +from balsam.api import ApplicationDefinition, BatchJob + +""" +This file is roughly equivalent to a traditional batch submission shell script +that used legacy Balsam commands, except it uses the Balsam API to submit jobs +to the scheduler. It can also be run from anywhere and still submit jobs to +the same machine. It loads, parameterizes, and submits the LibensembleApp for +execution. Use this script to run libEnsemble as a Balsam Job on compute nodes. + +If running libEnsemble on a laptop, this script is not needed. Just run the +corresponding libEnsemble calling script as normal. +""" + +BALSAM_SITE = "jln_theta" + +# Batch Session Parameters +BATCH_NUM_NODES = 5 +BATCH_WALL_CLOCK_TIME = 60 +PROJECT = "CSC250STMS07" +QUEUE = "debug-flat-quad" + +# libEnsemble Job Parameters - A subset of above resources dedicated to libEnsemble +LIBE_NODES = 1 +LIBE_RANKS = 5 + +# This script cancels remote allocation once SIM_MAX statfiles transferred +TRANSFER_DESTINATION = "./ensemble" +SIM_MAX = 16 + +# Retrieve the libEnsemble app from the Balsam service +apps = ApplicationDefinition.load_by_site(BALSAM_SITE) +RemoteLibensembleApp = apps["RemoteLibensembleApp"] +RemoteLibensembleApp.resolve_site_id() + + +# Submit the libEnsemble app as a Job to the Balsam service. +# It will wait for a compatible, running BatchJob session (remote allocation) +libe_job = RemoteLibensembleApp.submit( + workdir="libe_workflow", + num_nodes=LIBE_NODES, + ranks_per_node=LIBE_RANKS, +) + +print("libEnsemble App retrieved and submitted as Job to Balsam service.") + +# Submit an allocation (BatchJob) request to the libEnsemble app's site +batch = BatchJob.objects.create( + site_id=libe_job.site_id, + num_nodes=BATCH_NUM_NODES, + wall_time_min=BATCH_WALL_CLOCK_TIME, + job_mode="mpi", + project=PROJECT, + queue=QUEUE, +) + +print("BatchJob session initialized. 
All Balsam apps will run in this BatchJob.") + +# Wait for all forces.stat files to be transferred back, then cancel the BatchJob +os.makedirs(TRANSFER_DESTINATION, exist_ok=True) +print("Waiting for all returned forces.stat files...") + +while len(glob.glob(os.path.abspath(TRANSFER_DESTINATION) + "/*.stat")) != SIM_MAX: + time.sleep(3) + +print("All forces.stat files returned. Cancelling BatchJob session.") + +batch.state = "pending_deletion" +batch.save() + +print("BatchJob session cancelled. Success!") diff --git a/libensemble/tests/scaling_tests/forces/balsam_local.sh b/libensemble/tests/scaling_tests/forces/balsam_local.sh deleted file mode 100755 index 9e447b743..000000000 --- a/libensemble/tests/scaling_tests/forces/balsam_local.sh +++ /dev/null @@ -1,36 +0,0 @@ -# Script for running with Balsam on a local system. - -# You need to have followed the instructions to install balsam and set-up/activate a database. -# https://github.com/balsam-alcf/balsam - -# The running jobs can be seen inside the setup database dir /data/libe_workflow/ - -# Name of calling script -export EXE=run_libe_forces.py - -# Number of workers. -export NUM_WORKERS=2 - -# Name of working directory where Balsam places running jobs/output (inside the database directory) -export WORKFLOW_NAME=libe_workflow - -export SCRIPT_ARGS=$NUM_WORKERS - -export LIBE_WALLCLOCK=5 # Balsam timeout in mins - -# Add calling script to Balsam database as app and job. -export THIS_DIR=$PWD -export SCRIPT_BASENAME=${EXE%.*} - -# Delete any apps/jobs in Balsam -balsam rm apps --all --force -balsam rm jobs --all --force - -# Register your libEnsemble calling script as an app. 
-balsam app --name $SCRIPT_BASENAME.app --exec $EXE --desc "Run $SCRIPT_BASENAME" - -# Register as a job -balsam job --name job_$SCRIPT_BASENAME --workflow $WORKFLOW_NAME --application $SCRIPT_BASENAME.app --args $SCRIPT_ARGS --wall-time-minutes $LIBE_WALLCLOCK --num-nodes 1 --ranks-per-node $((NUM_WORKERS+1)) --url-out="local:/$THIS_DIR" --stage-out-files="*.out *.txt *.log" --url-in="local:/$THIS_DIR/*" --yes - -#Run job -balsam launcher --consume-all --job-mode=mpi --num-transition-threads=1 diff --git a/libensemble/tests/scaling_tests/forces/cleanup.sh b/libensemble/tests/scaling_tests/forces/forces_adv/cleanup.sh similarity index 100% rename from libensemble/tests/scaling_tests/forces/cleanup.sh rename to libensemble/tests/scaling_tests/forces/forces_adv/cleanup.sh diff --git a/libensemble/tests/scaling_tests/forces/clone.sh b/libensemble/tests/scaling_tests/forces/forces_adv/clone.sh similarity index 100% rename from libensemble/tests/scaling_tests/forces/clone.sh rename to libensemble/tests/scaling_tests/forces/forces_adv/clone.sh diff --git a/libensemble/tests/scaling_tests/forces/forces.yaml b/libensemble/tests/scaling_tests/forces/forces_adv/forces.yaml similarity index 100% rename from libensemble/tests/scaling_tests/forces/forces.yaml rename to libensemble/tests/scaling_tests/forces/forces_adv/forces.yaml diff --git a/libensemble/tests/scaling_tests/forces/forces_simf.py b/libensemble/tests/scaling_tests/forces/forces_adv/forces_simf.py similarity index 100% rename from libensemble/tests/scaling_tests/forces/forces_simf.py rename to libensemble/tests/scaling_tests/forces/forces_adv/forces_simf.py diff --git a/libensemble/tests/scaling_tests/forces/forces_support.py b/libensemble/tests/scaling_tests/forces/forces_adv/forces_support.py similarity index 100% rename from libensemble/tests/scaling_tests/forces/forces_support.py rename to libensemble/tests/scaling_tests/forces/forces_adv/forces_support.py diff --git 
a/libensemble/tests/scaling_tests/forces/readme.md b/libensemble/tests/scaling_tests/forces/forces_adv/readme.md similarity index 83% rename from libensemble/tests/scaling_tests/forces/readme.md rename to libensemble/tests/scaling_tests/forces/forces_adv/readme.md index e34fbce7d..09eb50c3a 100644 --- a/libensemble/tests/scaling_tests/forces/readme.md +++ b/libensemble/tests/scaling_tests/forces/forces_adv/readme.md @@ -1,25 +1,28 @@ ## Running test run_libe_forces.py -Naive Electostatics Code Test +Naive Electrostatics Code Test -This is designed only as an artificial, highly conifurable test -code for a libEnsemble sim func. +This is a synthetic, highly configurable simulation function. Its primary use +is to test libEnsemble's capability to launch application instances via the `MPIExecutor`. ### Forces Mini-App -A system of charged particles is set up and simulated over a number of time-steps. +A system of charged particles is initialized and simulated over a number of time-steps. -Particles position and charge are initiated by a random stream. +Particles' position and charge are initiated using a random stream. Particles are replicated on all ranks. -**Each rank** computes forces for a subset of particles (O(N^2)) -Particle force arrays are allreduced across ranks. -Particles are moved (replicated on each rank) -Total energy is appended to file forces.stat +**Each rank** computes forces for a subset of particles (`O(N^2)` operations). +Particle force arrays are `allreduced` across ranks. +Particles are moved (replicated on each rank). +Total energy is appended to the forces.stat file. -To run forces as a standalone executable on N procs: +To run forces as a standalone executable on `N` procs: mpirun -np N ./forces.x +This application will need to be compiled on the remote machine where the sim_f will run. +See below. + ### Running with libEnsemble. A random sample of seeds is taken and used as input to the sim func (forces miniapp). 
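The mini-app physics described above (pairwise inverse-square forces, then a move step, with the total energy recorded) can be illustrated with a serial Python sketch. This is not the `forces.c` code: it assumes 1-D positions, a unit Coulomb constant, and a single "rank" holding all particles, purely to show the O(N^2) structure:

```python
def forces_step(positions, charges, dt=0.1):
    # One naive O(N^2) step: accumulate pairwise Coulomb-like forces on every
    # particle, sum the pairwise potential energy, then move the particles.
    n = len(positions)
    forces = [0.0] * n
    energy = 0.0
    for i in range(n):
        for j in range(n):
            if i == j:
                continue
            r = positions[j] - positions[i]
            if abs(r) < 1e-9:
                continue  # skip coincident particles
            f = charges[i] * charges[j] / (r * r)  # inverse-square magnitude
            forces[i] += f if r < 0 else -f       # like charges repel
            if j > i:
                energy += charges[i] * charges[j] / abs(r)  # each pair once
    new_positions = [p + dt * f for p, f in zip(positions, forces)]
    return new_positions, energy
```

In the real app, each MPI rank runs the inner accumulation for only its subset of `i`, and the force arrays are `allreduced` before the move.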
diff --git a/libensemble/tests/scaling_tests/forces/run_libe_forces.py b/libensemble/tests/scaling_tests/forces/forces_adv/run_libe_forces.py similarity index 93% rename from libensemble/tests/scaling_tests/forces/run_libe_forces.py rename to libensemble/tests/scaling_tests/forces/forces_adv/run_libe_forces.py index e2c28f54a..6fa64a2e8 100644 --- a/libensemble/tests/scaling_tests/forces/run_libe_forces.py +++ b/libensemble/tests/scaling_tests/forces/forces_adv/run_libe_forces.py @@ -9,8 +9,8 @@ from libensemble.tools import parse_args, save_libE_output, add_unique_random_streams from libensemble import logger from forces_support import test_libe_stats, test_ensemble_dir, check_log_exception +from libensemble.executors.mpi_executor import MPIExecutor -USE_BALSAM = False PERSIS_GEN = False if PERSIS_GEN: @@ -37,17 +37,10 @@ subprocess.check_call(['./build_forces.sh']) -# Create executor and register sim to it. -if USE_BALSAM: - from libensemble.executors.balsam_executor import BalsamMPIExecutor - - exctr = BalsamMPIExecutor() -else: - from libensemble.executors.mpi_executor import MPIExecutor - - exctr = MPIExecutor() +exctr = MPIExecutor() exctr.register_app(full_path=sim_app, app_name='forces') + # Note: Attributes such as kill_rate are to control forces tests, this would not be a typical parameter. 
# State the objective function, its arguments, output, and necessary parameters (and their sizes) diff --git a/libensemble/tests/scaling_tests/forces/run_libe_forces_from_yaml.py b/libensemble/tests/scaling_tests/forces/forces_adv/run_libe_forces_from_yaml.py similarity index 100% rename from libensemble/tests/scaling_tests/forces/run_libe_forces_from_yaml.py rename to libensemble/tests/scaling_tests/forces/forces_adv/run_libe_forces_from_yaml.py diff --git a/libensemble/tests/scaling_tests/forces/summit_submit_mproc.sh b/libensemble/tests/scaling_tests/forces/forces_adv/summit_submit_mproc.sh similarity index 100% rename from libensemble/tests/scaling_tests/forces/summit_submit_mproc.sh rename to libensemble/tests/scaling_tests/forces/forces_adv/summit_submit_mproc.sh diff --git a/libensemble/tests/scaling_tests/forces/theta_submit_balsam.sh b/libensemble/tests/scaling_tests/forces/forces_adv/theta_submit_balsam.sh similarity index 100% rename from libensemble/tests/scaling_tests/forces/theta_submit_balsam.sh rename to libensemble/tests/scaling_tests/forces/forces_adv/theta_submit_balsam.sh diff --git a/libensemble/tests/scaling_tests/forces/theta_submit_mproc.sh b/libensemble/tests/scaling_tests/forces/forces_adv/theta_submit_mproc.sh similarity index 100% rename from libensemble/tests/scaling_tests/forces/theta_submit_mproc.sh rename to libensemble/tests/scaling_tests/forces/forces_adv/theta_submit_mproc.sh diff --git a/libensemble/tests/scaling_tests/forces/build_forces.sh b/libensemble/tests/scaling_tests/forces/forces_app/build_forces.sh similarity index 100% rename from libensemble/tests/scaling_tests/forces/build_forces.sh rename to libensemble/tests/scaling_tests/forces/forces_app/build_forces.sh diff --git a/libensemble/tests/scaling_tests/forces/forces.c b/libensemble/tests/scaling_tests/forces/forces_app/forces.c similarity index 98% rename from libensemble/tests/scaling_tests/forces/forces.c rename to 
libensemble/tests/scaling_tests/forces/forces_app/forces.c index 6c4cb1caf..d0f2ce717 100755 --- a/libensemble/tests/scaling_tests/forces/forces.c +++ b/libensemble/tests/scaling_tests/forces/forces_app/forces.c @@ -248,9 +248,8 @@ int print_step_summary(int step, double total_en, return 0; } -int open_stat_file(num_particles) { - char *statfile; - asprintf(&statfile, "forces%d.stat", num_particles); +int open_stat_file() { + char *statfile = "forces.stat"; stat_fp = fopen(statfile, "w"); if(stat_fp == NULL) { printf("Error opening statfile"); @@ -394,7 +393,7 @@ int main(int argc, char **argv) { fflush(stdout); if (rank == 0) { - open_stat_file(num_particles); + open_stat_file(); } gettimeofday(&tstart, NULL); diff --git a/libensemble/tests/scaling_tests/forces/mini_forces/build_forces.sh b/libensemble/tests/scaling_tests/forces/forces_app/mini_forces/build_forces.sh similarity index 100% rename from libensemble/tests/scaling_tests/forces/mini_forces/build_forces.sh rename to libensemble/tests/scaling_tests/forces/forces_app/mini_forces/build_forces.sh diff --git a/libensemble/tests/scaling_tests/forces/mini_forces/mini_forces.c b/libensemble/tests/scaling_tests/forces/forces_app/mini_forces/mini_forces.c similarity index 100% rename from libensemble/tests/scaling_tests/forces/mini_forces/mini_forces.c rename to libensemble/tests/scaling_tests/forces/forces_app/mini_forces/mini_forces.c diff --git a/libensemble/tests/scaling_tests/forces/mini_forces/mini_forces_AoS.c b/libensemble/tests/scaling_tests/forces/forces_app/mini_forces/mini_forces_AoS.c similarity index 100% rename from libensemble/tests/scaling_tests/forces/mini_forces/mini_forces_AoS.c rename to libensemble/tests/scaling_tests/forces/forces_app/mini_forces/mini_forces_AoS.c diff --git a/examples/tutorials/forces_with_executor/tutorial_forces_simf.py b/libensemble/tests/scaling_tests/forces/forces_simple/forces_simf_simple.py similarity index 96% rename from 
examples/tutorials/forces_with_executor/tutorial_forces_simf.py rename to libensemble/tests/scaling_tests/forces/forces_simple/forces_simf_simple.py index 960f779fa..a39666074 100644 --- a/examples/tutorials/forces_with_executor/tutorial_forces_simf.py +++ b/libensemble/tests/scaling_tests/forces/forces_simple/forces_simf_simple.py @@ -26,7 +26,7 @@ def run_forces(H, persis_info, sim_specs, libE_info): task.wait(timeout=60) # Stat file to check for bad runs - statfile = "forces{}.stat".format(particles) + statfile = "forces.stat" # Try loading final energy reading, set the sim's status try: diff --git a/examples/tutorials/forces_with_executor/tutorial_run_libe_forces.py b/libensemble/tests/scaling_tests/forces/forces_simple/run_libe_forces_simple.py similarity index 93% rename from examples/tutorials/forces_with_executor/tutorial_run_libe_forces.py rename to libensemble/tests/scaling_tests/forces/forces_simple/run_libe_forces_simple.py index da0b7b00d..d92b69a06 100644 --- a/examples/tutorials/forces_with_executor/tutorial_run_libe_forces.py +++ b/libensemble/tests/scaling_tests/forces/forces_simple/run_libe_forces_simple.py @@ -16,10 +16,10 @@ # Normally would be pre-compiled if not os.path.isfile("forces.x"): - if os.path.isfile("build_forces.sh"): + if os.path.isfile("../forces_app/build_forces.sh"): import subprocess - subprocess.check_call(["./build_forces.sh"]) + subprocess.check_call(["../forces_app/build_forces.sh"]) # Register simulation executable with executor sim_app = os.path.join(os.getcwd(), "forces.x") diff --git a/libensemble/tests/scaling_tests/funcx_forces/cleanup.sh b/libensemble/tests/scaling_tests/forces/funcx_forces/cleanup.sh similarity index 100% rename from libensemble/tests/scaling_tests/funcx_forces/cleanup.sh rename to libensemble/tests/scaling_tests/forces/funcx_forces/cleanup.sh diff --git a/libensemble/tests/scaling_tests/funcx_forces/forces_simf.py b/libensemble/tests/scaling_tests/forces/funcx_forces/forces_simf.py similarity index 
100% rename from libensemble/tests/scaling_tests/funcx_forces/forces_simf.py rename to libensemble/tests/scaling_tests/forces/funcx_forces/forces_simf.py diff --git a/libensemble/tests/scaling_tests/funcx_forces/funcx_forces.yaml b/libensemble/tests/scaling_tests/forces/funcx_forces/funcx_forces.yaml similarity index 100% rename from libensemble/tests/scaling_tests/funcx_forces/funcx_forces.yaml rename to libensemble/tests/scaling_tests/forces/funcx_forces/funcx_forces.yaml diff --git a/libensemble/tests/scaling_tests/funcx_forces/readme.md b/libensemble/tests/scaling_tests/forces/funcx_forces/readme.md similarity index 69% rename from libensemble/tests/scaling_tests/funcx_forces/readme.md rename to libensemble/tests/scaling_tests/forces/funcx_forces/readme.md index 6f3f804fe..99fadb98d 100644 --- a/libensemble/tests/scaling_tests/funcx_forces/readme.md +++ b/libensemble/tests/scaling_tests/forces/funcx_forces/readme.md @@ -1,6 +1,6 @@ ## Running test run_libe_forces_funcx.py -Naive Electostatics Code Test +Naive Electrostatics Code Test This is designed only as an artificial, highly configurable test code for a libEnsemble sim func. This variant is primarily to test libEnsemble's @@ -9,20 +9,20 @@ manager and workers are running. ### Forces Mini-App -A system of charged particles is set up and simulated over a number of time-steps. +A system of charged particles is initialized and simulated over a number of time-steps. -Particles position and charge are initiated by a random stream. +Particles' position and charge are initiated using a random stream. Particles are replicated on all ranks. -**Each rank** computes forces for a subset of particles (O(N^2)) -Particle force arrays are allreduced across ranks. -Particles are moved (replicated on each rank) -Total energy is appended to file forces.stat +**Each rank** computes forces for a subset of particles (`O(N^2)` operations). +Particle force arrays are `allreduced` across ranks. 
+Particles are moved (replicated on each rank). +Total energy is appended to the forces.stat file. -To run forces as a standalone executable on N procs: +To run forces as a standalone executable on `N` procs: mpirun -np N ./forces.x -This application will need to be built on the remote machine where the sims will run. +This application will need to be compiled on the remote machine where the sim_f will run. See below. ### Running with libEnsemble. diff --git a/libensemble/tests/scaling_tests/funcx_forces/run_libe_forces_funcx.py b/libensemble/tests/scaling_tests/forces/funcx_forces/run_libe_forces_funcx.py similarity index 100% rename from libensemble/tests/scaling_tests/funcx_forces/run_libe_forces_funcx.py rename to libensemble/tests/scaling_tests/forces/funcx_forces/run_libe_forces_funcx.py diff --git a/libensemble/tests/scaling_tests/funcx_forces/build_forces.sh b/libensemble/tests/scaling_tests/funcx_forces/build_forces.sh deleted file mode 100755 index 20b106ba4..000000000 --- a/libensemble/tests/scaling_tests/funcx_forces/build_forces.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash - -# Building flat MPI - -# GCC -mpicc -O3 -o forces.x ../forces/forces.c -lm - -# Intel -# mpiicc -O3 -o forces.x forces.c - -# Cray -# cc -O3 -o forces.x forces.c - -# ---------------------------------------------- - -# Building with OpenMP for CPU - -# GCC -# mpicc -O3 -fopenmp -o forces.x forces.c -lm - -# Intel -# mpiicc -O3 -qopenmp -o forces.x forces.c - -# Cray / Intel (for CCE OpenMP is recognized by default) -# cc -O3 -qopenmp -o forces.x forces.c - -# xl -# xlc_r -O3 -qsmp=omp -o forces.x forces.c - -# ---------------------------------------------- - -# Building with OpenMP for target device (e.g. GPU) -# Need to toggle to OpenMP target directive in forces.c. 
- -# xl -# xlc_r -O3 -qsmp=omp -qoffload -o forces.x forces.c - -# IRIS node (Intel Gen9 GPU) -# env MPICH_CC=icx mpigcc -g -fiopenmp -fopenmp-targets=spir64 -o forces.x forces.c diff --git a/libensemble/tests/unit_tests/test_executor.py b/libensemble/tests/unit_tests/test_executor.py index 751394508..beb7f2aea 100644 --- a/libensemble/tests/unit_tests/test_executor.py +++ b/libensemble/tests/unit_tests/test_executor.py @@ -67,8 +67,8 @@ def build_simfuncs(): def setup_executor(): """Set up an MPI Executor with sim app""" if USE_BALSAM: - from libensemble.executors.balsam_executor import BalsamMPIExecutor - exctr = BalsamMPIExecutor() + from libensemble.executors.balsam_executor import LegacyBalsamMPIExecutor + exctr = LegacyBalsamMPIExecutor() else: from libensemble.executors.mpi_executor import MPIExecutor exctr = MPIExecutor() @@ -93,8 +93,8 @@ def setup_executor_startups(): def setup_executor_noapp(): """Set up an MPI Executor but do not register application""" if USE_BALSAM: - from libensemble.executors.balsam_executor import BalsamMPIExecutor - exctr = BalsamMPIExecutor() + from libensemble.executors.balsam_executor import LegacyBalsamMPIExecutor + exctr = LegacyBalsamMPIExecutor() else: from libensemble.executors.mpi_executor import MPIExecutor exctr = MPIExecutor()
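As a closing note on `run_libe_forces_balsam.py` above: its `THIS_SCRIPT_ON_THETA` check relies on Theta's login and compute hostnames containing `theta` or `nid0`. The one-liner in the script can be isolated as a small helper; the function name and `hostname` parameter here are illustrative:

```python
import socket


def on_theta(hostname=None):
    # Heuristic from the calling script: ALCF Theta login nodes contain
    # "theta" and compute nodes are named "nid0...".
    if hostname is None:
        hostname = socket.gethostname()
    return any(tag in hostname for tag in ("theta", "nid0"))
```

This lets the same calling script flip between reserving a remote allocation (when run on a laptop) and running directly on the allocation (when run on Theta).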