Skip to content

Commit

Permalink
XGROUP CREATE: MKSTREAM option for automatic stream creation.
Browse files Browse the repository at this point in the history
  • Loading branch information
antirez committed Oct 17, 2018
1 parent 2f8f29a commit cb27dd1
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions src/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1683,13 +1683,14 @@ uint64_t streamDelConsumer(streamCG *cg, sds name) {
* Consumer groups commands
* ----------------------------------------------------------------------- */

/* XGROUP CREATE <key> <groupname> <id or $>
/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
* XGROUP SETID <key> <groupname> <id or $>
* XGROUP DESTROY <key> <groupname>
* XGROUP DELCONSUMER <key> <groupname> <consumername> */
void xgroupCommand(client *c) {
const char *help[] = {
"CREATE <key> <groupname> <id or $> -- Create a new consumer group.",
"CREATE <key> <groupname> <id or $> [opt] -- Create a new consumer group.",
" option MKSTREAM: create the empty stream if it does not exist.",
"SETID <key> <groupname> <id or $> -- Set the current group ID.",
"DESTROY <key> <groupname> -- Remove the specified group.",
"DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer.",
Expand All @@ -1703,8 +1704,31 @@ NULL

/* Lookup the key now, this is common for all the subcommands but HELP. */
if (c->argc >= 4) {
robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
robj *o = lookupKeyWrite(c->db,c->argv[2]);

/* CREATE has an MKSTREAM option that creates the stream if it
* does not exist. */
if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {
if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {
addReplySubcommandSyntaxError(c);
return;
}
if (o == NULL) {
o = createStreamObject();
dbAdd(c->db,c->argv[2],o);
}
}

/* At this point key must exist, or there is an error. */
if (o == NULL) {
addReplyError(c,
"The XGROUP subcommand requires the key to exist. "
"Note that for CREATE you may want to use the MKSTREAM "
"option to create an empty stream automatically.");
return;
}

if (checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
grpname = c->argv[3]->ptr;

Expand All @@ -1721,7 +1745,7 @@ NULL
}

/* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CREATE") && c->argc == 5) {
if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
id = s->last_id;
Expand Down

0 comments on commit cb27dd1

Please sign in to comment.