# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
# Modifications copyright (C) 2017 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""Inter-process communication using MPI."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import re
import tensorflow as tf
from tensorflow.python.framework import load_library
from tensorflow.python.framework import ops
from tensorflow.python.platform import resource_loader
from horovod.common import get_ext_suffix
from horovod.common import HorovodBasics as _HorovodBasics


def _load_library(name, op_list=None):
    """Loads a .so file containing the specified operators.

    Args:
      name: The name of the .so file to load.
      op_list: A list of names of operators that the library should have. If
        None, the .so file's contents will not be verified.

    Raises:
      NameError if one of the required ops is missing.
      NotFoundError if the .so file could not be loaded.
    """
    filename = resource_loader.get_path_to_datafile(name)
    library = load_library.load_op_library(filename)
    for expected_op in (op_list or []):
        # The for/else runs the else clause only when the inner loop finishes
        # without a break, i.e. the expected op is absent from the library.
        for lib_op in library.OP_LIST.op:
            if lib_op.name == expected_op:
                break
        else:
            raise NameError(
                'Could not find operator %s in dynamic library %s' %
                (expected_op, name))
    return library


MPI_LIB = _load_library('mpi_lib' + get_ext_suffix(),
                        ['HorovodAllgather', 'HorovodAllreduce'])

_basics = _HorovodBasics(__file__, 'mpi_lib')

# import basic methods
init = _basics.init
shutdown = _basics.shutdown
size = _basics.size
local_size = _basics.local_size
rank = _basics.rank
local_rank = _basics.local_rank
mpi_threads_supported = _basics.mpi_threads_supported
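
# Usage sketch (illustrative, not part of the original module). Assuming this
# module is re-exported as `horovod.tensorflow`, as in upstream Horovod, a
# training script would typically start with:
#
#   import horovod.tensorflow as hvd
#   hvd.init()
#   print('process %d of %d (local rank %d)'
#         % (hvd.rank(), hvd.size(), hvd.local_rank()))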


def _normalize_name(name):
    """Normalizes operation name to TensorFlow rules."""
    # e.g. 'conv1/weights:0' becomes 'conv1_weights_0'.
    return re.sub('[^a-zA-Z0-9_]', '_', name)


def _allreduce(tensor, name=None):
    """An op which sums an input tensor over all the Horovod processes.

    The reduction operation is keyed by the name of the op. The tensor type
    and shape must be the same on all Horovod processes for a given name. The
    reduction will not start until all processes are ready to send and
    receive the tensor.

    Returns:
      A tensor of the same shape and type as `tensor`, summed across all
      processes.
    """
    if name is None:
        name = 'HorovodAllreduce_%s' % _normalize_name(tensor.name)
    return MPI_LIB.horovod_allreduce(tensor, name=name)
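
# Illustrative sketch (hypothetical helper, not part of this file): Horovod's
# public allreduce wrapper averages rather than sums by default, which amounts
# to dividing the summed result by the number of processes:
#
#   def _allreduce_average(tensor):
#       return _allreduce(tensor) / size()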


@ops.RegisterGradient('HorovodAllreduce')
def _allreduce_grad(op, grad):
    """Gradient for allreduce op.

    Args:
      op: An operation.
      grad: `Tensor` gradient with respect to the output of the op.

    Returns:
      The gradient with respect to the input of the op.
    """
    # The gradient of a cross-process sum is itself summed across processes,
    # so the backward pass is simply another allreduce.
    return _allreduce(grad)


def allgather(tensor, name=None):
    """An op which concatenates the input tensor with the same input tensor on
    all other Horovod processes.

    The concatenation is done on the first dimension, so the input tensors on
    the different processes must have the same rank and shape, except for the
    first dimension, which is allowed to be different.

    Returns:
      A tensor of the same type as `tensor`, concatenated on dimension zero
      across all processes. The shape is identical to the input shape, except
      for the first dimension, which may be greater and is the sum of all
      first dimensions of the tensors in different Horovod processes.
    """
    if name is None:
        name = 'HorovodAllgather_%s' % _normalize_name(tensor.name)
    return MPI_LIB.horovod_allgather(tensor, name=name)
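
# Shape example (illustrative): with two processes contributing tensors of
# shape [2, 3] and [5, 3], `allgather` returns a [7, 3] tensor on every
# process:
#
#   gathered = allgather(local_batch)  # local_batch has shape [n_i, 3]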


@ops.RegisterGradient('HorovodAllgather')
def _allgather_grad(op, grad):
    """Gradient for allgather op.

    Args:
      op: An operation.
      grad: `Tensor` gradient with respect to the output of the op.

    Returns:
      The gradient with respect to the input of the op.
    """
    # Sum the incoming gradients from all processes.
    grad = _allreduce(grad)

    # Gather every process's first dimension so the summed gradient can be
    # split back into the per-process slices of the original allgather.
    x = op.inputs[0]
    d0 = x.get_shape().as_list()[0]
    d = tf.convert_to_tensor([d0], dtype=tf.int32)

    s = size()
    d = tf.reshape(allgather(d), [s])

    # Each process keeps only the slice corresponding to its own input.
    splits = tf.split(grad, num_or_size_splits=d, axis=0)
    return splits[rank()]
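
# Worked example (illustrative): with two processes whose inputs have first
# dimensions 2 and 5, `allgather(d)` yields [2, 5], so `tf.split` hands rows
# 0:2 of the summed gradient to rank 0 and rows 2:7 to rank 1.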


def broadcast(tensor, root_rank, name=None):
    """An op which broadcasts the input tensor on root rank to the same input
    tensor on all other Horovod processes.

    The broadcast operation is keyed by the name of the op. The tensor type
    and shape must be the same on all Horovod processes for a given name. The
    broadcast will not start until all processes are ready to send and
    receive the tensor.

    Returns:
      A tensor of the same shape and type as `tensor`, with the value
      broadcasted from root rank.
    """
    if name is None:
        name = 'HorovodBroadcast_%s' % _normalize_name(tensor.name)
    return MPI_LIB.horovod_broadcast(tensor, name=name, root_rank=root_rank)
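
# Usage sketch (illustrative, modeled on upstream Horovod's
# broadcast_global_variables helper): broadcasting initial values from rank 0
# keeps all workers' variables consistent at the start of training:
#
#   sync_op = tf.group(*[tf.assign(var, broadcast(var, root_rank=0))
#                        for var in tf.global_variables()])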


@ops.RegisterGradient('HorovodBroadcast')
def _broadcast_grad(op, grad):
    """Gradient for broadcast op.

    Args:
      op: An operation.
      grad: `Tensor` gradient with respect to the output of the op.

    Returns:
      The gradient with respect to the input of the op.
    """
    root_rank = op.get_attr('root_rank')
    # Only the root rank's input influences the output, so the gradients from
    # all processes are summed back to the root; non-root ranks receive a
    # zero tensor of the correct shape.
    grad_reduced = _allreduce(grad)
    if rank() != root_rank:
        return grad_reduced * 0
    return grad_reduced