Skip to content

Commit

Permalink
CCBC-1346: return LCB_ERR_DOCUMENT_LOCKED for locked documents
Browse files Browse the repository at this point in the history
Change-Id: Ibaad6c2565fab6747f2b048e4ab0f396c5e18931
Reviewed-on: http://review.couchbase.org/c/libcouchbase/+/141483
Reviewed-by: Brett Lawson <brett19@gmail.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
avsej committed Dec 16, 2020
1 parent d2d3666 commit 1b50f85
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/handler.cc
Expand Up @@ -202,6 +202,8 @@ lcb_STATUS lcb_map_error(lcb_INSTANCE *instance, int in)
return LCB_ERR_DURABLE_WRITE_RE_COMMIT_IN_PROGRESS;
case PROTOCOL_BINARY_RESPONSE_SYNC_WRITE_AMBIGUOUS:
return LCB_ERR_DURABILITY_AMBIGUOUS;
case PROTOCOL_BINARY_RESPONSE_LOCKED:
return LCB_ERR_DOCUMENT_LOCKED;
default:
if (instance != nullptr) {
return instance->callbacks.errmap(instance, in);
Expand Down
1 change: 1 addition & 0 deletions src/mcserver/mcserver.cc
Expand Up @@ -338,6 +338,7 @@ static bool is_fastpath_error(uint16_t rc)
case PROTOCOL_BINARY_RESPONSE_DURABILITY_IMPOSSIBLE:
case PROTOCOL_BINARY_RESPONSE_SYNC_WRITE_IN_PROGRESS:
case PROTOCOL_BINARY_RESPONSE_SYNC_WRITE_AMBIGUOUS:
case PROTOCOL_BINARY_RESPONSE_LOCKED:
return true;
default:
if (rc >= 0xc0 && rc <= 0xcc) {
Expand Down
203 changes: 203 additions & 0 deletions tests/iotests/t_get.cc
Expand Up @@ -768,3 +768,206 @@ TEST_F(GetUnitTest, testFailoverAndMultiGet)
lcb_wait(instance, LCB_WAIT_NOCHECK);
ASSERT_EQ(nbCallbacks, counter);
}

extern "C" {

struct pl_result {
lcb_STATUS status{LCB_ERR_GENERIC};
bool invoked{false};
uint64_t cas{0};
};

static void pl_store_callback(lcb_INSTANCE *instance, lcb_CALLBACK_TYPE, const lcb_RESPSTORE *resp)
{
pl_result *res = nullptr;
lcb_respstore_cookie(resp, (void **)&res);
res->invoked = true;
res->status = lcb_respstore_status(resp);
if (res->status == LCB_SUCCESS) {
lcb_respstore_cas(resp, &res->cas);
}
}

static void pl_get_callback(lcb_INSTANCE *instance, lcb_CALLBACK_TYPE, const lcb_RESPGET *resp)
{
pl_result *res = nullptr;
lcb_respget_cookie(resp, (void **)&res);
res->invoked = true;
res->status = lcb_respget_status(resp);
if (res->status == LCB_SUCCESS) {
lcb_respget_cas(resp, &res->cas);
}
}

static void pl_unlock_callback(lcb_INSTANCE *instance, lcb_CALLBACK_TYPE, const lcb_RESPUNLOCK *resp)
{
pl_result *res = nullptr;
lcb_respunlock_cookie(resp, (void **)&res);
res->invoked = true;
res->status = lcb_respunlock_status(resp);
if (res->status == LCB_SUCCESS) {
lcb_respunlock_cas(resp, &res->cas);
}
}
}

TEST_F(GetUnitTest, testPessimisticLock)
{
SKIP_IF_MOCK();
MockEnvironment *mock = MockEnvironment::getInstance();
HandleWrap hw;
lcb_INSTANCE *instance;
createConnection(hw, &instance);

lcb_install_callback(instance, LCB_CALLBACK_GET, reinterpret_cast<lcb_RESPCALLBACK>(pl_get_callback));
lcb_install_callback(instance, LCB_CALLBACK_STORE, reinterpret_cast<lcb_RESPCALLBACK>(pl_store_callback));
lcb_install_callback(instance, LCB_CALLBACK_UNLOCK, reinterpret_cast<lcb_RESPCALLBACK>(pl_unlock_callback));

std::string key("testPessimisticLock");

std::uint64_t cas{0};
{
pl_result res{};

std::string value{"foo"};
lcb_CMDSTORE *cmd = nullptr;
lcb_cmdstore_create(&cmd, LCB_STORE_UPSERT);
lcb_cmdstore_key(cmd, key.c_str(), key.size());
lcb_cmdstore_value(cmd, value.c_str(), value.size());
lcb_store(instance, &res, cmd);
lcb_cmdstore_destroy(cmd);
lcb_wait(instance, LCB_WAIT_DEFAULT);

ASSERT_TRUE(res.invoked);
ASSERT_EQ(LCB_SUCCESS, res.status);
cas = res.cas;
}
{
// lock and record CAS of the locked document
pl_result res{};

lcb_CMDGET *cmd = nullptr;
lcb_cmdget_create(&cmd);
lcb_cmdget_key(cmd, key.c_str(), key.size());
lcb_cmdget_locktime(cmd, 5);
lcb_get(instance, &res, cmd);
lcb_cmdget_destroy(cmd);
lcb_wait(instance, LCB_WAIT_DEFAULT);

ASSERT_TRUE(res.invoked);
ASSERT_EQ(LCB_SUCCESS, res.status);
ASSERT_NE(cas, res.cas);
cas = res.cas;
}
{
// real CAS is masked now and not visible by regular GET
pl_result res{};

lcb_CMDGET *cmd = nullptr;
lcb_cmdget_create(&cmd);
lcb_cmdget_key(cmd, key.c_str(), key.size());
lcb_get(instance, &res, cmd);
lcb_cmdget_destroy(cmd);
lcb_wait(instance, LCB_WAIT_DEFAULT);

ASSERT_TRUE(res.invoked);
ASSERT_EQ(LCB_SUCCESS, res.status);
ASSERT_NE(cas, res.cas);
}
{
// it is not allowed to lock the same key twice
pl_result res{};

lcb_CMDGET *cmd = nullptr;
lcb_cmdget_create(&cmd);
lcb_cmdget_key(cmd, key.c_str(), key.size());
lcb_cmdget_locktime(cmd, 5);
lcb_get(instance, &res, cmd);
lcb_cmdget_destroy(cmd);
lcb_wait(instance, LCB_WAIT_DEFAULT);

ASSERT_TRUE(res.invoked);
ASSERT_EQ(LCB_ERR_DOCUMENT_LOCKED, res.status);
}
{
// it is not allowed to mutate the locked key
pl_result res{};

std::string value{"foo"};
lcb_CMDSTORE *cmd = nullptr;
lcb_cmdstore_create(&cmd, LCB_STORE_UPSERT);
lcb_cmdstore_key(cmd, key.c_str(), key.size());
lcb_cmdstore_value(cmd, value.c_str(), value.size());
lcb_store(instance, &res, cmd);
lcb_cmdstore_destroy(cmd);
lcb_wait(instance, LCB_WAIT_DEFAULT);

ASSERT_TRUE(res.invoked);
ASSERT_EQ(LCB_ERR_DOCUMENT_LOCKED, res.status);
}
{
// but mutating the locked key is allowed with known cas
pl_result res{};

std::string value{"foo"};
lcb_CMDSTORE *cmd = nullptr;
lcb_cmdstore_create(&cmd, LCB_STORE_UPSERT);
lcb_cmdstore_key(cmd, key.c_str(), key.size());
lcb_cmdstore_value(cmd, value.c_str(), value.size());
lcb_cmdstore_cas(cmd, cas);
lcb_store(instance, &res, cmd);
lcb_cmdstore_destroy(cmd);
lcb_wait(instance, LCB_WAIT_DEFAULT);

ASSERT_TRUE(res.invoked);
ASSERT_EQ(LCB_SUCCESS, res.status);
}
{
pl_result res{};

lcb_CMDGET *cmd = nullptr;
lcb_cmdget_create(&cmd);
lcb_cmdget_key(cmd, key.c_str(), key.size());
lcb_cmdget_locktime(cmd, 5);
lcb_get(instance, &res, cmd);
lcb_cmdget_destroy(cmd);
lcb_wait(instance, LCB_WAIT_DEFAULT);

ASSERT_TRUE(res.invoked);
ASSERT_EQ(LCB_SUCCESS, res.status);
ASSERT_NE(cas, res.cas);
cas = res.cas;
}
{
// to unlock key without mutation, lcb_unlock might be used
pl_result res{};

std::string value{"foo"};
lcb_CMDUNLOCK *cmd = nullptr;
lcb_cmdunlock_create(&cmd);
lcb_cmdunlock_key(cmd, key.c_str(), key.size());
lcb_cmdunlock_cas(cmd, cas);
lcb_unlock(instance, &res, cmd);
lcb_cmdunlock_destroy(cmd);
lcb_wait(instance, LCB_WAIT_DEFAULT);

ASSERT_TRUE(res.invoked);
ASSERT_EQ(LCB_SUCCESS, res.status);
}
{
// now the key is not locked
pl_result res{};

std::string value{"foo"};
lcb_CMDSTORE *cmd = nullptr;
lcb_cmdstore_create(&cmd, LCB_STORE_UPSERT);
lcb_cmdstore_key(cmd, key.c_str(), key.size());
lcb_cmdstore_value(cmd, value.c_str(), value.size());
lcb_store(instance, &res, cmd);
lcb_cmdstore_destroy(cmd);
lcb_wait(instance, LCB_WAIT_DEFAULT);

ASSERT_TRUE(res.invoked);
ASSERT_EQ(LCB_SUCCESS, res.status);
}
}
14 changes: 11 additions & 3 deletions tests/iotests/t_lock.cc
Expand Up @@ -181,7 +181,11 @@ TEST_F(LockUnitTest, testUnlockMissingCas)
if (CLUSTER_VERSION_IS_HIGHER_THAN(MockEnvironment::VERSION_50)) {
ASSERT_EQ(LCB_ERR_KVENGINE_INVALID_PACKET, reserr);
} else {
ASSERT_EQ(LCB_ERR_TEMPORARY_FAILURE, reserr);
if (MockEnvironment::getInstance()->isRealCluster()) {
ASSERT_EQ(LCB_ERR_DOCUMENT_LOCKED, reserr);
} else {
ASSERT_EQ(LCB_ERR_TEMPORARY_FAILURE, reserr);
}
}
}

Expand Down Expand Up @@ -247,7 +251,7 @@ TEST_F(LockUnitTest, testStorageLockContention)
Item s_itm;
ASSERT_EQ(LCB_SUCCESS, lcb_store(instance, &s_itm, scmd));
lcb_wait(instance, LCB_WAIT_DEFAULT);
ASSERT_EQ(LCB_ERR_DOCUMENT_EXISTS, s_itm.err);
ASSERT_EQ(LCB_ERR_DOCUMENT_LOCKED, s_itm.err);

/* verify the value is still the old value */
Item ritem;
Expand Down Expand Up @@ -316,7 +320,11 @@ TEST_F(LockUnitTest, testUnlLockContention)
uint64_t validCas = gitm.cas;
ASSERT_EQ(LCB_SUCCESS, lcb_get(instance, &gitm, gcmd));
lcb_wait(instance, LCB_WAIT_DEFAULT);
ASSERT_EQ(LCB_ERR_TEMPORARY_FAILURE, gitm.err);
if (MockEnvironment::getInstance()->isRealCluster()) {
ASSERT_EQ(LCB_ERR_DOCUMENT_LOCKED, gitm.err);
} else {
ASSERT_EQ(LCB_ERR_TEMPORARY_FAILURE, gitm.err);
}
lcb_cmdget_destroy(gcmd);

lcb_CMDUNLOCK *ucmd;
Expand Down

0 comments on commit 1b50f85

Please sign in to comment.