/
event_broker.go
415 lines (348 loc) · 11.2 KB
/
event_broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
package stream
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/go-hclog"
)
const (
ACLCheckNodeRead = "node-read"
ACLCheckManagement = "management"
aclCacheSize = 32
)
type EventBrokerCfg struct {
EventBufferSize int64
Logger hclog.Logger
}
type EventBroker struct {
// mu protects subscriptions
mu sync.Mutex
subscriptions *subscriptions
// eventBuf stores a configurable amount of events in memory
eventBuf *eventBuffer
// publishCh is used to send messages from an active txn to a goroutine which
// publishes events, so that publishing can happen asynchronously from
// the Commit call in the FSM hot path.
publishCh chan *structs.Events
aclDelegate ACLDelegate
aclCache *lru.TwoQueueCache
aclCh chan *structs.Event
logger hclog.Logger
}
// NewEventBroker returns an EventBroker for publishing change events.
// A goroutine is run in the background to publish events to an event buffer.
// Cancelling the context will shutdown the goroutine to free resources, and stop
// all publishing.
func NewEventBroker(ctx context.Context, aclDelegate ACLDelegate, cfg EventBrokerCfg) (*EventBroker, error) {
if cfg.Logger == nil {
cfg.Logger = hclog.NewNullLogger()
}
// Set the event buffer size to a minimum
if cfg.EventBufferSize == 0 {
cfg.EventBufferSize = 100
}
aclCache, err := lru.New2Q(aclCacheSize)
if err != nil {
return nil, err
}
buffer := newEventBuffer(cfg.EventBufferSize)
e := &EventBroker{
logger: cfg.Logger.Named("event_broker"),
eventBuf: buffer,
publishCh: make(chan *structs.Events, 64),
aclCh: make(chan *structs.Event, 10),
aclDelegate: aclDelegate,
aclCache: aclCache,
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
}
go e.handleUpdates(ctx)
go e.handleACLUpdates(ctx)
return e, nil
}
// Len returns the current length of the event buffer.
func (e *EventBroker) Len() int {
return e.eventBuf.Len()
}
// Publish events to all subscribers of the event Topic.
func (e *EventBroker) Publish(events *structs.Events) {
if len(events.Events) == 0 {
return
}
// Notify the broker to check running subscriptions against potentially
// updated ACL Token or Policy
for _, event := range events.Events {
if event.Topic == structs.TopicACLToken || event.Topic == structs.TopicACLPolicy {
e.aclCh <- &event
}
}
e.publishCh <- events
}
// SubscribeWithACLCheck validates the SubscribeRequest's token and requested Topics
// to ensure that the tokens privileges are sufficient enough.
func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, error) {
aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token)
if err != nil {
return nil, structs.ErrPermissionDenied
}
if allowed := aclAllowsSubscription(aclObj, req); !allowed {
return nil, structs.ErrPermissionDenied
}
return e.Subscribe(req)
}
// Subscribe returns a new Subscription for a given request. A Subscription
// will receive an initial empty currentItem value which points to the first item
// in the buffer. This allows the new subscription to call Next() without first checking
// for the current Item.
//
// A Subscription will start at the requested index, or as close as possible to
// the requested index if it is no longer in the buffer. If StartExactlyAtIndex is
// set and the index is no longer in the buffer or not yet in the buffer an error
// will be returned.
//
// When a caller is finished with the subscription it must call Subscription.Unsubscribe
// to free ACL tracking resources.
func (e *EventBroker) Subscribe(req *SubscribeRequest) (*Subscription, error) {
e.mu.Lock()
defer e.mu.Unlock()
var head *bufferItem
var offset int
if req.Index != 0 {
head, offset = e.eventBuf.StartAtClosest(req.Index)
} else {
head = e.eventBuf.Head()
}
if offset > 0 && req.StartExactlyAtIndex {
return nil, fmt.Errorf("requested index not in buffer")
} else if offset > 0 {
metrics.SetGauge([]string{"nomad", "event_broker", "subscription", "request_offset"}, float32(offset))
e.logger.Debug("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Events.Index))
}
// Empty head so that calling Next on sub
start := newBufferItem(&structs.Events{Index: req.Index})
start.link.next.Store(head)
close(start.link.nextCh)
sub := newSubscription(req, start, e.subscriptions.unsubscribeFn(req))
e.subscriptions.add(req, sub)
return sub, nil
}
// CloseAll closes all subscriptions
func (e *EventBroker) CloseAll() {
e.subscriptions.closeAll()
}
func (e *EventBroker) handleUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
e.subscriptions.closeAll()
return
case update := <-e.publishCh:
e.eventBuf.Append(update)
}
}
}
func (e *EventBroker) handleACLUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case update := <-e.aclCh:
switch payload := update.Payload.(type) {
case *structs.ACLTokenEvent:
tokenSecretID := payload.SecretID()
// Token was deleted
if update.Type == structs.TypeACLTokenDeleted {
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
// If broker cannot fetch state there is nothing more to do
if e.aclDelegate == nil {
continue
}
aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID)
if err != nil || aclObj == nil {
e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
return !aclAllowsSubscription(aclObj, sub.req)
})
case *structs.ACLPolicyEvent:
// Re-evaluate each subscriptions permissions since a policy
// change may or may not affect the subscription
e.checkSubscriptionsAgainstPolicyChange()
}
}
}
}
// checkSubscriptionsAgainstPolicyChange iterates over the brokers
// subscriptions and evaluates whether the token used for the subscription is
// still valid. If it is not valid it closes the subscriptions belonging to the
// token.
//
// A lock must be held to iterate over the map of subscriptions.
func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() {
e.mu.Lock()
defer e.mu.Unlock()
// If broker cannot fetch state there is nothing more to do
if e.aclDelegate == nil {
return
}
aclSnapshot := e.aclDelegate.TokenProvider()
for tokenSecretID := range e.subscriptions.byToken {
// if tokenSecretID is empty ACLs were disabled at time of subscribing
if tokenSecretID == "" {
continue
}
aclObj, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID)
if err != nil || aclObj == nil {
e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
return !aclAllowsSubscription(aclObj, sub.req)
})
}
}
func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) {
aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID)
if err != nil {
return nil, err
}
if aclToken == nil {
return nil, errors.New("no token for secret ID")
}
// Check if this is a management token
if aclToken.Type == structs.ACLManagementToken {
return acl.ManagementACL, nil
}
aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies))
for _, policyName := range aclToken.Policies {
policy, err := aclSnapshot.ACLPolicyByName(nil, policyName)
if err != nil || policy == nil {
return nil, errors.New("error finding acl policy")
}
aclPolicies = append(aclPolicies, policy)
}
return structs.CompileACLObject(aclCache, aclPolicies)
}
type ACLTokenProvider interface {
ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error)
ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error)
}
type ACLDelegate interface {
TokenProvider() ACLTokenProvider
}
func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool {
for topic := range subReq.Topics {
switch topic {
case structs.TopicDeployment,
structs.TopicEvaluation,
structs.TopicAllocation,
structs.TopicJob,
structs.TopicService:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
return false
}
case structs.TopicNode:
if ok := aclObj.AllowNodeRead(); !ok {
return false
}
default:
if ok := aclObj.IsManagement(); !ok {
return false
}
}
}
return true
}
func (s *Subscription) forceClose() {
if atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed) {
close(s.forceClosed)
}
}
type subscriptions struct {
// mu for byToken. If both subscription.mu and EventBroker.mu need
// to be held, EventBroker mutex MUST always be acquired first.
mu sync.RWMutex
// byToken is an mapping of active Subscriptions indexed by a token and
// a pointer to the request.
// When the token is modified all subscriptions under that token will be
// reloaded.
// A subscription may be unsubscribed by using the pointer to the request.
byToken map[string]map[*SubscribeRequest]*Subscription
}
func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
s.mu.Lock()
defer s.mu.Unlock()
subsByToken, ok := s.byToken[req.Token]
if !ok {
subsByToken = make(map[*SubscribeRequest]*Subscription)
s.byToken[req.Token] = subsByToken
}
subsByToken[req] = sub
}
func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, secretID := range tokenSecretIDs {
if subs, ok := s.byToken[secretID]; ok {
for _, sub := range subs {
sub.forceClose()
}
}
}
}
func (s *subscriptions) closeSubscriptionFunc(tokenSecretID string, fn func(*Subscription) bool) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, sub := range s.byToken[tokenSecretID] {
if fn(sub) {
sub.forceClose()
}
}
}
// unsubscribeFn returns a function that the subscription will call to remove
// itself from the subsByToken.
// This function is returned as a closure so that the caller doesn't need to keep
// track of the SubscriptionRequest, and can not accidentally call unsubscribeFn with the
// wrong pointer.
func (s *subscriptions) unsubscribeFn(req *SubscribeRequest) func() {
return func() {
s.mu.Lock()
defer s.mu.Unlock()
subsByToken, ok := s.byToken[req.Token]
if !ok {
return
}
sub := subsByToken[req]
if sub == nil {
return
}
// close the subscription
sub.forceClose()
delete(subsByToken, req)
if len(subsByToken) == 0 {
delete(s.byToken, req.Token)
}
}
}
func (s *subscriptions) closeAll() {
s.mu.Lock()
defer s.mu.Unlock()
for _, byRequest := range s.byToken {
for _, sub := range byRequest {
sub.forceClose()
}
}
}