diff --git a/Makefile.am b/Makefile.am index 00dcfabcc..5d591b91c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -213,7 +213,8 @@ EXTRA_TESTS = ep_testsuite_la_CPPFLAGS = -I$(top_srcdir) -I$(top_srcdir)/sqlite-kvstore \ $(AM_CPPFLAGS) ${NO_WERROR} ep_testsuite_la_SOURCES= ep_testsuite.cc ep_testsuite.h \ - atomic.cc locks.hh mutex.cc mutex.hh testlogger_libify.cc + atomic.cc locks.hh mutex.cc mutex.hh \ + item.cc testlogger_libify.cc ep_testsuite_la_LDFLAGS= -module -dynamic # This is because automake can't figure out how to build the same code diff --git a/ep_engine.cc b/ep_engine.cc index 08ff4532d..0e44cd5be 100644 --- a/ep_engine.cc +++ b/ep_engine.cc @@ -1381,7 +1381,7 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie connection->encodeVBucketStateTransition(ev, es, nes, vbucket); break; case TAP_OPAQUE: - connection->opaqueCommandCode = ev.state; + connection->opaqueCommandCode = (uint32_t) ev.state; *vbucket = ev.vbucket; *es = &connection->opaqueCommandCode; *nes = sizeof(connection->opaqueCommandCode); @@ -1400,8 +1400,9 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie case TAP_DELETION: *itm = it; if (ret == TAP_MUTATION || ret == TAP_DELETION) { - *es = (void*)it->getMetaData(); - *nes = it->getNMetaBytes(); + connection->itemRevSeqno = htonl(it->getSeqno()); + *es = &connection->itemRevSeqno; + *nes = sizeof(connection->itemRevSeqno); } break; case TAP_NOOP: @@ -1608,7 +1609,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie, size_t nkey, uint32_t flags, uint32_t exptime, - uint64_t, // cas + uint64_t cas, const void *data, size_t ndata, uint16_t vbucket) @@ -1705,17 +1706,13 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie, if (tc) { bool meta = false; - if (nengine > 0) { - uint32_t s; - uint64_t c; - uint32_t l; - uint32_t f; - - if (Item::decodeMeta((uint8_t*)engine_specific, s, c, l, f)) { - itm->setCas(c); - itm->setSeqno(s); - meta = true; - } + if (nengine == sizeof(uint32_t)) { + uint32_t seqnum; + memcpy(&seqnum, engine_specific, sizeof(seqnum)); + seqnum = ntohl(seqnum); + itm->setCas(cas); + itm->setSeqno(seqnum); + meta = true; } if (tc->isBackfillPhase(vbucket)) { diff --git a/item.cc b/item.cc index 54cd4bf35..2bb3a6dd4 100644 --- a/item.cc +++ b/item.cc @@ -19,6 +19,8 @@ #include "tools/cJSON.h" Atomic Item::casCounter(1); +// header(2 bytes) + seqno(uint32) + cas(uint64_t) + value_len(uint32_t) + flags(uint32_t) +const uint32_t Item::metaDataSize(3 * sizeof(uint32_t) + sizeof(uint64_t) + 2); bool Item::append(const Item &i) { assert(value.get() != NULL); diff --git a/item.hh b/item.hh index 66afdc36b..21c4a7b4a 100644 --- a/item.hh +++ b/item.hh @@ -202,16 +202,6 @@ public: return static_cast(key.length()); } - uint32_t getNMetaBytes() const { - return sizeof(meta); - } - - const char *getMetaData() { - size_t nb = sizeof(meta); - encodeMeta(seqno, cas, getNBytes(), flags, meta, nb); - return (const char*)meta; - } - uint32_t getNBytes() const { return value.get() ? static_cast(value->length()) : 0; } @@ -366,6 +356,9 @@ public: return true; } + static uint32_t getNMetaBytes() { + return metaDataSize; + } private: /** @@ -393,8 +386,6 @@ private: uint32_t seqno; uint16_t vbucketId; - uint8_t meta[22]; - static uint64_t nextCas(void) { uint64_t ret = gethrtime(); if ((ret & 1000) == 0) { @@ -409,6 +400,7 @@ private: } static Atomic casCounter; + static const uint32_t metaDataSize; DISALLOW_COPY_AND_ASSIGN(Item); }; diff --git a/mc-kvstore/mc-engine.cc b/mc-kvstore/mc-engine.cc index 558f5340b..7ab9d4a69 100644 --- a/mc-kvstore/mc-engine.cc +++ b/mc-kvstore/mc-engine.cc @@ -1066,14 +1066,14 @@ void MemcachedEngine::setmq(const Item &it, Callback &cb) { req.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES; req.message.header.request.vbucket = ntohs(it.getVBucketId()); uint32_t bodylen = req.message.header.request.extlen + it.getNKey() - + it.getNBytes() + it.getNMetaBytes(); + + it.getNBytes() + Item::getNMetaBytes(); req.message.header.request.bodylen = ntohl(bodylen); - req.message.body.nmeta_bytes = ntohl(it.getNMetaBytes()); + req.message.body.nmeta_bytes = ntohl(Item::getNMetaBytes()); req.message.body.flags = it.getFlags(); req.message.body.expiration = htonl((uint32_t)it.getExptime()); uint8_t meta[30]; - size_t nmeta = it.getNMetaBytes(); + size_t nmeta = Item::getNMetaBytes(); Item::encodeMeta(it, meta, nmeta); sendIov[0].iov_base = (char*)req.bytes; diff --git a/tapconnection.hh b/tapconnection.hh index 75a5ccae7..668edfe99 100644 --- a/tapconnection.hh +++ b/tapconnection.hh @@ -1143,6 +1143,11 @@ private: */ uint32_t opaqueCommandCode; + /** + * Revision seq number of the item to be transmitted. This variable's value is + * copied to the engine_specific field in a memcached tap message. + */ + uint32_t itemRevSeqno; /** * Is this tap connection in a suspended state (the receiver may