Skip to content

Commit

Permalink
Merge pull request #4 from grafana/fix-queue-id-reset
Browse files Browse the repository at this point in the history
Fix loss of message after invalidating previous message in queue with single broadcast
  • Loading branch information
pstibrany committed Jul 14, 2022
2 parents bd88e10 + 45e3991 commit 09ffed8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
6 changes: 0 additions & 6 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,6 @@ func (q *TransmitLimitedQueue) deleteItem(cur *limitedBroadcast) {
if cur.name != "" {
delete(q.tm, cur.name)
}

if q.tq.Len() == 0 {
// At idle there's no reason to let the id generator keep going
// indefinitely.
q.idGen = 0
}
}

// addItem adds the given item into the overall datastructure. You must already
Expand Down
25 changes: 18 additions & 7 deletions queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package memberlist

import (
"bytes"
"testing"

"github.com/google/btree"
Expand Down Expand Up @@ -128,24 +129,18 @@ func TestTransmitLimited_GetBroadcasts_Limit(t *testing.T) {
partial1 := q.GetBroadcasts(3, 80)
require.Equal(t, 3, len(partial1), "missing messages: %v", prettyPrintMessages(partial1))

require.Equal(t, int64(4), q.idGen, "id generator doesn't reset until empty")

partial2 := q.GetBroadcasts(3, 80)
require.Equal(t, 3, len(partial2), "missing messages: %v", prettyPrintMessages(partial2))

require.Equal(t, int64(4), q.idGen, "id generator doesn't reset until empty")

// Only two not expired
partial3 := q.GetBroadcasts(3, 80)
require.Equal(t, 2, len(partial3), "missing messages: %v", prettyPrintMessages(partial3))

require.Equal(t, int64(0), q.idGen, "id generator resets on empty")

// Should get nothing
partial5 := q.GetBroadcasts(3, 80)
require.Equal(t, 0, len(partial5), "missing messages: %v", prettyPrintMessages(partial5))

require.Equal(t, int64(0), q.idGen, "id generator resets on empty")
require.Equal(t, int64(4), q.idGen, "id generator doesn't change when queue gets empty")
}

func prettyPrintMessages(msgs [][]byte) []string {
Expand Down Expand Up @@ -226,3 +221,19 @@ func TestTransmitLimited_ordering(t *testing.T) {
t.Fatalf("bad val %v, %d", dump[4].b.(*memberlistBroadcast).node, dump[4].transmits)
}
}

func TestTransmitLimitedQueue_GenIdConflict(t *testing.T) {
broadcasts := &TransmitLimitedQueue{RetransmitMult: 3, NumNodes: func() int { return 10 }}

broadcasts.QueueBroadcast(&memberlistBroadcast{node: "A", msg: []byte("A timestamp update")})
// This invalidates previous message. This used to also reset internal idGen to 0 (because invalidation
// made queue empty), which then caused that "C left" message received later replaced "A left" message by mistake.
broadcasts.QueueBroadcast(&memberlistBroadcast{node: "A", msg: []byte("A left")})
broadcasts.QueueBroadcast(&memberlistBroadcast{node: "B", msg: []byte("B timestamp update")})
broadcasts.QueueBroadcast(&memberlistBroadcast{node: "C", msg: []byte("C left")})

require.Equal(t, 3, broadcasts.NumQueued())

messages := broadcasts.GetBroadcasts(0, 1024)
require.Equal(t, "B timestamp update, C left, A left", string(bytes.Join(messages, []byte(", "))))
}

0 comments on commit 09ffed8

Please sign in to comment.