/
allreduce.h
195 lines (155 loc) · 5.29 KB
/
allreduce.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/**
* Copyright (c) 2018-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

#include "gloo/context.h"
#include "gloo/transport/unbound_buffer.h"
namespace gloo {
namespace detail {
// Plain state carrier behind AllreduceOptions. Holds everything the
// allreduce implementation needs: context, timeout, algorithm choice,
// input/output buffers, element geometry, reduction function, tag and
// segment size.
struct AllreduceOptionsImpl {
  // This type describes the function to use for element wise reduction.
  //
  // Its arguments are:
  // 1. non-const output pointer
  // 2. const input pointer 1 (may alias argument 1, the output pointer)
  // 3. const input pointer 2 (may alias argument 1, the output pointer)
  // 4. number of elements to reduce.
  //
  // Implementations must therefore tolerate the output aliasing either
  // input (i.e. an in-place reduction).
  //
  // Note that this function is not strictly typed and takes void pointers.
  // This is specifically done to avoid the need for a templated options class
  // and templated algorithm implementations. We found this adds very little
  // value for the increase in compilation time and code size.
  //
  using Func = std::function<void(void*, const void*, const void*, size_t)>;

  // Allreduce algorithm selection. UNSPECIFIED is the default set by the
  // constructor below.
  enum Algorithm {
    UNSPECIFIED = 0,
    RING = 1,
    BCUBE = 2,
  };

  // Binds the options to `context` and inherits its timeout.
  explicit AllreduceOptionsImpl(const std::shared_ptr<Context>& context)
      : context(context),
        timeout(context->getTimeout()),
        algorithm(UNSPECIFIED) {}

  // Context the operation runs on; also used to create unbound buffers.
  std::shared_ptr<Context> context;

  // End-to-end timeout for this operation.
  std::chrono::milliseconds timeout;

  // Algorithm selection.
  Algorithm algorithm;

  // Input and output buffers.
  // The output is used as input if input is not specified.
  std::vector<std::unique_ptr<transport::UnboundBuffer>> in;
  std::vector<std::unique_ptr<transport::UnboundBuffer>> out;

  // Number of elements (per buffer).
  size_t elements = 0;

  // Number of bytes per element.
  size_t elementSize = 0;

  // Reduction function.
  Func reduce;

  // Tag for this operation.
  // Must be unique across operations executing in parallel.
  uint32_t tag = 0;

  // This is the maximum size of each I/O operation (send/recv) of which
  // two are in flight at all times. A smaller value leads to more
  // overhead and a larger value leads to poor cache behavior.
  static constexpr size_t kMaxSegmentSize = 1024 * 1024;

  // Internal use only. This is used to exercise code paths where we
  // have more than 2 segments per rank without making the tests slow
  // (because they would require millions of elements if the default
  // were not configurable).
  size_t maxSegmentSize = kMaxSegmentSize;
};
} // namespace detail
// Options for a single allreduce() call.
//
// Construct with the context the operation runs on, configure the output
// buffers (the output is used as input when no input is specified), the
// reduction function, and optionally the algorithm, tag, timeout and
// segment size, then pass the object to allreduce().
//
// The buffer setters are templated on the element type T: they record
// sizeof(T) as the element size and the number of T elements per buffer.
// Every setInput*/setOutput* call replaces the previously configured
// inputs/outputs rather than appending to them.
class AllreduceOptions {
 public:
  using Func = detail::AllreduceOptionsImpl::Func;
  using Algorithm = detail::AllreduceOptionsImpl::Algorithm;

  // Binds the options to `context`; the timeout defaults to the context's.
  explicit AllreduceOptions(const std::shared_ptr<Context>& context)
      : impl_(context) {}

  // Selects the allreduce algorithm (defaults to UNSPECIFIED).
  void setAlgorithm(Algorithm algorithm) {
    impl_.algorithm = algorithm;
  }

  // Sets a single pre-created input buffer holding elements of type T.
  // Throws std::invalid_argument if `buf` is null.
  template <typename T>
  void setInput(std::unique_ptr<transport::UnboundBuffer> buf) {
    std::vector<std::unique_ptr<transport::UnboundBuffer>> bufs;
    bufs.push_back(std::move(buf));
    setInputs<T>(std::move(bufs));
  }

  // Sets pre-created input buffers holding elements of type T. The element
  // count is derived from the size of the first buffer.
  // Throws std::invalid_argument (leaving the options unchanged) if `bufs`
  // is empty or its first entry is null.
  template <typename T>
  void setInputs(std::vector<std::unique_ptr<transport::UnboundBuffer>> bufs) {
    const size_t count = countElements<T>(
        bufs,
        "AllreduceOptions::setInputs: expected at least one non-null buffer");
    impl_.elements = count;
    impl_.elementSize = sizeof(T);
    impl_.in = std::move(bufs);
  }

  // Sets a single raw input pointer spanning `elements` values of type T.
  template <typename T>
  void setInput(T* ptr, size_t elements) {
    setInputs(&ptr, 1, elements);
  }

  // Sets raw input pointers, each spanning `elements` values of type T.
  template <typename T>
  void setInputs(std::vector<T*> ptrs, size_t elements) {
    setInputs(ptrs.data(), ptrs.size(), elements);
  }

  // Sets `len` raw input pointers, each spanning `elements` values of type
  // T, wrapping each in an unbound buffer created by the context.
  // Replaces (does not append to) any previously configured inputs.
  template <typename T>
  void setInputs(T** ptrs, size_t len, size_t elements) {
    // Build first so a failure in buffer creation leaves state untouched.
    auto bufs = wrapPointers(ptrs, len, elements);
    impl_.elements = elements;
    impl_.elementSize = sizeof(T);
    impl_.in = std::move(bufs);
  }

  // Sets a single pre-created output buffer holding elements of type T.
  // Throws std::invalid_argument if `buf` is null.
  template <typename T>
  void setOutput(std::unique_ptr<transport::UnboundBuffer> buf) {
    std::vector<std::unique_ptr<transport::UnboundBuffer>> bufs;
    bufs.push_back(std::move(buf));
    setOutputs<T>(std::move(bufs));
  }

  // Sets pre-created output buffers holding elements of type T. The element
  // count is derived from the size of the first buffer.
  // Throws std::invalid_argument (leaving the options unchanged) if `bufs`
  // is empty or its first entry is null.
  template <typename T>
  void setOutputs(std::vector<std::unique_ptr<transport::UnboundBuffer>> bufs) {
    const size_t count = countElements<T>(
        bufs,
        "AllreduceOptions::setOutputs: expected at least one non-null buffer");
    impl_.elements = count;
    impl_.elementSize = sizeof(T);
    impl_.out = std::move(bufs);
  }

  // Sets a single raw output pointer spanning `elements` values of type T.
  template <typename T>
  void setOutput(T* ptr, size_t elements) {
    setOutputs(&ptr, 1, elements);
  }

  // Sets raw output pointers, each spanning `elements` values of type T.
  template <typename T>
  void setOutputs(std::vector<T*> ptrs, size_t elements) {
    setOutputs(ptrs.data(), ptrs.size(), elements);
  }

  // Sets `len` raw output pointers, each spanning `elements` values of type
  // T, wrapping each in an unbound buffer created by the context.
  // Replaces (does not append to) any previously configured outputs.
  template <typename T>
  void setOutputs(T** ptrs, size_t len, size_t elements) {
    // Build first so a failure in buffer creation leaves state untouched.
    auto bufs = wrapPointers(ptrs, len, elements);
    impl_.elements = elements;
    impl_.elementSize = sizeof(T);
    impl_.out = std::move(bufs);
  }

  // Sets the element-wise reduction function (see
  // detail::AllreduceOptionsImpl::Func for its argument contract).
  void setReduceFunction(Func fn) {
    impl_.reduce = std::move(fn);
  }

  // Sets the operation tag; must be unique across operations executing in
  // parallel.
  void setTag(uint32_t tag) {
    impl_.tag = tag;
  }

  // Overrides the maximum size of each send/recv. Intended for internal use
  // (e.g. tests forcing many segments); defaults to
  // detail::AllreduceOptionsImpl::kMaxSegmentSize.
  void setMaxSegmentSize(size_t maxSegmentSize) {
    impl_.maxSegmentSize = maxSegmentSize;
  }

  // Sets the end-to-end timeout for the operation.
  void setTimeout(std::chrono::milliseconds timeout) {
    impl_.timeout = timeout;
  }

 protected:
  detail::AllreduceOptionsImpl impl_;

  friend void allreduce(const AllreduceOptions&);

 private:
  // Returns the number of T elements in the first buffer of `bufs`, throwing
  // std::invalid_argument(`error`) if there is no usable first buffer.
  template <typename T>
  static size_t countElements(
      const std::vector<std::unique_ptr<transport::UnboundBuffer>>& bufs,
      const char* error) {
    if (bufs.empty() || bufs.front() == nullptr) {
      throw std::invalid_argument(error);
    }
    return bufs.front()->size / sizeof(T);
  }

  // Wraps each of the `len` pointers in an unbound buffer of
  // `elements * sizeof(T)` bytes created by the bound context.
  template <typename T>
  std::vector<std::unique_ptr<transport::UnboundBuffer>> wrapPointers(
      T** ptrs,
      size_t len,
      size_t elements) {
    std::vector<std::unique_ptr<transport::UnboundBuffer>> bufs;
    bufs.reserve(len);
    for (size_t i = 0; i < len; i++) {
      bufs.push_back(
          impl_.context->createUnboundBuffer(ptrs[i], elements * sizeof(T)));
    }
    return bufs;
  }
};

// Runs the allreduce described by `opts`. Defined out of line (not in this
// header); NOTE(review): blocking/timeout semantics live in that definition.
void allreduce(const AllreduceOptions& opts);
} // namespace gloo