Skip to content

Commit

Permalink
Merge pull request #12 from saisrikark/maxlen
Browse files Browse the repository at this point in the history
README and testcases update for MaxLen and Approx options
  • Loading branch information
dranikpg committed Nov 17, 2023
2 parents df2c754 + b8e0da4 commit 4917a00
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,15 @@ errors.Is(msg.Err, errMyTypeFailedToParse)
Streams are simple wrappers for basic redis commands on a stream.

```go
stream := NewStream[Event](rdb, "my-stream", &Options{TTL: time.Hour})
stream := NewStream[Event](rdb, "my-stream", &Options{TTL: time.Hour, MaxLen: 1000, Approx: true})
stream.Add(ctx, Event{
Kind: "Example event",
Priority: 1,
})
```
The Options.TTL parameter will evict stream entries after the specified duration has elapsed (or it can be set to `NoExpiration`).

The Options.MaxLen parameter will remove older stream entries to accommodate newer entries after the maximum number of entries is reached.
The Options.Approx parameter provides better efficiency by using almost exact trimming.
#### Metadata

The package defines a Metadata type as:
Expand Down
56 changes: 56 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,59 @@ func TestStream_TTL(t *testing.T) {
assert.Equal(t, vals[0].Stream, "s1")
assert.Equal(t, vals[0].Data.Name, "Fourth")
}

func TestStream_MaxLen(t *testing.T) {
_, rdb := startMiniredis(t)
ctx := context.TODO()

streamSize := 5
numberOfMessages := 10

stream := NewStream[Person](rdb, "s1", &Options{MaxLen: int64(streamSize)})
message := Person{Name: "Gorilla"}

valuesCheck := func(index int, size int, vals []Message[Person]) {
// ensure number of messages is accurate
if index < streamSize {
assert.Len(t, vals, index+1)
} else {
assert.Len(t, vals, streamSize)
}

// ensure ordering is preserved
vMap := map[int]bool{}
prevNumber := -1
for _, val := range vals {
assert.Greater(t, val.Data.Age, prevNumber, "ordering not preserved")
prevNumber = val.Data.Age
vMap[prevNumber] = true
}

// ensure that the values we have are the same as what is expected
if index < size {
for i := 0; i <= index; i++ {
if _, ok := vMap[i]; !ok {
assert.Fail(t, "value missing", i)
}
}
} else {
for i := index - size; i < index; i++ {
if _, ok := vMap[i+1]; !ok {
assert.Fail(t, "value missing", i)
}
}
}
}

for i := 0; i < numberOfMessages; i++ {
message.Age = i

_, err := stream.Add(ctx, message)
assert.NoError(t, err)

vals, err := stream.Range(ctx, "-", "+")
assert.NoError(t, err)

valuesCheck(i, streamSize, vals)
}
}

0 comments on commit 4917a00

Please sign in to comment.