Start MPI Dynamically from Dask #25
Comments
I do not know how to initialize an MPI environment without starting a new process. Every MPI implementation is different, and so every The closest thing I can think of to launching an MPI job from within python is my personal If starting another process is a non-starter (bad pun?), then the only thing I can think of is tailoring a solution to a particular implementation of MPI, such as MPICH or OpenMPI. |
OK, so let's say we pin ourselves to something like OpenMPI (or whatever is most common). Does this become possible? If so, what is the path to doing this? |
I think it becomes possible, but I think we would need some OpenMPI or MPICH developers to chime in. Maybe an issue here: https://github.com/open-mpi/ompi? |
I don't suggest limiting yourself to only a single MPI implementation. @kmpaul I can't speak for every MPI implementation, but usually It is possible, but ugly, to do this manually. I've done this before in a previous life. It requires using the You end up having to build up your communicator a process at a time. However, you should note that some MPI implementations flat out do not implement these APIs. I believe OpenMPI and Intel MPI both do, but MVAPICH does not. MPI_COMM_JOIN is an additional API that allows you to do this that assumes you have an existing socket connection between your two processes. Last I heard the MPI Forum was trying to deprecate this API, so I definitely don't suggest using this one. https://www.mpi-forum.org/docs/mpi-2.0/mpi-20-html/node115.htm |
@jrhemstad I agree that we shouldn't limit ourselves to a single MPI implementation, but you also point out a problem that finding a solution that works for all implementations may not be possible...easily. (This API is supported in the MPI 3.1 spec, too.) |
I was suggesting that one avoids doing anything implementation specific. The APIs I suggest are part of the MPI standard, they're just so infrequently used that some implementations just neglect to implement them. |
MPI libraries use an interface called the Process Management Interface (PMI) to interact with the job launcher. Different MPI libraries use different versions of PMI that are incompatible with one another. There have been efforts toward standardization and compatibility, but AFAIK this is far from complete. In the current state, the job launcher/manager should provide the same PMI version that the MPI library is using. Hence the tie-in between the MPICH MPI library + hydra launcher, and the OpenMPI library + orte launcher. Slurm provides its own variant of PMI, and both MPICH and OpenMPI have a configuration option to use a PMI interface compatible with SLURM. It is a non-trivial effort for DASK to replace the MPI launcher. It will have to
And it will have to redo 1 for each MPI library it has to be compatible with. @jrhemstad, the dynamic process model (MPI_Comm_spawn, MPI_Comm_accept, MPI_Comm_connect) is a way for independent MPI applications to launch, discover and connect with each other. The assumption is that you are in an MPI environment already. So it does not obviate the need for the above, especially if you want to use existing MPI libraries. |
This is not correct. It is certainly possible to launch a process w/o See SO post here: https://stackoverflow.com/questions/15578009/difference-between-running-a-program-with-and-without-mpirun |
@jrhemstad, my bad. I agree with you. At least according to the specification, one should be able to call these from a singleton process which was not launched through a process manager. I am skeptical about the current state of implementation for these in MPI libraries though. Like you said, not all libraries support these APIs and even if they do, probably not on all system configs. Also, this is not the most performant way of initializing MPI. And it can be quite ugly, like you point out. |
Given this seems ugly in general, and I hate to say let's create yet another standard, but should we start with OpenMPI and then slowly add other variants as time goes on that follow the same standard? |
Absolutely! It's definitely not going to be as easy as just doing I think a good place to start is to create a simple client/server C++ code that exercises the |
To be clear, the APIs I described are part of the MPI standard that all MPI implementations should implement, it's not specific to OpenMPI. From what I recall when I've played with this in the past, OpenMPI is one implementation that does support these APIs, so it's a good place to start. |
Understood. So should we start with OpenMPI to see if the standard that's already supposed to be there is universally used :) |
mpi4py.futures may be useful for this: https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html |
What's the upfront cost like? Microseconds, milliseconds, seconds?
+1 |
I do not think using MPI Dynamic Process Management (DPM) (accept, connect, etc.) is about just the upfront cost. The DPM features though supported in an MPI library, may not work with all transports/networks. In this case, one would be limited to a lower-performant network (say sockets) for all communication. Something to keep in mind. Also, the support in OpenMPI appears broken right now, see this open bug and comments say it is low on priority: open-mpi/ompi#3458 I prefer the approach of DASK providing a PMI implementation compatible with MPI libraries though it is a much larger effort compared to using DPM. |
Thanks for the input @spotluri ! Some followup questions if I may
Do you have a sense for what this looks like? Can you recommend a good reference here for people to check out?
Dask is more than happy to move around control information. My hope is that this would be done once at startup and then MPI would take over for most communication. Is this the case? For context, Dask communications will have latencies of at least a millisecond, which, as I'm sure you're aware, can be a long time for tightly-coupled MPI computations.
I want to verify here that ideally we should be able to start MPI from the existing dask-worker processes, not from starting new processes as would be done with SLURM/etc.. Not a hard requirement, but definitely a nice-to-have. Is this goal consistent with PMI? |
Ah yes! I remember stumbling across this back when I was originally exploring this stuff ~2 years ago. So my memory of OpenMPI was incorrect. In which case it was Intel MPI where these APIs do work. From what I remember, MVAPICH doesn't support these APIs either (I think you actually get a "We don't support this function!" message if you try to use them). This is indeed problematic because you don't want to pin yourself to a non-free MPI.
This is very dangerous territory. You would be programming to implementation-specific details that may or may not be portable across MPI implementations. The "right" solution here is to make use of the APIs in the standard. Unfortunately, support for these APIs is sparse. Therefore, though it may not be fast, the best long-term solution is to complain (loudly) to the various implementations that this feature is important to us, and that they are out of compliance with the MPI standard by not supporting these features. |
I agree with @jrhemstad, here. It is unfortunate that the MPI standard is...well...not very standard. However, this is the only appropriate way forward. We need to stick to the MPI specification. ...and for MPI implementations that claim to be open source, perhaps we can suggest (or even go so far as providing) implementations of the missing API. |
A set of slides from an MPI BoF at SC. Link from @jrhemstad |
So based on outside conversation and further investigation, it seems like it is worth the effort of exploring using PMI or PMIx directly. I only recently learned of this effort, but it looks to have wide support among the MPI implementations and contributors that matter. Process start-up is something that is outside of the MPI standard, so I believe that it is fine to explore options outside of the It may be worth having a conversation with the PMIx devs about our use case and how it maps to their design goals. @rhc54 is probably the guy to talk to. |
I think you'll find that the PMIx Groups work has what you need: pmix/pmix-standard#139 The implementation is already in PMIx master branch: https://github.com/pmix/pmix The MPI Sessions WG is exploring it for what you have described - dynamic async construction of MPI communicators. |
@rhc54 Thanks Ralph! We'll take a look. |
For transparency, it appears that NVIDIA MPI folks have started conversations about this topic internally. We'll see what surfaces... |
@mrocklin Out of curiosity, can you say more about this? Are they looking at using the PMIx Group capability, or some other approach? |
Honestly I've stopped tracking the internal chatter, I'll ask someone there to summarize the situation and report up though. |
Restating Dask's problem statement for clarity:
The fundamental issue with the above problem statement is that if we do not want to spawn new processes to do the MPI job, this requires calling However, this is expressly forbidden by the MPI Standard. Some have suggested we just "do it anyways" and exploit implementation-specific behavior, but I think this is a non-starter. I had a lovely conversation with @rhc54 @jsquyres @hppritcha (and others I don't know the github handles for) where I introduced them to the above problem statement and we had a conversation about how we can solve it. For those of you who were in the call, please correct or clarify anything I've said that doesn't sound right. In short, the result of the conversation is that MPI Sessions and mpi-forum/mpi-issues#103 look to be our best path forward for a long-term solution. Sessions came about in part due to frustrations with the single init/finalize requirement. They are targeted for the MPI 4.0 standard, which is slated for early 2020. OpenMPI currently has an experimental prototype implementation of Sessions that we could begin experimenting with in the short term. I'm not going to try and give a functionally complete summary of MPI Sessions because I am not qualified to do so. However, this is my limited understanding: An MPI session allows you to query the "resource manager" RM (think Dask scheduler here) for a list of "process sets" that can participate in an MPI job. For example, Dask workers You use one of the available sets of processes to initialize a Session and then create your MPI group/communicator and use that as you would any other MPI communicator* for your MPI job. You can initialize/finalize a Session many times throughout the lifetime of a process, and MPI Job For OpenMPI, Furthermore, Dask can also be modified (or an add-on) to do all of the work of setting up the communicator from the process set/Session, and that communicator can then just be handed off to whatever C/C++ library is using MPI. 
In this way, no modification is necessary in the underlying C/C++ library using MPI**. Finally, I believe the folks on the OpenMPI call were excited about having a solid use-case for Sessions and expressed interest in staying in close communication about our efforts here. It was my impression that there could be a fruitful opportunity to collaborate here.
** Similar to *, the library's API needs to be defined so that it just accepts a communicator rather than relying on |
@jrhemstad Thanks for the links on mpi-sessions. Those were useful! As we have already discussed in several places, there are the following cases to consider for putting dask and mpi together. I don't see how mpi-sessions would help us in any of these cases. Maybe I'm missing something here? Case1a: If we are going with the 'dask-first' approach and further decide to reuse dask-worker processes for MPI tasks, we still need to use the traditional 'MPI_Init' anyway to make these processes become MPI-aware, right? If so, how would mpi-sessions help us in this case? Maybe you meant it'll be useful when we want to regroup those processes differently for the cudf part and the cuML part? Case1b: If we are going with the 'dask-first' approach but with dask spawning new MPI processes for the current task (eg: running a cuML-algo), we might as well get its communicator and go ahead, no? Case2: If we are going with the 'mpi-first' approach (as in Matt's blog on dask-mpi), we still have to deal with the complexities due to the 2 extra mpi ranks being launched. Even here, we could just create a communicator out of those 'actual' N-2 ranks and use them for the cuML work, right? |
I think @hppritcha would be the best source for MPI Sessions questions. |
Case 1a is what we are ideally targeting with Dask. @rhc54 or @hppritcha correct me if I am wrong, but Sessions help us because we do not have to explicitly call In this way, we do not have the problem of calling See slide 50+ here: https://raw.githubusercontent.com/wiki/mpiwg-sessions/sessions-issues/2016-12-12-webex/2016-12-12-webex.pptx |
@jrhemstad So, IIRC, for Case1a, we want to have the ability to perform separate Init/Finalize calls done once for cudf-part and for the cuml-part. That's where you mention that mpi-sessions can be useful, am I right? @rhc54 @hppritcha In other words, as per Case1a above, we are having processes which are not launched via a "parallel application" mechanism. Can we still use mpi-sessions on them and get these processes to communicate with each other? |
I believe the answer is "yes", but I think @hppritcha and @dholmes-epcc-ed-ac-uk are the right people to address your questions. Unfortunately, the MPI Forum meeting is being held this week and I suspect they won't be able to answer right away. |
Thank you @rhc54 I went ahead and set up a toy project to simulate this "dask-first" approach. Here's the toy-repo: https://github.com/teju85/dask-first-with-mpi If you run the Singleton MPI from this repo, it basically does the following:
So, what should I be doing here to make these processes get appropriate rank and a world-size of 8? If these are mostly RTFM-worthy questions, please pardon and point me to the relevant docs. Regards, |
The intent of MPI Sessions is to provide a different way to initialise MPI, which does not require calling MPI_INIT. The sequence of calls is:
This code could be put into any function (not just main, e.g. a library or a component or whatever). |
The current reference implementation has been done (by @hjelmn) within Open MPI and relies internally on services provided by PMIx Groups. There are functional unit tests for the proposed new API. The example source codes proposed for addition to the MPI Standard compile and run against this reference implementation. The current status within the MPI Forum is "promising". The MPI Sessions WG has a list of to-do items based on feedback from the 12 hour presentation to the MPI Forum this week. The intent is that this new interface will target MPI-4.0, which is currently slated for release during 2020. A solid use-case, fully implemented and demonstrated would definitely bolster the standardisation effort. The MPI Sessions WG is very interested in collaborating on this. |
For hardy souls, the most recent version of the proposed MPI Standard can be found attached to the MPI Forum issue #103: The direct link to the (unofficial draft, for comment only) PDF document is here: This is a long document(!) but the important bits are section 10.4.1 (session initialisation and finalisation), section 6.3.2 (final function definition in that section), section 6.4.2 (final function definition in that section). |
@mrocklin forgot to update this thread. Thanks to the discussions in this thread and several chats, I was able to successfully "convert" dask processes into MPI ranks in my repo here: https://github.com/teju85/dask-first-with-mpi. @cjnolet is also taking a look at this for integrating it into the RAPIDS workflow. Along with him, if you can take a look at this repo and provide any feedback, especially from a dask usage perspective, that'll help. |
I don't know enough about MPI or the implementation to be able to answer that unfortunately. I suspect that you would know more here than I would @kmpaul :) |
@kmpaul the usage of MPI connect/accept in the toy repo linked by @teju85 looks correct (although suboptimal) to me. I say suboptimal because it requires each non-root process to connect/accept with the root process, so scales linearly with the number of processes. Various other patterns could be used, e.g. recursive doubling, to achieve scaling proportional to the log of the number of processes - if that was important, e.g. for P=1024 processes, (P-1)=1023 steps, whereas log_2(P)=10 steps. For completeness, the MPI Sessions proposal would hope to abstract much of this code away from end-users, but it requires someone do a bit of integration work for MPI+DASK. Ideally, the end-user would use MPI_Session_init (instead of MPI_Init in the current example code), then get the name of the process set that refers to all the DASK worker processes (which is where the integration work comes in), then use MPI_Group_from_pset to create an MPI_Group, then use MPI_Comm_from_group to create a communicator (with comm_size = number of dask workers, and appropriate comm-rank in range [0,P-1]). The necessary integration work would enable MPI to retrieve process set names from DASK for presentation to MPI users and/or to understand process set names given by MPI users that refer to sets of DASK processes. The proposal requires that process names be strings, formatted like URIs. The MPI_Comm_from_group function requires a unique string tag that differentiates between concurrent communicator creation operations (similar to the role of service name in MPI_Publish_name) - it is suggested to use reverse dot notation to achieve uniqueness. So, it is hoped that, once MPI Sessions is accepted and the implementation/integration work is done, the cpp code could look more like this:
|
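To make the scaling comparison in the connect/accept comment above concrete, here is a small pure-Python sketch. It does not touch MPI at all; it only counts steps and lists which group leaders would pair up in each round of a recursive-doubling merge. The function names are made up for illustration and are not taken from the toy repo.

```python
def linear_steps(p: int) -> int:
    """Connect/accept steps when every non-root process joins via the root."""
    return max(p - 1, 0)


def doubling_rounds(p: int) -> int:
    """Rounds needed when existing groups merge pairwise in every round."""
    # (p - 1).bit_length() equals ceil(log2(p)) for p >= 1, without float error.
    return (p - 1).bit_length() if p > 1 else 0


def doubling_schedule(p: int) -> list[list[tuple[int, int]]]:
    """For each round, the (acceptor, connector) leader pairs whose groups merge."""
    rounds = []
    stride = 1
    while stride < p:
        # Leaders sit at multiples of 2*stride; each absorbs the group led
        # by the process `stride` positions away, if that process exists.
        pairs = [(lead, lead + stride)
                 for lead in range(0, p, 2 * stride)
                 if lead + stride < p]
        rounds.append(pairs)
        stride *= 2
    return rounds


if __name__ == "__main__":
    print(linear_steps(1024), doubling_rounds(1024))
    for i, pairs in enumerate(doubling_schedule(8)):
        print(f"round {i}: {pairs}")
```

In a real implementation each pair would correspond to one connect/accept followed by an intercommunicator merge, with all pairs in a round proceeding concurrently.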
In case this is still of interest, the MPI Sessions proposal was accepted into the MPI Standard and appears in the MPI-4.0 version of the MPI Standard (downloadable from https://www.mpi-forum.org/docs/). The API syntax and operational semantics are broadly the same as described in this thread. |
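For readers who want to try the Sessions flow from Python, the sequence described earlier in the thread can be sketched as below. This is a hedged sketch, not the snippet originally posted here: it assumes mpi4py 4.x built against an MPI library that implements MPI-4.0 Sessions, and the helper names `comm_from_pset` and `release` are invented for illustration. Note that in the published MPI-4.0 standard the two calls referred to in this thread as MPI_Group_from_pset and MPI_Comm_from_group are named MPI_Group_from_session_pset and MPI_Comm_create_from_group. The `MPI` module is passed in as an argument so that nothing MPI-related happens at import time.

```python
def comm_from_pset(MPI, pset_name, tag):
    """Build a communicator from a named process set without MPI_Init.

    MPI       -- the mpi4py.MPI module (assumed 4.x with Sessions support)
    pset_name -- URI-like process set name, e.g. "mpi://WORLD"
    tag       -- unique string tag, reverse dot notation suggested
    """
    session = MPI.Session.Init()               # MPI_Session_init
    group = session.Create_group(pset_name)    # MPI_Group_from_session_pset
    # MPI_Comm_create_from_group; the tag separates concurrent creations.
    comm = MPI.Intracomm.Create_from_group(group, stringtag=tag)
    group.Free()                               # the group is no longer needed
    return session, comm


def release(session, comm):
    """Tear down in reverse order so the process can start a new session later."""
    comm.Free()
    session.Finalize()                         # MPI_Session_finalize
```

To use it, one would set `mpi4py.rc.initialize = False` before `from mpi4py import MPI` (by default mpi4py initializes MPI on import), then call `comm_from_pset(MPI, "mpi://WORLD", "org.example.dask")`. The tag shown is a placeholder. A process set that names the Dask workers would depend on the integration work described above and does not exist today as far as this thread establishes.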
@Wee-Free-Scot! That's very cool! Do you know if this has been implemented in MPICH or OpenMPI? |
There is a reference implementation in Open MPI but it is not yet production code. I expect it will be a matter of (a short amount of) time until all major/popular MPI libraries have implemented this functionality, in order that they can claim compliance with MPI-4.0 |
👍 Then we'll wait some more. Seems like this is the best solution. |
Just dropping into this thread as an end-user who'd be really interested in taking advantage of a feature like this. MPICH supports MPI-4.0 from v4.0, and a subset of MPI-4.0 is implemented in v5.0.x of OpenMPI, but is still in progress. |
Update for 2024 🤯: Sessions is now fully supported in OpenMPI as of 5.0.0, and has had subsequent bugs squashed in later releases it seems. 5.0.3 is currently the latest release. Sorry for otherwise being unhelpful here, but I'm very interested in following the development of this capability since I have a bunch of use-cases that would be awesome to enable with this. |
I would like to explore the possibility of Dask starting MPI. This is sort of the reverse behavior of what the dask-mpi package does today.
To clarify the situation I'm talking about, consider the situation where we are running Dask with some other system like Kubernetes, Yarn, or SLURM, and are doing Dask's normal dynamic computations. We scale up and down, load data from disk, do some preprocessing. Then, we want to run some MPI code on the data we have in memory in the Dask worker processes. We don't currently have an MPI world set up (our workers were not started with mpirun or anything) but would like to create one. Is this possible?
To do this, Dask will have to go through whatever process mpirun/mpiexec goes through to set up an MPI communicator. What is this process?
Ideally it would be able to do this without launching new processes. Ideally the same processes currently running the Dask workers would initialize some MPI code, be told about each other, then run some MPI program, then shut down MPI and continue on with normal Dask work.
We'll need to be careful about what to do if a worker goes away during this process. We'll probably have to restart the MPI job, which will be fine. I think that I can handle that on the scheduling/resiliency side.
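As a rough illustration of the restart idea, here is a minimal pure-Python sketch. Everything in it is hypothetical: `WorkerLost` stands in for whatever signal the scheduler would raise when a participating worker disappears, and `start_job` stands in for the code that sets up MPI on the workers and runs the MPI program. It is not existing Dask API.

```python
class WorkerLost(RuntimeError):
    """Hypothetical signal that a worker taking part in the MPI job went away."""


def run_with_restart(start_job, max_attempts=3):
    """Run start_job(attempt); if a worker is lost, start over from scratch.

    start_job is expected to set up MPI on the current set of workers, run
    the MPI program, shut MPI down again, and return the result.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return start_job(attempt)
        except WorkerLost:
            # A partially built communicator cannot be repaired here, so the
            # whole job is retried; give up after the final attempt.
            if attempt == max_attempts:
                raise
```

The sketch restarts the whole job rather than trying to patch the communicator, which matches the expectation stated above that a restart is acceptable.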
I think that the people who know the answer to these questions will be people who have experience not only in using MPI, but also in deploying it.