Skip to content

Commit

Permalink
Merge pull request #209 from alicebob/xblock
Browse files Browse the repository at this point in the history
xread block
  • Loading branch information
alicebob committed May 19, 2021
2 parents adebdf7 + f9a0e29 commit 8e7ebef
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 62 deletions.
159 changes: 97 additions & 62 deletions cmd_stream.go
Expand Up @@ -557,10 +557,15 @@ func (m *Miniredis) cmdXread(c *server.Peer, cmd string, args []string) {
c.WriteError(errWrongNumber(cmd))
return
}
var count int

var opts struct {
count int
streams []string
ids []string
block bool
blockTimeout time.Duration
}
var err error
streams := make([]string, 0)
ids := make([]string, 0)

parsing:
for len(args) > 0 {
Expand All @@ -571,17 +576,27 @@ parsing:
break parsing
}

count, err = strconv.Atoi(args[1])
opts.count, err = strconv.Atoi(args[1])
if err != nil {
break parsing
}

args = args[2:]
case "BLOCK":
if len(args) < 2 {
err = errors.New(errWrongNumber(cmd))
break parsing
}
opts.block = true
ms, nerr := strconv.Atoi(args[1])
if nerr != nil {
err = errors.New(msgInvalidInt)
break parsing
}
if ms < 0 {
err = errors.New("ERR timeout is negative")
break parsing
}
opts.blockTimeout = time.Millisecond * time.Duration(ms)
args = args[2:]
case "STREAMS":
args = args[1:]
Expand All @@ -591,14 +606,15 @@ parsing:
break parsing
}

streams, ids = args[0:len(args)/2], args[len(args)/2:]
for _, id := range ids {
opts.streams, opts.ids = args[0:len(args)/2], args[len(args)/2:]
for _, id := range opts.ids {
if _, err := parseStreamID(id); err != nil {
setDirty(c)
c.WriteError(msgInvalidStreamID)
return
}
}
args = nil
break parsing
default:
err = fmt.Errorf("ERR incorrect argument %s", args[0])
Expand All @@ -612,74 +628,93 @@ parsing:
return
}

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
res := map[string][]StreamEntry{}
if !opts.block {
withTx(m, c, func(c *server.Peer, ctx *connCtx) {
db := m.db(ctx.selectedDB)
res := xread(db, opts.streams, opts.ids, opts.count)
writeXread(c, opts.streams, res)
})
return
}
blocking(
m,
c,
opts.blockTimeout,
func(c *server.Peer, ctx *connCtx) bool {
db := m.db(ctx.selectedDB)
res := xread(db, opts.streams, opts.ids, opts.count)
if len(res) == 0 {
return false
}
writeXread(c, opts.streams, res)
return true
},
func(c *server.Peer) { // timeout
c.WriteLen(-1)
},
)
}

db := m.db(ctx.selectedDB)
func xread(db *RedisDB, streams []string, ids []string, count int) map[string][]StreamEntry {
res := map[string][]StreamEntry{}
for i := range streams {
stream := streams[i]
id := ids[i]

for i := range streams {
stream := streams[i]
id := ids[i]
var s, ok = db.streamKeys[stream]
if !ok {
continue
}
entries := s.entries
if len(entries) == 0 {
continue
}

var s, ok = db.streamKeys[stream]
if !ok {
continue
}
entries := s.entries
entryCount := count
if entryCount == 0 {
entryCount = len(entries)
}
entryCount := count
if entryCount == 0 {
entryCount = len(entries)
}

if len(entries) == 0 {
continue
var returnedEntries []StreamEntry
for _, entry := range entries {
if len(returnedEntries) == entryCount {
break
}

var returnedEntries []StreamEntry
for _, entry := range entries {
if len(returnedEntries) == entryCount {
break
}

// Continue if entry ID <= start
if streamCmp(entry.ID, id) <= 0 {
continue
}
returnedEntries = append(returnedEntries, entry)
if streamCmp(entry.ID, id) <= 0 {
continue
}

returnedEntries = append(returnedEntries, entry)
}
if len(returnedEntries) > 0 {
res[stream] = returnedEntries
}
}
return res
}

if len(res) == 0 {
c.WriteLen(-1)
return
func writeXread(c *server.Peer, streams []string, res map[string][]StreamEntry) {
if len(res) == 0 {
c.WriteLen(-1)
return
}
c.WriteLen(len(res))
for _, stream := range streams {
entries, ok := res[stream]
if !ok {
continue
}

c.WriteLen(len(res))

for _, stream := range streams {
entries, ok := res[stream]
if !ok {
continue
}

c.WriteLen(2)
c.WriteBulk(stream)
c.WriteLen(len(entries))
for _, entry := range entries {
c.WriteLen(2)
c.WriteBulk(stream)

c.WriteLen(len(entries))

for _, entry := range entries {
c.WriteLen(2)
c.WriteBulk(entry.ID)
c.WriteLen(len(entry.Values))

for _, v := range entry.Values {
c.WriteBulk(v)
}
c.WriteBulk(entry.ID)
c.WriteLen(len(entry.Values))
for _, v := range entry.Values {
c.WriteBulk(v)
}
}
})
}
}

// XPENDING
Expand Down
28 changes: 28 additions & 0 deletions integration/stream_test.go
Expand Up @@ -3,7 +3,9 @@
package main

import (
"sync"
"testing"
"time"
)

func TestStream(t *testing.T) {
Expand Down Expand Up @@ -203,6 +205,8 @@ func TestStream(t *testing.T) {
c.Do("XREAD", "STREAMS", "ordplanets", "ordplanets2", "2", "0")
c.Do("XREAD", "STREAMS", "ordplanets", "ordplanets2", "0", "2")
c.Do("XREAD", "STREAMS", "ordplanets", "ordplanets2", "1", "3")
c.Do("XREAD", "STREAMS", "ordplanets", "ordplanets2", "0", "999")
c.Do("XREAD", "COUNT", "1", "STREAMS", "ordplanets", "ordplanets2", "0", "0")

// failure cases
c.Error("wrong number", "XREAD")
Expand All @@ -216,6 +220,30 @@ func TestStream(t *testing.T) {
c.Error("wrong number", "XREAD", "COUNT", "10") // No streams
c.Error("stream ID", "XREAD", "STREAMS", "foo", "notint")
})

testRaw2(t, func(c, c2 *client) {
c.Do("XADD", "pl", "55-88", "name", "Mercury")
// something is available: doesn't block
c.Do("XREAD", "BLOCK", "10", "STREAMS", "pl", "0")
c.Do("XREAD", "BLOCK", "0", "STREAMS", "pl", "0")

// blocks
var wg sync.WaitGroup
wg.Add(1)
go func() {
c.Do("XREAD", "BLOCK", "1000", "STREAMS", "pl", "60")
wg.Done()
}()
time.Sleep(10 * time.Millisecond)
c2.Do("XADD", "pl", "60-1", "name", "Mercury")
wg.Wait()

// timeout
c.Do("XREAD", "BLOCK", "10", "STREAMS", "pl", "70")

c.Error("not an int", "XREAD", "BLOCK", "foo", "STREAMS", "pl", "0")
c.Error("negative", "XREAD", "BLOCK", "-12", "STREAMS", "pl", "0")
})
})
}

Expand Down

0 comments on commit 8e7ebef

Please sign in to comment.