-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
serf_adapter.go
110 lines (94 loc) · 2.76 KB
/
serf_adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package router
import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/types"
)
// routerFn selects one of the router operations to map to incoming Serf events.
type routerFn func(types.AreaID, *metadata.Server) error
// handleMemberEvents attempts to apply the given Serf member event to the given
// router function.
func handleMemberEvent(logger hclog.Logger, fn routerFn, areaID types.AreaID, e serf.Event) {
me, ok := e.(serf.MemberEvent)
if !ok {
logger.Error("Bad event type", "event", e)
return
}
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
logger.Warn("Non-server in server-only area",
"non_server", m.Name,
"area", areaID,
)
continue
}
if err := fn(areaID, parts); err != nil {
logger.Error("Failed to process event for server in area",
"event", me.Type.String(),
"server", m.Name,
"area", areaID,
"error", err,
)
continue
}
logger.Info("Handled event for server in area",
"event", me.Type.String(),
"server", m.Name,
"area", areaID,
)
}
}
// HandleSerfEvents is a long-running goroutine that pushes incoming events from
// a Serf manager's channel into the given router. This will return when the
// shutdown channel is closed.
//
// If membershipNotifyCh is non-nil, it must be a buffered channel of size one
// with one consumer. That consumer will be notified when
// Join/Leave/Failed/Update occur on this serf pool.
func HandleSerfEvents(
logger hclog.Logger,
router *Router,
areaID types.AreaID,
shutdownCh <-chan struct{},
eventCh <-chan serf.Event,
membershipNotifyCh chan<- struct{},
) {
for {
select {
case <-shutdownCh:
return
case e := <-eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
handleMemberEvent(logger, router.AddServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
case serf.EventMemberLeave, serf.EventMemberReap:
handleMemberEvent(logger, router.RemoveServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
case serf.EventMemberFailed:
handleMemberEvent(logger, router.FailServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
case serf.EventMemberUpdate:
handleMemberEvent(logger, router.AddServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
// All of these event types are ignored.
case serf.EventUser:
case serf.EventQuery:
default:
logger.Warn("Unhandled Serf Event", "event", e)
}
}
}
}
func notifyMembershipPossibleChange(membershipNotifyCh chan<- struct{}) {
if membershipNotifyCh == nil {
return
}
// Notify if not already notified.
select {
case membershipNotifyCh <- struct{}{}:
default:
}
}