-
Notifications
You must be signed in to change notification settings - Fork 24
/
tracker.go
483 lines (435 loc) · 12.6 KB
/
tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
package notifiers
import (
"bytes"
"context"
"fmt"
"strings"
"github.com/fluxninja/aperture/v2/pkg/log"
panichandler "github.com/fluxninja/aperture/v2/pkg/panic-handler"
)
//go:generate mockgen -source tracker.go -package mocks -aux_files github.com/fluxninja/aperture/v2/pkg/notifiers=./watcher.go -destination ../mocks/mock-trackers.go
// Per key trackers.
type keyTracker struct {
key Key
value []byte
notifiers keyNotifiers
validKey bool
}
func newTracker(key Key) *keyTracker {
tracker := &keyTracker{
key: key,
notifiers: make(keyNotifiers, 0),
value: nil,
}
return tracker
}
func (tracker *keyTracker) notify(eventType EventType, value []byte) {
if !bytes.Equal(value, tracker.value) {
// send last value if the event is the remove event and value is nil
if eventType == Remove && value == nil {
value = tracker.value
} else {
tracker.value = value
}
tracker.notifiers.notify(Event{
Type: eventType,
Key: tracker.key,
Value: value,
})
}
switch eventType {
case Write:
tracker.validKey = true
case Remove:
tracker.validKey = false
tracker.value = nil
}
}
func (tracker *keyTracker) addKeyNotifier(notifier KeyNotifier) {
// check existing notifier
for _, n := range tracker.notifiers {
if n.getID() == notifier.getID() {
// already exists
return
}
}
tracker.notifiers = append(tracker.notifiers, notifier)
if tracker.isValidKey() {
transformNotify(notifier, Event{
Type: Write,
Key: tracker.key,
Value: tracker.value,
})
}
}
func (tracker *keyTracker) removeKeyNotifier(id string) {
for i, n := range tracker.notifiers {
if n.getID() == id {
transformNotify(n, Event{
Type: Remove,
Key: tracker.key,
Value: nil,
})
// remove the key notifier
tracker.notifiers[i] = tracker.notifiers[len(tracker.notifiers)-1]
tracker.notifiers = tracker.notifiers[:len(tracker.notifiers)-1]
return
}
}
}
func (tracker *keyTracker) isValidKey() bool {
return tracker.validKey
}
func (tracker *keyTracker) getKeyNotifiers() keyNotifiers {
return tracker.notifiers
}
func (tracker *keyTracker) String() string {
notifiers := []string{}
for _, n := range tracker.notifiers {
notifiers = append(notifiers, n.GetKey().String())
}
tStr := fmt.Sprintf("key: %s, value: %s, notifiers: %+v", tracker.key, tracker.value, notifiers)
return tStr
}
////////////////////////////////////////////////////////////////////////////////
// Trackers
////////////////////////////////////////////////////////////////////////////////
const (
add = iota
remove
update
purge
stop
)
type notifierOp struct {
keyNotifier KeyNotifier
prefixNotifier PrefixNotifier
updateKey Key
updateFunc UpdateValueFunc
purgePrefix string
op int
}
// UpdateValueFunc is a function that can be used to update the value of an existing tracker entry.
type UpdateValueFunc func(oldValue []byte) (EventType, []byte)
// EventWriter can be used to inject events into a tracker collection.
type EventWriter interface {
WriteEvent(key Key, value []byte)
RemoveEvent(key Key)
Purge(prefix string)
UpdateValue(key Key, updateFunc UpdateValueFunc)
}
// Trackers is the interface of a tracker collection.
type Trackers interface {
Watcher
EventWriter
}
// DefaultTrackers is a collection of key trackers.
type DefaultTrackers struct {
waitGroup panichandler.WaitGroup
ctx context.Context
trackers map[Key]*keyTracker
notifiersChannel chan notifierOp
eventsChannel chan Event
cancel context.CancelFunc
prefixNotifiers prefixNotifiers
}
// Make sure Trackers implements Watcher interface.
var _ Watcher = &DefaultTrackers{}
// Make sure Trackers implements Trackers interface.
var _ Trackers = &DefaultTrackers{}
// NewDefaultTrackers creates a new instance of Trackers.
func NewDefaultTrackers() *DefaultTrackers {
t := &DefaultTrackers{
trackers: make(map[Key]*keyTracker),
notifiersChannel: make(chan notifierOp),
eventsChannel: make(chan Event),
prefixNotifiers: make(prefixNotifiers, 0),
}
t.ctx, t.cancel = context.WithCancel(context.Background())
return t
}
func (t *DefaultTrackers) getTracker(key Key) (*keyTracker, bool) {
tracker, ok := t.trackers[key]
if !ok {
tracker = newTracker(key)
t.trackers[key] = tracker
}
return tracker, ok
}
func (t *DefaultTrackers) getKeys() []Key {
keys := make([]Key, 0)
for key := range t.trackers {
keys = append(keys, key)
}
return keys
}
// WriteEvent sends a Write event with the given key and value to the underlying event channel.
func (t *DefaultTrackers) WriteEvent(key Key, value []byte) {
t.eventsChannel <- Event{
Type: Write,
Key: key,
Value: value,
}
}
// RemoveEvent sends a Remove event with the given key and value to the underlying event channel.
func (t *DefaultTrackers) RemoveEvent(key Key) {
t.eventsChannel <- Event{
Type: Remove,
Key: key,
Value: nil,
}
}
// AddKeyNotifier is a convenience function to add a key notifier to the underlying trackers.
// If the key of the given notifier is already tracked, the notifier will be added to the existing tracker.
func (t *DefaultTrackers) AddKeyNotifier(notifier KeyNotifier) error {
op := notifierOp{
op: add,
keyNotifier: notifier,
prefixNotifier: nil,
}
t.notifiersChannel <- op
return nil
}
func (t *DefaultTrackers) addKeyNotifier(n KeyNotifier) {
key := n.GetKey()
tracker, _ := t.getTracker(key)
tracker.addKeyNotifier(n)
}
// RemoveKeyNotifier is a convenience function to remove a key notifier from the underlying trackers.
// If the key of the given notifier is not tracked, the notifier will be ignored.
func (t *DefaultTrackers) RemoveKeyNotifier(notifier KeyNotifier) error {
op := notifierOp{
op: remove,
keyNotifier: notifier,
prefixNotifier: nil,
}
t.notifiersChannel <- op
return nil
}
func (t *DefaultTrackers) removeKeyNotifier(key Key, id string) {
tracker, _ := t.getTracker(key)
tracker.removeKeyNotifier(id)
// if tracker has no notifiers, remove it
if len(tracker.getKeyNotifiers()) == 0 && !tracker.isValidKey() {
delete(t.trackers, key)
}
}
// AddPrefixNotifier is a convenience function to add a prefix notifier to the underlying trackers.
// Internally, a key notifier is added for each key under the given prefix.
// If the prefix of the given notifier is already tracked, the notifier will be added to the existing tracker.
func (t *DefaultTrackers) AddPrefixNotifier(notifier PrefixNotifier) error {
op := notifierOp{
op: add,
prefixNotifier: notifier,
keyNotifier: nil,
}
t.notifiersChannel <- op
return nil
}
func (t *DefaultTrackers) addPrefixNotifier(notifier PrefixNotifier) {
t.prefixNotifiers = append(t.prefixNotifiers, notifier)
// add to existing trackers
for _, key := range t.getKeys() {
if strings.HasPrefix(key.String(), notifier.GetPrefix()) {
kn, err := notifier.GetKeyNotifier(key)
if err != nil {
continue
}
kn.inherit(key, notifier)
t.addKeyNotifier(kn)
}
}
}
// RemovePrefixNotifier is a convenience function to remove a prefix notifier from the underlying trackers.
// Internally, a key notifier is removed for each key under the given prefix.
// If the prefix of the given notifier is not tracked, the notifier will be ignored.
func (t *DefaultTrackers) RemovePrefixNotifier(notifier PrefixNotifier) error {
op := notifierOp{
op: remove,
prefixNotifier: notifier,
keyNotifier: nil,
}
t.notifiersChannel <- op
return nil
}
func (t *DefaultTrackers) removePrefixNotifier(notifier PrefixNotifier) {
id := notifier.getID()
for i, notifier := range t.prefixNotifiers {
if notifier.getID() == id {
t.prefixNotifiers = append(t.prefixNotifiers[:i], t.prefixNotifiers[i+1:]...)
break
}
}
// remove from trackers by iterating over all tracker keys
for _, key := range t.getKeys() {
if strings.HasPrefix(key.String(), notifier.GetPrefix()) {
t.removeKeyNotifier(key, id)
}
}
}
// Purge is a convenience function to purge all trackers.
// This will remove all key notifiers and prefix notifiers.
func (t *DefaultTrackers) Purge(prefix string) {
t.notifiersChannel <- notifierOp{
op: purge,
purgePrefix: prefix,
}
}
func (t *DefaultTrackers) purge(prefix string) {
for key, tracker := range t.trackers {
// if key is not a prefix of the purge prefix, skip it
if !strings.HasPrefix(key.String(), prefix) {
continue
}
// remove all prefix notifiers
for _, pn := range t.prefixNotifiers {
t.removeKeyNotifier(key, pn.getID())
}
tracker.notify(Remove, nil)
}
}
// UpdateValue returns the current value tracked by a key.
func (t *DefaultTrackers) UpdateValue(key Key, updateFunc UpdateValueFunc) {
t.notifiersChannel <- notifierOp{
op: update,
updateKey: key,
updateFunc: updateFunc,
}
}
func (t *DefaultTrackers) updateValue(key Key, updateFunc UpdateValueFunc) {
tracker, _ := t.getTracker(key)
eventType, newValue := updateFunc(tracker.value)
event := Event{
Type: eventType,
Key: key,
Value: newValue,
}
switch eventType {
case Write:
t.writeEvent(tracker, event)
case Remove:
t.removeEvent(tracker, event)
}
}
func (t *DefaultTrackers) writeEvent(tracker *keyTracker, event Event) {
valid := tracker.isValidKey()
tracker.notify(Write, event.Value)
// if the key was not valid earlier, then this is a create event
if !valid {
for _, pn := range t.prefixNotifiers {
if strings.HasPrefix(event.Key.String(), pn.GetPrefix()) {
n, err := pn.GetKeyNotifier(event.Key)
if err != nil {
continue
}
n.inherit(event.Key, pn)
tracker.addKeyNotifier(n)
}
}
}
}
func (t *DefaultTrackers) removeEvent(tracker *keyTracker, event Event) {
for _, n := range t.prefixNotifiers {
tracker.removeKeyNotifier(n.getID())
}
tracker.notify(Remove, nil)
if len(tracker.getKeyNotifiers()) == 0 {
delete(t.trackers, event.Key)
}
}
// Start opens the underlying event channel and starts the event loop.
// See AddKeyNotifier, AddPrefixNotifier, RemoveKeyNotifier, RemovePrefixNotifier, and Purge for more information.
func (t *DefaultTrackers) Start() error {
t.waitGroup.Go(func() {
OUTER:
for {
select {
case op := <-t.notifiersChannel:
switch op.op {
case add:
if op.keyNotifier != nil {
t.addKeyNotifier(op.keyNotifier)
} else if op.prefixNotifier != nil {
t.addPrefixNotifier(op.prefixNotifier)
}
case remove:
if op.keyNotifier != nil {
t.removeKeyNotifier(op.keyNotifier.GetKey(), op.keyNotifier.getID())
} else if op.prefixNotifier != nil {
t.removePrefixNotifier(op.prefixNotifier)
}
case update:
t.updateValue(op.updateKey, op.updateFunc)
case purge:
t.purge(op.purgePrefix)
case stop:
t.stop()
}
case event := <-t.eventsChannel:
tracker, _ := t.getTracker(event.Key)
switch event.Type {
case Write:
t.writeEvent(tracker, event)
case Remove:
t.removeEvent(tracker, event)
}
case <-t.ctx.Done():
break OUTER
}
}
if len(t.prefixNotifiers) > 0 {
log.Warn().Msg("non-zero prefix notifiers detected on notifier shutdown")
}
if len(t.trackers) > 0 {
// loop through trackers
for _, tracker := range t.trackers {
if len(tracker.getKeyNotifiers()) > 0 {
log.Warn().Interface("key", tracker.key).Msg("dangling notifier detected on shutdown")
}
}
}
})
return nil
}
// Stop closes all channels and waits for the goroutine to finish.
func (t *DefaultTrackers) Stop() error {
op := notifierOp{
op: stop,
}
t.notifiersChannel <- op
t.waitGroup.Wait()
return nil
}
func (t *DefaultTrackers) stop() {
t.cancel()
}
// NewPrefixedEventWriter returns an event writer which keys will be
// automatically prefixed with given prefix.
//
// It's recommended that prefix ends up some kind of delimiter, like `.` or `/`.
func NewPrefixedEventWriter(prefix string, ew EventWriter) EventWriter {
return &prefixedEventWriter{
prefix: prefix,
parent: ew,
}
}
type prefixedEventWriter struct {
parent EventWriter
prefix string
}
// WriteEvent implements EventWriter interface.
func (ew *prefixedEventWriter) WriteEvent(key Key, value []byte) {
ew.parent.WriteEvent(Key(ew.prefix+string(key)), value)
}
// RemoveEvent implements EventWriter interface.
func (ew *prefixedEventWriter) RemoveEvent(key Key) {
ew.parent.RemoveEvent(Key(ew.prefix + string(key)))
}
// Purge implements EventWriter interface.
func (ew *prefixedEventWriter) Purge(prefix string) {
ew.parent.Purge(ew.prefix + prefix)
}
// UpdateValue implements EventWriter interface.
func (ew *prefixedEventWriter) UpdateValue(key Key, updateFunc UpdateValueFunc) {
ew.parent.UpdateValue(Key(ew.prefix+string(key)), updateFunc)
}