-
Notifications
You must be signed in to change notification settings - Fork 40
/
manager.go
225 lines (195 loc) · 6.4 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package watch
import (
"context"
"net/http"
"sync"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"kpt.dev/configsync/pkg/resourcegroup/controllers/resourcemap"
)
// Manager records which GVKs are watched.
// When a new GVK needs to be watched, it creates a watcher for that GVK and
// runs it in its own goroutine (see startWatcher).
type Manager struct {
// cfg is the rest config used to talk to apiserver. It is handed to every
// watcher the Manager creates.
cfg *rest.Config
// mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources.
mapper meta.RESTMapper
// resources is the declared resources that are parsed from Git.
resources *resourcemap.ResourceMap
// createWatcherFunc is the function to create a watcher.
createWatcherFunc createWatcherFunc
// channel is the channel for ResourceGroup generic events. It is handed to
// every watcher the Manager creates.
channel chan event.GenericEvent
// The following fields are guarded by the mutex.
mux sync.Mutex
// watcherMap maps GVKs to their associated watchers. Entries are added by
// startWatcher and removed by stopWatcher, or by runWatcher when a watcher's
// Run returns an error.
watcherMap map[schema.GroupVersionKind]Runnable
// needsUpdate indicates if the Manager's watches need to be updated. It is
// set when a watcher's Run returns an error and cleared by UpdateWatches.
needsUpdate bool
}
// Options contains options for creating a watch manager.
type Options struct {
// Mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources.
Mapper meta.RESTMapper
// watcherFunc is the function used to create a watcher. It is unexported, so
// it can only be set from within this package; DefaultOptions sets it to
// createWatcher.
watcherFunc createWatcherFunc
}
// DefaultOptions returns the default options:
//   - Mapper is a dynamic RESTMapper built from the passed rest.Config and
//     http.Client
//   - watchers are created with createWatcher
//
// The error from building the RESTMapper is returned unchanged, together with
// a nil *Options.
func DefaultOptions(cfg *rest.Config, httpClient *http.Client) (*Options, error) {
	restMapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
	if err != nil {
		return nil, err
	}
	defaults := &Options{
		Mapper:      restMapper,
		watcherFunc: createWatcher,
	}
	return defaults, nil
}
// NewManager creates a new watch manager. No watchers are started until
// UpdateWatches is called.
//
// cfg and httpClient are used to build the default options when options is
// nil; cfg is also passed to every watcher. decls is the set of declared
// resources and channel receives the ResourceGroup generic events; both are
// handed to each watcher.
//
// If options is nil, DefaultOptions is used and its error, if any, is
// returned. If options is non-nil but carries no watcher factory, the default
// createWatcher is used. The factory field is unexported, so Options built
// outside this package always arrive without one; leaving it nil would panic
// the first time a watcher is started. The caller's Options is never modified.
func NewManager(cfg *rest.Config, httpClient *http.Client, decls *resourcemap.ResourceMap, channel chan event.GenericEvent, options *Options) (*Manager, error) {
	if options == nil {
		var err error
		options, err = DefaultOptions(cfg, httpClient)
		if err != nil {
			return nil, err
		}
	}

	// Resolve the factory into a local so the caller's Options stays untouched.
	watcherFunc := options.watcherFunc
	if watcherFunc == nil {
		watcherFunc = createWatcher
	}

	// mux is left at its zero value, which is an unlocked mutex.
	return &Manager{
		cfg:               cfg,
		resources:         decls,
		watcherMap:        make(map[schema.GroupVersionKind]Runnable),
		createWatcherFunc: watcherFunc,
		mapper:            options.Mapper,
		channel:           channel,
	}, nil
}
// NeedsUpdate reports whether the Manager's watches need to be updated.
// It takes the Manager's mutex while reading the flag, so it is threadsafe.
func (m *Manager) NeedsUpdate() bool {
	m.mux.Lock()
	pending := m.needsUpdate
	m.mux.Unlock()
	return pending
}
// UpdateWatches accepts a map of GVKs that should be watched and takes the
// following actions:
//   - stop watchers for any GroupVersionKind that is not present in the given map.
//   - start watchers for any GroupVersionKind that is present in the given map
//     and not present in the current watch map.
//
// The needsUpdate flag is cleared on entry. If one or more watchers fail to
// start, the remaining GVKs are still attempted; the first failure encountered
// is returned and every further failure is logged so that none is silently
// dropped. Because map iteration order is unspecified, which failure counts as
// "first" is arbitrary. Only watchers that actually started are counted in the
// progress log.
//
// This function is threadsafe.
func (m *Manager) UpdateWatches(ctx context.Context, gvkMap map[schema.GroupVersionKind]struct{}) error {
	m.mux.Lock()
	defer m.mux.Unlock()
	m.needsUpdate = false

	var startedWatches, stoppedWatches uint64

	// Stop obsolete watchers. Deleting entries while ranging over a map is
	// permitted in Go.
	for gvk := range m.watcherMap {
		if _, keepWatching := gvkMap[gvk]; !keepWatching {
			// We were watching the type, but no longer have declarations for it.
			// It is safe to stop the watcher.
			m.stopWatcher(gvk)
			stoppedWatches++
		}
	}

	// Start new watchers.
	var firstErr error
	for gvk := range gvkMap {
		if _, isWatched := m.watcherMap[gvk]; isWatched {
			continue
		}
		// We don't have a watcher for this type, so add a watcher for it.
		err := m.startWatcher(ctx, gvk)
		switch {
		case err == nil:
			// Count only watchers that really started.
			startedWatches++
		case firstErr == nil:
			firstErr = err
		default:
			// Only one error can be returned; surface the others in the log.
			klog.Warningf("Failed to start watcher for %s: %v", gvk.String(), err)
		}
	}

	if startedWatches > 0 || stoppedWatches > 0 {
		klog.Infof("The watch manager made new progress: started %d new watches, and stopped %d watches", startedWatches, stoppedWatches)
	} else {
		klog.V(4).Infof("The watch manager made no new progress")
	}

	return firstErr
}
// watchedGVKs collects the keys of the watcher map, i.e. every
// GroupVersionKind that currently has a watcher registered. The order of the
// result is unspecified, and the result is nil when nothing is watched.
// It does not take m.mux itself.
func (m *Manager) watchedGVKs() []schema.GroupVersionKind {
	var watched []schema.GroupVersionKind
	for kind := range m.watcherMap {
		watched = append(watched, kind)
	}
	return watched
}
// startWatcher makes sure a watcher exists for the given GVK. If one is
// already registered it does nothing; otherwise it creates a watcher with the
// Manager's factory, registers it in the watcher map and runs it in a new
// goroutine. An error from the factory is returned unchanged and nothing is
// registered.
//
// This function is NOT threadsafe; caller must have a lock on m.mux.
func (m *Manager) startWatcher(ctx context.Context, gvk schema.GroupVersionKind) error {
	if _, running := m.watcherMap[gvk]; running {
		// The watcher is already started.
		return nil
	}

	watcher, err := m.createWatcherFunc(ctx, watcherConfig{
		gvk:       gvk,
		mapper:    m.mapper,
		config:    m.cfg,
		channel:   m.channel,
		resources: m.resources,
	})
	if err != nil {
		return err
	}

	m.watcherMap[gvk] = watcher
	go m.runWatcher(ctx, watcher, gvk)
	return nil
}
// runWatcher runs the given watcher and blocks until it returns. When Run
// reports an error, the error is logged, the GVK's entry is removed from the
// watcher map and the Manager is flagged as needing an update. A nil result
// from Run leaves the Manager untouched.
//
// This function is threadsafe: it takes m.mux itself, so the caller must not
// hold it.
func (m *Manager) runWatcher(ctx context.Context, r Runnable, gvk schema.GroupVersionKind) {
	runErr := r.Run(ctx)
	if runErr == nil {
		return
	}
	klog.Warningf("Error running watcher for %s: %v", gvk.String(), runErr)

	m.mux.Lock()
	defer m.mux.Unlock()
	delete(m.watcherMap, gvk)
	m.needsUpdate = true
}
// stopWatcher stops the watcher registered for the given GVK and removes it
// from the watcher map. It does nothing when no watcher is registered for
// that GVK.
//
// This function is NOT threadsafe; caller must have a lock on m.mux.
func (m *Manager) stopWatcher(gvk schema.GroupVersionKind) {
	if watcher, running := m.watcherMap[gvk]; running {
		// Stop first, then forget the watcher.
		watcher.Stop()
		delete(m.watcherMap, gvk)
	}
}
// Len returns the number of types that are currently watched.
//
// The watcher map is guarded by m.mux and is modified by watcher goroutines
// when a watcher fails, so the lock is taken for the read. This function is
// threadsafe; the caller must not already hold m.mux.
func (m *Manager) Len() int {
	m.mux.Lock()
	defer m.mux.Unlock()
	return len(m.watcherMap)
}
// IsWatched returns whether the given GVK is being watched.
//
// The watcher map is guarded by m.mux and is modified by watcher goroutines
// when a watcher fails, so the lock is taken for the lookup. This function is
// threadsafe; the caller must not already hold m.mux.
func (m *Manager) IsWatched(gvk schema.GroupVersionKind) bool {
	m.mux.Lock()
	defer m.mux.Unlock()
	_, found := m.watcherMap[gvk]
	return found
}