Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Support for resetting the replication chain.

This change supports resetting the replication chain. For example,
for a given replication chain A->B->C, if A receives a
RESET_REPLICATION_CHAIN message from ns_server, A sends
INIT_VBUCKET_STREAM messages to B to reset its replica vbuckets, and
then backfills all items to B. Subsequently, B sends the same
sequence of messages to C.

Change-Id: Ia020580472b67b18033f8e3c51851aeb47324ff5
Reviewed-on: http://review.couchbase.org/7005
Tested-by: Chiyoung Seo <chiyoung.seo@gmail.com>
Reviewed-by: Bin Cui <bin.cui@gmail.com>
  • Loading branch information...
commit 391a1de2781b55a76b157b536552131bbdf950b3 1 parent 395a83e
@chiyoung chiyoung authored
View
7 command_ids.h
@@ -116,6 +116,13 @@
*/
#define CMD_DEREGISTER_TAP_CLIENT 0x9e
+/**
+ * Reset the replication chain from the node that receives this command. For example, given
+ * the replication chain, A->B->C, if A receives this command, it will reset all the replica
+ * vbuckets on B and C, which are replicated from A.
+ */
+#define CMD_RESET_REPLICATION_CHAIN 0x9f
+
/*
* IDs for the events of the SYNC command.
*/
View
15 ep_engine.cc
@@ -790,6 +790,8 @@ extern "C" {
break;
case CMD_LAST_CLOSED_CHECKPOINT:
return h->handleGetLastClosedCheckpointId(cookie, request, response);
+ case CMD_RESET_REPLICATION_CHAIN:
+ return h->resetReplicationChain(cookie, request, response);
}
// Send a special response for getl since we don't want to send the key
@@ -4019,3 +4021,16 @@ EventuallyPersistentEngine::handleGetLastClosedCheckpointId(const void *cookie,
}
return ENGINE_FAILED;
}
+
+ENGINE_ERROR_CODE
+EventuallyPersistentEngine::resetReplicationChain(const void *cookie,
+ protocol_binary_request_header *req,
+ ADD_RESPONSE response) {
+ (void) req;
+ tapConnMap.resetReplicaChain();
+ if (response(NULL, 0, NULL, 0, NULL, 0,
+ PROTOCOL_BINARY_RAW_BYTES, PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie)) {
+ return ENGINE_SUCCESS;
+ }
+ return ENGINE_FAILED;
+}
View
4 ep_engine.h
@@ -561,6 +561,10 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
protocol_binary_request_header *request,
ADD_RESPONSE response);
+ ENGINE_ERROR_CODE resetReplicationChain(const void* cookie,
+ protocol_binary_request_header *request,
+ ADD_RESPONSE response);
+
size_t getGetlDefaultTimeout() { return getlDefaultTimeout; }
size_t getGetlMaxTimeout() { return getlMaxTimeout; }
View
3  management/mc_bin_client.py
@@ -407,3 +407,6 @@ def deregister_tap_client(self, tap_name):
"""Deregister the TAP client with a given name."""
return self._doCmd(memcacheConstants.CMD_DEREGISTER_TAP_CLIENT, tap_name, '', '', 0)
+ def reset_replication_chain(self):
+ """Reset the replication chain."""
+ return self._doCmd(memcacheConstants.CMD_RESET_REPLICATION_CHAIN, '', '', '', 0)
View
3  management/memcacheConstants.py
@@ -58,6 +58,9 @@
# TAP client registration
CMD_DEREGISTER_TAP_CLIENT = 0x9e
+# Reset replication chain
+CMD_RESET_REPLICATION_CHAIN = 0x9f
+
# Replication
CMD_TAP_CONNECT = 0x40
CMD_TAP_MUTATION = 0x41
View
5 tapconnection.hh
@@ -956,6 +956,11 @@ private:
void setVBucketFilter(const std::vector<uint16_t> &vbuckets);
+ const VBucketFilter &getVBucketFilter() {
+ LockHolder lh(queueLock);
+ return vbucketFilter;
+ }
+
bool checkVBucketFilter(uint16_t vbucket) {
LockHolder lh(queueLock);
return vbucketFilter(vbucket);
View
19 tapconnmap.cc
@@ -399,6 +399,25 @@ void TapConnMap::scheduleBackfill(const std::set<uint16_t> &backfillVBuckets) {
}
}
+void TapConnMap::resetReplicaChain() {
+ LockHolder lh(notifySync);
+ rel_time_t now = ep_current_time();
+ std::list<TapConnection*>::iterator it = all.begin();
+ for (; it != all.end(); ++it) {
+ TapConnection *tc = *it;
+ TapProducer *tp = dynamic_cast<TapProducer*>(tc);
+ if (!tp || tp->getExpiryTime() <= now) {
+ continue;
+ }
+ // Get the list of vbuckets that each TAP producer is replicating
+ const std::vector<uint16_t> &vblist = tp->getVBucketFilter().getVector();
+ // TAP producer sends INITIAL_VBUCKET_STREAM messages to the destination to reset
+ // replica vbuckets, and then backfills items to the destination.
+ tp->scheduleBackfill(vblist);
+ }
+ notifySync.notify();
+}
+
void TapConnMap::notifyIOThreadMain() {
// To avoid connections to be stucked in a bogus state forever, we're going
// to ping all connections that hasn't tried to walk the tap queue
View
2  tapconnmap.hh
@@ -252,6 +252,8 @@ public:
void scheduleBackfill(const std::set<uint16_t> &backfillVBuckets);
+ void resetReplicaChain();
+
private:
TapConnection *findByName_UNLOCKED(const std::string &name);
Please sign in to comment.
Something went wrong with that request. Please try again.