/
node-catalog.go
88 lines (76 loc) · 2.19 KB
/
node-catalog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package registry
import (
"sync"
"time"
"github.com/Bendomey/nucleo-go"
log "github.com/sirupsen/logrus"
)
// NodeCatalog is a catalog of nodes backed by a sync.Map. Entries are stored
// under the ID returned by each node's GetID method (see Add).
type NodeCatalog struct {
// nodes maps a node ID to its nucleo.Node value.
nodes sync.Map
// logger is the base log entry; Info derives a per-node entry from it when
// it creates a node for a sender that is not yet in the catalog.
logger *log.Entry
}
// CreateNodesCatalog returns a new, empty NodeCatalog that uses logger as its
// base log entry.
func CreateNodesCatalog(logger *log.Entry) *NodeCatalog {
	// The zero value of sync.Map is empty and ready for use, so only the
	// logger needs to be set explicitly.
	return &NodeCatalog{logger: logger}
}
// HeartBeat delegates a heartbeat payload to the catalog entry named by the
// payload's "sender" field.
//
// It returns true only when that node is in the catalog and reports itself
// available, in which case the payload is passed to the node's HeartBeat
// method. It returns false when the node is unknown or unavailable, and also
// when the payload carries no string "sender" field, so a malformed heartbeat
// is rejected instead of panicking on the type assertion.
func (catalog *NodeCatalog) HeartBeat(heartbeat map[string]interface{}) bool {
	sender, ok := heartbeat["sender"].(string)
	if !ok {
		// Missing or non-string sender: there is no node to delegate to.
		return false
	}
	entry, found := catalog.nodes.Load(sender)
	if !found {
		return false
	}
	node := entry.(nucleo.Node)
	if !node.IsAvailable() {
		return false
	}
	node.HeartBeat(heartbeat)
	return true
}
// list returns every node currently stored in the catalog, in whatever order
// sync.Map.Range visits them. The result is a nil slice when the catalog is
// empty.
func (catalog *NodeCatalog) list() []nucleo.Node {
	var nodes []nucleo.Node
	collect := func(_, entry interface{}) bool {
		nodes = append(nodes, entry.(nucleo.Node))
		return true // keep iterating over the remaining entries
	}
	catalog.nodes.Range(collect)
	return nodes
}
// expiredNodes returns the catalog entries for which IsExpired(timeout)
// reports true. The result is a nil slice when no entry qualifies.
func (catalog *NodeCatalog) expiredNodes(timeout time.Duration) []nucleo.Node {
	var expired []nucleo.Node
	catalog.nodes.Range(func(_, entry interface{}) bool {
		if candidate := entry.(nucleo.Node); candidate.IsExpired(timeout) {
			expired = append(expired, candidate)
		}
		// Always continue: every entry has to be checked.
		return true
	})
	return expired
}
// findNode looks up the node stored under nodeID. The boolean result reports
// whether an entry was found; when it is false the returned node is nil.
func (catalog *NodeCatalog) findNode(nodeID string) (nucleo.Node, bool) {
	entry, found := catalog.nodes.Load(nodeID)
	if !found {
		return nil, false
	}
	return entry.(nucleo.Node), true
}
// removeNode deletes the entry stored under nodeID from the catalog by
// calling Delete on the underlying sync.Map.
func (catalog *NodeCatalog) removeNode(nodeID string) {
catalog.nodes.Delete(nodeID)
}
// Add stores node in the catalog under the ID returned by node.GetID().
// It uses sync.Map.Store, so an entry already stored under that ID is
// replaced.
func (catalog *NodeCatalog) Add(node nucleo.Node) {
catalog.nodes.Store(node.GetID(), node)
}
// Info applies a node-info payload to the catalog entry named by the
// payload's "sender" field.
//
// When the sender is already in the catalog, its node is updated with the
// payload and Info returns (true, r), where r is the value returned by that
// node's Update call. Otherwise a node is created with CreateNode, updated
// with the payload, added to the catalog, and Info returns (false, false).
//
// The "sender" value is read with an unchecked type assertion, so Info panics
// if the field is missing or is not a string.
func (catalog *NodeCatalog) Info(info map[string]interface{}) (bool, bool) {
	sender := info["sender"].(string)

	if known, found := catalog.findNode(sender); found {
		return true, known.Update(sender, info)
	}

	// Unknown sender: register a new node, with a logger tagged by its ID.
	nodeLogger := catalog.logger.WithField("remote-node", sender)
	fresh := CreateNode(sender, false, nodeLogger)
	fresh.Update(sender, info)
	catalog.Add(fresh)
	return false, false
}