// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package store

import (
	"context"
	"strings"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"

	"github.com/cilium/cilium/pkg/kvstore"
	"github.com/cilium/cilium/pkg/logging/logfields"
	"github.com/cilium/cilium/pkg/metrics"
	"github.com/cilium/cilium/pkg/metrics/metric"
)

// WatchStore abstracts the operations required to synchronize key/value pairs
// from a kvstore, emitting the corresponding events.
type WatchStore interface {
	// Watch starts watching the specified kvstore prefix, blocking until the context is closed.
	// Depending on the implementation, it might be executed multiple times.
	Watch(ctx context.Context, backend WatchStoreBackend, prefix string)

	// NumEntries returns the number of entries synchronized from the store.
	NumEntries() uint64

	// Synced returns whether the initial list of entries has been retrieved from
	// the kvstore, and new events are currently being watched.
	Synced() bool

	// Drain emits a deletion event for each known key. It shall be called only
	// when no watch operation is in progress.
	Drain()
}

// WatchStoreBackend represents the subset of kvstore.BackendOperations leveraged
// by WatchStore implementations.
type WatchStoreBackend interface {
	// ListAndWatch creates a new watcher for the given prefix after listing the existing keys.
	ListAndWatch(ctx context.Context, prefix string, chanSize int) *kvstore.Watcher
}
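
// The sketch below is illustrative only and not part of the original file: it
// shows the expected lifecycle of a WatchStore. Watch blocks until its context
// is closed, so it usually runs in a dedicated goroutine; Drain may only be
// invoked once that goroutine has returned. The function name and prefix are
// assumptions made purely for illustration.
func exampleWatchStoreLifecycle(ctx context.Context, ws WatchStore, backend WatchStoreBackend) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		// Blocks until ctx is cancelled (or the events channel is closed).
		ws.Watch(ctx, backend, "cilium/state/nodes/v1")
	}()

	// Readiness can be probed through Synced() while the watch is running.
	_ = ws.Synced()

	// Once the watch has terminated, emit deletion events for all known keys.
	<-done
	ws.Drain()
}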

// RWSOpt is the type of the functional options accepted by the restartable
// watch store implementation.
type RWSOpt func(*restartableWatchStore)

// RWSWithOnSyncCallback registers a function to be executed after
// listing all keys from the kvstore for the first time. Multiple
// callback functions can be registered.
func RWSWithOnSyncCallback(callback func(ctx context.Context)) RWSOpt {
	return func(rws *restartableWatchStore) {
		rws.onSyncCallbacks = append(rws.onSyncCallbacks, callback)
	}
}

// RWSWithEntriesMetric registers a Prometheus gauge metric that is kept
// in sync with the number of entries synchronized from the kvstore.
func RWSWithEntriesMetric(gauge prometheus.Gauge) RWSOpt {
	return func(rws *restartableWatchStore) {
		rws.entriesMetric = gauge
	}
}
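
// Illustrative sketch (not part of the original file): assembling the two
// functional options. The gauge and the log message are placeholders chosen
// for the example; callers would register whatever callback they need.
func exampleOptions(gauge prometheus.Gauge) []RWSOpt {
	return []RWSOpt{
		RWSWithEntriesMetric(gauge),
		RWSWithOnSyncCallback(func(ctx context.Context) {
			log.Info("Initial listing of the kvstore prefix completed")
		}),
	}
}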

// rwsEntry tracks the most recent value observed for a given key, together
// with a flag marking it as stale across Watch restarts.
type rwsEntry struct {
	key   Key
	stale bool
}

// restartableWatchStore implements the WatchStore interface, supporting
// multiple executions of the Watch() operation (provided that the previous one
// has already terminated). This makes it possible to transparently handle the
// case in which a new etcd connection had to be created (for instance following
// a failure) that refers to the same remote cluster.
type restartableWatchStore struct {
	source     string
	keyCreator KeyCreator
	observer   Observer

	watching        atomic.Bool
	synced          atomic.Bool
	onSyncCallbacks []func(ctx context.Context)

	// Using a separate entries counter avoids the need for synchronizing the
	// access to the state map, since the only concurrent reader is represented
	// by the NumEntries() function.
	state      map[string]*rwsEntry
	numEntries atomic.Uint64

	log           *logrus.Entry
	entriesMetric prometheus.Gauge
	syncMetric    metric.Vec[metric.Gauge]
}
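
// Illustrative sketch (not part of the original file): because the watching,
// synced and numEntries fields are atomics, status information can be read
// from a goroutine other than the one running Watch, without locking the
// state map. The helper below is an assumption for illustration purposes.
func examplePollStatus(ws WatchStore) (entries uint64, ready bool) {
	return ws.NumEntries(), ws.Synced()
}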

// newRestartableWatchStore returns a WatchStore instance which supports
// restarting the watch operation multiple times, automatically handling
// the emission of deletion events for all stale entries (if enabled). It
// shall be restarted only once the previous Watch execution has terminated.
func newRestartableWatchStore(clusterName string, keyCreator KeyCreator, observer Observer, m *Metrics, opts ...RWSOpt) WatchStore {
	rws := &restartableWatchStore{
		source:     clusterName,
		keyCreator: keyCreator,
		observer:   observer,

		state: make(map[string]*rwsEntry),

		log:           log,
		entriesMetric: metrics.NoOpGauge,
		syncMetric:    m.KVStoreInitialSyncCompleted,
	}

	for _, opt := range opts {
		opt(rws)
	}

	rws.log = rws.log.WithField(logfields.ClusterName, rws.source)
	return rws
}
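
// Illustrative sketch (not part of the original file): constructing a watch
// store for a remote cluster. The cluster name is a placeholder, and the
// KeyCreator, Observer, Metrics and gauge values are assumed to be supplied
// by the caller.
func exampleNewWatchStore(kc KeyCreator, obs Observer, m *Metrics, gauge prometheus.Gauge) WatchStore {
	return newRestartableWatchStore("remote-cluster-1", kc, obs, m,
		RWSWithEntriesMetric(gauge),
	)
}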

// Watch starts watching the specified kvstore prefix, blocking until the context is closed.
// It might be executed multiple times, provided that the previous execution has already terminated.
func (rws *restartableWatchStore) Watch(ctx context.Context, backend WatchStoreBackend, prefix string) {
	// Append a trailing "/" to the prefix, to make sure that we watch only
	// sub-elements belonging to that prefix, and not to sibling prefixes
	// (for instance in case the last part of the prefix is the cluster name,
	// and one cluster name is a substring of another).
	if !strings.HasSuffix(prefix, "/") {
		prefix = prefix + "/"
	}

	rws.log = rws.log.WithField(logfields.Prefix, prefix)
	syncedMetric := rws.syncMetric.WithLabelValues(
		kvstore.GetScopeFromKey(prefix), rws.source, "read")

	rws.log.Info("Starting restartable watch store")
	syncedMetric.Set(metrics.BoolToFloat64(false))

	if rws.watching.Swap(true) {
		rws.log.Panic("Cannot start the watch store while still running")
	}

	defer func() {
		rws.log.Info("Stopped restartable watch store")
		syncedMetric.Set(metrics.BoolToFloat64(false))
		rws.watching.Store(false)
		rws.synced.Store(false)
	}()

	// Mark all known keys as stale.
	for _, entry := range rws.state {
		entry.stale = true
	}

	// The events channel is closed when the context is closed.
	watcher := backend.ListAndWatch(ctx, prefix, 0)
	for event := range watcher.Events {
		if event.Typ == kvstore.EventTypeListDone {
			rws.log.Debug("Initial synchronization completed")

			// Emit a deletion event for each known key that was not observed
			// again during the initial listing, as it no longer exists.
			rws.drainKeys(true)
			syncedMetric.Set(metrics.BoolToFloat64(true))
			rws.synced.Store(true)

			for _, callback := range rws.onSyncCallbacks {
				callback(ctx)
			}

			// Clear the list of callbacks so that they don't get executed
			// a second time in case of reconnections.
			rws.onSyncCallbacks = nil

			continue
		}

		key := strings.TrimPrefix(event.Key, prefix)
		rws.log.WithFields(logrus.Fields{
			logfields.Key:   key,
			logfields.Event: event.Typ,
		}).Debug("Received event from kvstore")

		switch event.Typ {
		case kvstore.EventTypeCreate, kvstore.EventTypeModify:
			rws.handleUpsert(key, event.Value)
		case kvstore.EventTypeDelete:
			rws.handleDelete(key)
		}
	}
}
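
// Illustrative sketch (not part of the original file): restarting the watch
// after the kvstore connection to the remote cluster had to be re-established.
// Watch may be invoked again only once the previous invocation has returned;
// entries that are not listed again by the new backend are then reconciled
// through synthetic deletion events once EventTypeListDone is received. The
// backend values and prefix below are placeholders.
func exampleRestart(ctx context.Context, ws WatchStore, oldBackend, newBackend WatchStoreBackend) {
	// First execution: returns once ctx is closed or the events channel of the
	// underlying watcher is closed (e.g. because the connection was lost).
	ws.Watch(ctx, oldBackend, "cilium/state/identities/v1")

	// Second execution against a freshly established backend: allowed, since
	// the previous execution has terminated. Previously known keys are marked
	// stale and dropped if not observed again during the initial listing.
	ws.Watch(ctx, newBackend, "cilium/state/identities/v1")
}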

// NumEntries returns the number of entries synchronized from the store.
func (rws *restartableWatchStore) NumEntries() uint64 {
	return rws.numEntries.Load()
}

// Synced returns whether the initial list of entries has been retrieved from
// the kvstore, and new events are currently being watched.
func (rws *restartableWatchStore) Synced() bool {
	return rws.synced.Load()
}

// Drain emits a deletion event for each known key. It shall be called only
// when no watch operation is in progress.
func (rws *restartableWatchStore) Drain() {
	if rws.watching.Swap(true) {
		rws.log.Panic("Cannot drain the watch store while still running")
	}
	defer rws.watching.Store(false)

	rws.log.Info("Draining restartable watch store")
	rws.drainKeys(false)
	rws.log.Info("Drained restartable watch store")
}

// drainKeys emits synthetic deletion events:
//   - staleOnly == true: for all keys marked as stale;
//   - staleOnly == false: for all known keys.
func (rws *restartableWatchStore) drainKeys(staleOnly bool) {
	for key, entry := range rws.state {
		if !staleOnly || entry.stale {
			rws.log.WithField(logfields.Key, key).Debug("Emitting deletion event for stale key")
			rws.handleDelete(key)
		}
	}
}

// handleUpsert unmarshals the received value into a freshly created Key,
// records it in the local state and notifies the observer.
func (rws *restartableWatchStore) handleUpsert(key string, value []byte) {
	entry := &rwsEntry{key: rws.keyCreator()}
	if err := entry.key.Unmarshal(key, value); err != nil {
		rws.log.WithFields(logrus.Fields{
			logfields.Key:   key,
			logfields.Value: string(value),
		}).WithError(err).Warning("Unable to unmarshal value")
		return
	}

	rws.state[key] = entry
	rws.numEntries.Store(uint64(len(rws.state)))
	rws.entriesMetric.Set(float64(len(rws.state)))
	rws.observer.OnUpdate(entry.key)
}

// handleDelete removes the entry associated with the given key (if known)
// from the local state and notifies the observer of the deletion.
func (rws *restartableWatchStore) handleDelete(key string) {
	entry, ok := rws.state[key]
	if !ok {
		rws.log.WithField(logfields.Key, key).Warning("Received deletion event for unknown key")
		return
	}

	delete(rws.state, key)
	rws.numEntries.Store(uint64(len(rws.state)))
	rws.entriesMetric.Set(float64(len(rws.state)))
	rws.observer.OnDelete(entry.key)
}