forked from hashicorp/consul
/
client_serf.go
140 lines (125 loc) · 3.86 KB
/
client_serf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package consul
import (
"fmt"
"path/filepath"
"strings"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/serf"
)
// setupSerf is used to setup and initialize a Serf
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
conf.Init()
conf.NodeName = c.config.NodeName
conf.Tags["role"] = "node"
conf.Tags["dc"] = c.config.Datacenter
conf.Tags["segment"] = c.config.Segment
conf.Tags["id"] = string(c.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", c.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["build"] = c.config.Build
if c.logger == nil {
conf.MemberlistConfig.LogOutput = c.config.LogOutput
conf.LogOutput = c.config.LogOutput
}
conf.MemberlistConfig.Logger = c.logger
conf.Logger = c.logger
conf.EventCh = ch
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{
dc: c.config.Datacenter,
nodeID: c.config.NodeID,
nodeName: c.config.NodeName,
segment: c.config.Segment,
}
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// lanEventHandler is used to handle events from the lan Serf cluster
func (c *Client) lanEventHandler() {
var numQueuedEvents int
for {
numQueuedEvents = len(c.eventCh)
if numQueuedEvents > serfEventBacklogWarning {
c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning)
}
select {
case e := <-c.eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
c.nodeJoin(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed:
c.nodeFail(e.(serf.MemberEvent))
case serf.EventUser:
c.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate: // Ignore
case serf.EventMemberReap: // Ignore
case serf.EventQuery: // Ignore
default:
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
}
case <-c.shutdownCh:
return
}
}
}
// nodeJoin is used to handle join events on the serf cluster
func (c *Client) nodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
continue
}
if parts.Datacenter != c.config.Datacenter {
c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
m.Name, parts.Datacenter)
continue
}
c.logger.Printf("[INFO] consul: adding server %s", parts)
c.routers.AddServer(parts)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
}
}
// nodeFail is used to handle fail events on the serf cluster
func (c *Client) nodeFail(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
continue
}
c.logger.Printf("[INFO] consul: removing server %s", parts)
c.routers.RemoveServer(parts)
}
}
// localEvent is called when we receive an event on the local Serf
func (c *Client) localEvent(event serf.UserEvent) {
// Handle only consul events
if !strings.HasPrefix(event.Name, "consul:") {
return
}
switch name := event.Name; {
case name == newLeaderEvent:
c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
case isUserEvent(name):
event.Name = rawUserEventName(name)
c.logger.Printf("[DEBUG] consul: user event: %s", event.Name)
// Trigger the callback
if c.config.UserEventHandler != nil {
c.config.UserEventHandler(event)
}
default:
c.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
}
}