-
Notifications
You must be signed in to change notification settings - Fork 245
/
watchingcache.go
621 lines (516 loc) · 19.9 KB
/
watchingcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
package schemacaching
import (
"context"
"errors"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/internal/datastore/revisions"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/cache"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/genutil/mapz"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
)
// Prometheus metrics exported by the watching schema cache. They are registered once, in init.
var (
	// namespacesFallbackModeGauge reports whether the namespace watching cache is in fallback mode.
	namespacesFallbackModeGauge = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "spicedb",
		Subsystem: "datastore",
		Name:      "watching_schema_cache_namespaces_fallback_mode",
		Help:      "value of 1 if the cache is in fallback mode and 0 otherwise",
	})

	// caveatsFallbackModeGauge reports whether the caveat watching cache is in fallback mode.
	caveatsFallbackModeGauge = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "spicedb",
		Subsystem: "datastore",
		Name:      "watching_schema_cache_caveats_fallback_mode",
		Help:      "value of 1 if the cache is in fallback mode and 0 otherwise",
	})

	// schemaCacheRevisionGauge holds the most recent checkpoint revision seen by the schema watch,
	// for revision types that can be expressed as a float.
	schemaCacheRevisionGauge = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "spicedb",
		Subsystem: "datastore",
		Name:      "watching_schema_cache_tracked_revision",
		Help:      "the currently tracked max revision for the schema cache",
	})

	// definitionsReadCachedCounter counts definitions answered directly by a watching cache,
	// labeled by definition kind.
	definitionsReadCachedCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "spicedb",
		Subsystem: "datastore",
		Name:      "watching_schema_cache_definitions_read_cached_total",
		Help:      "cached number of definitions read from the watching cache",
	}, []string{"definition_kind"})

	// definitionsReadTotalCounter counts every definition requested from a watching cache,
	// labeled by definition kind.
	definitionsReadTotalCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "spicedb",
		Subsystem: "datastore",
		Name:      "watching_schema_cache_definitions_read_total",
		Help:      "total number of definitions read from the watching cache",
	}, []string{"definition_kind"})
)

// maximumRetryCount is the retry limit applied to retryable schema watch errors before the
// watching cache gives up and stays in fallback mode.
const maximumRetryCount = 10

func init() {
	prometheus.MustRegister(
		namespacesFallbackModeGauge,
		caveatsFallbackModeGauge,
		schemaCacheRevisionGauge,
		definitionsReadCachedCounter,
		definitionsReadTotalCounter,
	)
}
// watchingCachingProxy is a datastore proxy that caches schema (namespace and caveat definitions)
// and keeps that cache up to date via a Watch call for schema changes. If the wrapped datastore
// does not support this API, the data is not available, or an error occurs, the proxy falls back
// to the standard definition cache.
type watchingCachingProxy struct {
	// Datastore is the wrapped delegate; any method not overridden by the proxy goes directly to it.
	datastore.Datastore

	// fallbackCache is the standard definition cache over the same delegate. It serves reads when
	// a watching cache is in fallback mode or cannot answer, and all read-write transactions.
	fallbackCache *definitionCachingProxy

	// gcWindow is passed to each cache's gcStaleEntries during periodic GC.
	gcWindow time.Duration

	// watchHeartbeat is the checkpoint interval requested from the schema watch.
	watchHeartbeat time.Duration

	// closed is used by Close to tell the background GC and watch goroutines to stop
	// (one value per goroutine).
	closed chan bool

	// namespaceCache and caveatCache are the watch-maintained caches for each definition kind.
	namespaceCache *schemaWatchCache[*core.NamespaceDefinition]
	caveatCache    *schemaWatchCache[*core.CaveatDefinition]
}
// createWatchingCacheProxy wraps delegate in a watching cache proxy.
//
// Namespace and caveat reads are served from watch-maintained caches. Whenever those caches
// cannot answer, reads go through a definitionCachingProxy built over the same delegate and the
// supplied cache c. gcWindow controls periodic stale-entry GC, and watchHeartbeat is the
// checkpoint interval requested from the schema watch. Both caches start in fallback mode.
func createWatchingCacheProxy(delegate datastore.Datastore, c cache.Cache, gcWindow time.Duration, watchHeartbeat time.Duration) *watchingCachingProxy {
	fallback := &definitionCachingProxy{Datastore: delegate, c: c}

	// Fallback read paths for namespaces.
	readNamespace := func(ctx context.Context, name string, revision datastore.Revision) (*core.NamespaceDefinition, datastore.Revision, error) {
		return fallback.SnapshotReader(revision).ReadNamespaceByName(ctx, name)
	}
	lookupNamespaces := func(ctx context.Context, names []string, revision datastore.Revision) ([]datastore.RevisionedDefinition[*core.NamespaceDefinition], error) {
		return fallback.SnapshotReader(revision).LookupNamespacesWithNames(ctx, names)
	}

	// Fallback read paths for caveats.
	readCaveat := func(ctx context.Context, name string, revision datastore.Revision) (*core.CaveatDefinition, datastore.Revision, error) {
		return fallback.SnapshotReader(revision).ReadCaveatByName(ctx, name)
	}
	lookupCaveats := func(ctx context.Context, names []string, revision datastore.Revision) ([]datastore.RevisionedDefinition[*core.CaveatDefinition], error) {
		return fallback.SnapshotReader(revision).LookupCaveatsWithNames(ctx, names)
	}

	nsCache := newSchemaWatchCache[*core.NamespaceDefinition](
		"namespace",
		datastore.NewNamespaceNotFoundErr,
		readNamespace,
		lookupNamespaces,
		definitionsReadCachedCounter,
		definitionsReadTotalCounter,
		namespacesFallbackModeGauge,
	)
	cavCache := newSchemaWatchCache[*core.CaveatDefinition](
		"caveat",
		datastore.NewCaveatNameNotFoundErr,
		readCaveat,
		lookupCaveats,
		definitionsReadCachedCounter,
		definitionsReadTotalCounter,
		caveatsFallbackModeGauge,
	)

	return &watchingCachingProxy{
		Datastore:      delegate,
		fallbackCache:  fallback,
		gcWindow:       gcWindow,
		watchHeartbeat: watchHeartbeat,
		// One buffered slot per background goroutine (GC and watch) signalled by Close.
		closed:         make(chan bool, 2),
		namespaceCache: nsCache,
		caveatCache:    cavCache,
	}
}
// SnapshotReader returns a reader at rev. Namespace and caveat reads on it are served by the
// proxy's watching caches; every other read goes to the delegate datastore's snapshot reader.
func (p *watchingCachingProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
	return &watchingCachingReader{
		Reader: p.Datastore.SnapshotReader(rev),
		rev:    rev,
		p:      p,
	}
}
// ReadWriteTx runs f in a read-write transaction through the standard (fallback) definition cache.
//
// The watching caches are deliberately bypassed here. The standard cache keeps definition changes
// made within the transaction local to that transaction, so they must not leak into the shared,
// watch-maintained caches.
func (p *watchingCachingProxy) ReadWriteTx(
	ctx context.Context,
	f datastore.TxUserFunc,
	opts ...options.RWTOptionsOption,
) (datastore.Revision, error) {
	standard := p.fallbackCache
	return standard.ReadWriteTx(ctx, f, opts...)
}
// Start launches cache synchronization in the background and returns immediately, so that
// prepopulating the caches does not block server startup. It always returns nil.
func (p *watchingCachingProxy) Start(ctx context.Context) error {
	runSync := func() {
		// startSync logs its only returned error and puts the caches into fallback mode before
		// returning it, so there is nothing further to do with it here.
		_ = p.startSync(ctx)
	}
	go runSync()

	return nil
}
// startSync starts the background goroutines that keep the watching caches up to date. It blocks
// until the initial prepopulation has either completed (and the schema watch has started) or
// failed.
//
// It returns an error only when the datastore's head revision cannot be read. In that case no
// goroutines are started and both caches are left in fallback mode. Any later failure is logged
// and handled by placing the caches into fallback mode rather than being returned.
func (p *watchingCachingProxy) startSync(ctx context.Context) error {
	log.Info().Msg("starting watching cache")

	// Use the caller's context so that cancellation is honored here, just as it is by the
	// goroutines started below.
	headRev, err := p.Datastore.HeadRevision(ctx)
	if err != nil {
		p.setFallbackModeForAllSchemaCaches()
		log.Warn().Err(err).Msg("received error in schema watch")
		return err
	}

	// Start watching for expired entries to be GCed.
	go p.runSchemaCacheGC(ctx, headRev)

	// ready is closed exactly once, whichever happens first:
	//   - the first prepopulation has finished and the watch has started, or
	//   - the watch goroutine exits.
	// A sync.Once-guarded channel is used instead of a sync.WaitGroup. The watch may be restarted
	// long after startSync has returned, and re-Adding to a WaitGroup whose Wait may still be
	// returning is not allowed.
	ready := make(chan struct{})
	var readyOnce sync.Once
	signalReady := func() { readyOnce.Do(func() { close(ready) }) }

	// Start watching for schema changes.
	go p.runSchemaWatch(ctx, headRev, signalReady)

	<-ready
	return nil
}

// setFallbackModeForAllSchemaCaches places both the namespace and caveat caches into fallback mode.
func (p *watchingCachingProxy) setFallbackModeForAllSchemaCaches() {
	p.namespaceCache.setFallbackMode()
	p.caveatCache.setFallbackMode()
}

// runSchemaCacheGC runs gcStaleEntries on both caches once an hour, using the configured GC
// window. It stops when ctx is canceled or the proxy is closed.
func (p *watchingCachingProxy) runSchemaCacheGC(ctx context.Context, headRev datastore.Revision) {
	log.Debug().Str("revision", headRev.String()).Msg("starting watching cache GC goroutine")

	// A single ticker avoids allocating a new timer on every loop iteration.
	ticker := time.NewTicker(time.Hour)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			log.Debug().Msg("GC routine for watch closed due to context cancelation")
			return
		case <-p.closed:
			log.Debug().Msg("GC routine for watch closed")
			return
		case <-ticker.C:
			log.Debug().Msg("beginning GC operation for schema watch")
			p.namespaceCache.gcStaleEntries(p.gcWindow)
			p.caveatCache.gcStaleEntries(p.gcWindow)
			log.Debug().Msg("schema watch gc operation completed")
		}
	}
}

// runSchemaWatch drives the schema watch.
//
// Retryable watch errors restart the watch, at most maximumRetryCount times. Any other watch
// error places both caches into permanent fallback mode. signalReady is called once the first
// watch has started, and is always called before this function returns, so that startSync can
// never block forever.
func (p *watchingCachingProxy) runSchemaWatch(ctx context.Context, headRev datastore.Revision, signalReady func()) {
	defer signalReady()

	retryCount := 0
	for {
		err := p.watchSchemaFromRevision(ctx, headRev, signalReady)
		if err == nil {
			// The watch ended in a way that has already been fully handled (cancellation,
			// close, prepopulation failure, or an unknown change type).
			return
		}

		var retryable datastore.ErrWatchRetryable
		if errors.As(err, &retryable) && retryCount < maximumRetryCount {
			log.Warn().Err(err).Msg("received retryable error in schema watch; sleeping for a bit and restarting watch")
			retryCount++
			pgxcommon.SleepOnErr(ctx, err, uint8(retryCount))

			// Restart from the current head rather than from the revision at which the cache
			// was first started. That older revision may have fallen out of the datastore's
			// GC window, and re-reading from it would replay every change since then.
			if newHeadRev, headErr := p.Datastore.HeadRevision(ctx); headErr == nil {
				headRev = newHeadRev
			} else {
				log.Warn().Err(headErr).Msg("unable to refresh head revision for schema watch restart; reusing previous revision")
			}
			continue
		}

		p.setFallbackModeForAllSchemaCaches()
		log.Warn().Err(err).Msg("received terminal error in schema watch; setting to permanent fallback mode")
		return
	}
}

// watchSchemaFromRevision resets both caches, prepopulates them with every definition at headRev,
// and then applies changes from a schema watch started at headRev until that watch ends.
//
// It returns the error delivered on the watch's error channel, so the caller can decide whether
// to restart. Every other exit is fully handled here and returns nil: prepopulation failure,
// unknown change type, context cancellation, and proxy close.
func (p *watchingCachingProxy) watchSchemaFromRevision(ctx context.Context, headRev datastore.Revision, signalReady func()) error {
	p.namespaceCache.reset()
	p.caveatCache.reset()
	log.Debug().Str("revision", headRev.String()).Msg("starting watching cache watch operation")

	if err := p.prepopulateSchemaCaches(ctx, headRev); err != nil {
		p.setFallbackModeForAllSchemaCaches()
		log.Warn().Err(err).Msg("received error in schema watch")
		return nil
	}

	// Scope the datastore watch to this attempt, so that it is torn down whenever this function
	// returns (including on close or an unknown change type). Otherwise it would keep running
	// with nobody reading from its channels.
	watchCtx, cancelWatch := context.WithCancel(ctx)
	defer cancelWatch()

	log.Debug().Str("revision", headRev.String()).Dur("watch-heartbeat", p.watchHeartbeat).Msg("beginning schema watch")
	ssc, serrc := p.Datastore.Watch(watchCtx, headRev, datastore.WatchOptions{
		Content:            datastore.WatchSchema | datastore.WatchCheckpoints,
		CheckpointInterval: p.watchHeartbeat,
	})
	log.Debug().Msg("schema watch started")

	p.namespaceCache.startAtRevision(headRev)
	p.caveatCache.startAtRevision(headRev)
	signalReady()

	for {
		select {
		case <-ctx.Done():
			log.Debug().Msg("schema watch closed due to context cancelation")
			return nil

		case <-p.closed:
			log.Debug().Msg("schema watch closed")
			return nil

		case ss, ok := <-ssc:
			if !ok {
				// The change stream was closed. A closed channel is always ready, so stop
				// selecting on it (otherwise this loop would spin). Then wait for the error
				// channel to report why the watch ended.
				ssc = nil
				continue
			}

			log.Trace().Object("update", ss).Msg("received update from schema watch")
			if ss.IsCheckpoint {
				if converted, isFloat := ss.Revision.(revisions.WithInexactFloat64); isFloat {
					schemaCacheRevisionGauge.Set(converted.InexactFloat64())
				}
				p.namespaceCache.setCheckpointRevision(ss.Revision)
				p.caveatCache.setCheckpointRevision(ss.Revision)
				continue
			}

			// Apply each change to the interval tracker entry for its definition.
			for _, changeDef := range ss.ChangedDefinitions {
				switch t := changeDef.(type) {
				case *core.NamespaceDefinition:
					if err := p.namespaceCache.updateDefinition(t.Name, t, false, ss.Revision); err != nil {
						p.namespaceCache.setFallbackMode()
						log.Warn().Err(err).Msg("received error in schema watch")
					}
				case *core.CaveatDefinition:
					if err := p.caveatCache.updateDefinition(t.Name, t, false, ss.Revision); err != nil {
						p.caveatCache.setFallbackMode()
						log.Warn().Err(err).Msg("received error in schema watch")
					}
				default:
					p.setFallbackModeForAllSchemaCaches()
					log.Error().Msg("unknown change definition type")
					return nil
				}
			}

			for _, deletedNamespaceName := range ss.DeletedNamespaces {
				if err := p.namespaceCache.updateDefinition(deletedNamespaceName, nil, true, ss.Revision); err != nil {
					p.namespaceCache.setFallbackMode()
					log.Warn().Err(err).Msg("received error in schema watch")
					break
				}
			}

			for _, deletedCaveatName := range ss.DeletedCaveats {
				if err := p.caveatCache.updateDefinition(deletedCaveatName, nil, true, ss.Revision); err != nil {
					p.caveatCache.setFallbackMode()
					log.Warn().Err(err).Msg("received error in schema watch")
					break
				}
			}

		case err := <-serrc:
			if err == nil {
				// A nil value (for example from a closed error channel) still means the watch
				// has ended. Surface it as an error so the caller treats it as terminal.
				return errors.New("schema watch ended without reporting an error")
			}
			return err
		}
	}
}

// prepopulateSchemaCaches loads every namespace and caveat definition visible at headRev into the
// corresponding cache, recording each at headRev.
func (p *watchingCachingProxy) prepopulateSchemaCaches(ctx context.Context, headRev datastore.Revision) error {
	reader := p.Datastore.SnapshotReader(headRev)

	log.Info().Str("revision", headRev.String()).Msg("prepopulating namespace watching cache")
	namespaces, err := reader.ListAllNamespaces(ctx)
	if err != nil {
		return err
	}
	for _, namespaceDef := range namespaces {
		if err := p.namespaceCache.updateDefinition(namespaceDef.Definition.Name, namespaceDef.Definition, false, headRev); err != nil {
			return err
		}
	}
	log.Info().Str("revision", headRev.String()).Int("count", len(namespaces)).Msg("populated namespace watching cache")

	log.Info().Str("revision", headRev.String()).Msg("prepopulating caveat watching cache")
	caveats, err := reader.ListAllCaveats(ctx)
	if err != nil {
		return err
	}
	for _, caveatDef := range caveats {
		if err := p.caveatCache.updateDefinition(caveatDef.Definition.Name, caveatDef.Definition, false, headRev); err != nil {
			return err
		}
	}
	log.Info().Str("revision", headRev.String()).Int("count", len(caveats)).Msg("populated caveat watching cache")

	return nil
}
// Close places both caches into fallback mode and signals the background GC and watch goroutines
// (if running) to stop. It then closes the fallback cache and the delegate datastore, joining
// their errors.
//
// The stop signals are sent without blocking. The closed channel holds one value per background
// goroutine, so if it is already full there is nothing left to signal; blocking would hang the
// caller forever. It can be full because Close was already called, or because the goroutines
// were never started and so never drained it.
//
// NOTE(review): fallbackCache wraps the same delegate datastore. If its Close also closes the
// delegate, the delegate is closed twice here — confirm that definitionCachingProxy.Close
// tolerates this.
func (p *watchingCachingProxy) Close() error {
	p.caveatCache.setFallbackMode()
	p.namespaceCache.setFallbackMode()

	// Signal both goroutines (GC and watch) to stop.
	for i := 0; i < 2; i++ {
		select {
		case p.closed <- true:
		default:
		}
	}

	return errors.Join(p.fallbackCache.Close(), p.Datastore.Close())
}
// schemaWatchCache is a cache of schema definitions of a single kind (namespaces or caveats). It
// is updated from changes received via the schema watch.
//
// Each definition name maps to an interval tracker of revisioned entries. Reads look up the
// tracker together with the latest checkpoint revision to decide whether the cache can answer at
// the requested revision; otherwise they go to the fallback read functions.
type schemaWatchCache[T datastore.SchemaDefinition] struct {
	// kind is a descriptive label of the kind of definitions in the cache. It is used as the label
	// value when incrementing the read counters.
	kind string

	// notFoundError builds the error returned when a cached entry records that the definition
	// does not exist at the requested revision.
	notFoundError notFoundErrorFn

	// readDefinition and lookupDefinitions read definitions through the fallback path. They are
	// used in fallback mode and for any names the cache cannot answer.
	readDefinition    readDefinitionFn[T]
	lookupDefinitions lookupDefinitionsFn[T]

	// inFallbackMode, if true, indicates that an error occurred with the schema watch (or that
	// the watch has not yet started). All calls to this cache then pass through to the fallback
	// read functions rather than using the cache itself, which is likely out of date.
	// *Must* be accessed under the lock.
	inFallbackMode bool

	// checkpointRevision is the current revision at which the cache has been given *all* possible
	// changes.
	// *Must* be accessed under the lock.
	checkpointRevision datastore.Revision

	// entries are the entries in the cache, keyed by the name of the namespace or caveat.
	// *Must* be accessed under the lock.
	entries map[string]*intervalTracker[revisionedEntry[T]]

	// definitionsReadCachedCounter is a counter of the number of definitions
	// returned by the cache directly (without fallback).
	definitionsReadCachedCounter *prometheus.CounterVec

	// definitionsReadTotalCounter is a counter of the total number of definitions
	// requested.
	definitionsReadTotalCounter *prometheus.CounterVec

	// fallbackGauge is a gauge holding 1 while the cache is in fallback mode and 0 otherwise.
	fallbackGauge prometheus.Gauge

	// lock guards inFallbackMode, checkpointRevision and entries.
	lock sync.RWMutex
}
// revisionedEntry is a single value stored in a definition's interval tracker.
type revisionedEntry[T datastore.SchemaDefinition] struct {
	// revisionedDefinition is the definition together with the revision at which it was written.
	revisionedDefinition datastore.RevisionedDefinition[T]

	// wasNotFound marks a deletion. At this entry the definition does not exist, and reads return
	// the cache's not-found error instead of a definition.
	wasNotFound bool
}
// notFoundErrorFn builds the error returned when the named definition does not exist.
type notFoundErrorFn func(name string) error

// readDefinitionFn reads a single named definition at revision through the fallback path. It
// returns the definition and the revision reported for it.
type readDefinitionFn[T datastore.SchemaDefinition] func(ctx context.Context, name string, revision datastore.Revision) (T, datastore.Revision, error)

// lookupDefinitionsFn reads the named definitions at revision through the fallback path.
type lookupDefinitionsFn[T datastore.SchemaDefinition] func(ctx context.Context, names []string, revision datastore.Revision) ([]datastore.RevisionedDefinition[T], error)
// newSchemaWatchCache builds an empty schema watch cache for the given kind of definition.
//
// The cache begins in fallback mode, so every read goes through readDefinition/lookupDefinitions
// until startAtRevision is called to signal that a watch loop has begun at a known revision.
func newSchemaWatchCache[T datastore.SchemaDefinition](
	kind string,
	notFoundError notFoundErrorFn,
	readDefinition readDefinitionFn[T],
	lookupDefinitions lookupDefinitionsFn[T],
	definitionsReadCachedCounter *prometheus.CounterVec,
	definitionsReadTotalCounter *prometheus.CounterVec,
	fallbackGauge prometheus.Gauge,
) *schemaWatchCache[T] {
	swc := &schemaWatchCache[T]{
		kind:                         kind,
		notFoundError:                notFoundError,
		readDefinition:               readDefinition,
		lookupDefinitions:            lookupDefinitions,
		definitionsReadCachedCounter: definitionsReadCachedCounter,
		definitionsReadTotalCounter:  definitionsReadTotalCounter,
		fallbackGauge:                fallbackGauge,
		entries:                      map[string]*intervalTracker[revisionedEntry[T]]{},
		inFallbackMode:               true,
		// checkpointRevision (nil) and lock (unlocked) are left at their zero values.
	}

	// Report the initial fallback state.
	fallbackGauge.Set(1)
	return swc
}
// startAtRevision takes the cache out of fallback mode and records revision as its initial
// checkpoint. It is called once a watch loop has begun at that revision.
func (swc *schemaWatchCache[T]) startAtRevision(revision datastore.Revision) {
	swc.lock.Lock()
	defer swc.lock.Unlock()

	swc.inFallbackMode = false
	swc.fallbackGauge.Set(0)
	swc.checkpointRevision = revision
}
// gcStaleEntries asks every tracker to drop its intervals that are stale for gcWindow. It removes
// from the cache any tracker that reports it has been fully emptied.
func (swc *schemaWatchCache[T]) gcStaleEntries(gcWindow time.Duration) {
	swc.lock.Lock()
	defer swc.lock.Unlock()

	// Deleting from a map while ranging over it is permitted in Go.
	for name, tracker := range swc.entries {
		if emptied := tracker.removeStaleIntervals(gcWindow); emptied {
			delete(swc.entries, name)
		}
	}
}
// setFallbackMode switches the cache into fallback mode. From then on, reads bypass the cached
// entries until startAtRevision or reset changes the mode again.
func (swc *schemaWatchCache[T]) setFallbackMode() {
	swc.lock.Lock()
	defer swc.lock.Unlock()

	swc.fallbackGauge.Set(1)
	swc.inFallbackMode = true
}
// reset discards every cached entry and the checkpoint revision, and places the cache into
// fallback mode.
//
// The cache stays in fallback mode (reads go to the fallback read functions) until
// startAtRevision is called. This means reads issued while the cache is being repopulated never
// consult a partially-filled cache that has no checkpoint revision. This matches the protocol of
// newSchemaWatchCache, where only startAtRevision takes the cache out of fallback mode.
func (swc *schemaWatchCache[T]) reset() {
	swc.lock.Lock()
	defer swc.lock.Unlock()

	swc.inFallbackMode = true
	swc.fallbackGauge.Set(1)
	swc.entries = map[string]*intervalTracker[revisionedEntry[T]]{}
	swc.checkpointRevision = nil
}
// setCheckpointRevision records revision as the latest point up to which the cache has received
// every change.
func (swc *schemaWatchCache[T]) setCheckpointRevision(revision datastore.Revision) {
	swc.lock.Lock()
	swc.checkpointRevision = revision
	swc.lock.Unlock()
}
// getTrackerForName returns the interval tracker for the named definition, creating and
// registering an empty one if none exists yet.
//
// The common case (tracker already present) takes only the read lock. When the tracker is
// missing, the map is re-checked under the write lock before inserting. Without the re-check, two
// concurrent callers (for example the watch goroutine and a reader) could each create a tracker
// for the same name, and the second insert would replace the first, discarding any entry already
// added to it.
func (swc *schemaWatchCache[T]) getTrackerForName(name string) *intervalTracker[revisionedEntry[T]] {
	swc.lock.RLock()
	tracker, ok := swc.entries[name]
	swc.lock.RUnlock()
	if ok {
		return tracker
	}

	swc.lock.Lock()
	defer swc.lock.Unlock()

	// Another goroutine may have inserted a tracker between releasing the read lock and
	// acquiring the write lock.
	if existing, found := swc.entries[name]; found {
		return existing
	}

	tracker = newIntervalTracker[revisionedEntry[T]]()
	swc.entries[name] = tracker
	return tracker
}
// updateDefinition records, at revision, either a new value for the named definition or (when
// isDeletion is true) the fact that it no longer exists. It returns the error produced by
// spiceerrors.MustBugf if the tracker rejects the entry, which is reported as an out-of-order
// insertion.
func (swc *schemaWatchCache[T]) updateDefinition(name string, definition T, isDeletion bool, revision datastore.Revision) error {
	entry := revisionedEntry[T]{
		revisionedDefinition: datastore.RevisionedDefinition[T]{
			Definition:          definition,
			LastWrittenRevision: revision,
		},
		wasNotFound: isDeletion,
	}

	if accepted := swc.getTrackerForName(name).add(entry, revision); accepted {
		return nil
	}
	return spiceerrors.MustBugf("received out of order insertion for definition %s", name)
}
// readDefinitionByName returns the named definition as of revision, together with its
// last-written revision.
//
// When the cache is not in fallback mode and its tracker can answer for revision (given the
// latest checkpoint), the cached value is returned. A cached deletion yields the cache's
// not-found error. In every other case the read goes to the fallback readDefinition function.
func (swc *schemaWatchCache[T]) readDefinitionByName(ctx context.Context, name string, revision datastore.Revision) (T, datastore.Revision, error) {
	swc.definitionsReadTotalCounter.WithLabelValues(swc.kind).Inc()

	// Snapshot the mode and checkpoint together under the read lock.
	swc.lock.RLock()
	fallback, checkpoint := swc.inFallbackMode, swc.checkpointRevision
	swc.lock.RUnlock()

	if !fallback {
		if entry, hit := swc.getTrackerForName(name).lookup(revision, checkpoint); hit {
			swc.definitionsReadCachedCounter.WithLabelValues(swc.kind).Inc()
			if entry.wasNotFound {
				var zero T
				return zero, nil, swc.notFoundError(name)
			}
			return entry.revisionedDefinition.Definition, entry.revisionedDefinition.LastWrittenRevision, nil
		}
	}

	// Either in fallback mode, or the cache cannot answer for this revision.
	return swc.readDefinition(ctx, name, revision)
}
// readDefinitionsWithNames returns the named definitions that exist as of revision.
//
// In fallback mode every name is read through lookupDefinitions. Otherwise each name the cache
// can answer is served from its tracker: cached deletions are simply omitted from the result.
// Only the remaining names are read through lookupDefinitions, and those fallback results are
// returned but not inserted into the cache.
func (swc *schemaWatchCache[T]) readDefinitionsWithNames(ctx context.Context, names []string, revision datastore.Revision) ([]datastore.RevisionedDefinition[T], error) {
	swc.definitionsReadTotalCounter.WithLabelValues(swc.kind).Add(float64(len(names)))

	// Snapshot the mode and checkpoint together under the read lock.
	swc.lock.RLock()
	fallback, checkpoint := swc.inFallbackMode, swc.checkpointRevision
	swc.lock.RUnlock()

	if fallback {
		return swc.lookupDefinitions(ctx, names, revision)
	}

	uncached := mapz.NewSet(names...)
	results := make([]datastore.RevisionedDefinition[T], 0, len(names))
	for _, name := range names {
		entry, hit := swc.getTrackerForName(name).lookup(revision, checkpoint)
		if !hit {
			continue
		}

		swc.definitionsReadCachedCounter.WithLabelValues(swc.kind).Inc()
		uncached.Delete(name)
		if entry.wasNotFound {
			continue
		}
		results = append(results, entry.revisionedDefinition)
	}

	if uncached.IsEmpty() {
		return results, nil
	}

	fetched, err := swc.lookupDefinitions(ctx, uncached.AsSlice(), revision)
	if err != nil {
		return nil, err
	}
	return append(results, fetched...), nil
}
// watchingCachingReader is a snapshot reader at a fixed revision. It serves namespace and caveat
// reads from the owning proxy's watching caches and delegates every other read to the embedded
// Reader.
//
// Field order matters if this struct is built with a positional composite literal.
type watchingCachingReader struct {
	// Reader is the delegate datastore's snapshot reader at rev.
	datastore.Reader
	// rev is the snapshot revision used for every cache read.
	rev datastore.Revision
	// p is the owning proxy, whose caches serve the definition reads.
	p *watchingCachingProxy
}
// ReadNamespaceByName reads the named namespace at the reader's revision via the watching
// namespace cache.
func (r *watchingCachingReader) ReadNamespaceByName(
	ctx context.Context,
	name string,
) (*core.NamespaceDefinition, datastore.Revision, error) {
	nsCache := r.p.namespaceCache
	return nsCache.readDefinitionByName(ctx, name, r.rev)
}
// LookupNamespacesWithNames reads the named namespaces at the reader's revision via the watching
// namespace cache.
func (r *watchingCachingReader) LookupNamespacesWithNames(
	ctx context.Context,
	nsNames []string,
) ([]datastore.RevisionedNamespace, error) {
	nsCache := r.p.namespaceCache
	return nsCache.readDefinitionsWithNames(ctx, nsNames, r.rev)
}
// ReadCaveatByName reads the named caveat at the reader's revision via the watching caveat cache.
func (r *watchingCachingReader) ReadCaveatByName(
	ctx context.Context,
	name string,
) (*core.CaveatDefinition, datastore.Revision, error) {
	cavCache := r.p.caveatCache
	return cavCache.readDefinitionByName(ctx, name, r.rev)
}
// LookupCaveatsWithNames reads the named caveats at the reader's revision via the watching caveat
// cache.
func (r *watchingCachingReader) LookupCaveatsWithNames(
	ctx context.Context,
	caveatNames []string,
) ([]datastore.RevisionedCaveat, error) {
	cavCache := r.p.caveatCache
	return cavCache.readDefinitionsWithNames(ctx, caveatNames, r.rev)
}