Skip to content

Commit

Permalink
introduce new state: stateLeft (#213)
Browse files Browse the repository at this point in the history
This PR introduces another state: stateLeft. This is an attempt to help fix hashicorp/consul#6897.

When memberlist processes a dead message, it will now check where it is coming from and if the reporting node is equal to the reported node, we know this node wants to leave. In this case we don't have to prevent new nodes from using the same ip.

If a node reports another node, the behavior stays the same.
  • Loading branch information
hanshasselberg committed Jan 31, 2020
1 parent ce23509 commit 237d410
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 11 deletions.
11 changes: 8 additions & 3 deletions memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func (m *Memberlist) Members() []*Node {

nodes := make([]*Node, 0, len(m.nodes))
for _, n := range m.nodes {
if n.State != stateDead {
if !n.DeadOrLeft() {
nodes = append(nodes, &n.Node)
}
}
Expand All @@ -542,7 +542,7 @@ func (m *Memberlist) NumMembers() (alive int) {
defer m.nodeLock.RUnlock()

for _, n := range m.nodes {
if n.State != stateDead {
if !n.DeadOrLeft() {
alive++
}
}
Expand Down Expand Up @@ -579,9 +579,14 @@ func (m *Memberlist) Leave(timeout time.Duration) error {
return nil
}

// This dead message is special, because Node and From are the
// same. This helps other nodes figure out that a node left
// intentionally. When Node equals From, other nodes know for
// sure this node is gone.
d := dead{
Incarnation: state.Incarnation,
Node: state.Name,
From: state.Name,
}
m.deadNode(&d)

Expand All @@ -607,7 +612,7 @@ func (m *Memberlist) anyAlive() bool {
m.nodeLock.RLock()
defer m.nodeLock.RUnlock()
for _, n := range m.nodes {
if n.State != stateDead && n.Name != m.config.Name {
if !n.DeadOrLeft() && n.Name != m.config.Name {
return true
}
}
Expand Down
4 changes: 4 additions & 0 deletions memberlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,10 @@ func TestMemberlist_Leave(t *testing.T) {
if len(m2.Members()) != 1 {
t.Fatalf("should have 1 node")
}

if m2.nodeMap[c1.Name].State != stateLeft {
t.Fatalf("bad state")
}
}

func TestMemberlist_JoinShutdown(t *testing.T) {
Expand Down
29 changes: 22 additions & 7 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
stateAlive nodeStateType = iota
stateSuspect
stateDead
stateLeft
)

// Node represents a node in the cluster.
Expand Down Expand Up @@ -60,6 +61,10 @@ func (n *nodeState) Address() string {
return n.Node.Address()
}

func (n *nodeState) DeadOrLeft() bool {
return n.State == stateDead || n.State == stateLeft
}

// ackHandler is used to register handlers for incoming acks and nacks.
type ackHandler struct {
ackFn func([]byte, time.Time)
Expand Down Expand Up @@ -218,7 +223,7 @@ START:
node = *m.nodes[m.probeIndex]
if node.Name == m.config.Name {
skip = true
} else if node.State == stateDead {
} else if node.DeadOrLeft() {
skip = true
}

Expand Down Expand Up @@ -963,8 +968,8 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
time.Since(state.StateChange) > m.config.DeadNodeReclaimTime)

// Allow the address to be updated if a dead node is being replaced.
if state.State == stateDead && canReclaim {
m.logger.Printf("[INFO] memberlist: Updating address for failed node %s from %v:%d to %v:%d",
if state.State == stateLeft || (state.State == stateDead && canReclaim) {
m.logger.Printf("[INFO] memberlist: Updating address for left or failed node %s from %v:%d to %v:%d",
state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
updatesNode = true
} else {
Expand Down Expand Up @@ -1059,8 +1064,8 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {

// Notify the delegate of any relevant updates
if m.config.Events != nil {
if oldState == stateDead {
// if Dead -> Alive, notify of join
if oldState == stateDead || oldState == stateLeft {
// if Dead/Left -> Alive, notify of join
m.config.Events.NotifyJoin(&state.Node)

} else if !bytes.Equal(oldMeta, state.Meta) {
Expand Down Expand Up @@ -1179,7 +1184,7 @@ func (m *Memberlist) deadNode(d *dead) {
delete(m.nodeTimers, d.Node)

// Ignore if node is already dead
if state.State == stateDead {
if state.DeadOrLeft() {
return
}

Expand All @@ -1203,7 +1208,14 @@ func (m *Memberlist) deadNode(d *dead) {

// Update the state
state.Incarnation = d.Incarnation
state.State = stateDead

// If the dead message was send by the node itself, mark it is left
// instead of dead.
if d.Node == d.From {
state.State = stateLeft
} else {
state.State = stateDead
}
state.StateChange = time.Now()

// Notify of death
Expand All @@ -1228,6 +1240,9 @@ func (m *Memberlist) mergeState(remote []pushNodeState) {
}
m.aliveNode(&a, nil, false)

case stateLeft:
d := dead{Incarnation: r.Incarnation, Node: r.Name, From: r.Name}
m.deadNode(&d)
case stateDead:
// If the remote node believes a node is dead, we prefer to
// suspect that node instead of declaring it dead instantly
Expand Down
76 changes: 75 additions & 1 deletion state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,80 @@ func TestMemberList_DeadNode_NoNode(t *testing.T) {
}
}

func TestMemberList_DeadNodeLeft(t *testing.T) {
ch := make(chan NodeEvent, 1)

m := GetMemberlist(t, func(c *Config) {
c.Events = &ChannelEventDelegate{ch}
})
defer m.Shutdown()

nodeName := "node1"
s1 := alive{
Node: nodeName,
Addr: []byte{127, 0, 0, 1},
Port: 8000,
Incarnation: 1,
Vsn: m.config.BuildVsnArray(),
}
m.aliveNode(&s1, nil, false)

// Read the join event
<-ch

d := dead{Node: nodeName, From: nodeName, Incarnation: 1}
m.deadNode(&d)

// Read the dead event
<-ch

state := m.nodeMap[nodeName]
if state.State != stateLeft {
t.Fatalf("Bad state")
}

// Check a broad cast is queued
if m.broadcasts.NumQueued() != 1 {
t.Fatalf("expected only one queued message")
}

// Check its a dead message
if messageType(m.broadcasts.orderedView(true)[0].b.Message()[0]) != deadMsg {
t.Fatalf("expected queued dead msg")
}

// Clear queue
// m.broadcasts.Reset()

// New alive node
s2 := alive{
Node: nodeName,
Addr: []byte{127, 0, 0, 2},
Port: 9000,
Incarnation: 3,
Meta: []byte("foo"),
Vsn: m.config.BuildVsnArray(),
}
m.aliveNode(&s2, nil, false)

// Read the join event
<-ch

state = m.nodeMap[nodeName]
if state.State != stateAlive {
t.Fatalf("should still be alive")
}
if !bytes.Equal(state.Meta, []byte("foo")) {
t.Fatalf("meta should be updated")
}
if !bytes.Equal(state.Addr, []byte{127, 0, 0, 2}) {
t.Fatalf("address should be updated")
}
if state.Port != 9000 {
t.Fatalf("port should be updated")
}
}

func TestMemberList_DeadNode(t *testing.T) {
ch := make(chan NodeEvent, 1)

Expand Down Expand Up @@ -1784,7 +1858,7 @@ func TestMemberList_DeadNode(t *testing.T) {
t.Fatalf("expected only one queued message")
}

// Check its a suspect message
// Check its a dead message
if messageType(m.broadcasts.orderedView(true)[0].b.Message()[0]) != deadMsg {
t.Fatalf("expected queued dead msg")
}
Expand Down

0 comments on commit 237d410

Please sign in to comment.