/
store.go
353 lines (306 loc) · 11 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
// Copyright 2019 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ram
import (
"context"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
antreastorage "antrea.io/antrea/pkg/apiserver/storage"
)
const (
	// watcherChanSize is the buffer size of each watcher's event channel.
	watcherChanSize = 1000
	// watcherAddTimeout is the timeout of sending one event to all watchers.
	// Watchers whose buffer can't be available in it will be terminated.
	watcherAddTimeout = 50 * time.Millisecond
)

// watchersMap maps a watcher's index (allocated at registration time) to the
// watcher itself, so a watcher can later be removed by its index.
type watchersMap map[int]*storeWatcher
// store implements ram.Interface, serving the requests for a given resource from its internal cache storage.
type store struct {
	// watcherMutex protects the watchers map from concurrent access during watcher insertion and deletion.
	watcherMutex sync.RWMutex
	// eventMutex is used to avoid race condition when generating events.
	// Held for writing by Create/Update/Delete, and for reading by Watch.
	eventMutex sync.RWMutex
	// incomingHWM is HighWaterMark for performance debugging.
	// It records the maximum number of events backed up in incoming channel that have been seen.
	incomingHWM storage.HighWaterMark
	// incoming stores the incoming events that should be dispatched to watchers.
	incoming chan antreastorage.InternalEvent
	// storage is the underlying storage.
	storage cache.Indexer
	// keyFunc is used to get a key in the underlying storage for a given object.
	keyFunc cache.KeyFunc
	// selectFunc is used to check whether a watcher is interested in a given object.
	selectFunc antreastorage.SelectFunc
	// genEventFunc is used to generate InternalEvent from update of an object.
	// When nil, the store does not generate events and Watch is unsupported.
	genEventFunc antreastorage.GenEventFunc
	// newFunc is a function that creates new empty object of this type.
	newFunc func() runtime.Object
	// resourceVersion up to which the store has generated.
	resourceVersion uint64
	// watcherIdx is the index that will be allocated to next watcher and used as key in watchersMap
	// so that a watcher can be deleted from the map according to its index later.
	watcherIdx int
	// watchers is a mapping from the index of a watcher to the watcher.
	watchers watchersMap
	// stopCh, when closed, terminates the dispatchEvents goroutine.
	stopCh chan struct{}
	// timer is used when sending events to watchers. Hold it here to avoid unnecessary
	// re-allocation for each event.
	timer *time.Timer
}
// NewStore creates a store based on the provided KeyFunc, Indexers, and GenEventFunc.
// KeyFunc decides how to get the key from an object.
// Indexers decides how to build indices for an object.
// GenEventFunc decides how to generate InternalEvent for an update of an object.
func NewStore(keyFunc cache.KeyFunc, indexers cache.Indexers, genEventFunc antreastorage.GenEventFunc, selectorFunc antreastorage.SelectFunc, newFunc func() runtime.Object) *store {
	// Create the timer in a stopped state with a drained channel, so it can
	// later be reused via Reset without a stale expiration pending.
	t := time.NewTimer(0)
	if !t.Stop() {
		<-t.C
	}
	s := &store{
		incoming:     make(chan antreastorage.InternalEvent, 100),
		storage:      cache.NewIndexer(keyFunc, indexers),
		stopCh:       make(chan struct{}),
		watchers:     make(map[int]*storeWatcher),
		keyFunc:      keyFunc,
		genEventFunc: genEventFunc,
		selectFunc:   selectorFunc,
		timer:        t,
		newFunc:      newFunc,
	}
	// Start the background goroutine that fans events out to watchers.
	go s.dispatchEvents()
	return s
}
// nextResourceVersion bumps the store's resourceVersion by one and returns
// the new value. It is not thread safe; callers must hold eventMutex for
// writing.
func (s *store) nextResourceVersion() uint64 {
	s.resourceVersion += 1
	return s.resourceVersion
}
// processEvent queues an event for dispatching to watchers, recording a new
// high-water mark of the backlog when one is reached.
func (s *store) processEvent(event antreastorage.InternalEvent) {
	queued := int64(len(s.incoming))
	if s.incomingHWM.Update(queued) {
		// A new maximum backlog was observed; log it for performance debugging.
		klog.V(1).Infof("%v objects queued in incoming channel", queued)
	}
	s.incoming <- event
}
// Get returns the object matching the provided key, a boolean indicating
// whether it is present in the store, and an error, if any.
func (s *store) Get(key string) (interface{}, bool, error) {
	return s.storage.GetByKey(key)
}
// GetByIndex returns the objects which match the given index name and key,
// or the error encountered.
func (s *store) GetByIndex(indexName, indexKey string) ([]interface{}, error) {
	return s.storage.ByIndex(indexName, indexKey)
}
// Create stores the object in internal cache storage, failing if an object
// with the same key already exists. When genEventFunc is set, an ADDED event
// is generated and queued for watchers.
func (s *store) Create(obj interface{}) error {
	key, err := s.keyFunc(obj)
	if err != nil {
		return fmt.Errorf("couldn't get key for object %+v: %v", obj, err)
	}

	s.eventMutex.Lock()
	defer s.eventMutex.Unlock()

	if _, exists, _ := s.storage.GetByKey(key); exists {
		return fmt.Errorf("object %+v already exists in storage", obj)
	}

	var event antreastorage.InternalEvent
	if s.genEventFunc != nil {
		if event, err = s.genEventFunc(key, nil, obj, s.nextResourceVersion()); err != nil {
			return fmt.Errorf("error generating event for Create operation of object %+v: %v", obj, err)
		}
	}
	// keyFunc already succeeded on this object above, so Add cannot fail.
	s.storage.Add(obj)
	if event != nil {
		s.processEvent(event)
	}
	return nil
}
// Update updates the store with the latest copy of the object, failing if it
// does not already exist. When genEventFunc is set, a MODIFIED event is
// generated and queued for watchers.
func (s *store) Update(obj interface{}) error {
	key, err := s.keyFunc(obj)
	if err != nil {
		return fmt.Errorf("couldn't get key for object %+v: %v", obj, err)
	}

	s.eventMutex.Lock()
	defer s.eventMutex.Unlock()

	prevObj, exists, _ := s.storage.GetByKey(key)
	if !exists {
		return fmt.Errorf("object %+v not found in storage", obj)
	}

	var event antreastorage.InternalEvent
	if s.genEventFunc != nil {
		if event, err = s.genEventFunc(key, prevObj, obj, s.nextResourceVersion()); err != nil {
			return fmt.Errorf("error generating event for Update operation of object %+v: %v", obj, err)
		}
	}
	s.storage.Update(obj)
	if event != nil {
		s.processEvent(event)
	}
	return nil
}
// List returns a list of all the objects currently in the store.
func (s *store) List() []interface{} {
	return s.storage.List()
}
// Delete removes the object identified by key from internal cache storage,
// failing if it is absent. When genEventFunc is set, a DELETED event is
// generated and queued for watchers.
func (s *store) Delete(key string) error {
	s.eventMutex.Lock()
	defer s.eventMutex.Unlock()

	prevObj, exists, _ := s.storage.GetByKey(key)
	if !exists {
		return fmt.Errorf("object %+v not found in storage", key)
	}

	var event antreastorage.InternalEvent
	if s.genEventFunc != nil {
		var err error
		if event, err = s.genEventFunc(key, prevObj, nil, s.nextResourceVersion()); err != nil {
			return fmt.Errorf("error generating event for Delete operation: %v", err)
		}
	}
	s.storage.Delete(prevObj)
	if event != nil {
		s.processEvent(event)
	}
	return nil
}
// Watch creates a watcher based on the key, label selector and field selector.
// It snapshots the current objects as init events, registers the watcher, and
// starts a goroutine that delivers events to it.
func (s *store) Watch(ctx context.Context, key string, labelSelector labels.Selector, fieldSelector fields.Selector) (watch.Interface, error) {
	if s.genEventFunc == nil {
		return nil, fmt.Errorf("genEventFunc must be set to support watching")
	}
	// Hold eventMutex for reading so that no new events are generated while we
	// snapshot the current state, without blocking other concurrent watchers.
	s.eventMutex.RLock()
	defer s.eventMutex.RUnlock()

	selectors := &antreastorage.Selectors{
		Key:   key,
		Label: labelSelector,
		Field: fieldSelector,
	}

	objects := s.storage.List()
	initEvents := make([]antreastorage.InternalEvent, 0, len(objects))
	for _, obj := range objects {
		// Objects already in storage were validated with keyFunc on insertion,
		// so the error can be ignored here.
		objKey, _ := s.keyFunc(obj)
		// Skip objects the watcher is not interested in; no initEvent for them.
		if s.selectFunc != nil && !s.selectFunc(selectors, objKey, obj) {
			continue
		}
		event, err := s.genEventFunc(objKey, nil, obj, s.resourceVersion)
		if err != nil {
			return nil, err
		}
		initEvents = append(initEvents, event)
	}

	s.watcherMutex.Lock()
	watcher := newStoreWatcher(watcherChanSize, selectors, forgetWatcher(s, s.watcherIdx), s.newFunc)
	s.watchers[s.watcherIdx] = watcher
	s.watcherIdx++
	s.watcherMutex.Unlock()

	// Pass the current resourceVersion so that old events still buffered in the
	// incoming channel won't be delivered twice when initEvents already have them.
	go watcher.process(ctx, initEvents, s.resourceVersion)
	return watcher, nil
}
// GetWatchersNum returns the number of watchers currently registered with the
// store.
func (s *store) GetWatchersNum() int {
	s.watcherMutex.RLock()
	n := len(s.watchers)
	s.watcherMutex.RUnlock()
	return n
}
// forgetWatcher returns a callback that removes the watcher registered at the
// given index from the store's watcher map.
func forgetWatcher(s *store, index int) func() {
	return func() {
		s.watcherMutex.Lock()
		delete(s.watchers, index)
		s.watcherMutex.Unlock()
	}
}
// dispatchEvents loops forever, forwarding each queued event to the watchers,
// until the incoming channel is closed or stopCh is signaled.
func (s *store) dispatchEvents() {
	for {
		select {
		case <-s.stopCh:
			return
		case event, ok := <-s.incoming:
			if !ok {
				return
			}
			s.dispatchEvent(event)
		}
	}
}
// dispatchEvent delivers a single event to all registered watchers. Watchers
// that cannot accept the event within watcherAddTimeout are stopped.
func (s *store) dispatchEvent(event antreastorage.InternalEvent) {
	var failedWatchers []*storeWatcher
	// The sending phase runs in a closure so watcherMutex is released before
	// the failed watchers are stopped below.
	func() {
		s.watcherMutex.RLock()
		defer s.watcherMutex.RUnlock()
		// First try to send events without blocking, to avoid setting up a timer
		// for every event.
		// blockedWatchers keeps watchers whose buffer are full.
		var blockedWatchers []*storeWatcher
		// TODO: Optimize this to dispatch the event based on watchers' selector.
		for _, watcher := range s.watchers {
			if !watcher.nonBlockingAdd(event) {
				blockedWatchers = append(blockedWatchers, watcher)
			}
		}
		if len(blockedWatchers) == 0 {
			return
		}
		klog.V(2).Infof("%d watchers were not available to receive event %+v immediately", len(blockedWatchers), event)
		// Then try to send events to blocked watchers with a timeout. If it
		// timeouts, it means the watcher is too slow to consume the events or the
		// underlying connection is already dead, terminate the watcher in this case.
		// antrea-agent will start a new watch after it's disconnected.
		// s.timer is reused across calls to avoid re-allocating a timer per event.
		s.timer.Reset(watcherAddTimeout)
		timer := s.timer
		for _, watcher := range blockedWatchers {
			if !watcher.add(event, timer) {
				failedWatchers = append(failedWatchers, watcher)
				// Setting timer to nil lets subsequent adds know the timer has
				// already fired, so they fail fast instead of waiting again.
				timer = nil
			}
		}
		// Stop the timer and drain its channel if it has not fired, so the next
		// Reset starts from a clean state.
		if timer != nil && !timer.Stop() {
			<-timer.C
		}
	}()
	// Terminate unresponsive watchers, this must be executed without watcherMutex as
	// watcher.Stop will require the lock itself.
	for _, watcher := range failedWatchers {
		klog.Warningf("Forcing stopping watcher (selectors: %v) due to unresponsiveness", watcher.selectors)
		watcher.Stop()
	}
}