Skip to content

Commit

Permalink
Don't copy the duplicate meta data into the engine_specific field
Browse files Browse the repository at this point in the history
The meta data of each item was previously constructed by
allocating 22 bytes of a byte array in Item class and passing its
pointer to the memcached through the engine_specific field.

As all the meta data fields except for an item's seqno are already
included in an TAP message, we only need to pass the seqno to the
memcached via the engine_specific field. This can also reduce the
memory overhead because Item instances are used in memcached,
flusher queue, checkpoints, etc.

Change-Id: Iadb2b34aa9b14a5f0dc506f6a247544283a9f1a2
Reviewed-on: http://review.couchbase.org/12293
Reviewed-by: Srinivas Vadlamani <srinivas@couchbase.com>
Tested-by: Srinivas Vadlamani <srinivas@couchbase.com>
  • Loading branch information
chiyoung committed Jan 12, 2012
1 parent c53c80e commit 3342e7b
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 31 deletions.
3 changes: 2 additions & 1 deletion Makefile.am
Expand Up @@ -213,7 +213,8 @@ EXTRA_TESTS =
ep_testsuite_la_CPPFLAGS = -I$(top_srcdir) -I$(top_srcdir)/sqlite-kvstore \ ep_testsuite_la_CPPFLAGS = -I$(top_srcdir) -I$(top_srcdir)/sqlite-kvstore \
$(AM_CPPFLAGS) ${NO_WERROR} $(AM_CPPFLAGS) ${NO_WERROR}
ep_testsuite_la_SOURCES= ep_testsuite.cc ep_testsuite.h \ ep_testsuite_la_SOURCES= ep_testsuite.cc ep_testsuite.h \
atomic.cc locks.hh mutex.cc mutex.hh testlogger_libify.cc atomic.cc locks.hh mutex.cc mutex.hh \
item.cc testlogger_libify.cc
ep_testsuite_la_LDFLAGS= -module -dynamic ep_testsuite_la_LDFLAGS= -module -dynamic


# This is because automake can't figure out how to build the same code # This is because automake can't figure out how to build the same code
Expand Down
27 changes: 12 additions & 15 deletions ep_engine.cc
Expand Up @@ -1381,7 +1381,7 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie
connection->encodeVBucketStateTransition(ev, es, nes, vbucket); connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
break; break;
case TAP_OPAQUE: case TAP_OPAQUE:
connection->opaqueCommandCode = ev.state; connection->opaqueCommandCode = (uint32_t) ev.state;
*vbucket = ev.vbucket; *vbucket = ev.vbucket;
*es = &connection->opaqueCommandCode; *es = &connection->opaqueCommandCode;
*nes = sizeof(connection->opaqueCommandCode); *nes = sizeof(connection->opaqueCommandCode);
Expand All @@ -1400,8 +1400,9 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie
case TAP_DELETION: case TAP_DELETION:
*itm = it; *itm = it;
if (ret == TAP_MUTATION || ret == TAP_DELETION) { if (ret == TAP_MUTATION || ret == TAP_DELETION) {
*es = (void*)it->getMetaData(); connection->itemRevSeqno = htonl(it->getSeqno());
*nes = it->getNMetaBytes(); *es = &connection->itemRevSeqno;
*nes = sizeof(connection->itemRevSeqno);
} }
break; break;
case TAP_NOOP: case TAP_NOOP:
Expand Down Expand Up @@ -1608,7 +1609,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
size_t nkey, size_t nkey,
uint32_t flags, uint32_t flags,
uint32_t exptime, uint32_t exptime,
uint64_t, // cas uint64_t cas,
const void *data, const void *data,
size_t ndata, size_t ndata,
uint16_t vbucket) uint16_t vbucket)
Expand Down Expand Up @@ -1705,17 +1706,13 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,


if (tc) { if (tc) {
bool meta = false; bool meta = false;
if (nengine > 0) { if (nengine == sizeof(uint32_t)) {
uint32_t s; uint32_t seqnum;
uint64_t c; memcpy(&seqnum, engine_specific, sizeof(seqnum));
uint32_t l; seqnum = ntohl(seqnum);
uint32_t f; itm->setCas(cas);

itm->setSeqno(seqnum);
if (Item::decodeMeta((uint8_t*)engine_specific, s, c, l, f)) { meta = true;
itm->setCas(c);
itm->setSeqno(s);
meta = true;
}
} }


if (tc->isBackfillPhase(vbucket)) { if (tc->isBackfillPhase(vbucket)) {
Expand Down
2 changes: 2 additions & 0 deletions item.cc
Expand Up @@ -19,6 +19,8 @@
#include "tools/cJSON.h" #include "tools/cJSON.h"


Atomic<uint64_t> Item::casCounter(1); Atomic<uint64_t> Item::casCounter(1);
// header(2 bytes) + seqno(uint32) + cas(uint64_t) + value_len(uint32_t) + flags(uint32_t)
const uint32_t Item::metaDataSize(3 * sizeof(uint32_t) + sizeof(uint64_t) + 2);


bool Item::append(const Item &i) { bool Item::append(const Item &i) {
assert(value.get() != NULL); assert(value.get() != NULL);
Expand Down
16 changes: 4 additions & 12 deletions item.hh
Expand Up @@ -202,16 +202,6 @@ public:
return static_cast<int>(key.length()); return static_cast<int>(key.length());
} }


uint32_t getNMetaBytes() const {
return sizeof(meta);
}

const char *getMetaData() {
size_t nb = sizeof(meta);
encodeMeta(seqno, cas, getNBytes(), flags, meta, nb);
return (const char*)meta;
}

uint32_t getNBytes() const { uint32_t getNBytes() const {
return value.get() ? static_cast<uint32_t>(value->length()) : 0; return value.get() ? static_cast<uint32_t>(value->length()) : 0;
} }
Expand Down Expand Up @@ -366,6 +356,9 @@ public:
return true; return true;
} }


static uint32_t getNMetaBytes() {
return metaDataSize;
}


private: private:
/** /**
Expand Down Expand Up @@ -393,8 +386,6 @@ private:
uint32_t seqno; uint32_t seqno;
uint16_t vbucketId; uint16_t vbucketId;


uint8_t meta[22];

static uint64_t nextCas(void) { static uint64_t nextCas(void) {
uint64_t ret = gethrtime(); uint64_t ret = gethrtime();
if ((ret & 1000) == 0) { if ((ret & 1000) == 0) {
Expand All @@ -409,6 +400,7 @@ private:
} }


static Atomic<uint64_t> casCounter; static Atomic<uint64_t> casCounter;
static const uint32_t metaDataSize;
DISALLOW_COPY_AND_ASSIGN(Item); DISALLOW_COPY_AND_ASSIGN(Item);
}; };


Expand Down
6 changes: 3 additions & 3 deletions mc-kvstore/mc-engine.cc
Expand Up @@ -1066,14 +1066,14 @@ void MemcachedEngine::setmq(const Item &it, Callback<mutation_result> &cb) {
req.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES; req.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
req.message.header.request.vbucket = ntohs(it.getVBucketId()); req.message.header.request.vbucket = ntohs(it.getVBucketId());
uint32_t bodylen = req.message.header.request.extlen + it.getNKey() uint32_t bodylen = req.message.header.request.extlen + it.getNKey()
+ it.getNBytes() + it.getNMetaBytes(); + it.getNBytes() + Item::getNMetaBytes();
req.message.header.request.bodylen = ntohl(bodylen); req.message.header.request.bodylen = ntohl(bodylen);
req.message.body.nmeta_bytes = ntohl(it.getNMetaBytes()); req.message.body.nmeta_bytes = ntohl(Item::getNMetaBytes());
req.message.body.flags = it.getFlags(); req.message.body.flags = it.getFlags();
req.message.body.expiration = htonl((uint32_t)it.getExptime()); req.message.body.expiration = htonl((uint32_t)it.getExptime());


uint8_t meta[30]; uint8_t meta[30];
size_t nmeta = it.getNMetaBytes(); size_t nmeta = Item::getNMetaBytes();
Item::encodeMeta(it, meta, nmeta); Item::encodeMeta(it, meta, nmeta);


sendIov[0].iov_base = (char*)req.bytes; sendIov[0].iov_base = (char*)req.bytes;
Expand Down
5 changes: 5 additions & 0 deletions tapconnection.hh
Expand Up @@ -1143,6 +1143,11 @@ private:
*/ */
uint32_t opaqueCommandCode; uint32_t opaqueCommandCode;


/**
* Revision seq number of the item to be transmitted. This variable's value is
* copied to the engine_specific field in a memcached tap message.
*/
uint32_t itemRevSeqno;


/** /**
* Is this tap connection in a suspended state (the receiver may * Is this tap connection in a suspended state (the receiver may
Expand Down

0 comments on commit 3342e7b

Please sign in to comment.