-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
watchstoremgr.go
144 lines (120 loc) · 4.14 KB
/
watchstoremgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package store
import (
"context"
"path"
"sync"
"sync/atomic"
"github.com/sirupsen/logrus"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/logging/logfields"
)
// WSMFunc is a function which can be registered in the WatchStoreManager.
// When started, it is handed the context that was passed to the manager's
// Run() method.
type WSMFunc func(context.Context)
// WatchStoreManager allows registering a set of functions to be asynchronously
// executed once the corresponding kvstore prefixes are synchronized (what
// "synchronized" means depends on the implementation).
type WatchStoreManager interface {
// Register registers a function associated with a given kvstore prefix.
// It cannot be called once Run() has started.
Register(prefix string, function WSMFunc)
// Run starts the manager, blocking until the context is closed and all
// started functions have terminated.
Run(ctx context.Context)
}
// wsmCommon implements the common logic shared by WatchStoreManager implementations.
type wsmCommon struct {
// wg tracks the goroutines spawned by ready(), so that wait() can block
// until all of them have returned.
wg sync.WaitGroup
// functions maps each kvstore prefix to its registered function. An entry
// is removed by ready() right before its function gets started.
functions map[string]WSMFunc
// running is set by run() and checked by Register() to reject registrations
// performed after the manager has been started.
running atomic.Bool
// log is the logger annotated with the cluster name given to newWSMCommon().
log *logrus.Entry
}
// newWSMCommon builds the state shared by the WatchStoreManager
// implementations: an empty prefix-to-function registry and a logger
// annotated with the name of the given cluster.
func newWSMCommon(clusterName string) wsmCommon {
	registry := make(map[string]WSMFunc)
	scopedLog := log.WithField(logfields.ClusterName, clusterName)

	return wsmCommon{functions: registry, log: scopedLog}
}
// Register associates the given function with a kvstore prefix, replacing any
// function previously registered for that same prefix. It must be called
// before Run(): invoking it while the manager is running triggers a panic.
func (mgr *wsmCommon) Register(prefix string, function WSMFunc) {
	if alreadyStarted := mgr.running.Load(); alreadyStarted {
		mgr.log.Panic("Cannot call Register while the watch store manager is running")
	}

	mgr.functions[prefix] = function
}
// ready starts, in a dedicated goroutine tracked by the wait group, the
// function registered for the given prefix, handing it ctx. The function is
// removed from the registry before being started, so that a subsequent event
// for the same prefix does not start it again. Events for prefixes without a
// registered (non-nil) function are only logged.
func (mgr *wsmCommon) ready(ctx context.Context, prefix string) {
	scopedLog := mgr.log.WithField(logfields.Prefix, prefix)

	fn := mgr.functions[prefix]
	if fn == nil {
		scopedLog.Debug("Received sync event for unregistered prefix")
		return
	}

	scopedLog.Debug("Starting function for kvstore prefix")
	delete(mgr.functions, prefix)

	mgr.wg.Add(1)
	go func() {
		defer mgr.wg.Done()
		fn(ctx)
		scopedLog.Debug("Function terminated for kvstore prefix")
	}()
}
// run marks the manager as started. It first logs the startup and then
// atomically sets the running flag, panicking if the flag was already set,
// that is, if the manager had already been started.
func (mgr *wsmCommon) run() {
	mgr.log.Info("Starting watch store manager")

	wasRunning := mgr.running.Swap(true)
	if wasRunning {
		mgr.log.Panic("Cannot start the watch store manager twice")
	}
}
// wait blocks until every function started through ready() has returned, and
// then logs that the manager has stopped.
func (mgr *wsmCommon) wait() {
	mgr.wg.Wait()

	mgr.log.Info("Stopped watch store manager")
}
// wsmSync is the WatchStoreManager implementation which starts a registered
// function only once an update for the corresponding prefix has been received
// through OnUpdate().
type wsmSync struct {
wsmCommon
// clusterName is joined with kvstore.SyncedPrefix in Run() to build the
// kvstore prefix to watch.
clusterName string
// backend is the kvstore backend handed to the watch store in Run().
backend WatchStoreBackend
// store is the watch store created by NewWatchStoreManagerSync(), which is
// given the manager itself as last argument (presumably as the observer
// notified via OnUpdate/OnDelete; confirm against NewRestartableWatchStore).
store WatchStore
// onUpdate is set by Run() to a closure binding the run context; OnUpdate()
// forwards the key name to it.
onUpdate func(prefix string)
}
// NewWatchStoreManagerSync returns a WatchStoreManager which starts each
// registered function only once the sync canary for the corresponding prefix
// has been received. This ensures that the keys hosted under the given prefix
// have been successfully synchronized from the external source, even in case
// an ephemeral kvstore is used.
func NewWatchStoreManagerSync(backend WatchStoreBackend, clusterName string) WatchStoreManager {
	mgr := &wsmSync{
		wsmCommon:   newWSMCommon(clusterName),
		clusterName: clusterName,
		backend:     backend,
	}

	// The manager itself is handed to the watch store, which requires the
	// manager to exist before the store can be created.
	mgr.store = NewRestartableWatchStore(clusterName, KVPairCreator, mgr)
	return mgr
}
// Run starts the manager, blocking until the context is closed and all
// started functions have terminated. It panics if the manager has already
// been started.
func (mgr *wsmSync) Run(ctx context.Context) {
	mgr.run()

	// Bind the run context, so that the functions started upon reception of
	// an update are handed ctx.
	mgr.onUpdate = func(prefix string) {
		mgr.ready(ctx, prefix)
	}

	syncedPrefix := path.Join(kvstore.SyncedPrefix, mgr.clusterName)
	mgr.store.Watch(ctx, mgr.backend, syncedPrefix)

	mgr.wait()
}
// OnUpdate forwards the name of the given key to the callback installed by
// Run(), which treats it as the prefix whose function should be started.
func (mgr *wsmSync) OnUpdate(k Key) {
	prefix := k.GetKeyName()
	mgr.onUpdate(prefix)
}
// OnDelete is a no-op: the manager takes no action when a key is deleted.
func (mgr *wsmSync) OnDelete(k NamedKey) {
	// Nothing to do upon deletion.
}
// wsmImmediate is the WatchStoreManager implementation which starts all the
// registered functions as soon as Run() is executed. It carries no state
// beyond the embedded wsmCommon.
type wsmImmediate struct {
wsmCommon
}
// NewWatchStoreManagerImmediate returns a WatchStoreManager which starts all
// the registered functions immediately once Run() is executed, without
// waiting for any event from the kvstore.
func NewWatchStoreManagerImmediate(clusterName string) WatchStoreManager {
	mgr := &wsmImmediate{wsmCommon: newWSMCommon(clusterName)}
	return mgr
}
// Run starts the manager, immediately starting every registered function and
// then blocking until all of them have terminated. It panics if the manager
// has already been started.
func (mgr *wsmImmediate) Run(ctx context.Context) {
	mgr.run()

	// ready() removes the entry it starts from mgr.functions; deleting map
	// entries while ranging over the map is permitted by the language.
	for registered := range mgr.functions {
		mgr.ready(ctx, registered)
	}

	mgr.wait()
}