/
node.go
367 lines (315 loc) · 9.44 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
package redis
import (
"errors"
"fmt"
"net"
"sort"
"strings"
"time"
"github.com/amadeusitgroup/redis-operator/pkg/api/redis/v1"
"github.com/amadeusitgroup/redis-operator/pkg/utils"
kapiv1 "k8s.io/api/core/v1"
)
const (
// DefaultRedisPort define the default Redis Port
DefaultRedisPort = "6379"
// RedisMasterRole redis role master
redisMasterRole = "master"
// RedisSlaveRole redis role slave
redisSlaveRole = "slave"
)
const (
// RedisLinkStateConnected redis connection status connected
RedisLinkStateConnected = "connected"
// RedisLinkStateDisconnected redis connection status disconnected
RedisLinkStateDisconnected = "disconnected"
)
const (
// NodeStatusPFail Node is in PFAIL state. Not reachable for the node you are contacting, but still logically reachable
NodeStatusPFail = "fail?"
// NodeStatusFail Node is in FAIL state. It was not reachable for multiple nodes that promoted the PFAIL state to FAIL
NodeStatusFail = "fail"
// NodeStatusHandshake Untrusted node, we are handshaking.
NodeStatusHandshake = "handshake"
// NodeStatusNoAddr No address known for this node
NodeStatusNoAddr = "noaddr"
// NodeStatusNoFlags no flags at all
NodeStatusNoFlags = "noflags"
)
// Node Represent a Redis Node
type Node struct {
ID string
IP string
Port string
Role string
LinkState string
MasterReferent string
FailStatus []string
PingSent int64
PongRecv int64
ConfigEpoch int64
Slots []Slot
MigratingSlots map[Slot]string
ImportingSlots map[Slot]string
ServerStartTime time.Time
Pod *kapiv1.Pod
}
// Nodes represent a Node slice
type Nodes []*Node
func (n Nodes) String() string {
stringer := []utils.Stringer{}
for _, node := range n {
stringer = append(stringer, node)
}
return utils.SliceJoin(stringer, ",")
}
// NewDefaultNode builds and returns new defaultNode instance
func NewDefaultNode() *Node {
return &Node{
Port: DefaultRedisPort,
Slots: []Slot{},
MigratingSlots: map[Slot]string{},
ImportingSlots: map[Slot]string{},
}
}
// NewNode builds and returns new Node instance
func NewNode(id, ip string, pod *kapiv1.Pod) *Node {
node := NewDefaultNode()
node.ID = id
node.IP = ip
node.Pod = pod
return node
}
// SetRole from a flags string list set the Node's role
func (n *Node) SetRole(flags string) error {
n.Role = "" // reset value before setting the new one
vals := strings.Split(flags, ",")
for _, val := range vals {
switch val {
case redisMasterRole:
n.Role = redisMasterRole
case redisSlaveRole:
n.Role = redisSlaveRole
}
}
if n.Role == "" {
return errors.New("Node SetRole failed")
}
return nil
}
// GetRole return the Redis Cluster Node GetRole
func (n *Node) GetRole() v1.RedisClusterNodeRole {
switch n.Role {
case redisMasterRole:
return v1.RedisClusterNodeRoleMaster
case redisSlaveRole:
return v1.RedisClusterNodeRoleSlave
default:
if n.MasterReferent != "" {
return v1.RedisClusterNodeRoleSlave
}
if len(n.Slots) > 0 {
return v1.RedisClusterNodeRoleMaster
}
}
return v1.RedisClusterNodeRoleNone
}
// String string representation of a Instance
func (n *Node) String() string {
if n.ServerStartTime.IsZero() {
return fmt.Sprintf("{Redis ID: %s, role: %s, master: %s, link: %s, status: %s, addr: %s, slots: %s, len(migratingSlots): %d, len(importingSlots): %d}", n.ID, n.GetRole(), n.MasterReferent, n.LinkState, n.FailStatus, n.IPPort(), SlotSlice(n.Slots), len(n.MigratingSlots), len(n.ImportingSlots))
}
return fmt.Sprintf("{Redis ID: %s, role: %s, master: %s, link: %s, status: %s, addr: %s, slots: %s, len(migratingSlots): %d, len(importingSlots): %d, ServerStartTime: %s}", n.ID, n.GetRole(), n.MasterReferent, n.LinkState, n.FailStatus, n.IPPort(), SlotSlice(n.Slots), len(n.MigratingSlots), len(n.ImportingSlots), n.ServerStartTime.Format("2006-01-02 15:04:05"))
}
// IPPort returns join Ip Port string
func (n *Node) IPPort() string {
return net.JoinHostPort(n.IP, n.Port)
}
// GetNodesByFunc returns first node found by the FindNodeFunc
func (n Nodes) GetNodesByFunc(f FindNodeFunc) (Nodes, error) {
nodes := Nodes{}
for _, node := range n {
if f(node) {
nodes = append(nodes, node)
}
}
if len(nodes) == 0 {
return nodes, nodeNotFoundedError
}
return nodes, nil
}
// ToAPINode used to convert the current Node to an API v1.RedisClusterNode
func (n *Node) ToAPINode() v1.RedisClusterNode {
apiNode := v1.RedisClusterNode{
ID: n.ID,
IP: n.IP,
PodName: n.Pod.Name,
Role: n.GetRole(),
Slots: []string{},
}
return apiNode
}
// Clear used to clear possible ressources attach to the current Node
func (n *Node) Clear() {
}
// SetLinkStatus set the Node link status
func (n *Node) SetLinkStatus(status string) error {
n.LinkState = "" // reset value before setting the new one
switch status {
case RedisLinkStateConnected:
n.LinkState = RedisLinkStateConnected
case RedisLinkStateDisconnected:
n.LinkState = RedisLinkStateDisconnected
}
if n.LinkState == "" {
return errors.New("Node SetLinkStatus failed")
}
return nil
}
// SetFailureStatus set from inputs flags the possible failure status
func (n *Node) SetFailureStatus(flags string) {
n.FailStatus = []string{} // reset value before setting the new one
vals := strings.Split(flags, ",")
for _, val := range vals {
switch val {
case NodeStatusFail:
n.FailStatus = append(n.FailStatus, NodeStatusFail)
case NodeStatusPFail:
n.FailStatus = append(n.FailStatus, NodeStatusPFail)
case NodeStatusHandshake:
n.FailStatus = append(n.FailStatus, NodeStatusHandshake)
case NodeStatusNoAddr:
n.FailStatus = append(n.FailStatus, NodeStatusNoAddr)
case NodeStatusNoFlags:
n.FailStatus = append(n.FailStatus, NodeStatusNoFlags)
}
}
}
// SetReferentMaster set the redis node parent referent
func (n *Node) SetReferentMaster(ref string) {
n.MasterReferent = ""
if ref == "-" {
return
}
n.MasterReferent = ref
}
// TotalSlots return the total number of slot
func (n *Node) TotalSlots() int {
return len(n.Slots)
}
// HasStatus returns true if the node has the provided fail status flag
func (n *Node) HasStatus(flag string) bool {
for _, status := range n.FailStatus {
if status == flag {
return true
}
}
return false
}
// IsMasterWithNoSlot anonymous function for searching Master Node with no slot
var IsMasterWithNoSlot = func(n *Node) bool {
if (n.GetRole() == v1.RedisClusterNodeRoleMaster) && (n.TotalSlots() == 0) {
return true
}
return false
}
// IsMasterWithSlot anonymous function for searching Master Node withslot
var IsMasterWithSlot = func(n *Node) bool {
if (n.GetRole() == v1.RedisClusterNodeRoleMaster) && (n.TotalSlots() > 0) {
return true
}
return false
}
// IsSlave anonymous function for searching Slave Node
var IsSlave = func(n *Node) bool {
return n.GetRole() == v1.RedisClusterNodeRoleSlave
}
// SortNodes sort Nodes and return the sorted Nodes
func (n Nodes) SortNodes() Nodes {
sort.Sort(n)
return n
}
// GetNodeByID returns a Redis Node by its ID
// if not present in the Nodes slice return an error
func (n Nodes) GetNodeByID(id string) (*Node, error) {
for _, node := range n {
if node.ID == id {
return node, nil
}
}
return nil, nodeNotFoundedError
}
// CountByFunc gives the number elements of NodeSlice that return true for the passed func.
func (n Nodes) CountByFunc(fn func(*Node) bool) (result int) {
for _, v := range n {
if fn(v) {
result++
}
}
return
}
// FilterByFunc remove a node from a slice by node ID and returns the slice. If not found, fail silently. Value must be unique
func (n Nodes) FilterByFunc(fn func(*Node) bool) Nodes {
newSlice := Nodes{}
for _, node := range n {
if fn(node) {
newSlice = append(newSlice, node)
}
}
return newSlice
}
// SortByFunc returns a new ordered NodeSlice, determined by a func defining ‘less’.
func (n Nodes) SortByFunc(less func(*Node, *Node) bool) Nodes {
result := make(Nodes, len(n))
copy(result, n)
by(less).Sort(n)
return result
}
// Len is the number of elements in the collection.
func (n Nodes) Len() int {
return len(n)
}
// Less reports whether the element with
// index i should sort before the element with index j.
func (n Nodes) Less(i, j int) bool {
return n[i].ID < n[j].ID
}
// Swap swaps the elements with indexes i and j.
func (n Nodes) Swap(i, j int) {
n[i], n[j] = n[j], n[i]
}
// By is the type of a "less" function that defines the ordering of its Node arguments.
type by func(p1, p2 *Node) bool
// Sort is a method on the function type, By, that sorts the argument slice according to the function.
func (b by) Sort(nodes Nodes) {
ps := &nodeSorter{
nodes: nodes,
by: b, // The Sort method's receiver is the function (closure) that defines the sort order.
}
sort.Sort(ps)
}
// nodeSorter joins a By function and a slice of Nodes to be sorted.
type nodeSorter struct {
nodes Nodes
by func(p1, p2 *Node) bool // Closure used in the Less method.
}
// Len is part of sort.Interface.
func (s *nodeSorter) Len() int {
return len(s.nodes)
}
// Swap is part of sort.Interface.
func (s *nodeSorter) Swap(i, j int) {
s.nodes[i], s.nodes[j] = s.nodes[j], s.nodes[i]
}
// Less is part of sort.Interface. It is implemented by calling the "by" closure in the sorter.
func (s *nodeSorter) Less(i, j int) bool {
return s.by(s.nodes[i], s.nodes[j])
}
// LessByID compare 2 Nodes with there ID
func LessByID(n1, n2 *Node) bool {
return n1.ID < n2.ID
}
// MoreByID compare 2 Nodes with there ID
func MoreByID(n1, n2 *Node) bool {
return n1.ID > n2.ID
}