Executor with Electrostatic Forces
----------------------------------------

This tutorial highlights libEnsemble's capability to execute
and monitor external scripts or user applications within simulation or generator
functions using a libEnsemble Executor. In this tutorial,
our calling script registers a compiled executable that simulates
electrostatic forces between a collection of particles. The simulator function
launches instances of this executable and reads output files to determine
if the run was successful.

It is possible to use ``subprocess`` calls from Python to issue
commands such as ``jsrun`` or ``aprun`` to run applications. Unfortunately,
hard-coding such commands within user scripts isn't portable.
Furthermore, many systems like Argonne's Theta do not
allow libEnsemble to submit additional tasks from the compute nodes. On these
systems, a proxy launch mechanism (such as Balsam) is required.
libEnsemble's Executors were developed to directly address such issues.

In particular, libEnsemble's MPI Executor can automatically
detect available MPI runners and resources, and by default divide them equally among workers.
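
As a rough illustration of what runner detection and equal division involve, the sketch below searches ``PATH`` for a few common launchers and splits a core count evenly. It is not libEnsemble's implementation: the candidate list and the helper names ``find_runner`` and ``split_cores`` are assumptions made for this example only.

```python
import os
import shutil

# Hypothetical candidate list for illustration; not libEnsemble's actual search order
CANDIDATE_RUNNERS = ["mpirun", "mpiexec", "srun", "jsrun", "aprun"]


def find_runner(candidates=CANDIDATE_RUNNERS):
    """Return the first launcher found on PATH, or None if none is installed."""
    for name in candidates:
        if shutil.which(name) is not None:
            return name
    return None


def split_cores(total_cores, nworkers):
    """Divide cores equally among workers; any remainder is left unused."""
    return [total_cores // nworkers] * nworkers


runner = find_runner()
cores = os.cpu_count() or 1
print("Detected runner:", runner)
print("Cores per worker (2 workers):", split_cores(cores, 2))
```

The point of the Executor is that logic of this kind lives in the library rather than in each user script.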

Getting Started
------------------

**An MPI distribution and ``mpi4py`` are required to use this notebook locally**.

The simulation source code ``forces.c`` can be obtained directly from the
libEnsemble repository.

Assuming MPI and its C compiler ``mpicc`` are available, obtain
``forces.c`` and compile it into an executable (``forces.x``) with:

In [None]:
import subprocess
import requests

url = 'https://raw.githubusercontent.com/Libensemble/libensemble/main/libensemble/tests/scaling_tests/forces/forces.c'
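forces = requests.get(url)
forces.raise_for_status()  # Stop early if the download failed

# Write the source file, closing the handle when done
with open('./forces.c', 'wb') as f:
    f.write(forces.content)

# Compile with the MPI C compiler; check=True raises if compilation fails
subprocess.run('mpicc -O3 -o forces.x forces.c -lm'.split(), check=True)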

Calling Script
----------------

__*Note: Several of these stand-alone code-cells may not execute properly until each necessary component is defined. The complete libEnsemble routine should still function as expected.*__

Let's begin by writing our calling script to parameterize our simulation and
generation functions and call libEnsemble. Create a Python file containing:

In [None]:
#!/usr/bin/env python
import os
import numpy as np
from forces_simf import run_forces  # Sim func from current dir

from libensemble.libE import libE
from libensemble.gen_funcs.sampling import uniform_random_sample
from libensemble.tools import parse_args, add_unique_random_streams
from libensemble.executors import MPIExecutor

# Parse number of workers, comms type, etc. from arguments
nworkers, is_manager, libE_specs, _ = parse_args()

# Initialize MPI Executor instance
exctr = MPIExecutor()

# Register simulation executable with executor
sim_app = os.path.join(os.getcwd(), "forces.x")
exctr.register_app(full_path=sim_app, app_name="forces")

On line 15, we instantiate our ``MPIExecutor`` class instance,
which can optionally be customized by specifying alternative MPI runners. The
auto-detected default should be sufficient.

Registering an application is as easy as providing the full file-path and giving
it a memorable name. This Executor instance will later be retrieved within our
simulation function to launch the registered app.

Next define the ``sim_specs`` and ``gen_specs`` data structures. Recall that these
are used to specify to libEnsemble what user functions and input/output fields to
expect, and also to parameterize function instances without hard-coding:

In [None]:
# State the sim_f, inputs, outputs
sim_specs = {
    "sim_f": run_forces,  # sim_f, imported above
    "in": ["x"],  # Name of input for sim_f
    "out": [("energy", float)],  # Name, type of output from sim_f
}

# State the gen_f, inputs, outputs, additional parameters
gen_specs = {
    "gen_f": uniform_random_sample,  # Generator function
    "in": ["sim_id"],  # Generator input
    "out": [("x", float, (1,))],  # Name, type, and size of data from gen_f
    "user": {
        "lb": np.array([1000]),  # User parameters for the gen_f
        "ub": np.array([3000]),
        "gen_batch_size": 8,
    },
}

Our generation function will generate random numbers of particles (between
the ``"lb"`` and ``"ub"`` bounds) for our simulation function to evaluate via our
registered application.
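
To make the sampling concrete, here is a minimal NumPy sketch of drawing a batch of eight values between the bounds. It only mirrors the idea; it is not the source of ``uniform_random_sample``, and the seed and variable names are assumptions chosen for this example.

```python
import numpy as np

lb = np.array([1000])
ub = np.array([3000])
batch_size = 8

rng = np.random.default_rng(seed=0)  # seed chosen only to make the sketch repeatable

# Draw one value per point, scaled into [lb, ub)
x = rng.uniform(lb, ub, size=(batch_size, len(lb)))

# The simulation function later truncates each value to an integer particle count
particles = x[:, 0].astype(int)
print(particles)
```

Each row of ``x`` corresponds to one simulation, which matches the ``("x", float, (1,))`` output declared in ``gen_specs``.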

The following additional ``libE_specs`` setting instructs libEnsemble's workers
to each create and work within a separate directory each time they call a simulation
function. This helps organize output and also helps prevent workers from overwriting
previous results:

In [None]:
# Create and work inside separate per-simulation directories
libE_specs['sim_dirs_make'] = True

After configuring ``persis_info`` and
``exit_criteria``, we initialize libEnsemble
by calling the primary ``libE()`` routine:

In [None]:
# Instruct libEnsemble to exit after this many simulations
exit_criteria = {"sim_max": 8}

# Seed random streams for each worker, particularly for gen_f
persis_info = add_unique_random_streams({}, nworkers + 1)

# Launch libEnsemble
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info=persis_info, libE_specs=libE_specs)
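
Once ``libE()`` returns, ``H`` can be treated like any NumPy structured array. The sketch below uses a small stand-in array with made-up values, holding only the ``x`` and ``energy`` fields declared above, to show one way of pairing inputs with results. The real History array carries additional fields.

```python
import numpy as np

# Stand-in for the returned History array; values are made up for illustration
H = np.zeros(3, dtype=[("x", float, (1,)), ("energy", float)])
H["x"][:, 0] = [1200.0, 2500.0, 1800.0]
H["energy"] = [-3.1, -7.4, -5.2]

# Pair each particle count with its final energy, ordered by particle count
order = np.argsort(H["x"][:, 0])
for row in H[order]:
    print(int(row["x"][0]), row["energy"])
```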

Exercise
----------

This may take some additional browsing of the docs to complete.

Write an alternative calling script similar to the one above, but with the following differences:

 1. Add an additional worker directory setting so workers operate in ``/scratch/ensemble`` instead of the default current working directory.
 2. Override the MPIExecutor's detected MPI runner with ``'openmpi'``.
 3. Set libEnsemble's logger to print debug messages.
 4. Use the ``save_libE_output()`` function to save the History array and ``persis_info`` to files after libEnsemble completes.

In [None]:
#!/usr/bin/env python
import os
import numpy as np
from forces_simf import run_forces  # Sim func from current dir

from libensemble import logger
from libensemble.libE import libE
from libensemble.gen_funcs.sampling import uniform_random_sample
from libensemble.tools import parse_args, add_unique_random_streams, save_libE_output
from libensemble.executors import MPIExecutor

# Parse number of workers, comms type, etc. from arguments
nworkers, is_manager, libE_specs, _ = parse_args()

# Adjust logger level
logger.set_level('DEBUG')

# Initialize MPI Executor instance
exctr = MPIExecutor(custom_info={'mpi_runner': 'openmpi'})

...

# Instruct workers to operate somewhere else on the filesystem
libE_specs['ensemble_dir_path'] = "/scratch/ensemble"

...

# Launch libEnsemble
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info=persis_info, libE_specs=libE_specs)

if is_manager:
    save_libE_output(H, persis_info, __file__, nworkers)

In [None]:
#!/usr/bin/env python
# Write your solution below...

Simulation Function
-------------------

Our simulation function is where we'll use libEnsemble's executor to configure and submit
our application for execution. We'll poll this task's state while
it runs, and once we've detected it has finished we'll send any results or
exit statuses back to the manager.

Create another Python file named ``forces_simf.py`` containing the following
for starters:

In [None]:
import numpy as np

# To retrieve our MPI Executor instance
from libensemble.executors.executor import Executor

# Optional status codes to display in libE_stats.txt for each gen or sim
from libensemble.message_numbers import WORKER_DONE, TASK_FAILED


def run_forces(H, persis_info, sim_specs, libE_info):
    calc_status = 0

    # Parse out num particles, from generator function
    particles = str(int(H["x"][0][0]))

    # num particles, timesteps, also using num particles as seed
    args = particles + " " + str(10) + " " + particles

    # Retrieve our MPI Executor instance
    exctr = Executor.executor

    # Submit our forces app for execution
    task = exctr.submit(app_name="forces", app_args=args)

    # Block until the task finishes
    task.wait(timeout=60)

We retrieve the generated number of particles from ``H`` and construct
an argument string for our launched application. We then retrieve our
previously instantiated Executor instance from the ``Executor`` class,
where it was automatically stored as the ``executor`` attribute.

After submitting the "forces" app for execution,
a ``Task`` object is returned that correlates with the launched app.
This object is roughly equivalent to a Python future, and can be polled, killed,
and evaluated in a variety of helpful ways. For now, we're satisfied with waiting
for the task to complete via ``task.wait()``.
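
The comparison with a Python future can be seen using only the standard library. The sketch below is an analogy, not libEnsemble code: ``fake_app`` is a hypothetical stand-in for a launched application, and ``future.done()`` and ``future.result()`` play roles similar to polling a task and waiting on it.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_app(seconds):
    """Stand-in for a launched application; sleeps, then returns an exit code."""
    time.sleep(seconds)
    return 0


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(fake_app, 0.2)  # comparable to exctr.submit(...)

    # Poll without blocking, much as one might poll a Task
    polls = 0
    while not future.done():
        polls += 1
        time.sleep(0.05)

    exit_code = future.result(timeout=60)  # comparable to task.wait(timeout=60)

print("exit code:", exit_code, "after", polls, "polls")
```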

We can assume that afterward, any results are now available to parse. Our application
produces a ``forces[particles].stat`` file that contains either energy
computations for every time-step or a "kill" message if particles were lost, which
indicates a failed simulation.

To complete our simulation function, parse the last energy value from the output file into
a local output History array, and if successful,
set the simulation function's exit status ``calc_status``
to ``WORKER_DONE``. Otherwise, send back ``NAN`` and a ``TASK_FAILED`` status:

In [None]:
    # Continues inside run_forces(), after task.wait()

    # Stat file to check for bad runs
    statfile = "forces{}.stat".format(particles)

    # Try loading final energy reading, set the sim's status
    try:
        data = np.loadtxt(statfile)
        final_energy = data[-1]
        calc_status = WORKER_DONE
    except Exception:
        final_energy = np.nan
        calc_status = TASK_FAILED

    # Define our output array, populate with energy reading
    outspecs = sim_specs["out"]
    output = np.zeros(1, dtype=outspecs)
    output["energy"][0] = final_energy

    # Return final information to worker, for reporting to manager
    return output, persis_info, calc_status

``calc_status`` will be displayed in the ``libE_stats.txt`` log file.
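
The parsing step can be tried in isolation, without running the application. The sketch below writes two small files with made-up contents (the real ``.stat`` layout may differ) and applies the same ``np.loadtxt`` approach. The helper name ``read_final_energy`` is hypothetical.

```python
import os
import tempfile

import numpy as np


def read_final_energy(statfile):
    """Return the last value in statfile, or NaN if it cannot be parsed."""
    try:
        data = np.loadtxt(statfile)
        return float(data[-1])
    except Exception:
        return np.nan


with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, "good.stat")
    bad = os.path.join(tmp, "bad.stat")

    # Made-up file contents, for illustration only
    with open(good, "w") as f:
        f.write("-1.5\n-2.25\n-3.0\n")
    with open(bad, "w") as f:
        f.write("-1.5\nkill\n")

    good_energy = read_final_energy(good)
    bad_energy = read_final_energy(bad)

print(good_energy, bad_energy)
```

Because ``np.loadtxt`` raises an exception on the non-numeric "kill" line, the failed run falls through to the ``except`` branch, which is the behavior the simulation function relies on.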

That's it! As can be seen, with libEnsemble, it's relatively easy to get started
with launching applications. Behind the scenes, libEnsemble evaluates default
MPI runners and available resources and divides them among the workers.

This completes our calling script and simulation function. To run the routine, execute the calling script cells after the simulation function cells have been defined.

This may take up to a minute to complete. Output files, including each run's
``forces[particles].stat`` file and files containing ``stdout`` and ``stderr``
content for each task, should appear in the separate per-simulation directories
enabled by ``sim_dirs_make``. Overall workflow information
should appear in ``libE_stats.txt`` and ``ensemble.log`` as usual.

For example, my ``libE_stats.txt`` resembled::

Where ``status`` is set based on the simulation function's returned ``calc_status``.

My ``ensemble.log`` (on a ten-core laptop) resembled::

Note again that the ten cores were divided equally among two workers.

That concludes this tutorial.
Each of these example files can be found in the repository in ``examples/tutorials/forces_with_executor``.

For further experimentation, we recommend trying out this libEnsemble tutorial
workflow on a cluster or multi-node system, since libEnsemble can also manage
those resources and is developed to coordinate computations at huge scales.
Please feel free to contact us or open an issue on GitHub if this tutorial
workflow doesn't work properly on your cluster or other compute resource.

Exercises
-----------

These may require additional browsing of the documentation to complete.

  1. Adjust ``submit()`` to launch onto two nodes, with eight processes per node.
  2. Adjust ``submit()`` again so the app's ``stdout`` and ``stderr`` are written to ``stdout.txt`` and ``stderr.txt`` respectively.
  3. Construct a ``while not task.finished:`` loop that periodically sleeps for one second, calls ``task.poll()``,
     then reads the output ``.stat`` file, and calls ``task.kill()`` if the output file contains ``"kill\n"``
     or if ``task.runtime`` exceeds sixty seconds.

In [None]:
    import os
    import time
    ...
    task = exctr.submit(app_name="forces", app_args=args, wait_on_start=True,
                        num_nodes=2, procs_per_node=8, stdout="stdout.txt", stderr="stderr.txt")

    while not task.finished:
        time.sleep(1)
        task.poll()

        # The stat file may not exist until the app has started writing
        if os.path.isfile(statfile):
            with open(statfile, 'r') as f:
                if "kill\n" in f.readlines():
                    task.kill()

        # Give up on runs that exceed sixty seconds
        if task.runtime > 60:
            task.kill()

    ...

In [None]:
#!/usr/bin/env python
# Write your solution below...