/
membership.go
161 lines (130 loc) · 3.88 KB
/
membership.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package discovery
import (
	"errors"
	"net"

	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
	"go.uber.org/zap"
)
// Config holds the configuration of a single node in a Serf cluster.
type Config struct {
// NodeName acts as the node's unique identifier across the Serf cluster.
NodeName string
// BindAddr is the address Serf listens on for gossiping. It is resolved as
// a TCP address, and its IP and port become the memberlist bind address
// and bind port.
BindAddr string
// Tags are shared with the other nodes in the cluster. This information is
// used by the cluster to decide how to handle this node. The "rpc_addr"
// tag of a member is what gets passed to Handler.Join.
Tags map[string]string
// StartJoinAddrs is used for joining a new node to the cluster. Joining a
// new node requires pointing to at least one in-cluster node. In a
// production environment it is advisable to specify at least 3 addresses
// to increase cluster resiliency. When nil, no join is attempted.
StartJoinAddrs []string
}
// Handler receives cluster membership modification operations. Membership
// invokes it for members other than the local node.
type Handler interface {
// Join is called with the name and "rpc_addr" tag of a member that joined
// the cluster.
Join(name, addr string) error
// Leave is called with the name of a member that left the cluster or was
// reported as failed.
Leave(name string) error
}
// Membership represents the membership of a node in a cluster. It wraps a Serf
// instance, providing a handle to cluster membership operations.
type Membership struct {
// Config is embedded, so NodeName, BindAddr, Tags and StartJoinAddrs are
// accessible directly on Membership.
Config
// handler is notified when other members join, leave or fail.
handler Handler
// serf is the underlying Serf instance, created by setupSerf.
serf *serf.Serf
// events is the channel handed to Serf as its event channel; eventHandler
// receives from it.
events chan serf.Event
// logger reports errors returned by the handler.
logger *zap.Logger
}
// isLocal reports whether member is the local member (i.e. the invoking
// node), which is decided by comparing Serf member names.
func (m *Membership) isLocal(member serf.Member) bool {
	self := m.serf.LocalMember()
	return member.Name == self.Name
}
// Members returns the cluster's Serf members as reported by the wrapped Serf
// instance at the time of the call.
func (m *Membership) Members() []serf.Member {
	snapshot := m.serf.Members()
	return snapshot
}
// Leave tells this member to leave the Serf cluster and returns whatever error
// the wrapped Serf instance reports.
func (m *Membership) Leave() error {
	err := m.serf.Leave()
	return err
}
// logError logs a failed membership operation together with the affected
// member's name and "rpc_addr" tag.
//
// Errors are logged at Error level, except raft.ErrNotLeader, which is logged
// at Debug level: only the leader can remove a member from the Raft cluster,
// so this error is to be expected from the membership event handlers of the
// non-leaders. errors.Is is used so that a wrapped raft.ErrNotLeader is
// recognised as well.
func (m *Membership) logError(err error, msg string, member serf.Member) {
	log := m.logger.Error
	if errors.Is(err, raft.ErrNotLeader) {
		log = m.logger.Debug
	}
	log(
		msg,
		zap.Error(err),
		zap.String("name", member.Name),
		zap.String("rpc_addr", member.Tags["rpc_addr"]),
	)
}
// handleJoin passes the joined member's name and "rpc_addr" tag to the
// handler and logs the error if the handler returns one.
func (m *Membership) handleJoin(member serf.Member) {
	rpcAddr := member.Tags["rpc_addr"]
	err := m.handler.Join(member.Name, rpcAddr)
	if err == nil {
		return
	}
	m.logError(err, "failed to join", member)
}
// handleLeave passes the departed member's name to the handler and logs the
// error if the handler returns one.
func (m *Membership) handleLeave(member serf.Member) {
	err := m.handler.Leave(member.Name)
	if err == nil {
		return
	}
	m.logError(err, "failed to leave", member)
}
// eventHandler receives Serf events from m.events until the channel is closed
// or the local member is reported as having left or failed, and dispatches
// membership changes of other nodes to the handler.
//
// Only member events are processed. Any other event delivered on the channel
// (for example a user event or a query) is skipped; asserting every event to
// serf.MemberEvent unconditionally would panic on those.
func (m *Membership) eventHandler() {
	for event := range m.events {
		memberEvent, ok := event.(serf.MemberEvent)
		if !ok {
			// Not a membership change; nothing to forward to the handler.
			continue
		}
		switch memberEvent.EventType() {
		case serf.EventMemberJoin:
			for _, member := range memberEvent.Members {
				if m.isLocal(member) {
					// The local node does not report itself to the handler.
					continue
				}
				m.handleJoin(member)
			}
		case serf.EventMemberLeave, serf.EventMemberFailed:
			for _, member := range memberEvent.Members {
				if m.isLocal(member) {
					// The local node is gone from the cluster: stop handling
					// events altogether. Members listed after it in this
					// event are not forwarded.
					// NOTE(review): after this return nothing receives from
					// m.events — confirm Serf cannot block on it afterwards.
					return
				}
				m.handleLeave(member)
			}
		}
	}
}
// setupSerf creates and starts the Serf instance for this member.
//
// It resolves BindAddr as a TCP address, configures Serf with the resulting
// IP/port, the node's tags and name, and a fresh event channel, starts the
// event handler goroutine, and finally joins the cluster through
// StartJoinAddrs when any are configured.
//
// It returns the error from address resolution, Serf creation or the initial
// join. If the join fails, the Serf instance that was just created is shut
// down so that it does not keep running (and holding the bind address) behind
// a Membership the caller never receives.
func (m *Membership) setupSerf() (err error) {
	addr, err := net.ResolveTCPAddr("tcp", m.BindAddr)
	if err != nil {
		return err
	}

	config := serf.DefaultConfig()
	config.Init()
	config.MemberlistConfig.BindAddr = addr.IP.String()
	config.MemberlistConfig.BindPort = addr.Port
	m.events = make(chan serf.Event)
	config.EventCh = m.events
	config.Tags = m.Tags
	config.NodeName = m.Config.NodeName

	m.serf, err = serf.Create(config)
	if err != nil {
		return err
	}

	// Start receiving before joining so events produced by the join are
	// consumed from the unbuffered channel.
	go m.eventHandler()

	if len(m.StartJoinAddrs) == 0 {
		return nil
	}
	if _, err := m.serf.Join(m.StartJoinAddrs, true); err != nil {
		// Best-effort cleanup: the join error is what the caller needs to
		// see, so a secondary shutdown error is deliberately dropped.
		// NOTE(review): m.events is never closed, so the eventHandler
		// goroutine stays parked on it after the shutdown.
		_ = m.serf.Shutdown()
		return err
	}
	return nil
}
// New builds a Serf cluster member from the given cluster handler and config.
// It sets up the Serf configuration and event handling internally, and returns
// a nil Membership together with the error if that setup fails.
func New(handler Handler, config Config) (*Membership, error) {
	m := new(Membership)
	m.Config = config
	m.handler = handler
	m.logger = zap.L().Named("Membership")

	err := m.setupSerf()
	if err != nil {
		return nil, err
	}
	return m, nil
}