Skip to content

Commit

Permalink
Add support of the command XCLAIM (#2202)
Browse files Browse the repository at this point in the history
Co-authored-by: 纪华裕 <jihuayu123@gmail.com>
  • Loading branch information
Beihao-Zhou and jihuayu committed May 13, 2024
1 parent 1e23484 commit a0dae44
Show file tree
Hide file tree
Showing 5 changed files with 447 additions and 0 deletions.
123 changes: 123 additions & 0 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,128 @@ class CommandXDel : public Commander {
std::vector<redis::StreamEntryID> ids_;
};

class CommandXClaim : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 6) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

CommandParser parser(args, 1);
stream_name_ = GET_OR_RET(parser.TakeStr());
group_name_ = GET_OR_RET(parser.TakeStr());
consumer_name_ = GET_OR_RET(parser.TakeStr());
auto parse_result = parser.TakeInt<int64_t>();
if (!parse_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
min_idle_time_ms_ = parse_result.GetValue();
if (min_idle_time_ms_ < 0) {
min_idle_time_ms_ = 0;
}

while (parser.Good() && !isOption(parser.RawPeek())) {
auto raw_id = GET_OR_RET(parser.TakeStr());
redis::StreamEntryID id;
auto s = ParseStreamEntryID(raw_id, &id);
if (!s.IsOK()) {
return s;
}
entry_ids_.emplace_back(id);
}

while (parser.Good()) {
if (parser.EatEqICase("idle")) {
auto parse_result = parser.TakeInt<int64_t>();
if (!parse_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (parse_result.GetValue() < 0) {
return {Status::RedisParseErr, "IDLE for XCLAIM must be non-negative"};
}
stream_claim_options_.idle_time_ms = parse_result.GetValue();
} else if (parser.EatEqICase("time")) {
auto parse_result = parser.TakeInt<int64_t>();
if (!parse_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (parse_result.GetValue() < 0) {
return {Status::RedisParseErr, "TIME for XCLAIM must be non-negative"};
}
stream_claim_options_.with_time = true;
stream_claim_options_.last_delivery_time_ms = parse_result.GetValue();
} else if (parser.EatEqICase("retrycount")) {
auto parse_result = parser.TakeInt<int64_t>();
if (!parse_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (parse_result.GetValue() < 0) {
return {Status::RedisParseErr, "RETRYCOUNT for XCLAIM must be non-negative"};
}
stream_claim_options_.with_retry_count = true;
stream_claim_options_.last_delivery_count = parse_result.GetValue();
} else if (parser.EatEqICase("force")) {
stream_claim_options_.force = true;
} else if (parser.EatEqICase("justid")) {
stream_claim_options_.just_id = true;
} else if (parser.EatEqICase("lastid")) {
auto last_id = GET_OR_RET(parser.TakeStr());
auto s = ParseStreamEntryID(last_id, &stream_claim_options_.last_delivered_id);
if (!s.IsOK()) {
return s;
}
} else {
return parser.InvalidSyntax();
}
}
return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Stream stream_db(srv->storage, conn->GetNamespace());
StreamClaimResult result;
auto s = stream_db.ClaimPelEntries(stream_name_, group_name_, consumer_name_, min_idle_time_ms_, entry_ids_,
stream_claim_options_, &result);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (s.IsNotFound()) {
return {Status::RedisExecErr, errNoSuchKey};
}

if (!stream_claim_options_.just_id) {
output->append(redis::MultiLen(result.entries.size()));

for (const auto &e : result.entries) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(e.key));
output->append(conn->MultiBulkString(e.values));
}
} else {
output->append(redis::MultiLen(result.ids.size()));
for (const auto &id : result.ids) {
output->append(redis::BulkString(id));
}
}

return Status::OK();
}

private:
std::string stream_name_;
std::string group_name_;
std::string consumer_name_;
uint64_t min_idle_time_ms_;
std::vector<StreamEntryID> entry_ids_;
StreamClaimOptions stream_claim_options_;

bool static isOption(const std::string &arg) {
static const std::unordered_set<std::string> options = {"idle", "time", "retrycount", "force", "justid", "lastid"};
return options.find(util::ToLower(arg)) != options.end();
}
};

class CommandXGroup : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -1534,6 +1656,7 @@ class CommandXSetId : public Commander {
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack", -4, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandXDel>("xdel", -3, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0, 0, 0),
Expand Down
132 changes: 132 additions & 0 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,138 @@ rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::st
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const std::string &group_name,
const std::string &consumer_name, const uint64_t min_idle_time_ms,
const std::vector<StreamEntryID> &entry_ids, const StreamClaimOptions &options,
StreamClaimResult *result) {
std::string ns_key = AppendNamespacePrefix(stream_name);
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;

std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
std::string get_group_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, &get_group_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) {
return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " + group_name + " for key name " +
stream_name.ToString());
}
StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);

std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
std::string get_consumer_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) {
int created_number = 0;
s = createConsumerWithoutLock(stream_name, group_name, consumer_name, &created_number);
if (!s.ok()) {
return s;
}
group_metadata.consumer_number += created_number;
}
StreamConsumerMetadata consumer_metadata;
if (!s.IsNotFound()) {
consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
}
auto now = util::GetTimeStampMS();
consumer_metadata.last_idle_ms = now;
consumer_metadata.last_active_ms = now;

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisStream);
batch->PutLogData(log_data.Encode());

for (const auto &id : entry_ids) {
std::string raw_value;
rocksdb::Status s = getEntryRawValue(ns_key, metadata, id, &raw_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) continue;

std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id);
std::string value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &value);
StreamPelEntry pel_entry;

if (!s.ok() && s.IsNotFound() && options.force) {
pel_entry = {0, 0, ""};
group_metadata.pending_number += 1;
}

if (s.ok()) {
pel_entry = decodeStreamPelEntryValue(value);
}

if (s.ok() || (s.IsNotFound() && options.force)) {
if (now - pel_entry.last_delivery_time_ms < min_idle_time_ms) continue;

std::vector<std::string> values;
if (options.just_id) {
result->ids.emplace_back(id.ToString());
} else {
auto rv = DecodeRawStreamEntryValue(raw_value, &values);
if (!rv.IsOK()) {
return rocksdb::Status::InvalidArgument(rv.Msg());
}
result->entries.emplace_back(id.ToString(), std::move(values));
}

if (pel_entry.consumer_name != "") {
std::string original_consumer_key =
internalKeyFromConsumerName(ns_key, metadata, group_name, pel_entry.consumer_name);
std::string get_original_consumer_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, original_consumer_key,
&get_original_consumer_value);
if (!s.ok()) {
return s;
}
StreamConsumerMetadata original_consumer_metadata =
decodeStreamConsumerMetadataValue(get_original_consumer_value);
original_consumer_metadata.pending_number -= 1;
batch->Put(stream_cf_handle_, original_consumer_key,
encodeStreamConsumerMetadataValue(original_consumer_metadata));
}

pel_entry.consumer_name = consumer_name;
consumer_metadata.pending_number += 1;
if (options.with_time) {
pel_entry.last_delivery_time_ms = options.last_delivery_time_ms;
} else {
pel_entry.last_delivery_time_ms = now - options.idle_time_ms;
}

if (pel_entry.last_delivery_time_ms < 0 || pel_entry.last_delivery_time_ms > now) {
pel_entry.last_delivery_time_ms = now;
}

if (options.with_retry_count) {
pel_entry.last_delivery_count = options.last_delivery_count;
} else if (!options.just_id) {
pel_entry.last_delivery_count += 1;
}

std::string pel_value = encodeStreamPelEntryValue(pel_entry);
batch->Put(stream_cf_handle_, entry_key, pel_value);
}
}

if (options.with_last_id && options.last_delivered_id > group_metadata.last_delivered_id) {
group_metadata.last_delivered_id = options.last_delivered_id;
}

batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));
batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(group_metadata));
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const StreamXGroupCreateOptions &options,
const std::string &group_name) {
if (std::isdigit(group_name[0])) {
Expand Down
4 changes: 4 additions & 0 deletions src/types/redis_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class Stream : public SubKeyScanner {
rocksdb::Status DeleteEntries(const Slice &stream_name, const std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
rocksdb::Status DeletePelEntries(const Slice &stream_name, const std::string &group_name,
const std::vector<StreamEntryID> &entry_ids, uint64_t *acknowledged);
rocksdb::Status ClaimPelEntries(const Slice &stream_name, const std::string &group_name,
const std::string &consumer_name, uint64_t min_idle_time_ms,
const std::vector<StreamEntryID> &entry_ids, const StreamClaimOptions &options,
StreamClaimResult *result);
rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions &options, uint64_t *size);
rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t count, StreamInfo *info);
rocksdb::Status GetGroupInfo(const Slice &stream_name,
Expand Down
17 changes: 17 additions & 0 deletions src/types/redis_stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,18 @@ struct StreamXGroupCreateOptions {
std::string last_id;
};

struct StreamClaimOptions {
uint64_t idle_time_ms = 0;
bool with_time = false;
bool with_retry_count = false;
bool force = false;
bool just_id = false;
bool with_last_id = false;
uint64_t last_delivery_time_ms;
uint64_t last_delivery_count;
StreamEntryID last_delivered_id;
};

struct StreamConsumerGroupMetadata {
uint64_t consumer_number = 0;
uint64_t pending_number = 0;
Expand Down Expand Up @@ -207,6 +219,11 @@ struct StreamReadResult {
: name(std::move(name)), entries(std::move(result)) {}
};

struct StreamClaimResult {
std::vector<std::string> ids;
std::vector<StreamEntry> entries;
};

Status IncrementStreamEntryID(StreamEntryID *id);
Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>> ParseNextStreamEntryIDStrategy(const std::string &input);
Expand Down

0 comments on commit a0dae44

Please sign in to comment.