This repository has been archived by the owner on Mar 20, 2024. It is now read-only.
forked from cilium/cilium
-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.go
512 lines (417 loc) · 13.9 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
// Copyright 2018-2019 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"context"
"fmt"
"path"
"strings"
"time"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/option"
"github.com/sirupsen/logrus"
)
const (
// listTimeoutDefault is the default timeout to wait while performing
// the initial list operation of objects from the kvstore
listTimeoutDefault = 3 * time.Minute
// watcherChanSize is the size of the channel to buffer kvstore events
watcherChanSize = 100
)
var (
controllers controller.Manager
log = logging.DefaultLogger.WithField(logfields.LogSubsys, "shared-store")
)
// KeyCreator is the function to create a new empty Key instances. Store
// collaborators must implement this interface and provide the implementation
// in the Configuration structure.
type KeyCreator func() Key
// Configuration is the set of configuration parameters of a shared store.
type Configuration struct {
// Prefix is the key prefix of the store shared by all keys. The prefix
// is the unique identification of the store. Multiple collaborators
// connected to the same kvstore cluster configuring stores with
// matching prefixes will automatically form a shared store. This
// parameter is required.
Prefix string
// SynchronizationInterval is the interval in which locally owned keys
// are synchronized with the kvstore. This parameter is optional.
SynchronizationInterval time.Duration
// KeyCreator is called to allocate a Key instance when a new shared
// key is discovered. This parameter is required.
KeyCreator KeyCreator
// Backend is the kvstore to use as a backend. If no backend is
// specified, kvstore.Client() is being used.
Backend kvstore.BackendOperations
// Observer is the observe that will receive events on key mutations
Observer Observer
}
// validate is invoked by JoinSharedStore to validate and complete the
// configuration. It returns nil when the configuration is valid.
func (c *Configuration) validate() error {
if c.Prefix == "" {
return fmt.Errorf("prefix must be specified")
}
if c.KeyCreator == nil {
return fmt.Errorf("KeyCreator must be specified")
}
if c.SynchronizationInterval == 0 {
c.SynchronizationInterval = option.Config.KVstorePeriodicSync
}
if c.Backend == nil {
c.Backend = kvstore.Client()
}
return nil
}
// SharedStore is an instance of a shared store. It is created with
// JoinSharedStore() and released with the SharedStore.Close() function.
type SharedStore struct {
// conf is a copy of the store configuration. This field is never
// mutated after JoinSharedStore() so it is safe to access this without
// a lock.
conf Configuration
// name is the name of the shared store. It is derived from the kvstore
// prefix.
name string
// controllerName is the name of the controller used to synchronize
// with the kvstore. It is derived from the name.
controllerName string
// backend is the backend as configured via Configuration
backend kvstore.BackendOperations
// mutex protects mutations to localKeys and sharedKeys
mutex lock.RWMutex
// localKeys is a map of keys that are owned by the local instance. All
// local keys are synchronized with the kvstore. This map can be
// modified with UpdateLocalKey() and DeleteLocalKey().
localKeys map[string]LocalKey
// sharedKeys is a map of all keys that either have been discovered
// from remote collaborators or successfully shared local keys. This
// map represents the state in the kvstore and is updated based on
// kvstore events.
sharedKeys map[string]Key
kvstoreWatcher *kvstore.Watcher
}
// Observer receives events when objects in the store mutate
type Observer interface {
// OnDelete is called when the key has been deleted from the shared store
OnDelete(k NamedKey)
// OnUpdate is called whenever a change has occurred in the data
// structure represented by the key
OnUpdate(k Key)
}
// NamedKey is an interface that a data structure must implement in order to
// be deleted from a SharedStore.
type NamedKey interface {
// GetKeyName must return the name of the key. The name of the key must
// be unique within the store and stable for a particular key. The name
// of the key must be identical across agent restarts as the keys
// remain in the kvstore.
GetKeyName() string
}
// Key is the interface that a data structure must implement in order to be
// stored and shared as a key in a SharedStore.
type Key interface {
NamedKey
// Marshal is called to retrieve the byte slice representation of the
// data represented by the key to store it in the kvstore. The function
// must ensure that the underlying datatype is properly locked. It is
// typically a good idea to use json.Marshal to implement this
// function.
Marshal() ([]byte, error)
// Unmarshal is called when an update from the kvstore is received. The
// byte slice passed to the function is coming from the Marshal
// function from another collaborator. The function must unmarshal and
// update the underlying data type. It is typically a good idea to use
// json.Unmarshal to implement this function.
Unmarshal(data []byte) error
}
// LocalKey is a Key owned by the local store instance
type LocalKey interface {
Key
// DeepKeyCopy must return a deep copy of the key
DeepKeyCopy() LocalKey
}
// JoinSharedStore creates a new shared store based on the provided
// configuration. An error is returned if the configuration is invalid. The
// store is initialized with the contents of the kvstore. An error is returned
// if the contents cannot be retrieved synchronously from the kvstore. Starts a
// controller to continuously synchronize the store with the kvstore.
func JoinSharedStore(c Configuration) (*SharedStore, error) {
if err := c.validate(); err != nil {
return nil, err
}
s := &SharedStore{
conf: c,
localKeys: map[string]LocalKey{},
sharedKeys: map[string]Key{},
backend: c.Backend,
}
s.name = "store-" + s.conf.Prefix
s.controllerName = "kvstore-sync-" + s.name
if err := s.listAndStartWatcher(); err != nil {
return nil, err
}
controllers.UpdateController(s.controllerName,
controller.ControllerParams{
DoFunc: func(ctx context.Context) error {
return s.syncLocalKeys()
},
RunInterval: s.conf.SynchronizationInterval,
},
)
return s, nil
}
func (s *SharedStore) onDelete(k NamedKey) {
if s.conf.Observer != nil {
s.conf.Observer.OnDelete(k)
}
}
func (s *SharedStore) onUpdate(k Key) {
if s.conf.Observer != nil {
s.conf.Observer.OnUpdate(k)
}
}
// Release frees all resources own by the store but leaves all keys in the
// kvstore intact
func (s *SharedStore) Release() {
// Wait for all write operations to complete and then block all further
// operations
s.mutex.Lock()
defer s.mutex.Unlock()
if s.kvstoreWatcher != nil {
s.kvstoreWatcher.Stop()
}
controllers.RemoveController(s.controllerName)
}
// Close stops participation with a shared store and removes all keys owned by
// this node in the kvstore. This stops the controller started by
// JoinSharedStore().
func (s *SharedStore) Close() {
s.Release()
for name, key := range s.localKeys {
if err := s.backend.Delete(s.keyPath(key)); err != nil {
s.getLogger().WithError(err).Warning("Unable to delete key in kvstore")
}
delete(s.localKeys, name)
// Since we have received our own notification we also need to remove
// it from the shared keys.
delete(s.sharedKeys, name)
s.onDelete(key)
}
}
// keyPath returns the absolute kvstore path of a key
func (s *SharedStore) keyPath(key NamedKey) string {
// WARNING - STABLE API: The composition of the absolute key path
// cannot be changed without breaking up and downgrades.
return path.Join(s.conf.Prefix, key.GetKeyName())
}
// syncLocalKey synchronizes a key to the kvstore
func (s *SharedStore) syncLocalKey(key LocalKey) error {
jsonValue, err := key.Marshal()
if err != nil {
return err
}
// Update key in kvstore, overwrite an eventual existing key, attach
// lease to expire entry when agent dies and never comes back up.
if _, err := s.backend.UpdateIfDifferent(context.TODO(), s.keyPath(key), jsonValue, true); err != nil {
return err
}
return nil
}
// syncLocalKeys synchronizes all local keys with the kvstore
func (s *SharedStore) syncLocalKeys() error {
// Create a copy of all local keys so we can unlock and sync to kvstore
// without holding the lock
s.mutex.RLock()
keys := []LocalKey{}
for _, key := range s.localKeys {
keys = append(keys, key)
}
s.mutex.RUnlock()
for _, key := range keys {
if err := s.syncLocalKey(key); err != nil {
return err
}
}
return nil
}
func (s *SharedStore) lookupLocalKey(name string) LocalKey {
s.mutex.RLock()
defer s.mutex.RUnlock()
for _, key := range s.localKeys {
if key.GetKeyName() == name {
return key
}
}
return nil
}
// SharedKeysMap returns a copy of the SharedKeysMap, the returned map can
// be safely modified but the values of the map represent the actual data
// stored in the internal SharedStore SharedKeys map.
func (s *SharedStore) SharedKeysMap() map[string]Key {
s.mutex.RLock()
defer s.mutex.RUnlock()
sharedKeysCopy := make(map[string]Key, len(s.sharedKeys))
for k, v := range s.sharedKeys {
sharedKeysCopy[k] = v
}
return sharedKeysCopy
}
// UpdateLocalKey adds a key to be synchronized with the kvstore
func (s *SharedStore) UpdateLocalKey(key LocalKey) {
s.mutex.Lock()
s.localKeys[key.GetKeyName()] = key.DeepKeyCopy()
s.mutex.Unlock()
}
// UpdateLocalKeySync synchronously synchronizes a local key with the kvstore
// and adds it to the list of local keys to be synchronized if the initial
// synchronous synchronization was successful
func (s *SharedStore) UpdateLocalKeySync(key LocalKey) error {
s.mutex.Lock()
defer s.mutex.Unlock()
err := s.syncLocalKey(key)
if err == nil {
s.localKeys[key.GetKeyName()] = key.DeepKeyCopy()
}
return err
}
// UpdateKeySync synchronously synchronizes a key with the kvstore.
func (s *SharedStore) UpdateKeySync(key LocalKey) error {
return s.syncLocalKey(key)
}
// DeleteLocalKey removes a key from being synchronized with the kvstore
func (s *SharedStore) DeleteLocalKey(key NamedKey) {
name := key.GetKeyName()
s.mutex.Lock()
_, ok := s.localKeys[name]
delete(s.localKeys, name)
s.mutex.Unlock()
err := s.backend.Delete(s.keyPath(key))
if ok {
if err != nil {
s.getLogger().WithError(err).Warning("Unable to delete key in kvstore")
}
s.onDelete(key)
}
}
// getLocalKeys returns all local keys
func (s *SharedStore) getLocalKeys() []Key {
s.mutex.RLock()
defer s.mutex.RUnlock()
keys := make([]Key, len(s.localKeys))
idx := 0
for _, key := range s.localKeys {
keys[idx] = key
idx++
}
return keys
}
// getSharedKeys returns all shared keys
func (s *SharedStore) getSharedKeys() []Key {
s.mutex.RLock()
defer s.mutex.RUnlock()
keys := make([]Key, len(s.sharedKeys))
idx := 0
for _, key := range s.sharedKeys {
keys[idx] = key
idx++
}
return keys
}
func (s *SharedStore) getLogger() *logrus.Entry {
return log.WithFields(logrus.Fields{
"storeName": s.name,
})
}
func (s *SharedStore) updateKey(name string, value []byte) error {
newKey := s.conf.KeyCreator()
if err := newKey.Unmarshal(value); err != nil {
return err
}
s.mutex.Lock()
s.sharedKeys[name] = newKey
s.mutex.Unlock()
s.onUpdate(newKey)
return nil
}
func (s *SharedStore) deleteSharedKey(name string) {
s.mutex.Lock()
existingKey, ok := s.sharedKeys[name]
delete(s.sharedKeys, name)
s.mutex.Unlock()
if ok {
go func() {
time.Sleep(defaults.NodeDeleteDelay)
s.mutex.RLock()
_, ok := s.sharedKeys[name]
s.mutex.RUnlock()
if ok {
log.Warningf("Received node delete event for node %s which re-appeared within %s",
name, defaults.NodeDeleteDelay)
return
}
s.onDelete(existingKey)
}()
} else {
s.getLogger().WithField("key", name).
Warning("Unable to find deleted key in local state")
}
}
func (s *SharedStore) listAndStartWatcher() error {
listDone := make(chan bool)
go s.watcher(listDone)
select {
case <-listDone:
case <-time.After(listTimeoutDefault):
return fmt.Errorf("timeout while retrieving initial list of objects from kvstore")
}
return nil
}
func (s *SharedStore) watcher(listDone chan bool) {
s.kvstoreWatcher = s.backend.ListAndWatch(s.name+"-watcher", s.conf.Prefix, watcherChanSize)
for event := range s.kvstoreWatcher.Events {
if event.Typ == kvstore.EventTypeListDone {
s.getLogger().Debug("Initial list of objects received from kvstore")
close(listDone)
continue
}
logger := s.getLogger().WithFields(logrus.Fields{
"key": event.Key,
"eventType": event.Typ,
})
logger.Debugf("Received key update via kvstore [value %s]", string(event.Value))
keyName := strings.TrimPrefix(event.Key, s.conf.Prefix)
if keyName[0] == '/' {
keyName = keyName[1:]
}
switch event.Typ {
case kvstore.EventTypeCreate, kvstore.EventTypeModify:
if err := s.updateKey(keyName, event.Value); err != nil {
logger.WithError(err).Warningf("Unable to unmarshal store value: %s", string(event.Value))
}
case kvstore.EventTypeDelete:
if localKey := s.lookupLocalKey(keyName); localKey != nil {
logger.Warning("Received delete event for local key. Re-creating the key in the kvstore")
s.syncLocalKey(localKey)
} else {
s.deleteSharedKey(keyName)
}
}
}
}