From db91773b7e14a4015b33c4dcaead4c96252e9983 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Thu, 24 Jan 2019 16:33:26 +0100 Subject: [PATCH 1/6] Avoid to take into account wrong versions of protocols in Vsn. On Consul, nodes sometimes send pMin = pMax = 0 in Vsn. This corrupts the range of acceptable protocol versions, which then requires version = [0, 1]. After this corruption occurs, new nodes cannot join anymore, which forces a restart of all Consul servers to resume normal operations. While not fixing the root cause, this patch discards alive messages from nodes claiming version 0,0,0 and avoids this breakage. See https://github.com/hashicorp/consul/issues/3217 --- memberlist.go | 21 +++--- net_test.go | 4 ++ state.go | 16 +++-- state_test.go | 174 ++++++++++++++++++++++++++++++++------------------ 4 files changed, 140 insertions(+), 75 deletions(-) diff --git a/memberlist.go b/memberlist.go index 3a4ce967b..f289a12ae 100644 --- a/memberlist.go +++ b/memberlist.go @@ -72,6 +72,15 @@ type Memberlist struct { logger *log.Logger } +// BuildVsnArray creates the array of Vsn +func (conf *Config) BuildVsnArray() []uint8 { + return []uint8{ + ProtocolVersionMin, ProtocolVersionMax, conf.ProtocolVersion, + conf.DelegateProtocolMin, conf.DelegateProtocolMax, + conf.DelegateProtocolVersion, + } +} + // newMemberlist creates the network listeners. // Does not schedule execution of background maintenance. 
func newMemberlist(conf *Config) (*Memberlist, error) { @@ -402,11 +411,7 @@ func (m *Memberlist) setAlive() error { Addr: addr, Port: uint16(port), Meta: meta, - Vsn: []uint8{ - ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion, - m.config.DelegateProtocolMin, m.config.DelegateProtocolMax, - m.config.DelegateProtocolVersion, - }, + Vsn: m.config.BuildVsnArray(), } m.aliveNode(&a, nil, true) return nil @@ -447,11 +452,7 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error { Addr: state.Addr, Port: state.Port, Meta: meta, - Vsn: []uint8{ - ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion, - m.config.DelegateProtocolMin, m.config.DelegateProtocolMax, - m.config.DelegateProtocolVersion, - }, + Vsn: m.config.BuildVsnArray(), } notifyCh := make(chan struct{}) m.aliveNode(&a, notifyCh, true) diff --git a/net_test.go b/net_test.go index dd4af1fd1..3301def36 100644 --- a/net_test.go +++ b/net_test.go @@ -569,6 +569,10 @@ func TestSendMsg_Piggyback(t *testing.T) { Node: "rand", Addr: []byte{127, 0, 0, 255}, Meta: nil, + Vsn: []uint8{ + ProtocolVersionMin, ProtocolVersionMax, ProtocolVersionMin, + 1, 1, 1, + }, } m.encodeAndBroadcast("rand", aliveMsg, &a) diff --git a/state.go b/state.go index 6caded313..3201c25b2 100644 --- a/state.go +++ b/state.go @@ -850,6 +850,16 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { return } + if len(a.Vsn) > 3 { + pMin := a.Vsn[0] + pMax := a.Vsn[1] + pCur := a.Vsn[2] + if pMin == 0 || pMax == 0 || pMin > pMax { + m.logger.Printf("[WARN] memberlist: Ignoring an alive message for '%s' (%v:%d) because protocol version(s) are wrong: %d <= %d <= %d should be >0", a.Node, net.IP(a.Addr), a.Port, pMin, pCur, pMax) + return + } + } + // Invoke the Alive delegate if any. This can be used to filter out // alive messages based on custom logic. For example, using a cluster name. 
// Using a merge delegate is not enough, as it is possible for passive @@ -965,12 +975,9 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { bytes.Equal(a.Vsn, versions) { return } - m.refute(state, a.Incarnation) - m.logger.Printf("[WARN] memberlist: Refuting an alive message") + m.logger.Printf("[WARN] memberlist: Refuting an alive message for '%s' (%v:%d) meta:(%v VS %v), vsn:(%v VS %v)", a.Node, net.IP(a.Addr), a.Port, a.Meta, state.Meta, a.Vsn, versions) } else { - m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) - // Update protocol versions if it arrived if len(a.Vsn) > 0 { state.PMin = a.Vsn[0] @@ -980,6 +987,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { state.DMax = a.Vsn[4] state.DCur = a.Vsn[5] } + m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) // Update the state and incarnation number state.Incarnation = a.Incarnation diff --git a/state_test.go b/state_test.go index 14b21bf0b..a3c8d8600 100644 --- a/state_test.go +++ b/state_test.go @@ -53,6 +53,7 @@ func TestMemberList_Probe(t *testing.T) { Addr: []byte(addr1), Port: uint16(m1.config.BindPort), Incarnation: 1, + Vsn: m1.config.BuildVsnArray(), } m1.aliveNode(&a1, nil, true) a2 := alive{ @@ -60,6 +61,7 @@ func TestMemberList_Probe(t *testing.T) { Addr: []byte(addr2), Port: uint16(m2.config.BindPort), Incarnation: 1, + Vsn: m2.config.BuildVsnArray(), } m1.aliveNode(&a2, nil, false) @@ -104,14 +106,13 @@ func TestMemberList_ProbeNode_Suspect(t *testing.T) { c.BindPort = bindPort }) defer m3.Shutdown() - - a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1} + a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} m1.aliveNode(&a1, nil, true) - a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1} + a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: 
m2.config.BuildVsnArray()} m1.aliveNode(&a2, nil, false) - a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1} + a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: m3.config.BuildVsnArray()} m1.aliveNode(&a3, nil, false) - a4 := alive{Node: addr4.String(), Addr: ip4, Port: uint16(bindPort), Incarnation: 1} + a4 := alive{Node: addr4.String(), Addr: ip4, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} m1.aliveNode(&a4, nil, false) n := m1.nodeMap[addr4.String()] @@ -162,7 +163,7 @@ func TestMemberList_ProbeNode_Suspect_Dogpile(t *testing.T) { bindPort := m.config.BindPort - a := alive{Node: addr.String(), Addr: []byte(addr), Port: uint16(bindPort), Incarnation: 1} + a := alive{Node: addr.String(), Addr: []byte(addr), Port: uint16(bindPort), Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, true) // Make all but one peer be an real, alive instance. @@ -177,14 +178,14 @@ func TestMemberList_ProbeNode_Suspect_Dogpile(t *testing.T) { peers = append(peers, peer) - a = alive{Node: peerAddr.String(), Addr: []byte(peerAddr), Port: uint16(bindPort), Incarnation: 1} + a = alive{Node: peerAddr.String(), Addr: []byte(peerAddr), Port: uint16(bindPort), Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) } // Just use a bogus address for the last peer so it doesn't respond // to pings, but tell the memberlist it's alive. badPeerAddr := getBindAddr() - a = alive{Node: badPeerAddr.String(), Addr: []byte(badPeerAddr), Port: uint16(bindPort), Incarnation: 1} + a = alive{Node: badPeerAddr.String(), Addr: []byte(badPeerAddr), Port: uint16(bindPort), Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) // Force a probe, which should start us into the suspect state. 
@@ -576,25 +577,19 @@ func TestMemberList_ProbeNode_Awareness_Degraded(t *testing.T) { }) defer m3.Shutdown() - // This will enable nacks by invoking the latest protocol version. - vsn := []uint8{ - ProtocolVersionMin, - ProtocolVersionMax, - m1.config.ProtocolVersion, - m1.config.DelegateProtocolMin, - m1.config.DelegateProtocolMax, - m1.config.DelegateProtocolVersion, - } - - a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn} + a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} m1.aliveNode(&a1, nil, true) - a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn} + a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()} m1.aliveNode(&a2, nil, false) - a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn} + a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: m3.config.BuildVsnArray()} m1.aliveNode(&a3, nil, false) + vsn4 := []uint8{ + ProtocolVersionMin, ProtocolVersionMax, ProtocolVersionMin, + 1, 1, 1, + } // Node 4 never gets started. - a4 := alive{Node: addr4.String(), Addr: ip4, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn} + a4 := alive{Node: addr4.String(), Addr: ip4, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn4} m1.aliveNode(&a4, nil, false) // Start the health in a degraded state. 
@@ -632,6 +627,66 @@ func TestMemberList_ProbeNode_Awareness_Degraded(t *testing.T) { } } +func TestMemberList_ProbeNode_Wrong_VSN(t *testing.T) { + addr1 := getBindAddr() + addr2 := getBindAddr() + addr3 := getBindAddr() + addr4 := getBindAddr() + ip1 := []byte(addr1) + ip2 := []byte(addr2) + ip3 := []byte(addr3) + ip4 := []byte(addr4) + + m1 := HostMemberlist(addr1.String(), t, func(c *Config) { + c.ProbeTimeout = 10 * time.Millisecond + c.ProbeInterval = 200 * time.Millisecond + }) + defer m1.Shutdown() + + bindPort := m1.config.BindPort + + m2 := HostMemberlist(addr2.String(), t, func(c *Config) { + c.BindPort = bindPort + c.ProbeTimeout = 10 * time.Millisecond + c.ProbeInterval = 200 * time.Millisecond + }) + defer m2.Shutdown() + + m3 := HostMemberlist(addr3.String(), t, func(c *Config) { + c.BindPort = bindPort + c.ProbeTimeout = 10 * time.Millisecond + c.ProbeInterval = 200 * time.Millisecond + }) + defer m3.Shutdown() + + a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} + m1.aliveNode(&a1, nil, true) + a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()} + m1.aliveNode(&a2, nil, false) + a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: m3.config.BuildVsnArray()} + m1.aliveNode(&a3, nil, false) + + vsn4 := []uint8{ + 0, 0, 0, + 0, 0, 0, + } + // Node 4 never gets started. + a4 := alive{Node: addr4.String(), Addr: ip4, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn4} + m1.aliveNode(&a4, nil, false) + + // Start the health in a degraded state. + m1.awareness.ApplyDelta(1) + if score := m1.GetHealthScore(); score != 1 { + t.Fatalf("bad: %d", score) + } + + // Have node m1 probe m4. 
+ n, ok := m1.nodeMap[addr4.String()] + if ok || n != nil { + t.Fatalf("expect node a4 to be not taken into account, because of its wrong version") + } +} + func TestMemberList_ProbeNode_Awareness_Improved(t *testing.T) { addr1 := getBindAddr() addr2 := getBindAddr() @@ -704,21 +759,12 @@ func TestMemberList_ProbeNode_Awareness_MissedNack(t *testing.T) { }) defer m2.Shutdown() - // This will enable nacks by invoking the latest protocol version. - vsn := []uint8{ - ProtocolVersionMin, - ProtocolVersionMax, - m1.config.ProtocolVersion, - m1.config.DelegateProtocolMin, - m1.config.DelegateProtocolMax, - m1.config.DelegateProtocolVersion, - } - - a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn} + a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} m1.aliveNode(&a1, nil, true) - a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn} + a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} m1.aliveNode(&a2, nil, false) + vsn := m1.config.BuildVsnArray() // Node 3 and node 4 never get started. 
a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn} m1.aliveNode(&a3, nil, false) @@ -971,11 +1017,11 @@ func TestMemberList_ResetNodes(t *testing.T) { }) defer m.Shutdown() - a1 := alive{Node: "test1", Addr: []byte{127, 0, 0, 1}, Incarnation: 1} + a1 := alive{Node: "test1", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a1, nil, false) - a2 := alive{Node: "test2", Addr: []byte{127, 0, 0, 2}, Incarnation: 1} + a2 := alive{Node: "test2", Addr: []byte{127, 0, 0, 2}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a2, nil, false) - a3 := alive{Node: "test3", Addr: []byte{127, 0, 0, 3}, Incarnation: 1} + a3 := alive{Node: "test3", Addr: []byte{127, 0, 0, 3}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a3, nil, false) d := dead{Node: "test2", Incarnation: 1} m.deadNode(&d) @@ -1156,7 +1202,7 @@ func TestMemberList_AliveNode_NewNode(t *testing.T) { }) defer m.Shutdown() - a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1} + a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) if len(m.nodes) != 1 { @@ -1204,7 +1250,7 @@ func TestMemberList_AliveNode_SuspectNode(t *testing.T) { }) defer m.Shutdown() - a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1} + a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) // Listen only after first join @@ -1255,7 +1301,7 @@ func TestMemberList_AliveNode_Idempotent(t *testing.T) { }) defer m.Shutdown() - a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1} + a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) // Listen only after first join @@ -1348,7 +1394,8 @@ func TestMemberList_AliveNode_ChangeMeta(t *testing.T) { Node: "test", Addr: []byte{127, 0, 0, 
1}, Meta: []byte("val1"), - Incarnation: 1} + Incarnation: 1, + Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) // Listen only after first join @@ -1389,7 +1436,7 @@ func TestMemberList_AliveNode_Refute(t *testing.T) { m := GetMemberlist(t, nil) defer m.Shutdown() - a := alive{Node: m.config.Name, Addr: []byte{127, 0, 0, 1}, Incarnation: 1} + a := alive{Node: m.config.Name, Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, true) // Clear queue @@ -1401,6 +1448,7 @@ func TestMemberList_AliveNode_Refute(t *testing.T) { Addr: []byte{127, 0, 0, 1}, Incarnation: 2, Meta: []byte("foo"), + Vsn: m.config.BuildVsnArray(), } m.aliveNode(&s, nil, false) @@ -1442,7 +1490,7 @@ func TestMemberList_SuspectNode(t *testing.T) { }) defer m.Shutdown() - a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1} + a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) m.changeNode("test", func(state *nodeState) { @@ -1501,7 +1549,7 @@ func TestMemberList_SuspectNode_DoubleSuspect(t *testing.T) { m := GetMemberlist(t, nil) defer m.Shutdown() - a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1} + a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) state := m.nodeMap["test"] @@ -1540,7 +1588,7 @@ func TestMemberList_SuspectNode_OldSuspect(t *testing.T) { m := GetMemberlist(t, nil) defer m.Shutdown() - a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 10} + a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 10, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) state := m.nodeMap["test"] @@ -1566,7 +1614,7 @@ func TestMemberList_SuspectNode_Refute(t *testing.T) { m := GetMemberlist(t, nil) defer m.Shutdown() - a := alive{Node: m.config.Name, Addr: []byte{127, 0, 0, 1}, Incarnation: 1} + a := alive{Node: m.config.Name, 
Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, true) // Clear queue @@ -1620,7 +1668,7 @@ func TestMemberList_DeadNode(t *testing.T) { }) defer m.Shutdown() - a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1} + a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) // Read the join event @@ -1666,7 +1714,7 @@ func TestMemberList_DeadNode_Double(t *testing.T) { m := GetMemberlist(t, nil) defer m.Shutdown() - a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1} + a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) state := m.nodeMap["test"] @@ -1701,7 +1749,7 @@ func TestMemberList_DeadNode_OldDead(t *testing.T) { m := GetMemberlist(t, nil) defer m.Shutdown() - a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 10} + a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 10, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) state := m.nodeMap["test"] @@ -1719,7 +1767,7 @@ func TestMemberList_DeadNode_AliveReplay(t *testing.T) { m := GetMemberlist(t, nil) defer m.Shutdown() - a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 10} + a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 10, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, false) d := dead{Node: "test", Incarnation: 10} @@ -1739,7 +1787,7 @@ func TestMemberList_DeadNode_Refute(t *testing.T) { m := GetMemberlist(t, nil) defer m.Shutdown() - a := alive{Node: m.config.Name, Addr: []byte{127, 0, 0, 1}, Incarnation: 1} + a := alive{Node: m.config.Name, Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a, nil, true) // Clear queue @@ -1778,11 +1826,11 @@ func TestMemberList_MergeState(t *testing.T) { m := GetMemberlist(t, nil) defer m.Shutdown() - a1 := alive{Node: "test1", Addr: 
[]byte{127, 0, 0, 1}, Incarnation: 1} + a1 := alive{Node: "test1", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a1, nil, false) - a2 := alive{Node: "test2", Addr: []byte{127, 0, 0, 2}, Incarnation: 1} + a2 := alive{Node: "test2", Addr: []byte{127, 0, 0, 2}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a2, nil, false) - a3 := alive{Node: "test3", Addr: []byte{127, 0, 0, 3}, Incarnation: 1} + a3 := alive{Node: "test3", Addr: []byte{127, 0, 0, 3}, Incarnation: 1, Vsn: m.config.BuildVsnArray()} m.aliveNode(&a3, nil, false) s := suspect{Node: "test1", Incarnation: 1} @@ -1888,11 +1936,15 @@ func TestMemberlist_Gossip(t *testing.T) { }) defer m2.Shutdown() - a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1} + m3 := HostMemberlist(addr2.String(), t, func(c *Config) { + }) + defer m3.Shutdown() + + a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} m1.aliveNode(&a1, nil, true) - a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1} + a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()} m1.aliveNode(&a2, nil, false) - a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1} + a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: m3.config.BuildVsnArray()} m1.aliveNode(&a3, nil, false) // Gossip should send all this to m2. 
Retry a few times because it's UDP and @@ -1955,9 +2007,9 @@ func TestMemberlist_GossipToDead(t *testing.T) { defer m2.Shutdown() - a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1} + a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} m1.aliveNode(&a1, nil, true) - a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1} + a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()} m1.aliveNode(&a2, nil, false) // Shouldn't send anything to m2 here, node has been dead for 2x the GossipToTheDeadTime @@ -2008,9 +2060,9 @@ func TestMemberlist_PushPull(t *testing.T) { }) defer m2.Shutdown() - a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1} + a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} m1.aliveNode(&a1, nil, true) - a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1} + a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()} m1.aliveNode(&a2, nil, false) // Gossip should send all this to m2. 
It's UDP though so retry a few times From 7e706c4a55a610d59d0564a8722412b1c24165da Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Fri, 25 Jan 2019 14:23:54 +0100 Subject: [PATCH 2/6] Always set the Vsn when creating state, so race condition cannot happen --- state.go | 8 ++++++++ state_test.go | 14 +++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/state.go b/state.go index 3201c25b2..073ba6893 100644 --- a/state.go +++ b/state.go @@ -896,6 +896,14 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { }, State: stateDead, } + if len(a.Vsn) > 5 { + state.PMin = a.Vsn[0] + state.PMax = a.Vsn[1] + state.PCur = a.Vsn[2] + state.DMin = a.Vsn[3] + state.DMax = a.Vsn[4] + state.DCur = a.Vsn[5] + } // Add to map m.nodeMap[a.Node] = state diff --git a/state_test.go b/state_test.go index a3c8d8600..3c3414587 100644 --- a/state_test.go +++ b/state_test.go @@ -706,9 +706,9 @@ func TestMemberList_ProbeNode_Awareness_Improved(t *testing.T) { }) defer m2.Shutdown() - a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1} + a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} m1.aliveNode(&a1, nil, true) - a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1} + a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()} m1.aliveNode(&a2, nil, false) // Start the health in a degraded state. 
@@ -835,11 +835,11 @@ func TestMemberList_ProbeNode_Awareness_OldProtocol(t *testing.T) { }) defer m3.Shutdown() - a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1} + a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} m1.aliveNode(&a1, nil, true) - a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1} + a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()} m1.aliveNode(&a2, nil, false) - a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1} + a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: m3.config.BuildVsnArray()} m1.aliveNode(&a3, nil, false) // Node 4 never gets started. @@ -899,8 +899,8 @@ func TestMemberList_ProbeNode_Buddy(t *testing.T) { }) defer m2.Shutdown() - a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1} - a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1} + a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} + a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()} m1.aliveNode(&a1, nil, true) m1.aliveNode(&a2, nil, false) From b68ff894dafbe52a5f6b76cf82b6852b85b26b00 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Fri, 25 Jan 2019 16:34:41 +0100 Subject: [PATCH 3/6] Do not move m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) since not needed --- state.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/state.go b/state.go index 073ba6893..962820d90 100644 --- a/state.go +++ b/state.go @@ -986,6 +986,8 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { m.refute(state, a.Incarnation) m.logger.Printf("[WARN] memberlist: Refuting an alive 
message for '%s' (%v:%d) meta:(%v VS %v), vsn:(%v VS %v)", a.Node, net.IP(a.Addr), a.Port, a.Meta, state.Meta, a.Vsn, versions) } else { + m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) + // Update protocol versions if it arrived if len(a.Vsn) > 0 { state.PMin = a.Vsn[0] @@ -995,7 +997,6 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { state.DMax = a.Vsn[4] state.DCur = a.Vsn[5] } - m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) // Update the state and incarnation number state.Incarnation = a.Incarnation From bf53e0b94385d43ebdf86a250050f9de0b1a95bb Mon Sep 17 00:00:00 2001 From: Hans Hasselberg Date: Fri, 25 Jan 2019 16:42:26 +0100 Subject: [PATCH 4/6] Test the bare minimum for size of Vsn Co-Authored-By: pierresouchay --- state.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/state.go b/state.go index 962820d90..5254d0192 100644 --- a/state.go +++ b/state.go @@ -850,7 +850,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { return } - if len(a.Vsn) > 3 { + if len(a.Vsn) >= 3 { pMin := a.Vsn[0] pMax := a.Vsn[1] pCur := a.Vsn[2] From 64a9135421cde59c4a7e1778a9b21f43887b9ee9 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Fri, 25 Jan 2019 16:46:51 +0100 Subject: [PATCH 5/6] Fixed test TestMemberList_ProbeNode_Awareness_OldProtocol --- state_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/state_test.go b/state_test.go index 3c3414587..7923bbf43 100644 --- a/state_test.go +++ b/state_test.go @@ -835,11 +835,11 @@ func TestMemberList_ProbeNode_Awareness_OldProtocol(t *testing.T) { }) defer m3.Shutdown() - a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()} + a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1} m1.aliveNode(&a1, nil, true) - a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: 
m2.config.BuildVsnArray()} + a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1} m1.aliveNode(&a2, nil, false) - a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: m3.config.BuildVsnArray()} + a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1} m1.aliveNode(&a3, nil, false) // Node 4 never gets started. From a99b24c260688050723b6b58237c7aa644da0d03 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Thu, 31 Jan 2019 09:48:06 +0100 Subject: [PATCH 6/6] Avoid to crash when len(Vsn) is incorrect and ignore the message when there is an Alive delegate --- state.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/state.go b/state.go index 5254d0192..d77fe296a 100644 --- a/state.go +++ b/state.go @@ -865,6 +865,11 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { // Using a merge delegate is not enough, as it is possible for passive // cluster merging to still occur. if m.config.Alive != nil { + if len(a.Vsn) < 6 { + m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s' (%v:%d) because Vsn is not present", + a.Node, net.IP(a.Addr), a.Port) + return + } node := &Node{ Name: a.Node, Addr: a.Addr,