Skip to content

Commit

Permalink
MB-5478: Remove old observe code
Browse files Browse the repository at this point in the history
Change-Id: Id1bbaef9888d2590862e7d505f043726b8fe7cc0
Reviewed-on: http://review.couchbase.org/17022
Reviewed-by: Chiyoung Seo <chiyoung.seo@gmail.com>
Tested-by: Chiyoung Seo <chiyoung.seo@gmail.com>
  • Loading branch information
Mike Wiederhold authored and chiyoung committed Jun 14, 2012
1 parent 874b111 commit 8816d55
Show file tree
Hide file tree
Showing 13 changed files with 5 additions and 734 deletions.
1 change: 0 additions & 1 deletion Makefile.am
Expand Up @@ -104,7 +104,6 @@ ep_la_SOURCES = \
statwriter.hh \
stored-value.cc stored-value.hh \
syncobject.hh \
observe_registry.cc observe_registry.hh \
tapconnection.cc tapconnection.hh \
tapconnmap.cc tapconnmap.hh \
tapthrottle.cc tapthrottle.hh \
Expand Down
3 changes: 0 additions & 3 deletions command_ids.h
Expand Up @@ -41,9 +41,6 @@
#define CMD_GET_LOCKED 0x94
#define CMD_UNLOCK_KEY 0x95

#define CMD_OBSERVE 0xb1
#define CMD_UNOBSERVE 0xb2

/**
* Return the last closed checkpoint Id for a given VBucket.
*/
Expand Down
11 changes: 1 addition & 10 deletions docs/stats.org
Expand Up @@ -158,16 +158,7 @@ For introductory information on stats within membase, start with the
| | loaded from the persistence layer |
| ep_latency_get_cmd | The total elapse time for get command |
| ep_latency_arith_cmd | The total eplase time for arith command |
| ep_total_observe_set | Number of observe sets |
| ep_stats_observe_polls | Number of stats observe calls |
| ep_observe_calls | Number of observe calls |
| ep_unobserve_calls | Number of unobserve calls |
| ep_observe_registry_size | Number of key in the observe registry |
| ep_observe_errors | Number of time an observe failed to be |
| | added to the observe registry (Not due to |
| | protocol violation) |
| ep_obs_reg_clean_job | Number of time the observe registry |
| | cleaner job has run |

| ep_inconsistent_slave_chk | Flag indicating if we allow a "downstream" |
| | master to receive checkpoint messages |
| ep_mlog_compactor_runs | Number of times mutation log compactor is |
Expand Down
10 changes: 1 addition & 9 deletions ep.cc
Expand Up @@ -397,7 +397,7 @@ EventuallyPersistentStore::EventuallyPersistentStore(EventuallyPersistentEngine
accessLog(engine.getConfiguration().getAlogPath(),
engine.getConfiguration().getAlogBlockSize()),
diskFlushAll(false),
tctx(stats, t, mutationLog, theEngine.observeRegistry),
tctx(stats, t, mutationLog),
bgFetchDelay(0)
{
getLogger()->log(EXTENSION_LOG_INFO, NULL,
Expand Down Expand Up @@ -616,14 +616,6 @@ void EventuallyPersistentStore::initialize() {
Priority::CheckpointRemoverPriority,
checkpointRemoverInterval);

shared_ptr<DispatcherCallback> obsRegCb(new ObserveRegistryCleaner(
engine.getObserveRegistry(),
stats, 60));

nonIODispatcher->schedule(obsRegCb, NULL,
Priority::ObserveRegistryCleanerPriority,
10);

if (mutationLog.isEnabled()) {
shared_ptr<MutationLogCompactor>
compactor(new MutationLogCompactor(this, mutationLog, mlogCompactorConfig, stats));
Expand Down
8 changes: 2 additions & 6 deletions ep.hh
Expand Up @@ -40,7 +40,6 @@
#include "locks.hh"
#include "kvstore.hh"
#include "stored-value.hh"
#include "observe_registry.hh"
#include "atomic.hh"
#include "dispatcher.hh"
#include "vbucket.hh"
Expand Down Expand Up @@ -300,10 +299,8 @@ class PersistenceCallback;
class TransactionContext {
public:

TransactionContext(EPStats &st, KVStore *ks, MutationLog &log,
ObserveRegistry &obsReg)
: stats(st), underlying(ks), mutationLog(log), _remaining(0), intxn(false),
observeRegistry(obsReg) {}
TransactionContext(EPStats &st, KVStore *ks, MutationLog &log)
: stats(st), underlying(ks), mutationLog(log), _remaining(0), intxn(false) {}

/**
* Call this whenever entering a transaction.
Expand Down Expand Up @@ -381,7 +378,6 @@ private:
Atomic<size_t> numUncommittedItems;
bool intxn;
std::list<queued_item> uncommittedItems;
ObserveRegistry &observeRegistry;
std::list<PersistenceCallback*> transactionCallbacks;
};

Expand Down
138 changes: 1 addition & 137 deletions ep_engine.cc
Expand Up @@ -653,66 +653,6 @@ extern "C" {
return rv;
}

static ENGINE_ERROR_CODE observeCmd(EventuallyPersistentEngine *e,
protocol_binary_request_header *request,
const void *cookie,
ADD_RESPONSE response) {
protocol_binary_request_observe *req =
reinterpret_cast<protocol_binary_request_observe*>(request);
assert(req);

uint16_t keylen = ntohs(req->message.header.request.keylen);
uint8_t extlen = req->message.header.request.extlen;
uint16_t vbucket = ntohs(req->message.header.request.vbucket);
uint32_t bodylen = ntohl(req->message.header.request.bodylen);
uint64_t cas = ntohll(req->message.header.request.cas);
uint32_t exp = ntohl(req->message.body.expiration);

// Return invalid if no key or observe set is specified
if (keylen == 0 || (bodylen - keylen - extlen) == 0) {
sendResponse(response, NULL, 0, NULL, 0, "", 0, PROTOCOL_BINARY_RAW_BYTES,
PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
return ENGINE_FAILED;
}

const char *keyp = reinterpret_cast<const char*>(req->bytes);
keyp += sizeof(request->bytes) + extlen;
std::string key(keyp, keylen);

const char *obs_set_pos = reinterpret_cast<const char*>(req->bytes);
obs_set_pos += sizeof(request->bytes) + extlen + keylen;
std::string obs_set(obs_set_pos, (bodylen - extlen - keylen));

return e->observe(cookie, key, cas, vbucket, obs_set, exp, response);
}

static ENGINE_ERROR_CODE unobserveCmd(EventuallyPersistentEngine *e,
protocol_binary_request_header *request,
const void *cookie,
ADD_RESPONSE response) {
uint16_t keylen = ntohs(request->request.keylen);
uint16_t vbucket = ntohs(request->request.vbucket);
uint32_t bodylen = ntohl(request->request.bodylen);
uint64_t cas = ntohll(request->request.cas);

// Return invalid if no key or observe set is specified
if (keylen == 0 || (bodylen - keylen) == 0) {
sendResponse(response, NULL, 0, NULL, 0, "", 0, PROTOCOL_BINARY_RAW_BYTES,
PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
return ENGINE_FAILED;
}

const char *keyp = reinterpret_cast<const char*>(request->bytes);
keyp += sizeof(request->bytes);
std::string key(keyp, keylen);

const char *obs_set_pos = reinterpret_cast<const char*>(request->bytes);
obs_set_pos += sizeof(request->bytes) + keylen;
std::string obs_set(obs_set_pos, (bodylen - keylen));

return e->unobserve(cookie, key, cas, vbucket, obs_set, response);
}

static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine *e,
const void *cookie,
protocol_binary_request_header *request,
Expand Down Expand Up @@ -903,16 +843,6 @@ extern "C" {
case CMD_UNLOCK_KEY:
res = unlockKey(h, request, &msg, &msg_size);
break;
case CMD_OBSERVE:
{
rv = observeCmd(h, request, cookie, response);
return rv;
}
case CMD_UNOBSERVE:
{
rv = unobserveCmd(h, request, cookie, response);
return rv;
}
case CMD_DEREGISTER_TAP_CLIENT:
{
rv = h->deregisterTapClient(cookie, request, response);
Expand Down Expand Up @@ -1178,7 +1108,7 @@ EventuallyPersistentEngine::EventuallyPersistentEngine(GET_SERVER_API get_server
startedEngineThreads(false),
getServerApiFunc(get_server_api), getlExtension(NULL),
tapConnMap(NULL), tapConfig(NULL), checkpointConfig(NULL),
mutation_count(0), observeRegistry(&epstore, &stats), warmingUp(true),
mutation_count(0), warmingUp(true),
flushAllEnabled(false), startupTime(0)
{
interface.interface = 1;
Expand Down Expand Up @@ -2643,19 +2573,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
add_casted_stat("ep_exp_pager_stime", epstore->getExpiryPagerSleeptime(),
add_stat, cookie);
add_casted_stat("ep_total_observe_sets", epstats.totalObserveSets,
add_stat, cookie);
add_casted_stat("ep_stats_observe_polls", epstats.statsObservePolls,
add_stat, cookie);
add_casted_stat("ep_observe_calls", epstats.observeCalls, add_stat,
cookie);
add_casted_stat("ep_unobserve_calls", epstats.unobserveCalls, add_stat,
cookie);
add_casted_stat("ep_observe_registry_size", epstats.obsRegSize, add_stat,
cookie);
add_casted_stat("ep_observe_errors", epstats.obsErrors, add_stat, cookie);
add_casted_stat("ep_obs_reg_clean_job", epstats.obsCleanerRuns, add_stat,
cookie);

add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
add_stat, cookie);
Expand Down Expand Up @@ -3310,21 +3227,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doDispatcherStats(const void *cook
return ENGINE_SUCCESS;
}

ENGINE_ERROR_CODE EventuallyPersistentEngine::doObserveStats(const void* cookie,
ADD_STAT add_stat,
const char* stat_key,
int nkey) {
stats.statsObservePolls++;
std::string obs_set(stat_key, nkey);
state_map* smap = getObserveRegistry().getObserveSetState(obs_set);
std::map<std::string, std::string>::iterator itr;
for (itr = smap->begin(); itr != smap->end(); itr++) {
add_casted_stat(itr->first.c_str(), itr->second.c_str(), add_stat, cookie);
}
delete smap;
return ENGINE_SUCCESS;
}

ENGINE_ERROR_CODE EventuallyPersistentEngine::doKlogStats(const void* cookie,
ADD_STAT add_stat) {
const MutationLog *mutationLog(epstore->getMutationLog());
Expand Down Expand Up @@ -3354,8 +3256,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::getStats(const void* cookie,
ENGINE_ERROR_CODE rv = ENGINE_KEY_ENOENT;
if (stat_key == NULL) {
rv = doEngineStats(cookie, add_stat);
} else if (nkey > 8 && strncmp(stat_key, "observe ", 8) == 0) {
rv = doObserveStats(cookie, add_stat, stat_key + 8, nkey - 8);
} else if (nkey > 7 && strncmp(stat_key, "tapagg ", 7) == 0) {
rv = doTapAggStats(cookie, add_stat, stat_key + 7, nkey - 7);
} else if (nkey == 3 && strncmp(stat_key, "tap", 3) == 0) {
Expand Down Expand Up @@ -3543,42 +3443,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::touch(const void *cookie,
return rv;
}

ENGINE_ERROR_CODE EventuallyPersistentEngine::observe(const void *cookie,
std::string key,
uint64_t cas,
uint16_t vbucket,
std::string obs_set,
uint32_t expiration,
ADD_RESPONSE response) {
protocol_binary_response_status rv;

stats.observeCalls++;
getLogger()->log(EXTENSION_LOG_DEBUG, NULL, "observe %s %llu %s %d",
key.c_str(), cas, obs_set.c_str(), expiration);
rv = getObserveRegistry().observeKey(key, cas, vbucket, expiration, obs_set);
if (rv == PROTOCOL_BINARY_RESPONSE_ETMPFAIL) {
return sendResponse(response, NULL, 0, NULL, 0, NULL, 0, 0, memoryCondition(),
0, cookie);
} else if (rv == PROTOCOL_BINARY_RESPONSE_EBUSY) {
return sendResponse(response, "Observe set full", 16, NULL, 0, NULL, 0, 0,
rv, 0, cookie);
}
return sendResponse(response, NULL, 0, NULL, 0, NULL, 0, 0, rv, 0, cookie);
}

ENGINE_ERROR_CODE EventuallyPersistentEngine::unobserve(const void *cookie,
std::string key,
uint64_t cas,
uint16_t vbucket,
std::string obs_set,
ADD_RESPONSE response) {
stats.unobserveCalls++;
getLogger()->log(EXTENSION_LOG_DEBUG, NULL, "unobserve %s %llu %s",
key.c_str(), cas, obs_set.c_str());
getObserveRegistry().unobserveKey(key, cas, vbucket, obs_set);
return sendResponse(response, NULL, 0, NULL, 0, NULL, 0, 0, 0, 0, cookie);
}

ENGINE_ERROR_CODE EventuallyPersistentEngine::handleRestoreCmd(const void *cookie,
protocol_binary_request_header *request,
ADD_RESPONSE response)
Expand Down
23 changes: 0 additions & 23 deletions ep_engine.h
Expand Up @@ -9,7 +9,6 @@
#include "ep_extension.h"
#include "dispatcher.hh"
#include "item_pager.hh"
#include "observe_registry.hh"

#include <cstdio>
#include <map>
Expand Down Expand Up @@ -500,21 +499,6 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
return epstore->unlockKey(key, vbucket, cas, currentTime);
}

ENGINE_ERROR_CODE observe(const void *cookie,
std::string key,
uint64_t cas,
uint16_t vbucket,
std::string obs_set,
uint32_t expiration,
ADD_RESPONSE response);

ENGINE_ERROR_CODE unobserve(const void *cookie,
std::string key,
uint64_t cas,
uint16_t vbucket,
std::string obs_set,
ADD_RESPONSE response);

RCPtr<VBucket> getVBucket(uint16_t vbucket) {
return epstore->getVBucket(vbucket);
}
Expand Down Expand Up @@ -555,10 +539,6 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {

SERVER_HANDLE_V1* getServerApi() { return serverApi; }

ObserveRegistry &getObserveRegistry() {
return observeRegistry;
}

Configuration &getConfiguration() {
return configuration;
}
Expand Down Expand Up @@ -701,8 +681,6 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
return ret;
}

ENGINE_ERROR_CODE doObserveStats(const void* cookie, ADD_STAT add_s,
const char* stat_key, int nkey);
ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat);
ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat);
ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat);
Expand Down Expand Up @@ -789,7 +767,6 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
size_t getlDefaultTimeout;
size_t getlMaxTimeout;
EPStats stats;
ObserveRegistry observeRegistry;
Configuration configuration;
Atomic<bool> warmingUp;
struct {
Expand Down

0 comments on commit 8816d55

Please sign in to comment.