Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added support for TOUCH and GAT (get and touch)

Change-Id: If59137732bbdf4667bd0dfed3b8b21fca23b511d
Reviewed-on: http://review.membase.org/4972
Tested-by: Steve Yen <steve.yen@gmail.com>
Reviewed-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information...
commit 5dc5782fe16c24778e75a805c10a29305e98b2cb 1 parent fa33f86
@trondn trondn authored steveyen committed
View
56 ep.cc
@@ -861,6 +861,62 @@ GetValue EventuallyPersistentStore::get(const std::string &key,
}
}
+GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
+ uint16_t vbucket,
+ const void *cookie,
+ bool queueBG,
+ uint32_t exptime)
+{
+ RCPtr<VBucket> vb = getVBucket(vbucket);
+ if (!vb) {
+ ++stats.numNotMyVBuckets;
+ return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
+ } else if (vb->getState() == vbucket_state_dead) {
+ ++stats.numNotMyVBuckets;
+ return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
+ } else if (vb->getState() == vbucket_state_active) {
+ // OK
+ } else if (vb->getState() == vbucket_state_replica) {
+ ++stats.numNotMyVBuckets;
+ return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
+ } else if (vb->getState() == vbucket_state_pending) {
+ if (vb->addPendingOp(cookie)) {
+ return GetValue(NULL, ENGINE_EWOULDBLOCK);
+ }
+ }
+
+ int bucket_num(0);
+ LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
+ StoredValue *v = fetchValidValue(vb, key, bucket_num);
+
+ if (v) {
+ v->setExptime(engine.getServerApi()->core->realtime(exptime));
+ // If the value is not resident, wait for it...
+ if (!v->isResident()) {
+ if (queueBG) {
+ bgFetch(key, vbucket, vbuckets.getBucketVersion(vbucket),
+ v->getId(), cookie);
+ return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getId());
+ } else {
+ // You didn't want the item anyway...
+ return GetValue(NULL, ENGINE_SUCCESS, v->getId());
+ }
+ }
+
+ // return an invalid cas value if the item is locked
+ uint64_t icas = v->isLocked(ep_current_time())
+ ? static_cast<uint64_t>(-1)
+ : v->getCas();
+ GetValue rv(new Item(v->getKey(), v->getFlags(), v->getExptime(),
+ v->getValue(), icas, v->getId(), vbucket),
+ ENGINE_SUCCESS, v->getId());
+ return rv;
+ } else {
+ GetValue rv;
+ return rv;
+ }
+}
+
ENGINE_ERROR_CODE
EventuallyPersistentStore::getFromUnderlying(const std::string &key,
uint16_t vbucket,
View
14 ep.hh
@@ -451,6 +451,20 @@ public:
bool honorStates=true);
/**
+ * Retrieve a value, but update its TTL first
+ *
+ * @param key the key to fetch
+ * @param vbucket the vbucket from which to retrieve the key
+ * @param cookie the connection cookie
+ * @param queueBG if true, automatically queue a background fetch if necessary
+ * @param exptime the new expiry time for the object
+ *
+ * @return a GetValue representing the result of the request
+ */
+ GetValue getAndUpdateTtl(const std::string &key, uint16_t vbucket,
+ const void *cookie, bool queueBG, uint32_t exptime);
+
+ /**
* Retrieve an item from the disk for vkey stats
*
* @param key the key to fetch
View
68 ep_engine.cc
@@ -577,6 +577,11 @@ extern "C" {
}
break;
+ case PROTOCOL_BINARY_CMD_TOUCH:
+ case PROTOCOL_BINARY_CMD_GAT:
+ case PROTOCOL_BINARY_CMD_GATQ:
+ return h->touch(cookie, request, response);
+
case CMD_STOP_PERSISTENCE:
res = stopFlusher(h, &msg);
break;
@@ -2836,3 +2841,66 @@ void EventuallyPersistentEngine::notifyTapIoThread(void) {
tapConnMap.wait(1.0);
}
}
+
+ENGINE_ERROR_CODE EventuallyPersistentEngine::touch(const void *cookie,
+ protocol_binary_request_header *request,
+ ADD_RESPONSE response)
+{
+ if (request->request.extlen != 4 || request->request.keylen == 0) {
+ if (response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie)) {
+ return ENGINE_SUCCESS;
+ } else {
+ return ENGINE_FAILED;
+ }
+ }
+
+ protocol_binary_request_touch *t = reinterpret_cast<protocol_binary_request_touch*>(request);
+ void *key = t->bytes + sizeof(t->bytes);
+ uint32_t exptime = ntohl(t->message.body.expiration);
+ uint16_t nkey = ntohs(request->request.keylen);
+ uint16_t vbucket = ntohs(request->request.vbucket);
+
+ // try to get the object
+ std::string k(static_cast<const char*>(key), nkey);
+ GetValue gv(epstore->getAndUpdateTtl(k, vbucket, cookie,
+ request->request.opcode != PROTOCOL_BINARY_CMD_TOUCH,
+ exptime));
+ ENGINE_ERROR_CODE rv = gv.getStatus();
+ if (rv == ENGINE_SUCCESS) {
+ bool ret;
+ Item *it = gv.getValue();
+ if (request->request.opcode == PROTOCOL_BINARY_CMD_TOUCH) {
+ ret = response(NULL, 0, NULL, 0, NULL, 0,
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
+ } else {
+ uint32_t flags = it->getFlags();
+ ret = response(NULL, 0, &flags, sizeof(flags),
+ it->getData(), it->getNBytes(),
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_SUCCESS, it->getCas(),
+ cookie);
+ }
+ delete it;
+ if (ret) {
+ rv = ENGINE_SUCCESS;
+ } else {
+ rv = ENGINE_FAILED;
+ }
+ } else if (rv == ENGINE_KEY_ENOENT) {
+ if (request->request.opcode == PROTOCOL_BINARY_CMD_GATQ) {
+ // GATQ should not return response upon cache miss
+ rv = ENGINE_SUCCESS;
+ } else {
+ if (response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, cookie)) {
+ rv = ENGINE_SUCCESS;
+ } else {
+ rv = ENGINE_FAILED;
+ }
+ }
+ }
+
+ return rv;
+}
View
4 ep_engine.h
@@ -321,6 +321,10 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
size_t ndata,
uint16_t vbucket);
+ ENGINE_ERROR_CODE touch(const void* cookie,
+ protocol_binary_request_header *request,
+ ADD_RESPONSE response);
+
/**
* Visit the objects and add them to the tap connecitons queue.
* @todo this code should honor the backfill time!
View
174 ep_testsuite.cc
@@ -1782,6 +1782,177 @@ static enum test_result test_vb_del_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *
return SUCCESS;
}
+static enum test_result test_touch(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+ char buffer[512];
+ memset(buffer, 0, sizeof(buffer));
+ protocol_binary_request_touch *req = reinterpret_cast<protocol_binary_request_touch *>(buffer);
+ protocol_binary_request_header *request = reinterpret_cast<protocol_binary_request_header*>(req);
+
+ req->message.header.request.magic = PROTOCOL_BINARY_REQ;
+ req->message.header.request.opcode = PROTOCOL_BINARY_CMD_TOUCH;
+ req->message.header.request.extlen = 4;
+ req->message.header.request.bodylen = htonl(4);
+ req->message.body.expiration = ntohl(10);
+
+ // key is a mandatory field!
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call touch");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
+
+ // extlen is a mandatory field!
+ req->message.header.request.extlen = 0;
+ req->message.header.request.keylen = 4;
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call touch");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
+
+ // Try to touch an unknown item...
+ req->message.header.request.extlen = 4;
+ req->message.header.request.keylen = htons(5);
+ req->message.header.request.bodylen = htonl(4 + 5);
+ memcpy(buffer + sizeof(req->bytes), "mykey", 5);
+
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call touch");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, "Testing unknown key");
+
+ // Store the item!
+ item *itm = NULL;
+ check(store(h, h1, NULL, OPERATION_SET, "mykey", "somevalue", &itm) == ENGINE_SUCCESS,
+ "Failed set.");
+ h1->release(h, NULL, itm);
+
+ check(check_key_value(h, h1, "mykey", "somevalue", strlen("somevalue")) == SUCCESS,
+ "Failed to retrieve data");
+
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call touch");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "touch mykey");
+
+ // time-travel 11 secs..
+ testHarness.time_travel(11);
+
+ // The item should have expired now...
+ check(h1->get(h, NULL, &itm, "mykey", 5, 0) == ENGINE_KEY_ENOENT, "Item should be gone");
+
+ return SUCCESS;
+}
+
+static enum test_result test_gat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+ char buffer[512];
+ memset(buffer, 0, sizeof(buffer));
+ protocol_binary_request_gat *req = reinterpret_cast<protocol_binary_request_gat *>(buffer);
+ protocol_binary_request_header *request = reinterpret_cast<protocol_binary_request_header*>(req);
+
+ req->message.header.request.magic = PROTOCOL_BINARY_REQ;
+ req->message.header.request.opcode = PROTOCOL_BINARY_CMD_GAT;
+ req->message.header.request.extlen = 4;
+ req->message.header.request.bodylen = htonl(4);
+ req->message.body.expiration = ntohl(10);
+
+ // key is a mandatory field!
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call gat");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
+
+ // extlen is a mandatory field!
+ req->message.header.request.extlen = 0;
+ req->message.header.request.keylen = 4;
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call gat");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
+
+ // Try to touch an unknown item...
+ req->message.header.request.extlen = 4;
+ req->message.header.request.keylen = htons(5);
+ req->message.header.request.bodylen = htonl(4 + 5);
+ memcpy(buffer + sizeof(req->bytes), "mykey", 5);
+
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call gat");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, "Testing unknown key");
+
+ // Store the item!
+ item *itm = NULL;
+ check(store(h, h1, NULL, OPERATION_SET, "mykey", "somevalue", &itm) == ENGINE_SUCCESS,
+ "Failed set.");
+ h1->release(h, NULL, itm);
+
+ check(check_key_value(h, h1, "mykey", "somevalue", strlen("somevalue")) == SUCCESS,
+ "Failed to retrieve data");
+
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call gat");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "gat mykey");
+ check(memcmp(last_body, "somevalue", sizeof("somevalue")) == 0,
+ "Invalid data returned");
+ // time-travel 11 secs..
+ testHarness.time_travel(11);
+
+ // The item should have expired now...
+ check(h1->get(h, NULL, &itm, "mykey", 5, 0) == ENGINE_KEY_ENOENT, "Item should be gone");
+ return SUCCESS;
+}
+
+static enum test_result test_gatq(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+ char buffer[512];
+ memset(buffer, 0, sizeof(buffer));
+ protocol_binary_request_gat *req = reinterpret_cast<protocol_binary_request_gat *>(buffer);
+ protocol_binary_request_header *request = reinterpret_cast<protocol_binary_request_header*>(req);
+
+ req->message.header.request.magic = PROTOCOL_BINARY_REQ;
+ req->message.header.request.opcode = PROTOCOL_BINARY_CMD_GATQ;
+ req->message.header.request.extlen = 4;
+ req->message.header.request.bodylen = htonl(4);
+ req->message.body.expiration = ntohl(10);
+
+ // key is a mandatory field!
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call gat");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
+
+ // extlen is a mandatory field!
+ req->message.header.request.extlen = 0;
+ req->message.header.request.keylen = 4;
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call gat");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
+
+ // Try to gat an unknown item...
+ req->message.header.request.extlen = 4;
+ req->message.header.request.keylen = htons(5);
+ req->message.header.request.bodylen = htonl(4 + 5);
+ memcpy(buffer + sizeof(req->bytes), "mykey", 5);
+
+ last_status = static_cast<protocol_binary_response_status>(0xffff);
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call gat");
+
+ // We should not have sent any response!
+ check(last_status == 0xffff, "Testing unknown key");
+
+ // Store the item!
+ item *itm = NULL;
+ check(store(h, h1, NULL, OPERATION_SET, "mykey", "somevalue", &itm) == ENGINE_SUCCESS,
+ "Failed set.");
+ h1->release(h, NULL, itm);
+
+ check(check_key_value(h, h1, "mykey", "somevalue", strlen("somevalue")) == SUCCESS,
+ "Failed to retrieve data");
+
+ check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+ "Failed to call gat");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "gat mykey");
+ check(memcmp(last_body, "somevalue", sizeof("somevalue")) == 0,
+ "Invalid data returned");
+ // time-travel 11 secs..
+ testHarness.time_travel(11);
+
+ // The item should have expired now...
+ check(h1->get(h, NULL, &itm, "mykey", 5, 0) == ENGINE_KEY_ENOENT, "Item should be gone");
+ return SUCCESS;
+}
+
static enum test_result test_alloc_limit(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
item *it = NULL;
@@ -3634,6 +3805,9 @@ engine_test_t* get_tests(void) {
{"incr", test_incr, NULL, teardown, NULL},
{"incr with default", test_incr_default, NULL, teardown, NULL},
{"incr expiry", test_bug2799, NULL, teardown, NULL},
+ {"test touch", test_touch, NULL, teardown, NULL},
+ {"test gat", test_gat, NULL, teardown, NULL},
+ {"test gatq", test_gatq, NULL, teardown, NULL},
{"delete", test_delete, NULL, teardown, NULL},
{"set/delete", test_set_delete, NULL, teardown, NULL},
{"delete/set/delete", test_delete_set, NULL, teardown, NULL},
View
7 stored-value.hh
@@ -194,6 +194,13 @@ public:
}
}
+ void setExptime(time_t tim) {
+ if (!_isSmall) {
+ extra.feature.exptime = tim;
+ markDirty();
+ }
+ }
+
/**
* Get the client-defined flags of this item.
*
Please sign in to comment.
Something went wrong with that request. Please try again.