This repository has been archived by the owner on Jan 14, 2024. It is now read-only.
/
filteredwatcher.go
351 lines (315 loc) · 11.1 KB
/
filteredwatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package watch
import (
"context"
"fmt"
"math"
"math/rand"
"reflect"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
k8sevent "sigs.k8s.io/controller-runtime/pkg/event"
"kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1"
"kpt.dev/resourcegroup/controllers/resourcemap"
"kpt.dev/resourcegroup/controllers/status"
)
const (
	// minWatchTimeout is the lower bound of the timeout requested for each
	// watch (ListOptions.TimeoutSeconds). Following the strategy of
	// k8s.io/client-go/tools/cache/reflector.go, the actual timeout is drawn at
	// random from [minWatchTimeout, 2*minWatchTimeout) to spread the load on
	// the apiserver.
	minWatchTimeout = 5 * time.Minute

	// maxWatchRetryFactor caps the exponent of the back-off applied after a
	// watch error: the longest wait is 2^18 ms = 262,144 ms, roughly 4.37
	// minutes.
	maxWatchRetryFactor = 18
)
// Runnable is the custom watch interface: a watch that can be run and later
// stopped.
type Runnable interface {
	// Run executes the watch.
	Run(ctx context.Context) error
	// Stop terminates the watch.
	Stop()
}
const (
	// watchEventBookmarkType is the error ID used when a Bookmark event's
	// metadata cannot be accessed.
	watchEventBookmarkType = "Bookmark"
	// watchEventErrorType prefixes the error ID of watch.Error events.
	watchEventErrorType = "Error"
	// watchEventUnsupportedType is the error ID used for unknown event types.
	watchEventUnsupportedType = "Unsupported"

	// errorLoggingInterval is the minimum time that must pass before an error
	// with the same error ID is logged again.
	errorLoggingInterval = time.Second
)
// filteredWatcher is a wrapper around a watch interface.
// It only keeps the events for objects that are
// listed in a ResourceGroup CR: for those it updates the cached status in
// resources and sends a generic event for each ResourceGroup listing the object.
type filteredWatcher struct {
	// gvk is the string form of the watched GroupVersionKind, used in logs.
	gvk string
	// startWatch opens a new base watch for the given list options.
	startWatch startWatchFunc
	// resources maps objects to the ResourceGroups that list them and holds
	// their cached status.
	resources *resourcemap.ResourceMap
	// errorTracker maps an error to the time when the same error happened last time.
	// It is used to throttle repeated error logs. It is not guarded by mux.
	// NOTE(review): it appears to be touched only from the Run goroutine; confirm.
	errorTracker map[string]time.Time
	// channel is the channel for ResourceGroup generic events.
	channel chan k8sevent.GenericEvent
	// The following fields are guarded by the mutex.
	mux sync.Mutex
	// base is the current underlying watch; it is replaced by start.
	base watch.Interface
	// stopped is set by Stop and prevents start from opening a new base watch.
	stopped bool
}

// filteredWatcher implements the Runnable interface.
var _ Runnable = &filteredWatcher{}
// NewFiltered builds a filtered watcher from cfg. The watcher starts out with
// an empty placeholder base watch (watch.NewEmptyWatch), so Stop can be called
// before Run has opened a real watch. The context argument is unused.
func NewFiltered(_ context.Context, cfg watcherConfig) Runnable {
	fw := &filteredWatcher{
		gvk:          cfg.gvk.String(),
		startWatch:   cfg.startWatch,
		resources:    cfg.resources,
		channel:      cfg.channel,
		errorTracker: map[string]time.Time{},
		base:         watch.NewEmptyWatch(),
	}
	return fw
}
// pruneErrors drops every entry of w.errorTracker that was recorded at least
// errorLoggingInterval ago, bounding the memory spent on error tracking.
// Such entries no longer suppress logging in addError, so forgetting them does
// not change which errors get logged.
func (w *filteredWatcher) pruneErrors() {
	for id, seenAt := range w.errorTracker {
		if time.Since(seenAt) < errorLoggingInterval {
			continue
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(w.errorTracker, id)
	}
}
// addError reports whether the error identified by errorID should be logged,
// so that each distinct error is logged at most once per errorLoggingInterval:
//   - an untracked error is recorded with the current time and reported (true);
//   - a tracked error last recorded at least errorLoggingInterval ago gets its
//     timestamp refreshed to now and is reported (true);
//   - a tracked error recorded more recently is ignored (false) and its
//     timestamp is left unchanged.
func (w *filteredWatcher) addError(errorID string) bool {
	if lastSeen, tracked := w.errorTracker[errorID]; tracked && time.Since(lastSeen) < errorLoggingInterval {
		return false
	}
	w.errorTracker[errorID] = time.Now()
	return true
}
// Stop shuts the filteredWatcher down while holding w.mux, so it is safe to
// call concurrently with Run. It stops the current base watch and marks the
// watcher as stopped, which prevents Run from restarting the base watch (as it
// otherwise does when the API server closes it).
func (w *filteredWatcher) Stop() {
	w.mux.Lock()
	defer w.mux.Unlock()

	w.stopped = true
	w.base.Stop()
}
// waitUntilNextRetry sleeps for an exponential back-off of 2^retries
// milliseconds. The exponent is capped at maxWatchRetryFactor, so the longest
// sleep is about 4.37 minutes. A negative retries value yields no sleep. The
// sleep cannot be interrupted.
func waitUntilNextRetry(retries int) {
	exponent := float64(retries)
	if retries > maxWatchRetryFactor {
		exponent = maxWatchRetryFactor
	}
	backoff := time.Duration(int64(math.Pow(2, exponent))) * time.Millisecond
	time.Sleep(backoff)
}
// Run reads the event from the base watch interface,
// filters the event and pushes the object contained
// in the event to the controller work queue (see handle).
//
// Run restarts the base watch whenever it closes or reports an error,
// resuming from the last resource version it observed.
//
// After a watch error, Run waits with exponential back-off (waitUntilNextRetry)
// before restarting. The back-off counter resets as soon as an event is handled
// without error.
//
// If the error reports that the resource version is too old (HTTP 410 with
// reason Expired or Gone), Run clears the resource version so that the next
// watch starts at the most recent one.
//
// Run returns nil once Stop has been called, or the error from start if a new
// base watch cannot be created. The context argument is currently unused.
func (w *filteredWatcher) Run(context.Context) error {
	klog.Infof("Watch started for %s", w.gvk)
	var resourceVersion string
	var retriesForWatchError int
	for {
		// There are three ways this function can return:
		// 1. false, error -> We were unable to start the watch, so exit Run().
		// 2. false, nil -> We have been stopped via Stop(), so exit Run().
		// 3. true, nil -> We have not been stopped and we started a new watch.
		started, err := w.start(resourceVersion)
		if err != nil {
			return err
		}
		if !started {
			break
		}

		eventCount := 0
		ignoredEventCount := 0
		klog.Infof("(Re)starting watch for %s at resource version %q", w.gvk, resourceVersion)
		for event := range w.base.ResultChan() {
			w.pruneErrors()
			newVersion, ignoreEvent, err := w.handle(event)
			eventCount++
			if ignoreEvent {
				ignoredEventCount++
			}
			if err != nil {
				// API servers up to Kubernetes 1.17 report a too-old resource
				// version as HTTP 410 with reason Gone rather than Expired, so check
				// both (this mirrors client-go's reflector).
				if apierrors.IsResourceExpired(err) || apierrors.IsGone(err) {
					klog.Infof("Watch for %s at resource version %q closed with: %v", w.gvk, resourceVersion, err)
					// `w.handle` may fail because we try to watch an old resource version, setting
					// a watch on an old resource version will always fail.
					// Reset `resourceVersion` to an empty string here so that we can start a new
					// watch at the most recent resource version.
					resourceVersion = ""
				} else if w.addError(watchEventErrorType + errorID(err)) {
					klog.Errorf("Watch for %s at resource version %q ended with: %v", w.gvk, resourceVersion, err)
				}
				retriesForWatchError++
				waitUntilNextRetry(retriesForWatchError)
				// Call `break` to restart the watch.
				break
			}
			retriesForWatchError = 0
			if newVersion != "" {
				resourceVersion = newVersion
			}
		}
		klog.Infof("Ending watch for %s at resource version %q (total events: %d, ignored events: %d)",
			w.gvk, resourceVersion, eventCount, ignoredEventCount)
	}
	klog.Infof("Watch stopped for %s", w.gvk)
	return nil
}
// start stops the current base watch and opens a new one at resourceVersion.
// It holds w.mux, so it cannot interleave with Stop. It returns:
//   - false, nil if the filteredWatcher has already been stopped;
//   - false, err if the new base watch could not be started. err wraps the
//     error from startWatch (via %w), so callers can still inspect it with
//     errors.Is/errors.As;
//   - true, nil once the new base watch has been installed.
func (w *filteredWatcher) start(resourceVersion string) (bool, error) {
	w.mux.Lock()
	defer w.mux.Unlock()
	if w.stopped {
		return false, nil
	}
	w.base.Stop()

	// We want to avoid situations of hanging watchers. Stop any watchers that
	// do not receive any events within the timeout window, which is drawn at
	// random from [minWatchTimeout, 2*minWatchTimeout).
	//nolint:gosec // don't need cryptographic randomness here
	timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
	options := metav1.ListOptions{
		AllowWatchBookmarks: true,
		ResourceVersion:     resourceVersion,
		TimeoutSeconds:      &timeoutSeconds,
		Watch:               true,
	}
	base, err := w.startWatch(options)
	if err != nil {
		// %w (rather than %v) keeps the underlying API error in the chain.
		return false, fmt.Errorf("failed to start watch for %s: %w", w.gvk, err)
	}
	w.base = base
	return true, nil
}
// errorID returns a key identifying the kind of err, used to throttle repeated
// log messages. The key is the dynamic type name of err. For a non-nil
// *apierrors.StatusError, it is followed by the name in the status details,
// or by "<status>-<reason>-<code>" when that name is missing or empty.
// err must not be nil: reflect.TypeOf(nil) has no String method to call.
func errorID(err error) string {
	id := reflect.TypeOf(err).String()
	statusErr, ok := err.(*apierrors.StatusError)
	if !ok || statusErr == nil {
		return id
	}
	st := statusErr.ErrStatus
	suffix := ""
	if st.Details != nil {
		suffix = st.Details.Name
	}
	if suffix == "" {
		suffix = fmt.Sprintf("%s-%s-%d", st.Status, st.Reason, st.Code)
	}
	return id + suffix
}
// handle processes a single event from the base watch.
//
// Added, Modified and Deleted events are handled only for objects listed in
// w.resources. For such an object, handle updates its cached status: NotFound
// on deletion, otherwise the status computed from the object. It then sends a
// generic event on w.channel for every ResourceGroup that lists the object.
// Events for unlisted objects are reported as ignored.
//
// handle returns:
//   - the resource version to resume from, or "" when none is available;
//   - whether the event was ignored by the filter;
//   - an error only for watch.Error events, meaning the watch should be
//     restarted.
func (w *filteredWatcher) handle(event watch.Event) (string, bool, error) {
	var deleted bool
	switch event.Type {
	case watch.Added, watch.Modified:
		deleted = false
	case watch.Deleted:
		deleted = true
	case watch.Bookmark:
		m, err := meta.Accessor(event.Object)
		if err != nil {
			// For watch.Bookmark, only the ResourceVersion field of event.Object is set,
			// so there is nothing object-specific to key the error on: all Bookmark
			// errors share the watchEventBookmarkType error ID.
			if w.addError(watchEventBookmarkType) {
				klog.Errorf("Unable to access metadata of Bookmark event: %v", event)
			}
			return "", false, nil
		}
		return m.GetResourceVersion(), false, nil
	case watch.Error:
		return "", false, apierrors.FromObject(event.Object)
	// Keep the default case to catch any new watch event types added in the future.
	default:
		if w.addError(watchEventUnsupportedType) {
			klog.Errorf("Unsupported watch event: %#v", event)
		}
		return "", false, nil
	}

	// get client.Object from the runtime object.
	object, ok := event.Object.(*unstructured.Unstructured)
	if !ok {
		// Log the dynamic type of event.Object. After the failed assertion,
		// object is a typed nil, so %T of object would always print
		// *unstructured.Unstructured.
		klog.Infof("Received non unstructured object in watch event: %T", event.Object)
		return "", false, nil
	}

	// filter objects.
	id := getID(object)
	if !w.shouldProcess(object) {
		klog.V(4).Infof("Ignoring event for object: %v", id)
		return object.GetResourceVersion(), true, nil
	}

	if deleted {
		klog.Infof("updating the reconciliation status: %v: %v", id, v1alpha1.NotFound)
		w.resources.SetStatus(id, &resourcemap.CachedStatus{Status: v1alpha1.NotFound})
	} else {
		klog.Infof("Received watch event for created/updated object %q", id)
		resStatus := status.ComputeStatus(object)
		if resStatus != nil {
			klog.Infof("updating the reconciliation status: %v: %v", id, resStatus.Status)
			w.resources.SetStatus(id, resStatus)
		}
	}

	// Notify every ResourceGroup that lists this object. The send blocks until
	// the receiver accepts it. NOTE(review): channel buffering is decided by
	// whoever creates cfg.channel.
	for _, r := range w.resources.Get(id) {
		resgroup := &v1alpha1.ResourceGroup{}
		resgroup.SetNamespace(r.Namespace)
		resgroup.SetName(r.Name)
		klog.Infof("sending a generic event from watcher for %v", resgroup.GetObjectMeta())
		w.channel <- k8sevent.GenericEvent{Object: resgroup}
	}
	return object.GetResourceVersion(), false, nil
}
// shouldProcess returns true if the given object should be enqueued by the
// watcher for processing, i.e. if it is listed in w.resources.
//
// If the resource map is nil, nothing can be listed, so it returns false.
// Previously it logged and then still called HasResource on the nil map.
func (w *filteredWatcher) shouldProcess(object client.Object) bool {
	if w.resources == nil {
		klog.V(4).Infof("The resources are empty")
		return false
	}
	return w.resources.HasResource(getID(object))
}
// getID builds the ObjMetadata that identifies object in a ResourceGroup: its
// name, namespace, and the group and kind of its GroupVersionKind. The version
// is deliberately not part of the identifier.
func getID(object client.Object) v1alpha1.ObjMetadata {
	gvk := object.GetObjectKind().GroupVersionKind()
	return v1alpha1.ObjMetadata{
		Name:      object.GetName(),
		Namespace: object.GetNamespace(),
		GroupKind: v1alpha1.GroupKind{Group: gvk.Group, Kind: gvk.Kind},
	}
}