Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ASKING command #2273

Merged
merged 5 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -846,12 +846,13 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
return Status::OK(); // I'm serving this slot
}

if (myself_ && myself_->importing_slot == slot && conn->IsImporting()) {
if (myself_ && myself_->importing_slot == slot &&
(conn->IsImporting() || conn->IsFlagEnabled(redis::Connection::kAsking))) {
// While data migrating, the topology of the destination node has not been changed.
// The destination node has to serve the requests from the migrating slot,
// although the slot is not belong to itself. Therefore, we record the importing slot
// and mark the importing connection to accept the importing data.
return Status::OK(); // I'm serving the importing connection
return Status::OK(); // I'm serving the importing connection or asking connection
}

if (myself_ && imported_slots_.count(slot)) {
Expand Down
12 changes: 11 additions & 1 deletion src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,19 @@ class CommandReadWrite : public Commander {
}
};

class CommandAsking : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
conn->EnableFlag(redis::Connection::kAsking);
*output = redis::SimpleString("OK");
caipengbo marked this conversation as resolved.
Show resolved Hide resolved
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandCluster>("cluster", -2, "cluster no-script", 0, 0, 0, GenerateClusterFlag),
MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster no-script", 0, 0, 0, GenerateClusterFlag),
MakeCmdAttr<CommandReadOnly>("readonly", 1, "cluster no-multi", 0, 0, 0),
MakeCmdAttr<CommandReadWrite>("readwrite", 1, "cluster no-multi", 0, 0, 0), )
MakeCmdAttr<CommandReadWrite>("readwrite", 1, "cluster no-multi", 0, 0, 0),
MakeCmdAttr<CommandAsking>("asking", 1, "cluster", 0, 0, 0), )

} // namespace redis
6 changes: 6 additions & 0 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ std::string Connection::GetFlags() const {
if (IsFlagEnabled(kSlave)) flags.append("S");
if (IsFlagEnabled(kCloseAfterReply)) flags.append("c");
if (IsFlagEnabled(kMonitor)) flags.append("M");
if (IsFlagEnabled(kAsking)) flags.append("A");
if (!subscribe_channels_.empty() || !subscribe_patterns_.empty()) flags.append("P");
if (flags.empty()) flags = "N";
return flags;
Expand Down Expand Up @@ -504,6 +505,11 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
}
}

// reset the ASKING flag after executing the next query
if (IsFlagEnabled(kAsking)) {
DisableFlag(kAsking);
}

// We don't execute commands, but queue them, ant then execute in EXEC command
if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdMulti)) {
multi_cmds_.emplace_back(cmd_tokens);
Expand Down
1 change: 1 addition & 0 deletions src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class Connection : public EvbufCallbackBase<Connection> {
kCloseAsync = 1 << 7,
kMultiExec = 1 << 8,
kReadOnly = 1 << 9,
kAsking = 1 << 10,
};

explicit Connection(bufferevent *bev, Worker *owner);
Expand Down
41 changes: 41 additions & 0 deletions tests/gocase/integration/slotimport/slotimport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,44 @@ func TestImportedServer(t *testing.T) {
require.Zero(t, rdbB.Exists(ctx, slotKey).Val())
})
}

func TestServiceImportingSlot(t *testing.T) {
ctx := context.Background()

mockID0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
mockSrv0Host := "127.0.0.1"
mockSrv0Port := 6666

srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer func() { srv1.Close() }()
rdb1 := srv1.NewClient()
defer func() { require.NoError(t, rdb1.Close()) }()
id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())

clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", mockID0, mockSrv0Host, mockSrv0Port)
clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383\n", id1, srv1.Host(), srv1.Port())
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())

slotNum := 1
require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", slotNum, 0).Val())

// create a new client that is not importing
cli := srv1.NewClient()
slotKey := util.SlotTable[slotNum]

t.Run("IMPORT - query a key in importing slot without asking", func(t *testing.T) {
util.ErrorRegexp(t, cli.Type(ctx, slotKey).Err(), fmt.Sprintf("MOVED %d.*%d.*", slotNum, mockSrv0Port))
})

t.Run("IMPORT - query a key in importing slot after asking", func(t *testing.T) {
require.Equal(t, "OK", cli.Do(ctx, "asking").Val())
require.NoError(t, cli.Type(ctx, slotKey).Err())
})

t.Run("IMPORT - asking flag will be reset after executing", func(t *testing.T) {
require.Equal(t, "OK", cli.Do(ctx, "asking").Val())
require.NoError(t, cli.Type(ctx, slotKey).Err())
util.ErrorRegexp(t, cli.Type(ctx, slotKey).Err(), fmt.Sprintf("MOVED %d.*%d.*", slotNum, mockSrv0Port))
})
}
Loading