-
Notifications
You must be signed in to change notification settings - Fork 461
/
generic_clientmap.go
277 lines (225 loc) · 8.99 KB
/
generic_clientmap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
// Copyright 2020 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-logr/logr"
"golang.org/x/time/rate"
"k8s.io/utils/clock"
"github.com/gardener/gardener/pkg/client/kubernetes"
"github.com/gardener/gardener/pkg/client/kubernetes/clientmap"
)
// Compile-time assertion that *GenericClientMap satisfies the clientmap.ClientMap interface.
var _ clientmap.ClientMap = &GenericClientMap{}

// waitForCacheSyncTimeout is the maximum time startClientSet waits for the caches of a single ClientSet to sync
// before it gives up and returns an error.
const waitForCacheSyncTimeout = 5 * time.Minute

// MaxRefreshInterval is the minimum interval between two consecutive checks of the version and hash of a single
// ClientSet, i.e. the checks that decide whether the ClientSet should be refreshed happen at most once per
// MaxRefreshInterval. Also, the GenericClientMap waits at least MaxRefreshInterval after creating a new ClientSet
// before checking if it should be refreshed.
var MaxRefreshInterval = 5 * time.Second
// GenericClientMap is a generic implementation of clientmap.ClientMap, which can be used by specific ClientMap
// implementations to reuse the core logic for storing, requesting, invalidating and starting ClientSets. Specific
// implementations only need to provide a ClientSetFactory that can produce new ClientSets for the respective keys
// if a corresponding entry is not found in the GenericClientMap.
type GenericClientMap struct {
	// clientSets holds one entry per ClientSetKey for every ClientSet that has been created and not invalidated.
	clientSets map[clientmap.ClientSetKey]*clientMapEntry
	// factory creates new ClientSets and calculates the hash that is used to detect outdated ClientSets.
	factory clientmap.ClientSetFactory

	// lock guards concurrent access to clientSets
	lock sync.RWMutex

	// log is the logger used for all messages emitted by the GenericClientMap.
	log logr.Logger

	// clock supplies the current time for the refresh rate limiters of the entries.
	clock clock.Clock

	// stopCh is saved on the first call to Start and is used to start the caches of newly created ClientSets.
	stopCh <-chan struct{}
	// started is set to true by Start once all ClientSets that existed at that time have been started. ClientSets
	// that are created afterwards are started immediately.
	started bool
}
// clientMapEntry is a single entry of the ClientMap.
type clientMapEntry struct {
	// clientSet is the ClientSet that was produced by the ClientSetFactory for this entry.
	clientSet kubernetes.Interface
	// synced is set to true once waiting for the cache sync of clientSet has succeeded.
	synced bool
	// cancel cancels the context the clientSet was started with. It is nil as long as the clientSet is not started.
	cancel context.CancelFunc
	// hash is the hash returned by the ClientSetFactory when the clientSet was created. GetClient compares it with
	// a freshly calculated hash to detect that the clientSet is outdated.
	hash string

	// refreshLimiter limits the attempts to refresh the entry due to an outdated ClientSetHash or server version.
	refreshLimiter *rate.Limiter
}
// NewGenericClientMap creates a new GenericClientMap that is not started yet and does not contain any ClientSets.
// The given factory is used to create ClientSets on demand, logger receives the log messages of the map and clock
// supplies the current time for rate-limiting the refresh checks of the ClientSets.
func NewGenericClientMap(factory clientmap.ClientSetFactory, logger logr.Logger, clock clock.Clock) *GenericClientMap {
	cm := &GenericClientMap{
		factory: factory,
		log:     logger,
		clock:   clock,
	}
	// start with an empty, non-nil map so that entries can be added right away
	cm.clientSets = map[clientmap.ClientSetKey]*clientMapEntry{}

	return cm
}
// GetClient returns the ClientSet for the cluster identified by the given key. If the map already contains an entry
// for the key, its ClientSet is returned, otherwise a new ClientSet is created via the ClientSetFactory (and started
// immediately if the ClientMap has already been started).
//
// For existing entries GetClient additionally performs a refresh check, rate-limited per entry by the entry's
// refreshLimiter: it recalculates the ClientSetHash and, if it differs from the stored one, invalidates the entry
// and creates a new ClientSet. If the hash is unchanged, the server version is rediscovered; a changed version is
// only logged and does not lead to a new ClientSet.
//
// An error is returned if the hash calculation, the version discovery, the invalidation or the creation of a new
// ClientSet fails.
func (cm *GenericClientMap) GetClient(ctx context.Context, key clientmap.ClientSetKey) (kubernetes.Interface, error) {
	// read the current entry under the read lock only, the refresh check below runs without holding the lock
	lookup := func() (*clientMapEntry, bool) {
		cm.lock.RLock()
		defer cm.lock.RUnlock()

		e, ok := cm.clientSets[key]
		return e, ok
	}

	entry, found := lookup()

	if found && entry.refreshLimiter.AllowN(cm.clock.Now(), 1) {
		// the config of the client might have changed (e.g. kubeconfig secret), which is reflected by a new hash
		hash, err := cm.factory.CalculateClientSetHash(ctx, key)
		if err != nil {
			return nil, fmt.Errorf("failed to calculate new hash for ClientSet: %w", err)
		}

		if hash != entry.hash {
			cm.log.Info("Refreshing ClientSet due to changed ClientSetHash", "key", key.Key(), "oldHash", entry.hash, "newHash", hash)

			if err := cm.InvalidateClient(key); err != nil {
				return nil, fmt.Errorf("error refreshing ClientSet for key %q: %w", key.Key(), err)
			}
			// the outdated entry is gone, fall through to creating a new one
			found = false
		} else {
			// hash is unchanged, only refresh the server version
			knownVersion := entry.clientSet.Version()

			discovered, err := entry.clientSet.DiscoverVersion()
			if err != nil {
				return nil, fmt.Errorf("failed to refresh ClientSet's server version: %w", err)
			}

			if discovered.GitVersion != knownVersion {
				// client is intentionally not refreshed in this case, see https://github.com/gardener/gardener/pull/2581 for details
				cm.log.Info("New server version discovered for ClientSet", "key", key.Key(), "serverVersion", discovered.GitVersion)
			}
		}
	}

	if found {
		return entry.clientSet, nil
	}

	created, err := cm.addClientSet(ctx, key)
	if err != nil {
		return nil, err
	}

	return created.clientSet, nil
}
// addClientSet creates a ClientSet for the given key via the ClientSetFactory and stores it in the map while holding
// the write lock. If the map already contains an entry for the key, that entry is returned and no ClientSet is
// created. If the ClientMap has been started, the new ClientSet is started as well and an error from starting it is
// returned to the caller.
func (cm *GenericClientMap) addClientSet(ctx context.Context, key clientmap.ClientSetKey) (*clientMapEntry, error) {
	cm.lock.Lock()
	defer cm.lock.Unlock()

	// another goroutine might have created the ClientSet between the lookup in GetClient() and acquiring the lock
	// here (e.g. two goroutines concurrently calling GetClient() for a key that has no ClientSet yet)
	if existing, ok := cm.clientSets[key]; ok {
		return existing, nil
	}

	clientSet, hash, err := cm.factory.NewClientSet(ctx, key)
	if err != nil {
		return nil, fmt.Errorf("error creating new ClientSet for key %q: %w", key.Key(), err)
	}

	cm.log.Info("Created new ClientSet", "key", key.Key(), "hash", hash)

	limiter := rate.NewLimiter(rate.Every(MaxRefreshInterval), 1)
	// take the only available token right away, so that the first refresh check of the new ClientSet happens
	// after MaxRefreshInterval at the earliest
	limiter.AllowN(cm.clock.Now(), 1)

	created := &clientMapEntry{
		clientSet:      clientSet,
		hash:           hash,
		refreshLimiter: limiter,
	}
	cm.clientSets[key] = created

	// new ClientSets are only started automatically once the ClientMap itself has been started
	if !cm.started {
		return created, nil
	}

	if err := cm.startClientSet(key, created); err != nil {
		return nil, err
	}

	return created, nil
}
// InvalidateClient removes the ClientSet identified by the given key from the ClientMap. If the ClientSet has been
// started, its context is cancelled before the entry is removed. If the ClientSetFactory implements
// clientmap.Invalidate, the factory is asked to invalidate the key as well and its error is returned.
// Calling InvalidateClient for a key without an entry is a no-op.
func (cm *GenericClientMap) InvalidateClient(key clientmap.ClientSetKey) error {
	cm.lock.Lock()
	defer cm.lock.Unlock()

	entry, ok := cm.clientSets[key]
	if !ok {
		return nil
	}

	cm.log.Info("Invalidating ClientSet", "key", key.Key())

	// cancel is only set for ClientSets that have been started
	if stop := entry.cancel; stop != nil {
		stop()
	}

	delete(cm.clientSets, key)

	invalidator, ok := cm.factory.(clientmap.Invalidate)
	if !ok {
		return nil
	}

	return invalidator.InvalidateClient(key)
}
// Start starts all ClientSets that are currently contained in the ClientMap and remembers the Done channel of ctx,
// which is used to stop these ClientSets as well as the ones that will be created afterwards. Calling Start on an
// already started ClientMap is a no-op. If starting one of the ClientSets fails, the error is returned and the
// ClientMap is not marked as started.
func (cm *GenericClientMap) Start(ctx context.Context) error {
	cm.lock.Lock()
	defer cm.lock.Unlock()

	if cm.started {
		return nil
	}

	cm.stopCh = ctx.Done()

	// Start the ClientSets that have been added before the ClientMap was started.
	// Each startClientSet call also waits for the caches of the respective ClientSet to sync, so the ClientSets
	// are started one after the other.
	// NOTE(review): the original comment was cut off here; presumably the sequential wait is acceptable because
	// there will probably be only a garden client in the map on startup - confirm.
	for k, e := range cm.clientSets {
		if err := cm.startClientSet(k, e); err != nil {
			return err
		}
	}

	// from now on, newly created ClientSets are started immediately
	cm.started = true

	return nil
}
// startClientSet starts the ClientSet of the given entry with a dedicated context and waits for its caches to sync,
// for at most waitForCacheSyncTimeout. The cancel function of the context is stored in the entry, and the context is
// also cancelled as soon as the stopCh of the ClientMap is closed. An error is returned if the caches did not sync.
// The caller must hold the lock of the ClientMap.
func (cm *GenericClientMap) startClientSet(key clientmap.ClientSetKey, entry *clientMapEntry) error {
	runCtx, stop := context.WithCancel(context.Background())
	entry.cancel = stop

	// tie the lifetime of the ClientSet to the stopCh of the ClientMap; the goroutine ends once the ClientSet's
	// context is done for whatever reason
	go func() {
		select {
		case <-cm.stopCh:
			stop()
		case <-runCtx.Done():
		}
	}()

	entry.clientSet.Start(runCtx)

	// Waiting for the cache sync can block controller worker routines, so the wait is bounded in order to not
	// block all workers if some caches take a long time to sync.
	syncCtx, cancelWait := context.WithTimeout(runCtx, waitForCacheSyncTimeout)
	defer cancelWait()

	// Callers of Start/GetClient expect that cached clients can be used immediately after starting the ClientMap
	// or after retrieving them from a started ClientMap, hence only return successfully once the caches have synced.
	if waitForClientSetCacheSync(syncCtx, entry) {
		return nil
	}

	return fmt.Errorf("timed out waiting for caches of ClientSet with key %q to sync", key.Key())
}
// waitForClientSetCacheSync reports whether the caches of the entry's ClientSet are synced. If the entry is not
// marked as synced yet, it waits for the cache sync using ctx and records the outcome in the entry, so that an
// already synced entry is not waited for again.
func waitForClientSetCacheSync(ctx context.Context, entry *clientMapEntry) bool {
	// no locking needed here, the callers already hold the lock of the ClientMap
	if !entry.synced {
		entry.synced = entry.clientSet.WaitForCacheSync(ctx)
	}

	return entry.synced
}