Use unbounded queue in NotificationQueue (same as in D7164130 but with the hazptr fix from D7256905 and the singleton one from D7283390)

Summary: Use unbounded queue in NotificationQueue (same as in D7164130 but with the hazptr fix from D7256905 and the singleton one from D7283390)

Reviewed By: yfeldblum

Differential Revision: D7278275

fbshipit-source-id: 78ea94e1c9d492febd33106b06ce275d4a90a56a
dmm-fb authored and facebook-github-bot committed Mar 19, 2018
1 parent a2aff3b commit 2f9ee5b
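
The diff below swaps NotificationQueue's spinlock-protected std::deque for folly's lock-free UnboundedQueue (via the UMPMCQueue alias) and consumes it with try_dequeue(), which returns a folly::Optional. As a rough standalone sketch of that consumption pattern only (the Message struct and main() here are invented for illustration and are not part of the commit):

#include <folly/Optional.h>
#include <folly/concurrency/UnboundedQueue.h>

struct Message {
  int payload;
};

int main() {
  // Multi-producer, multi-consumer unbounded queue; MayBlock = false selects
  // the non-blocking variant, matching the UMPMCQueue<Pair, false> member in
  // the diff.
  folly::UMPMCQueue<Message, /* MayBlock = */ false> queue;

  queue.enqueue(Message{42});

  // try_dequeue() does not block; an empty Optional signals an empty queue,
  // replacing the old spinlock-guarded empty()/front()/pop_front() sequence.
  folly::Optional<Message> msg = queue.try_dequeue();
  return msg.hasValue() ? 0 : 1;
}

The tryConsume() and consumeMessages() changes in the diff follow this same pattern.
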
Showing 1 changed file with 40 additions and 132 deletions.
172 changes: 40 additions & 132 deletions folly/io/async/NotificationQueue.h
@@ -19,7 +19,6 @@
#include <sys/types.h>

#include <algorithm>
#include <deque>
#include <iterator>
#include <memory>
#include <stdexcept>
@@ -28,8 +27,10 @@
#include <folly/Exception.h>
#include <folly/FileUtil.h>
#include <folly/Likely.h>
#include <folly/Optional.h>
#include <folly/ScopeGuard.h>
#include <folly/SpinLock.h>
#include <folly/concurrency/UnboundedQueue.h>
#include <folly/io/async/DelayedDestruction.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h>
@@ -76,9 +77,9 @@ class NotificationQueue {
enum : uint16_t { kDefaultMaxReadAtOnce = 10 };

Consumer()
: queue_(nullptr),
destroyedFlagPtr_(nullptr),
maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
: queue_(nullptr),
destroyedFlagPtr_(nullptr),
maxReadAtOnce_(kDefaultMaxReadAtOnce) {}

// create a consumer in-place, without the need to build new class
template <typename TCallback>
@@ -186,25 +187,6 @@
* (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
*/
void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;

void setActive(bool active, bool shouldLock = false) {
if (!queue_) {
active_ = active;
return;
}
if (shouldLock) {
queue_->spinlock_.lock();
}
if (!active_ && active) {
++queue_->numActiveConsumers_;
} else if (active_ && !active) {
--queue_->numActiveConsumers_;
}
active_ = active;
if (shouldLock) {
queue_->spinlock_.unlock();
}
}
void init(EventBase* eventBase, NotificationQueue* queue);

NotificationQueue* queue_;
Expand All @@ -217,12 +199,9 @@ class NotificationQueue {
class SimpleConsumer {
public:
explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) {
++queue_.numConsumers_;
}

~SimpleConsumer() {
--queue_.numConsumers_;
}
~SimpleConsumer() = default;

int getFd() const {
return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0];
@@ -410,23 +389,18 @@ class NotificationQueue {

checkPid();

folly::SpinLockGuard g(spinlock_);

if (UNLIKELY(queue_.empty())) {
folly::Optional<Pair> data = queue_.try_dequeue();
if (!data) {
return false;
}

auto& data = queue_.front();
result = std::move(data.first);
RequestContext::setContext(std::move(data.second));

queue_.pop_front();
result = std::move(data.value().first);
RequestContext::setContext(std::move(data.value().second));

return true;
}

size_t size() const {
folly::SpinLockGuard g(spinlock_);
return queue_.size();
}

@@ -451,7 +425,6 @@
NotificationQueue& operator=(NotificationQueue const &) = delete;

inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
DCHECK(0 == spinlock_.try_lock());
if (maxSize > 0 && queue_.size() >= maxSize) {
if (throws) {
throw std::overflow_error("unable to add message to NotificationQueue: "
@@ -463,10 +436,11 @@
}

inline bool checkDraining(bool throws=true) {
if (UNLIKELY(draining_ && throws)) {
auto draining = draining_.load(std::memory_order_relaxed);
if (UNLIKELY(draining && throws)) {
throw std::runtime_error("queue is draining, cannot add message");
}
return draining_;
return draining;
}

#ifdef __ANDROID__
@@ -518,6 +492,10 @@
}

void drainSignalsLocked() {
if (!signal_) {
return;
}

ssize_t bytes_read = 0;
if (eventfd_ > 0) {
uint64_t message;
@@ -566,47 +544,25 @@
template <typename MessageTT>
bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) {
checkPid();
bool signal = false;
{
folly::SpinLockGuard g(spinlock_);
if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
return false;
}
// We only need to signal an event if not all consumers are
// awake.
if (numActiveConsumers_ < numConsumers_) {
signal = true;
}
queue_.emplace_back(
std::forward<MessageTT>(message), RequestContext::saveContext());
if (signal) {
ensureSignalLocked();
}
if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
return false;
}
queue_.enqueue(std::make_pair(
std::forward<MessageTT>(message), RequestContext::saveContext()));
ensureSignal();
return true;
}

template <typename InputIteratorT>
void putMessagesImpl(InputIteratorT first, InputIteratorT last,
std::input_iterator_tag) {
checkPid();
bool signal = false;
size_t numAdded = 0;
{
folly::SpinLockGuard g(spinlock_);
checkDraining();
while (first != last) {
queue_.emplace_back(*first, RequestContext::saveContext());
++first;
++numAdded;
}
if (numActiveConsumers_ < numConsumers_) {
signal = true;
}
if (signal) {
ensureSignalLocked();
}
checkDraining();
while (first != last) {
queue_.enqueue(std::make_pair(*first, RequestContext::saveContext()));
++first;
}
ensureSignal();
}

mutable folly::SpinLock spinlock_;
@@ -615,10 +571,9 @@
int pipeFds_[2]; // to fallback to on older/non-linux systems
uint32_t advisoryMaxQueueSize_;
pid_t pid_;
std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
int numConsumers_{0};
std::atomic<int> numActiveConsumers_{0};
bool draining_{false};
using Pair = std::pair<MessageT, std::shared_ptr<RequestContext>>;
UMPMCQueue<Pair, false> queue_;
std::atomic<bool> draining_{false};
};

template <typename MessageT>
@@ -645,56 +600,29 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
bool isDrain, size_t* numConsumed) noexcept {
DestructorGuard dg(this);
uint32_t numProcessed = 0;
setActive(true);
SCOPE_EXIT {
if (queue_) {
queue_->syncSignalAndQueue();
}
};
SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
SCOPE_EXIT {
if (numConsumed != nullptr) {
*numConsumed = numProcessed;
}
};
while (true) {
// Now pop the message off of the queue.
//
// We have to manually acquire and release the spinlock here, rather than
// using SpinLockHolder since the MessageT has to be constructed while
// holding the spinlock and available after we release it. SpinLockHolder
// unfortunately doesn't provide a release() method. (We can't construct
// MessageT first since we have no guarantee that MessageT has a default
// constructor.
queue_->spinlock_.lock();
bool locked = true;

try {
if (UNLIKELY(queue_->queue_.empty())) {
// If there is no message, we've reached the end of the queue, return.
setActive(false);
queue_->spinlock_.unlock();
// Pull a message off the queue.
folly::Optional<Pair> data = queue_->queue_.try_dequeue();
if (!data) {
return;
}

// Pull a message off the queue.
auto& data = queue_->queue_.front();

MessageT msg(std::move(data.first));
RequestContextScopeGuard rctx(std::move(data.second));
queue_->queue_.pop_front();
MessageT msg(std::move(data.value().first));
RequestContextScopeGuard rctx(std::move(data.value().second));

// Check to see if the queue is empty now.
// We use this as an optimization to see if we should bother trying to
// loop again and read another message after invoking this callback.
bool wasEmpty = queue_->queue_.empty();
if (wasEmpty) {
setActive(false);
}

// Now unlock the spinlock before we invoke the callback.
queue_->spinlock_.unlock();
locked = false;

// Call the callback
bool callbackDestroyed = false;
Expand Down Expand Up @@ -725,10 +653,11 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
// looping again and trying to re-read from the eventfd. (If a new
// message had in fact arrived while we were invoking the callback, we
// will simply be woken up the next time around the event loop and will
// process the message then.)
// process the message then.)
if (wasEmpty) {
return;
}

} catch (const std::exception&) {
// This catch block is really just to handle the case where the MessageT
// constructor throws. The messageAvailable() callback itself is
Expand All @@ -740,10 +669,6 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
// trying to read the message again. If MessageT continues to throw we
// will never make forward progress and will keep trying each time around
// the event loop.
if (locked) {
// Unlock the spinlock.
queue_->spinlock_.unlock();
}

return;
}
Expand All @@ -763,10 +688,6 @@ void NotificationQueue<MessageT>::Consumer::init(

queue_ = queue;

{
folly::SpinLockGuard g(queue_->spinlock_);
queue_->numConsumers_++;
}
queue_->ensureSignal();

if (queue_->eventfd_ >= 0) {
@@ -783,12 +704,6 @@ void NotificationQueue<MessageT>::Consumer::stopConsuming() {
return;
}

{
folly::SpinLockGuard g(queue_->spinlock_);
queue_->numConsumers_--;
setActive(false);
}

assert(isHandlerRegistered());
unregisterHandler();
detachEventBase();
@@ -799,18 +714,11 @@
bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
size_t* numConsumed) noexcept {
DestructorGuard dg(this);
{
folly::SpinLockGuard g(queue_->spinlock_);
if (queue_->draining_) {
return false;
}
queue_->draining_ = true;
if (queue_->draining_.exchange(true, std::memory_order_relaxed)) {
return false;
}
consumeMessages(true, numConsumed);
{
folly::SpinLockGuard g(queue_->spinlock_);
queue_->draining_ = false;
}
queue_->draining_.store(false, std::memory_order_relaxed);
return true;
}
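
For reference, consumeUntilDrained() above replaces the spinlock-guarded draining_ bool with a std::atomic<bool> and claims the drain via exchange(). A minimal standalone sketch of that claim-and-release pattern (this function and the puts() call are illustrative stand-ins, not folly code):

#include <atomic>
#include <cstdio>

std::atomic<bool> draining{false};

bool consumeUntilDrained() {
  // exchange() returns the previous value: if it was already true, another
  // caller is draining, so bail out without touching the queue.
  if (draining.exchange(true, std::memory_order_relaxed)) {
    return false;
  }
  std::puts("drain all queued messages here");  // stands in for consumeMessages(true, ...)
  // Release the drain claim so later callers can drain again.
  draining.store(false, std::memory_order_relaxed);
  return true;
}

int main() {
  return consumeUntilDrained() ? 0 : 1;
}
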

