Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update slotmap when slot is not served by any node #192

Merged
merged 8 commits into from
Sep 29, 2023
33 changes: 28 additions & 5 deletions hircluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -2230,14 +2230,22 @@ static void *redis_cluster_command_execute(redisClusterContext *cc,

node = node_get_by_table(cc, (uint32_t)command->slot_num);
if (node == NULL) {
goto error;
/* Update the slotmap since the slot is not served. */
if (redisClusterUpdateSlotmap(cc) != REDIS_OK) {
goto error;
}
node = node_get_by_table(cc, (uint32_t)command->slot_num);
if (node == NULL) {
/* Return error since the slot is still not served. */
goto error;
}
}

c = ctx_get_by_node(cc, node);
if (c == NULL || c->err) {
/* Failed to connect. Maybe there was a failover and this node is gone.
* Update slotmap to find out. */
if (cluster_update_route(cc) != REDIS_OK) {
if (redisClusterUpdateSlotmap(cc) != REDIS_OK) {
goto error;
}

Expand Down Expand Up @@ -3613,6 +3621,18 @@ static void unlinkAsyncContextAndNode(void *data) {
}
}

/* Reply callback function for AUTH */
void authReplyCallback(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = (redisReply *)r;
redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata;

if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
__redisClusterAsyncSetError(
acc, REDIS_ERR_OTHER, (reply ? reply->str : "failed to send AUTH"));
redisAsyncDisconnect(ac);
}
}

redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc,
redisClusterNode *node) {
redisAsyncContext *ac;
Expand Down Expand Up @@ -3674,11 +3694,11 @@ redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc,
// Authenticate when needed
if (acc->cc->password != NULL) {
if (acc->cc->username != NULL) {
ret = redisAsyncCommand(ac, NULL, NULL, "AUTH %s %s",
ret = redisAsyncCommand(ac, authReplyCallback, acc, "AUTH %s %s",
acc->cc->username, acc->cc->password);
} else {
ret =
redisAsyncCommand(ac, NULL, NULL, "AUTH %s", acc->cc->password);
ret = redisAsyncCommand(ac, authReplyCallback, acc, "AUTH %s",
acc->cc->password);
}

if (ret != REDIS_OK) {
Expand Down Expand Up @@ -4135,6 +4155,9 @@ int redisClusterAsyncFormattedCommand(redisClusterAsyncContext *acc,

node = node_get_by_table(cc, (uint32_t)slot_num);
if (node == NULL) {
/* Initiate a slotmap update since the slot is not served. */
throttledUpdateSlotMapAsync(acc, NULL);

/* node_get_by_table() has set the error on cc. */
__redisClusterAsyncSetError(acc, cc->err, cc->errstr);
goto error;
Expand Down
8 changes: 8 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,11 @@ add_test(NAME cluster-scale-down-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/cluster-scale-down-test.sh"
"$<TARGET_FILE:clusterclient>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME slots-not-served-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/slots-not-served-test.sh"
"$<TARGET_FILE:clusterclient>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME slots-not-served-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/slots-not-served-test-async.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
11 changes: 9 additions & 2 deletions tests/clusterclient_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,15 @@ void sendNextCommand(int fd, short kind, void *arg) {
} else {
int status = redisClusterAsyncCommand(
acc, replyCallback, (void *)((intptr_t)num_running), cmd);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
num_running++;
if (status == REDIS_OK) {
num_running++;
} else {
printf("error: %s\n", acc->errstr);
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved

/* Schedule a read from stdin and handle next command. */
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand,
acc, NULL);
}
}

if (async)
Expand Down
10 changes: 9 additions & 1 deletion tests/ct_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ void test_async_password_wrong(void) {
assert(acc->err == REDIS_ERR_OTHER);
assert(strcmp(acc->errstr, "slotmap not available") == 0);

/* Since the owner of the slot is unknown the redisClusterAsyncCommand will
initite a slotmap update. This update will also get a WRONGPASS error. */

event_base_dispatch(base);
bjosv marked this conversation as resolved.
Show resolved Hide resolved

redisClusterAsyncFree(acc);
Expand Down Expand Up @@ -434,7 +437,12 @@ void test_async_password_missing(void) {
assert(acc->err == REDIS_ERR_OTHER);
assert(strcmp(acc->errstr, "slotmap not available") == 0);

event_base_dispatch(base);
/* Since the owner of the slot is unknown the redisClusterAsyncCommand will
initite a slotmap update. This update will register a new socket fd in the
event system, which will block libevents API `event_base_dispatch()` until
the socket is closed. Use alternative API to run the event loop once only
to avoid this problem. */
event_base_loop(base, EVLOOP_ONCE);

redisClusterAsyncFree(acc);
bjosv marked this conversation as resolved.
Show resolved Hide resolved
event_base_free(base);
Expand Down
67 changes: 67 additions & 0 deletions tests/scripts/slots-not-served-test-async.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/bin/bash

# Usage: $0 /path/to/clusterclient-binary
bjosv marked this conversation as resolved.
Show resolved Hide resolved

clientprog=${1:-./clusterclient-async}
testname=slots-not-served-test-async

# Sync processes waiting for CONT signals.
perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' &
syncpid1=$!;

# Start simulated redis node #1
timeout 5s ./simulated-redis.pl -p 7401 -d --sigcont $syncpid1 <<'EOF' &
# The initial slotmap is not covering all slots.
EXPECT CONNECT
EXPECT ["CLUSTER", "SLOTS"]
SEND [[0, 1, ["127.0.0.1", 7401, "nodeid7401"]]]
EXPECT CLOSE

# Slotmap update due to slot not served.
EXPECT CONNECT
EXPECT ["CLUSTER", "SLOTS"]
SEND [[0, 16383, ["127.0.0.1", 7401, "nodeid7401"]]]

EXPECT ["GET", "foo"]
SEND "bar"
EXPECT CLOSE
EOF
server1=$!

# Wait until node is ready to accept client connections
wait $syncpid1;

# Run client
timeout 3s "$clientprog" --events 127.0.0.1:7401 > "$testname.out" <<'EOF'
GET foo
# Allow slotmap update to finish.
!sleep
GET foo
EOF
clientexit=$?

# Wait for server to exit
wait $server1; server1exit=$?

# Check exit statuses
if [ $server1exit -ne 0 ]; then
echo "Simulated server #1 exited with status $server1exit"
exit $server1exit
fi
if [ $clientexit -ne 0 ]; then
echo "$clientprog exited with status $clientexit"
exit $clientexit
fi

# Check the output from clusterclient
expected="Event: slotmap-updated
Event: ready
error: slot not served by any node
Event: slotmap-updated
bar
Event: free-context"

echo "$expected" | diff -u - "$testname.out" || exit 99

# Clean up
rm "$testname.out"
76 changes: 76 additions & 0 deletions tests/scripts/slots-not-served-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/bin/bash

# Usage: $0 /path/to/clusterclient-binary

clientprog=${1:-./clusterclient}
testname=slots-not-served-test

# Sync processes waiting for CONT signals.
perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' &
syncpid1=$!;

# Start simulated redis node #1
timeout 5s ./simulated-redis.pl -p 7401 -d --sigcont $syncpid1 <<'EOF' &
# The initial slotmap is not covering all slots.
EXPECT CONNECT
EXPECT ["CLUSTER", "SLOTS"]
SEND [[0, 1, ["127.0.0.1", 7401, "nodeid7401"]]]
EXPECT CLOSE

# Slotmap update due to the slot for `foo1` is not served.
# The reply is still missing slots.
EXPECT CONNECT
EXPECT ["CLUSTER", "SLOTS"]
SEND [[0, 1, ["127.0.0.1", 7401, "nodeid7401"]]]
EXPECT CLOSE

# Slotmap update due to the slot for `foo2` is not served.
# The reply now has full slot coverage.
EXPECT CONNECT
EXPECT ["CLUSTER", "SLOTS"]
SEND [[0, 16383, ["127.0.0.1", 7401, "nodeid7401"]]]
EXPECT CLOSE

EXPECT CONNECT
EXPECT ["GET", "foo2"]
SEND "bar2"
EXPECT CLOSE
EOF
server1=$!

# Wait until node is ready to accept client connections
wait $syncpid1;

# Run client
timeout 3s "$clientprog" --events 127.0.0.1:7401 > "$testname.out" <<'EOF'
GET foo1
GET foo2
EOF
clientexit=$?

# Wait for server to exit
wait $server1; server1exit=$?

# Check exit statuses
if [ $server1exit -ne 0 ]; then
echo "Simulated server #1 exited with status $server1exit"
exit $server1exit
fi
if [ $clientexit -ne 0 ]; then
echo "$clientprog exited with status $clientexit"
exit $clientexit
fi

# Check the output from clusterclient
expected="Event: slotmap-updated
Event: ready
Event: slotmap-updated
error: slot not served by any node
Event: slotmap-updated
bar2
Event: free-context"

echo "$expected" | diff -u - "$testname.out" || exit 99

# Clean up
rm "$testname.out"