From a5b66935aa505eac9d4a995439b50b9d8ba48da8 Mon Sep 17 00:00:00 2001 From: Josh Hook Date: Thu, 22 Sep 2022 09:45:57 +0100 Subject: [PATCH 1/3] Resolve $ to latest ID in XREAD --- cmd_stream.go | 5 ++++- cmd_stream_test.go | 23 +++++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/cmd_stream.go b/cmd_stream.go index dea316cc..be7067b3 100644 --- a/cmd_stream.go +++ b/cmd_stream.go @@ -924,11 +924,14 @@ parsing: } opts.streams, opts.ids = args[0:len(args)/2], args[len(args)/2:] - for _, id := range opts.ids { + for i, id := range opts.ids { if _, err := parseStreamID(id); id != `$` && err != nil { setDirty(c) c.WriteError(msgInvalidStreamID) return + } else if id == "$" { + db := m.DB(getCtx(c).selectedDB) + opts.ids[i] = db.streamKeys[opts.streams[i]].lastID() } } args = nil diff --git a/cmd_stream_test.go b/cmd_stream_test.go index d0dfb69d..f5632677 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -417,6 +417,29 @@ func TestStreamRead(t *testing.T) { ), ), ) + + // XREAD blocking test using latest ID + go t.Run("XREAD_blocking_async", func(t *testing.T) { + mustDo(t, c, + "XREAD", "BLOCK", "0", "STREAMS", "planets", "$", + proto.Array( + proto.Array(proto.String("planets"), + proto.Array( + proto.Array(proto.String("5-1"), proto.Strings("name", "block", "idx", "6")), + ), + ), + ), + ) + }) + + // Wait for the blocking XREAD to start and then run XADD + xaddClient, err := proto.Dial(s.Addr()) + ok(t, err) + defer xaddClient.Close() + + time.Sleep(time.Second) + _, err = xaddClient.Do("XADD", "planets", "5-1", "name", "block", "idx", "6") + ok(t, err) }) t.Run("error cases", func(t *testing.T) { From 34da0a43c7b0f83ce501261d8a997c6dffbe9cf7 Mon Sep 17 00:00:00 2001 From: Harmen Date: Wed, 12 Oct 2022 16:09:19 +0200 Subject: [PATCH 2/3] cleanup test a bit --- cmd_stream_test.go | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/cmd_stream_test.go b/cmd_stream_test.go index f5632677..8ad6dd64 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -5,6 +5,7 @@ import ( "math" "regexp" "strconv" + "sync" "testing" "time" @@ -418,28 +419,33 @@ func TestStreamRead(t *testing.T) { ), ) - // XREAD blocking test using latest ID - go t.Run("XREAD_blocking_async", func(t *testing.T) { - mustDo(t, c, - "XREAD", "BLOCK", "0", "STREAMS", "planets", "$", - proto.Array( - proto.Array(proto.String("planets"), - proto.Array( - proto.Array(proto.String("5-1"), proto.Strings("name", "block", "idx", "6")), + t.Run("blocking async", func(t *testing.T) { + // XREAD blocking test using latest ID + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + mustDo(t, c, + "XREAD", "BLOCK", "0", "STREAMS", "planets", "$", + proto.Array( + proto.Array(proto.String("planets"), + proto.Array( + proto.Array(proto.String("5-1"), proto.Strings("name", "block", "idx", "6")), + ), ), ), - ), - ) - }) + ) + }() - // Wait for the blocking XREAD to start and then run XADD - xaddClient, err := proto.Dial(s.Addr()) - ok(t, err) - defer xaddClient.Close() + // Wait for the blocking XREAD to start and then run XADD + xaddClient, err := proto.Dial(s.Addr()) + ok(t, err) + defer xaddClient.Close() - time.Sleep(time.Second) - _, err = xaddClient.Do("XADD", "planets", "5-1", "name", "block", "idx", "6") - ok(t, err) + _, err = xaddClient.Do("XADD", "planets", "5-1", "name", "block", "idx", "6") + ok(t, err) + wg.Wait() + }) }) t.Run("error cases", func(t *testing.T) { From ca6b916ff2b9558a577e723e1ca0d920a0991794 Mon Sep 17 00:00:00 2001 From: Harmen Date: Wed, 12 Oct 2022 17:33:04 +0200 Subject: [PATCH 3/3] fix race --- stream.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/stream.go b/stream.go index c09051a2..2a1fe537 100644 --- a/stream.go +++ b/stream.go @@ -9,6 +9,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" ) @@ -17,6 +18,7 @@ type streamKey struct { entries []StreamEntry groups map[string]*streamGroup lastAllocatedID string + mu sync.Mutex } // a StreamEntry is an entry in a stream. The ID is always of the form @@ -52,6 +54,7 @@ func newStreamKey() *streamKey { } } +// generateID doesn't lock the mutex func (s *streamKey) generateID(now time.Time) string { ts := uint64(now.UnixNano()) / 1_000_000 @@ -71,6 +74,7 @@ func (s *streamKey) generateID(now time.Time) string { return next } +// lastID doesn't lock the mutex func (s *streamKey) lastID() string { if len(s.entries) == 0 { return "0-0" @@ -80,6 +84,9 @@ func (s *streamKey) lastID() string { } func (s *streamKey) copy() *streamKey { + s.mu.Lock() + defer s.mu.Unlock() + cpy := &streamKey{ entries: s.entries, } @@ -194,6 +201,9 @@ func reversedStreamEntries(o []StreamEntry) []StreamEntry { } func (s *streamKey) createGroup(group, id string) error { + s.mu.Lock() + defer s.mu.Unlock() + if _, ok := s.groups[group]; ok { return errors.New("BUSYGROUP Consumer Group name already exists") } @@ -213,6 +223,9 @@ func (s *streamKey) createGroup(group, id string) error { // If id is empty or "*" the ID will be generated automatically. // `values` should have an even length. func (s *streamKey) add(entryID string, values []string, now time.Time) (string, error) { + s.mu.Lock() + defer s.mu.Unlock() + if entryID == "" || entryID == "*" { entryID = s.generateID(now) } @@ -236,6 +249,9 @@ func (s *streamKey) add(entryID string, values []string, now time.Time) (string, } func (s *streamKey) trim(n int) { + s.mu.Lock() + defer s.mu.Unlock() + if len(s.entries) > n { s.entries = s.entries[len(s.entries)-n:] } @@ -243,6 +259,9 @@ func (s *streamKey) trim(n int) { // all entries after "id" func (s *streamKey) after(id string) []StreamEntry { + s.mu.Lock() + defer s.mu.Unlock() + pos := sort.Search(len(s.entries), func(i int) bool { return streamCmp(id, s.entries[i].ID) < 0 }) @@ -252,6 +271,9 @@ func (s *streamKey) after(id string) []StreamEntry { // get a stream entry by ID // Also returns the position in the entries slice, if found. func (s *streamKey) get(id string) (int, *StreamEntry) { + s.mu.Lock() + defer s.mu.Unlock() + pos := sort.Search(len(s.entries), func(i int) bool { return streamCmp(id, s.entries[i].ID) <= 0 })