forked from pytorch/pytorch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
simple_queue.h
79 lines (68 loc) · 2.54 KB
/
simple_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#ifndef CAFFE2_UTILS_SIMPLE_QUEUE_H_
#define CAFFE2_UTILS_SIMPLE_QUEUE_H_
#include <condition_variable> // NOLINT
#include <mutex> // NOLINT
#include <queue>
#include "caffe2/core/logging.h"
namespace caffe2 {
// This is a very simple queue that Yangqing wrote when bottlefeeding the baby,
// so don't take it seriously. What it does is a minimal thread-safe queue that
// allows me to run network as a DAG.
//
// A usual work pattern looks like this: one or multiple producers push jobs
// into this queue, and one or multiple workers pops jobs from this queue. If
// nothing is in the queue but NoMoreJobs() is not called yet, the pop calls
// will wait. If NoMoreJobs() has been called, pop calls will return false,
// which serves as a message to the workers that they should exit.
template <typename T>
class SimpleQueue {
public:
SimpleQueue() : no_more_jobs_(false) {}
// Pops a value and writes it to the value pointer. If there is nothing in the
// queue, this will wait till a value is inserted to the queue. If there are
// no more jobs to pop, the function returns false. Otherwise, it returns
// true.
bool Pop(T* value) {
std::unique_lock<std::mutex> mutex_lock(mutex_);
while (queue_.size() == 0 && !no_more_jobs_) cv_.wait(mutex_lock);
if (queue_.size() == 0 && no_more_jobs_) return false;
*value = queue_.front();
queue_.pop();
return true;
}
int size() {
std::unique_lock<std::mutex> mutex_lock(mutex_);
return queue_.size();
}
// Push pushes a value to the queue.
void Push(const T& value) {
{
std::lock_guard<std::mutex> mutex_lock(mutex_);
CAFFE_ENFORCE(!no_more_jobs_, "Cannot push to a closed queue.");
queue_.push(value);
}
cv_.notify_one();
}
// NoMoreJobs() marks the close of this queue. It also notifies all waiting
// Pop() calls so that they either check out remaining jobs, or return false.
// After NoMoreJobs() is called, this queue is considered closed - no more
// Push() functions are allowed, and once existing items are all checked out
// by the Pop() functions, any more Pop() function will immediately return
// false with nothing set to the value.
void NoMoreJobs() {
{
std::lock_guard<std::mutex> mutex_lock(mutex_);
no_more_jobs_ = true;
}
cv_.notify_all();
}
private:
std::mutex mutex_;
std::condition_variable cv_;
std::queue<T> queue_;
bool no_more_jobs_{};
// We do not allow copy constructors.
SimpleQueue(const SimpleQueue& /*src*/) {}
};
} // namespace caffe2
#endif // CAFFE2_UTILS_SIMPLE_QUEUE_H_