forked from hashicorp/consul
-
Notifications
You must be signed in to change notification settings - Fork 0
/
event_endpoint.go
204 lines (178 loc) · 4.76 KB
/
event_endpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package agent
import (
"bytes"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/hashicorp/consul/agent/consul/structs"
)
const (
// maxQueryTime is used to bound the limit of a blocking query
maxQueryTime = 600 * time.Second
)
// EventFire is used to fire a new event
func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Mandate a PUT request
if req.Method != "PUT" {
resp.WriteHeader(405)
return nil, nil
}
// Get the datacenter
var dc string
s.parseDC(req, &dc)
event := &UserEvent{}
event.Name = strings.TrimPrefix(req.URL.Path, "/v1/event/fire/")
if event.Name == "" {
resp.WriteHeader(400)
fmt.Fprint(resp, "Missing name")
return nil, nil
}
// Get the ACL token
var token string
s.parseToken(req, &token)
// Get the filters
if filt := req.URL.Query().Get("node"); filt != "" {
event.NodeFilter = filt
}
if filt := req.URL.Query().Get("service"); filt != "" {
event.ServiceFilter = filt
}
if filt := req.URL.Query().Get("tag"); filt != "" {
event.TagFilter = filt
}
// Get the payload
if req.ContentLength > 0 {
var buf bytes.Buffer
if _, err := io.Copy(&buf, req.Body); err != nil {
return nil, err
}
event.Payload = buf.Bytes()
}
// Try to fire the event
if err := s.agent.UserEvent(dc, token, event); err != nil {
if strings.Contains(err.Error(), permissionDenied) {
resp.WriteHeader(403)
fmt.Fprint(resp, permissionDenied)
return nil, nil
}
resp.WriteHeader(500)
return nil, err
}
// Return the event
return event, nil
}
// EventList is used to retrieve the recent list of events
func (s *HTTPServer) EventList(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Parse the query options, since we simulate a blocking query
var b structs.QueryOptions
if parseWait(resp, req, &b) {
return nil, nil
}
// Fetch the ACL token, if any.
var token string
s.parseToken(req, &token)
acl, err := s.agent.resolveToken(token)
if err != nil {
return nil, err
}
// Look for a name filter
var nameFilter string
if filt := req.URL.Query().Get("name"); filt != "" {
nameFilter = filt
}
// Lots of this logic is borrowed from consul/rpc.go:blockingQuery
// However we cannot use that directly since this code has some
// slight semantics differences...
var timeout <-chan time.Time
var notifyCh chan struct{}
// Fast path non-blocking
if b.MinQueryIndex == 0 {
goto RUN_QUERY
}
// Restrict the max query time
if b.MaxQueryTime > maxQueryTime {
b.MaxQueryTime = maxQueryTime
}
// Ensure a time limit is set if we have an index
if b.MinQueryIndex > 0 && b.MaxQueryTime == 0 {
b.MaxQueryTime = maxQueryTime
}
// Setup a query timeout
if b.MaxQueryTime > 0 {
timeout = time.After(b.MaxQueryTime)
}
// Setup a notification channel for changes
SETUP_NOTIFY:
if b.MinQueryIndex > 0 {
notifyCh = make(chan struct{}, 1)
s.agent.eventNotify.Wait(notifyCh)
}
RUN_QUERY:
// Get the recent events
events := s.agent.UserEvents()
// Filter the events using the ACL, if present
if acl != nil {
for i := 0; i < len(events); i++ {
name := events[i].Name
if acl.EventRead(name) {
continue
}
s.agent.logger.Printf("[DEBUG] agent: dropping event %q from result due to ACLs", name)
events = append(events[:i], events[i+1:]...)
i--
}
}
// Filter the events if requested
if nameFilter != "" {
for i := 0; i < len(events); i++ {
if events[i].Name != nameFilter {
events = append(events[:i], events[i+1:]...)
i--
}
}
}
// Determine the index
var index uint64
if len(events) == 0 {
// Return a non-zero index to prevent a hot query loop. This
// can be caused by a watch for example when there is no matching
// events.
index = 1
} else {
last := events[len(events)-1]
index = uuidToUint64(last.ID)
}
setIndex(resp, index)
// Check for exact match on the query value. Because
// the index value is not monotonic, we just ensure it is
// not an exact match.
if index > 0 && index == b.MinQueryIndex {
select {
case <-notifyCh:
goto SETUP_NOTIFY
case <-timeout:
}
}
return events, nil
}
// uuidToUint64 is a bit of a hack to generate a 64bit Consul index.
// In effect, we take our random UUID, convert it to a 128 bit number,
// then XOR the high-order and low-order 64bit's together to get the
// output. This lets us generate an index which can be used to simulate
// the blocking behavior of other catalog endpoints.
func uuidToUint64(uuid string) uint64 {
lower := uuid[0:8] + uuid[9:13] + uuid[14:18]
upper := uuid[19:23] + uuid[24:36]
lowVal, err := strconv.ParseUint(lower, 16, 64)
if err != nil {
panic("Failed to convert " + lower)
}
highVal, err := strconv.ParseUint(upper, 16, 64)
if err != nil {
panic("Failed to convert " + upper)
}
return lowVal ^ highVal
}