Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Redis-compatible cursors for SCAN commands #1489

Merged
merged 56 commits into from
Jun 25, 2023
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
e779de8
tmp update
jihuayu Jun 5, 2023
0fefed8
return 0 when not got enough item
jihuayu Jun 5, 2023
9c0dc30
update code
jihuayu Jun 5, 2023
87b1f01
format code
jihuayu Jun 5, 2023
49f1d16
format code
jihuayu Jun 6, 2023
bc67a1e
format code
jihuayu Jun 6, 2023
e975ec6
update cmd
jihuayu Jun 6, 2023
7779dbd
update
jihuayu Jun 6, 2023
c37b8c6
Merge branch 'apache:unstable' into issues/1402
jihuayu Jun 6, 2023
e1d42ad
update code
jihuayu Jun 6, 2023
608865a
update include
jihuayu Jun 7, 2023
ac53d8a
make the code more elegant
jihuayu Jun 9, 2023
822b5fb
add conf
jihuayu Jun 9, 2023
45e8a99
update code
jihuayu Jun 9, 2023
847bf00
format code
jihuayu Jun 9, 2023
71358a1
update code
jihuayu Jun 9, 2023
56df070
Merge branch 'apache:unstable' into issues/1402
jihuayu Jun 9, 2023
6c61e9e
update code
jihuayu Jun 10, 2023
a9529f0
update conf
jihuayu Jun 10, 2023
f034ffb
update
jihuayu Jun 15, 2023
f562cf8
fix a bug
jihuayu Jun 15, 2023
0e29fb6
update
jihuayu Jun 16, 2023
5b1e322
Don't allow the instance replication of itself and it's own replicas …
uds5501 Jun 12, 2023
454921b
Add support of the new command SINTERCARD(Redis 7) (#1444)
infdahai Jun 13, 2023
743e084
Fix data race when joining the task runner (#1493)
git-hulk Jun 13, 2023
2d2aeec
Fix a few typos in CMakeLists.txt (#1496)
torwig Jun 15, 2023
39a0bf3
Fix ZRANGE command should return an empty array when count = 0 (#1492)
infdahai Jun 15, 2023
d965f84
Merge branch 'unstable' into issues/1402
jihuayu Jun 16, 2023
573c64c
add static_assert for CURSOR_DICT_SIZE
jihuayu Jun 17, 2023
29e3bdf
change name to cursor_counter_
jihuayu Jun 17, 2023
211185e
change config name
jihuayu Jun 17, 2023
5f8fa62
add tests
jihuayu Jun 18, 2023
d6323bd
Merge branch 'unstable' into issues/1402
git-hulk Jun 19, 2023
66bd3a9
update tests name
jihuayu Jun 19, 2023
ee244b4
Update kvrocks.conf
jihuayu Jun 19, 2023
a36c141
Update kvrocks.conf
jihuayu Jun 19, 2023
44ff19b
Update src/server/server.h
jihuayu Jun 19, 2023
2e622e9
Update src/server/server.h
jihuayu Jun 19, 2023
6c1c87c
Update src/server/server.h
jihuayu Jun 19, 2023
2ab56a3
update config default value
jihuayu Jun 19, 2023
5f655af
update tests
jihuayu Jun 19, 2023
c068e45
add GetIndexFromNumberCursor function
jihuayu Jun 19, 2023
c31e4fa
add cursor_type to cursor_dict_
jihuayu Jun 19, 2023
54153b4
Merge branch 'unstable' into issues/1402
git-hulk Jun 20, 2023
47fb25a
change cursor format
jihuayu Jun 20, 2023
f942685
make NumberCursor class
jihuayu Jun 20, 2023
10b1a42
Merge branch 'unstable' into issues/1402
git-hulk Jun 22, 2023
9bcb4b4
update
jihuayu Jun 22, 2023
777da13
update CursorDictType
jihuayu Jun 22, 2023
05b6fe7
update
jihuayu Jun 22, 2023
03128fe
update
jihuayu Jun 22, 2023
89592ab
update scan_test,
jihuayu Jun 23, 2023
c1926a2
Merge branch 'unstable' into issues/1402
git-hulk Jun 23, 2023
45cd940
update cli_test
jihuayu Jun 24, 2023
d9b2786
Merge branch 'apache:unstable' into issues/1402
jihuayu Jun 24, 2023
4411894
update redis_cursor_compatible defalut value
jihuayu Jun 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,12 @@ max-backup-keep-hours 24
# Default: 16
max-bitmap-to-string-mb 16

# Whether to enable SCAN-like cursor compatible with Redis.
# If enabled, the cursor will be unsigned 64-bit integers.
# If disabled, the cursor will be a string.
# Default: no
redis-cursor-compatible no

################################## TLS ###################################

# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
Expand Down
5 changes: 3 additions & 2 deletions src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,13 @@ class CommandHScan : public CommandSubkeyScanBase {
redis::Hash hash_db(svr->storage, conn->GetNamespace());
std::vector<std::string> fields;
std::vector<std::string> values;
auto s = hash_db.Scan(key_, cursor_, limit_, prefix_, &fields, &values);
auto key_name = svr->GetKeyNameFromCursor(cursor_, CursorType::kTypeHash);
auto s = hash_db.Scan(key_, key_name, limit_, prefix_, &fields, &values);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = GenerateOutput(fields, values);
*output = GenerateOutput(svr, fields, values, CursorType::kTypeHash);
return Status::OK();
}
};
Expand Down
15 changes: 8 additions & 7 deletions src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -760,11 +760,11 @@ class CommandScan : public CommandScanBase {
return Commander::Parse(args);
}

static std::string GenerateOutput(const std::vector<std::string> &keys, std::string end_cursor) {
static std::string GenerateOutput(Server *svr, const std::vector<std::string> &keys, const std::string &end_cursor) {
std::vector<std::string> list;
if (!end_cursor.empty()) {
end_cursor = kCursorPrefix + end_cursor;
list.emplace_back(redis::BulkString(end_cursor));
list.emplace_back(
redis::BulkString(svr->GenerateCursorFromKeyName(end_cursor, CursorType::kTypeBase, kCursorPrefix)));
} else {
list.emplace_back(redis::BulkString("0"));
}
Expand All @@ -776,14 +776,15 @@ class CommandScan : public CommandScanBase {

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::Database redis_db(svr->storage, conn->GetNamespace());
auto key_name = svr->GetKeyNameFromCursor(cursor_, CursorType::kTypeBase);

std::vector<std::string> keys;
std::string end_cursor;
auto s = redis_db.Scan(cursor_, limit_, prefix_, &keys, &end_cursor);
std::string end_key;
auto s = redis_db.Scan(key_name, limit_, prefix_, &keys, &end_key);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = GenerateOutput(keys, end_cursor);
*output = GenerateOutput(svr, keys, end_key);
return Status::OK();
}
};
Expand Down
5 changes: 3 additions & 2 deletions src/commands/cmd_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,13 @@ class CommandSScan : public CommandSubkeyScanBase {
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::Set set_db(svr->storage, conn->GetNamespace());
std::vector<std::string> members;
auto s = set_db.Scan(key_, cursor_, limit_, prefix_, &members);
auto key_name = svr->GetKeyNameFromCursor(cursor_, CursorType::kTypeSet);
auto s = set_db.Scan(key_, key_name, limit_, prefix_, &members);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = CommandScanBase::GenerateOutput(members);
*output = CommandScanBase::GenerateOutput(svr, members, CursorType::kTypeSet);
return Status::OK();
}
};
Expand Down
5 changes: 3 additions & 2 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,8 @@ class CommandZScan : public CommandSubkeyScanBase {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<std::string> members;
std::vector<double> scores;
auto s = zset_db.Scan(key_, cursor_, limit_, prefix_, &members, &scores);
auto key_name = svr->GetKeyNameFromCursor(cursor_, CursorType::kTypeZSet);
auto s = zset_db.Scan(key_, key_name, limit_, prefix_, &members, &scores);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand All @@ -1329,7 +1330,7 @@ class CommandZScan : public CommandSubkeyScanBase {
for (const auto &score : scores) {
score_strings.emplace_back(util::Float2String(score));
}
*output = GenerateOutput(members, score_strings);
*output = GenerateOutput(svr, members, score_strings, CursorType::kTypeZSet);
return Status::OK();
}
};
Expand Down
12 changes: 8 additions & 4 deletions src/commands/scan_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "commander.h"
#include "error_constants.h"
#include "parse_util.h"
#include "server/server.h"

namespace redis {

Expand Down Expand Up @@ -63,10 +64,11 @@ class CommandScanBase : public Commander {
}
}

std::string GenerateOutput(const std::vector<std::string> &keys) const {
std::string GenerateOutput(Server *svr, const std::vector<std::string> &keys, CursorType cursor_type) const {
std::vector<std::string> list;
if (keys.size() == static_cast<size_t>(limit_)) {
list.emplace_back(redis::BulkString(keys.back()));
auto end_cursor = svr->GenerateCursorFromKeyName(keys.back(), cursor_type);
list.emplace_back(redis::BulkString(end_cursor));
} else {
list.emplace_back(redis::BulkString("0"));
}
Expand Down Expand Up @@ -109,11 +111,13 @@ class CommandSubkeyScanBase : public CommandScanBase {
return Commander::Parse(args);
}

std::string GenerateOutput(const std::vector<std::string> &fields, const std::vector<std::string> &values) {
std::string GenerateOutput(Server *svr, const std::vector<std::string> &fields,
const std::vector<std::string> &values, CursorType cursor_type) {
std::vector<std::string> list;
auto items_count = fields.size();
if (items_count == static_cast<size_t>(limit_)) {
list.emplace_back(redis::BulkString(fields.back()));
auto end_cursor = svr->GenerateCursorFromKeyName(fields.back(), cursor_type);
list.emplace_back(redis::BulkString(end_cursor));
} else {
list.emplace_back(redis::BulkString("0"));
}
Expand Down
1 change: 1 addition & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ Config::Config() {
{"unixsocketperm", true, new OctalField(&unixsocketperm, 0777, 1, INT_MAX)},
{"log-retention-days", false, new IntField(&log_retention_days, -1, -1, INT_MAX)},
{"persist-cluster-nodes-enabled", false, new YesNoField(&persist_cluster_nodes_enabled, true)},
{"redis-cursor-compatible", false, new YesNoField(&redis_cursor_compatible, false)},

/* rocksdb options */
{"rocksdb.compression", false,
Expand Down
1 change: 1 addition & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ struct Config {
int pipeline_size;
int sequence_gap;

bool redis_cursor_compatible = true;
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
int log_retention_days;
// profiling
int profiling_sample_ratio = 0;
Expand Down
53 changes: 53 additions & 0 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include <sys/utsname.h>

#include <atomic>
#include <cstdint>
#include <functional>
#include <iomanip>
#include <jsoncons/json.hpp>
#include <memory>
Expand Down Expand Up @@ -60,6 +62,9 @@ Server::Server(engine::Storage *storage, Config *config)
stats.commands_stats[iter.first].latency = 0;
}

// init cursor_dict_
cursor_dict_ = std::make_unique<CursorDictType>();

#ifdef ENABLE_OPENSSL
// init ssl context
if (config->tls_port) {
Expand Down Expand Up @@ -1753,3 +1758,51 @@ std::list<std::pair<std::string, uint32_t>> Server::GetSlaveHostAndPort() {
slave_threads_mu_.unlock();
return result;
}

// The numeric cursor consists of a 16-bit counter, a 16-bit time stamp, a 29-bit hash,and a 3-bit cursor type. The
// hash is used to prevent information leakage. The time_stamp is used to prevent the generation of the same cursor in
// the extremely short period before and after a restart.
NumberCursor::NumberCursor(CursorType cursor_type, uint16_t counter, const std::string &key_name) {
auto hash = static_cast<uint32_t>(std::hash<std::string>{}(key_name));
auto time_stamp = static_cast<uint16_t>(util::GetTimeStamp());
cursor_ = static_cast<uint64_t>(counter) | static_cast<uint64_t>(time_stamp) << 16 |
static_cast<uint64_t>(hash) << 32 | static_cast<uint64_t>(cursor_type) << 61;
}

bool NumberCursor::IsMatch(const CursorDictElement &element, CursorType cursor_type) const {
return cursor_ == element.cursor.cursor_ && cursor_type == getCursorType();
jihuayu marked this conversation as resolved.
Show resolved Hide resolved
}

std::string Server::GenerateCursorFromKeyName(const std::string &key_name, CursorType cursor_type, const char *prefix) {
if (!config_->redis_cursor_compatible) {
// add prefix for SCAN
return prefix + key_name;
}
auto counter = cursor_counter_.fetch_add(1);
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
auto number_cursor = NumberCursor(cursor_type, counter, key_name);
cursor_dict_->at(number_cursor.GetIndex()) = {number_cursor, key_name};
return number_cursor.ToString();
}

std::string Server::GetKeyNameFromCursor(const std::string &cursor, CursorType cursor_type) {
// When cursor is 0, cursor string is empty
if (cursor.empty() || !config_->redis_cursor_compatible) {
return cursor;
}

auto s = ParseInt<uint64_t>(cursor, 10);
// When Cursor 0 or not a Integer return empty string.
// Although the parameter 'cursor' is not expected to be 0, we still added a check for 0 to increase the robustness of
// the code.
if (!s.IsOK() || *s == 0) {
return {};
}
auto number_cursor = NumberCursor(*s);
// Because the index information is fully stored in the cursor, we can directly obtain the index from the cursor.
auto item = cursor_dict_->at(number_cursor.GetIndex());
if (number_cursor.IsMatch(item, cursor_type)) {
return item.key_name;
}

return {};
}
45 changes: 45 additions & 0 deletions src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

#include <inttypes.h>

#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <list>
#include <map>
#include <memory>
Expand Down Expand Up @@ -84,6 +88,39 @@ struct ChannelSubscribeNum {
size_t subscribe_num;
};

// CURSOR_DICT_SIZE must be 2^n where n <= 16
constexpr const size_t CURSOR_DICT_SIZE = 1024 * 16;
static_assert((CURSOR_DICT_SIZE & (CURSOR_DICT_SIZE - 1)) == 0, "CURSOR_DICT_SIZE must be 2^n");
static_assert(CURSOR_DICT_SIZE <= (1 << 16), "CURSOR_DICT_SIZE must be less than or equal to 2^16");

enum class CursorType : uint8_t {
kTypeNone = 0, // none
kTypeBase = 1, // cursor for SCAN
kTypeHash = 2, // cursor for HSCAN
kTypeSet = 3, // cursor for SSCAN
kTypeZSet = 4, // cursor for ZSCAN
};
struct CursorDictElement;

class NumberCursor {
public:
NumberCursor() = default;
explicit NumberCursor(CursorType cursor_type, uint16_t counter, const std::string &key_name);
explicit NumberCursor(uint64_t number_cursor) : cursor_(number_cursor) {}
size_t GetIndex() const { return cursor_ % CURSOR_DICT_SIZE; }
bool IsMatch(const CursorDictElement &element, CursorType cursor_type) const;
std::string ToString() const { return std::to_string(cursor_); }

private:
CursorType getCursorType() const { return static_cast<CursorType>(cursor_ >> 61); }
uint64_t cursor_;
};

struct CursorDictElement {
NumberCursor cursor;
std::string key_name;
};

enum SlowLog {
kSlowLogMaxArgc = 32,
kSlowLogMaxString = 128,
Expand Down Expand Up @@ -196,6 +233,9 @@ class Server {
void GetLatestKeyNumStats(const std::string &ns, KeyNumStats *stats);
time_t GetLastScanTime(const std::string &ns);

std::string GenerateCursorFromKeyName(const std::string &key_name, CursorType cursor_type, const char *prefix = "");
std::string GetKeyNameFromCursor(const std::string &cursor, CursorType cursor_type);

int DecrClientNum();
int IncrClientNum();
int IncrMonitorClientNum();
Expand Down Expand Up @@ -319,4 +359,9 @@ class Server {
std::atomic<size_t> watched_key_size_ = 0;
std::map<std::string, std::set<redis::Connection *>> watched_key_map_;
std::shared_mutex watched_key_mutex_;

// SCAN ring buffer
std::atomic<uint16_t> cursor_counter_ = {0};
using CursorDictType = std::array<CursorDictElement, CURSOR_DICT_SIZE>;
std::unique_ptr<CursorDictType> cursor_dict_;
};
3 changes: 1 addition & 2 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,8 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
keys->emplace_back(user_key);
cnt++;
}

if (!storage_->IsSlotIdEncoded() || prefix.empty()) {
if (!keys->empty()) {
if (!keys->empty() && cnt >= limit) {
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
end_cursor->append(user_key);
}
break;
Expand Down
8 changes: 8 additions & 0 deletions tests/gocase/integration/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,14 @@ func TestRedisCli(t *testing.T) {
require.Len(t, strings.Split(r, "\n"), 10)
})

t.Run("Use reids-cli --bigkeys", func(t *testing.T) {
runCli(t, srv, nil, "--bigkeys").Success()
})

t.Run("Use reids-cli --memkeys", func(t *testing.T) {
runCli(t, srv, nil, "--memkeys").Success()
})

formatArgs := func(args ...string) string {
cmd := fmt.Sprintf("*%d\r\n", len(args))
for _, arg := range args {
Expand Down
15 changes: 14 additions & 1 deletion tests/gocase/unit/scan/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,26 @@ func TestScanEmptyKey(t *testing.T) {
require.Equal(t, []string{"", "fab", "fiz", "foobar"}, keys)
}

func TestScan(t *testing.T) {
func TestScanWithNumberCursor(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()
ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()
ScanTest(t, rdb, ctx)
}

func TestScanWithStringCursor(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()
ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()
require.NoError(t, rdb.ConfigSet(ctx, "redis-cursor-compatible", "no").Err())
ScanTest(t, rdb, ctx)
}
git-hulk marked this conversation as resolved.
Show resolved Hide resolved

func ScanTest(t *testing.T, rdb *redis.Client, ctx context.Context) {

t.Run("SCAN Basic", func(t *testing.T) {
require.NoError(t, rdb.FlushDB(ctx).Err())
Expand Down