// caffe2/operators/rnn/recurrent_network_executor_gpu.cc

#include "caffe2/operators/rnn/recurrent_network_executor_gpu.h"
#include "caffe2/core/context_gpu.h"
namespace caffe2 {
template <>
std::unique_ptr<RecurrentNetworkExecutorBase> createRNNExecutor<CUDAContext>(
const NetDef& step_net_def,
std::map<string, string>& recurrent_input_map,
std::string timestep_blob,
ArgumentHelper arg_helper) {
auto* exec = new CUDARecurrentNetworkExecutor(
step_net_def, recurrent_input_map, timestep_blob);
int max_streams = arg_helper.GetSingleArgument<int>("rnn_executor.max_cuda_streams", 0);
if (max_streams > 0) {
exec->setMaxStreams(max_streams);
LOG(INFO) << "Set max streams:" << max_streams;
}
std::unique_ptr<RecurrentNetworkExecutorBase> ptr(exec);
return ptr;
}
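
// Usage sketch (illustrative, not part of the original file): configuring the
// stream budget read by createRNNExecutor above. The argument name
// "rnn_executor.max_cuda_streams" is the one queried by the factory; the
// NetDef contents, map entries, blob name, and the value 4 are hypothetical.
//
//   caffe2::NetDef step_net_def;                  // assumed already populated
//   auto* arg = step_net_def.add_arg();
//   arg->set_name("rnn_executor.max_cuda_streams");
//   arg->set_i(4);                                // allow up to 4 CUDA streams
//
//   std::map<std::string, std::string> recurrent_input_map;  // assumed filled
//   auto executor = caffe2::createRNNExecutor<caffe2::CUDAContext>(
//       step_net_def,
//       recurrent_input_map,
//       "timestep",                               // hypothetical blob name
//       caffe2::ArgumentHelper(step_net_def));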

CUDARecurrentNetworkExecutor::~CUDARecurrentNetworkExecutor() {
  for (cudaEvent_t ev : events_) {
    if (ev != nullptr) {
      CUDA_CHECK(cudaEventDestroy(ev));
    }
  }
}

/**
 * Special execution for CUDA. It tries to run ops with as little overhead as
 * possible while also identifying opportunities for "frontier execution"
 * parallelism, i.e., starting kernels of the next timestep in parallel with
 * the current timestep. This is done by assigning consecutive timesteps to
 * different CUDA streams and ordering recurrent dependencies across streams
 * with CUDA events (a standalone sketch of the event pattern follows the
 * function below).
 */
void CUDARecurrentNetworkExecutor::_ExecRange(int from, int to) {
  int direction = to > from ? 1 : -1;

  int max_streams = max_parallel_timesteps_ > 0
      ? std::min(max_parallel_timesteps_, max_cuda_streams_)
      : max_cuda_streams_;
  int stream_seq = 0;
  int num_ops = timestep_ops_[0].size();

  events_.resize(num_ops * timestep_ops_.size(), nullptr);

  int gpu_id = -1;

  // Loop over timesteps
  for (int t = from; t != to; t += direction) {
    bool first_timestep = t == from;
    bool last_timestep =
        (direction == -1 && t == 0) || (direction == 1 && t == to - 1);
    auto& ops = timestep_ops_[t];
    int stream_id = stream_seq % max_streams;

    for (int i = 0; i < ops.size(); i++) {
      auto& rnn_op = ops[i];

      // Special handling for link ops: we just run them directly, since
      // they do not launch any kernels.
      if (rnn_op.link_op) {
        rnn_op.op->RunAsync(stream_id);
        CAFFE_ENFORCE(
            rnn_op.dependencies.empty(),
            "GPU executor ignores link dependencies");
        continue;
      }

      if (gpu_id == -1 &&
          rnn_op.op->device_option().device_type() ==
              DeviceTypeProto::PROTO_CUDA) {
        gpu_id = rnn_op.op->device_option().device_id();
      } else {
        CAFFE_ENFORCE(
            rnn_op.op->device_option().device_type() == 0 ||
                rnn_op.op->device_option().device_id() == gpu_id,
            "RNN Executor only supports ops on one GPU");
      }

      // If the op has recurrent parents, add event waits so that those
      // parents have completed their work before this op runs.
      if (has_timestep_parallelism_ && !first_timestep) {
        for (int parent : rnn_op.parents) {
          if (parent > i) {
            int parent_ev_idx = (t - direction) * num_ops + parent;
            CHECK(events_.size() > parent_ev_idx);
            CAFFE_ENFORCE(events_[parent_ev_idx] != nullptr);
            CUDA_CHECK(cudaStreamWaitEvent(
                CUDAContext::cuda_stream(gpu_id, stream_id),
                events_[parent_ev_idx],
                0));
          }
        }
      }

      // Run the op in the given stream
      rnn_op.op->RunAsync(stream_id);

      // Create and record an event for this op, if it has at least one
      // recurrent dependency.
      if (has_timestep_parallelism_ && !last_timestep) {
        for (int dep : rnn_op.dependencies) {
          if (dep < i) {
            int event_idx = t * num_ops + i;

            // Create the event for recurrent connections lazily, on first use
            if (events_[event_idx] == nullptr) {
              CUDA_CHECK(cudaEventCreate(&events_[event_idx]));
            }
            CUDA_CHECK(cudaEventRecord(
                events_[event_idx],
                CUDAContext::cuda_stream(gpu_id, stream_id)));
            break;
          }
        }
      }
    } // for over ops

    // Next timestep will run on a different stream
    if (has_timestep_parallelism_) {
      stream_seq++;
    }
  } // for over timesteps

  /**
   * Wait for all the started streams to complete.
   */
  for (int stream_id = 0; stream_id <= std::min(stream_seq, max_streams - 1);
       stream_id++) {
    VLOG(1) << "Wait for stream:" << stream_id;
    CUDA_CHECK(
        cudaStreamSynchronize(CUDAContext::cuda_stream(gpu_id, stream_id)));
  }
}
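
// Standalone sketch (illustrative only) of the cross-stream synchronization
// pattern used in _ExecRange above: work queued on one stream waits for an
// event recorded on another stream, without blocking the host. The function
// and parameter names are hypothetical; error checking is elided.
//
//   #include <cuda_runtime.h>
//
//   void frontier_sync_sketch(cudaStream_t stream_a, cudaStream_t stream_b) {
//     cudaEvent_t ev;
//     cudaEventCreate(&ev);
//     // ... launch producer kernel(s) for timestep t on stream_a ...
//     cudaEventRecord(ev, stream_a);
//     // Device-side wait: work submitted to stream_b after this call will
//     // not start until everything captured by `ev` on stream_a completes.
//     cudaStreamWaitEvent(stream_b, ev, 0);
//     // ... launch consumer kernel(s) for timestep t + 1 on stream_b ...
//     cudaEventDestroy(ev);
//   }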

bool CUDARecurrentNetworkExecutor::Run(int T) {
  CAFFE_ENFORCE_GE(T, 0, "Negative number of steps");
  if (T == 0) {
    return true;
  }
  _ExecRange(0, T);
  return true;
}

bool CUDARecurrentNetworkExecutor::RunBackwards(int T) {
  CAFFE_ENFORCE_GE(T, 0, "Negative number of steps");
  if (T == 0) {
    return true;
  }
  _ExecRange(T - 1, -1);
  return true;
}

} // namespace caffe2