
Conversation

lastephey
Contributor

This PR addresses the problem described by @rcthomas in issue #45.

I think there was some confusion in this issue: at NERSC, our current plan is to recommend that our users start Dask via the dask-mpi CLI (which never touches core.py, as far as we can tell).

This PR implements in cli.py the same logic that already exists in core.py: the scheduler is started first, and its address is then broadcast to the workers before they start. This prevents the workers from reading an old scheduler.json file (which may have been left behind after a non-clean exit) and then hanging while they try to connect to a scheduler that does not exist.
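
For readers unfamiliar with that ordering, here is a minimal illustrative sketch of the start-then-broadcast pattern described above, written directly against mpi4py and distributed. It is not the actual core.py or cli.py code; the structure and names are simplified.

import asyncio

from mpi4py import MPI
from distributed import Scheduler, Worker

comm = MPI.COMM_WORLD


async def run():
    if comm.Get_rank() == 0:
        # Rank 0: start the Scheduler, then broadcast its live address.
        async with Scheduler() as scheduler:
            comm.bcast(scheduler.address, root=0)
            await scheduler.finished()
    else:
        # Other ranks: block on the broadcast, so a Worker never starts
        # before the Scheduler exists (and never reads a stale file).
        address = comm.bcast(None, root=0)
        async with Worker(address) as worker:
            await worker.finished()


# Run with, e.g.: mpirun -np 4 python this_sketch.py
asyncio.run(run())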

@lastephey
Contributor Author

Clearly I should have tested this locally first. :) I'll get a test setup going to fix these errors.

@mrocklin
Member

Hi @lastephey! Thanks for doing this. In principle this looks good to me, and it seems to resolve the problem.

I suspect that @kmpaul has thoughts though, and he may also be able to shed some light on the CI failures. I think that he only works M-Th though, so we may not hear back from him for the next day or two.

@lastephey
Contributor Author

Hi @mrocklin, thanks for the info! I'll keep thinking about this, and hopefully @kmpaul has some insight. One issue is that I don't have a great setup for testing: we can't use mpirun on Cori. Sorry for the noise from using your web CI for troubleshooting.

@mrocklin
Member

mrocklin commented Dec 16, 2019 via email

@lastephey
Contributor Author

Thanks for the advice, I'll do that.

@kmpaul
Collaborator

kmpaul commented Dec 16, 2019

@lastephey Thanks for looking at this! As @mrocklin pointed out, I have a short work schedule, so I couldn't respond until today.

@mrocklin is correct. The best way to test is via a conda install of mpi4py on your laptop. Install mpich and/or openmpi for testing.

@kmpaul
Collaborator

kmpaul commented Dec 16, 2019

So, these kinds of errors have come up quite frequently when running with MPI. The workaround has always been to disable the --nanny option (that is, to use the --no-nanny option). If you do this, the tests pass.

I'm going to look a little further to see if I can figure out what is happening. I've always suspected that MPI does not like it when you fork/spawn new processes from a running MPI process. If that is the case, then I think that maybe we need to always use the --no-nanny option (or make it the default).

@lastephey
Contributor Author

Thanks for looking into this @kmpaul. Oddly, all the tests pass for me on my laptop using conda-installed mpi4py, with both --nanny and --no-nanny. I am packing up my setup in a container in case it's useful.

@kmpaul
Collaborator

kmpaul commented Dec 16, 2019

Interesting! I'm able to reproduce the errors both on my laptop and in a docker container. Have you checked out your development branch?

@lastephey
Contributor Author

Yes. I checked out the fix_mpi_sched branch from my forked dask-mpi repo.

What kind of MPI are you using, and how did you install it?

(screenshot of the local environment omitted)

@kmpaul
Collaborator

kmpaul commented Dec 16, 2019

Everything is installed from the .circleci/install.sh script. You should just be able to do the following:

$ docker run -it continuumio/miniconda:latest

And then from the container run:

$ git clone https://github.com/lastephey/dask-mpi
$ cd dask-mpi
$ git checkout fix_mpi_sched
$ export PYTHON=3.6
$ export MPI=mpich
$ export ENV_NAME=dask-mpi-dev
$ .circleci/install.sh
$ conda activate dask-mpi-dev
$ pytest --verbose dask_mpi/tests/

@lastephey
Contributor Author

Thanks @kmpaul. I followed your directions exactly, except that I additionally needed to pip install pytest inside the container and conda activate dask-mpi-dev. I can reproduce the hang that happens during the CI testing and that you also see.

But... this change in cli.py seems to work in practice; it just fails the unit tests. Is there some way to make the tests more flexible/robust?

@kmpaul
Collaborator

kmpaul commented Dec 16, 2019

Looking at the test_core.py and test_cli.py tests, you will notice that the test_core.py tests use subprocess.Popen to launch the MPI job, while the test_cli.py tests use distributed.utils_test.popen to launch the MPI job (and the distributed.utils_test.loop fixture). I'm not sure if one of these could be the problem.

@kmpaul
Collaborator

kmpaul commented Dec 17, 2019

Well, I've tried playing with these fixtures a bit but to no avail. Still hanging. I'm going to look more at this tomorrow.

@kmpaul
Collaborator

kmpaul commented Dec 17, 2019

@lastephey Just so you know, I've had problems with the testing framework in the past. It has been challenging to test without problems. I believe you that the implementation in cli.py works. Now the challenge is to get tests that accurately demonstrate it.

@lastephey
Contributor Author

@kmpaul Thanks for your help with this. I'll try poking at it as well.

@kmpaul
Collaborator

kmpaul commented Dec 18, 2019

@lastephey I believe I know what is happening.

If you look at the dask_mpi/tests/test_cli.py tests, you will notice that they assume the use of a scheduler_file parameter when initializing the Client.

@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
def test_basic(loop, nanny, mpirun):
    with tmpfile(extension="json") as fn:
        cmd = mpirun + ["-np", "4", "dask-mpi", "--scheduler-file", fn, nanny]
        with popen(cmd):
            with Client(scheduler_file=fn) as c:
                start = time()
                while len(c.scheduler_info()["workers"]) != 3:
                    assert time() < start + 10
                    sleep(0.2)
                assert c.submit(lambda x: x + 1, 10, workers=1).result() == 11

What we want to do is change line 31 of dask_mpi/tests/test_cli.py from initializing the client via Client(scheduler_file=fn) to something like Client(address=URL)... but the problem is that you don't actually know the URL from within the test, and I can't think of a way of guaranteeing what the URL should be in the test itself.

@mrocklin Perhaps you have some ideas on this?

Collaborator

@kmpaul left a comment

I've suggested some changes and also requested some items for discussion. I think there is a bigger problem here that needs solving.

dask_mpi/cli.py Outdated
Comment on lines 94 to 96
scheduler_address = comm.bcast(None, root=0)
dask.config.set(scheduler_address=scheduler_address)
comm.Barrier()
Collaborator

@kmpaul Dec 18, 2019

Again, you probably want to preface these lines with something like:

if not scheduler_file:

Contributor Author

Added.

Contributor Author

I also added this if not scheduler_file guard, and it causes the program to hang. I believe we want to send the scheduler info no matter what, so I'm going to leave this out.

Collaborator

See my comment above.

cmd = mpirun + ["-np", "4", "dask-mpi", "--scheduler-file", fn, nanny, "&"]

with popen(cmd):
    with Client(scheduler_file=fn) as c:
Collaborator

@kmpaul Dec 18, 2019

Line 31, above, is the culprit line causing the tests to hang. It is in this line that the distributed.Client is being initialized and told to find the Scheduler by looking up the address parameter in the scheduler_file. The scheduler_file (called fn in this test) is not being used by dask-mpi, so no information is being written to the scheduler_file from the Scheduler. Hence, the Client keeps waiting and doesn't know how to find and connect to the Scheduler.

It is unclear to me how this should change if the Scheduler uses an MPI broadcast to tell the Workers what address to connect to. The Client is left out of the broadcast, and so has no means of knowing where the Scheduler is located. (This is different in the core.py code, where the Client's process actually participates in the broadcast, and therefore knows where the Scheduler is located.)

The correct solution to this problem probably requires some discussion.

Collaborator

I might also note that when you run dask-mpi by hand, as is commonly done, you can get the address of the Scheduler from the log messages and connect your Client to that. In this case, though, when you are automating the launch of dask-mpi in the test, the only ways I know of getting the Scheduler address are (1) through a scheduler_file or (2) scrubbing STDOUT (or the log file) from the Scheduler process to find the address. Option (1) defeats the purpose of using the MPI broadcast, and Option (2) seems messy to me.

Contributor Author

Clearly this decision is up to the main developers, but IMHO the unit tests should reflect how people commonly use dask-mpi rather than corner cases that stem from the CI setup.

That said, I very much appreciate your help in carefully looking over this PR @kmpaul and @mrocklin !

Collaborator

You are more than welcome, @lastephey! And I very much appreciate your PR. Help on OSS is always appreciated. 😄

I wouldn't call this a corner case. The difficulty in getting this test to pass with the previous code changes was a sign that something was wrong. And the issue was that by using an MPI broadcast to send the scheduler info to the workers, there was no way to also send the scheduler info to the client (i.e., the client created in the test). Not without using a scheduler_file, and the use of a scheduler_file seems redundant if you are using an MPI broadcast.

@codecov

codecov bot commented Dec 19, 2019

Codecov Report

Merging #46 into master will increase coverage by 0.15%.
The diff coverage is 100%.


@@            Coverage Diff             @@
##           master      #46      +/-   ##
==========================================
+ Coverage      96%   96.15%   +0.15%     
==========================================
  Files           3        3              
  Lines          75       78       +3     
==========================================
+ Hits           72       75       +3     
  Misses          3        3
Impacted Files Coverage Δ
dask_mpi/cli.py 97.14% <100%> (+0.26%) ⬆️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

Collaborator

@kmpaul left a comment

I am afraid I am not understanding why you want to implement these changes now. I think I need to better understand why you need to both MPI-broadcast the scheduler info to the workers and save it in a scheduler_file. Since both of these methods allow the workers to find the scheduler, only one is needed. Correct?

There is also another issue (which I've alluded to in previous comments on this PR): how do you send the scheduler info to the client, not just the workers? Both the workers and the client can get the scheduler info from the scheduler_file, but only the workers will get it from an MPI broadcast. (This is why the existing tests in test_cli.py hang.)

If you want to use an MPI broadcast to communicate the scheduler info to both the workers and the client, then you have to have the client process started via mpirun in the same way that the scheduler and worker processes are started. This functionality already exists in the dask_mpi.core.initialize method.

If communicating the scheduler info to the client isn't something you need or care about, then I can see modifying the dask-mpi CLI to use either a scheduler_file or an MPI broadcast to communicate the scheduler info to the workers. If that is indeed what you want, then you need a way of disabling the scheduler_file approach in the code (currently, it is always enabled). You would also need to add a test or two for the new MPI-broadcast-only approach... and as I've mentioned before, I'm not sure how to write those tests, because you would need a client (in the test) to be able to find the scheduler.

Are my concerns clear? Am I misunderstanding something? I really appreciate the PR, and I don't mean to be difficult. I'm just afraid I'm misunderstanding your goal.
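
For reference, a small sketch of the dask_mpi.initialize() pattern kmpaul refers to, in which the client script itself runs under mpirun so that rank 1 continues as the client. This is a simplified example of documented usage, not code from this PR.

# Launched with something like: mpirun -np 4 python client_script.py
from dask_mpi import initialize
from dask.distributed import Client

# Rank 0 becomes the Scheduler and ranks 2+ become Workers inside this call;
# rank 1 returns from it and continues running the script as the client.
initialize()

client = Client()  # picks up the scheduler address shared during initialize()
print(client.submit(lambda x: x + 1, 10).result())  # prints 11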

dask_mpi/cli.py Outdated
Comment on lines 94 to 96
scheduler_address = comm.bcast(None, root=0)
dask.config.set(scheduler_address=scheduler_address)
comm.Barrier()
Collaborator

See my comment above.

@lastephey
Contributor Author

Hi @kmpaul,

I think we are approaching this problem from two different viewpoints. You are viewing the client as part of the cluster (hence assigning it to rank 1 in core.py), whereas we would prefer to run the client separately, for ease of viewing the dashboard on our shared system.

This is why we wanted to use MPI to sync the scheduler info between the scheduler and the workers, but continue to use the scheduler_file for the client (which we instantiate separately from the cluster).

To give you an idea of how we are using dask-mpi at NERSC at the moment, you can check out some of @rcthomas notebooks: https://gitlab.com/NERSC/nersc-notebooks/tree/master/dask

Does this help make our motivation clearer? Of course, if there is a better way to make our setup work, we are open to suggestions.
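
To make the described setup concrete, here is a hedged sketch of the client side: the cluster is launched separately via the dask-mpi CLI with a scheduler_file, and a notebook or script outside the MPI job connects through that file. The launch command and file name are illustrative.

from dask.distributed import Client

# The MPI job was started elsewhere, e.g.:
#   srun -n 8 dask-mpi --scheduler-file scheduler.json
client = Client(scheduler_file="scheduler.json")  # reads the address the Scheduler wrote
print(client.scheduler_info()["services"])        # e.g. locate the dashboard port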

@kmpaul
Collaborator

kmpaul commented Dec 19, 2019

@lastephey Ok. I'm very sorry for missing the reason for wanting both a scheduler_file and MPI broadcasting. I'm finally coming around.

However, I still don't think you actually need the MPI broadcast. I think you just need some appropriately placed MPI barriers to make sure that the Scheduler has started (and written the new scheduler_file) before the Workers start (and attempt to read the scheduler_file). Doesn't that seem like it should fix your problem?
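
A minimal sketch of the barrier-only ordering suggested here, assuming an mpi4py communicator is available in cli.py; the variable names are illustrative and the placeholder comments stand in for the existing scheduler/worker startup code.

from mpi4py import MPI

comm = MPI.COMM_WORLD

if comm.Get_rank() == 0:
    # ... start the Scheduler, which writes --scheduler-file ...
    comm.Barrier()  # release the Workers only once the Scheduler is up
else:
    comm.Barrier()  # wait here until the Scheduler rank reaches its barrier
    # ... start a Worker, which can now read a fresh scheduler_file ...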

@kmpaul
Collaborator

kmpaul commented Dec 19, 2019

If you could accomplish this with just MPI Barriers, then it might be nice to add a test demonstrating the problems you experience with lingering stale scheduler_files.

@lastephey
Contributor Author

@kmpaul Ok, good. I'm glad we are on the same page. We should have been clearer about our setup; sorry for the confusion.

Yes, I think appropriately placed barriers would do just as well. Let me try to get that working, and I'll add a stale-scheduler-file unit test, too.

@kmpaul
Collaborator

kmpaul commented Dec 19, 2019

Cool! Again, I'm really sorry for all the confusion. I feel like I just asked you to do a bunch of work that may not be necessary at all. Sorry. I would hate to have done that.

My first thoughts might be to just remove the dask.config and comm.bcast(...) lines entirely, leaving just the comm.Barrier() lines. That might do the trick. And if that's still not enough, you might add an asyncio.sleep(0.1) or something just before the Scheduler rank's barrier call to allow for enough time to write the new scheduler_file.

Tricky problem, but I think we might be narrowing on the solution. 😄

@lastephey
Contributor Author

The barrier solution seems to work ok even without the asyncio sleep, so I left it out.

I'll work on the unit test now.

@lastephey
Contributor Author

I have no idea why the tests failed before and passed now. I didn't make any meaningful changes.

@kmpaul
Collaborator

kmpaul commented Dec 19, 2019

Yeah. That's very weird, but I've seen instabilities in MPI CI testing before. I've never fully understood why this happens.

Collaborator

@kmpaul left a comment

I like this solution. Tests are passing, so I'm going to merge it.

@kmpaul merged commit 2d516e0 into dask:master Dec 19, 2019
@lastephey
Contributor Author

Ok, thanks @kmpaul! Do you want the new unit test as a new PR?

@kmpaul
Collaborator

kmpaul commented Dec 19, 2019

Ach! Sorry. I was trigger happy.

Yes. Another PR would be great.

@lastephey
Contributor Author

Hi @kmpaul, just wondering if you're planning a release any time soon? I see the last release was Oct 2019 and this patch landed in Jan 2020. It would be great to have a release instead of installing from a commit.

@kmpaul
Collaborator

kmpaul commented Jul 30, 2020

@lastephey: This is now done. Version 2.21.0 of dask-mpi is released on PyPI and conda-forge.

@kmpaul
Collaborator

kmpaul commented Jul 30, 2020

...though the conda-forge release may take a bit to propagate out. Should be available for download soon.

@lastephey
Contributor Author

Thank you very much for the fast turnaround!

@kmpaul
Collaborator

kmpaul commented Jul 30, 2020

You're welcome. 😄 I wish I could be this fast all the time.
