Skip to content

Commit

Permalink
fix: some streams errors such as pkpatternmatchdel etc (#2726)
Browse files Browse the repository at this point in the history
* fix pkpatternmatchdel error

---------

Co-authored-by: wangshaoyi <wangshaoyi@360.cn>
  • Loading branch information
wangshao1 and wangshaoyi committed Jun 21, 2024
1 parent 8dea10f commit 3eb2e48
Show file tree
Hide file tree
Showing 5 changed files with 548 additions and 527 deletions.
26 changes: 21 additions & 5 deletions src/storage/src/base_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "src/base_value_format.h"
#include "src/base_meta_value_format.h"
#include "src/lists_meta_value_format.h"
#include "src/pika_stream_meta_value.h"
#include "src/strings_value_format.h"
#include "src/zsets_data_key_format.h"
#include "src/debug.h"
Expand All @@ -36,11 +37,12 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
* The field designs of the remaining zset,set,hash and stream in meta-value
* are the same, so the same filtering strategy is used
*/
ParsedBaseKey parsed_key(key);
auto type = static_cast<enum DataType>(static_cast<uint8_t>(value[0]));
DEBUG("==========================START==========================");
if (type == DataType::kStrings) {
ParsedStringsValue parsed_strings_value(value);
DEBUG("[StringsFilter] key: {}, value = {}, timestamp: {}, cur_time: {}", key.ToString().c_str(),
DEBUG("[string type] key: %s, value = %s, timestamp: %llu, cur_time: %llu", parsed_key.Key().ToString().c_str(),
parsed_strings_value.UserValue().ToString().c_str(), parsed_strings_value.Etime(), cur_time);
if (parsed_strings_value.Etime() != 0 && parsed_strings_value.Etime() < cur_time) {
DEBUG("Drop[Stale]");
Expand All @@ -49,9 +51,17 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
DEBUG("Reserve");
return false;
}
} else if (type == DataType::kStreams) {
ParsedStreamMetaValue parsed_stream_meta_value(value);
DEBUG("[stream meta type], key: %s, entries_added = %llu, first_id: %s, last_id: %s, version: %llu",
parsed_key.Key().ToString().c_str(), parsed_stream_meta_value.entries_added(),
parsed_stream_meta_value.first_id().ToString().c_str(),
parsed_stream_meta_value.last_id().ToString().c_str(),
parsed_stream_meta_value.version());
return false;
} else if (type == DataType::kLists) {
ParsedListsMetaValue parsed_lists_meta_value(value);
DEBUG("[ListMetaFilter], key: {}, count = {}, timestamp: {}, cur_time: {}, version: {}", key.ToString().c_str(),
DEBUG("[list meta type], key: %s, count = %d, timestamp: %llu, cur_time: %llu, version: %llu", parsed_key.Key().ToString().c_str(),
parsed_lists_meta_value.Count(), parsed_lists_meta_value.Etime(), cur_time,
parsed_lists_meta_value.Version());

Expand All @@ -68,8 +78,9 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
return false;
} else {
ParsedBaseMetaValue parsed_base_meta_value(value);
DEBUG("[MetaFilter] key: {}, count = {}, timestamp: {}, cur_time: {}, version: {}", key.ToString().c_str(),
parsed_base_meta_value.Count(), parsed_base_meta_value.Etime(), cur_time, parsed_base_meta_value.Version());
DEBUG("[%s meta type] key: %s, count = %d, timestamp: %llu, cur_time: %llu, version: %llu",
DataTypeToString(type), parsed_key.Key().ToString().c_str(), parsed_base_meta_value.Count(),
parsed_base_meta_value.Etime(), cur_time, parsed_base_meta_value.Version());

if (parsed_base_meta_value.Etime() != 0 && parsed_base_meta_value.Etime() < cur_time &&
parsed_base_meta_value.Version() < cur_time) {
Expand Down Expand Up @@ -143,7 +154,12 @@ class BaseDataFilter : public rocksdb::CompactionFilter {
auto type = static_cast<enum DataType>(static_cast<uint8_t>(meta_value[0]));
if (type != type_) {
return true;
} else if (type == DataType::kHashes || type == DataType::kSets || type == DataType::kStreams || type == DataType::kZSets) {
} else if (type == DataType::kStreams) {
ParsedStreamMetaValue parsed_stream_meta_value(meta_value);
meta_not_found_ = false;
cur_meta_version_ = parsed_stream_meta_value.version();
cur_meta_etime_ = 0; // stream do not support ttl
} else if (type == DataType::kHashes || type == DataType::kSets || type == DataType::kZSets) {
ParsedBaseMetaValue parsed_base_meta_value(&meta_value);
meta_not_found_ = false;
cur_meta_version_ = parsed_base_meta_value.Version();
Expand Down
15 changes: 9 additions & 6 deletions src/storage/src/pika_stream_meta_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class StreamMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamValueLength);
if (value_.size() != kDefaultStreamValueLength) {
LOG(ERROR) << "Invalid stream meta value length: ";
LOG(ERROR) << "Invalid stream meta value length: " << value_.size()
<< " expected: " << kDefaultStreamValueLength;
return;
}
char* pos = &value_[0];
Expand Down Expand Up @@ -215,7 +216,8 @@ class ParsedStreamMetaValue {
ParsedStreamMetaValue(const Slice& value) {
assert(value.size() == kDefaultStreamValueLength);
if (value.size() != kDefaultStreamValueLength) {
LOG(ERROR) << "Invalid stream meta value length: ";
LOG(ERROR) << "Invalid stream meta value length: " << value.size()
<< " expected: " << kDefaultStreamValueLength;
return;
}
char* pos = const_cast<char*>(value.data());
Expand Down Expand Up @@ -294,7 +296,7 @@ class StreamCGroupMetaValue {
uint64_t needed = kDefaultStreamCGroupValueLength;
assert(value_.size() == 0);
if (value_.size() != 0) {
LOG(FATAL) << "Init on a existed stream cgroup meta value!";
LOG(ERROR) << "Init on a existed stream cgroup meta value!";
return;
}
value_.resize(needed);
Expand All @@ -314,7 +316,8 @@ class StreamCGroupMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamCGroupValueLength);
if (value_.size() != kDefaultStreamCGroupValueLength) {
LOG(FATAL) << "Invalid stream cgroup meta value length: ";
LOG(ERROR) << "Invalid stream cgroup meta value length: " << value_.size()
<< " expected: " << kDefaultStreamValueLength;
return;
}
if (value_.size() == kDefaultStreamCGroupValueLength) {
Expand Down Expand Up @@ -373,7 +376,7 @@ class StreamConsumerMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamConsumerValueLength);
if (value_.size() != kDefaultStreamConsumerValueLength) {
LOG(FATAL) << "Invalid stream consumer meta value length: " << value_.size()
LOG(ERROR) << "Invalid stream consumer meta value length: " << value_.size()
<< " expected: " << kDefaultStreamConsumerValueLength;
return;
}
Expand All @@ -391,7 +394,7 @@ class StreamConsumerMetaValue {
pel_ = pel;
assert(value_.size() == 0);
if (value_.size() != 0) {
LOG(FATAL) << "Invalid stream consumer meta value length: " << value_.size() << " expected: 0";
LOG(ERROR) << "Invalid stream consumer meta value length: " << value_.size() << " expected: 0";
return;
}
uint64_t needed = kDefaultStreamConsumerValueLength;
Expand Down
7 changes: 3 additions & 4 deletions src/storage/src/redis_strings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1703,19 +1703,19 @@ rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* re
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]);
iter->SeekToFirst();
key = iter->key().ToString();
while (iter->Valid()) {
auto meta_type = static_cast<enum DataType>(static_cast<uint8_t>(iter->value()[0]));
ParsedBaseMetaKey parsed_meta_key(iter->key().ToString());
key = iter->key().ToString();
meta_value = iter->value().ToString();

if (meta_type == DataType::kStrings) {
meta_value = iter->value().ToString();
ParsedStringsValue parsed_strings_value(&meta_value);
if (!parsed_strings_value.IsStale() &&
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) != 0)) {
batch.Delete(key);
}
} else if (meta_type == DataType::kLists) {
meta_value = iter->value().ToString();
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
if (!parsed_lists_meta_value.IsStale() && (parsed_lists_meta_value.Count() != 0U) &&
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) !=
Expand All @@ -1732,7 +1732,6 @@ rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* re
batch.Put(handles_[kMetaCF], key, stream_meta_value.value());
}
} else {
meta_value = iter->value().ToString();
ParsedBaseMetaValue parsed_meta_value(&meta_value);
if (!parsed_meta_value.IsStale() && (parsed_meta_value.Count() != 0) &&
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) !=
Expand Down
5 changes: 4 additions & 1 deletion src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1401,11 +1401,14 @@ Status Storage::PKRScanRange(const DataType& data_type, const Slice& key_start,

Status Storage::PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret) {
Status s;
*ret = 0;
for (const auto& inst : insts_) {
s = inst->PKPatternMatchDel(pattern, ret);
int32_t tmp_ret = 0;
s = inst->PKPatternMatchDel(pattern, &tmp_ret);
if (!s.ok()) {
return s;
}
*ret += tmp_ret;
}
return s;
}
Expand Down
Loading

0 comments on commit 3eb2e48

Please sign in to comment.