
Add Join op (with only support for AllReduce and PyTorch for now) #1058

Merged · 6 commits · Nov 1, 2019

Conversation

@kit1980 (Contributor) commented May 7, 2019

This is a very early work-in-progress PR to add hvd.join() as described in #832.

This is mostly boilerplate code; it compiles, and I can call hvd.join() from the PyTorch test (the Join op goes into the queue but does nothing).

The purpose of this PR, for me, is to understand whether I'm going in the right direction.
Some other questions I have (in no particular order):

  1. Should the Join op have a name?
  2. The Join op doesn't have an input and should have a single int output (the last joined rank), right?
  3. Does the Join op need to be implemented separately for CUDA, NCCL, etc., in cuda_operations.cc, nccl_operations.cc, and so on?
  4. In the operation manager, should there be a vector of available Join ops or just one Join op?
  5. The logic for the Join op should track already-joined ranks; should this be similar to the logic in the AllReduce and AllGather ops? Any code pointers/suggestions?
  6. The AllReduce and AllGather ops need to know the already-joined ranks. Is that going to be stored in global state? Any code pointers/suggestions?
  7. Any suggestions on how to make adding the Join op more granular (several smaller PRs)?

@kit1980 (Contributor, Author) commented May 7, 2019

  8. For PyTorch, will both the mpi_lib_v2 and mpi_lib implementations eventually be needed, or only v2?

@alsrgv (Member) left a comment

Thanks for starting this! I left some stylistic feedback. To answer your questions:

  1. I don't think Join op needs a name, but we should make sure only one Join op is enqueued on any given rank. Duplicate name checking may be a way around it.

  2. Yes, it should return last joined rank.

  3. It feels like we need to expand the API of operations to allow for no-ops. All kinds of operations would then have to implement this additional API.

  4. If I understand correctly, you're thinking about bit vectors in accelerated orchestration logic? I think two bits would be sufficient to indicate that some/all of ranks did Join().

5-6. Rank-tracking logic is all done in the orchestration layer. We should implement this for both the bitvector and the standard approach. The logic for both is here; I recommend studying it carefully, as it got a bit complicated over time.

  7. I'd suggest sticking to a single PR for now until it becomes unmanageable (usually it doesn't).

  8. We'd need to eventually support PyTorch, TensorFlow, and Apache MXNet. Legacy PyTorch could be made unsupported if it's too much work.

@kit1980 (Contributor, Author) commented May 31, 2019

Rebased on master, addressed the style comments, and fixed a "Segmentation fault" caused by trying to cache the Join op.

@kit1980 (Contributor, Author) commented Jun 13, 2019

Hi @alsrgv,

I've coded some basic logic for the JoinOp (the code is not in this PR yet): the global state on the coordinator stores the number of ranks that have already joined, and each rank knows whether it has joined itself. The IncrementTensorCount function then takes the number of already-joined ranks into account and sets ready_to_reduce when the number of ranks reducing the tensor equals mpi_size - joined_count. After receiving the reduce response, each rank checks its own joined bit to decide whether it actually needs to perform the reduce operation.
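For illustration, a rough Python sketch of that counting rule (the names increment_tensor_count and message_table mirror the prose above; this is not the actual Horovod C++ code):

# Illustrative sketch only, not the Horovod implementation: the coordinator
# records which ranks requested a reduction of a tensor, and the tensor is
# ready once every rank that has NOT joined has requested it.
def increment_tensor_count(message_table, tensor_name, rank, mpi_size, joined_count):
    requesting_ranks = message_table.setdefault(tensor_name, set())
    requesting_ranks.add(rank)
    return len(requesting_ranks) == mpi_size - joined_count

# Example: 4 ranks, one of which has joined, so 3 requests make it ready.
table = {}
ready = [increment_tensor_count(table, "grad.fc.weight", r, 4, 1) for r in range(3)]
print(ready)  # [False, False, True]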

The problem is that the ranks that actually perform the reduce (those not yet joined) use MPI or NCCL reduce, which expects all ranks to participate. One way to solve this is to make every rank know the list of already-joined ranks and create a new MPI communicator that uses only a subset of all nodes; this doesn't seem really feasible... Another idea is that all joined ranks still participate in the MPI reduce (the Join op's Execute calls MPI reduce), but with zero tensors; in this case, though, the joined nodes need to somehow know the shape of the reduced tensor...

Any suggestions?

@kit1980 (Contributor, Author) commented Jun 18, 2019

An update after my last comment.
Now I'm trying to implement this design:

The coordinator tracks how many ranks have sent a Join request, and each rank knows whether it has Joined.
Then, when sending a Reduce response, if at least one rank has Joined, the coordinator also sends a Join response with information about the reduced tensors (type and size), so there will be many Join responses after one Join request. The Join response is processed by the joined ranks only, and those ranks use the information from the response to fill their tensor_table (the actual content of the tensors will be 0).
Then all the ranks execute the Reduce normally.

@alsrgv, could you review if it makes sense?

@kit1980 (Contributor, Author) commented Jun 18, 2019

Another problem I have is how to keep the Python thread alive after one rank Joins earlier than the others. The wait must happen in the Python thread, not in the background thread, and I'm not sure how to communicate back that the wait is over (i.e., that all ranks have sent Join requests).
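One possible shape for this, sketched in Python with a hypothetical _enqueue_join() standing in for the native call that submits the Join op (purely illustrative; not the design that ended up in the PR):

import threading

class JoinHandle:
    # Bridges the background (communication) thread and the Python thread.
    def __init__(self):
        self._done = threading.Event()
        self.last_joined_rank = None

    def complete(self, last_joined_rank):
        # Called from the background thread once all ranks have joined.
        self.last_joined_rank = last_joined_rank
        self._done.set()

    def wait(self):
        # Called from the Python thread; blocks until complete() fires.
        self._done.wait()
        return self.last_joined_rank

def _enqueue_join(on_complete):
    # Hypothetical stand-in for the native enqueue; a timer simulates the
    # background thread finishing once every rank has joined.
    threading.Timer(0.1, on_complete, args=(0,)).start()

def join():
    handle = JoinHandle()
    _enqueue_join(handle.complete)
    return handle.wait()

print(join())  # blocks briefly, then prints the (simulated) last joined rank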

@kit1980 (Contributor, Author) commented Jun 25, 2019

So I've implemented this design (similar to my previous comment, but with some modifications):

The coordinator tracks how many ranks have already sent Join requests, and each rank knows whether it has Joined.
Calling join() from Python blocks the Python thread for that rank until the Join response, which is sent only once all ranks have Joined.
When sending a Reduce response, if at least one rank has Joined, the coordinator includes tensor type and size information in the Reduce response. The Joined ranks use this information to construct temporary zero-filled data for the MPI AllReduce, while non-Joined ranks execute a normal AllReduce.

This seems to work OK with the one small test I have. If you think this approach makes sense, I'll fix all the TODOs and test/fix more scenarios (fused responses, all data types, cache, ...).
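A toy sketch of the zero-filled participation idea using mpi4py and NumPy (the tensor shape and dtype that the coordinator would forward in its response are hard-coded here; this illustrates the design above, not the Horovod code):

# Every rank participates in the allreduce; ranks that already joined
# contribute a zero tensor of the same shape/dtype, so the sum is unchanged.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

tensor_shape, tensor_dtype = (4,), np.float32   # would come from the response
joined = rank == 0                              # pretend rank 0 ran out of data

if joined:
    send = np.zeros(tensor_shape, dtype=tensor_dtype)
else:
    send = np.full(tensor_shape, float(rank), dtype=tensor_dtype)

recv = np.empty_like(send)
comm.Allreduce(send, recv, op=MPI.SUM)  # joined ranks add nothing to the sum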

@alsrgv (Member) commented Jul 2, 2019

@kit1980, sorry for going dark on you. Your proposed approach sounds good!

@alsrgv (Member) commented Jul 2, 2019

Note that it should work with AllGather, too (send empty slices from joined nodes).
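For illustration, the empty-slice idea with mpi4py's variable-length allgather (a toy sketch that exchanges slice lengths first; not the Horovod implementation):

# Joined ranks contribute a zero-length slice, so the concatenated result
# contains data only from the ranks that have not joined yet.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

joined = rank == 0                          # pretend rank 0 already joined
local = np.empty(0, dtype=np.float64) if joined else np.full(3, rank, dtype=np.float64)

counts = comm.allgather(local.size)         # exchange slice lengths
result = np.empty(sum(counts), dtype=np.float64)
comm.Allgatherv(local, [result, counts])    # joined ranks add nothing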

@kit1980 kit1980 force-pushed the master branch 2 times, most recently from b131034 to 7374086, on July 24, 2019 00:17
@kit1980 kit1980 force-pushed the master branch 2 times, most recently from 084f1c1 to 530e8a9, on July 29, 2019 18:21
@kit1980 (Contributor, Author) commented Jul 29, 2019

Join with AllReduce now passes tests for all data types for MPI and NCCL.
Please review.

@DEKHTIARJonathan (Collaborator) commented Aug 5, 2019

I have a simple question: how is this different from the following?

from mpi4py import MPI
MPI.COMM_WORLD.Barrier()  # Waiting for all MPI processes to sync

The objective looks quite identical to me.

@kit1980 (Contributor, Author) commented Aug 5, 2019

@DEKHTIARJonathan, this is for the case when data is unevenly distributed between workers, so different workers perform different numbers of steps. An MPI barrier can't help here: the workers that still have data will wait forever in optimizer.step() for the workers that have already processed all of their data.
Please see this issue for more details and discussion: #832
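To illustrate the use case, a sketch of the uneven-workload loop that hvd.join() is meant to unblock (the per-rank batch counts and the model are made up; this shows the intended usage, not this PR's internals):

# Each rank gets a different number of batches; the rank that runs out of
# data calls hvd.join(), which keeps servicing the other ranks' allreduces
# (with zero tensors) until every rank has joined.
import torch
import horovod.torch as hvd

hvd.init()
num_batches = 10 + hvd.rank()   # made-up uneven workload per rank

model = torch.nn.Linear(4, 1)
optimizer = hvd.DistributedOptimizer(
    torch.optim.SGD(model.parameters(), lr=0.01),
    named_parameters=model.named_parameters())

for _ in range(num_batches):
    optimizer.zero_grad()
    loss = model(torch.randn(8, 4)).sum()
    loss.backward()
    optimizer.step()            # allreduces gradients across ranks

hvd.join()                      # block until every rank has joined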

@DEKHTIARJonathan (Collaborator) commented:

Oh, I see. Thanks for the pointer. I've never had this use case.

@kit1980 (Contributor, Author) commented Aug 6, 2019

@alsrgv Could you take a look at this?

@alsrgv (Member) left a comment

Left a few comments to help me understand the PR better. Could you rebase on the latest master to resolve conflicts?

@kit1980 kit1980 force-pushed the master branch 3 times, most recently from 0b4c542 to d762c2a, on August 23, 2019 06:37
@kit1980 (Contributor, Author) commented Aug 23, 2019

Rebased on master (which was painful because of recent large code moves with simultaneous logic and formatting changes).

@kit1980 (Contributor, Author) commented Sep 12, 2019

Rebased on master again. Please review.

@tgaddair (Collaborator) left a comment

Looks great, just a few minor changes left.

@@ -211,6 +214,8 @@ class OpContext {
std::shared_ptr<PersistentBuffer>* tensor) = 0;
virtual Status AllocateOutput(TensorShape shape,
std::shared_ptr<Tensor>* tensor) = 0;
virtual Status AllocateZeros(int64_t num_elements, DataType dtype,
Collaborator comment:

Can you add stub implementations of this method to the other OpContext children, including TFOpContext in tensorflow/mpi_ops.cc and MXOpContext in mxnet/adapter.{h,cc}? These implementations should raise an error similar to the one for PyTorch <1.0.

@kit1980 (Contributor, Author):

Done for TF and MXNet.

@tgaddair (Collaborator) commented:

@kit1980 looks like TensorFlow just switched nightly over to 2.0. I'll make a quick update to our test script and then your tests should pass.

@kit1980 (Contributor, Author) commented Oct 31, 2019

@kit1980 looks like TensorFlow just switched nightly over to 2.0. I'll make a quick update to our test script and then your tests should pass.

Thanks, I was trying to understand what's going on - the previous nightly build passed.

@kit1980 (Contributor, Author) commented Nov 1, 2019

All the tests passed.

Sergii Dymchenko added 6 commits October 31, 2019 17:50
Signed-off-by: Sergii Dymchenko <sedymche@microsoft.com> (all 6 commits)
@tgaddair (Collaborator) left a comment

LGTM! Nice job!

@tgaddair tgaddair merged commit ef5804b into horovod:master Nov 1, 2019
@kit1980 (Contributor, Author) commented Nov 9, 2019

Hi @kit1980, I have a question: does Horovod's cache support PyTorch's join function? Once one rank joins, the tensors that hit the cache on the other ranks get put back into tensor_queue_, so Horovod ends up in an infinite loop.

Can you provide a test that exhibits this behavior? I don't understand why "the tensor of other rank hits will be put back into tensor_queue_" would happen. I have test_horovod_join_allreduce for PyTorch that passes OK.

Hi @kit1980, I don't have a PyTorch test here, but if Horovod's cache supports PyTorch's join function, the following situation will occur.
For the ranks that have not joined: training reaches step 1000; tensor1 and tensor2 were cached in a previous step, so they hit the cache.
For the rank that has joined: it has no tensor1 or tensor2.
Then, after the cache's cheap MPI coordination step, because one rank has joined, not all ranks are ready for tensor1 and tensor2. In that case the ranks that have not joined put tensor1 and tensor2 back into tensor_queue_, and the program loops forever.
Of course, this assumes that PyTorch takes the cache code path, so my question is:
1. Does PyTorch execute the cache logic?

Hi @huoliquankai7,
I think I see what you mean now in my tests.
I'm working on fixing this.

@tgaddair (Collaborator) commented:

Hey @huoliquankai7, any update on the TensorFlow PR you mentioned? Now that PyTorch has landed, would be great to get it for TF as well!

@tgaddair (Collaborator) commented:

Hey @kit1980, what is your plan for adding Allgather, Broadcast, TensorFlow, and MXNet support for this operation? No worries if there are no immediate plans, we can add some of this work to the backlog, but just wanted to make sure we don't duplicate our efforts.

@kit1980 (Contributor, Author) commented Nov 12, 2019

Hey @kit1980, what is your plan for adding Allgather, Broadcast, TensorFlow, and MXNet support for this operation? No worries if there are no immediate plans, we can add some of this work to the backlog, but just wanted to make sure we don't duplicate our efforts.

Hi,
I've found a couple of issues with Join and fusion/caching while testing with MNIST; they should be fixed very soon.
After that I'll add support for Allgather and Broadcast, which should be easy.
For TensorFlow and MXNet I'd like someone's help, because I don't really use those.

@Richie-yan (Contributor) commented:

Hey @huoliquankai7, any update on the TensorFlow PR you mentioned? Now that PyTorch has landed, would be great to get it for TF as well!

Hi @tgaddair, since @kit1980's Join and the cache have a problem, I'm waiting for him to solve it; then I'll sort out my changes, and the TensorFlow PR should be put forward soon.

@tgaddair (Collaborator) commented:

Hey @kit1980, any update on the cache issue?

jeffdaily pushed a commit to ROCm/horovod that referenced this pull request Nov 27, 2019
…rovod#1058)

Signed-off-by: Sergii Dymchenko <sedymche@microsoft.com>
@kit1980 (Contributor, Author) commented Dec 4, 2019

Hey @kit1980, any update on the cache issue?

Hi @tgaddair
I added one more special bit to the cache bit vector to decide that communication is needed if one of the ranks joined during this tick.
It mostly works; I still need to fix a couple of issues I found after adding more extensive tests.
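Roughly, the idea can be pictured like this (a toy mpi4py sketch of an extra status bit combined across ranks with a bitwise OR; Horovod's actual cache coordination protocol differs in detail):

# If any rank set the "joined this tick" bit, every rank sees it after the
# bitwise-OR allreduce and falls back to full response coordination instead
# of the cached fast path.
import numpy as np
from mpi4py import MPI

JOINED_THIS_TICK = np.int64(1 << 0)       # illustrative bit position

comm = MPI.COMM_WORLD
joined_this_tick = comm.Get_rank() == 0   # pretend rank 0 just joined

local_flags = np.array([JOINED_THIS_TICK if joined_this_tick else 0], dtype=np.int64)
global_flags = np.zeros_like(local_flags)
comm.Allreduce(local_flags, global_flags, op=MPI.BOR)

if global_flags[0] & JOINED_THIS_TICK:
    print("at least one rank joined this tick: skip the cache fast path")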

@tgaddair (Collaborator) commented Dec 4, 2019

Thanks for the update, @kit1980. When the PR is ready, we should also get @romerojosh to take a look, as he's the creator of the bit cache.

DelphianCalamity pushed a commit to DelphianCalamity/horovod that referenced this pull request Apr 18, 2020
…rovod#1058)

Signed-off-by: Sergii Dymchenko <sedymche@microsoft.com>