Skip to content

Commit

Permalink
Support MINID in XADD
Browse files Browse the repository at this point in the history
According to https://redis.io/commands/xadd
XADD should support MINID (in addition to MAXLEN) as another means
of trimming old entries from the stream.

First added in Redis 6.2.0 https://raw.githubusercontent.com/redis/redis/6.2/00-RELEASENOTES
  • Loading branch information
nathan-cormier committed Apr 4, 2023
1 parent e5c9dd4 commit a80f140
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 10 deletions.
24 changes: 14 additions & 10 deletions cmd_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (m *Miniredis) cmdXadd(c *server.Peer, cmd string, args []string) {
withTx(m, c, func(c *server.Peer, ctx *connCtx) {

maxlen := -1
minID := ""
if strings.ToLower(args[0]) == "maxlen" {
args = args[1:]
// we don't treat "~" special
Expand All @@ -67,6 +68,14 @@ func (m *Miniredis) cmdXadd(c *server.Peer, cmd string, args []string) {
}
maxlen = n
args = args[1:]
} else if strings.ToLower(args[0]) == "minid" {
args = args[1:]
// we don't treat "~" special
if args[0] == "~" {
args = args[1:]
}
minID = args[0]
args = args[1:]
}
if len(args) < 1 {
c.WriteError(errWrongNumber(cmd))
Expand Down Expand Up @@ -110,6 +119,9 @@ func (m *Miniredis) cmdXadd(c *server.Peer, cmd string, args []string) {
if maxlen >= 0 {
s.trim(maxlen)
}
if minID != "" {
s.trimBefore(minID)
}
db.keyVersion[key]++

c.WriteBulk(newID)
Expand Down Expand Up @@ -1329,16 +1341,8 @@ func (m *Miniredis) cmdXtrim(c *server.Peer, cmd string, args []string) {
s.trim(opts.maxLen)
c.WriteInt(entriesBefore - len(s.entries))
case "MINID":
var delete []string
for _, entry := range s.entries {
if entry.ID < opts.threshold {
delete = append(delete, entry.ID)
} else {
break
}
}
s.delete(delete)
c.WriteInt(len(delete))
n := s.trimBefore(opts.threshold)
c.WriteInt(n)
}
})
}
Expand Down
43 changes: 43 additions & 0 deletions cmd_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,49 @@ func TestStreamAdd(t *testing.T) {
equals(t, 10, len(nowz))
})

t.Run("XADD MINID", func(t *testing.T) {
now := time.Date(2023, 1, 1, 4, 4, 5, 4000000, time.UTC)
s.SetTime(now)

minID := strconv.FormatInt(now.Add(-time.Second).UnixNano()/time.Millisecond.Nanoseconds(), 10)
_, err := c.Do("XADD", "mid", "MINID", minID, "*", "one", "1")
ok(t, err)
_, err = c.Do("XADD", "mid", "MINID", minID, "*", "two", "2")
ok(t, err)
now = now.Add(time.Second)
s.SetTime(now)
_, err = c.Do("XADD", "mid", "MINID", minID, "*", "three", "3")
ok(t, err)
now = now.Add(time.Second)
s.SetTime(now)
// advance the minID, older entries will be dropped
minID = strconv.FormatInt(now.Add(-time.Second).UnixNano()/time.Millisecond.Nanoseconds(), 10)
_, err = c.Do("XADD", "mid", "MINID", minID, "*", "four", "4")
ok(t, err)

mustDo(t, c,
"XRANGE", "mid", "-", "+",
proto.Array(
proto.Array(proto.String("1672545846004-0"), proto.Strings("three", "3")),
proto.Array(proto.String("1672545847004-0"), proto.Strings("four", "4")),
),
)
// advance now & minID and test with ~
now = now.Add(time.Second)
s.SetTime(now)
minID = strconv.FormatInt(now.Add(-time.Second).UnixNano()/time.Millisecond.Nanoseconds(), 10)
_, err = c.Do("XADD", "mid", "MINID", "~", minID, "*", "five", "5")
ok(t, err)

mustDo(t, c,
"XRANGE", "mid", "-", "+",
proto.Array(
proto.Array(proto.String("1672545847004-0"), proto.Strings("four", "4")),
proto.Array(proto.String("1672545848004-0"), proto.Strings("five", "5")),
),
)
})

t.Run("error cases", func(t *testing.T) {
// Wrong type of key
mustOK(t, c,
Expand Down
22 changes: 22 additions & 0 deletions integration/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,28 @@ func TestStream(t *testing.T) {
c.Do("SET", "str", "I am a string")
c.Error("not an integer", "XADD", "str", "MAXLEN", "four", "*", "foo", "bar")
})

testRaw(t, func(c *client) {
c.Do("XADD", "planets", "MINID", "450", "450-0", "name", "Venus")
c.Do("XADD", "planets", "MINID", "450", "450-1", "name", "Venus")
c.Do("XADD", "planets", "MINID", "450", "456-1", "name", "Mercury")
c.Do("XADD", "planets", "MINID", "450", "456-2", "name", "Mercury")
c.Do("XADD", "planets", "MINID", "450", "456-3", "name", "Mercury")
c.Do("XADD", "planets", "MINID", "450", "456-4", "name", "Mercury")
c.Do("XADD", "planets", "MINID", "450", "456-5", "name", "Mercury")
c.Do("XADD", "planets", "MINID", "450", "456-6", "name", "Mercury")
c.Do("XADD", "planets", "MINID", "~", "450", "456-7", "name", "Mercury")
c.Do("XLEN", "planets")

c.Error("equal or smaller than the target", "XADD", "planets", "MINID", "450", "449-0", "name", "Earth")
c.Error("equal or smaller than the target", "XADD", "planets", "MINID", "450", "450", "name", "Earth")
c.Error("wrong number", "XADD", "planets", "MINID", "~")
c.Error("wrong number", "XADD", "planets", "MINID")
c.Error("wrong number", "XADD", "planets", "MINID", "100")

c.Do("SET", "str", "I am a string")
c.Error("key holding the wrong kind of value", "XADD", "str", "MINID", "400", "*", "foo", "bar")
})
})

t.Run("transactions", func(t *testing.T) {
Expand Down
17 changes: 17 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,23 @@ func (s *streamKey) trim(n int) {
}
}

// trimBefore deletes entries with an id less than the provided id
// and returns the number of entries deleted
func (s *streamKey) trimBefore(id string) int {
s.mu.Lock()
var delete []string
for _, entry := range s.entries {
if entry.ID < id {
delete = append(delete, entry.ID)
} else {
break
}
}
s.mu.Unlock()
s.delete(delete)
return len(delete)
}

// all entries after "id"
func (s *streamKey) after(id string) []StreamEntry {
s.mu.Lock()
Expand Down

0 comments on commit a80f140

Please sign in to comment.