Commit
Merge branch 'OpenAtomFoundation:unstable' into fix_geo_bug
saz97 committed Jun 26, 2024
2 parents 3415b32 + 8730946 commit ac63f85
Showing 26 changed files with 1,977 additions and 1,074 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/codeql.yml
@@ -13,10 +13,10 @@ name: "CodeQL"

on:
  push:
-    branches: [ "unstable", "3.5" ]
+    branches: [ "unstable", "3.5" , "4.0"]
  pull_request:
    # The branches below must be a subset of the branches above
-    branches: [ "unstable", "3.5" ]
+    branches: [ "unstable", "3.5" , "4.0"]
  schedule:
    - cron: '25 19 * * 6'

4 changes: 2 additions & 2 deletions .github/workflows/codis.yml
@@ -5,9 +5,9 @@ name: Codis

on:
  push:
-    branches: [ "unstable", "3.5" ]
+    branches: [ "unstable", "3.5" , "4.0" ]
  pull_request:
-    branches: [ "unstable", "3.5" ]
+    branches: [ "unstable", "3.5" , "4.0"]

jobs:

4 changes: 2 additions & 2 deletions .github/workflows/pika.yml
@@ -2,9 +2,9 @@ name: Pika

on:
  push:
-    branches: [ "unstable", "3.5" ]
+    branches: [ "unstable", "3.5" , "4.0"]
  pull_request:
-    branches: [ "unstable", "3.5" ]
+    branches: [ "unstable", "3.5" , "4.0"]

env:
  # Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
4 changes: 2 additions & 2 deletions .github/workflows/tools_go.yml
@@ -2,11 +2,11 @@ name: Tools_go_build

on:
  push:
-    branches: [ "unstable", "3.5" ]
+    branches: [ "unstable", "3.5" , "4.0"]
    paths:
      - 'tools/**'
  pull_request:
-    branches: [ "unstable", "3.5" ]
+    branches: [ "unstable", "3.5" , "4.0"]
    paths:
      - 'tools/**'

29 changes: 14 additions & 15 deletions include/pika_server.h
@@ -310,8 +310,7 @@ class PikaServer : public pstd::noncopyable {
bool SlotsMigrateBatch(const std::string &ip, int64_t port, int64_t time_out, int64_t slots, int64_t keys_num, const std::shared_ptr<DB>& db);
void GetSlotsMgrtSenderStatus(std::string *ip, int64_t* port, int64_t *slot, bool *migrating, int64_t *moved, int64_t *remained);
bool SlotsMigrateAsyncCancel();
- std::shared_mutex bgsave_protector_;
- BgSaveInfo bgsave_info_;
+ std::shared_mutex bgslots_protector_;

/*
* BGSlotsReload used
@@ -337,28 +336,28 @@ class PikaServer : public pstd::noncopyable {
BGSlotsReload bgslots_reload_;

BGSlotsReload bgslots_reload() {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
return bgslots_reload_;
}
bool GetSlotsreloading() {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
return bgslots_reload_.reloading;
}
void SetSlotsreloading(bool reloading) {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
bgslots_reload_.reloading = reloading;
}
void SetSlotsreloadingCursor(int64_t cursor) {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
bgslots_reload_.cursor = cursor;
}
int64_t GetSlotsreloadingCursor() {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
return bgslots_reload_.cursor;
}

void SetSlotsreloadingEndTime() {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
bgslots_reload_.end_time = time(nullptr);
}
void Bgslotsreload(const std::shared_ptr<DB>& db);
@@ -399,33 +398,33 @@ class PikaServer : public pstd::noncopyable {
net::BGThread bgslots_cleanup_thread_;

BGSlotsCleanup bgslots_cleanup() {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
return bgslots_cleanup_;
}
bool GetSlotscleaningup() {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
return bgslots_cleanup_.cleaningup;
}
void SetSlotscleaningup(bool cleaningup) {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
bgslots_cleanup_.cleaningup = cleaningup;
}
void SetSlotscleaningupCursor(int64_t cursor) {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
bgslots_cleanup_.cursor = cursor;
}
void SetCleanupSlots(std::vector<int> cleanup_slots) {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
bgslots_cleanup_.cleanup_slots.swap(cleanup_slots);
}
std::vector<int> GetCleanupSlots() {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
return bgslots_cleanup_.cleanup_slots;
}

void Bgslotscleanup(std::vector<int> cleanup_slots, const std::shared_ptr<DB>& db);
void StopBgslotscleanup() {
- std::lock_guard ml(bgsave_protector_);
+ std::lock_guard ml(bgslots_protector_);
bgslots_cleanup_.cleaningup = false;
std::vector<int> cleanup_slots;
bgslots_cleanup_.cleanup_slots.swap(cleanup_slots);
13 changes: 7 additions & 6 deletions src/pika_admin.cc
@@ -1190,9 +1190,9 @@ void InfoCmd::InfoKeyspace(std::string& info) {
if (argv_.size() > 1 && strcasecmp(argv_[1].data(), kAllSection.data()) == 0) {
tmp_stream << "# Start async statistics\r\n";
} else if (argv_.size() == 3 && strcasecmp(argv_[1].data(), kKeyspaceSection.data()) == 0) {
- tmp_stream << "# Start async statistics\r\n";
+ tmp_stream << "# Start async statistics\r\n";
} else {
- tmp_stream << "# Use \"info keyspace 1\" to do async statistics\r\n";
+ tmp_stream << "# Use \"info keyspace 1\" to do async statistics\r\n";
}
std::shared_lock rwl(g_pika_server->dbs_rw_);
for (const auto& db_item : g_pika_server->dbs_) {
@@ -1201,7 +1201,8 @@ void InfoCmd::InfoKeyspace(std::string& info) {
key_scan_info = db_item.second->GetKeyScanInfo();
key_infos = key_scan_info.key_infos;
duration = key_scan_info.duration;
- if (key_infos.size() != (size_t)(storage::DataType::kNones)) {
+ if (key_infos.size() != (size_t)(storage::DataTypeNum)) {
+ LOG(ERROR) << "key_infos size is not equal with expected, potential data inconsistency";
info.append("info keyspace error\r\n");
return;
}
@@ -1216,7 +1217,7 @@ void InfoCmd::InfoKeyspace(std::string& info) {
tmp_stream << "# Duration: " << std::to_string(duration) + "s"
<< "\r\n";
}

tmp_stream << db_name << " Strings_keys=" << key_infos[0].keys << ", expires=" << key_infos[0].expires
<< ", invalid_keys=" << key_infos[0].invaild_keys << "\r\n";
tmp_stream << db_name << " Hashes_keys=" << key_infos[1].keys << ", expires=" << key_infos[1].expires
@@ -2911,8 +2912,8 @@ void DbsizeCmd::Do() {
}
KeyScanInfo key_scan_info = dbs->GetKeyScanInfo();
std::vector<storage::KeyInfo> key_infos = key_scan_info.key_infos;
- if (key_infos.size() != (size_t)(storage::DataType::kNones)) {
- res_.SetRes(CmdRes::kErrOther, "keyspace error");
+ if (key_infos.size() != (size_t)(storage::DataTypeNum)) {
+ res_.SetRes(CmdRes::kErrOther, "Mismatch in expected data types and actual key info count");
return;
}
uint64_t dbsize = 0;
2 changes: 1 addition & 1 deletion src/pika_command.cc
@@ -703,7 +703,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfCount, std::move(pfcountptr)));
////pfmergeCmd
std::unique_ptr<Cmd> pfmergeptr = std::make_unique<PfMergeCmd>(
- kCmdNamePfMerge, -3, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
+ kCmdNamePfMerge, -2, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfMerge, std::move(pfmergeptr)));

// GEO
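
For context on the arity change above: in the Redis-style command table Pika uses, a negative arity means "at least that many tokens, command name included", and PFMERGE only requires a destination key (source keys are optional). The sketch below is illustrative only; CheckArity is a hypothetical helper, not Pika's actual checker, but it shows why -3 rejected the legal "PFMERGE destkey" form while -2 accepts it.

```cpp
#include <cstdlib>
#include <string>
#include <vector>

// Hypothetical helper mirroring the usual Redis/Pika arity convention:
// arity >= 0 -> argv.size() must match exactly
// arity <  0 -> argv.size() must be at least |arity| (command name included)
bool CheckArity(int arity, const std::vector<std::string>& argv) {
  if (arity >= 0) {
    return static_cast<int>(argv.size()) == arity;
  }
  return static_cast<int>(argv.size()) >= std::abs(arity);
}

// CheckArity(-3, {"pfmerge", "dest"})        -> false (old table entry rejected this)
// CheckArity(-2, {"pfmerge", "dest"})        -> true
// CheckArity(-2, {"pfmerge", "dest", "src"}) -> true
```
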
8 changes: 4 additions & 4 deletions src/pika_server.cc
@@ -1449,8 +1449,8 @@ bool PikaServer::SlotsMigrateAsyncCancel() {
void PikaServer::Bgslotsreload(const std::shared_ptr<DB>& db) {
// Only one thread can go through
{
- std::lock_guard ml(bgsave_protector_);
- if (bgslots_reload_.reloading || bgsave_info_.bgsaving) {
+ std::lock_guard ml(bgslots_protector_);
+ if (bgslots_reload_.reloading || db->IsBgSaving()) {
return;
}
bgslots_reload_.reloading = true;
@@ -1514,8 +1514,8 @@ void DoBgslotsreload(void* arg) {
void PikaServer::Bgslotscleanup(std::vector<int> cleanupSlots, const std::shared_ptr<DB>& db) {
// Only one thread can go through
{
- std::lock_guard ml(bgsave_protector_);
- if (bgslots_cleanup_.cleaningup || bgslots_reload_.reloading || bgsave_info_.bgsaving) {
+ std::lock_guard ml(bgslots_protector_);
+ if (bgslots_cleanup_.cleaningup || bgslots_reload_.reloading || db->IsBgSaving()) {
return;
}
bgslots_cleanup_.cleaningup = true;
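
The two hunks above change both the lock and the bgsave check: slot reload/cleanup state is now guarded by the dedicated bgslots_protector_ instead of the shared bgsave_protector_, and the "is a bgsave running?" question is asked of the DB object (db->IsBgSaving()) rather than the removed server-level bgsave_info_. A minimal sketch of the resulting single-runner guard, with simplified stand-in names rather than the real Pika classes:

```cpp
#include <memory>
#include <mutex>

// Simplified stand-ins for the real Pika types.
struct DB {
  bool IsBgSaving() const { return false; }  // per-DB bgsave state
};

class Server {
 public:
  // Returns true only for the single caller allowed to start a slots reload.
  bool TryStartSlotsReload(const std::shared_ptr<DB>& db) {
    std::lock_guard<std::mutex> guard(bgslots_protector_);  // slot-job lock only
    if (reloading_ || db->IsBgSaving()) {
      return false;  // another background job is already in flight
    }
    reloading_ = true;
    return true;
  }

 private:
  std::mutex bgslots_protector_;  // guards slot reload/cleanup bookkeeping only
  bool reloading_ = false;
};
```
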
26 changes: 21 additions & 5 deletions src/storage/src/base_filter.h
@@ -16,6 +16,7 @@
#include "src/base_value_format.h"
#include "src/base_meta_value_format.h"
#include "src/lists_meta_value_format.h"
+ #include "src/pika_stream_meta_value.h"
#include "src/strings_value_format.h"
#include "src/zsets_data_key_format.h"
#include "src/debug.h"
@@ -36,11 +37,12 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
* The field designs of the remaining zset,set,hash and stream in meta-value
* are the same, so the same filtering strategy is used
*/
+ ParsedBaseKey parsed_key(key);
auto type = static_cast<enum DataType>(static_cast<uint8_t>(value[0]));
DEBUG("==========================START==========================");
if (type == DataType::kStrings) {
ParsedStringsValue parsed_strings_value(value);
- DEBUG("[StringsFilter] key: {}, value = {}, timestamp: {}, cur_time: {}", key.ToString().c_str(),
+ DEBUG("[string type] key: %s, value = %s, timestamp: %llu, cur_time: %llu", parsed_key.Key().ToString().c_str(),
parsed_strings_value.UserValue().ToString().c_str(), parsed_strings_value.Etime(), cur_time);
if (parsed_strings_value.Etime() != 0 && parsed_strings_value.Etime() < cur_time) {
DEBUG("Drop[Stale]");
@@ -49,9 +51,17 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
DEBUG("Reserve");
return false;
}
+ } else if (type == DataType::kStreams) {
+ ParsedStreamMetaValue parsed_stream_meta_value(value);
+ DEBUG("[stream meta type], key: %s, entries_added = %llu, first_id: %s, last_id: %s, version: %llu",
+ parsed_key.Key().ToString().c_str(), parsed_stream_meta_value.entries_added(),
+ parsed_stream_meta_value.first_id().ToString().c_str(),
+ parsed_stream_meta_value.last_id().ToString().c_str(),
+ parsed_stream_meta_value.version());
+ return false;
} else if (type == DataType::kLists) {
ParsedListsMetaValue parsed_lists_meta_value(value);
- DEBUG("[ListMetaFilter], key: {}, count = {}, timestamp: {}, cur_time: {}, version: {}", key.ToString().c_str(),
+ DEBUG("[list meta type], key: %s, count = %d, timestamp: %llu, cur_time: %llu, version: %llu", parsed_key.Key().ToString().c_str(),
parsed_lists_meta_value.Count(), parsed_lists_meta_value.Etime(), cur_time,
parsed_lists_meta_value.Version());

@@ -68,8 +78,9 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
return false;
} else {
ParsedBaseMetaValue parsed_base_meta_value(value);
- DEBUG("[MetaFilter] key: {}, count = {}, timestamp: {}, cur_time: {}, version: {}", key.ToString().c_str(),
- parsed_base_meta_value.Count(), parsed_base_meta_value.Etime(), cur_time, parsed_base_meta_value.Version());
+ DEBUG("[%s meta type] key: %s, count = %d, timestamp: %llu, cur_time: %llu, version: %llu",
+ DataTypeToString(type), parsed_key.Key().ToString().c_str(), parsed_base_meta_value.Count(),
+ parsed_base_meta_value.Etime(), cur_time, parsed_base_meta_value.Version());

if (parsed_base_meta_value.Etime() != 0 && parsed_base_meta_value.Etime() < cur_time &&
parsed_base_meta_value.Version() < cur_time) {
@@ -143,7 +154,12 @@ class BaseDataFilter : public rocksdb::CompactionFilter {
auto type = static_cast<enum DataType>(static_cast<uint8_t>(meta_value[0]));
if (type != type_) {
return true;
- } else if (type == DataType::kHashes || type == DataType::kSets || type == DataType::kStreams || type == DataType::kZSets) {
+ } else if (type == DataType::kStreams) {
+ ParsedStreamMetaValue parsed_stream_meta_value(meta_value);
+ meta_not_found_ = false;
+ cur_meta_version_ = parsed_stream_meta_value.version();
+ cur_meta_etime_ = 0; // stream do not support ttl
+ } else if (type == DataType::kHashes || type == DataType::kSets || type == DataType::kZSets) {
ParsedBaseMetaValue parsed_base_meta_value(&meta_value);
meta_not_found_ = false;
cur_meta_version_ = parsed_base_meta_value.Version();
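
The new kStreams branch in BaseDataFilter feeds the same staleness rule the other types use — a data entry is dropped when its meta key is gone, expired, or has moved on to a newer version — with the one difference that streams carry no TTL, so the cached expiry time is forced to 0. The following is a hedged, simplified restatement of that rule as a free-standing function, not the actual filter's member code:

```cpp
#include <cstdint>

// Simplified restatement of the drop decision a data-column compaction filter
// makes for one entry, given the state cached from its meta key.
bool ShouldDropDataEntry(bool meta_not_found, uint64_t entry_version,
                         uint64_t cur_meta_version, uint64_t cur_meta_etime,
                         uint64_t cur_time) {
  if (meta_not_found) {
    return true;  // the whole key was deleted; its members are garbage
  }
  if (cur_meta_etime != 0 && cur_meta_etime < cur_time) {
    return true;  // key expired (streams pass etime == 0, so they never hit this)
  }
  if (entry_version < cur_meta_version) {
    return true;  // entry was written before the key was deleted/recreated
  }
  return false;   // still live; keep it
}
```
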
1 change: 1 addition & 0 deletions src/storage/src/base_value_format.h
@@ -19,6 +19,7 @@
namespace storage {

enum class DataType : uint8_t { kStrings = 0, kHashes = 1, kSets = 2, kLists = 3, kZSets = 4, kStreams = 5, kNones = 6, kAll = 7 };
+ constexpr int DataTypeNum = int(DataType::kNones);

constexpr char DataTypeTag[] = { 'k', 'h', 's', 'l', 'z', 'x', 'n', 'a'};
constexpr char* DataTypeStrings[] = { "string", "hash", "set", "list", "zset", "streams", "none", "all"};
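
DataTypeNum evaluates to 6, the count of concrete data types (string, hash, set, list, zset, stream), because kNones sits right after them in the enum. Call sites that previously hard-coded 5 or compared against kNones (see pika_admin.cc and redis.cc in this commit) can now size and check per-type containers from one constant. A small self-contained illustration, not Pika's code:

```cpp
#include <cstdint>
#include <vector>

enum class DataType : uint8_t { kStrings = 0, kHashes, kSets, kLists, kZSets, kStreams, kNones, kAll };
constexpr int DataTypeNum = static_cast<int>(DataType::kNones);  // == 6 concrete types

struct KeyInfo {
  uint64_t keys = 0;
  uint64_t expires = 0;
};

int main() {
  std::vector<KeyInfo> key_infos(DataTypeNum);               // one slot per real type
  key_infos[static_cast<int>(DataType::kStreams)].keys = 1;  // indexed by enum value
  // The same size check the admin commands perform against DataTypeNum.
  return static_cast<int>(key_infos.size()) == DataTypeNum ? 0 : 1;
}
```
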
15 changes: 9 additions & 6 deletions src/storage/src/pika_stream_meta_value.h
@@ -82,7 +82,8 @@ class StreamMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamValueLength);
if (value_.size() != kDefaultStreamValueLength) {
- LOG(ERROR) << "Invalid stream meta value length: ";
+ LOG(ERROR) << "Invalid stream meta value length: " << value_.size()
+ << " expected: " << kDefaultStreamValueLength;
return;
}
char* pos = &value_[0];
@@ -215,7 +216,8 @@ class ParsedStreamMetaValue {
ParsedStreamMetaValue(const Slice& value) {
assert(value.size() == kDefaultStreamValueLength);
if (value.size() != kDefaultStreamValueLength) {
- LOG(ERROR) << "Invalid stream meta value length: ";
+ LOG(ERROR) << "Invalid stream meta value length: " << value.size()
+ << " expected: " << kDefaultStreamValueLength;
return;
}
char* pos = const_cast<char*>(value.data());
@@ -294,7 +296,7 @@ class StreamCGroupMetaValue {
uint64_t needed = kDefaultStreamCGroupValueLength;
assert(value_.size() == 0);
if (value_.size() != 0) {
- LOG(FATAL) << "Init on a existed stream cgroup meta value!";
+ LOG(ERROR) << "Init on a existed stream cgroup meta value!";
return;
}
value_.resize(needed);
@@ -314,7 +316,8 @@ class StreamCGroupMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamCGroupValueLength);
if (value_.size() != kDefaultStreamCGroupValueLength) {
- LOG(FATAL) << "Invalid stream cgroup meta value length: ";
+ LOG(ERROR) << "Invalid stream cgroup meta value length: " << value_.size()
+ << " expected: " << kDefaultStreamValueLength;
return;
}
if (value_.size() == kDefaultStreamCGroupValueLength) {
@@ -373,7 +376,7 @@ class StreamConsumerMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamConsumerValueLength);
if (value_.size() != kDefaultStreamConsumerValueLength) {
- LOG(FATAL) << "Invalid stream consumer meta value length: " << value_.size()
+ LOG(ERROR) << "Invalid stream consumer meta value length: " << value_.size()
<< " expected: " << kDefaultStreamConsumerValueLength;
return;
}
@@ -391,7 +394,7 @@
pel_ = pel;
assert(value_.size() == 0);
if (value_.size() != 0) {
- LOG(FATAL) << "Invalid stream consumer meta value length: " << value_.size() << " expected: 0";
+ LOG(ERROR) << "Invalid stream consumer meta value length: " << value_.size() << " expected: 0";
return;
}
uint64_t needed = kDefaultStreamConsumerValueLength;
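
The hunks above consistently downgrade LOG(FATAL) to LOG(ERROR) in the stream meta-value parsers and add the actual and expected sizes to the message. With glog, LOG(FATAL) aborts the process after logging, so a single malformed value would take the whole server down; logging at ERROR and returning early keeps it running. A sketch of the resulting validate-and-bail pattern (the length constant below is a stand-in, not the real kDefaultStreamValueLength):

```cpp
#include <glog/logging.h>

#include <cstddef>
#include <string>

constexpr size_t kExpectedLength = 60;  // stand-in value for illustration only

void ParseStreamMetaValue(const std::string& value) {
  if (value.size() != kExpectedLength) {
    // Report the mismatch with both sizes, then bail out instead of aborting.
    LOG(ERROR) << "Invalid stream meta value length: " << value.size()
               << " expected: " << kExpectedLength;
    return;
  }
  // ... decode the fields from `value` here ...
}
```
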
4 changes: 2 additions & 2 deletions src/storage/src/redis.cc
@@ -352,7 +352,7 @@ void Redis::SetCompactRangeOptions(const bool is_canceled) {
default_compact_range_options_.canceled = new std::atomic<bool>(is_canceled);
} else {
default_compact_range_options_.canceled->store(is_canceled);
- }
+ }
}

Status Redis::GetProperty(const std::string& property, uint64_t* out) {
@@ -365,7 +365,7 @@ Status Redis::GetProperty(const std::string& property, uint64_t* out) {
}

Status Redis::ScanKeyNum(std::vector<KeyInfo>* key_infos) {
- key_infos->resize(5);
+ key_infos->resize(DataTypeNum);
rocksdb::Status s;
s = ScanStringsKeyNum(&((*key_infos)[0]));
if (!s.ok()) {