Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid to take into account wrong versions of protocols in Vsn. #178

Merged
21 changes: 11 additions & 10 deletions memberlist.go
Expand Up @@ -72,6 +72,15 @@ type Memberlist struct {
logger *log.Logger
}

// BuildVsnArray creates the array of Vsn
func (conf *Config) BuildVsnArray() []uint8 {
return []uint8{
ProtocolVersionMin, ProtocolVersionMax, conf.ProtocolVersion,
conf.DelegateProtocolMin, conf.DelegateProtocolMax,
conf.DelegateProtocolVersion,
}
}

// newMemberlist creates the network listeners.
// Does not schedule execution of background maintenance.
func newMemberlist(conf *Config) (*Memberlist, error) {
Expand Down Expand Up @@ -402,11 +411,7 @@ func (m *Memberlist) setAlive() error {
Addr: addr,
Port: uint16(port),
Meta: meta,
Vsn: []uint8{
ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
m.config.DelegateProtocolVersion,
},
Vsn: m.config.BuildVsnArray(),
}
m.aliveNode(&a, nil, true)
return nil
Expand Down Expand Up @@ -447,11 +452,7 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error {
Addr: state.Addr,
Port: state.Port,
Meta: meta,
Vsn: []uint8{
ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
m.config.DelegateProtocolVersion,
},
Vsn: m.config.BuildVsnArray(),
}
notifyCh := make(chan struct{})
m.aliveNode(&a, notifyCh, true)
Expand Down
4 changes: 4 additions & 0 deletions net_test.go
Expand Up @@ -569,6 +569,10 @@ func TestSendMsg_Piggyback(t *testing.T) {
Node: "rand",
Addr: []byte{127, 0, 0, 255},
Meta: nil,
Vsn: []uint8{
ProtocolVersionMin, ProtocolVersionMax, ProtocolVersionMin,
1, 1, 1,
},
}
m.encodeAndBroadcast("rand", aliveMsg, &a)

Expand Down
26 changes: 24 additions & 2 deletions state.go
Expand Up @@ -850,11 +850,26 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
return
}

if len(a.Vsn) >= 3 {
pMin := a.Vsn[0]
pMax := a.Vsn[1]
pCur := a.Vsn[2]
if pMin == 0 || pMax == 0 || pMin > pMax {
m.logger.Printf("[WARN] memberlist: Ignoring an alive message for '%s' (%v:%d) because protocol version(s) are wrong: %d <= %d <= %d should be >0", a.Node, net.IP(a.Addr), a.Port, pMin, pCur, pMax)
return
}
}

// Invoke the Alive delegate if any. This can be used to filter out
// alive messages based on custom logic. For example, using a cluster name.
// Using a merge delegate is not enough, as it is possible for passive
// cluster merging to still occur.
if m.config.Alive != nil {
if len(a.Vsn) < 6 {
m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s' (%v:%d) because Vsn is not present",
a.Node, net.IP(a.Addr), a.Port)
return
}
node := &Node{
Name: a.Node,
Addr: a.Addr,
Expand Down Expand Up @@ -886,6 +901,14 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
},
State: stateDead,
}
if len(a.Vsn) > 5 {
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
state.PMin = a.Vsn[0]
state.PMax = a.Vsn[1]
state.PCur = a.Vsn[2]
state.DMin = a.Vsn[3]
state.DMax = a.Vsn[4]
state.DCur = a.Vsn[5]
}

// Add to map
m.nodeMap[a.Node] = state
Expand Down Expand Up @@ -965,9 +988,8 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
bytes.Equal(a.Vsn, versions) {
return
}

m.refute(state, a.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting an alive message")
m.logger.Printf("[WARN] memberlist: Refuting an alive message for '%s' (%v:%d) meta:(%v VS %v), vsn:(%v VS %v)", a.Node, net.IP(a.Addr), a.Port, a.Meta, state.Meta, a.Vsn, versions)
} else {
m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)

Expand Down