Blocking queue for reader #10206
Conversation
@JiayiFeng just a quick question. I noticed that there is already an implementation of a blocking queue in fluid. What is the difference between the blocking queue in this PR and the blocking queue that is already used in PaddlePaddle?
Hi @tpatejko, I have just updated the PR description; maybe it answers your questions.
@@ -58,7 +58,7 @@ class DoubleBufferReader : public framework::DecoratedReader {
   bool HasNext() const;

   void StartPrefetcher() {
-    channel_ = framework::MakeChannel<size_t>(kChannelSize);
+    channel_ = new reader::BlockingQueue<size_t>(kChannelSize);
If you name the class BlockingQueue, please name the variable blocking_queue_.
I think channel is a better name. It is implemented with a blocking queue, but it is essentially a channel.
It seems that this PR DOESN'T replace framework::Channel. Instead, it replaces the invocations of framework::Channel in reader/*.
namespace reader {

template <typename T>
class BlockingQueue {
Here we need a class comment, something like

// BlockingQueue is for buffered reading and is supposed to be used only in the reader package. It is true that we could and should have been using framework::Channel, but it currently has a deadlock bug. BlockingQueue is a workaround and a simplified version of framework::Channel: it doesn't support GPU and it only implements a buffered blocking queue.
Done. Thanks!
    return closed_;
  }

  bool CanSend() {
I am afraid that CanSend is useless. Please correct me if I am wrong.
In my mind, the usage of CanSend is
if (q.CanSend()) {
q.Send(...);
}
However, between the invocation of CanSend and Send, some other thread could write something into the queue so that CanSend no longer holds.
If I am right, please delete CanSend.
It's correct that CanSend and CanReceive are not thread-safe. They are offered for two reasons:

- framework::Channel has these two interfaces. As a replacement of framework::Channel, reader::BlockingQueue had better have similar interfaces. It is possible that we will reuse framework::Channel in readers in the future, and similar interfaces can reduce the migration workload.
- In the current implementations of all C++ readers, CanSend and CanReceive are invoked single-threaded, so no bug is caused.

However, I agree that removing them is a better choice. Hidden troubles should be eliminated at the very start.
    return !closed_ && queue_.size() < capacity_;
  }

  bool CanReceive() {
Similar comments as the one I gave to CanSend.
    std::unique_lock<std::mutex> lock(mutex_);
    send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
    if (closed_) {
      return false;
Here we might need to VLOG a warning in addition to returning false because sending to a closed channel is very likely a bug.
Great idea!
    send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
    if (closed_) {
      return false;
    } else {
It looks to me that we don't need this else.
Done
  EXPECT_FALSE(q.CanSend());
}

void FirstInFirstOut(size_t queue_cap, size_t elem_num, size_t send_time_gap,
What is the point of having send_time_gap and receive_time_gap? Can we remove them?
In unit tests, sender threads start before receiver threads. If we don't set the time_gap, all sender threads may have finished before the receiver threads start. Bugs that only appear when senders and receivers run concurrently would not be found.
      send_cv_.notify_one();
      return true;
    } else {
      PADDLE_ENFORCE(closed_);
Is the assumption here that "if the queue is empty, it must already have been closed"?
If so, that assumption seems unreasonable. A queue is empty right after it is created; at that point, if the reader calls Receive before the writer has written any data, would it just crash?
bool Receive(T* elem) {
std::unique_lock<std::mutex> lock(mutex_);
receive_cv_.wait(lock, [&] { return !queue_.empty() || closed_; });
if (!queue_.empty()) {
PADDLE_ENFORCE_NOT_NULL(elem);
*elem = queue_.front();
queue_.pop_front();
send_cv_.notify_one();
return true;
} else {
PADDLE_ENFORCE(closed_);
return false;
}
}
There is a condition-variable wait before the if. So when execution reaches this if...else..., it means "the queue is non-empty, or the queue has been closed". If we then enter the else branch, the queue must be empty at that point, so obviously it has been closed.
In the case you describe, where Receive is called right away, the queue is empty and not closed, so the call blocks at the preceding condition-variable wait. The problem you describe therefore cannot happen.
Got it!
LGTM!
@JiayiFeng @wangkuiyi Thanks for clarifying the idea!
This PR replaces the invocations of framework::Channel in readers with a new implementation, reader::BlockingQueue.

Why not keep using framework::Channel?

framework::Channel is a great idea. It provides rich functionality, and using only a small part of it has made developing readers quite easy. However, the implementation of framework::Channel is not very stable so far: it crashes in a GCC 4.8.2 environment and suffers occasional deadlocks. So we implemented reader::BlockingQueue, which can be regarded as an extremely simplified framework::Channel. It has interfaces similar to framework::Channel's, while providing only the features that readers really need. Its conciseness makes it easy to maintain, and it also runs a little faster than framework::Channel (892s vs 910s for 1000 batches in the transformer job).

Why not merge framework::BlockingQueue and reader::BlockingQueue?

These two blocking queues have different customized features. framework::BlockingQueue is mainly used in the ParallelExecutor, so it supports the extend operation and a timeout mechanism. reader::BlockingQueue is only used in readers; as a replacement of framework::Channel, it supports capacity limitation and a closing mechanism. It's hard to implement all these features in a single blocking queue.