Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,130 @@ unsigned int countKeysInSlot(unsigned int slot) {
return kvstoreDictSize(server.db->keys, slot);
}

/* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node) {

int reply_count = 0;
char *hostname;
void *node_replylen = addReplyDeferredLen(c);

addReplyBulkCString(c, "id");
addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN);
reply_count++;

if (clusterNodeTcpPort(node)) {
addReplyBulkCString(c, "port");
addReplyLongLong(c, clusterNodeTcpPort(node));
reply_count++;
}

if (clusterNodeTlsPort(node)) {
addReplyBulkCString(c, "tls-port");
addReplyLongLong(c, clusterNodeTlsPort(node));
reply_count++;
}

addReplyBulkCString(c, "ip");
addReplyBulkCString(c, clusterNodeIp(node));
reply_count++;

addReplyBulkCString(c, "endpoint");
addReplyBulkCString(c, clusterNodePreferredEndpoint(node));
reply_count++;

hostname = clusterNodeHostname(node);
if (hostname != NULL && *hostname != '\0') {
addReplyBulkCString(c, "hostname");
addReplyBulkCString(c, hostname);
reply_count++;
}

long long node_offset;
if (clusterNodeIsMyself(node)) {
node_offset = clusterNodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
node_offset = clusterNodeReplOffset(node);
}

addReplyBulkCString(c, "role");
addReplyBulkCString(c, clusterNodeIsSlave(node) ? "replica" : "master");
reply_count++;

addReplyBulkCString(c, "replication-offset");
addReplyLongLong(c, node_offset);
reply_count++;

addReplyBulkCString(c, "health");
const char *health_msg = NULL;
if (clusterNodeIsFailing(node)) {
health_msg = "fail";
} else if (clusterNodeIsSlave(node) && node_offset == 0) {
health_msg = "loading";
} else {
health_msg = "online";
}
addReplyBulkCString(c, health_msg);
reply_count++;

setDeferredMapLen(c, node_replylen, reply_count);
}

static clusterNode *clusterGetMasterFromShard(void *shard_handle) {
clusterNode *n = NULL;
void *node_it = clusterShardHandleGetNodeIterator(shard_handle);
while((n = clusterShardNodeIteratorNext(node_it)) != NULL) {
if (!clusterNodeIsFailing(n)) {
break;
}
}
clusterShardNodeIteratorFree(node_it);
if (!n) return NULL;
return clusterNodeGetMaster(n);
}

/* Add the shard reply of a single shard based off the given primary node. */
void addShardReplyForClusterShards(client *c, void *shard_handle) {
serverAssert(clusterGetShardNodeCount(shard_handle) > 0);
addReplyMapLen(c, 2);
addReplyBulkCString(c, "slots");

/* Use slot_info_pairs from the primary only */
clusterNode *master_node = clusterGetMasterFromShard(shard_handle);

if (master_node && clusterNodeHasSlotInfo(master_node)) {
serverAssert((clusterNodeSlotInfoCount(master_node) % 2) == 0);
addReplyArrayLen(c, clusterNodeSlotInfoCount(master_node));
for (int i = 0; i < clusterNodeSlotInfoCount(master_node); i++)
addReplyLongLong(c, (unsigned long)clusterNodeSlotInfoEntry(master_node, i));
} else {
/* If no slot info pair is provided, the node owns no slots */
addReplyArrayLen(c, 0);
}

addReplyBulkCString(c, "nodes");
addReplyArrayLen(c, clusterGetShardNodeCount(shard_handle));
void *node_it = clusterShardHandleGetNodeIterator(shard_handle);
for (clusterNode *n = clusterShardNodeIteratorNext(node_it); n != NULL; n = clusterShardNodeIteratorNext(node_it)) {
addNodeDetailsToShardReply(c, n);
clusterFreeNodesSlotsInfo(n);
}
clusterShardNodeIteratorFree(node_it);
}

/* Add to the output buffer of the given client, an array of slot (start, end)
* pair owned by the shard, also the primary and set of replica(s) along with
* information about each node. */
void clusterCommandShards(client *c) {
addReplyArrayLen(c, clusterGetShardCount());
/* This call will add slot_info_pairs to all nodes */
clusterGenNodesSlotsInfo(0);
dictIterator *shard_it = clusterGetShardIterator();
for(void *shard_handle = clusterNextShardHandle(shard_it); shard_handle != NULL; shard_handle = clusterNextShardHandle(shard_it)) {
addShardReplyForClusterShards(c, shard_handle);
}
clusterFreeShardIterator(shard_it);
}

void clusterCommandHelp(client *c) {
const char *help[] = {
"COUNTKEYSINSLOT <slot>",
Expand Down
21 changes: 20 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ int clusterManualFailoverTimeLimit(void);
void clusterCommandSlots(client * c);
void clusterCommandMyId(client *c);
void clusterCommandMyShardId(client *c);
void clusterCommandShards(client *c);

sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary);

int clusterNodeCoversSlot(clusterNode *n, int slot);
Expand Down Expand Up @@ -142,4 +142,23 @@ int isValidAuxString(char *s, unsigned int length);
void migrateCommand(client *c);
void clusterCommand(client *c);
ConnectionType *connTypeOfCluster(void);

void clusterGenNodesSlotsInfo(int filter);
void clusterFreeNodesSlotsInfo(clusterNode *n);
int clusterNodeSlotInfoCount(clusterNode *n);
uint16_t clusterNodeSlotInfoEntry(clusterNode *n, int idx);
int clusterNodeHasSlotInfo(clusterNode *n);

int clusterGetShardCount(void);
void *clusterGetShardIterator(void);
void *clusterNextShardHandle(void *shard_iterator);
void clusterFreeShardIterator(void *shard_iterator);
int clusterGetShardNodeCount(void *shard);
void *clusterShardHandleGetNodeIterator(void *shard);
clusterNode *clusterShardNodeIteratorNext(void *node_iterator);
void clusterShardNodeIteratorFree(void *node_iterator);
clusterNode *clusterShardNodeFirst(void *shard);

int clusterNodeTcpPort(clusterNode *node);
int clusterNodeTlsPort(clusterNode *node);
#endif /* __CLUSTER_H */
139 changes: 47 additions & 92 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -5582,113 +5582,68 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}
}

/* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
int reply_count = 0;
void *node_replylen = addReplyDeferredLen(c);
addReplyBulkCString(c, "id");
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
reply_count++;

if (node->tcp_port) {
addReplyBulkCString(c, "port");
addReplyLongLong(c, node->tcp_port);
reply_count++;
}

if (node->tls_port) {
addReplyBulkCString(c, "tls-port");
addReplyLongLong(c, node->tls_port);
reply_count++;
}
int clusterGetShardCount(void) {
return dictSize(server.cluster->shards);
}

addReplyBulkCString(c, "ip");
addReplyBulkCString(c, node->ip);
reply_count++;
void *clusterGetShardIterator(void) {
return dictGetSafeIterator(server.cluster->shards);
}

addReplyBulkCString(c, "endpoint");
addReplyBulkCString(c, clusterNodePreferredEndpoint(node));
reply_count++;
void *clusterNextShardHandle(void *shard_iterator) {
dictEntry *de = dictNext(shard_iterator);
if(de == NULL) return NULL;
return dictGetVal(de);
}

if (sdslen(node->hostname) != 0) {
addReplyBulkCString(c, "hostname");
addReplyBulkCBuffer(c, node->hostname, sdslen(node->hostname));
reply_count++;
}
void clusterFreeShardIterator(void *shard_iterator) {
dictReleaseIterator(shard_iterator);
}

long long node_offset;
if (node->flags & CLUSTER_NODE_MYSELF) {
node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
node_offset = node->repl_offset;
}
int clusterNodeHasSlotInfo(clusterNode *n) {
return n->slot_info_pairs != NULL;
}

addReplyBulkCString(c, "role");
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
reply_count++;
int clusterNodeSlotInfoCount(clusterNode *n) {
return n->slot_info_pairs_count;
}

addReplyBulkCString(c, "replication-offset");
addReplyLongLong(c, node_offset);
reply_count++;
uint16_t clusterNodeSlotInfoEntry(clusterNode *n, int idx) {
return n->slot_info_pairs[idx];
}

addReplyBulkCString(c, "health");
const char *health_msg = NULL;
if (nodeFailed(node)) {
health_msg = "fail";
} else if (nodeIsSlave(node) && node_offset == 0) {
health_msg = "loading";
} else {
health_msg = "online";
}
addReplyBulkCString(c, health_msg);
reply_count++;
int clusterGetShardNodeCount(void *shard) {
return listLength((list*)shard);
}

setDeferredMapLen(c, node_replylen, reply_count);
void *clusterShardHandleGetNodeIterator(void *shard) {
listIter *li = zmalloc(sizeof(listIter));
listRewind((list*)shard, li);
return li;
}

/* Add the shard reply of a single shard based off the given primary node. */
void addShardReplyForClusterShards(client *c, list *nodes) {
serverAssert(listLength(nodes) > 0);
clusterNode *n = listNodeValue(listFirst(nodes));
addReplyMapLen(c, 2);
addReplyBulkCString(c, "slots");
void clusterShardNodeIteratorFree(void *node_iterator) {
zfree(node_iterator);
}

/* Use slot_info_pairs from the primary only */
n = clusterNodeGetMaster(n);
clusterNode *clusterShardNodeIteratorNext(void *node_iterator) {
listNode *item = listNext((listIter*)node_iterator);
if (item == NULL) return NULL;
return listNodeValue(item);
}

if (n->slot_info_pairs != NULL) {
serverAssert((n->slot_info_pairs_count % 2) == 0);
addReplyArrayLen(c, n->slot_info_pairs_count);
for (int i = 0; i < n->slot_info_pairs_count; i++)
addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]);
} else {
/* If no slot info pair is provided, the node owns no slots */
addReplyArrayLen(c, 0);
}
clusterNode *clusterShardNodeFirst(void *shard) {
listNode *item = listFirst((list*)shard);
if (item == NULL) return NULL;
return listNodeValue(item);
}

addReplyBulkCString(c, "nodes");
addReplyArrayLen(c, listLength(nodes));
listIter li;
listRewind(nodes, &li);
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
clusterNode *n = listNodeValue(ln);
addNodeDetailsToShardReply(c, n);
clusterFreeNodesSlotsInfo(n);
}
int clusterNodeTcpPort(clusterNode *node) {
return node->tcp_port;
}

/* Add to the output buffer of the given client, an array of slot (start, end)
* pair owned by the shard, also the primary and set of replica(s) along with
* information about each node. */
void clusterCommandShards(client *c) {
addReplyArrayLen(c, dictSize(server.cluster->shards));
/* This call will add slot_info_pairs to all nodes */
clusterGenNodesSlotsInfo(0);
dictIterator *di = dictGetSafeIterator(server.cluster->shards);
for(dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) {
addShardReplyForClusterShards(c, dictGetVal(de));
}
dictReleaseIterator(di);
int clusterNodeTlsPort(clusterNode *node) {
return node->tls_port;
}

sds genClusterInfoString(void) {
Expand Down
12 changes: 8 additions & 4 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -3778,7 +3778,9 @@ struct COMMAND_ARG HPEXPIRETIME_Args[] = {

#ifndef SKIP_CMD_TIPS_TABLE
/* HPTTL tips */
#define HPTTL_Tips NULL
const char *HPTTL_Tips[] = {
"nondeterministic_output",
};
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
Expand Down Expand Up @@ -3956,7 +3958,9 @@ struct COMMAND_ARG HSTRLEN_Args[] = {

#ifndef SKIP_CMD_TIPS_TABLE
/* HTTL tips */
#define HTTL_Tips NULL
const char *HTTL_Tips[] = {
"nondeterministic_output",
};
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
Expand Down Expand Up @@ -11044,13 +11048,13 @@ struct COMMAND_STRUCT redisCommandTable[] = {
{MAKE_CMD("hpexpire","Set expiry for hash field using relative time to expire (milliseconds)","O(N) where N is the number of specified fields","7.4.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HPEXPIRE_History,0,HPEXPIRE_Tips,0,hpexpireCommand,-6,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_HASH,HPEXPIRE_Keyspecs,1,NULL,4),.args=HPEXPIRE_Args},
{MAKE_CMD("hpexpireat","Set expiry for hash field using an absolute Unix timestamp (milliseconds)","O(N) where N is the number of specified fields","7.4.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HPEXPIREAT_History,0,HPEXPIREAT_Tips,0,hpexpireatCommand,-6,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_HASH,HPEXPIREAT_Keyspecs,1,NULL,4),.args=HPEXPIREAT_Args},
{MAKE_CMD("hpexpiretime","Returns the expiration time of a hash field as a Unix timestamp, in msec.","O(N) where N is the number of specified fields","7.4.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HPEXPIRETIME_History,0,HPEXPIRETIME_Tips,0,hpexpiretimeCommand,-5,CMD_READONLY|CMD_FAST,ACL_CATEGORY_HASH,HPEXPIRETIME_Keyspecs,1,NULL,2),.args=HPEXPIRETIME_Args},
{MAKE_CMD("hpttl","Returns the TTL in milliseconds of a hash field.","O(N) where N is the number of specified fields","7.4.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HPTTL_History,0,HPTTL_Tips,0,hpttlCommand,-5,CMD_READONLY|CMD_FAST,ACL_CATEGORY_HASH,HPTTL_Keyspecs,1,NULL,2),.args=HPTTL_Args},
{MAKE_CMD("hpttl","Returns the TTL in milliseconds of a hash field.","O(N) where N is the number of specified fields","7.4.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HPTTL_History,0,HPTTL_Tips,1,hpttlCommand,-5,CMD_READONLY|CMD_FAST,ACL_CATEGORY_HASH,HPTTL_Keyspecs,1,NULL,2),.args=HPTTL_Args},
{MAKE_CMD("hrandfield","Returns one or more random fields from a hash.","O(N) where N is the number of fields returned","6.2.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HRANDFIELD_History,0,HRANDFIELD_Tips,1,hrandfieldCommand,-2,CMD_READONLY,ACL_CATEGORY_HASH,HRANDFIELD_Keyspecs,1,NULL,2),.args=HRANDFIELD_Args},
{MAKE_CMD("hscan","Iterates over fields and values of a hash.","O(1) for every call. O(N) for a complete iteration, including enough command calls for the cursor to return back to 0. N is the number of elements inside the collection.","2.8.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HSCAN_History,0,HSCAN_Tips,1,hscanCommand,-3,CMD_READONLY,ACL_CATEGORY_HASH,HSCAN_Keyspecs,1,NULL,5),.args=HSCAN_Args},
{MAKE_CMD("hset","Creates or modifies the value of a field in a hash.","O(1) for each field/value pair added, so O(N) to add N field/value pairs when the command is called with multiple field/value pairs.","2.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HSET_History,1,HSET_Tips,0,hsetCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_HASH,HSET_Keyspecs,1,NULL,2),.args=HSET_Args},
{MAKE_CMD("hsetnx","Sets the value of a field in a hash only when the field doesn't exist.","O(1)","2.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HSETNX_History,0,HSETNX_Tips,0,hsetnxCommand,4,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_HASH,HSETNX_Keyspecs,1,NULL,3),.args=HSETNX_Args},
{MAKE_CMD("hstrlen","Returns the length of the value of a field.","O(1)","3.2.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HSTRLEN_History,0,HSTRLEN_Tips,0,hstrlenCommand,3,CMD_READONLY|CMD_FAST,ACL_CATEGORY_HASH,HSTRLEN_Keyspecs,1,NULL,2),.args=HSTRLEN_Args},
{MAKE_CMD("httl","Returns the TTL in seconds of a hash field.","O(N) where N is the number of specified fields","7.4.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HTTL_History,0,HTTL_Tips,0,httlCommand,-5,CMD_READONLY|CMD_FAST,ACL_CATEGORY_HASH,HTTL_Keyspecs,1,NULL,2),.args=HTTL_Args},
{MAKE_CMD("httl","Returns the TTL in seconds of a hash field.","O(N) where N is the number of specified fields","7.4.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HTTL_History,0,HTTL_Tips,1,httlCommand,-5,CMD_READONLY|CMD_FAST,ACL_CATEGORY_HASH,HTTL_Keyspecs,1,NULL,2),.args=HTTL_Args},
{MAKE_CMD("hvals","Returns all values in a hash.","O(N) where N is the size of the hash.","2.0.0",CMD_DOC_NONE,NULL,NULL,"hash",COMMAND_GROUP_HASH,HVALS_History,0,HVALS_Tips,1,hvalsCommand,2,CMD_READONLY,ACL_CATEGORY_HASH,HVALS_Keyspecs,1,NULL,1),.args=HVALS_Args},
/* hyperloglog */
{MAKE_CMD("pfadd","Adds elements to a HyperLogLog key. Creates the key if it doesn't exist.","O(1) to add every element.","2.8.9",CMD_DOC_NONE,NULL,NULL,"hyperloglog",COMMAND_GROUP_HYPERLOGLOG,PFADD_History,0,PFADD_Tips,0,pfaddCommand,-2,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_HYPERLOGLOG,PFADD_Keyspecs,1,NULL,2),.args=PFADD_Args},
Expand Down
3 changes: 3 additions & 0 deletions src/commands/hpttl.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
"acl_categories": [
"HASH"
],
"command_tips": [
"NONDETERMINISTIC_OUTPUT"
],
"key_specs": [
{
"flags": [
Expand Down
3 changes: 3 additions & 0 deletions src/commands/httl.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
"acl_categories": [
"HASH"
],
"command_tips": [
"NONDETERMINISTIC_OUTPUT"
],
"key_specs": [
{
"flags": [
Expand Down
Loading