Could dask-mpi run the client script too? #2402

Closed
kmpaul opened this issue Dec 6, 2018 · 14 comments

@kmpaul commented Dec 6, 2018

I've been dealing with an issue that...well, I was convinced shouldn't be an issue, so I never said anything about it until dask/dask-blog#5. And after a discussion with @guillaumeeb, I was convinced that maybe I'm not as crazy (or as ill-informed) as I thought I was. So, here's the issue...

I've been trying to figure out a way of launching the Dask Scheduler, Workers, and the Client script in the same MPI environment. Currently, the way dask-mpi works is that the Scheduler and the Workers are started, and you connect your client (in a separate script) to the Scheduler via, for example, the scheduler.json file.
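
For reference, the client side of that workflow is roughly the following (a minimal sketch, assuming the scheduler file is the default scheduler.json written by dask-mpi):

# my_dask_script.py -- run separately from the MPI job
from dask.distributed import Client

# Connect to the Scheduler that dask-mpi started, via its scheduler file
client = Client(scheduler_file='scheduler.json')
print(client)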

I discussed with @guillaumeeb one approach that should work, something like the following:

# [PBS header info requesting N MPI processes]

mpirun -np N dask-mpi [dask-mpi options] &
python my_dask_script.py

However, this launches Scheduler/Worker processes on all N allocated MPI ranks, and the python my_dask_script.py process could then end up running on the same node (or even the same core) as the Scheduler. If you have a compute-intensive client script, this could be problematic.

What I was originally hoping for was a solution that allowed something more like this:

# [PBS header info requesting N MPI processes]

mpirun -np N dask-mpi [dask-mpi options] --script my_dask_script

But after thinking about it for a while, I found that what I really wanted was something that worked like this:

# [PBS header info requesting N MPI processes]

mpirun -np N python my_dask_mpi_script.py

where my_dask_mpi_script.py has something like an import dask_mpi line that does the following:

  1. lets MPI rank 0 pass through,
  2. launches the Scheduler on MPI rank 1 and runs an IOLoop until the rank 0 process is complete, and
  3. launches the Workers on MPI ranks >1, which also run until the rank 0 process is complete.

At this point, I feel like I could write this myself...except that I don't know how to implement the "run the IOLoop until rank 0 process is complete" part.
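
(For concreteness, the generic shape of that missing piece in plain Tornado would be something like the sketch below; how the stop signal gets delivered from the rank 0 process is exactly the part I don't see how to wire up across MPI ranks.)

from tornado.ioloop import IOLoop

loop = IOLoop.current()

def stop_loop():
    # to be called, somehow, once the rank 0 client work is finished
    loop.add_callback(loop.stop)

# ... start the Scheduler or a Worker on this loop ...
loop.start()  # blocks here until loop.stop() is called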

Any thoughts? Are there different solutions? Would you recommend something different?

@kmpaul commented Dec 6, 2018

CCing @andersy005

@mrocklin commented Dec 7, 2018

Yes, something like this sounds like a great idea to me. I agree entirely with your design.

At this point, I feel like I could write this myself...except that I don't know how to implement the "run the IOLoop until rank 0 process is complete" part.

Yeah, that's not entirely trivial. In principle we want to do something like the following:

if rank == 0:
    client = Client('SCHEDULER_ADDRESS')

    # ... user's code runs here ...

    client.sync(client.scheduler.terminate)

Alternatively, @jacobtomlinson proposed another solution in #2346 where the scheduler would terminate automatically after 60s if no clients were connected. This was originally designed to clean up orphaned clusters, but could solve this problem as well.

@guillaumeeb commented Dec 11, 2018

As discussed with @kmpaul, improving dask-mpi along the lines he proposes seems really important.

That being said, I'm not sure which design to follow. I see the point of the second proposed design, but I think we should clarify

the my_dask_mpi_script.py has something like an import dask_mpi line

first.

Should the user script extend a dask-mpi class and implement a method? Should it only call something at the beginning of its main function?

@mrocklin commented Dec 12, 2018

the my_dask_mpi_script.py has something like an import dask_mpi line

So ...

# User script

import dask_mpi
dask_mpi.initialize()

# client code continues
from dask.distributed import Client
client = Client()  # grabs address from dask.config.get('scheduler-address') automatically
# Dask-mpi import file

# MPI prelude, e.g.:
import sys
from mpi4py import MPI

rank = MPI.COMM_WORLD.Get_rank()

def start():
    if rank == 0:
        # start scheduler
        # wait until scheduler is finished
        sys.exit()
    elif rank == 1:
        return  # pass on to the client code coming next
    else:
        # start worker
        # wait until worker is finished
        sys.exit()

Then we execute with something like

mpirun -n 100 python myscript.py

Alternatively

We could add a --script keyword to the current dask-mpi application

# dask-mpi.py

# MPI prelude, e.g.:
import importlib
import dask
from mpi4py import MPI

rank = MPI.COMM_WORLD.Get_rank()

if rank == 0:
    # start scheduler
    # wait for scheduler to finish
    ...
elif script and rank == 1:
    SCHEDULER_ADDRESS = get_scheduler_address(scheduler_file)  # search for scheduler_file in client.py
    with dask.config.set(scheduler_address=SCHEDULER_ADDRESS):
        importlib.import_module(script)
else:
    # start worker
    # wait for worker to finish
    ...
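
For reference, the get_scheduler_address helper above is just shorthand for pulling the address out of the scheduler file; a minimal sketch, assuming the standard JSON layout that the Scheduler writes:

import json

def get_scheduler_address(scheduler_file):
    # the scheduler file is JSON with an "address" entry
    with open(scheduler_file) as f:
        return json.load(f)['address']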

@kmpaul commented Dec 12, 2018

I like the first pattern. I think it will make more sense to "traditional" MPI users. I wonder how you would feel about dropping the initialize() method, though. It could be initialized at import-time, but I don't know if it's a good design. I like the "at import-time" initialization, in part, because if the script is running with mpirun, then all of the code is running on multiple MPI ranks from the start. And, while I know that the import dask_mpi line could be anywhere, it is canonically at the top of the script. Does that make sense?

I might have time (and I definitely have the interest) to do this today. I think the missing piece that @mrocklin provided for me was the client.sync call when shutting down. So, the question I have for you all is: how should this be packaged? Should it be a PR to distributed, or should I make a new repo?

@mrocklin commented Dec 12, 2018

I wouldn't mind pulling dask-mpi out of the distributed codebase if you're willing to make a new repository.

My guess is that initialization at import time might be difficult. For example, from dask_mpi.utils import foo would trigger the full MPI initialization process. If it's easy to get around this, though, then I agree that would be great.

@mrocklin commented Dec 12, 2018

We could also import dask_mpi.initialize

@kmpaul commented Dec 12, 2018

Oh! I like that idea. We could make dask_mpi a package, which would mean we could still get the dask_mpi.initialize()-like behavior from a submodule whose import (import dask_mpi.initialize) performs the initialization. Obviously, that's a namespace collision, but we could easily rename the initialize module...something like init...?
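
Concretely, the layout being discussed might look something like this (hypothetical module name init, just to dodge the collision):

# dask_mpi/init.py -- importing this submodule runs the rank dispatch
from mpi4py import MPI
rank = MPI.COMM_WORLD.Get_rank()
# ... scheduler/worker/client dispatch from the earlier sketch runs here ...

# user script
import dask_mpi.init  # starts the Scheduler/Workers as a side effect of the import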

I'd be happy to make a new repository. If it's going to be maintained in the long term, I'll make it an NCAR repo.

@mrocklin commented Dec 12, 2018

Any objection to eventually moving it to the dask github org? You and other NCAR devs would still have ownership rights over the repository.

@kmpaul commented Dec 12, 2018

No objections at all. I just misunderstood what you meant by "pulling [it] out of the distributed codebase." It makes sense for it to be in the dask GitHub org, and I'd rather it be there.

@kmpaul commented Dec 12, 2018

I think something would need to happen for me to have permissions to create a repo in the dask GitHub org, though.

@mrocklin commented Dec 12, 2018

I've made https://github.com/dask/dask-mpi and given @kmpaul @guillaumeeb and @andersy005 write permissions.

@kmpaul commented Dec 12, 2018

Excellent! Thanks, @mrocklin. I've already created the first issue.

@kmpaul commented Dec 27, 2018

This has now been completed in https://github.com/dask/dask-mpi with dask/dask-mpi#6. The PR implements the "functional initialization" enhancement and the "pulling dask-mpi out of the [distributed] codebase" request.

I will leave it to other dask developers to remove the dask-mpi code from distributed as they see fit.

@kmpaul closed this Dec 27, 2018