Skip to content

Commit

Permalink
// fix bug where awaitTermination won't return even if all worker quit.
Browse files Browse the repository at this point in the history
    // https://en.cppreference.com/w/cpp/thread/condition_variable
    // follow what the STD told us to
    // "Even if the shared variable is atomic, it must be modified while owning the mutex to
    // correctly publish the modification to the waiting thread."
    // DEEP EXPLAINATION:
    // https://stackoverflow.com/questions/38147825/shared-atomic-variable-is-not-properly-published-if-it-is-not-modified-under-mut
  • Loading branch information
LanderlYoung committed Dec 3, 2023
1 parent 5c78c43 commit 9015681
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
21 changes: 17 additions & 4 deletions src/utils/MessageQueue.cc
Expand Up @@ -33,7 +33,10 @@ class LoopQueueGuard {

public:
explicit LoopQueueGuard(MessageQueue* queue) : queue_(queue) {
queue_->workerCount_++;
{
std::unique_lock<std::mutex> lk(queue_->queueMutex_);
++queue_->workerCount_;
}
getRunningQueue()[queue]++;
}

Expand All @@ -45,7 +48,17 @@ class LoopQueueGuard {
q.erase(queue_);
}

queue_->workerCount_--;
// bugfix: awaitTermination won't return even if all worker quit.
// https://en.cppreference.com/w/cpp/thread/condition_variable
// follow what the STD told us to
// "Even if the shared variable is atomic, it must be modified while owning the mutex to
// correctly publish the modification to the waiting thread."
// DEEP EXPLAINATION:
// https://stackoverflow.com/questions/38147825/shared-atomic-variable-is-not-properly-published-if-it-is-not-modified-under-mut
{
std::unique_lock<std::mutex> lk(queue_->queueMutex_);
--queue_->workerCount_;
}
queue_->workerQuitCondition_.notify_all();
}

Expand Down Expand Up @@ -180,7 +193,7 @@ void MessageQueue::awaitTermination() {
workerQuitCondition_.wait(lk, [this] { return workerCount_ == 0; });
}

bool MessageQueue::isShutdown() {
bool MessageQueue::isShutdown() const {
std::unique_lock<std::mutex> lk(queueMutex_);
return shutdown_ != ShutdownType::kNone;
}
Expand Down Expand Up @@ -386,7 +399,7 @@ MessageQueue::LoopReturnType MessageQueue::loopQueue(MessageQueue::LoopType loop
if (loopType == LoopType::kLoopOnce) {
onceMessageCount = dueMessageCount();
}
MessageQueue::LoopReturnType returnType = LoopReturnType::kRunOnce;
LoopReturnType returnType = LoopReturnType::kRunOnce;

while (true) {
Message* message = awaitDueMessage(loopType, onceMessageCount, returnType);
Expand Down
4 changes: 2 additions & 2 deletions src/utils/MessageQueue.h
Expand Up @@ -226,7 +226,7 @@ class MessageQueue {
std::condition_variable queueNotFullCondition_;
std::deque<Message*> queue_;
std::atomic_int32_t messageIdCounter_;
std::atomic_uint32_t workerCount_;
std::uint32_t workerCount_; // guard by queueMutex_
std::condition_variable workerQuitCondition_;

std::shared_ptr<Supervisor> supervisor_;
Expand Down Expand Up @@ -321,7 +321,7 @@ class MessageQueue {
/**
* @return if the MessageQueue is shut down or shutting down.
*/
bool isShutdown();
bool isShutdown() const;

/**
* causing current loopQueue() call return immediately.
Expand Down

0 comments on commit 9015681

Please sign in to comment.