Skip to content

Commit

Permalink
fix XDEL, XACK checks, and fix FLUSHALL for streams
Browse files Browse the repository at this point in the history
  • Loading branch information
alicebob committed May 13, 2021
1 parent 76ba0c5 commit 496dfc4
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 27 deletions.
13 changes: 5 additions & 8 deletions cmd_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,20 +400,18 @@ parsing:
}

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
res := make(map[string][]StreamEntry)

db := m.db(ctx.selectedDB)

res := map[string][]StreamEntry{}
for i := range streams {
stream := streams[i]
id := ids[i]

entries, err := db.streamRead(stream, group, consumer, id, count)
entries, err := db.streamReadgroup(stream, group, consumer, id, count)
if err != nil {
c.WriteError(err.Error())
return
}

if len(entries) == 0 {
continue
}
Expand All @@ -425,7 +423,6 @@ parsing:
c.WriteLen(-1)
return
}

c.WriteLen(len(res))

for _, stream := range streams {
Expand Down Expand Up @@ -482,13 +479,13 @@ func (m *Miniredis) cmdXdel(c *server.Peer, cmd string, args []string) {
return
}

stream, args := args[0], args[1:]
stream, ids := args[0], args[1:]

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
db := m.db(ctx.selectedDB)
cnt, err := db.streamDelete(stream, args)
cnt, err := db.streamDelete(stream, ids)
if err != nil {
c.WriteError(fmt.Sprintf("ERR %s", err.Error()))
c.WriteError(err.Error())
return
}

Expand Down
2 changes: 1 addition & 1 deletion cmd_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func TestStreamReadGroup(t *testing.T) {

mustDo(t, c,
"XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">",
proto.Error("stream planets not exists"),
proto.Error("NOGROUP No such key 'planets' or consumer group 'processing' in XREADGROUP with GROUP option"),
)

mustOK(t, c,
Expand Down
29 changes: 18 additions & 11 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func (db *RedisDB) flush() {
db.setKeys = map[string]setKey{}
db.sortedsetKeys = map[string]sortedSet{}
db.ttl = map[string]time.Duration{}
db.streamKeys = map[string]streamKey{}
db.streamGroupKeys = map[string]streamGroupKey{}
}

// move something to another db. Will return ok. Or not.
Expand Down Expand Up @@ -661,23 +663,22 @@ func (db *RedisDB) streamGroupCreate(stream, group, id string) error {
return nil
}

func (db *RedisDB) streamRead(stream, group, consumer, id string, count int) ([]StreamEntry, error) {
func (db *RedisDB) streamReadgroup(stream, group, consumer, id string, count int) ([]StreamEntry, error) {
streamData, ok := db.streamKeys[stream]
if !ok {
return nil, fmt.Errorf("stream %s not exists", stream)
return nil, errReadgroup(stream, group)
}

if _, ok := db.streamGroupKeys[stream]; !ok {
// Error for group because this is key for group
return nil, fmt.Errorf("group %s not exists", group)
return nil, errReadgroup(stream, group)
}

groupData, ok := db.streamGroupKeys[stream][group]
if !ok {
return nil, fmt.Errorf("group %s not exists", group)
return nil, errReadgroup(stream, group)
}

res := make([]StreamEntry, 0)
var res []StreamEntry

if id == ">" {
next := sort.Search(len(streamData), func(i int) bool {
Expand Down Expand Up @@ -747,11 +748,14 @@ func (db *RedisDB) streamDelete(stream string, ids []string) (int, error) {
count := 0

for _, id := range ids {
if _, err := parseStreamID(id); err != nil {
return 0, errors.New(msgInvalidStreamID)
}

pos := sort.Search(len(streamData), func(i int) bool {
return streamCmp(id, streamData[i].ID) <= 0
})

if pos == len(streamData) {
if streamData[pos].ID != id {
continue
}

Expand All @@ -768,18 +772,21 @@ func (db *RedisDB) streamDelete(stream string, ids []string) (int, error) {

func (db *RedisDB) streamAck(stream, group string, ids []string) (int, error) {
if _, ok := db.streamGroupKeys[stream]; !ok {
// Error for group because this is key for group
return 0, fmt.Errorf("group %s not exists", group)
return 0, nil
}

groupData, ok := db.streamGroupKeys[stream][group]
if !ok {
return 0, fmt.Errorf("group %s not exists", group)
return 0, nil
}

count := 0

for _, id := range ids {
if _, err := parseStreamID(id); err != nil {
return 0, errors.New(msgInvalidStreamID)
}

pos := sort.Search(len(groupData.pending), func(i int) bool {
return streamCmp(id, groupData.pending[i].ID) == 0
})
Expand Down
1 change: 1 addition & 0 deletions integration/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ func TestServer(t *testing.T) {
testRaw(t, func(c *client) {
c.Do("SET", "foo", "bar")
c.Do("SET", "baz", "bak")
c.Do("XADD", "planets", "123-456", "name", "Earth")
c.Do("DBSIZE")
c.Do("SELECT", "2")
c.Do("DBSIZE")
Expand Down
72 changes: 65 additions & 7 deletions integration/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestStream(t *testing.T) {
c.Do("DEL", "planets2")
c.Do("XLEN", "planets")

// error cases
c.Error("wrong number", "XADD",
"planets",
"1000",
Expand All @@ -56,13 +57,6 @@ func TestStream(t *testing.T) {
c.Error("wrong number", "XADD")
})

testRaw(t, func(c *client) {
c.Do("XDEL", "newplanets", "123-123")
c.Do("XADD", "newplanets", "123-123", "foo", "bar")
c.Do("XDEL", "newplanets", "123-123")
c.Do("XDEL", "newplanets", "123-123")
})

testRaw(t, func(c *client) {
c.Do("XADD", "planets", "MAXLEN", "4", "456-1", "name", "Mercury")
c.Do("XADD", "planets", "MAXLEN", "4", "456-2", "name", "Mercury")
Expand Down Expand Up @@ -106,6 +100,43 @@ func TestStream(t *testing.T) {
c.Do("XADD", "planets", "MAXLEN", "four", "*", "name", "Mercury")
c.Do("EXEC")
})

t.Run("XDEL", func(t *testing.T) {
testRaw(t, func(c *client) {
c.Do("XDEL", "newplanets", "123-123")
c.Do("XADD", "newplanets", "123-123", "foo", "bar")
c.Do("XADD", "newplanets", "123-124", "baz", "bak")
c.Do("XADD", "newplanets", "123-125", "bal", "bag")
c.Do("XDEL", "newplanets", "123-123", "123-125", "123-123")
c.Do("XDEL", "newplanets", "123-123")
c.Do("XDEL", "notexisting", "123-123")

c.Do("XADD", "gaps", "400-400", "foo", "bar")
c.Do("XADD", "gaps", "400-600", "foo", "bar")
c.Do("XDEL", "gaps", "400-500")

// errors
c.Do("XADD", "existing", "123-123", "foo", "bar")
c.Error("wrong number", "XDEL") // no key
c.Error("wrong number", "XDEL", "existing") // no id
c.Error("Invalid stream ID", "XDEL", "existing", "aa-bb")
c.Do("XDEL", "notexisting", "aa-bb") // invalid id

c.Do("MULTI")
c.Do("XDEL", "existing", "aa-bb")
c.Do("EXEC")
})
})

t.Run("FLUSHALL", func(t *testing.T) {
testRaw(t, func(c *client) {
c.Do("XADD", "planets", "0-1", "name", "Mercury")
c.Do("XGROUP", "CREATE", "planets", "universe", "$")
c.Do("FLUSHALL")
c.Do("XREAD", "STREAMS", "planets", "0")
c.Error("consumer group", "XREADGROUP", "GROUP", "universe", "alice", "STREAMS", "planets", ">")
})
})
}

func TestStreamRange(t *testing.T) {
Expand Down Expand Up @@ -221,6 +252,33 @@ func TestStreamGroup(t *testing.T) {
c.Do("XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">")
c.Do("XACK", "planets", "processing", "0-1")
c.Do("XDEL", "planets", "0-1")

// errors
c.Error("consumer group", "XREADGROUP", "GROUP", "nosuch", "alice", "STREAMS", "planets", ">")
c.Error("consumer group", "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "nosuchplanets", ">")
})

t.Run("XACK", func(t *testing.T) {
testRaw(t, func(c *client) {
c.Do("XGROUP", "CREATE", "planets", "processing", "$", "MKSTREAM")
c.Do("XADD", "planets", "4000-1", "name", "Mercury")
c.Do("XADD", "planets", "4000-2", "name", "Venus")
c.Do("XADD", "planets", "4000-3", "name", "not Pluto")
c.Do("XADD", "planets", "4000-4", "name", "Mars")
c.Do("XREADGROUP", "GROUP", "processing", "alice", "COUNT", "1", "STREAMS", "planets", ">")
c.Do("XACK", "planets", "processing", "4000-2", "4000-3")
c.Do("XACK", "planets", "processing", "4000-4")
c.Do("XACK", "planets", "processing", "2000-1")

c.Do("XACK", "nosuch", "processing", "0-1")
c.Do("XACK", "planets", "nosuch", "0-1")

// error cases
c.Error("wrong number", "XACK")
c.Error("wrong number", "XACK", "planets")
c.Error("wrong number", "XACK", "planets", "processing")
c.Error("Invalid stream", "XACK", "planets", "processing", "invalid")
})
})

testRESP3(t, func(c *client) {
Expand Down
4 changes: 4 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func errLuaParseError(err error) string {
return fmt.Sprintf("ERR Error compiling script (new function): %s", err.Error())
}

func errReadgroup(key, group string) error {
return fmt.Errorf("NOGROUP No such key '%s' or consumer group '%s' in XREADGROUP with GROUP option", key, group)
}

// withTx wraps the non-argument-checking part of command handling code in
// transaction logic.
func withTx(
Expand Down

0 comments on commit 496dfc4

Please sign in to comment.