Skip to content

Commit

Permalink
Gossip forgotten nodes on CLUSTER FORGET (redis#10869)
Browse files Browse the repository at this point in the history
Gossip the cluster node blacklist in ping and pong messages.
This means that CLUSTER FORGET doesn't need to be sent to all nodes in a cluster.
It can be sent to one or more nodes and then be propagated to the rest of them.

For each blacklisted node, its node id and its remaining blacklist TTL is gossiped in a
cluster bus ping extension (introduced in redis#9530).
  • Loading branch information
zuiderkwast authored and enjoy-binbin committed Jul 31, 2023
1 parent 9922c57 commit 8974781
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 6 deletions.
53 changes: 52 additions & 1 deletion src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -2039,6 +2039,21 @@ int writeHostnamePingExt(clusterMsgPingExt **cursor) {
return extension_size;
}

/* Write the forgotten node ping extension at the start of the cursor, update
* the cursor to point to the end of the written extension and return the number
* of bytes written. */
int writeForgottenNodePingExt(clusterMsgPingExt **cursor, sds name, uint64_t ttl) {
serverAssert(sdslen(name) == CLUSTER_NAMELEN);
clusterMsgPingExtForgottenNode *ext = &(*cursor)->ext[0].forgotten_node;
memcpy(ext->name, name, CLUSTER_NAMELEN);
ext->ttl = htonu64(ttl);
uint32_t extension_size = sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode);
(*cursor)->type = htons(CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE);
(*cursor)->length = htonl(extension_size);
*cursor = (clusterMsgPingExt *) (ext->name + sizeof(clusterMsgPingExtForgottenNode));
return extension_size;
}

/* We previously validated the extensions, so this function just needs to
* handle the extensions. */
void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
Expand All @@ -2052,6 +2067,19 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) {
clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname);
ext_hostname = hostname_ext->hostname;
} else if (type == CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE) {
clusterMsgPingExtForgottenNode *forgotten_node_ext = &(ext->ext[0].forgotten_node);
clusterNode *n = clusterLookupNode(forgotten_node_ext->name, CLUSTER_NAMELEN);
if (n && n != myself && !(nodeIsSlave(myself) && myself->slaveof == n)) {
sds id = sdsnewlen(forgotten_node_ext->name, CLUSTER_NAMELEN);
dictEntry *de = dictAddRaw(server.cluster->nodes_black_list, id, NULL);
serverAssert(de != NULL);
uint64_t expire = server.unixtime + ntohu64(forgotten_node_ext->ttl);
dictSetUnsignedIntegerVal(de, expire);
clusterDelNode(n);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_SAVE_CONFIG);
}
} else {
/* Unknown type, we will ignore it but log what happened. */
serverLog(LL_WARNING, "Received unknown extension type %d", type);
Expand Down Expand Up @@ -2951,6 +2979,8 @@ void clusterSendPing(clusterLink *link, int type) {
estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted));
estlen += getHostnamePingExtSize();
estlen += dictSize(server.cluster->nodes_black_list) *
(sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode));

/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
Expand Down Expand Up @@ -3031,6 +3061,22 @@ void clusterSendPing(clusterLink *link, int type) {
extensions++;
}

/* Gossip forgotten nodes */
if (dictSize(server.cluster->nodes_black_list) > 0) {
dictIterator *di = dictGetIterator(server.cluster->nodes_black_list);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
sds name = dictGetKey(de);
uint64_t expire = dictGetUnsignedIntegerVal(de);
if ((time_t)expire < server.unixtime) continue; /* already expired */
uint64_t ttl = expire - server.unixtime;
hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA;
totlen += writeForgottenNodePingExt(&cursor, name, ttl);
extensions++;
}
dictReleaseIterator(di);
}

/* Compute the actual total length and send! */
totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
Expand Down Expand Up @@ -5639,7 +5685,12 @@ NULL
/* CLUSTER FORGET <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
if (clusterBlacklistExists((char*)c->argv[2]->ptr))
/* Already forgotten. The deletion may have been gossipped by
* another node, so we pretend it succeeded. */
addReply(c,shared.ok);
else
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
} else if (n == myself) {
addReplyError(c,"I tried hard but I can't forget myself...");
Expand Down
9 changes: 9 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ typedef struct {
* consistent manner. */
typedef enum {
CLUSTERMSG_EXT_TYPE_HOSTNAME,
CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE,
} clusterMsgPingtypes;

/* Helper function for making sure extensions are eight byte aligned. */
Expand All @@ -262,12 +263,20 @@ typedef struct {
char hostname[1]; /* The announced hostname, ends with \0. */
} clusterMsgPingExtHostname;

typedef struct {
char name[CLUSTER_NAMELEN]; /* Node name. */
uint64_t ttl; /* Remaining time to blacklist the node, in seconds. */
} clusterMsgPingExtForgottenNode;

static_assert(sizeof(clusterMsgPingExtForgottenNode) % 8 == 0, "");

typedef struct {
uint32_t length; /* Total length of this extension message (including this header) */
uint16_t type; /* Type of this extension message (see clusterMsgPingExtTypes) */
uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */
union {
clusterMsgPingExtHostname hostname;
clusterMsgPingExtForgottenNode forgotten_node;
} ext[]; /* Actual extension information, formatted so that the data is 8
* byte aligned, regardless of its content. */
} clusterMsgPingExt;
Expand Down
23 changes: 18 additions & 5 deletions tests/unit/cluster/hostnames.tcl
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
# Returns 1 if no node knows node_id, 0 if any node knows it.
proc node_is_forgotten {node_id} {
for {set j 0} {$j < [llength $::servers]} {incr j} {
set cluster_nodes [R $j CLUSTER NODES]
if { [string match "*$node_id*" $cluster_nodes] } {
return 0
}
}
return 1
}

# Isolate a node from the cluster and give it a new nodeid
proc isolate_node {id} {
set node_id [R $id CLUSTER MYID]
R $id CLUSTER RESET HARD
for {set j 0} {$j < [llength $::servers]} {incr j} {
if { $j eq $id } {
continue
}
R $j CLUSTER FORGET $node_id
# Here we additionally test that CLUSTER FORGET propagates to all nodes.
set other_id [expr $id == 0 ? 1 : 0]
R $other_id CLUSTER FORGET $node_id
wait_for_condition 50 100 {
[node_is_forgotten $node_id]
} else {
fail "CLUSTER FORGET was not propagated to all nodes"
}
}

Expand Down

0 comments on commit 8974781

Please sign in to comment.