Skip to content

Commit

Permalink
Merge commit '1110bbc38' into couchbase/master
Browse files Browse the repository at this point in the history
* commit '1110bbc38':
  MB-47851: Cancel any requests blocked on warmup if warmup stopped.

Change-Id: I867ecd2bd7bd5035abed5f24437ff98555c739dd
  • Loading branch information
rdemellow committed Sep 2, 2021
2 parents 7116424 + 1110bbc commit c599be4
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 77 deletions.
9 changes: 8 additions & 1 deletion engines/ep/src/ep_bucket.cc
Expand Up @@ -319,7 +319,6 @@ void EPBucket::initializeShards() {
std::vector<ExTask> EPBucket::deinitialize() {
stopFlusher();
stopBgFetcher();
stopWarmup();

auto ret = KVBucket::deinitialize();

Expand Down Expand Up @@ -2226,3 +2225,11 @@ Flusher* EPBucket::getOneFlusher() {
Expects(flushers.size() > 0);
return flushers.front().get();
}

void EPBucket::releaseBlockedCookies() {
KVBucket::releaseBlockedCookies();

// Stop warmup (if not yet completed) which will unblock any cookies which
// were held pending if they were received before populateVBucketMap phase.
stopWarmup();
}
2 changes: 2 additions & 0 deletions engines/ep/src/ep_bucket.h
Expand Up @@ -269,6 +269,8 @@ class EPBucket : public KVBucket {
// implemented by querying StorageProperties for the buckets KVStore
bool isByIdScanSupported() const override;

void releaseBlockedCookies() override;

bool canEvictFromReplicas() override {
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion engines/ep/src/ep_engine.cc
Expand Up @@ -6393,7 +6393,7 @@ void EventuallyPersistentEngine::initiate_shutdown() {

void EventuallyPersistentEngine::cancel_all_operations_in_ewb_state() {
auto eng = acquireEngine(this);
kvBucket->releaseRegisteredSyncWrites();
kvBucket->releaseBlockedCookies();
}

cb::mcbp::Status EventuallyPersistentEngine::stopFlusher(const char** msg,
Expand Down
2 changes: 1 addition & 1 deletion engines/ep/src/kv_bucket.cc
Expand Up @@ -849,7 +849,7 @@ GetValue KVBucket::getReplica(const DocKey& key,
return getInternal(key, vbucket, cookie, ForGetReplicaOp::Yes, options);
}

void KVBucket::releaseRegisteredSyncWrites() {
void KVBucket::releaseBlockedCookies() {
for (size_t vbid = 0; vbid < vbMap.size; ++vbid) {
VBucketPtr vb = vbMap.getBucket(Vbid{gsl::narrow<uint16_t>(vbid)});
if (!vb) {
Expand Down
16 changes: 9 additions & 7 deletions engines/ep/src/kv_bucket.h
Expand Up @@ -287,16 +287,18 @@ class KVBucket : public KVBucketIface {
}

/**
* Release all cookies blocked for sync write
* Release all cookies blocked on pending requests (e.g. SyncWrites,
* requests waiting for warmup to complete).
*
* This method is called during bucket shutdown to make sure that
* all of the cookies waiting for a durable write is released so that
* we can continue bucket deletion. As part of bucket deletion one of
* the first things we do is to tear down the DCP streams so that
* the durable writes will never be notified and would be stuck
* waiting for a timeout if we don't explicitly release them.
* all of the cookies blocked waiting on a request to complete are released
* so that we can continue bucket deletion.
* For example, as part of bucket deletion one of the first things we do is
* to tear down the DCP streams so that SyncWrites will never be notified
* and would be stuck waiting for a timeout if we don't explicitly release
* them.
*/
void releaseRegisteredSyncWrites();
virtual void releaseBlockedCookies();

/**
* Sets the vbucket or creates a vbucket with the desired state
Expand Down
39 changes: 23 additions & 16 deletions engines/ep/src/warmup.cc
Expand Up @@ -1209,6 +1209,13 @@ void Warmup::stop() {
}
transition(WarmupState::State::Done, true);
done();

// If we haven't already completed populateVBucketMap step, then
// unblock (and cancel) any pending cookies so those connections don't
// get stuck.
// (On a normal, successful warmup these cookies would have already
// been notified when populateVBucketMap finished).
processCreateVBucketsComplete(cb::engine_errc::disconnect);
}

void Warmup::scheduleInitialize() {
Expand Down Expand Up @@ -1376,21 +1383,21 @@ void Warmup::createVBuckets(uint16_t shardId) {
}
}

void Warmup::processCreateVBucketsComplete() {
std::unique_lock<std::mutex> lock(pendingCookiesMutex);
createVBucketsComplete = true;
if (!pendingCookies.empty()) {
EP_LOG_INFO(
"Warmup::processCreateVBucketsComplete unblocking {} cookie(s)",
pendingCookies.size());
while (!pendingCookies.empty()) {
const CookieIface* c = pendingCookies.front();
pendingCookies.pop_front();
// drop lock to avoid lock inversion
lock.unlock();
store.getEPEngine().notifyIOComplete(c, cb::engine_errc::success);
lock.lock();
}
void Warmup::processCreateVBucketsComplete(cb::engine_errc status) {
PendingCookiesQueue toNotify;
{
std::unique_lock<std::mutex> lock(pendingCookiesMutex);
createVBucketsComplete = true;
pendingCookies.swap(toNotify);
}
if (toNotify.empty()) {
return;
}

EP_LOG_INFO("Warmup::processCreateVBucketsComplete unblocking {} cookie(s)",
toNotify.size());
for (const auto* c : toNotify) {
store.getEPEngine().notifyIOComplete(c, status);
}
}

Expand Down Expand Up @@ -1604,7 +1611,7 @@ void Warmup::populateVBucketMap(uint16_t shardId) {

warmedUpVbuckets.clear();
// Once we have populated the VBMap we can allow setVB state changes
processCreateVBucketsComplete();
processCreateVBucketsComplete(cb::engine_errc::success);
if (store.getItemEvictionPolicy() == EvictionPolicy::Value) {
transition(WarmupState::State::KeyDump);
} else {
Expand Down
7 changes: 5 additions & 2 deletions engines/ep/src/warmup.h
Expand Up @@ -17,6 +17,7 @@

#include <folly/AtomicHashMap.h>
#include <memcached/engine_common.h>
#include <memcached/engine_error.h>
#include <platform/atomic_duration.h>

#include <atomic>
Expand Down Expand Up @@ -272,8 +273,9 @@ class Warmup {
/**
* Perform any notifications to any pending setVBState operations and mark
* that vbucket creation is complete.
* @param status Status code to send to all waiting cookies.
*/
void processCreateVBucketsComplete();
void processCreateVBucketsComplete(cb::engine_errc status);

bool setOOMFailure() {
bool inverse = false;
Expand Down Expand Up @@ -445,7 +447,8 @@ class Warmup {
std::numeric_limits<size_t>::max()};

/// All of the cookies which need notifying when create-vbuckets is done
std::deque<const CookieIface*> pendingCookies;
using PendingCookiesQueue = std::deque<const CookieIface*>;
PendingCookiesQueue pendingCookies;
/// flag to mark once warmup is passed createVbuckets
bool createVBucketsComplete{false};
/// A mutex which gives safe access to the cookies and state flag
Expand Down
129 changes: 81 additions & 48 deletions engines/ep/tests/module_tests/evp_store_warmup_test.cc
Expand Up @@ -63,6 +63,8 @@ class WarmupTest : public SingleThreadedKVBucketTest {
engine.reset(nullptr);
ObjectRegistry::onSwitchThread(nullptr);
};

void testOperationsInterlockedWithWarmup(bool abortWarmup);
};

// Test that the FreqSaturatedCallback of a vbucket is initialized and after
Expand Down Expand Up @@ -246,9 +248,22 @@ TEST_F(WarmupTest, MB_27162) {
EXPECT_EQ(3, itemMeta.revSeqno);
}

// MB-25197 and MB-34422
// Some operations must block until warmup has loaded the vbuckets
TEST_F(WarmupTest, OperationsInterlockedWithWarmup) {
testOperationsInterlockedWithWarmup(false);
}

TEST_F(WarmupTest, OperationsInterlockedWithWarmupCancelled) {
testOperationsInterlockedWithWarmup(true);
}

/**
* MB-25197, MB-34422 and MB-47851.
* Some operations must block until warmup has loaded the vbuckets
* Two variants of test - either let warmup complete successfully
* (abortWarmup=false), or abort it before cookies would normally be notified.
* In both cases all cookies should be unblocked.
*/
void WarmupTest::testOperationsInterlockedWithWarmup(bool abortWarmup) {
setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);

store_item(vbid, makeStoredDocKey("key1"), "value");
Expand Down Expand Up @@ -283,74 +298,92 @@ TEST_F(WarmupTest, OperationsInterlockedWithWarmup) {

};

while (engine->getKVBucket()->maybeWaitForVBucketWarmup(cookie)) {
CheckedExecutor executor(task_executor, readerQueue);
// Do a setVBState but don't flush it through. This call should be
// failed ewouldblock whilst warmup has yet to attempt to create VBs.
EXPECT_EQ(cb::engine_errc::would_block,
// Perform requests for each of the different calls which should return
// EWOULDBLOCK if called before populateVBucketMap has completed.

// Do a setVBState but don't flush it through. This call should be
// failed ewouldblock whilst warmup has yet to attempt to create VBs.
EXPECT_EQ(cb::engine_errc::would_block,
store->setVBucketState(vbid,
vbucket_state_active,
{},
TransferVB::No,
setVBStateCookie));

EXPECT_EQ(cb::engine_errc::would_block,
engine->get_failover_log(*getFailoverCookie,
1 /*opaque*/,
vbid,
fakeDcpAddFailoverLog));

EXPECT_EQ(cb::engine_errc::would_block,
engine->get_stats(*statsCookie1, "vbucket", {}, dummyAddStats));

EXPECT_EQ(cb::engine_errc::would_block,
engine->get_stats(
*statsCookie2, "vbucket-details", {}, dummyAddStats));

EXPECT_EQ(cb::engine_errc::would_block,
engine->get_stats(
*statsCookie3, "vbucket-seqno", {}, dummyAddStats));

EXPECT_EQ(cb::engine_errc::would_block,
engine->deleteVBucket(*delVbCookie, vbid, true));

// Unblock cookies - either by aborting warmup, or advancing warmup
// far enough that the request can be issued successfully.
if (abortWarmup) {
engine->initiate_shutdown();
engine->cancel_all_operations_in_ewb_state();
} else {
// Per warmup advance normally, until we have completed
// populateVBucketMap.
while (engine->getKVBucket()->maybeWaitForVBucketWarmup(cookie)) {
CheckedExecutor executor(task_executor, readerQueue);
executor.runCurrentTask();
}
EXPECT_NE(nullptr, store->getVBuckets().getBucket(vbid));
}

// Should have received one more notification than started with, and
// with appropriate status code.
const auto expectedStatus = abortWarmup ? cb::engine_errc::disconnect
: cb::engine_errc::success;
for (const auto& n : notifications) {
EXPECT_EQ(n.second + 1, n.first->getNumIoNotifications());
EXPECT_EQ(expectedStatus, n.first->getStatus());
}

if (!abortWarmup) {
EXPECT_EQ(cb::engine_errc::success,
store->setVBucketState(vbid,
vbucket_state_active,
{},
TransferVB::No,
setVBStateCookie));

EXPECT_EQ(cb::engine_errc::would_block,
EXPECT_EQ(cb::engine_errc::success,
engine->get_failover_log(*getFailoverCookie,
1 /*opaque*/,
vbid,
fakeDcpAddFailoverLog));

EXPECT_EQ(
cb::engine_errc::would_block,
cb::engine_errc::success,
engine->get_stats(*statsCookie1, "vbucket", {}, dummyAddStats));

EXPECT_EQ(cb::engine_errc::would_block,
EXPECT_EQ(cb::engine_errc::success,
engine->get_stats(
*statsCookie2, "vbucket-details", {}, dummyAddStats));

EXPECT_EQ(cb::engine_errc::would_block,
EXPECT_EQ(cb::engine_errc::success,
engine->get_stats(
*statsCookie3, "vbucket-seqno", {}, dummyAddStats));

EXPECT_EQ(cb::engine_errc::would_block,
engine->deleteVBucket(*delVbCookie, vbid, true));

executor.runCurrentTask();
}

for (const auto& [cookie, numIoNotifictions] : notifications) {
EXPECT_GT(cookie->getNumIoNotifications(), numIoNotifictions);
EXPECT_EQ(cb::engine_errc::success,
engine->deleteVBucket(*delVbCookie, vbid, false));
}

EXPECT_NE(nullptr, store->getVBuckets().getBucket(vbid));

EXPECT_EQ(cb::engine_errc::success,
store->setVBucketState(vbid,
vbucket_state_active,
{},
TransferVB::No,
setVBStateCookie));

EXPECT_EQ(cb::engine_errc::success,
engine->get_failover_log(*getFailoverCookie,
1 /*opaque*/,
vbid,
fakeDcpAddFailoverLog));

EXPECT_EQ(cb::engine_errc::success,
engine->get_stats(*statsCookie1, "vbucket", {}, dummyAddStats));

EXPECT_EQ(cb::engine_errc::success,
engine->get_stats(
*statsCookie2, "vbucket-details", {}, dummyAddStats));

EXPECT_EQ(cb::engine_errc::success,
engine->get_stats(
*statsCookie3, "vbucket-seqno", {}, dummyAddStats));

EXPECT_EQ(cb::engine_errc::success,
engine->deleteVBucket(*delVbCookie, vbid, false));

// finish warmup so the test can exit
while (engine->getKVBucket()->isWarmingUp()) {
CheckedExecutor executor(task_executor, readerQueue);
Expand Down
3 changes: 2 additions & 1 deletion engines/ep/tests/module_tests/kv_bucket_test.cc
Expand Up @@ -81,7 +81,8 @@ void KVBucketTest::SetUp() {

if (completeWarmup && engine->getKVBucket()->getWarmup()) {
engine->getKVBucket()->getWarmup()->setComplete();
engine->getKVBucket()->getWarmup()->processCreateVBucketsComplete();
engine->getKVBucket()->getWarmup()->processCreateVBucketsComplete(
cb::engine_errc::success);
}
}

Expand Down

0 comments on commit c599be4

Please sign in to comment.