Skip to content

Commit

Permalink
Drop the stream XGROUP command in 2.7 since it's not ready to use
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed Dec 7, 2023
1 parent 6cb5b3a commit 6d1571d
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 119 deletions.
2 changes: 1 addition & 1 deletion src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,7 @@ class CommandXSetId : public Commander {

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandXDel>("xdel", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
// MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0, 0, 0),
MakeCmdAttr<CommandXRange>("xrange", -4, "read-only", 1, 1, 1),
Expand Down
236 changes: 118 additions & 118 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,124 +857,124 @@ func TestStreamOffset(t *testing.T) {
require.EqualValues(t, providedSeqNum, seqNum)
})

t.Run("XGROUP CREATE with different kinds of commands and XGROUP DESTROY", func(t *testing.T) {
streamName := "test-stream-a"
groupName := "test-group-a"
require.NoError(t, rdb.Del(ctx, streamName).Err())
// No such stream (No such key)
require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err())
require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "10").Err())
require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD").Err())
require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "MKSTREAM", "ENTRIESREAD").Err())
require.NoError(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "MKSTREAM").Err())
require.NoError(t, rdb.XInfoStream(ctx, streamName).Err())
require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err())
// Invalid syntax
groupName = "test-group-b"
require.Error(t, rdb.Do(ctx, "XGROUP", "CREAT", streamName, groupName, "$").Err())
require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIEREAD", "10").Err())
require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "-10").Err())
require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, "1test-group-c", "$").Err())

require.NoError(t, rdb.Del(ctx, "myStream").Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err())
require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup", "$").Err())
result, err := rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result()
require.NoError(t, err)
require.Equal(t, int64(1), result)
result, err = rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result()
require.NoError(t, err)
require.Equal(t, int64(0), result)
})

t.Run("XGROUP CREATECONSUMER with different kinds of commands", func(t *testing.T) {
streamName := "test-stream"
groupName := "test-group"
consumerName := "test-consumer"
require.NoError(t, rdb.Del(ctx, streamName).Err())
//No such stream
require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "1-0",
Values: []string{"data", "a"},
}).Err())
//no such group
require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err())

r := rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val()
require.Equal(t, int64(1), r)
r = rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val()
require.Equal(t, int64(0), r)
})

t.Run("XGROUP SETID with different kinds of commands", func(t *testing.T) {
streamName := "test-stream"
groupName := "test-group"
require.NoError(t, rdb.Del(ctx, streamName).Err())
//No such stream
require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "1-0",
Values: []string{"data", "a"},
}).Err())
//No such group
require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err())

require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName, "0-0").Err())
require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entries", "100").Err())
require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "-100").Err())
require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "100").Err())
})

t.Run("XINFO GROUPS and XINFO CONSUMERS", func(t *testing.T) {
streamName := "test-stream"
group1 := "t1"
group2 := "t2"
consumer1 := "c1"
consumer2 := "c2"
consumer3 := "c3"
require.NoError(t, rdb.Del(ctx, streamName).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "1-0",
Values: []string{"data", "a"},
}).Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, group1, "$").Err())
r := rdb.XInfoGroups(ctx, streamName).Val()
require.Equal(t, group1, r[0].Name)
require.Equal(t, int64(0), r[0].Consumers)
require.Equal(t, int64(0), r[0].Pending)
require.Equal(t, "1-0", r[0].LastDeliveredID)
require.Equal(t, int64(0), r[0].EntriesRead)
require.Equal(t, int64(0), r[0].Lag)

require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "2-0",
Values: []string{"data1", "b"},
}).Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, group2, "$").Err())
r = rdb.XInfoGroups(ctx, streamName).Val()
require.Equal(t, group2, r[1].Name)
require.Equal(t, "2-0", r[1].LastDeliveredID)

require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer1).Err())
require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer2).Err())
require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group2, consumer3).Err())
r = rdb.XInfoGroups(ctx, streamName).Val()
require.Equal(t, int64(2), r[0].Consumers)
require.Equal(t, int64(1), r[1].Consumers)

r1 := rdb.XInfoConsumers(ctx, streamName, group1).Val()
require.Equal(t, consumer1, r1[0].Name)
require.Equal(t, consumer2, r1[1].Name)
r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val()
require.Equal(t, consumer3, r1[0].Name)
})
//t.Run("XGROUP CREATE with different kinds of commands and XGROUP DESTROY", func(t *testing.T) {
// streamName := "test-stream-a"
// groupName := "test-group-a"
// require.NoError(t, rdb.Del(ctx, streamName).Err())
// // No such stream (No such key)
// require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err())
// require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "10").Err())
// require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD").Err())
// require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "MKSTREAM", "ENTRIESREAD").Err())
// require.NoError(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "MKSTREAM").Err())
// require.NoError(t, rdb.XInfoStream(ctx, streamName).Err())
// require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err())
// // Invalid syntax
// groupName = "test-group-b"
// require.Error(t, rdb.Do(ctx, "XGROUP", "CREAT", streamName, groupName, "$").Err())
// require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIEREAD", "10").Err())
// require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "-10").Err())
// require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, "1test-group-c", "$").Err())
//
// require.NoError(t, rdb.Del(ctx, "myStream").Err())
// require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err())
// require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup", "$").Err())
// result, err := rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result()
// require.NoError(t, err)
// require.Equal(t, int64(1), result)
// result, err = rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result()
// require.NoError(t, err)
// require.Equal(t, int64(0), result)
//})

//t.Run("XGROUP CREATECONSUMER with different kinds of commands", func(t *testing.T) {
// streamName := "test-stream"
// groupName := "test-group"
// consumerName := "test-consumer"
// require.NoError(t, rdb.Del(ctx, streamName).Err())
// //No such stream
// require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
// require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
// Stream: streamName,
// ID: "1-0",
// Values: []string{"data", "a"},
// }).Err())
// //no such group
// require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
// require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err())
//
// r := rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val()
// require.Equal(t, int64(1), r)
// r = rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val()
// require.Equal(t, int64(0), r)
//})

//t.Run("XGROUP SETID with different kinds of commands", func(t *testing.T) {
// streamName := "test-stream"
// groupName := "test-group"
// require.NoError(t, rdb.Del(ctx, streamName).Err())
// //No such stream
// require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err())
// require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
// Stream: streamName,
// ID: "1-0",
// Values: []string{"data", "a"},
// }).Err())
// //No such group
// require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err())
// require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err())
//
// require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName, "0-0").Err())
// require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entries", "100").Err())
// require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "-100").Err())
// require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "100").Err())
//})

//t.Run("XINFO GROUPS and XINFO CONSUMERS", func(t *testing.T) {
// streamName := "test-stream"
// group1 := "t1"
// group2 := "t2"
// consumer1 := "c1"
// consumer2 := "c2"
// consumer3 := "c3"
// require.NoError(t, rdb.Del(ctx, streamName).Err())
// require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
// Stream: streamName,
// ID: "1-0",
// Values: []string{"data", "a"},
// }).Err())
// require.NoError(t, rdb.XGroupCreate(ctx, streamName, group1, "$").Err())
// r := rdb.XInfoGroups(ctx, streamName).Val()
// require.Equal(t, group1, r[0].Name)
// require.Equal(t, int64(0), r[0].Consumers)
// require.Equal(t, int64(0), r[0].Pending)
// require.Equal(t, "1-0", r[0].LastDeliveredID)
// require.Equal(t, int64(0), r[0].EntriesRead)
// require.Equal(t, int64(0), r[0].Lag)
//
// require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
// Stream: streamName,
// ID: "2-0",
// Values: []string{"data1", "b"},
// }).Err())
// require.NoError(t, rdb.XGroupCreate(ctx, streamName, group2, "$").Err())
// r = rdb.XInfoGroups(ctx, streamName).Val()
// require.Equal(t, group2, r[1].Name)
// require.Equal(t, "2-0", r[1].LastDeliveredID)
//
// require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer1).Err())
// require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer2).Err())
// require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group2, consumer3).Err())
// r = rdb.XInfoGroups(ctx, streamName).Val()
// require.Equal(t, int64(2), r[0].Consumers)
// require.Equal(t, int64(1), r[1].Consumers)
//
// r1 := rdb.XInfoConsumers(ctx, streamName, group1).Val()
// require.Equal(t, consumer1, r1[0].Name)
// require.Equal(t, consumer2, r1[1].Name)
// r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val()
// require.Equal(t, consumer3, r1[0].Name)
//})
}

func parseStreamEntryID(id string) (ts int64, seqNum int64) {
Expand Down

0 comments on commit 6d1571d

Please sign in to comment.