-
Notifications
You must be signed in to change notification settings - Fork 23
/
manager.go
160 lines (126 loc) · 3.83 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package node
import (
"strings"
"sync"
"github.com/Conflux-Chain/confura/util/metrics"
"github.com/Conflux-Chain/confura/util/rpc"
"github.com/buraksezer/consistent"
"github.com/cespare/xxhash"
)
// nodeFactory is the constructor signature used to create a Node instance.
// It receives the owning group, the node name, the node URL and a
// HealthMonitor (the Manager passes itself), and returns the created Node
// or an error.
type nodeFactory func(group Group, name, url string, hm HealthMonitor) (Node, error)
// Manager manages full node cluster, including:
//  1. Monitor node health and disable/enable full node automatically.
//  2. Implements Router interface to route RPC requests to different full nodes
//     in manner of consistent hashing.
//
// The methods in this file acquire mu before reading or writing nodes,
// hashRing and nodeName2Epochs.
type Manager struct {
group Group // group handed to the node factory and used to label route metrics
nodes map[string]Node // node name => Node
hashRing *consistent.Consistent // consistent hashing algorithm
resolver RepartitionResolver // support repartition for hash ring; consulted by Distribute before the ring
mu sync.RWMutex // guards the mutable cluster state above and below
nodeFactory nodeFactory // factory method to create node instance
nodeName2Epochs map[string]uint64 // node name => epoch
midEpoch uint64 // middle epoch of managed full nodes.
}
// NewManager creates a Manager for the given group and full node URLs.
// It delegates to NewManagerWithRepartition, supplying a
// noopRepartitionResolver as the repartition resolver.
func NewManager(group Group, nf nodeFactory, urls []string) *Manager {
	noop := &noopRepartitionResolver{}
	return NewManagerWithRepartition(group, nf, urls, noop)
}
// NewManagerWithRepartition creates a Manager for the given group that uses
// resolver for repartitioning and nf to create one Node per distinct node
// name derived from urls. URLs that map to an already created node name are
// ignored.
//
// A URL whose node cannot be created (the factory returns an error or a nil
// Node) is skipped instead of being registered, so neither the node map nor
// the hash ring ever holds an unusable entry. The constructor has no error
// return, so such failures are not reported from here.
func NewManagerWithRepartition(group Group, nf nodeFactory, urls []string, resolver RepartitionResolver) *Manager {
	manager := &Manager{
		group:           group,
		nodeFactory:     nf,
		nodes:           make(map[string]Node),
		resolver:        resolver,
		nodeName2Epochs: make(map[string]uint64),
	}

	// Deliberately left nil (not an empty slice) when no node is created, so
	// consistent.New receives the same value the zero-node case always passed.
	var members []consistent.Member

	for _, url := range urls {
		nodeName := rpc.Url2NodeName(url)
		if _, ok := manager.nodes[nodeName]; ok {
			continue // duplicate URL for an already managed node
		}

		node, err := nf(group, nodeName, url, manager)
		if err != nil || node == nil {
			// Never register a node that failed to initialize.
			continue
		}

		manager.nodes[nodeName] = node
		members = append(members, node)
	}

	manager.hashRing = consistent.New(members, cfg.HashRingRaw())

	return manager
}
// Add adds the full node identified by url to the managed cluster and to the
// hash ring. It is a no-op when a node with the same name is already managed.
//
// If the node factory fails (returns an error or a nil Node), the cluster is
// left unchanged rather than registering an unusable node. Add has no error
// return, so the failure is not reported from here.
func (m *Manager) Add(url string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	nodeName := rpc.Url2NodeName(url)
	if _, ok := m.nodes[nodeName]; ok {
		return
	}

	node, err := m.nodeFactory(m.group, nodeName, url, m)
	if err != nil || node == nil {
		// Never register a node that failed to initialize.
		return
	}

	m.nodes[nodeName] = node
	m.hashRing.Add(node)
}
// Remove stops managing the full node identified by url: the node is closed
// and dropped from the node map, the epoch map and the hash ring. It does
// nothing when no node with that name is managed.
func (m *Manager) Remove(url string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	name := rpc.Url2NodeName(url)

	target, managed := m.nodes[name]
	if !managed {
		return
	}

	target.Close()

	delete(m.nodes, name)
	delete(m.nodeName2Epochs, name)
	m.hashRing.Remove(name)
}
// Get returns the managed full node whose name is derived from url, or nil
// when no such node is managed.
func (m *Manager) Get(url string) Node {
	m.mu.RLock()
	defer m.mu.RUnlock()

	found := m.nodes[rpc.Url2NodeName(url)]
	return found
}
// List returns all managed full nodes in unspecified (map iteration) order.
// The result is nil when no node is managed.
func (m *Manager) List() []Node {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var all []Node
	for name := range m.nodes {
		all = append(all, m.nodes[name])
	}

	return all
}
// String implements the stringer interface. It returns the names of all
// managed nodes joined by ", ", in unspecified (map iteration) order.
func (m *Manager) String() string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var sb strings.Builder
	written := 0
	for name := range m.nodes {
		if written > 0 {
			sb.WriteString(", ")
		}
		sb.WriteString(name)
		written++
	}

	return sb.String()
}
// Distribute picks the full node responsible for key.
//
// The repartition resolver is consulted first, using the xxhash sum of key.
// If it names a node that is still managed, that node is returned. If it
// names a node that is no longer managed (Remove does not update the
// resolver), the stale entry is ignored and the key is located on the hash
// ring instead, so routing does not fail while other nodes are available.
// Whenever the hash ring is used, the chosen node name is stored back into
// the resolver.
//
// It returns nil when the hash ring has no member for key.
func (m *Manager) Distribute(key []byte) Node {
	k := xxhash.Sum64(key)

	m.mu.RLock()
	defer m.mu.RUnlock()

	// Use repartition resolver to distribute if configured.
	if name, ok := m.resolver.Get(k); ok {
		if node, managed := m.nodes[name]; managed {
			return node
		}
		// Stale mapping to a node that is no longer managed: fall through and
		// relocate the key on the hash ring.
	}

	member := m.hashRing.LocateKey(key)
	if member == nil { // in case of empty consistent member
		return nil
	}

	node := member.(Node)
	m.resolver.Put(k, node.Name())

	return node
}
// Route implements the Router interface. It distributes key to a full node
// and returns that node's URL, marking both the group-wide ("overall") and
// the per-node route meters. It returns an empty string when no node is
// available for key; no metric is marked in that case.
func (m *Manager) Route(key []byte) string {
	target := m.Distribute(key)
	if target == nil {
		return ""
	}

	// Overall route QPS for the group.
	metrics.Registry.Nodes.Routes(m.group.Space(), m.group.String(), "overall").Mark(1)

	// Route QPS of the selected node.
	metrics.Registry.Nodes.Routes(m.group.Space(), m.group.String(), target.Name()).Mark(1)

	return target.Url()
}