Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an MPIExecutor #3423

Merged
merged 18 commits into from
May 17, 2024
Merged

Add an MPIExecutor #3423

merged 18 commits into from
May 17, 2024

Conversation

WardLT
Copy link
Contributor

@WardLT WardLT commented May 8, 2024

Description

Add MPIExecutor -- a wrapper class over HTEx which fixes or removes options irrelevant when enable_mpi_mode=True.

To do list:

  • Gather consensus on what to call "max_workers"? I've started with max_tasks, but might change that to max_tasks_per_block.
  • Write a test case ensuring the docstring and init function are not out of date (mechanism for ensuring changes to HTEx don't break MPIEx)
  • Write a test case showing MPIEx works as intended
  • Update Parsl documentation with new example

Also, whoops, I included a commit from #3418. LMK if I should bother rebasing

Changed Behaviour

No behavior was changed, only added

Fixes

N/A

Type of change

Choose which options apply, and delete the ones which do not apply.

  • New feature
  • Update to human readable text: Documentation/error messages/comments
  • Code maintenance/cleanup

WardLT added 3 commits May 7, 2024 16:51
SingleNodeLauncher creates too many workers when running in MPI mode
@Andrew-S-Rosen
Copy link
Contributor

Andrew-S-Rosen commented May 8, 2024

Gather consensus on what to call "max_workers"? I've started with max_tasks, but might change that to max_tasks_per_block.

I'm going to think out loud here. With the HighThroughputExecutor, I usually set max_workers_per_node=nodes_per_app * nodes_per_block [NB: this used to be max_workers before the rename to max_workers_per_node, which only complicated matters.].

So, let's consider the following scenario: 5 Slurm allocations (blocks), each reserving 4 nodes, and I want each task (App) to run via MPI over 2 nodes in the same block. This means I'd usually set max_workers_per_node to 2*4=8.

Now let's consider what happens with the same scenario but using the MPIExecutor where we call max_workers_per_node by the name max_tasks_per_block instead. The parameter max_tasks_per_block=8 doesn't seem to check out because I have 4 nodes on a block, but only 2 (not 8) tasks are running on that block.

Is there a mistake somewhere in my logic? My gut feeling is that the confusion is coming in from the max_worker_per_node rename. If I understand correctly, the change from max_workers to max_workers_per_node in HPCExecutor was just a name change and nothing more, even though in our use case it does not make sense to be thought of as "per node". Alternatively, I could just be wrong, although I thought I had tested this out.


For full transparency, here is my "old" way in this hypothetical example:

import parsl
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SimpleLauncher
from parsl.providers import SlurmProvider

max_blocks = 5
nodes_per_block = 4
nodes_per_app = 2
cores_per_node = 128

config = Config(
    strategy="htex_auto_scale",
    executors=[
        HighThroughputExecutor(
            label="test",
            max_workers=nodes_per_app * nodes_per_block,  # or, equivalently, `max_workers_per_node`
            cores_per_worker=1e-6,
            provider=SlurmProvider(
                account="MyAccountName",
                qos="debug",
                constraint="cpu",
                worker_init=f"module load vasp/6.4.1-cpu && export ASE_VASP_COMMAND='srun -N {nodes_per_app} --ntasks-per-node={cores_per_node} --cpu_bind=cores'",
                walltime="00:10:00",
                nodes_per_block=nodes_per_block,
                init_blocks=0,
                max_blocks=max_blocks,
                launcher=SimpleLauncher(),
            ),
        )
    ],
)

parsl.load(config)

@WardLT
Copy link
Contributor Author

WardLT commented May 9, 2024

My understanding is that the max_workers_per_node in your example should be nodes_per_block // nodes_per_app and not nodes_per_block * nodes_per_app.

The weird nomenclature is that, normally, each node of each block gets one worker pool and each pool gets up to max_workers_per_node.

Only one pool gets launched per block in MPI mode (regardless of node count), which means the total number of workers available to launch equals max_workers_per_node.

Does that clarify? If so, maybe I rename max_workers_per_node to max_workers_per_block to avoid introducing another term?

@Andrew-S-Rosen
Copy link
Contributor

Andrew-S-Rosen commented May 9, 2024

Thanks @WardLT! That makes perfect sense. I tested things out with HighThroughputExecutor, and it looks like I was misled by what was being printed in the VASP OUTCAR. This is even more justification for the need to have a dedicated MPIExecutor. 😉

As for the original question about the name, I think adding a _per qualifier is worthwhile for clarity's sake. Using max_tasks_per_block seems wise then.

Comment on lines +39 to +40
heartbeat_threshold: int = 120,
heartbeat_period: int = 30,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to enumerate all of base class' (HighThroughputExecutor) keyword arguments if we aren't modifying the defaults? My concern is if we update the base class, then we might forget to update MPIExecutor. It seems like a safer approach might be to only explicitly have new and updated keyword arguments and then pass any remaining **kwargs to the base class. Thoughts?

Copy link
Contributor Author

@WardLT WardLT May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using **kwargs was my first thought too, but I wanted to try enumerating them to give better IDE support.

I made a unit test that will fail if the signature of the base class changes. Does that seem sufficient?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds reasonable to me!

(I really wish IDEs supported better inspection of classes...)

@benclifford
Copy link
Collaborator

no brainpower for this for the next day or so but will give some more in-depth opinions after that

encrypted: bool = False):
super().__init__(
# Hard-coded settings
cores_per_worker=1e-9, # Ensures there will always be enough workers
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not always - there's a number of workers where this will no longer work - because this is not 1/∞, just an approximation of that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone >1 billion MPI tasks in a single block, I'd be quite curious to know how they did it. Maybe I'll change to "there will be an absurd number of workers?"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In any case, isn't the edge case 1/max_tasks here? Since we know that there will never be more than max_tasks workers? It might be cleaner that way?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right something like that, maybe will some other factors in there too.

The correct solution IMO is (separately) to make cores per worker be an optional limit, rather than always taking part in size computations. That way you can choose to not specify it at all, rather than "I won't change this piece of code, so I have to write the other piece of code to trick the first piece of code".

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(the original assumption for this parameter is "of course a task needs cores on the manager node to execute" but that's not how htex-mpi does things)

@benclifford
Copy link
Collaborator

for the above max_workers_per_* discussion, more generally I'd like to push the internals of htex to talk about "per_pool" when it's relevant, although that doesn't necessarily need to be reflected in the user facing parameters that this PR is dealing with. but as the above discussion shows "per node" and "per block" are internally not going to be the right words to use.

@benclifford benclifford reopened this May 10, 2024
@WardLT
Copy link
Contributor Author

WardLT commented May 13, 2024

I added a first draft of the MPI Executor docs

@Andrew-S-Rosen and @tomdemeyere , you are a good target audience. How does it seem to you? https://parsl--3423.org.readthedocs.build/en/3423/userguide/mpi_apps.html

@WardLT WardLT marked this pull request as ready for review May 13, 2024 18:59
@tomdemeyere
Copy link

Nice! I am excited to try it with the modified MPI mode in #3418 but even without it, this seems much simpler and accessible.

I see everything that I would need to launch MPI apps, would this work with max_block > 1 ?

@benclifford
Copy link
Collaborator

benclifford commented May 13, 2024

@tomdemeyere there's not enough awareness of tasks having different resource requirements (number of ranks for each task) in the interchange, which dispatches tasks to (in the htex-mpi case) different blocks. So you might end up with some tasks queued in one block due to insufficient resources while another block has capacity.

That awareness was one of the targets of this project which was ultimately not funded: #3323

Copy link
Contributor

@Andrew-S-Rosen Andrew-S-Rosen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WardLT: Awesome!! This is looking great. Some minor corrections below :)

HTEX and MPI Tasks
------------------
1. Set ``max_workers_per_block`` to maximum number of tasks to run per block of compute nodes.
This value is typically the number of nodes per task divided by the number of nodes per task.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This value is typically the number of nodes per task divided by the number of nodes per task.

Wow, @WardLT, fanciest way to say "1" ever. 🤣 (Clearly a typo.)

------------------
1. Set ``max_workers_per_block`` to maximum number of tasks to run per block of compute nodes.
This value is typically the number of nodes per task divided by the number of nodes per task.
2. Set ``mpi_launcher`` to launcher used for your application.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small typo: "to [the] launcher"


Create the app by first defining a function which includes ``parsl_resource_specification`` keyword argument.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a hyperlink here to what parsl_resource_specification is because the uninitiated reader likely has no clue.



Next we define the CosmicTagger MPI application. TODO: Ask Khalid for help.
@bash_app
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a @python_app but should be😅

print(f"Stdout : {future.stdout}")
print(f"Stderr : {future.stderr}")
futures[(num_nodes, batchsize)] = future
# Resources in terms of nodes and how ranks are to be distributed are set on a per app
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we always need a parsel_ressource_specification? I'm just thinking back to the approach used in your older videos where we never used such a thing with the HighThroughputExecutor. If it's omitted, I assume the implicit assumption is that all tasks have the same resource requirements but it will still function?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There has to be some default made in that case. In the workqueue case, for example, ommitting a core count (or memory specification) for a task will allocate one entire worker node (whatever the relevant worker node happens to have on it).

That is one obvious default. The other is that you get 1 rank.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you do always need one and - at the moment (see #3427) - Parsl will crash without it being provided as a keyword argument.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. This is not your problem to deal with so to speak, but this just makes it even more awkward that parsl_resource_specification is not a keyword argument of the decorator itself. Having to redefine all of your functions to take this (hidden) keyword argument just to parallelize things is a bit hacky and counter-intuitive.

Comment on lines 106 to 108
"num_nodes" = 2,
"ranks_per_node" = 2,
"num_ranks" = 4,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably these should be : instead of =.


Advanced: More Environment Variables
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very selfish question unrelated to this specific line:

I am usually running VASP (via ASE) on the GPU nodes of Perlmutter. I assume one is still using a SimpleLauncher() here since it's an "internal" call to srun?

For context, a Slurm script usually looks something like the following:

#!/bin/bash
#SBATCH -N 1
#SBATCH -C gpu
#SBATCH -G 4
#SBATCH -q debug
#SBATCH -J vasp_job
#SBATCH -t 00:30:00
#SBATCH -A matgen_g

module load vasp/6.4.1-gpu
export OMP_NUM_THREADS=8
export OMP_PLACES=threads
export OMP_PROC_BIND=spread
export ASE_VASP_COMMAND="srun -n 4 -c 32 --cpu_bind=cores -G 4 --gpu-bind=none vasp_std"

python run_my_jobs.py

where run_my_jobs.py is some arbitrary program that in it launches a bunch of @python_apps representing one VASP task per App.

It's also a bit awkward here because the concept of ranks is mostly obscured...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this application is possible by replacing the parts of the VASP command definition which deal with the total ranks to \$PARSL_MPI_PREFIX. Escaping the $ will (hopefully!) result in the environment variable not being resolved with ASE_VASP_COMMAND is defined but instead when the contents ASE_VASP_COMMAND is used to start a subprocess somewhere deep in the bowels of ASE.

export ASE_VASP_COMMAND="\$PARSL_MPI_PREFIX -c 32 --cpu_bind=cores -G 4 --gpu-bind=none vasp_std"

That said, I'm not sure if -G is the "total" number of GPUs or the number per node. You might to switch to --gpus-per-node instead of -G.

Woof, that wasn't great.
@WardLT
Copy link
Contributor Author

WardLT commented May 14, 2024

Alright. I've addressed Andrew's comments and fixed a bunch of other grammar issues.

I think this is ready for review.

@@ -208,6 +208,8 @@ def get_result(self, block: bool, timeout: float):
"""Return result and relinquish provisioned nodes"""
result_pkl = self.pending_result_q.get(block, timeout=timeout)
result_dict = pickle.loads(result_pkl)
# TODO (wardlt): If the task did not request nodes, it won't be in `self._map_tasks_to_nodes`. Causes Parsl to hang
# See Issue #3427
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this message could be put as something more machine-readable, rather than a comment:

assert task_in in self._map_tasks_to_nodes, "You are about to experience issue #3427"

that might at least get some message somewhere at runtime (some stderr somewhere?), as well as making the message visible in source code like it is now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I should have done that. Was leery to touch code, but it's already broken 🤷🏼

"ranks_per_node": 2,
})
env_vars = future.result()
assert env_vars['PARSL_NUM_RANKS'] == '4'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the mathematician in me would use two different prime numbers here, not both 2, to get some difference in the values of the parameters (eg so that you're additionally testing that your aren't being allocated num_nodes * num_nodes ranks). then CS people can be forever confused why you're using not using a power of two...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using primes is a nice trick

executors=[
MPIExecutor(
address=address_by_interface('bond0'),
max_workers_per_block=2, # Assuming 2 nodes per task
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here's that use of 2 twice for different meanings I mentioend elsewhere!

@WardLT
Copy link
Contributor Author

WardLT commented May 15, 2024

Thanks, @benclifford ! I think I've addressed each of your points.

@WardLT WardLT requested a review from benclifford May 16, 2024 02:58
@benclifford benclifford merged commit b214714 into Parsl:master May 17, 2024
6 checks passed
to spawn multi-node tasks.

Specify the maximum number of multi-node tasks to run at once using ``max_workers_per_block``.
The maximum number should be smaller than the ``nodes_per_block`` in the Provider.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WardLT: Just looking at this again. Should it be "The value should be less than or equal to the nodes_per_block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should be less than or equal to

Andrew-S-Rosen added a commit to Andrew-S-Rosen/parsl that referenced this pull request May 22, 2024
As noted in Parsl#3423 (comment), the docstring for `MPIExecutor` said `max_workers_per_block` should be less than `nodes_per_block` when in reality it should be less than or equal to.

CC @WardLT.
benclifford pushed a commit that referenced this pull request May 23, 2024
As noted in #3423 (comment), the docstring for `MPIExecutor` said `max_workers_per_block` should be less than `nodes_per_block` when in reality it should be less than or equal to.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants