Skip to content

Commit

Permalink
MB-54591: ActiveStream: Avoid lost wakeup due to race on itemsReady
Browse files Browse the repository at this point in the history
+Summary+

There exists a race in ActiveStream between a notifying that a new
mutation available, and processing the previous mutation(s). If the
notification occurs just before ActiveStream sets a flag indicating
that there's currently no more items to process, then the notification
is lost and the DCP stream is not notified of this new mutation when
it should be.

This results in the DCP stream being behind by that seqno, until the
the next mutation on the vBucket occurs with will re-notify the stream
and cause it to process both.

+Impact+

When it occurs, this bug introduces additional latency between a
mutation occurring on the KV node, and it getting passed to the
affected DCP client. For DCP connections which require a "current" set
of seqnos to proceed - e.g. GSI when using request_plus queries, this
can manifest as those operations hanging.

Note the latency added is bounded by the mutation rate on the vBucket
- the next mutation is guaranteed to wake up the stream (as it is now
idle and there's no possible race). For workloads with moderate
mutation rates (e.g. 1000 mutations per second per Bucket), the
additional latency on average should be 1s. However for workloads with
very low mutation rates (<100 per second) then on average the latency
would be 10+ seconds.

For workloads which are intermittent and idle for extended periods,
the added latency could be large - as long as it takes for that
vBucket to be modified again.

+Details+

The race is between the itemsReady flag being tested in
notifyStreamReady, and clearing itemsReady when there are no more
items available for the stream in ActiveStream::next(). Flag is tested
here:

    void ActiveStream::notifyStreamReady(bool force, DcpProducer* producer, uint64_t seqno) {
        bool inverse = false;
        if (force || itemsReady.compare_exchange_strong(inverse, true)) {

            if (seqno) {
                itemsReadySeqnosForNotify.lock()->push_back(seqno);
            }
    ...

And it is set/reset here:

    std::unique_ptr<DcpResponse> ActiveStream::next(DcpProducer& producer) {
        std::unique_ptr<DcpResponse> response;
        switch (state_.load()) {
        ...
        case StreamState::InMemory:
            response = inMemoryPhase(producer);
            break;
        ...
        }

       itemsReady.store(response ? true : false);
       return response;
   }

We can lose a notification (from ActiveStream::notifyStreamReady) from
one front-end thread (performing a set) if the notification occurs
while the DCP thread is executing ActiveStream::next() and has already
called inMemoryPhase() and found the CheckpointManger empty - but
*before* we clear itemsReady at the end of the function.

To fix the lost wakeup we clear itemsReady _before_ we fetch the next
response - and only set it back to true (inhibiting notifies) if we
find we have at least one more item.

This potentially swaps lost wakeups for unnecessary wakeups (i.e. if a
notifyStreamReady() occurs in ActiveStream::next() between clearing
itemsReady and re-setting it at the end) - but correctness >
performance.

Change-Id: I3ed9ff1abfe654b8ced4e9c8d084bd0ddaa668ee
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/182929
Well-Formed: Restriction Checker
Reviewed-by: Ben Huddleston <ben.huddleston@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
daverigby committed Nov 21, 2022
1 parent 3e1d1cb commit 8d32d24
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 5 deletions.
20 changes: 18 additions & 2 deletions engines/ep/src/dcp/active_stream.cc
Expand Up @@ -152,6 +152,16 @@ std::unique_ptr<DcpResponse> ActiveStream::next() {

std::unique_ptr<DcpResponse> ActiveStream::next(
std::lock_guard<std::mutex>& lh) {

// Clear notification flag before checking for a response, as if there was
// nothing available when we checked, we want to be notified again when
// more items are available. We do this to avoid a lost wake-up, in the
// event we are notified about a new seqno just after we have found
// no response is ready.
// Note however this does mean we can get spurious wakeups between here
// and when we set itemsReady at the end of this function.
itemsReady.store(false);

std::unique_ptr<DcpResponse> response;

switch (state_.load()) {
Expand All @@ -175,10 +185,16 @@ std::unique_ptr<DcpResponse> ActiveStream::next(
}

if (nextHook) {
nextHook();
nextHook(response.get());
}

itemsReady.store(response ? true : false);
// We have at least one response, and hence will call next() at least one
// more time (a null response is used to indicate the Stream has no items
// currently available) - as such set the itemsReady flag to avoid
// unnecessary notifications - we know we need to check again.
if (response) {
itemsReady.store(true);
}
return response;
}

Expand Down
2 changes: 1 addition & 1 deletion engines/ep/src/dcp/active_stream.h
Expand Up @@ -426,7 +426,7 @@ class ActiveStream : public Stream,

// MB-37468: Test only hooks set via Mock class
std::function<void()> completeBackfillHook;
std::function<void()> nextHook;
std::function<void(const DcpResponse*)> nextHook;

// Whether the responses sent using this stream should contain the body
const IncludeValue includeValue;
Expand Down
3 changes: 3 additions & 0 deletions engines/ep/src/dcp/stream.h
Expand Up @@ -138,6 +138,9 @@ class Stream {
uint64_t snap_start_seqno_;
uint64_t snap_end_seqno_;

// Notification flag set to true by notifyStreamReady() (and next()) if
// there are new item(s) waiting to be processed by this Stream. Cleared
// once the Stream wants to be notified again about new seqnos.
std::atomic<bool> itemsReady;
std::mutex streamMutex;

Expand Down
2 changes: 1 addition & 1 deletion engines/ep/tests/mock/mock_stream.h
Expand Up @@ -163,7 +163,7 @@ class MockActiveStream : public ActiveStream {
completeBackfillHook = hook;
}

void setNextHook(std::function<void()> hook) {
void setNextHook(std::function<void(const DcpResponse*)> hook) {
nextHook = hook;
}

Expand Down
122 changes: 121 additions & 1 deletion engines/ep/tests/module_tests/dcp_stream_test.cc
Expand Up @@ -43,11 +43,13 @@
#include "../mock/mock_dcp_consumer.h"
#include "../mock/mock_dcp_producer.h"
#include "../mock/mock_stream.h"
#include "../mock/gmock_dcp_msg_producers.h"
#include "../mock/mock_synchronous_ep_engine.h"
#include "checkpoint_utils.h"

#include "engines/ep/tests/mock/mock_dcp_conn_map.h"
#include <engines/ep/tests/mock/mock_dcp_backfill_mgr.h>
#include <folly/synchronization/Baton.h>
#include <programs/engine_testapp/mock_server.h>
#include <xattr/blob.h>
#include <xattr/utils.h>
Expand Down Expand Up @@ -2855,7 +2857,7 @@ TEST_P(SingleThreadedActiveStreamTest, CompleteBackfillRaceNoStreamEnd) {
EXPECT_EQ(ENGINE_SUCCESS, producer->step(&producers));
EXPECT_EQ(cb::mcbp::ClientOpcode::DcpMutation, producers.last_op);

stream->setNextHook([&tg1, &tg2]() {
stream->setNextHook([&tg1, &tg2](const DcpResponse* response) {
if (!tg1.isComplete()) {
tg1.threadUp();

Expand Down Expand Up @@ -2897,6 +2899,124 @@ TEST_P(SingleThreadedActiveStreamTest, CompleteBackfillRaceNoStreamEnd) {
EXPECT_TRUE(producer->getReadyQueue().empty());
}

// MB-54591: An ActiveStream can lose a notification of a new seqno if
// the notification occurs while the frontend DCP thread is finishing processing
// the previous item(s) via ActiveStream::next(). Specifically if
// notifyStreamReady() is called before itemsReady is cleared at the end of
// ActiveStream::next().
// This results in the DCP stream not waking and not sending out the affected
// seqno(s) until another mutation for that vBucket occurs.
TEST_P(SingleThreadedActiveStreamTest,
RaceBetweenNotifyAndProcessingExistingItems) {

// Replace initial stream with one registered with DCP producer.
auto vb = engine->getVBucket(vbid);
startCheckpointTask();
stream = producer->mockActiveStreamRequest(0,
/*opaque*/ 0,
*vb,
/*st_seqno*/ 0,
/*en_seqno*/ ~0,
/*vb_uuid*/ 0xabcd,
/*snap_start_seqno*/ 0,
/*snap_end_seqno*/ ~0);
auto& connMap = static_cast<MockDcpConnMap&>(engine->getDcpConnMap());
connMap.addConn(cookie, producer);
connMap.addVBConnByVBId(producer, vbid);

// Add an initial item which we will correctly process.
store_item(vbid, makeStoredDocKey("key1"), "value");

// step() the producer to schedule ActiveStreamCheckpointProcessorTask and
// run it once to process the items from CkptManager into the Streams' readyQ.
GMockDcpMsgProducers producers;
ASSERT_EQ(ENGINE_EWOULDBLOCK, producer->step(&producers));
auto& nonIO = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
runNextTask(nonIO,
"Process checkpoint(s) for DCP producer "
"test_producer->test_consumer");

// Setup Mock Producer expectations - we should see two snapshot
// markers with one mutation each:
{
::testing::InSequence dummy;
using ::testing::_;
using ::testing::Return;

EXPECT_CALL(producers, marker(_, vbid, _, _, _, _, _, _))
.WillOnce(Return(ENGINE_SUCCESS));

EXPECT_CALL(producers, mutation(_, _, vbid, /*seqno*/ 1, _, _, _, _))
.WillOnce(Return(ENGINE_SUCCESS));

EXPECT_CALL(producers, marker(_, vbid, _, _, _, _, _, _))
.WillOnce(Return(ENGINE_SUCCESS));

EXPECT_CALL(producers, mutation(_, _, vbid, /*seqno*/ 2, _, _, _, _))
.WillOnce(Return(ENGINE_SUCCESS));
}

// Step DCP producer twice to generate the initial snapshot marker and
// mutation to "key1"
ASSERT_EQ(ENGINE_SUCCESS, producer->step(&producers));
ASSERT_EQ(ENGINE_SUCCESS, producer->step(&producers));

// Step again - but this time configure a callback in ActiveStream::next()
// which will perform another front-end store(). This _should_ result in
// ActiveStream::notifyStreamReady() notifying the Producer via
// Producer::notifyStreamReady() and waking up the front-end again - but in
// the case of the bug this wakeup was missed.

// Note we must perform the store() on a different thread (instead of
// directly inside the hook) otherwise we will encounter lock inversions.
folly::Baton<> baton;
auto frontEndThread = std::thread([&] {
baton.wait();
store_item(vbid, makeStoredDocKey("key2"), "value");
});

bool extraStoreIssued = false;
stream->setNextHook([&](const DcpResponse* response) {
// Only want to add one extra mutation.
if (extraStoreIssued) {
return;
}
EXPECT_FALSE(response)
<< "ActiveStream::next() hook expected to only be called for "
"nullptr response when all previous items processed.";

baton.post();
frontEndThread.join();
extraStoreIssued = true;
});

// Call step - this calls ActiveStream::next() which initially returns
// nullptr as CkptManager has no more items, but our callback above adds
// another mutation to CM at the end of ActiveStream::next(). With the bug
// we miss the wakeup and producer->step() returns without scheduling
// any more work - so runNextTask below fails.
// With the bug fixed it will call ActiveStream::next() again, spot there's
// a new item in CM and schedule the ActiveStreamCheckpointProcessorTask.
ASSERT_EQ(ENGINE_EWOULDBLOCK, producer->step(&producers));

runNextTask(nonIO,
"Process checkpoint(s) for DCP producer "
"test_producer->test_consumer");

// Once the task has run then it should have notified the producer again.
EXPECT_TRUE(producer->getReadyQueue().exists(vbid));

// Step the producer to consume the second snapshot marker and key2.
// Should finish with would_block to indicate no more data ready.
EXPECT_EQ(ENGINE_SUCCESS, producer->step(&producers));
EXPECT_EQ(ENGINE_SUCCESS, producer->step(&producers));
EXPECT_EQ(ENGINE_EWOULDBLOCK, producer->step(&producers));

// Cleanup
connMap.removeVBConnByVBId(cookie, vbid);
connMap.removeConn(cookie);
}

void SingleThreadedActiveStreamTest::testProducerIncludesUserXattrsInDelete(
const boost::optional<cb::durability::Requirements>& durReqs) {
using DcpOpenFlag = cb::mcbp::request::DcpOpenPayload;
Expand Down

0 comments on commit 8d32d24

Please sign in to comment.