-
Notifications
You must be signed in to change notification settings - Fork 6
/
member.go
154 lines (131 loc) · 3.66 KB
/
member.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// SPDX-FileCopyrightText: Copyright (c) 2023-2024, CIQ, Inc. All rights reserved
// SPDX-License-Identifier: Apache-2.0
package gossip
import (
"fmt"
"io"
"time"
"github.com/hashicorp/memberlist"
)
// Member represents a member part of a gossip cluster.
type Member struct {
ml *memberlist.Memberlist
eventChan chan MemberEvent
nd *nodeDelegate
}
const (
// DefaultLeaveTimeout is the time to wait during while leaving cluster
DefaultLeaveTimeout = 5 * time.Second
// DefaultReadyTimeout is the time to wait during ready status update
DefaultReadyTimeout = 2 * time.Second
)
// NewMember creates and/or participates to a gossip cluster.
func NewMember(name string, peers []string, memberOpt ...MemberOption) (*Member, error) {
cfg := memberlist.DefaultLANConfig()
cfg.BindPort = 0
cfg.Name = name
eventChan := make(chan MemberEvent, 128)
nd := &nodeDelegate{
eventChan: eventChan,
}
cfg.Delegate = nd
cfg.Events = nd
cfg.LogOutput = io.Discard
cfg.Keyring, _ = memberlist.NewKeyring(nil, nil)
for _, opt := range memberOpt {
if err := opt(cfg); err != nil {
return nil, err
}
}
// create memberlist network
ml, err := memberlist.Create(cfg)
if err != nil {
return nil, err
}
member := &Member{
ml: ml,
eventChan: eventChan,
nd: nd,
}
if len(peers) > 0 {
peerJoined, err := member.join(peers)
if err != nil {
return nil, err
} else if peerJoined == 0 {
return nil, fmt.Errorf("no peer has been joined")
}
}
return member, nil
}
// Watch returns an event channel to react to various notifications
// coming from gossip protocol (join, leave, update ...).
func (member *Member) Watch() <-chan MemberEvent {
return member.eventChan
}
// Join joins a peer in the cluster.
func (member *Member) join(peers []string) (int, error) {
if len(peers) == 0 {
return 0, fmt.Errorf("at least one master peer address is required to join cluster")
}
count, err := member.ml.Join(peers)
if err != nil {
_ = member.ml.Shutdown()
return 0, err
}
return count, err
}
// Shutdown leaves the cluster.
func (member *Member) Shutdown() error {
if member == nil {
return nil
} else if member.ml == nil {
return fmt.Errorf("no cluster joined")
}
if member.ml.NumMembers() > 0 {
if err := member.ml.Leave(DefaultLeaveTimeout); err != nil {
return err
}
return member.ml.Shutdown()
}
return nil
}
// Nodes returns all nodes participating to the cluster.
func (member *Member) Nodes() []*memberlist.Node {
return member.ml.Members()
}
// LocalNode returns the current node information.
func (member *Member) LocalNode() *memberlist.Node {
return member.ml.LocalNode()
}
// Send senda message to a particular node.
func (member *Member) Send(node *memberlist.Node, msg []byte) error {
return member.ml.SendReliable(node, msg)
}
// MarkAsReady updates node metadata ready status and advertise nodes about change.
func (member *Member) MarkAsReady(timeout time.Duration) error {
meta := NewBeskarMeta()
if err := meta.Decode(member.nd.meta); err != nil {
return fmt.Errorf("while decoding metadata")
}
meta.Ready = true
mb, err := meta.Encode()
if err != nil {
return err
}
member.nd.meta = mb
return member.ml.UpdateNode(timeout)
}
// PeerState returns the state of the peer used to join the cluster if any.
func (member *Member) RemoteState() ([]byte, error) {
if member.nd.remoteState != nil {
return member.nd.remoteState, nil
}
return nil, fmt.Errorf("no remote state received")
}
// LocalState returns the state of the node if any.
func (member *Member) LocalState() ([]byte, error) {
if member.nd.localState != nil {
return member.nd.localState, nil
}
return nil, fmt.Errorf("no local state set")
}