Fix dynamic MPMCQueue tryObtainPromisedPushTicket() to prevent tryWriteUntil() and writeIfNotFull() from blocking indefinitely for a matching read.

Summary:
The bug was reported by Alexander Pronchenkov in https://fb.facebook.com/groups/560979627394613/permalink/837052843120622/

Under certain conditions, a `tryWriteUntil()` or `writeIfNotFull()` operation may block indefinitely while awaiting a matching read. This can happen because in each expansion of a dynamic MPMCQueue, typically one or two tickets are associated with the closed array rather than the new one. In the incorrect code, a `tryWriteUntil()` operation that induces an expansion but gets a ticket associated with the closed array incorrectly assumes that, because the expansion succeeded, there is space for it. However, because the ticket is associated with the closed array, the operation must wait (possibly indefinitely) for space to open up in the closed array.
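
To illustrate how a ticket can end up tied to a closed array, here is a minimal sketch of a ticket-to-array lookup. It is a simplified model, not the folly implementation: `ArrayInfo`, `QueueModel`, and `arrayForTicket()` are hypothetical names, and the sketch assumes each array records the first ticket it serves.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical model: every array (closed or current) serves tickets
// starting at `offset` and has `capacity` slots.
struct ArrayInfo {
  std::uint64_t offset;
  std::size_t capacity;
};

struct QueueModel {
  std::vector<ArrayInfo> closed; // closed arrays, oldest first
  ArrayInfo current;             // array installed by the latest expansion
};

// Returns the array that owns `ticket`: the current array if the ticket is
// at or past its offset, otherwise the most recent closed array whose
// offset does not exceed the ticket.
inline const ArrayInfo& arrayForTicket(const QueueModel& q,
                                       std::uint64_t ticket) {
  if (ticket >= q.current.offset) {
    return q.current;
  }
  for (auto it = q.closed.rbegin(); it != q.closed.rend(); ++it) {
    if (ticket >= it->offset) {
      return *it;
    }
  }
  assert(false && "ticket precedes every known array");
  return q.current;
}
```

In this model, a ticket issued just before the current array's offset still maps to the closed array, so a successful expansion says nothing about whether that ticket has a free slot.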

The fix: Changed the code in tryObtainPromisedPushTicket() so that the operation tries to acquire a ticket only if there is promised space in the array associated with that ticket. If there is no space and the ticket is not associated with a closed array, an expansion is attempted. If the ticket is associated with a closed array, or if the expansion fails because the queue has reached its maximum capacity or is out of memory, the operation returns false without attempting to acquire the ticket.
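
The decision described above can be sketched as a pure function. This is an illustrative model under stated assumptions, not the actual code: `PushDecision` and `decidePromisedPush()` are hypothetical names, `canExpand` stands in for the result of `tryExpand()`, and comparing the two capacities serves as a proxy for whether the ticket belongs to the current array, mirroring the `cap == curCap` check in the diff.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical outcome of one pass through the retry loop.
enum class PushDecision { AcquireTicket, ExpandAndRetry, Fail };

// ticket, numPops: snapshots of the push and pop ticket counters.
// ticketCap: capacity of the array associated with `ticket`
//            (after adjusting for closed arrays).
// curCap:    capacity of the current array.
// canExpand: assumption standing in for a successful tryExpand() call.
inline PushDecision decidePromisedPush(std::uint64_t ticket,
                                       std::uint64_t numPops,
                                       std::size_t ticketCap,
                                       std::size_t curCap,
                                       bool canExpand) {
  assert(ticketCap > 0 && curCap > 0);
  const auto n = static_cast<std::int64_t>(ticket - numPops);
  if (n < static_cast<std::int64_t>(ticketCap)) {
    // Promised space exists in the array that owns this ticket.
    return PushDecision::AcquireTicket;
  }
  // Expansion only helps a ticket that belongs to the current array.
  const bool ticketInCurrentArray = (ticketCap == curCap);
  if (ticketInCurrentArray && canExpand) {
    return PushDecision::ExpandAndRetry;
  }
  // Closed-array ticket, or expansion is not possible: do not take the ticket.
  return PushDecision::Fail;
}
```

The key point is the last branch: a ticket tied to a closed array never triggers an expansion and never gets acquired while that array is full, so the caller returns instead of waiting.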

Other changes:
- Added a note about this difference in semantics between the dynamic and non-dynamic versions to the main comment about the dynamic version.
- Changed `oldCap` to `curCap` because the value is actually the current capacity, not the old one.
- Added two tests checking that tryWriteUntil() never blocks indefinitely, for both the dynamic and non-dynamic versions.
- Removed all the `never_fail` tests for the dynamic version, because such operations may fail as described above.
- Added `asm_volatile_pause` when spinning on the seqlock.
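
As a rough illustration of the last item, the sketch below shows a generic single-writer seqlock whose reader relaxes the CPU between failed attempts. It is not the folly code: `SeqlockedCap`, `tryReadCap()`, `readCap()`, `writeCap()`, and `cpuRelax()` are hypothetical names, and `std::this_thread::yield()` is only a portable stand-in for a spin-wait hint such as `asm_volatile_pause`.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>

// Hypothetical seqlock-protected value; `seq` is odd while a write is active.
struct SeqlockedCap {
  std::atomic<std::uint64_t> seq{0};
  std::atomic<std::size_t> cap{0};
};

// Portable stand-in for a CPU spin-wait hint (assumption, see lead-in).
inline void cpuRelax() { std::this_thread::yield(); }

// One read attempt; returns false if a write was in progress or intervened.
inline bool tryReadCap(const SeqlockedCap& s, std::size_t& out) {
  const std::uint64_t before = s.seq.load(std::memory_order_acquire);
  if (before & 1) {
    return false;
  }
  out = s.cap.load(std::memory_order_relaxed);
  std::atomic_thread_fence(std::memory_order_acquire);
  return s.seq.load(std::memory_order_relaxed) == before;
}

// Spins until a consistent snapshot is read, relaxing between attempts.
inline std::size_t readCap(const SeqlockedCap& s) {
  std::size_t out = 0;
  while (!tryReadCap(s, out)) {
    cpuRelax();
  }
  return out;
}

// Single-writer update: make `seq` odd, publish the value, make it even.
inline void writeCap(SeqlockedCap& s, std::size_t value) {
  const std::uint64_t seq = s.seq.load(std::memory_order_relaxed);
  assert((seq & 1) == 0 && "sketch assumes a single writer");
  s.seq.store(seq + 1, std::memory_order_relaxed);
  std::atomic_thread_fence(std::memory_order_release);
  s.cap.store(value, std::memory_order_relaxed);
  s.seq.store(seq + 2, std::memory_order_release);
}
```

Relaxing inside the retry loop keeps a spinning reader from hammering the cache line that the writer needs in order to finish.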

Reviewed By: djwatson

Differential Revision: D4389347

fbshipit-source-id: c46dbefc9fe08e146250d2ad8ba68b0887f97436
magedm authored and facebook-github-bot committed Jan 10, 2017
1 parent 410f652 commit 232f650
Showing 2 changed files with 45 additions and 45 deletions.
36 changes: 22 additions & 14 deletions folly/MPMCQueue.h
@@ -159,6 +159,13 @@ class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
 /// call to blockingWrite() when the queue size is known to be equal
 /// to its capacity.
 ///
+/// Note that some writeIfNotFull() and tryWriteUntil() operations may
+/// fail even if the size of the queue is less than its maximum
+/// capacity and despite the success of expansion, if the operation
+/// happens to acquire a ticket that belongs to a closed array. This
+/// is a transient condition. Typically, one or two ticket values may
+/// be subject to such condition per expansion.
+///
 /// The dynamic version is a partial specialization of MPMCQueue with
 /// Dynamic == true
 template <typename T, template<typename> class Atom>
@@ -264,6 +271,7 @@ class MPMCQueue<T,Atom,true> :
     uint64_t offset;
     do {
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
       if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
@@ -302,7 +310,9 @@ class MPMCQueue<T,Atom,true> :
     int stride;
     uint64_t state;
     uint64_t offset;
-    while (!trySeqlockReadSection(state, slots, cap, stride));
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {
+      asm_volatile_pause();
+    }
     // If there was an expansion after the corresponding push ticket
     // was issued, adjust accordingly
     maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
@@ -344,6 +354,7 @@ class MPMCQueue<T,Atom,true> :
     do {
       ticket = this->pushTicket_.load(std::memory_order_acquire); // A
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }

@@ -388,31 +399,26 @@ class MPMCQueue<T,Atom,true> :
       ticket = this->pushTicket_.load(std::memory_order_acquire);
       auto numPops = this->popTicket_.load(std::memory_order_acquire);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }

-      const auto oldCap = cap;
+      const auto curCap = cap;
       // If there was an expansion with offset greater than this ticket,
       // adjust accordingly
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);

       int64_t n = ticket - numPops;
-      if (n >= static_cast<ssize_t>(this->capacity_)) {
-        ticket -= offset;
-        return false;
-      }

-      if (n >= static_cast<ssize_t>(oldCap)) {
-        if (tryExpand(state, oldCap)) {
-          // This or another thread started an expansion. Start over
-          // with a new state.
+      if (n >= static_cast<ssize_t>(cap)) {
+        if ((cap == curCap) && tryExpand(state, cap)) {
+          // This or another thread started an expansion. Start over.
           continue;
-        } else {
-          // Can't expand.
-          ticket -= offset;
-          return false;
         }
+        // Can't expand.
+        ticket -= offset;
+        return false;
       }

       if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
@@ -430,6 +436,7 @@ class MPMCQueue<T,Atom,true> :
     do {
       ticket = this->popTicket_.load(std::memory_order_relaxed);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }

@@ -459,6 +466,7 @@ class MPMCQueue<T,Atom,true> :
       ticket = this->popTicket_.load(std::memory_order_acquire);
       auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }

54 changes: 23 additions & 31 deletions folly/test/MPMCQueueTest.cpp
@@ -772,30 +772,23 @@ void runMtNeverFail(std::vector<int>& nts, int n) {
   }
 }

+// All the never_fail tests are for the non-dynamic version only.
+// False positive for dynamic version. Some writeIfNotFull() and
+// tryWriteUntil() operations may fail in transient conditions related
+// to expansion.
+
 TEST(MPMCQueue, mt_never_fail) {
   std::vector<int> nts {1, 3, 100};
   int n = 100000;
   runMtNeverFail<std::atomic>(nts, n);
 }

-TEST(MPMCQueue, mt_never_fail_dynamic) {
-  std::vector<int> nts {1, 3, 100};
-  int n = 100000;
-  runMtNeverFail<std::atomic, true>(nts, n);
-}
-
 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
   std::vector<int> nts {1, 3, 100};
   int n = 100000;
   runMtNeverFail<EmulatedFutexAtomic>(nts, n);
 }

-TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
-  std::vector<int> nts {1, 3, 100};
-  int n = 100000;
-  runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
-}
-
 template<bool Dynamic = false>
 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
   LOG(INFO) << "using seed " << seed;
@@ -818,13 +811,6 @@ TEST(MPMCQueue, mt_never_fail_deterministic) {
   runMtNeverFailDeterministic(nts, n, seed);
 }

-TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
-  std::vector<int> nts {3, 10};
-  long seed = 0; // nowMicro() % 10000;
-  int n = 1000;
-  runMtNeverFailDeterministic<true>(nts, n, seed);
-}
-
 template <class Clock, template <typename> class Atom, bool Dynamic>
 void runNeverFailUntilThread(int numThreads,
                              int n, /*numOps*/
@@ -889,12 +875,6 @@ TEST(MPMCQueue, mt_never_fail_until_system) {
   runMtNeverFailUntilSystem(nts, n);
 }

-TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
-  std::vector<int> nts {1, 3, 100};
-  int n = 100000;
-  runMtNeverFailUntilSystem<true>(nts, n);
-}
-
 template <bool Dynamic = false>
 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
   for (int nt : nts) {
@@ -911,12 +891,6 @@ TEST(MPMCQueue, mt_never_fail_until_steady) {
   runMtNeverFailUntilSteady(nts, n);
 }

-TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
-  std::vector<int> nts {1, 3, 100};
-  int n = 100000;
-  runMtNeverFailUntilSteady<true>(nts, n);
-}
-
 enum LifecycleEvent {
   NOTHING = -1,
   DEFAULT_CONSTRUCTOR,
@@ -1249,3 +1223,21 @@ TEST(MPMCQueue, try_write_until) {
 TEST(MPMCQueue, try_write_until_dynamic) {
   testTryWriteUntil<true>();
 }
+
+template <bool Dynamic>
+void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
+  CHECK(q.write(1));
+  /* The following must not block forever */
+  q.tryWriteUntil(
+      std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
+}
+
+TEST(MPMCQueue, try_write_until_timeout) {
+  folly::MPMCQueue<int, std::atomic, false> queue(1);
+  testTimeout<false>(queue);
+}
+
+TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
+  folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
+  testTimeout<true>(queue);
+}
