forked from hashicorp/serf
/
internal_query.go
116 lines (100 loc) · 3.06 KB
/
internal_query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package serf
import (
"log"
"strings"
)
const (
// This is the prefix we use for queries that are internal to Serf.
// They are handled internally, and not forwarded to a client.
InternalQueryPrefix = "_serf_"
// pingQuery is run to check for reachability
pingQuery = "ping"
// conflictQuery is run to resolve a name conflict
conflictQuery = "conflict"
)
// internalQueryName is used to generate a query name for an internal query
func internalQueryName(name string) string {
return InternalQueryPrefix + name
}
// serfQueries is used to listen for queries that start with
// _serf and respond to them as appropriate.
type serfQueries struct {
inCh chan Event
logger *log.Logger
outCh chan<- Event
serf *Serf
shutdownCh <-chan struct{}
}
// newSerfQueries is used to create a new serfQueries. We return an event
// channel that is ingested and forwarded to an outCh. Any Queries that
// have the InternalQueryPrefix are handled instead of forwarded.
func newSerfQueries(serf *Serf, logger *log.Logger, outCh chan<- Event, shutdownCh <-chan struct{}) (chan<- Event, error) {
inCh := make(chan Event, 1024)
q := &serfQueries{
inCh: inCh,
logger: logger,
outCh: outCh,
serf: serf,
shutdownCh: shutdownCh,
}
go q.stream()
return inCh, nil
}
// stream is a long running routine to ingest the event stream
func (s *serfQueries) stream() {
for {
select {
case e := <-s.inCh:
// Check if this is a query we should process
if q, ok := e.(*Query); ok && strings.HasPrefix(q.Name, InternalQueryPrefix) {
go s.handleQuery(q)
} else if s.outCh != nil {
s.outCh <- e
}
case <-s.shutdownCh:
return
}
}
}
// handleQuery is invoked when we get an internal query
func (s *serfQueries) handleQuery(q *Query) {
// Get the queryName after the initial prefix
queryName := q.Name[len(InternalQueryPrefix):]
switch queryName {
case pingQuery:
// Nothing to do, we will ack the query
case conflictQuery:
s.handleConflict(q)
default:
s.logger.Printf("[WARN] serf: Unhandled internal query '%s'", queryName)
}
}
// handleConflict is invoked when we get a query that is attempting to
// disambiguate a name conflict. They payload is a node name, and the response
// should the address we believe that node is at, if any.
func (s *serfQueries) handleConflict(q *Query) {
// The target node name is the payload
node := string(q.Payload)
// Do not respond to the query if it is about us
if node == s.serf.config.NodeName {
return
}
s.logger.Printf("[DEBUG] serf: Got conflict resolution query for '%s'", node)
// Look for the member info
var out *Member
s.serf.memberLock.Lock()
if member, ok := s.serf.members[node]; ok {
out = &member.Member
}
s.serf.memberLock.Unlock()
// Encode the response
buf, err := encodeMessage(messageConflictResponseType, out)
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode conflict query response: %v", err)
return
}
// Send our answer
if err := q.Respond(buf); err != nil {
s.logger.Printf("[ERR] serf: Failed to respond to conflict query: %v", err)
}
}