From 45e39912980ce50990569e53c31e168c110a7953 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 14 Jul 2022 15:35:22 +0200 Subject: [PATCH] Fix loss of message after invalidating previous message in queue with single broadcast. --- queue.go | 6 ------ queue_test.go | 25 ++++++++++++++++++------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/queue.go b/queue.go index c970176e1..2eb33c544 100644 --- a/queue.go +++ b/queue.go @@ -248,12 +248,6 @@ func (q *TransmitLimitedQueue) deleteItem(cur *limitedBroadcast) { if cur.name != "" { delete(q.tm, cur.name) } - - if q.tq.Len() == 0 { - // At idle there's no reason to let the id generator keep going - // indefinitely. - q.idGen = 0 - } } // addItem adds the given item into the overall datastructure. You must already diff --git a/queue_test.go b/queue_test.go index dcf3f245d..a297c9e59 100644 --- a/queue_test.go +++ b/queue_test.go @@ -1,6 +1,7 @@ package memberlist import ( + "bytes" "testing" "github.com/google/btree" @@ -128,24 +129,18 @@ func TestTransmitLimited_GetBroadcasts_Limit(t *testing.T) { partial1 := q.GetBroadcasts(3, 80) require.Equal(t, 3, len(partial1), "missing messages: %v", prettyPrintMessages(partial1)) - require.Equal(t, int64(4), q.idGen, "id generator doesn't reset until empty") - partial2 := q.GetBroadcasts(3, 80) require.Equal(t, 3, len(partial2), "missing messages: %v", prettyPrintMessages(partial2)) - require.Equal(t, int64(4), q.idGen, "id generator doesn't reset until empty") - // Only two not expired partial3 := q.GetBroadcasts(3, 80) require.Equal(t, 2, len(partial3), "missing messages: %v", prettyPrintMessages(partial3)) - require.Equal(t, int64(0), q.idGen, "id generator resets on empty") - // Should get nothing partial5 := q.GetBroadcasts(3, 80) require.Equal(t, 0, len(partial5), "missing messages: %v", prettyPrintMessages(partial5)) - require.Equal(t, int64(0), q.idGen, "id generator resets on empty") + require.Equal(t, int64(4), q.idGen, "id generator doesn't change when queue gets empty") } func prettyPrintMessages(msgs [][]byte) []string { @@ -226,3 +221,19 @@ func TestTransmitLimited_ordering(t *testing.T) { t.Fatalf("bad val %v, %d", dump[4].b.(*memberlistBroadcast).node, dump[4].transmits) } } + +func TestTransmitLimitedQueue_GenIdConflict(t *testing.T) { + broadcasts := &TransmitLimitedQueue{RetransmitMult: 3, NumNodes: func() int { return 10 }} + + broadcasts.QueueBroadcast(&memberlistBroadcast{node: "A", msg: []byte("A timestamp update")}) + // This invalidates previous message. This used to also reset internal idGen to 0 (because invalidation + // made queue empty), which then caused that "C left" message received later replaced "A left" message by mistake. + broadcasts.QueueBroadcast(&memberlistBroadcast{node: "A", msg: []byte("A left")}) + broadcasts.QueueBroadcast(&memberlistBroadcast{node: "B", msg: []byte("B timestamp update")}) + broadcasts.QueueBroadcast(&memberlistBroadcast{node: "C", msg: []byte("C left")}) + + require.Equal(t, 3, broadcasts.NumQueued()) + + messages := broadcasts.GetBroadcasts(0, 1024) + require.Equal(t, "B timestamp update, C left, A left", string(bytes.Join(messages, []byte(", ")))) +}