Cleanup node management logic #2036

Merged (1 commit) on Dec 13, 2017
diagnose/server.go: 3 changes (1 addition, 2 deletions)
@@ -91,7 +91,7 @@ func (s *Server) EnableDebug(ip string, port int) {
}

logrus.Infof("Starting the diagnose server listening on %d for commands", port)
srv := &http.Server{Addr: fmt.Sprintf("127.0.0.1:%d", port), Handler: s}
srv := &http.Server{Addr: fmt.Sprintf("%s:%d", ip, port), Handler: s}
s.srv = srv
s.enable = 1
go func(n *Server) {
@@ -101,7 +101,6 @@ func (s *Server) EnableDebug(ip string, port int) {
atomic.SwapInt32(&n.enable, 0)
}
}(s)

}

// DisableDebug stops the debug server and closes the tcp socket
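The only functional change here is that the listen address is now built from the caller-supplied ip instead of being hardcoded to 127.0.0.1. A small standalone sketch of the same pattern follows; the handler, the /help path and port 2000 are placeholders, not the actual diagnose server API.

package main

import (
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/sirupsen/logrus"
)

// startDebugServer illustrates the pattern used by EnableDebug after this
// change: the bind address comes from the caller instead of being fixed to
// the loopback interface.
func startDebugServer(ip string, port int, handler http.Handler) *http.Server {
	srv := &http.Server{Addr: fmt.Sprintf("%s:%d", ip, port), Handler: handler}
	go func() {
		// ListenAndServe blocks until the server fails or is shut down.
		if err := srv.ListenAndServe(); err != nil {
			logrus.WithError(err).Error("debug server exited")
		}
	}()
	return srv
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/help", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "diagnostic commands would be listed here")
	})

	startDebugServer("127.0.0.1", 2000, mux)

	// Give the listener a moment to come up, then query it on the address we chose.
	time.Sleep(100 * time.Millisecond)
	resp, err := http.Get("http://127.0.0.1:2000/help")
	if err != nil {
		logrus.Fatal(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Print(string(body))
}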
networkdb/delegate.go: 77 changes (23 additions, 54 deletions)
@@ -16,92 +16,61 @@ func (d *delegate) NodeMeta(limit int) []byte {
return []byte{}
}

// getNode searches for the node in the tables
// returns whether the node was respectively in the active list, explicit node leave list or failed list
func (nDB *NetworkDB) getNode(nEvent *NodeEvent, extract bool) (bool, bool, bool, *node) {
var active bool
var left bool
var failed bool

for _, nodes := range []map[string]*node{
nDB.failedNodes,
nDB.leftNodes,
nDB.nodes,
} {
if n, ok := nodes[nEvent.NodeName]; ok {
active = &nodes == &nDB.nodes
left = &nodes == &nDB.leftNodes
failed = &nodes == &nDB.failedNodes
if n.ltime >= nEvent.LTime {
return active, left, failed, nil
}
if extract {
delete(nodes, n.Name)
}
return active, left, failed, n
}
}
return active, left, failed, nil
}

func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
// Update our local clock if the received message has a newer
// time.
nDB.networkClock.Witness(nEvent.LTime)

nDB.RLock()
active, left, _, n := nDB.getNode(nEvent, false)
defer nDB.RUnlock()

// check if the node exists
n, _, _ := nDB.findNode(nEvent.NodeName)
if n == nil {
nDB.RUnlock()
return false
}
nDB.RUnlock()

// check if the event is fresh
if n.ltime >= nEvent.LTime {
return false
}

// If we get here, the event is fresher and the node is known. Update the Lamport time
n.ltime = nEvent.LTime

// If it is a node leave event for a manager and this is the only manager we
// know of, we want the reconnect logic to kick in. In a single-manager
// cluster the manager's gossip can't be bootstrapped unless some other node
// connects to it.
if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
for _, ip := range nDB.bootStrapIP {
if ip.Equal(n.Addr) {
n.ltime = nEvent.LTime
return true
}
}
}

n.ltime = nEvent.LTime

switch nEvent.Type {
case NodeEventTypeJoin:
if active {
// the node is already marked as active nothing to do
moved, err := nDB.changeNodeState(n.Name, nodeActiveState)
if err != nil {
logrus.WithError(err).Error("unable to find the node to move")
return false
}
nDB.Lock()
// Because the lock was released after the previous check we have to take it again and re-verify the status of the node
// All of this is to avoid holding a big lock for the whole function
if active, _, _, n = nDB.getNode(nEvent, true); !active && n != nil {
n.reapTime = 0
nDB.nodes[n.Name] = n
if moved {
logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
}
nDB.Unlock()
return true
return moved
case NodeEventTypeLeave:
if left {
// the node is already marked as left nothing to do.
moved, err := nDB.changeNodeState(n.Name, nodeLeftState)
if err != nil {
logrus.WithError(err).Error("unable to find the node to move")
return false
}
nDB.Lock()
// Because the lock was released after the previous check we have to take it again and re-verify the status of the node
// All of this is to avoid holding a big lock for the whole function
if _, left, _, n = nDB.getNode(nEvent, true); !left && n != nil {
n.reapTime = nodeReapInterval
nDB.leftNodes[n.Name] = n
if moved {
logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
}
nDB.Unlock()
return true
return moved
}

return false
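The rewritten handleNodeEvent above leans on two helpers, findNode and changeNodeState, whose definitions are not part of this excerpt (they live in a separate node-management file added by this PR). The sketch below is inferred from the call sites here and from the tests further down; the concrete state values, the function bodies and the locking strategy are assumptions, so locking is elided entirely.

package networkdb

import "fmt"

// nodeState tells which of the three node lists an entry lives in; only the
// names appear in this diff, the concrete values here are an assumption.
// NetworkDB, node and nodeReapInterval come from the existing package.
type nodeState int

const (
	nodeNotFound    nodeState = -1
	nodeActiveState nodeState = 0
	nodeLeftState   nodeState = 1
	nodeFailedState nodeState = 2
)

// findNode looks nodeName up in the active, left and failed lists. It returns
// the node, the state it was found in, and the map holding it so that the
// caller can manipulate that map directly (the tests below rely on this).
func (nDB *NetworkDB) findNode(nodeName string) (*node, nodeState, map[string]*node) {
	// The loop order matches the state constants above: index 0 is active,
	// 1 is left, 2 is failed.
	for i, nodes := range []map[string]*node{
		nDB.nodes,
		nDB.leftNodes,
		nDB.failedNodes,
	} {
		if n, ok := nodes[nodeName]; ok {
			return n, nodeState(i), nodes
		}
	}
	return nil, nodeNotFound, nil
}

// changeNodeState moves a node between the three lists. It returns true if the
// node was actually moved, false if it was already in the requested state, and
// an error if the node is unknown. NOTE: locking is elided in this sketch; the
// real code has to synchronize access to the node maps.
func (nDB *NetworkDB) changeNodeState(nodeName string, newState nodeState) (bool, error) {
	n, currState, m := nDB.findNode(nodeName)
	if n == nil {
		return false, fmt.Errorf("node %s not found", nodeName)
	}
	if currState == newState {
		// already where it should be, nothing to do
		return false, nil
	}

	delete(m, nodeName)
	switch newState {
	case nodeActiveState:
		// back among the active nodes: cancel any pending reap
		n.reapTime = 0
		nDB.nodes[nodeName] = n
	case nodeLeftState:
		n.reapTime = nodeReapInterval
		nDB.leftNodes[nodeName] = n
	case nodeFailedState:
		n.reapTime = nodeReapInterval
		nDB.failedNodes[nodeName] = n
	}
	return true, nil
}

With helpers shaped like these, the join and leave branches collapse to a single changeNodeState call plus a log line, which is exactly what the new handleNodeEvent above does.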
networkdb/event_delegate.go: 61 changes (17 additions, 44 deletions)
@@ -21,24 +21,6 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
}
}

func (e *eventDelegate) purgeReincarnation(mn *memberlist.Node) {
for name, node := range e.nDB.failedNodes {
if node.Addr.Equal(mn.Addr) {
logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", mn.Name, mn.Addr, name, node.Addr)
delete(e.nDB.failedNodes, name)
return
}
}

for name, node := range e.nDB.leftNodes {
if node.Addr.Equal(mn.Addr) {
logrus.Infof("Node %s/%s, is the new incarnation of the shutdown node %s/%s", mn.Name, mn.Addr, name, node.Addr)
delete(e.nDB.leftNodes, name)
return
}
}
}

func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
e.broadcastNodeEvent(mn.Addr, opCreate)
@@ -57,44 +39,35 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
// Every node has a unique ID
// Check, based on the IP address, whether the new node that joined is actually a new incarnation of a previously
// failed or shutdown one
e.purgeReincarnation(mn)
e.nDB.purgeReincarnation(mn)

e.nDB.nodes[mn.Name] = &node{Node: *mn}
logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
}

func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
var failed bool
logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
e.broadcastNodeEvent(mn.Addr, opDelete)
// The node left or failed, delete all the entries created by it.
// If the node was temporarily down, deleting the entries guarantees that the CREATE events will be accepted
// If the node instead left because it was going down, then it makes sense to just delete all its state

e.nDB.Lock()
defer e.nDB.Unlock()
e.nDB.deleteNodeFromNetworks(mn.Name)
e.nDB.deleteNodeTableEntries(mn.Name)
if n, ok := e.nDB.nodes[mn.Name]; ok {
delete(e.nDB.nodes, mn.Name)

// Check if a new incarnation of the same node already joined
// In that case this node can simply be removed and no further action is needed
for name, node := range e.nDB.nodes {
if node.Addr.Equal(mn.Addr) {
logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", name, node.Addr, mn.Name, mn.Addr)
return
}
}

// In case of node failure, keep retrying to reconnect every retryInterval (1sec) for nodeReapInterval (24h)
// Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map
n.reapTime = nodeReapInterval
e.nDB.failedNodes[mn.Name] = n
failed = true
n, currState, _ := e.nDB.findNode(mn.Name)
if n == nil {
logrus.Errorf("Node %s/%s not found in the node lists", mn.Name, mn.Addr)
return
}

if failed {
logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
// if the node was active it means it did not send the leave cluster message, so it probably
// failed. Otherwise it would already be in the left list and nothing else has to be done
if currState == nodeActiveState {
moved, err := e.nDB.changeNodeState(mn.Name, nodeFailedState)
if err != nil {
logrus.WithError(err).Errorf("impossible condition, node %s/%s not present in the list", mn.Name, mn.Addr)
return
}
if moved {
logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
}
}
}

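NotifyJoin and the test file below also call nDB.purgeReincarnation, which likewise is defined outside this excerpt. Judging from the delegate-level version removed above and from the final counts asserted in TestNodeReincarnation, a plausible sketch is the following; parking the stale entry in the left list via changeNodeState is an inference from those counts, and locking is again left to the caller.

package networkdb

import (
	"github.com/hashicorp/memberlist"
	"github.com/sirupsen/logrus"
)

// purgeReincarnation checks whether a joining member reuses the IP address of
// a node already tracked under a different name. If so, the stale entry is
// moved to the left list (so it will eventually be reaped) and true is
// returned; otherwise the lists are untouched and false is returned.
func (nDB *NetworkDB) purgeReincarnation(mn *memberlist.Node) bool {
	// The active-list check and the name guard are inferred from the test,
	// which expects an active node with the same address to be purged too.
	for name, old := range nDB.nodes {
		if old.Addr.Equal(mn.Addr) && name != mn.Name {
			logrus.Infof("Node %s/%s, is the new incarnation of the active node %s/%s", mn.Name, mn.Addr, name, old.Addr)
			nDB.changeNodeState(name, nodeLeftState)
			return true
		}
	}

	for name, old := range nDB.failedNodes {
		if old.Addr.Equal(mn.Addr) {
			logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", mn.Name, mn.Addr, name, old.Addr)
			nDB.changeNodeState(name, nodeLeftState)
			return true
		}
	}

	for name, old := range nDB.leftNodes {
		if old.Addr.Equal(mn.Addr) {
			logrus.Infof("Node %s/%s, is the new incarnation of the shutdown node %s/%s", mn.Name, mn.Addr, name, old.Addr)
			nDB.changeNodeState(name, nodeLeftState)
			return true
		}
	}

	return false
}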
networkdb/networkdb_test.go: 155 changes (155 additions, 0 deletions)
@@ -5,13 +5,15 @@ import (
"fmt"
"io/ioutil"
"log"
"net"
"os"
"sync/atomic"
"testing"
"time"

"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events"
"github.com/hashicorp/memberlist"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -580,3 +582,156 @@ func TestNetworkDBGarbageCollection(t *testing.T) {

closeNetworkDBInstances(dbs)
}

func TestFindNode(t *testing.T) {
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())

dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}

// active nodes is 2 because the testing node is in the list
assert.Equal(t, 2, len(dbs[0].nodes))
assert.Equal(t, 1, len(dbs[0].failedNodes))
assert.Equal(t, 1, len(dbs[0].leftNodes))

n, currState, m := dbs[0].findNode("active")
assert.NotNil(t, n)
assert.Equal(t, "active", n.Name)
assert.Equal(t, nodeActiveState, currState)
assert.NotNil(t, m)
// delete the entry manually
delete(m, "active")

// test if it can still be found
n, currState, m = dbs[0].findNode("active")
assert.Nil(t, n)
assert.Equal(t, nodeNotFound, currState)
assert.Nil(t, m)

n, currState, m = dbs[0].findNode("failed")
assert.NotNil(t, n)
assert.Equal(t, "failed", n.Name)
assert.Equal(t, nodeFailedState, currState)
assert.NotNil(t, m)

// find and remove
n, currState, m = dbs[0].findNode("left")
assert.NotNil(t, n)
assert.Equal(t, "left", n.Name)
assert.Equal(t, nodeLeftState, currState)
assert.NotNil(t, m)
delete(m, "left")

n, currState, m = dbs[0].findNode("left")
assert.Nil(t, n)
assert.Equal(t, nodeNotFound, currState)
assert.Nil(t, m)

closeNetworkDBInstances(dbs)
}

func TestChangeNodeState(t *testing.T) {
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())

dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}

// active nodes is 4 because the testing node is in the list
assert.Equal(t, 4, len(dbs[0].nodes))

n, currState, m := dbs[0].findNode("node1")
assert.NotNil(t, n)
assert.Equal(t, nodeActiveState, currState)
assert.Equal(t, "node1", n.Name)
assert.NotNil(t, m)

// node1 to failed
dbs[0].changeNodeState("node1", nodeFailedState)

n, currState, m = dbs[0].findNode("node1")
assert.NotNil(t, n)
assert.Equal(t, nodeFailedState, currState)
assert.Equal(t, "node1", n.Name)
assert.NotNil(t, m)
assert.NotEqual(t, time.Duration(0), n.reapTime)

// node1 back to active
dbs[0].changeNodeState("node1", nodeActiveState)

n, currState, m = dbs[0].findNode("node1")
assert.NotNil(t, n)
assert.Equal(t, nodeActiveState, currState)
assert.Equal(t, "node1", n.Name)
assert.NotNil(t, m)
assert.Equal(t, time.Duration(0), n.reapTime)

// node1 to left
dbs[0].changeNodeState("node1", nodeLeftState)
dbs[0].changeNodeState("node2", nodeLeftState)
dbs[0].changeNodeState("node3", nodeLeftState)

n, currState, m = dbs[0].findNode("node1")
assert.NotNil(t, n)
assert.Equal(t, nodeLeftState, currState)
assert.Equal(t, "node1", n.Name)
assert.NotNil(t, m)
assert.NotEqual(t, time.Duration(0), n.reapTime)

n, currState, m = dbs[0].findNode("node2")
assert.NotNil(t, n)
assert.Equal(t, nodeLeftState, currState)
assert.Equal(t, "node2", n.Name)
assert.NotNil(t, m)
assert.NotEqual(t, time.Duration(0), n.reapTime)

n, currState, m = dbs[0].findNode("node3")
assert.NotNil(t, n)
assert.Equal(t, nodeLeftState, currState)
assert.Equal(t, "node3", n.Name)
assert.NotNil(t, m)
assert.NotEqual(t, time.Duration(0), n.reapTime)

// active nodes is 1 because the testing node is in the list
assert.Equal(t, 1, len(dbs[0].nodes))
assert.Equal(t, 0, len(dbs[0].failedNodes))
assert.Equal(t, 3, len(dbs[0].leftNodes))

closeNetworkDBInstances(dbs)
}

func TestNodeReincarnation(t *testing.T) {
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())

dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}

// active nodes is 2 because the testing node is in the list
assert.Equal(t, 2, len(dbs[0].nodes))
assert.Equal(t, 1, len(dbs[0].failedNodes))
assert.Equal(t, 1, len(dbs[0].leftNodes))

b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
assert.True(t, b)
dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}

b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
assert.True(t, b)
dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}

b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
assert.True(t, b)
dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}

b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
assert.False(t, b)

// active nodes is 4: the testing node plus node4, node5 and node6
assert.Equal(t, 4, len(dbs[0].nodes))
assert.Equal(t, 0, len(dbs[0].failedNodes))
assert.Equal(t, 3, len(dbs[0].leftNodes))

closeNetworkDBInstances(dbs)
}