/
discovery.go
468 lines (408 loc) · 12.7 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
// Copyright 2018-2020 Burak Sezer
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*Package discovery provides a basic memberlist integration.*/
package discovery
import (
"context"
"encoding/binary"
"errors"
"fmt"
"net"
"plugin"
"sort"
"strconv"
"sync"
"time"
"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/internal/flog"
"github.com/buraksezer/olric/pkg/service_discovery"
"github.com/hashicorp/memberlist"
"github.com/vmihailenco/msgpack"
)
// eventChanCapacity is the buffer size used for the memberlist event channel
// created in Start and for every subscriber channel created by SubscribeNodeEvents.
const eventChanCapacity = 256
// ErrHostNotFound indicates that the requested host could not be found in the member list.
var ErrHostNotFound = errors.New("host not found")
// ClusterEvent is a single event related to node activity in the memberlist.
// It is a flattened view of a memberlist.NodeEvent (see convertToClusterEvent).
// NodeAddr and NodeMeta are assigned from the memberlist node without copying,
// so subscribers should treat them as read-only.
type ClusterEvent struct {
// Event is the kind of activity (join, leave or update) being reported.
Event memberlist.NodeEventType
// NodeName is the memberlist name of the node the event is about.
NodeName string
// NodeAddr and NodePort together form the node's address; see MemberAddr.
NodeAddr net.IP
NodePort uint16
NodeMeta []byte // Metadata from the delegate for this node.
}
// MemberAddr returns the address of the node this event refers to, formatted
// as "host:port" (IPv6 hosts are bracketed by net.JoinHostPort).
func (c *ClusterEvent) MemberAddr() string {
	host := c.NodeAddr.String()
	return net.JoinHostPort(host, strconv.FormatUint(uint64(c.NodePort), 10))
}
// Discovery is a structure that encapsulates memberlist and
// provides useful functions to utilize it.
type Discovery struct {
log *flog.Logger
// host is the identity of the local node, computed in New.
host *Member
// memberlist is nil until Start has created it.
memberlist *memberlist.Memberlist
config *config.Config
// To manage Join/Leave/Update events
// clusterEventsMtx guards eventSubscribers.
clusterEventsMtx sync.RWMutex
// ClusterEvents is assigned in Start from SubscribeNodeEvents.
ClusterEvents chan *ClusterEvent
// Try to reconnect dead members
// deadMembers maps a member address ("host:port") to the UnixNano timestamp
// at which its leave event was observed. It is only touched by deadMemberTracker.
deadMembers map[string]int64
// deadMemberEvents is the subscriber channel consumed by deadMemberTracker.
deadMemberEvents chan *ClusterEvent
// eventSubscribers holds every channel handed out by SubscribeNodeEvents.
eventSubscribers []chan *ClusterEvent
// serviceDiscovery is only set when config.ServiceDiscovery is non-nil.
serviceDiscovery service_discovery.ServiceDiscovery
// Flow control
// wg tracks the background goroutines started by Start; ctx/cancel signal them to stop.
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
// Member describes a single node of the cluster. Instances are serialized with
// msgpack and distributed to other nodes as memberlist node metadata.
type Member struct {
	// Name is the node's memberlist name.
	Name string
	// ID is a hash derived from the node's birthdate and name.
	ID uint64
	// Birthdate is the creation time of the node in Unix nanoseconds.
	Birthdate int64
}

// String implements fmt.Stringer; a member is printed as its name.
func (m Member) String() string {
	name := m.Name
	return name
}
// DecodeNodeMeta deserializes msgpack-encoded node metadata into a Member.
// The returned Member holds whatever was decoded before an error occurred, so
// callers must check the error before trusting it.
func (d *Discovery) DecodeNodeMeta(buf []byte) (Member, error) {
	var decoded Member
	err := msgpack.Unmarshal(buf, &decoded)
	return decoded, err
}
// New prepares a Discovery instance for the local node described by c.
//
// It computes the host's identity, sets up the cancellation context used by the
// background goroutines and, when c.ServiceDiscovery is set, loads and
// initializes the service discovery plugin. The memberlist itself is not
// created here; that happens in Start.
//
// It returns an error only if the service discovery plugin cannot be loaded.
func New(log *flog.Logger, c *config.Config) (*Discovery, error) {
	// Calculate host's identity. It's useful to compare hosts.
	birthdate := time.Now().UnixNano()
	name := c.MemberlistConfig.Name

	// The hashed payload is the big-endian birthdate immediately followed by
	// the name. The slice is created with length 8 (and enough capacity for
	// the name) so that append writes the name right after the birthdate
	// instead of after a zero-filled gap, and without reallocating.
	buf := make([]byte, 8, 8+len(name))
	binary.BigEndian.PutUint64(buf, uint64(birthdate))
	buf = append(buf, name...)
	id := c.Hasher.Sum64(buf)

	host := &Member{
		Name:      name,
		ID:        id,
		Birthdate: birthdate,
	}

	ctx, cancel := context.WithCancel(context.Background())
	d := &Discovery{
		host:        host,
		config:      c,
		log:         log,
		deadMembers: make(map[string]int64),
		ctx:         ctx,
		cancel:      cancel,
	}

	if c.ServiceDiscovery != nil {
		if err := d.loadServiceDiscoveryPlugin(); err != nil {
			return nil, err
		}
	}
	return d, nil
}
// loadServiceDiscoveryPlugin resolves, configures and initializes the service
// discovery implementation described by d.config.ServiceDiscovery.
//
// Two sources are supported:
//   - the "plugin" key holds a ready-to-use ServiceDiscovery value;
//   - otherwise the "path" key names a Go plugin file that exports a
//     "ServiceDiscovery" symbol.
//
// On success the implementation is stored in d.serviceDiscovery. An error is
// returned when the configuration is malformed, the plugin cannot be loaded,
// or the implementation rejects its configuration or fails to initialize.
func (d *Discovery) loadServiceDiscoveryPlugin() error {
	var sd service_discovery.ServiceDiscovery

	if val, ok := d.config.ServiceDiscovery["plugin"]; ok {
		if sd, ok = val.(service_discovery.ServiceDiscovery); !ok {
			return fmt.Errorf("plugin type %T is not a ServiceDiscovery interface", val)
		}
	} else {
		rawPath, ok := d.config.ServiceDiscovery["path"]
		if !ok {
			return fmt.Errorf("plugin path could not be found")
		}
		// The configuration is loosely typed; reject a non-string path with an
		// error instead of panicking on the type assertion.
		pluginPath, ok := rawPath.(string)
		if !ok {
			return fmt.Errorf("plugin path must be a string, got %T", rawPath)
		}
		plug, err := plugin.Open(pluginPath)
		if err != nil {
			return fmt.Errorf("failed to open plugin: %w", err)
		}
		symDiscovery, err := plug.Lookup("ServiceDiscovery")
		if err != nil {
			return fmt.Errorf("failed to lookup serviceDiscovery symbol: %w", err)
		}
		if sd, ok = symDiscovery.(service_discovery.ServiceDiscovery); !ok {
			return fmt.Errorf("unable to assert type to serviceDiscovery")
		}
	}

	if err := sd.SetConfig(d.config.ServiceDiscovery); err != nil {
		return err
	}
	sd.SetLogger(d.config.Logger)
	if err := sd.Initialize(); err != nil {
		return err
	}
	d.serviceDiscovery = sd
	return nil
}
// dialDeadMember probes a dead member with a short TCP dial and, if the member
// is reachable again, asks memberlist to re-join it. Every failure is logged
// and otherwise ignored; the caller is expected to retry later.
func (d *Discovery) dialDeadMember(member string) {
	// Knock knock
	// TODO: Make this parametric
	conn, dialErr := net.DialTimeout("tcp", member, 100*time.Millisecond)
	if dialErr != nil {
		d.log.V(5).Printf("[ERROR] Failed to dial member: %s: %v", member, dialErr)
		return
	}

	if closeErr := conn.Close(); closeErr != nil {
		d.log.V(5).Printf("[ERROR] Failed to close connection: %s: %v", member, closeErr)
		// network partitioning continues
		return
	}

	// Everything seems fine. Try to re-join!
	if _, joinErr := d.Rejoin([]string{member}); joinErr != nil {
		d.log.V(5).Printf("[ERROR] Failed to re-join: %s: %v", member, joinErr)
	}
}
// deadMemberTracker runs as a background goroutine started by Start. It keeps
// d.deadMembers in sync with join/leave events and periodically tries to
// reconnect one dead member. It returns when d.ctx is cancelled.
//
// The WaitGroup is released only when the goroutine actually exits, so that
// Shutdown's wg.Wait really waits for it.
func (d *Discovery) deadMemberTracker() {
	defer d.wg.Done()

	for {
		select {
		case <-d.ctx.Done():
			return
		case e := <-d.deadMemberEvents:
			member := e.MemberAddr()
			switch e.Event {
			case memberlist.NodeJoin:
				// The member is back; stop tracking it.
				delete(d.deadMembers, member)
			case memberlist.NodeLeave:
				// Remember when the member was seen leaving.
				d.deadMembers[member] = time.Now().UnixNano()
			default:
				d.log.V(2).Printf("[ERROR] Unknown memberlist event received for: %s: %v",
					e.NodeName, e.Event)
			}
		case <-time.After(time.Second):
			// TODO: make this parametric
			// Try to reconnect a random dead member every second.
			// The Go runtime selects a random item in the map
			for member, timestamp := range d.deadMembers {
				d.dialDeadMember(member)
				// TODO: Make this parametric
				// Give up on members that have been dead for 24 hours or more.
				// Younger entries stay in the map so they are retried later.
				if time.Now().Add(-24*time.Hour).UnixNano() >= timestamp {
					delete(d.deadMembers, member)
				}
				// Just try one item
				break
			}
		}
	}
}
// Start creates the memberlist, registers the node with service discovery (if
// configured) and launches the background goroutines that dispatch cluster
// events and track dead members.
//
// It returns an error when the delegate cannot be built, the memberlist cannot
// be created, or service discovery registration fails.
func (d *Discovery) Start() error {
	// ClusterEvents chan is consumed by the Olric package to maintain a consistent hash ring.
	d.ClusterEvents = d.SubscribeNodeEvents()
	d.deadMemberEvents = d.SubscribeNodeEvents()

	// Initialize a new memberlist
	nodeDelegate, err := d.newDelegate()
	if err != nil {
		return err
	}

	nodeEvents := make(chan memberlist.NodeEvent, eventChanCapacity)
	d.config.MemberlistConfig.Delegate = nodeDelegate
	d.config.MemberlistConfig.Logger = d.config.Logger
	d.config.MemberlistConfig.Events = &memberlist.ChannelEventDelegate{Ch: nodeEvents}

	ml, err := memberlist.Create(d.config.MemberlistConfig)
	if err != nil {
		return err
	}
	d.memberlist = ml

	if sd := d.serviceDiscovery; sd != nil {
		if err := sd.Register(); err != nil {
			return err
		}
	}

	// One WaitGroup slot per background goroutine.
	d.wg.Add(2)
	go d.eventLoop(nodeEvents)
	go d.deadMemberTracker()
	return nil
}
// Join attempts to join a cluster by contacting a set of peers and performing
// a state sync through memberlist. Initially the memberlist only contains our
// own state, so joining makes remote nodes aware of this node.
//
// The peers come from the service discovery plugin when one is configured and
// from the static configuration (config.Peers) otherwise. It returns the
// result of memberlist.Join, or the peer discovery error.
func (d *Discovery) Join() (int, error) {
	if d.serviceDiscovery == nil {
		return d.memberlist.Join(d.config.Peers)
	}

	discovered, err := d.serviceDiscovery.DiscoverPeers()
	if err != nil {
		return 0, err
	}
	return d.memberlist.Join(discovered)
}
// Rejoin asks memberlist to join the given peers and returns the result of
// memberlist.Join unchanged. Unlike Join, the peer list is supplied by the
// caller; dialDeadMember uses it to bring a reachable dead member back.
func (d *Discovery) Rejoin(peers []string) (int, error) {
return d.memberlist.Join(peers)
}
// GetMembers returns the known alive nodes, sorted by ascending birthdate
// (oldest node first).
//
// Nodes whose metadata cannot be decoded are logged and skipped. Returning a
// zero-valued Member for them would put an entry with Birthdate 0 at the head
// of the list, where GetCoordinator would pick it up as the coordinator.
func (d *Discovery) GetMembers() []Member {
	var members []Member
	for _, node := range d.memberlist.Members() {
		member, err := d.DecodeNodeMeta(node.Meta)
		if err != nil {
			d.log.V(3).Printf("[ERROR] Failed to decode metadata of node: %s: %v", node.Name, err)
			continue
		}
		members = append(members, member)
	}

	// sort members by birthdate
	sort.Slice(members, func(i int, j int) bool {
		return members[i].Birthdate < members[j].Birthdate
	})
	return members
}
// NumMembers returns the member count reported by memberlist.NumMembers.
func (d *Discovery) NumMembers() int {
return d.memberlist.NumMembers()
}
// FindMemberByName looks up an alive member by its name. It returns
// ErrHostNotFound when no member returned by GetMembers carries that name.
func (d *Discovery) FindMemberByName(name string) (Member, error) {
	candidates := d.GetMembers()
	for i := range candidates {
		if candidates[i].Name != name {
			continue
		}
		return candidates[i], nil
	}
	return Member{}, ErrHostNotFound
}
// FindMemberByID looks up an alive member by its ID. It returns
// ErrHostNotFound when no member returned by GetMembers carries that ID.
func (d *Discovery) FindMemberByID(id uint64) (Member, error) {
	candidates := d.GetMembers()
	for i := range candidates {
		if candidates[i].ID != id {
			continue
		}
		return candidates[i], nil
	}
	return Member{}, ErrHostNotFound
}
// GetCoordinator returns the oldest node in the memberlist. When the member
// list is empty, the condition is logged and a zero-valued Member is returned.
func (d *Discovery) GetCoordinator() Member {
	members := d.GetMembers()
	if len(members) > 0 {
		// GetMembers sorts by ascending birthdate, so the head is the oldest node.
		return members[0]
	}

	d.log.V(1).Printf("[ERROR] There is no member in memberlist")
	return Member{}
}
// IsCoordinator reports whether the local node is the cluster coordinator,
// i.e. whether the coordinator's ID equals the local host's ID.
func (d *Discovery) IsCoordinator() bool {
	coordinator := d.GetCoordinator()
	return coordinator.ID == d.host.ID
}
// LocalNode returns the local node as reported by memberlist.LocalNode.
// The memberlist is created in Start, so this must not be called before it.
func (d *Discovery) LocalNode() *memberlist.Node {
return d.memberlist.LocalNode()
}
// Shutdown stops the background goroutines started by Start, broadcasts a
// leave message to the cluster (waiting up to 15 seconds), deregisters the
// node from service discovery when one is configured, and finally shuts the
// memberlist down.
//
// Once the internal context has been cancelled, subsequent calls return nil
// immediately. If the memberlist was never created (Start was not called or
// failed early), the leave and memberlist shutdown steps are skipped instead
// of dereferencing a nil memberlist.
//
// It returns the error of memberlist.Shutdown; failures of Leave and
// Deregister are only logged.
func (d *Discovery) Shutdown() error {
	select {
	case <-d.ctx.Done():
		return nil
	default:
	}
	d.cancel()
	// We don't do that in a goroutine with a timeout mechanism
	// because this mechanism may cause goroutine leak.
	d.wg.Wait()

	if d.memberlist != nil {
		// Leave will broadcast a leave message but will not shutdown the background
		// listeners, meaning the node will continue participating in gossip and state
		// updates.
		d.log.V(2).Printf("[INFO] Broadcasting a leave message")
		if err := d.memberlist.Leave(15 * time.Second); err != nil {
			d.log.V(3).Printf("[ERROR] memberlist.Leave returned an error: %v", err)
		}
	}

	if d.serviceDiscovery != nil {
		defer d.serviceDiscovery.Close()
		if err := d.serviceDiscovery.Deregister(); err != nil {
			d.log.V(3).Printf("[ERROR] ServiceDiscovery.Deregister returned an error: %v", err)
		}
	}

	if d.memberlist == nil {
		return nil
	}
	return d.memberlist.Shutdown()
}
// convertToClusterEvent flattens a memberlist.NodeEvent into a freshly
// allocated ClusterEvent. The address and metadata slices are assigned, not
// copied.
func convertToClusterEvent(e memberlist.NodeEvent) *ClusterEvent {
	node := e.Node
	converted := &ClusterEvent{Event: e.Event}
	converted.NodeName = node.Name
	converted.NodeAddr = node.Addr
	converted.NodePort = node.Port
	converted.NodeMeta = node.Meta
	return converted
}
// handleEvent broadcasts a memberlist event to every subscriber channel.
//
// Events about the local node are dropped. Join and leave events are forwarded
// as they are. An update event is translated into a leave followed by a join,
// each delivered as its own ClusterEvent instance.
//
// Sends are abandoned as soon as d.ctx is cancelled. Without that, a full
// subscriber channel whose consumer has already stopped would block the event
// loop forever and Shutdown would hang waiting for it.
func (d *Discovery) handleEvent(event memberlist.NodeEvent) {
	// The check does not depend on the subscriber, so do it once up front.
	if event.Node.Name == d.host.Name {
		return
	}

	// publish delivers e to ch and reports false if the Discovery was shut
	// down before the event could be delivered.
	publish := func(ch chan *ClusterEvent, e *ClusterEvent) bool {
		select {
		case ch <- e:
			return true
		case <-d.ctx.Done():
			return false
		}
	}

	d.clusterEventsMtx.RLock()
	defer d.clusterEventsMtx.RUnlock()

	for _, ch := range d.eventSubscribers {
		// NodeJoin or NodeLeave
		if event.Event != memberlist.NodeUpdate {
			if !publish(ch, convertToClusterEvent(event)) {
				return
			}
			continue
		}

		// NodeUpdate: Olric is an in-memory k/v store. If the node metadata has been updated,
		// the node may be restarted or/and serves stale data.
		leave := convertToClusterEvent(event)
		leave.Event = memberlist.NodeLeave
		if !publish(ch, leave) {
			return
		}

		// Create a Join event from copied event.
		join := convertToClusterEvent(event)
		join.Event = memberlist.NodeJoin
		if !publish(ch, join) {
			return
		}
	}
}
// eventLoop receives node events from memberlist and hands each one to
// handleEvent, which broadcasts it to the event listeners. It runs until d.ctx
// is cancelled and releases its WaitGroup slot on exit.
func (d *Discovery) eventLoop(eventsCh chan memberlist.NodeEvent) {
	defer d.wg.Done()

	for {
		select {
		case <-d.ctx.Done():
			return
		case nodeEvent := <-eventsCh:
			d.handleEvent(nodeEvent)
		}
	}
}
// SubscribeNodeEvents registers a new subscriber and returns the buffered
// channel (capacity eventChanCapacity) on which it will receive cluster events.
func (d *Discovery) SubscribeNodeEvents() chan *ClusterEvent {
	subscriber := make(chan *ClusterEvent, eventChanCapacity)

	d.clusterEventsMtx.Lock()
	d.eventSubscribers = append(d.eventSubscribers, subscriber)
	d.clusterEventsMtx.Unlock()

	return subscriber
}
// delegate is a struct which implements memberlist.Delegate interface.
// Its only job is to publish the local node's metadata; every other
// callback is a no-op.
type delegate struct {
// meta is the msgpack-encoded local Member, produced by newDelegate.
meta []byte
}
// newDelegate builds a delegate carrying the msgpack encoding of the local
// host. It returns a zero-valued delegate together with the error if the host
// cannot be serialized.
func (d *Discovery) newDelegate() (delegate, error) {
	var dl delegate

	encoded, err := msgpack.Marshal(d.host)
	if err != nil {
		return dl, err
	}

	dl.meta = encoded
	return dl, nil
}
// NodeMeta is used to retrieve meta-data about the current node
// when broadcasting an alive message. Its length is limited to
// the given byte size. This metadata is available in the Node structure.
//
// NOTE(review): limit is not enforced here; the pre-encoded metadata is
// returned as-is regardless of its length.
func (d delegate) NodeMeta(limit int) []byte {
return d.meta
}
// NotifyMsg is called when a user-data message is received.
// This delegate ignores user-data messages, so it is a no-op.
func (d delegate) NotifyMsg(data []byte) {}
// GetBroadcasts is called when user data messages can be broadcast.
// This delegate never has anything to broadcast and always returns nil.
func (d delegate) GetBroadcasts(overhead, limit int) [][]byte { return nil }
// LocalState is used for a TCP Push/Pull.
// This delegate shares no extra state and always returns nil.
func (d delegate) LocalState(join bool) []byte { return nil }
// MergeRemoteState is invoked after a TCP Push/Pull.
// Remote state is ignored by this delegate, so it is a no-op.
func (d delegate) MergeRemoteState(buf []byte, join bool) {}