add all_gather and fix bug of multi rank doctest #6189

Merged (8 commits) on Sep 9, 2021
python/oneflow/comm/__init__.py (3 changes: 2 additions & 1 deletion)
@@ -13,5 +13,6 @@
 See the License for the specific language governing permissions and
 limitations under the License.
 """
-from oneflow.comm.primitive import all_reduce
+from oneflow.comm.comm_ops import all_reduce
+from oneflow.comm.comm_ops import all_gather
 from oneflow._C import send, recv
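With this change, `oneflow.comm` exposes `all_gather` next to the existing collectives, and the implementation module is imported under its new `comm_ops` name. A quick single-process smoke check (not part of the PR, just verifying the re-exported symbols resolve):

```python
import oneflow as flow

# all_gather is the entry point added by this PR; the rest already existed.
for fn in (flow.comm.all_gather, flow.comm.all_reduce, flow.comm.send, flow.comm.recv):
    assert callable(fn)
```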
python/oneflow/comm/comm_ops.py
@@ -43,9 +43,9 @@ def all_reduce(tensor):
         tensor([[2, 3],
                 [4, 5]], device='cuda:1', dtype=oneflow.int64)
         >>> out = flow.comm.all_reduce(input)
-        >>> out
-        tensor([[3, 5],
-                [7, 9]], device='cuda:0', dtype=oneflow.int64)
+        >>> out.numpy()
+        array([[3, 5],
+               [7, 9]])
     """
     assert isinstance(tensor, flow._oneflow_internal.Tensor)
     assert tensor.device.index == flow.env.get_local_rank()
@@ -57,3 +57,49 @@ def all_reduce(tensor):
     ).to_consistent(placement=placement, sbp=flow.sbp.broadcast)
 
     return tensor.to_local()
+
+
+def all_gather(tensor_list, tensor):
+    """
+    Gathers tensors from the whole group in a list.
+
+    Args:
+        tensor_list (list[Tensor]): Output list. It should contain
+            correctly-sized tensors to be used for output of the collective.
+        tensor (Tensor): Tensor to be broadcast from the current process.
+
+    For example:
+
+    .. code-block:: python
+
+        >>> # We have 1 process group, 2 ranks.
+        >>> import oneflow as flow
+
+        >>> input = flow.tensor([[1, 2], [3, 4]], device="cuda") + flow.env.get_local_rank()
+        >>> input # doctest: +ONLY_CHECK_RANK_0
+        tensor([[1, 2],
+                [3, 4]], device='cuda:0', dtype=oneflow.int64)
+        >>> input # doctest: +ONLY_CHECK_RANK_1
+        tensor([[2, 3],
+                [4, 5]], device='cuda:1', dtype=oneflow.int64)
+        >>> tensor_list = [flow.zeros(2, 2, dtype=flow.int64) for _ in range(2)]
+        >>> flow.comm.all_gather(tensor_list, input)
+        >>> tensor_list # doctest: +ONLY_CHECK_RANK_0
+        [tensor([[1, 2],
+                [3, 4]], device='cuda:0', dtype=oneflow.int64), tensor([[2, 3],
+                [4, 5]], device='cuda:0', dtype=oneflow.int64)]
+        >>> tensor_list # doctest: +ONLY_CHECK_RANK_1
+        [tensor([[1, 2],
+                [3, 4]], device='cuda:1', dtype=oneflow.int64), tensor([[2, 3],
+                [4, 5]], device='cuda:1', dtype=oneflow.int64)]
+    """
+    assert isinstance(tensor, flow._oneflow_internal.Tensor)
+    assert isinstance(tensor_list, list)
+    assert tensor.device.index == flow.env.get_local_rank()
+    assert tensor.is_local
+    tensor = tensor.expand([1] + list(tensor.shape))
+    device_type = tensor.device.type
+    tensor = tensor.to_consistent(
+        placement=flow.env.all_device_placement(device_type), sbp=flow.sbp.split(0)
+    )
+    for i in range(tensor.shape[0]):
+        tensor_list[i] = tensor[i].to_local()
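Reviewer note: `all_gather` needs no dedicated kernel here; it reuses the consistent-tensor machinery. Below is an annotated restatement of the function body above (same calls as in the diff; the comments are editorial):

```python
import oneflow as flow

def all_gather_annotated(tensor_list, tensor):
    # Prepend an axis of size 1: each rank now holds one slice of a
    # conceptual [world_size, *shape] tensor.
    tensor = tensor.expand([1] + list(tensor.shape))
    # Declaring the slices consistent with sbp=split(0) stacks the
    # per-rank slices along dim 0 across all devices.
    tensor = tensor.to_consistent(
        placement=flow.env.all_device_placement(tensor.device.type),
        sbp=flow.sbp.split(0),
    )
    # Materializing each row back as a local tensor performs the
    # cross-rank data movement and fills the output list in rank order.
    for i in range(tensor.shape[0]):
        tensor_list[i] = tensor[i].to_local()
```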
python/oneflow/framework/unittest.py (15 changes: 9 additions & 6 deletions)
@@ -376,14 +376,17 @@ def __init__(self, check_flags):
         self._check_flags = check_flags
 
     def check_output(self, want, got, optionflags):
-        target_rank_list = [bool(flag & optionflags) for flag in self._check_flags]
-        if (
-            any(target_rank_list)
-            and target_rank_list.index(True) == oneflow.env.get_rank()
-        ):
+        # default check_output when no rank flag is set
+        if optionflags == 0:
             return super(CondSkipChecker, self).check_output(want, got, optionflags)
-        else:
+
+        target_rank_list = [bool(flag & optionflags) for flag in self._check_flags]
+        # a wrong flag is rejected before reaching here, so any(target_rank_list) is True
+        # this process is not the target rank: skip the comparison
+        if target_rank_list.index(True) != oneflow.env.get_rank():
             return True
+        elif target_rank_list.index(True) == oneflow.env.get_rank():
+            return super(CondSkipChecker, self).check_output(want, got, optionflags)
 
 
 def check_multi_rank_docstr(module):
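For context on the `check_output` fix: the `ONLY_CHECK_RANK_*` directives used in the docstrings are ordinary doctest option flags, which is why `flag & optionflags` can test whether an example is pinned to a given rank. A minimal sketch of how such flags are registered (the registration lives outside this hunk, so treat the names and shapes here as assumptions):

```python
import doctest

# Each registered flag is a distinct bit inside `optionflags`.
ONLY_CHECK_RANK_0 = doctest.register_optionflag("ONLY_CHECK_RANK_0")
ONLY_CHECK_RANK_1 = doctest.register_optionflag("ONLY_CHECK_RANK_1")

# The list consumed by CondSkipChecker: list index == rank number, so
# target_rank_list.index(True) recovers the rank an example targets.
check_flags = [ONLY_CHECK_RANK_0, ONLY_CHECK_RANK_1]
```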
@@ -35,9 +35,29 @@ def test_all_reduce_2n2d(test_case):
         out = flow.comm.all_reduce(input)
         test_case.assertTrue(np.allclose(out.numpy(), np_arr * 4))
 
+
+class TestAllGather(flow.unittest.TestCase):
+    @flow.unittest.skip_unless_1n2d()
+    def test_all_gather_1n2d(test_case):
+        if flow.env.get_rank() == 0:
+            np_arr = np.array([[2, 3], [4, 5]])
+        elif flow.env.get_rank() == 1:
+            np_arr = np.array([[1, 2], [3, 4]])
+        input = flow.tensor(np_arr, device="cuda", dtype=flow.int32)
+        tensor_list = [flow.zeros(np_arr.shape, dtype=flow.int32) for _ in range(2)]
+        flow.comm.all_gather(tensor_list, input)
+        test_case.assertTrue(
+            np.allclose(tensor_list[0].numpy(), np.array([[2, 3], [4, 5]]))
+        )
+        test_case.assertTrue(
+            np.allclose(tensor_list[1].numpy(), np.array([[1, 2], [3, 4]]))
+        )
+
+
 @flow.unittest.skip_unless_1n2d()
 class TestDocs(flow.unittest.TestCase):
     def test_docs(test_case):
-        oneflow.framework.unittest.check_multi_rank_docstr(oneflow.comm.primitive)
+        oneflow.framework.unittest.check_multi_rank_docstr(oneflow.comm.comm_ops)
 
 
 if __name__ == "__main__":
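Running the new test locally takes one process per rank. A hypothetical standalone repro of `test_all_gather_1n2d` (the file name `repro_all_gather.py` and the launcher invocation are assumptions; OneFlow's launcher mirrors `torch.distributed.launch`):

```python
# Launch with two processes on one node, e.g.:
#   python3 -m oneflow.distributed.launch --nproc_per_node 2 repro_all_gather.py
import numpy as np
import oneflow as flow

# Each rank contributes a different array, as in the test above.
np_arr = (
    np.array([[2, 3], [4, 5]])
    if flow.env.get_rank() == 0
    else np.array([[1, 2], [3, 4]])
)
x = flow.tensor(np_arr, device="cuda", dtype=flow.int32)
out = [flow.zeros(np_arr.shape, dtype=flow.int32) for _ in range(2)]
flow.comm.all_gather(out, x)
# Every rank ends up with both contributions, ordered by rank.
assert np.allclose(out[0].numpy(), np.array([[2, 3], [4, 5]]))
assert np.allclose(out[1].numpy(), np.array([[1, 2], [3, 4]]))
```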