Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MINID, NOMKSTREAM and LIMIT options for XADD command #1201

Merged
merged 2 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
default_stages: [commit]
exclude: 'src\/redis\/.*'
exclude: 'contrib\/charts\/dragonfly\/ci\/.*'
exclude: |
(?x)(
src/redis/.* |
contrib/charts/dragonfly/ci/.*
)
repos:
- repo: local
hooks:
Expand Down
25 changes: 25 additions & 0 deletions src/redis/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,26 @@ typedef struct streamPropInfo {
robj *groupname;
} streamPropInfo;

typedef struct {
/* XADD options */
streamID id; /* User-provided ID, for XADD only. */
int id_given; /* Was an ID different than "*" specified? for XADD only. */
int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */
int no_mkstream; /* if set to 1 do not create new stream */

/* XADD + XTRIM common options */
int trim_strategy; /* TRIM_STRATEGY_* */
int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
int approx_trim; /* If 1 only delete whole radix tree nodes, so
* the trim argument is not applied verbatim. */
long long limit; /* Maximum amount of entries to trim. If 0, no limitation
* on the amount of trimming work is enforced. */
/* TRIM_STRATEGY_MAXLEN options */
long long maxlen; /* After trimming, leave stream at this length . */
/* TRIM_STRATEGY_MINID options */
streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */
} streamAddTrimArgs;

/* Prototypes of exported APIs. */
// struct client;

Expand All @@ -119,6 +139,10 @@ typedef struct streamPropInfo {

#define SCG_INVALID_ENTRIES_READ -1

#define TRIM_STRATEGY_NONE 0
#define TRIM_STRATEGY_MAXLEN 1
#define TRIM_STRATEGY_MINID 2

stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
Expand Down Expand Up @@ -147,6 +171,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
int streamDeleteItem(stream *s, streamID *id);
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrim(stream *s, streamAddTrimArgs *args);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);
void streamFreeCG(streamCG *cg);
Expand Down
24 changes: 0 additions & 24 deletions src/redis/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -673,30 +673,6 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
return C_OK;
}

typedef struct {
/* XADD options */
streamID id; /* User-provided ID, for XADD only. */
int id_given; /* Was an ID different than "*" specified? for XADD only. */
int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */
int no_mkstream; /* if set to 1 do not create new stream */

/* XADD + XTRIM common options */
int trim_strategy; /* TRIM_STRATEGY_* */
int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
int approx_trim; /* If 1 only delete whole radix tree nodes, so
* the trim argument is not applied verbatim. */
long long limit; /* Maximum amount of entries to trim. If 0, no limitation
* on the amount of trimming work is enforced. */
/* TRIM_STRATEGY_MAXLEN options */
long long maxlen; /* After trimming, leave stream at this length . */
/* TRIM_STRATEGY_MINID options */
streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */
} streamAddTrimArgs;

#define TRIM_STRATEGY_NONE 0
#define TRIM_STRATEGY_MAXLEN 1
#define TRIM_STRATEGY_MINID 2

/* Trim the stream 's' according to args->trim_strategy, and return the
* number of elements removed from the stream. The 'approx' option, if non-zero,
* specifies that the trimming must be performed in a approximated way in
Expand Down
82 changes: 69 additions & 13 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,20 @@ struct RangeId {
bool exclude = false;
};

enum class TrimStrategy {
kAddOptsTrimNone = TRIM_STRATEGY_NONE,
kAddOptsTrimMaxLen = TRIM_STRATEGY_MAXLEN,
kAddOptsTrimMinId = TRIM_STRATEGY_MINID,
};

struct AddOpts {
ParsedStreamId parsed_id;
uint32_t max_limit = kuint32max;
bool max_limit_approx = false;
ParsedStreamId minid;
uint32_t max_len = kuint32max;
uint32_t limit = 0;
TrimStrategy trim_strategy = TrimStrategy::kAddOptsTrimNone;
bool trim_approx = false;
bool no_mkstream = false;
};

struct GroupInfo {
Expand Down Expand Up @@ -471,10 +481,19 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&
auto& db_slice = op_args.shard->db_slice();
pair<PrimeIterator, bool> add_res;

try {
add_res = db_slice.AddOrFind(op_args.db_cntx, key);
} catch (bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
if (opts.no_mkstream) {
auto res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it) {
return res_it.status();
}
add_res.first = res_it.value();
add_res.second = false;
} else {
try {
add_res = db_slice.AddOrFind(op_args.db_cntx, key);
} catch (bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
}
}

robj* stream_obj = nullptr;
Expand Down Expand Up @@ -508,10 +527,26 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&
return OpStatus::OUT_OF_MEMORY;
}

if (opts.max_limit < kuint32max) {
/* Notify xtrim event if needed. */
streamTrimByLength(stream_inst, opts.max_limit, opts.max_limit_approx);
// TODO: when replicating, we should propagate it as exact limit in case of trimming.
if (!opts.limit) {
if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMaxLen) {
/* Notify xtrim event if needed. */
streamTrimByLength(stream_inst, opts.max_len, opts.trim_approx);
// TODO: when replicating, we should propagate it as exact limit in case of trimming.
} else if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMinId) {
streamTrimByID(stream_inst, opts.minid.val, opts.trim_approx);
}
} else {
streamAddTrimArgs add_args = {
.trim_strategy = static_cast<int>(opts.trim_strategy),
.approx_trim = opts.trim_approx,
.limit = opts.limit,
};
if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMaxLen) {
add_args.maxlen = opts.max_len;
} else if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMinId) {
add_args.minid = opts.minid.val;
}
streamTrim(stream_inst, &add_args);
}
return result_id;
}
Expand Down Expand Up @@ -927,17 +962,38 @@ void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) {
for (; id_indx < args.size(); ++id_indx) {
ToUpper(&args[id_indx]);
string_view arg = ArgS(args, id_indx);
if (arg == "MAXLEN") {
if (arg == "NOMKSTREAM") {
add_opts.no_mkstream = true;
} else if (arg == "MAXLEN" || arg == "MINID") {
if (arg == "MAXLEN") {
add_opts.trim_strategy = TrimStrategy::kAddOptsTrimMaxLen;
} else {
add_opts.trim_strategy = TrimStrategy::kAddOptsTrimMinId;
}
if (id_indx + 2 >= args.size()) {
return (*cntx)->SendError(kSyntaxErr);
}
++id_indx;
if (ArgS(args, id_indx) == "~") {
add_opts.max_limit_approx = true;
add_opts.trim_approx = true;
++id_indx;
}
arg = ArgS(args, id_indx);
if (!absl::SimpleAtoi(arg, &add_opts.max_limit)) {
if (add_opts.trim_strategy == TrimStrategy::kAddOptsTrimMaxLen &&
!absl::SimpleAtoi(arg, &add_opts.max_len)) {
return (*cntx)->SendError(kSyntaxErr);
}
if (add_opts.trim_strategy == TrimStrategy::kAddOptsTrimMinId &&
!ParseID(arg, false, 0, &add_opts.minid)) {
return (*cntx)->SendError(kSyntaxErr);
}

} else if (arg == "LIMIT" && add_opts.trim_strategy != TrimStrategy::kAddOptsTrimNone) {
if (id_indx + 2 >= args.size() || !add_opts.trim_approx) {
return (*cntx)->SendError(kSyntaxErr);
}
++id_indx;
if (!absl::SimpleAtoi(ArgS(args, id_indx), &add_opts.limit)) {
return (*cntx)->SendError(kSyntaxErr);
}
} else {
Expand Down
24 changes: 24 additions & 0 deletions src/server/stream_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ TEST_F(StreamFamilyTest, Add) {

resp = Run({"xadd", "key", "badid", "f1", "val1"});
EXPECT_THAT(resp, ErrArg("Invalid stream ID"));

resp = Run({"xadd", "key", "nomkstream", "*", "field2", "value2"});
ASSERT_THAT(resp, ArgType(RespExpr::STRING));

resp = Run({"xadd", "noexist", "nomkstream", "*", "field", "value"});
EXPECT_THAT(resp, ErrArg("no such key"));
}

TEST_F(StreamFamilyTest, AddExtended) {
Expand All @@ -62,6 +68,24 @@ TEST_F(StreamFamilyTest, AddExtended) {

auto resp3 = Run({"xadd", "key", id2, "f1", "val1"});
EXPECT_THAT(resp3, ErrArg("equal or smaller than"));

Run({"xadd", "key2", "5-0", "field", "val"});
Run({"xadd", "key2", "6-0", "field1", "val1"});
Run({"xadd", "key2", "7-0", "field2", "val2"});
auto resp = Run({"xadd", "key2", "minid", "6", "*", "field3", "val3"});
EXPECT_THAT(Run({"xlen", "key2"}), IntArg(3));
EXPECT_THAT(Run({"xrange", "key2", "5-0", "5-0"}), ArrLen(0));

for (int i = 0; i < 700; i++) {
Run({"xadd", "key3", "*", "field", "val"});
}
resp = Run({"xadd", "key3", "maxlen", "~", "500", "*", "field", "val"});
EXPECT_THAT(Run({"xlen", "key3"}), IntArg(501));
for (int i = 0; i < 700; i++) {
Run({"xadd", "key4", "*", "field", "val"});
}
resp = Run({"xadd", "key4", "maxlen", "~", "500", "limit", "100", "*", "field", "val"});
EXPECT_THAT(Run({"xlen", "key4"}), IntArg(601));
}

TEST_F(StreamFamilyTest, Range) {
Expand Down