diff --git a/Makefile.am b/Makefile.am index cc882ddc2..a22a00f88 100644 --- a/Makefile.am +++ b/Makefile.am @@ -104,7 +104,6 @@ ep_la_SOURCES = \ statwriter.hh \ stored-value.cc stored-value.hh \ syncobject.hh \ - observe_registry.cc observe_registry.hh \ tapconnection.cc tapconnection.hh \ tapconnmap.cc tapconnmap.hh \ tapthrottle.cc tapthrottle.hh \ diff --git a/command_ids.h b/command_ids.h index c130cc49c..c0d7b92bc 100644 --- a/command_ids.h +++ b/command_ids.h @@ -41,9 +41,6 @@ #define CMD_GET_LOCKED 0x94 #define CMD_UNLOCK_KEY 0x95 -#define CMD_OBSERVE 0xb1 -#define CMD_UNOBSERVE 0xb2 - /** * Return the last closed checkpoint Id for a given VBucket. */ diff --git a/docs/stats.org b/docs/stats.org index b7c5e812b..dd6e0debe 100644 --- a/docs/stats.org +++ b/docs/stats.org @@ -158,16 +158,7 @@ For introductory information on stats within membase, start with the | | loaded from the persistence layer | | ep_latency_get_cmd | The total elapse time for get command | | ep_latency_arith_cmd | The total eplase time for arith command | -| ep_total_observe_set | Number of observe sets | -| ep_stats_observe_polls | Number of stats observe calls | -| ep_observe_calls | Number of observe calls | -| ep_unobserve_calls | Number of unobserve calls | -| ep_observe_registry_size | Number of key in the observe registry | -| ep_observe_errors | Number of time an observe failed to be | -| | added to the observe registry (Not due to | -| | protocol violation) | -| ep_obs_reg_clean_job | Number of time the observe registry | -| | cleaner job has run | + | ep_inconsistent_slave_chk | Flag indicating if we allow a "downstream" | | | master to receive checkpoint messages | | ep_mlog_compactor_runs | Number of times mutation log compactor is | diff --git a/ep.cc b/ep.cc index 98adfd489..92d6872fd 100644 --- a/ep.cc +++ b/ep.cc @@ -397,7 +397,7 @@ EventuallyPersistentStore::EventuallyPersistentStore(EventuallyPersistentEngine accessLog(engine.getConfiguration().getAlogPath(), engine.getConfiguration().getAlogBlockSize()), diskFlushAll(false), - tctx(stats, t, mutationLog, theEngine.observeRegistry), + tctx(stats, t, mutationLog), bgFetchDelay(0) { getLogger()->log(EXTENSION_LOG_INFO, NULL, @@ -616,14 +616,6 @@ void EventuallyPersistentStore::initialize() { Priority::CheckpointRemoverPriority, checkpointRemoverInterval); - shared_ptr obsRegCb(new ObserveRegistryCleaner( - engine.getObserveRegistry(), - stats, 60)); - - nonIODispatcher->schedule(obsRegCb, NULL, - Priority::ObserveRegistryCleanerPriority, - 10); - if (mutationLog.isEnabled()) { shared_ptr compactor(new MutationLogCompactor(this, mutationLog, mlogCompactorConfig, stats)); diff --git a/ep.hh b/ep.hh index aca39d516..f10026715 100644 --- a/ep.hh +++ b/ep.hh @@ -40,7 +40,6 @@ #include "locks.hh" #include "kvstore.hh" #include "stored-value.hh" -#include "observe_registry.hh" #include "atomic.hh" #include "dispatcher.hh" #include "vbucket.hh" @@ -300,10 +299,8 @@ class PersistenceCallback; class TransactionContext { public: - TransactionContext(EPStats &st, KVStore *ks, MutationLog &log, - ObserveRegistry &obsReg) - : stats(st), underlying(ks), mutationLog(log), _remaining(0), intxn(false), - observeRegistry(obsReg) {} + TransactionContext(EPStats &st, KVStore *ks, MutationLog &log) + : stats(st), underlying(ks), mutationLog(log), _remaining(0), intxn(false) {} /** * Call this whenever entering a transaction. @@ -381,7 +378,6 @@ private: Atomic numUncommittedItems; bool intxn; std::list uncommittedItems; - ObserveRegistry &observeRegistry; std::list transactionCallbacks; }; diff --git a/ep_engine.cc b/ep_engine.cc index 8f7b5feb7..ffd97dc22 100644 --- a/ep_engine.cc +++ b/ep_engine.cc @@ -653,66 +653,6 @@ extern "C" { return rv; } - static ENGINE_ERROR_CODE observeCmd(EventuallyPersistentEngine *e, - protocol_binary_request_header *request, - const void *cookie, - ADD_RESPONSE response) { - protocol_binary_request_observe *req = - reinterpret_cast(request); - assert(req); - - uint16_t keylen = ntohs(req->message.header.request.keylen); - uint8_t extlen = req->message.header.request.extlen; - uint16_t vbucket = ntohs(req->message.header.request.vbucket); - uint32_t bodylen = ntohl(req->message.header.request.bodylen); - uint64_t cas = ntohll(req->message.header.request.cas); - uint32_t exp = ntohl(req->message.body.expiration); - - // Return invalid if no key or observe set is specified - if (keylen == 0 || (bodylen - keylen - extlen) == 0) { - sendResponse(response, NULL, 0, NULL, 0, "", 0, PROTOCOL_BINARY_RAW_BYTES, - PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie); - return ENGINE_FAILED; - } - - const char *keyp = reinterpret_cast(req->bytes); - keyp += sizeof(request->bytes) + extlen; - std::string key(keyp, keylen); - - const char *obs_set_pos = reinterpret_cast(req->bytes); - obs_set_pos += sizeof(request->bytes) + extlen + keylen; - std::string obs_set(obs_set_pos, (bodylen - extlen - keylen)); - - return e->observe(cookie, key, cas, vbucket, obs_set, exp, response); - } - - static ENGINE_ERROR_CODE unobserveCmd(EventuallyPersistentEngine *e, - protocol_binary_request_header *request, - const void *cookie, - ADD_RESPONSE response) { - uint16_t keylen = ntohs(request->request.keylen); - uint16_t vbucket = ntohs(request->request.vbucket); - uint32_t bodylen = ntohl(request->request.bodylen); - uint64_t cas = ntohll(request->request.cas); - - // Return invalid if no key or observe set is specified - if (keylen == 0 || (bodylen - keylen) == 0) { - sendResponse(response, NULL, 0, NULL, 0, "", 0, PROTOCOL_BINARY_RAW_BYTES, - PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie); - return ENGINE_FAILED; - } - - const char *keyp = reinterpret_cast(request->bytes); - keyp += sizeof(request->bytes); - std::string key(keyp, keylen); - - const char *obs_set_pos = reinterpret_cast(request->bytes); - obs_set_pos += sizeof(request->bytes) + keylen; - std::string obs_set(obs_set_pos, (bodylen - keylen)); - - return e->unobserve(cookie, key, cas, vbucket, obs_set, response); - } - static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine *e, const void *cookie, protocol_binary_request_header *request, @@ -903,16 +843,6 @@ extern "C" { case CMD_UNLOCK_KEY: res = unlockKey(h, request, &msg, &msg_size); break; - case CMD_OBSERVE: - { - rv = observeCmd(h, request, cookie, response); - return rv; - } - case CMD_UNOBSERVE: - { - rv = unobserveCmd(h, request, cookie, response); - return rv; - } case CMD_DEREGISTER_TAP_CLIENT: { rv = h->deregisterTapClient(cookie, request, response); @@ -1178,7 +1108,7 @@ EventuallyPersistentEngine::EventuallyPersistentEngine(GET_SERVER_API get_server startedEngineThreads(false), getServerApiFunc(get_server_api), getlExtension(NULL), tapConnMap(NULL), tapConfig(NULL), checkpointConfig(NULL), - mutation_count(0), observeRegistry(&epstore, &stats), warmingUp(true), + mutation_count(0), warmingUp(true), flushAllEnabled(false), startupTime(0) { interface.interface = 1; @@ -2643,19 +2573,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie, add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie); add_casted_stat("ep_exp_pager_stime", epstore->getExpiryPagerSleeptime(), add_stat, cookie); - add_casted_stat("ep_total_observe_sets", epstats.totalObserveSets, - add_stat, cookie); - add_casted_stat("ep_stats_observe_polls", epstats.statsObservePolls, - add_stat, cookie); - add_casted_stat("ep_observe_calls", epstats.observeCalls, add_stat, - cookie); - add_casted_stat("ep_unobserve_calls", epstats.unobserveCalls, add_stat, - cookie); - add_casted_stat("ep_observe_registry_size", epstats.obsRegSize, add_stat, - cookie); - add_casted_stat("ep_observe_errors", epstats.obsErrors, add_stat, cookie); - add_casted_stat("ep_obs_reg_clean_job", epstats.obsCleanerRuns, add_stat, - cookie); add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns, add_stat, cookie); @@ -3310,21 +3227,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doDispatcherStats(const void *cook return ENGINE_SUCCESS; } -ENGINE_ERROR_CODE EventuallyPersistentEngine::doObserveStats(const void* cookie, - ADD_STAT add_stat, - const char* stat_key, - int nkey) { - stats.statsObservePolls++; - std::string obs_set(stat_key, nkey); - state_map* smap = getObserveRegistry().getObserveSetState(obs_set); - std::map::iterator itr; - for (itr = smap->begin(); itr != smap->end(); itr++) { - add_casted_stat(itr->first.c_str(), itr->second.c_str(), add_stat, cookie); - } - delete smap; - return ENGINE_SUCCESS; -} - ENGINE_ERROR_CODE EventuallyPersistentEngine::doKlogStats(const void* cookie, ADD_STAT add_stat) { const MutationLog *mutationLog(epstore->getMutationLog()); @@ -3354,8 +3256,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::getStats(const void* cookie, ENGINE_ERROR_CODE rv = ENGINE_KEY_ENOENT; if (stat_key == NULL) { rv = doEngineStats(cookie, add_stat); - } else if (nkey > 8 && strncmp(stat_key, "observe ", 8) == 0) { - rv = doObserveStats(cookie, add_stat, stat_key + 8, nkey - 8); } else if (nkey > 7 && strncmp(stat_key, "tapagg ", 7) == 0) { rv = doTapAggStats(cookie, add_stat, stat_key + 7, nkey - 7); } else if (nkey == 3 && strncmp(stat_key, "tap", 3) == 0) { @@ -3543,42 +3443,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::touch(const void *cookie, return rv; } -ENGINE_ERROR_CODE EventuallyPersistentEngine::observe(const void *cookie, - std::string key, - uint64_t cas, - uint16_t vbucket, - std::string obs_set, - uint32_t expiration, - ADD_RESPONSE response) { - protocol_binary_response_status rv; - - stats.observeCalls++; - getLogger()->log(EXTENSION_LOG_DEBUG, NULL, "observe %s %llu %s %d", - key.c_str(), cas, obs_set.c_str(), expiration); - rv = getObserveRegistry().observeKey(key, cas, vbucket, expiration, obs_set); - if (rv == PROTOCOL_BINARY_RESPONSE_ETMPFAIL) { - return sendResponse(response, NULL, 0, NULL, 0, NULL, 0, 0, memoryCondition(), - 0, cookie); - } else if (rv == PROTOCOL_BINARY_RESPONSE_EBUSY) { - return sendResponse(response, "Observe set full", 16, NULL, 0, NULL, 0, 0, - rv, 0, cookie); - } - return sendResponse(response, NULL, 0, NULL, 0, NULL, 0, 0, rv, 0, cookie); -} - -ENGINE_ERROR_CODE EventuallyPersistentEngine::unobserve(const void *cookie, - std::string key, - uint64_t cas, - uint16_t vbucket, - std::string obs_set, - ADD_RESPONSE response) { - stats.unobserveCalls++; - getLogger()->log(EXTENSION_LOG_DEBUG, NULL, "unobserve %s %llu %s", - key.c_str(), cas, obs_set.c_str()); - getObserveRegistry().unobserveKey(key, cas, vbucket, obs_set); - return sendResponse(response, NULL, 0, NULL, 0, NULL, 0, 0, 0, 0, cookie); -} - ENGINE_ERROR_CODE EventuallyPersistentEngine::handleRestoreCmd(const void *cookie, protocol_binary_request_header *request, ADD_RESPONSE response) diff --git a/ep_engine.h b/ep_engine.h index d565c1a56..f59c87467 100644 --- a/ep_engine.h +++ b/ep_engine.h @@ -9,7 +9,6 @@ #include "ep_extension.h" #include "dispatcher.hh" #include "item_pager.hh" -#include "observe_registry.hh" #include #include @@ -500,21 +499,6 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 { return epstore->unlockKey(key, vbucket, cas, currentTime); } - ENGINE_ERROR_CODE observe(const void *cookie, - std::string key, - uint64_t cas, - uint16_t vbucket, - std::string obs_set, - uint32_t expiration, - ADD_RESPONSE response); - - ENGINE_ERROR_CODE unobserve(const void *cookie, - std::string key, - uint64_t cas, - uint16_t vbucket, - std::string obs_set, - ADD_RESPONSE response); - RCPtr getVBucket(uint16_t vbucket) { return epstore->getVBucket(vbucket); } @@ -555,10 +539,6 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 { SERVER_HANDLE_V1* getServerApi() { return serverApi; } - ObserveRegistry &getObserveRegistry() { - return observeRegistry; - } - Configuration &getConfiguration() { return configuration; } @@ -701,8 +681,6 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 { return ret; } - ENGINE_ERROR_CODE doObserveStats(const void* cookie, ADD_STAT add_s, - const char* stat_key, int nkey); ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat); ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat); ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat); @@ -789,7 +767,6 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 { size_t getlDefaultTimeout; size_t getlMaxTimeout; EPStats stats; - ObserveRegistry observeRegistry; Configuration configuration; Atomic warmingUp; struct { diff --git a/observe_registry.cc b/observe_registry.cc deleted file mode 100644 index 0694a6f34..000000000 --- a/observe_registry.cc +++ /dev/null @@ -1,347 +0,0 @@ -/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ -/* - * Copyright 2011 Couchbase, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "config.h" -#include "observe_registry.hh" -#include "command_ids.h" -#include "vbucket.hh" - -protocol_binary_response_status ObserveRegistry::observeKey(const std::string &key, - const uint64_t cas, - const uint16_t vbucket, - const uint64_t expiration, - const std::string &obs_set_name) { - LockHolder rl(registry_mutex); - std::map::iterator itr = registry.find(obs_set_name); - ObserveSet* obs_set; - if (itr == registry.end()) { - obs_set = addObserveSet(obs_set_name, expiration); - if (obs_set == NULL) { - return PROTOCOL_BINARY_RESPONSE_ETMPFAIL; - } - } else if (itr->second->isExpired()) { - removeObserveSet(itr); - obs_set = addObserveSet(obs_set_name, expiration); - if (obs_set == NULL) { - return PROTOCOL_BINARY_RESPONSE_ETMPFAIL; - } - } else { - obs_set = itr->second; - } - return obs_set->add(key, cas, vbucket); -} - -void ObserveRegistry::unobserveKey(const std::string &key, - const uint64_t cas, - const uint16_t vbucket, - const std::string &obs_set_name) { - LockHolder rl(registry_mutex); - std::map::iterator itr = registry.find(obs_set_name); - if (itr != registry.end()) { - if (itr->second->isExpired()) { - removeObserveSet(itr); - } else { - itr->second->remove(key, cas, vbucket); - } - } -} - -void ObserveRegistry::removeExpired() { - LockHolder lh(registry_mutex); - std::map::iterator itr; - for (itr = registry.begin(); itr != registry.end(); itr++) { - if (itr->second->isExpired()) { - removeObserveSet(itr); - } - } -} - -state_map* ObserveRegistry::getObserveSetState(const std::string &obs_set_name) { - LockHolder rl(registry_mutex); - std::map::iterator obs_set = registry.find(obs_set_name); - if (obs_set == registry.end()) { - getLogger()->log(EXTENSION_LOG_DEBUG, NULL, - "Tryed to get state for non-existent observe set %s", - obs_set_name.c_str()); - return new state_map; - } - return obs_set->second->getState(); -} - -void ObserveRegistry::itemsPersisted(std::list &itemlist) { - LockHolder lh(registry_mutex); - std::list::iterator itr; - for (itr = itemlist.begin(); itr != itemlist.end(); itr++) { - std::map::iterator obs_itr; - for (obs_itr = registry.begin(); obs_itr != registry.end(); obs_itr++) { - if (!obs_itr->second->isExpired()) { - StoredValue *sv = (*epstore)->getStoredValue((*itr)->getKey(), - (*itr)->getVBucketId(), - false); - obs_itr->second->keyEvent((*itr)->getKey().c_str(), - sv ? sv->getCas() : 0, - (*itr)->getVBucketId(), - OBS_PERSISTED_EVENT); - } else { - removeObserveSet(obs_itr); - } - } - } -} - -void ObserveRegistry::itemModified(const Item &itm) { - LockHolder lh(registry_mutex); - std::map::iterator itr; - for (itr = registry.begin(); itr != registry.end(); itr++) { - if (!itr->second->isExpired()) { - itr->second->keyEvent(itm.getKey().c_str(), itm.getCas(), - itm.getVBucketId(), OBS_MODIFIED_EVENT); - } else { - removeObserveSet(itr); - } - } -} - -void ObserveRegistry::itemDeleted(const std::string &key, const uint64_t cas, - const uint16_t vbucket) { - LockHolder lh(registry_mutex); - std::map::iterator itr; - for (itr = registry.begin(); itr != registry.end(); itr++) { - if (!itr->second->isExpired()) { - itr->second->keyEvent(key, cas, vbucket, OBS_DELETED_EVENT); - } else { - removeObserveSet(itr); - } - } -} - -void ObserveRegistry::itemReplicated(const Item &itm) { - LockHolder lh(registry_mutex); - std::map::iterator itr; - for (itr = registry.begin(); itr != registry.end(); itr++) { - if (!itr->second->isExpired()) { - itr->second->keyEvent(itm.getKey().c_str(), itm.getCas(), - itm.getVBucketId(), OBS_REPLICATED_EVENT); - } else { - removeObserveSet(itr); - } - } -} - -void ObserveRegistry::removeObserveSet(std::map::iterator itr) { - if (itr != registry.end()) { - delete itr->second; - registry.erase(itr); - } -} - -ObserveSet* ObserveRegistry::addObserveSet(const std::string &obs_set_name, - const uint16_t expiration) { - std::pair::iterator,bool> res; - res = registry.insert(std::pair(obs_set_name, - new ObserveSet(epstore, stats, expiration))); - if (!res.second) { - stats->obsErrors++; - return NULL; - } - stats->totalObserveSets++; - getLogger()->log(EXTENSION_LOG_DEBUG, NULL, "Created new observe set: %s", - obs_set_name.c_str()); - return res.first->second; -} - -const hrtime_t ObserveSet::ONE_SECOND = 1000000000; - -protocol_binary_response_status ObserveSet::add(const std::string &key, uint64_t cas, - const uint16_t vbucket) { - if ((*epstore)->getVBucket(vbucket)->getState() != vbucket_state_dead) { - std::map::iterator obs_set = observe_set.find(vbucket); - if (obs_set == observe_set.end()) { - std::pair::iterator,bool> res; - res = observe_set.insert(std::pair(vbucket, - new VBObserveSet(epstore, stats))); - if (!res.second) { - lastTouched = gethrtime(); - stats->obsErrors++; - return PROTOCOL_BINARY_RESPONSE_ETMPFAIL; - } - obs_set = res.first; - } - lastTouched = gethrtime(); - if (size >= MAX_OBS_SET_SIZE) { - stats->obsErrors++; - return PROTOCOL_BINARY_RESPONSE_EBUSY; - } else if (obs_set->second->add(key, cas, vbucket)) { - size++; - return PROTOCOL_BINARY_RESPONSE_SUCCESS; - } else { - stats->obsErrors++; - return PROTOCOL_BINARY_RESPONSE_ETMPFAIL; - } - } - return PROTOCOL_BINARY_RESPONSE_SUCCESS; -} - -void ObserveSet::remove(const std::string &key, const uint64_t cas, - const uint16_t vbucket) { - if (observe_set.find(vbucket) != observe_set.end()) { - VBObserveSet *vb_observe_set = observe_set.find(vbucket)->second; - if (vb_observe_set->remove(key, cas)) { - size--; - } - lastTouched = gethrtime(); - } -} - -void ObserveSet::keyEvent(const std::string &key, const uint64_t cas, - const uint16_t vbucket, int event) { - std::map::iterator itr = observe_set.find(vbucket); - if (itr != observe_set.end()) { - itr->second->keyEvent(key, cas, event); - lastTouched = gethrtime(); - } -} - -bool ObserveSet::isExpired() { - hrtime_t now = gethrtime(); - if ((now - lastTouched) > expiration) { - return true; - } - return false; -} - - -state_map* ObserveSet::getState() { - state_map *obs_state = new state_map(); - std::map::iterator itr; - for (itr = observe_set.begin(); itr != observe_set.end(); itr++) { - if ((*epstore)->getVBucket(itr->first)->getState() == vbucket_state_active) { - VBObserveSet *vb_observe_set = itr->second; - vb_observe_set->getState(obs_state); - } - } - return obs_state; -} - -ObserveSet::~ObserveSet() { - std::map::iterator itr; - for (itr = observe_set.begin(); itr != observe_set.end(); itr++) { - size -= itr->second->size(); - delete itr->second; - stats->totalObserveSets--; - } -} - -VBObserveSet::~VBObserveSet() { - stats->obsRegSize -= keylist.size(); -} - -// Returns true if an item was added to the list -bool VBObserveSet::add(const std::string &key, const uint64_t cas, - const uint16_t vbucket) { - observed_key_t obs_key(key, cas); - std::list::iterator itr; - for (itr = keylist.begin(); itr != keylist.end(); itr++) { - if (itr->key.compare(key) == 0 && itr->cas == cas) { - return true; - } - } - StoredValue *sv = (*epstore)->getStoredValue(key, vbucket, false); - if (sv == NULL) { - obs_key.deleted = true; - } else { - obs_key.mutated = (sv->getCas() != cas); - obs_key.persisted = (sv->getCas() == cas && sv->isClean()); - } - if ((*epstore)->getVBucket(vbucket)->getState() == vbucket_state_replica) { - obs_key.replicas = -1; - } - stats->obsRegSize++; - keylist.push_back(obs_key); - return true; -} - -// Returns true if an item was removed from the list, returns false if -// the item didn't exist -bool VBObserveSet::remove(const std::string &key, const uint64_t cas) { - std::list::iterator itr; - for (itr = keylist.begin(); itr != keylist.end(); itr++) { - observed_key_t obs_key = *itr; - if (obs_key.key.compare(key) == 0 && obs_key.cas == cas) { - stats->obsRegSize--; - keylist.erase(itr); - return true; - } - } - return false; -} - -void VBObserveSet::getState(state_map *sm) { - std::list::iterator itr; - for (itr = keylist.begin(); itr != keylist.end(); itr++) { - std::stringstream state_key; - std::stringstream state_value; - state_key << itr->key << "," << itr->cas; - state_value << (int)itr->replicas << ","; - if (itr->deleted) { - state_value << "deleted"; - } - if (itr->mutated) { - if (itr->deleted) { - state_value << ","; - } - state_value << "mutated"; - } - if (itr->persisted) { - if (itr->deleted || itr->mutated) { - state_value << ","; - } - state_value << "persisted"; - } - if (!itr->persisted && !itr->mutated && !itr->deleted) { - state_value << "none"; - } - (*sm)[state_key.str()] = state_value.str(); - } -} - -void VBObserveSet::keyEvent(const std::string &key, const uint64_t cas, - int event) { - std::list::iterator itr; - for (itr = keylist.begin(); itr != keylist.end(); itr++) { - if (itr->key.compare(key) == 0 && event == OBS_DELETED_EVENT) { - itr->deleted = true; - } else if (itr->key.compare(key) == 0 && itr->cas != cas && - event == OBS_MODIFIED_EVENT) { - itr->mutated = true; - itr->deleted = false; - } else if (itr->key.compare(key) == 0 && itr->cas == cas) { - if (event == OBS_PERSISTED_EVENT) { - itr->persisted = true; - } else if (event == OBS_REPLICATED_EVENT) { - itr->replicas++; - } - } - } -} - -bool ObserveRegistryCleaner::callback(Dispatcher &d, TaskId t) { - ++stats.obsCleanerRuns; - observeRegistry.removeExpired(); - d.snooze(t, sleepTime); - return true; -} diff --git a/observe_registry.hh b/observe_registry.hh deleted file mode 100644 index 9aa7a1732..000000000 --- a/observe_registry.hh +++ /dev/null @@ -1,171 +0,0 @@ -/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ -/* - * Copyright 2011 Couchbase, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef OBSERVE_REGISTRY_HH -#define OBSERVE_REGISTRY_HH 1 - -#define MAX_OBS_SET_SIZE 1000 - -#include -#include - -#include "common.hh" -#include "mutex.hh" -#include "locks.hh" -#include "dispatcher.hh" -#include "queueditem.hh" - -class ObserveRegistry; - -#include "ep.hh" - -typedef struct observed_key_t { - observed_key_t(std::string aKey, uint64_t(aCas)) - : key(aKey), cas(aCas), replicas(0), mutated(false), persisted(false), - deleted(false) { - } - - std::string key; - uint64_t cas; - uint8_t replicas; - bool mutated; - bool persisted; - bool deleted; -} observed_key_t; - -typedef std::map state_map; - -class ObserveSet; -class VBObserveSet; -class EventuallyPersistentStore; - - -class ObserveRegistry { -public: - - ObserveRegistry(EventuallyPersistentStore **e, EPStats *stats_ptr) - : epstore(e), stats(stats_ptr) { - } - - protocol_binary_response_status observeKey(const std::string &key, - const uint64_t cas, - const uint16_t vbucket, - const uint64_t expiration, - const std::string &obs_set_name); - - void unobserveKey(const std::string &key, - const uint64_t cas, - const uint16_t vbucket, - const std::string &obs_set_name); - - void removeExpired(); - - state_map* getObserveSetState(const std::string &obs_set_name); - - void itemsPersisted(std::list &itemlist); - void itemModified(const Item &item); - void itemReplicated(const Item &itm); - void itemDeleted(const std::string &key, const uint64_t cas, - const uint16_t vbucket); - -private: - - void removeObserveSet(std::map::iterator itr); - ObserveSet* addObserveSet(const std::string &obs_set_name, - const uint16_t expiration); - - std::map registry; - Mutex registry_mutex; - EventuallyPersistentStore **epstore; - EPStats *stats; -}; - -class ObserveSet { -public: - - ObserveSet(EventuallyPersistentStore **e, EPStats *stats_ptr, uint32_t exp) - : expiration(exp * ObserveSet::ONE_SECOND), epstore(e), stats(stats_ptr), - lastTouched(gethrtime()), size(0) { - } - - ~ObserveSet(); - - protocol_binary_response_status add(const std::string &key, const uint64_t cas, - const uint16_t vbucket); - void remove(const std::string &key, const uint64_t cas, - const uint16_t vbucket); - void keyEvent(const std::string &key, const uint64_t, - const uint16_t vbucket, int event); - bool isExpired(); - - state_map* getState(); - -private: - - static const hrtime_t ONE_SECOND; - const hrtime_t expiration; - std::map observe_set; - EventuallyPersistentStore **epstore; - EPStats *stats; - hrtime_t lastTouched; - int size; -}; - -class VBObserveSet { -public: - - VBObserveSet(EventuallyPersistentStore **e, EPStats *stats_ptr) - : epstore(e), stats(stats_ptr) { - } - - ~VBObserveSet(); - - bool add(const std::string &key, const uint64_t cas, const uint16_t vbucket); - bool remove(const std::string &key, const uint64_t cas); - int size(void) { return keylist.size(); }; - void getState(state_map* sm); - void keyEvent(const std::string &key, const uint64_t cas, - int event); - -private: - - std::list keylist; - EventuallyPersistentStore **epstore; - EPStats *stats; -}; - -class ObserveRegistryCleaner : public DispatcherCallback { -public: - - ObserveRegistryCleaner(ObserveRegistry &o, EPStats &st, size_t sTime) - : observeRegistry(o), stats(st), sleepTime(sTime), available(true) { - } - bool callback(Dispatcher &d, TaskId t); - - std::string description() { - return std::string("Cleaning observe registry."); - } - -private: - - ObserveRegistry &observeRegistry; - EPStats &stats; - double sleepTime; - bool available; -}; - -#endif /* OBSERVE_REGISTRY_HH */ diff --git a/priority.cc b/priority.cc index 0b510a8ae..a4fe68909 100644 --- a/priority.cc +++ b/priority.cc @@ -41,5 +41,4 @@ const Priority Priority::TapConnectionReaperPriority("tapconnection_reaper_prior const Priority Priority::ItemPagerPriority("item_pager_priority", 7); const Priority Priority::BackfillTaskPriority("backfill_task_priority", 8); const Priority Priority::HTResizePriority("hashtable_resize_priority", 211); -const Priority Priority::ObserveRegistryCleanerPriority("obs_reg_cleaneer_priority", 315); const Priority Priority::TapResumePriority("tap_resume_priority", 316); diff --git a/priority.hh b/priority.hh index 37d6e6448..a46a2740d 100644 --- a/priority.hh +++ b/priority.hh @@ -36,7 +36,6 @@ public: static const Priority TapResumePriority; static const Priority TapConnectionReaperPriority; static const Priority HTResizePriority; - static const Priority ObserveRegistryCleanerPriority; bool operator==(const Priority &other) const { return other.getPriorityValue() == this->priority; diff --git a/stats.hh b/stats.hh index e1a1189a3..db45f5118 100644 --- a/stats.hh +++ b/stats.hh @@ -275,25 +275,6 @@ public: //! The number of delete with meta operations Atomic numOpsDelMeta; - // - // Observe Stats - // - - //! The number of observe sets - Atomic totalObserveSets; - //! The number of stats observe polls - Atomic statsObservePolls; - //! The number of observe polls - Atomic observeCalls; - //! The number of unobserve polls - Atomic unobserveCalls; - //! The number of items in the observe registry - Atomic obsRegSize; - //! The number of observe errors - Atomic obsErrors; - //! The number of times the observe registry cleaner has run - Atomic obsCleanerRuns; - //! The number of tiems the mutation log compactor is exectued Atomic mlogCompactorRuns; @@ -428,11 +409,6 @@ public: vbucketDelMaxWalltime.set(0); vbucketDelTotWalltime.set(0); - statsObservePolls.set(0); - observeCalls.set(0); - unobserveCalls.set(0); - obsErrors.set(0); - obsCleanerRuns.set(0); mlogCompactorRuns.set(0); pendingOpsHisto.reset(); diff --git a/win32/Makefile.mingw b/win32/Makefile.mingw index 56581ee82..1dabeb947 100644 --- a/win32/Makefile.mingw +++ b/win32/Makefile.mingw @@ -81,7 +81,6 @@ EP_ENGINE_CC_SRC = \ mutation_log_compactor.cc \ mutex.cc \ objectregistry.cc \ - observe_registry.cc \ priority.cc \ queueditem.cc \ restore_impl.cc \