/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package finalizer

import (
	"fmt"
	"reflect"
	"time"

	"github.com/golang/glog"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
	"k8s.io/apiserver/pkg/registry/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"

	"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
	client "k8s.io/apiextensions-apiserver/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion"
	informers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion"
	listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion"
)

// CRDFinalizer is a controller that finalizes the CRD by deleting all the CRs associated with it.
type CRDFinalizer struct {
	crdClient      client.CustomResourceDefinitionsGetter
	crClientGetter CRClientGetter

	crdLister listers.CustomResourceDefinitionLister
	crdSynced cache.InformerSynced

	// To allow injection for testing.
	syncFn func(key string) error

	queue workqueue.RateLimitingInterface
}

// ListerCollectionDeleter combines rest.Lister and rest.CollectionDeleter.
type ListerCollectionDeleter interface {
	rest.Lister
	rest.CollectionDeleter
}

// CRClientGetter knows how to get a ListerCollectionDeleter for a given CRD UID.
type CRClientGetter interface {
	// GetCustomResourceListerCollectionDeleter gets the ListerCollectionDeleter for the given CRD
	// UID.
	GetCustomResourceListerCollectionDeleter(crd *apiextensions.CustomResourceDefinition) (ListerCollectionDeleter, error)
}

// NewCRDFinalizer creates a new CRDFinalizer.
func NewCRDFinalizer(
	crdInformer informers.CustomResourceDefinitionInformer,
	crdClient client.CustomResourceDefinitionsGetter,
	crClientGetter CRClientGetter,
) *CRDFinalizer {
	c := &CRDFinalizer{
		crdClient:      crdClient,
		crdLister:      crdInformer.Lister(),
		crdSynced:      crdInformer.Informer().HasSynced,
		crClientGetter: crClientGetter,
		queue:          workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CustomResourceDefinition-CRDFinalizer"),
	}

	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addCustomResourceDefinition,
		UpdateFunc: c.updateCustomResourceDefinition,
	})

	c.syncFn = c.sync

	return c
}
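
// sync finalizes a single CRD: it marks the CRD as Terminating, deletes any
// remaining instances if the CRD was ever established, removes the cleanup
// finalizer, and then re-issues the delete so the API server can remove the
// CRD itself once no other finalizers remain.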
func (c *CRDFinalizer) sync(key string) error {
	cachedCRD, err := c.crdLister.Get(key)
	if apierrors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}

	// no work to do
	if cachedCRD.DeletionTimestamp.IsZero() || !apiextensions.CRDHasFinalizer(cachedCRD, apiextensions.CustomResourceCleanupFinalizer) {
		return nil
	}
	crd := cachedCRD.DeepCopy()

	// update the status condition. This cleanup could take a while.
	apiextensions.SetCRDCondition(crd, apiextensions.CustomResourceDefinitionCondition{
		Type:    apiextensions.Terminating,
		Status:  apiextensions.ConditionTrue,
		Reason:  "InstanceDeletionInProgress",
		Message: "CustomResource deletion is in progress",
	})
	crd, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(crd)
	if err != nil {
		return err
	}

	// Now we can start deleting items. We should use the REST API to ensure that all normal admission runs.
	// Since we control the endpoints, we know that delete collection works. No need to delete if not established.
	if apiextensions.IsCRDConditionTrue(crd, apiextensions.Established) {
		cond, deleteErr := c.deleteInstances(crd)
		apiextensions.SetCRDCondition(crd, cond)
		if deleteErr != nil {
			crd, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(crd)
			if err != nil {
				utilruntime.HandleError(err)
			}
			return deleteErr
		}
	} else {
		apiextensions.SetCRDCondition(crd, apiextensions.CustomResourceDefinitionCondition{
			Type:    apiextensions.Terminating,
			Status:  apiextensions.ConditionFalse,
			Reason:  "NeverEstablished",
			Message: "resource was never established",
		})
	}

	apiextensions.CRDRemoveFinalizer(crd, apiextensions.CustomResourceCleanupFinalizer)
	crd, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(crd)
	if err != nil {
		return err
	}

	// and now issue another delete, which should clean it all up if no finalizers remain or no-op if they do
	return c.crdClient.CustomResourceDefinitions().Delete(crd.Name, nil)
}
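
// deleteInstances lists every instance of the given CRD, issues one
// DeleteCollection per namespace seen, and then polls until no instances
// remain. It returns a Terminating condition describing the outcome along
// with any error encountered.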
func (c *CRDFinalizer) deleteInstances(crd *apiextensions.CustomResourceDefinition) (apiextensions.CustomResourceDefinitionCondition, error) {
	// Now we can start deleting items. While it would be ideal to use a REST API client, doing so
	// could incorrectly delete a ThirdPartyResource with the same URL as the CustomResource, so we go
	// directly to the storage instead. Since we control the storage, we know that delete collection works.
	crClient, err := c.crClientGetter.GetCustomResourceListerCollectionDeleter(crd)
	if err != nil {
		err = fmt.Errorf("unable to find a custom resource client for %s.%s: %v", crd.Status.AcceptedNames.Plural, crd.Spec.Group, err)
		return apiextensions.CustomResourceDefinitionCondition{
			Type:    apiextensions.Terminating,
			Status:  apiextensions.ConditionTrue,
			Reason:  "InstanceDeletionFailed",
			Message: fmt.Sprintf("could not list instances: %v", err),
		}, err
	}

	ctx := genericapirequest.NewContext()
	allResources, err := crClient.List(ctx, nil)
	if err != nil {
		return apiextensions.CustomResourceDefinitionCondition{
			Type:    apiextensions.Terminating,
			Status:  apiextensions.ConditionTrue,
			Reason:  "InstanceDeletionFailed",
			Message: fmt.Sprintf("could not list instances: %v", err),
		}, err
	}

	deletedNamespaces := sets.String{}
	deleteErrors := []error{}
	for _, item := range allResources.(*unstructured.UnstructuredList).Items {
		metadata, err := meta.Accessor(&item)
		if err != nil {
			utilruntime.HandleError(err)
			continue
		}
		if deletedNamespaces.Has(metadata.GetNamespace()) {
			continue
		}
		// don't retry deleting the same namespace
		deletedNamespaces.Insert(metadata.GetNamespace())

		nsCtx := genericapirequest.WithNamespace(ctx, metadata.GetNamespace())
		if _, err := crClient.DeleteCollection(nsCtx, nil, nil); err != nil {
			deleteErrors = append(deleteErrors, err)
			continue
		}
	}
	if deleteError := utilerrors.NewAggregate(deleteErrors); deleteError != nil {
		return apiextensions.CustomResourceDefinitionCondition{
			Type:    apiextensions.Terminating,
			Status:  apiextensions.ConditionTrue,
			Reason:  "InstanceDeletionFailed",
			Message: fmt.Sprintf("could not issue all deletes: %v", deleteError),
		}, deleteError
	}

	// now we need to wait until all the resources are deleted. Start with a simple poll before we do anything fancy.
	// TODO not all servers are synchronized on caches. It is possible for a stale one to still be creating things.
	// Once we have a mechanism for servers to indicate their states, we should check that for concurrence.
	err = wait.PollImmediate(5*time.Second, 1*time.Minute, func() (bool, error) {
		listObj, err := crClient.List(ctx, nil)
		if err != nil {
			return false, err
		}
		if len(listObj.(*unstructured.UnstructuredList).Items) == 0 {
			return true, nil
		}
		glog.V(2).Infof("%s.%s waiting for %d items to be removed", crd.Status.AcceptedNames.Plural, crd.Spec.Group, len(listObj.(*unstructured.UnstructuredList).Items))
		return false, nil
	})
	if err != nil {
		return apiextensions.CustomResourceDefinitionCondition{
			Type:    apiextensions.Terminating,
			Status:  apiextensions.ConditionTrue,
			Reason:  "InstanceDeletionCheck",
			Message: fmt.Sprintf("could not confirm zero CustomResources remaining: %v", err),
		}, err
	}

	return apiextensions.CustomResourceDefinitionCondition{
		Type:    apiextensions.Terminating,
		Status:  apiextensions.ConditionFalse,
		Reason:  "InstanceDeletionCompleted",
		Message: "removed all instances",
	}, nil
}
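
// Run starts the controller: it waits for the CRD informer cache to sync,
// launches the requested number of workers, and blocks until stopCh is closed.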
func (c *CRDFinalizer) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	glog.Infof("Starting CRDFinalizer")
	defer glog.Infof("Shutting down CRDFinalizer")

	if !cache.WaitForCacheSync(stopCh, c.crdSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
}
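
// runWorker loops, processing items off the queue until it shuts down.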
func (c *CRDFinalizer) runWorker() {
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (c *CRDFinalizer) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	err := c.syncFn(key.(string))
	if err == nil {
		c.queue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
	c.queue.AddRateLimited(key)

	return true
}
func (c *CRDFinalizer) enqueue(obj *apiextensions.CustomResourceDefinition) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", obj, err))
		return
	}

	c.queue.Add(key)
}
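
// addCustomResourceDefinition is the informer add handler; it enqueues only
// CRDs that are being deleted and still carry the cleanup finalizer.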
func (c *CRDFinalizer) addCustomResourceDefinition(obj interface{}) {
	castObj := obj.(*apiextensions.CustomResourceDefinition)
	// only queue deleted things
	if !castObj.DeletionTimestamp.IsZero() && apiextensions.CRDHasFinalizer(castObj, apiextensions.CustomResourceCleanupFinalizer) {
		c.enqueue(castObj)
	}
}
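
// updateCustomResourceDefinition is the informer update handler; it enqueues
// deleted-but-unfinalized CRDs, taking care not to requeue updates whose only
// difference is the Terminating condition this controller likely set itself.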
func (c *CRDFinalizer) updateCustomResourceDefinition(oldObj, newObj interface{}) {
	oldCRD := oldObj.(*apiextensions.CustomResourceDefinition)
	newCRD := newObj.(*apiextensions.CustomResourceDefinition)

	// only queue deleted things that haven't been finalized by us
	if newCRD.DeletionTimestamp.IsZero() || !apiextensions.CRDHasFinalizer(newCRD, apiextensions.CustomResourceCleanupFinalizer) {
		return
	}

	// always requeue resyncs just in case
	if oldCRD.ResourceVersion == newCRD.ResourceVersion {
		c.enqueue(newCRD)
		return
	}

	// If the only difference is in the terminating condition, then there's no reason to requeue here. This controller
	// is likely to be the originator, so requeuing would hot-loop us. Failures are requeued by the workqueue directly.
	// This is a low traffic and scale resource, so the copy is terrible. It's not good, so better ideas
	// are welcome.
	oldCopy := oldCRD.DeepCopy()
	newCopy := newCRD.DeepCopy()
	oldCopy.ResourceVersion = ""
	newCopy.ResourceVersion = ""
	apiextensions.RemoveCRDCondition(oldCopy, apiextensions.Terminating)
	apiextensions.RemoveCRDCondition(newCopy, apiextensions.Terminating)

	if !reflect.DeepEqual(oldCopy, newCopy) {
		c.enqueue(newCRD)
	}
}