/
graph_builder.go
906 lines (818 loc) · 32.7 KB
/
graph_builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector
import (
"fmt"
"reflect"
"sync"
"time"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/metadata"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
)
type eventType int
func (e eventType) String() string {
switch e {
case addEvent:
return "add"
case updateEvent:
return "update"
case deleteEvent:
return "delete"
default:
return fmt.Sprintf("unknown(%d)", int(e))
}
}
const (
addEvent eventType = iota
updateEvent
deleteEvent
)
type event struct {
// virtual indicates this event did not come from an informer, but was constructed artificially
virtual bool
eventType eventType
obj interface{}
// the update event comes with an old object, but it's not used by the garbage collector.
oldObj interface{}
gvk schema.GroupVersionKind
}
// GraphBuilder processes events supplied by the informers, updates uidToNode,
// a graph that caches the dependencies as we know, and enqueues
// items to the attemptToDelete and attemptToOrphan.
type GraphBuilder struct {
restMapper meta.RESTMapper
// each monitor list/watches a resource, the results are funneled to the
// dependencyGraphBuilder
monitors monitors
monitorLock sync.RWMutex
// informersStarted is closed after after all of the controllers have been initialized and are running.
// After that it is safe to start them here, before that it is not.
informersStarted <-chan struct{}
// stopCh drives shutdown. When a receive from it unblocks, monitors will shut down.
// This channel is also protected by monitorLock.
stopCh <-chan struct{}
// running tracks whether Run() has been called.
// it is protected by monitorLock.
running bool
eventRecorder record.EventRecorder
metadataClient metadata.Interface
// monitors are the producer of the graphChanges queue, graphBuilder alters
// the in-memory graph according to the changes.
graphChanges workqueue.RateLimitingInterface
// uidToNode doesn't require a lock to protect, because only the
// single-threaded GraphBuilder.processGraphChanges() reads/writes it.
uidToNode *concurrentUIDToNode
// GraphBuilder is the producer of attemptToDelete and attemptToOrphan, GC is the consumer.
attemptToDelete workqueue.RateLimitingInterface
attemptToOrphan workqueue.RateLimitingInterface
// GraphBuilder and GC share the absentOwnerCache. Objects that are known to
// be non-existent are added to the cached.
absentOwnerCache *ReferenceCache
sharedInformers informerfactory.InformerFactory
ignoredResources map[schema.GroupResource]struct{}
}
// monitor runs a Controller with a local stop channel.
type monitor struct {
controller cache.Controller
store cache.Store
// stopCh stops Controller. If stopCh is nil, the monitor is considered to be
// not yet started.
stopCh chan struct{}
}
// Run is intended to be called in a goroutine. Multiple calls of this is an
// error.
func (m *monitor) Run() {
m.controller.Run(m.stopCh)
}
type monitors map[schema.GroupVersionResource]*monitor
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
handlers := cache.ResourceEventHandlerFuncs{
// add the event to the dependencyGraphBuilder's graphChanges.
AddFunc: func(obj interface{}) {
event := &event{
eventType: addEvent,
obj: obj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
UpdateFunc: func(oldObj, newObj interface{}) {
// TODO: check if there are differences in the ownerRefs,
// finalizers, and DeletionTimestamp; if not, ignore the update.
event := &event{
eventType: updateEvent,
obj: newObj,
oldObj: oldObj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
DeleteFunc: func(obj interface{}) {
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = deletedFinalStateUnknown.Obj
}
event := &event{
eventType: deleteEvent,
obj: obj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
}
shared, err := gb.sharedInformers.ForResource(resource)
if err != nil {
klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
return nil, nil, err
}
klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
// need to clone because it's from a shared cache
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
return shared.Informer().GetController(), shared.Informer().GetStore(), nil
}
// syncMonitors rebuilds the monitor set according to the supplied resources,
// creating or deleting monitors as necessary. It will return any error
// encountered, but will make an attempt to create a monitor for each resource
// instead of immediately exiting on an error. It may be called before or after
// Run. Monitors are NOT started as part of the sync. To ensure all existing
// monitors are started, call startMonitors.
func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
toRemove := gb.monitors
if toRemove == nil {
toRemove = monitors{}
}
current := monitors{}
errs := []error{}
kept := 0
added := 0
for resource := range resources {
if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
continue
}
if m, ok := toRemove[resource]; ok {
current[resource] = m
delete(toRemove, resource)
kept++
continue
}
kind, err := gb.restMapper.KindFor(resource)
if err != nil {
errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
continue
}
c, s, err := gb.controllerFor(resource, kind)
if err != nil {
errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
continue
}
current[resource] = &monitor{store: s, controller: c}
added++
}
gb.monitors = current
for _, monitor := range toRemove {
if monitor.stopCh != nil {
close(monitor.stopCh)
}
}
klog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
// NewAggregate returns nil if errs is 0-length
return utilerrors.NewAggregate(errs)
}
// startMonitors ensures the current set of monitors are running. Any newly
// started monitors will also cause shared informers to be started.
//
// If called before Run, startMonitors does nothing (as there is no stop channel
// to support monitor/informer execution).
func (gb *GraphBuilder) startMonitors() {
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
if !gb.running {
return
}
// we're waiting until after the informer start that happens once all the controllers are initialized. This ensures
// that they don't get unexpected events on their work queues.
<-gb.informersStarted
monitors := gb.monitors
started := 0
for _, monitor := range monitors {
if monitor.stopCh == nil {
monitor.stopCh = make(chan struct{})
gb.sharedInformers.Start(gb.stopCh)
go monitor.Run()
started++
}
}
klog.V(4).Infof("started %d new monitors, %d currently running", started, len(monitors))
}
// IsSynced returns true if any monitors exist AND all those monitors'
// controllers HasSynced functions return true. This means IsSynced could return
// true at one time, and then later return false if all monitors were
// reconstructed.
func (gb *GraphBuilder) IsSynced() bool {
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
if len(gb.monitors) == 0 {
klog.V(4).Info("garbage controller monitor not synced: no monitors")
return false
}
for resource, monitor := range gb.monitors {
if !monitor.controller.HasSynced() {
klog.V(4).Infof("garbage controller monitor not yet synced: %+v", resource)
return false
}
}
return true
}
// Run sets the stop channel and starts monitor execution until stopCh is
// closed. Any running monitors will be stopped before Run returns.
func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
klog.Infof("GraphBuilder running")
defer klog.Infof("GraphBuilder stopping")
// Set up the stop channel.
gb.monitorLock.Lock()
gb.stopCh = stopCh
gb.running = true
gb.monitorLock.Unlock()
// Start monitors and begin change processing until the stop channel is
// closed.
gb.startMonitors()
wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
// Stop any running monitors.
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
monitors := gb.monitors
stopped := 0
for _, monitor := range monitors {
if monitor.stopCh != nil {
stopped++
close(monitor.stopCh)
}
}
// reset monitors so that the graph builder can be safely re-run/synced.
gb.monitors = nil
klog.Infof("stopped %d of %d monitors", stopped, len(monitors))
}
var ignoredResources = map[schema.GroupResource]struct{}{
{Group: "", Resource: "events"}: {},
}
// DefaultIgnoredResources returns the default set of resources that the garbage collector controller
// should ignore. This is exposed so downstream integrators can have access to the defaults, and add
// to them as necessary when constructing the controller.
func DefaultIgnoredResources() map[schema.GroupResource]struct{} {
return ignoredResources
}
// enqueueVirtualDeleteEvent is used to add a virtual delete event to be processed for virtual nodes
// once it is determined they do not have backing objects in storage
func (gb *GraphBuilder) enqueueVirtualDeleteEvent(ref objectReference) {
gv, _ := schema.ParseGroupVersion(ref.APIVersion)
gb.graphChanges.Add(&event{
virtual: true,
eventType: deleteEvent,
gvk: gv.WithKind(ref.Kind),
obj: &metaonly.MetadataOnlyObject{
TypeMeta: metav1.TypeMeta{APIVersion: ref.APIVersion, Kind: ref.Kind},
ObjectMeta: metav1.ObjectMeta{Namespace: ref.Namespace, UID: ref.UID, Name: ref.Name},
},
})
}
// addDependentToOwners adds n to owners' dependents list. If the owner does not
// exist in the gb.uidToNode yet, a "virtual" node will be created to represent
// the owner. The "virtual" node will be enqueued to the attemptToDelete, so that
// attemptToDeleteItem() will verify if the owner exists according to the API server.
func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerReference) {
// track if some of the referenced owners already exist in the graph and have been observed,
// and the dependent's ownerRef does not match their observed coordinates
hasPotentiallyInvalidOwnerReference := false
for _, owner := range owners {
ownerNode, ok := gb.uidToNode.Read(owner.UID)
if !ok {
// Create a "virtual" node in the graph for the owner if it doesn't
// exist in the graph yet.
ownerNode = &node{
identity: objectReference{
OwnerReference: ownerReferenceCoordinates(owner),
Namespace: n.identity.Namespace,
},
dependents: make(map[*node]struct{}),
virtual: true,
}
klog.V(5).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
gb.uidToNode.Write(ownerNode)
}
ownerNode.addDependent(n)
if !ok {
// Enqueue the virtual node into attemptToDelete.
// The garbage processor will enqueue a virtual delete
// event to delete it from the graph if API server confirms this
// owner doesn't exist.
gb.attemptToDelete.Add(ownerNode)
} else if !hasPotentiallyInvalidOwnerReference {
ownerIsNamespaced := len(ownerNode.identity.Namespace) > 0
if ownerIsNamespaced && ownerNode.identity.Namespace != n.identity.Namespace {
if ownerNode.isObserved() {
// The owner node has been observed via an informer
// the dependent's namespace doesn't match the observed owner's namespace, this is definitely wrong.
// cluster-scoped owners can be referenced as an owner from any namespace or cluster-scoped object.
klog.V(2).Infof("node %s references an owner %s but does not match namespaces", n.identity, ownerNode.identity)
gb.reportInvalidNamespaceOwnerRef(n, owner.UID)
}
hasPotentiallyInvalidOwnerReference = true
} else if !ownerReferenceMatchesCoordinates(owner, ownerNode.identity.OwnerReference) {
if ownerNode.isObserved() {
// The owner node has been observed via an informer
// n's owner reference doesn't match the observed identity, this might be wrong.
klog.V(2).Infof("node %s references an owner %s with coordinates that do not match the observed identity", n.identity, ownerNode.identity)
}
hasPotentiallyInvalidOwnerReference = true
} else if !ownerIsNamespaced && ownerNode.identity.Namespace != n.identity.Namespace && !ownerNode.isObserved() {
// the ownerNode is cluster-scoped and virtual, and does not match the child node's namespace.
// the owner could be a missing instance of a namespaced type incorrectly referenced by a cluster-scoped child (issue #98040).
// enqueue this child to attemptToDelete to verify parent references.
hasPotentiallyInvalidOwnerReference = true
}
}
}
if hasPotentiallyInvalidOwnerReference {
// Enqueue the potentially invalid dependent node into attemptToDelete.
// The garbage processor will verify whether the owner references are dangling
// and delete the dependent if all owner references are confirmed absent.
gb.attemptToDelete.Add(n)
}
}
func (gb *GraphBuilder) reportInvalidNamespaceOwnerRef(n *node, invalidOwnerUID types.UID) {
var invalidOwnerRef metav1.OwnerReference
var found = false
for _, ownerRef := range n.owners {
if ownerRef.UID == invalidOwnerUID {
invalidOwnerRef = ownerRef
found = true
break
}
}
if !found {
return
}
ref := &v1.ObjectReference{
Kind: n.identity.Kind,
APIVersion: n.identity.APIVersion,
Namespace: n.identity.Namespace,
Name: n.identity.Name,
UID: n.identity.UID,
}
invalidIdentity := objectReference{
OwnerReference: metav1.OwnerReference{
Kind: invalidOwnerRef.Kind,
APIVersion: invalidOwnerRef.APIVersion,
Name: invalidOwnerRef.Name,
UID: invalidOwnerRef.UID,
},
Namespace: n.identity.Namespace,
}
gb.eventRecorder.Eventf(ref, v1.EventTypeWarning, "OwnerRefInvalidNamespace", "ownerRef %s does not exist in namespace %q", invalidIdentity, n.identity.Namespace)
}
// insertNode insert the node to gb.uidToNode; then it finds all owners as listed
// in n.owners, and adds the node to their dependents list.
func (gb *GraphBuilder) insertNode(n *node) {
gb.uidToNode.Write(n)
gb.addDependentToOwners(n, n.owners)
}
// removeDependentFromOwners remove n from owners' dependents list.
func (gb *GraphBuilder) removeDependentFromOwners(n *node, owners []metav1.OwnerReference) {
for _, owner := range owners {
ownerNode, ok := gb.uidToNode.Read(owner.UID)
if !ok {
continue
}
ownerNode.deleteDependent(n)
}
}
// removeNode removes the node from gb.uidToNode, then finds all
// owners as listed in n.owners, and removes n from their dependents list.
func (gb *GraphBuilder) removeNode(n *node) {
gb.uidToNode.Delete(n.identity.UID)
gb.removeDependentFromOwners(n, n.owners)
}
type ownerRefPair struct {
oldRef metav1.OwnerReference
newRef metav1.OwnerReference
}
// TODO: profile this function to see if a naive N^2 algorithm performs better
// when the number of references is small.
func referencesDiffs(old []metav1.OwnerReference, new []metav1.OwnerReference) (added []metav1.OwnerReference, removed []metav1.OwnerReference, changed []ownerRefPair) {
oldUIDToRef := make(map[string]metav1.OwnerReference)
for _, value := range old {
oldUIDToRef[string(value.UID)] = value
}
oldUIDSet := sets.StringKeySet(oldUIDToRef)
for _, value := range new {
newUID := string(value.UID)
if oldUIDSet.Has(newUID) {
if !reflect.DeepEqual(oldUIDToRef[newUID], value) {
changed = append(changed, ownerRefPair{oldRef: oldUIDToRef[newUID], newRef: value})
}
oldUIDSet.Delete(newUID)
} else {
added = append(added, value)
}
}
for oldUID := range oldUIDSet {
removed = append(removed, oldUIDToRef[oldUID])
}
return added, removed, changed
}
func deletionStartsWithFinalizer(oldObj interface{}, newAccessor metav1.Object, matchingFinalizer string) bool {
// if the new object isn't being deleted, or doesn't have the finalizer we're interested in, return false
if !beingDeleted(newAccessor) || !hasFinalizer(newAccessor, matchingFinalizer) {
return false
}
// if the old object is nil, or wasn't being deleted, or didn't have the finalizer, return true
if oldObj == nil {
return true
}
oldAccessor, err := meta.Accessor(oldObj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
return false
}
return !beingDeleted(oldAccessor) || !hasFinalizer(oldAccessor, matchingFinalizer)
}
func beingDeleted(accessor metav1.Object) bool {
return accessor.GetDeletionTimestamp() != nil
}
func hasDeleteDependentsFinalizer(accessor metav1.Object) bool {
return hasFinalizer(accessor, metav1.FinalizerDeleteDependents)
}
func hasOrphanFinalizer(accessor metav1.Object) bool {
return hasFinalizer(accessor, metav1.FinalizerOrphanDependents)
}
func hasFinalizer(accessor metav1.Object, matchingFinalizer string) bool {
finalizers := accessor.GetFinalizers()
for _, finalizer := range finalizers {
if finalizer == matchingFinalizer {
return true
}
}
return false
}
// this function takes newAccessor directly because the caller already
// instantiates an accessor for the newObj.
func startsWaitingForDependentsDeleted(oldObj interface{}, newAccessor metav1.Object) bool {
return deletionStartsWithFinalizer(oldObj, newAccessor, metav1.FinalizerDeleteDependents)
}
// this function takes newAccessor directly because the caller already
// instantiates an accessor for the newObj.
func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.Object) bool {
return deletionStartsWithFinalizer(oldObj, newAccessor, metav1.FinalizerOrphanDependents)
}
// if an blocking ownerReference points to an object gets removed, or gets set to
// "BlockOwnerDeletion=false", add the object to the attemptToDelete queue.
func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerReference, changed []ownerRefPair) {
for _, ref := range removed {
if ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion {
node, found := gb.uidToNode.Read(ref.UID)
if !found {
klog.V(5).Infof("cannot find %s in uidToNode", ref.UID)
continue
}
gb.attemptToDelete.Add(node)
}
}
for _, c := range changed {
wasBlocked := c.oldRef.BlockOwnerDeletion != nil && *c.oldRef.BlockOwnerDeletion
isUnblocked := c.newRef.BlockOwnerDeletion == nil || (c.newRef.BlockOwnerDeletion != nil && !*c.newRef.BlockOwnerDeletion)
if wasBlocked && isUnblocked {
node, found := gb.uidToNode.Read(c.newRef.UID)
if !found {
klog.V(5).Infof("cannot find %s in uidToNode", c.newRef.UID)
continue
}
gb.attemptToDelete.Add(node)
}
}
}
func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav1.Object, n *node) {
if startsWaitingForDependentsOrphaned(oldObj, newAccessor) {
klog.V(5).Infof("add %s to the attemptToOrphan", n.identity)
gb.attemptToOrphan.Add(n)
return
}
if startsWaitingForDependentsDeleted(oldObj, newAccessor) {
klog.V(2).Infof("add %s to the attemptToDelete, because it's waiting for its dependents to be deleted", n.identity)
// if the n is added as a "virtual" node, its deletingDependents field is not properly set, so always set it here.
n.markDeletingDependents()
for dep := range n.dependents {
gb.attemptToDelete.Add(dep)
}
gb.attemptToDelete.Add(n)
}
}
func (gb *GraphBuilder) runProcessGraphChanges() {
for gb.processGraphChanges() {
}
}
func identityFromEvent(event *event, accessor metav1.Object) objectReference {
return objectReference{
OwnerReference: metav1.OwnerReference{
APIVersion: event.gvk.GroupVersion().String(),
Kind: event.gvk.Kind,
UID: accessor.GetUID(),
Name: accessor.GetName(),
},
Namespace: accessor.GetNamespace(),
}
}
// Dequeueing an event from graphChanges, updating graph, populating dirty_queue.
func (gb *GraphBuilder) processGraphChanges() bool {
item, quit := gb.graphChanges.Get()
if quit {
return false
}
defer gb.graphChanges.Done(item)
event, ok := item.(*event)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
return true
}
obj := event.obj
accessor, err := meta.Accessor(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
return true
}
klog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, uid %s, event type %v, virtual=%v", event.gvk.GroupVersion().String(), event.gvk.Kind, accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType, event.virtual)
// Check if the node already exists
existingNode, found := gb.uidToNode.Read(accessor.GetUID())
if found && !event.virtual && !existingNode.isObserved() {
// this marks the node as having been observed via an informer event
// 1. this depends on graphChanges only containing add/update events from the actual informer
// 2. this allows things tracking virtual nodes' existence to stop polling and rely on informer events
observedIdentity := identityFromEvent(event, accessor)
if observedIdentity != existingNode.identity {
// find dependents that don't match the identity we observed
_, potentiallyInvalidDependents := partitionDependents(existingNode.getDependents(), observedIdentity)
// add those potentially invalid dependents to the attemptToDelete queue.
// if their owners are still solid the attemptToDelete will be a no-op.
// this covers the bad child -> good parent observation sequence.
// the good parent -> bad child observation sequence is handled in addDependentToOwners
for _, dep := range potentiallyInvalidDependents {
if len(observedIdentity.Namespace) > 0 && dep.identity.Namespace != observedIdentity.Namespace {
// Namespace mismatch, this is definitely wrong
klog.V(2).Infof("node %s references an owner %s but does not match namespaces", dep.identity, observedIdentity)
gb.reportInvalidNamespaceOwnerRef(dep, observedIdentity.UID)
}
gb.attemptToDelete.Add(dep)
}
// make a copy (so we don't modify the existing node in place), store the observed identity, and replace the virtual node
klog.V(2).Infof("replacing virtual node %s with observed node %s", existingNode.identity, observedIdentity)
existingNode = existingNode.clone()
existingNode.identity = observedIdentity
gb.uidToNode.Write(existingNode)
}
existingNode.markObserved()
}
switch {
case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
newNode := &node{
identity: identityFromEvent(event, accessor),
dependents: make(map[*node]struct{}),
owners: accessor.GetOwnerReferences(),
deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
beingDeleted: beingDeleted(accessor),
}
gb.insertNode(newNode)
// the underlying delta_fifo may combine a creation and a deletion into
// one event, so we need to further process the event.
gb.processTransitions(event.oldObj, accessor, newNode)
case (event.eventType == addEvent || event.eventType == updateEvent) && found:
// handle changes in ownerReferences
added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
// check if the changed dependency graph unblock owners that are
// waiting for the deletion of their dependents.
gb.addUnblockedOwnersToDeleteQueue(removed, changed)
// update the node itself
existingNode.owners = accessor.GetOwnerReferences()
// Add the node to its new owners' dependent lists.
gb.addDependentToOwners(existingNode, added)
// remove the node from the dependent list of node that are no longer in
// the node's owners list.
gb.removeDependentFromOwners(existingNode, removed)
}
if beingDeleted(accessor) {
existingNode.markBeingDeleted()
}
gb.processTransitions(event.oldObj, accessor, existingNode)
case event.eventType == deleteEvent:
if !found {
klog.V(5).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
return true
}
removeExistingNode := true
if event.virtual {
// this is a virtual delete event, not one observed from an informer
deletedIdentity := identityFromEvent(event, accessor)
if existingNode.virtual {
// our existing node is also virtual, we're not sure of its coordinates.
// see if any dependents reference this owner with coordinates other than the one we got a virtual delete event for.
if matchingDependents, nonmatchingDependents := partitionDependents(existingNode.getDependents(), deletedIdentity); len(nonmatchingDependents) > 0 {
// some of our dependents disagree on our coordinates, so do not remove the existing virtual node from the graph
removeExistingNode = false
if len(matchingDependents) > 0 {
// mark the observed deleted identity as absent
gb.absentOwnerCache.Add(deletedIdentity)
// attempt to delete dependents that do match the verified deleted identity
for _, dep := range matchingDependents {
gb.attemptToDelete.Add(dep)
}
}
// if the delete event verified existingNode.identity doesn't exist...
if existingNode.identity == deletedIdentity {
// find an alternative identity our nonmatching dependents refer to us by
replacementIdentity := getAlternateOwnerIdentity(nonmatchingDependents, deletedIdentity)
if replacementIdentity != nil {
// replace the existing virtual node with a new one with one of our other potential identities
replacementNode := existingNode.clone()
replacementNode.identity = *replacementIdentity
gb.uidToNode.Write(replacementNode)
// and add the new virtual node back to the attemptToDelete queue
gb.attemptToDelete.AddRateLimited(replacementNode)
}
}
}
} else if existingNode.identity != deletedIdentity {
// do not remove the existing real node from the graph based on a virtual delete event
removeExistingNode = false
// our existing node which was observed via informer disagrees with the virtual delete event's coordinates
matchingDependents, _ := partitionDependents(existingNode.getDependents(), deletedIdentity)
if len(matchingDependents) > 0 {
// mark the observed deleted identity as absent
gb.absentOwnerCache.Add(deletedIdentity)
// attempt to delete dependents that do match the verified deleted identity
for _, dep := range matchingDependents {
gb.attemptToDelete.Add(dep)
}
}
}
}
if removeExistingNode {
// removeNode updates the graph
gb.removeNode(existingNode)
existingNode.dependentsLock.RLock()
defer existingNode.dependentsLock.RUnlock()
if len(existingNode.dependents) > 0 {
gb.absentOwnerCache.Add(identityFromEvent(event, accessor))
}
for dep := range existingNode.dependents {
gb.attemptToDelete.Add(dep)
}
for _, owner := range existingNode.owners {
ownerNode, found := gb.uidToNode.Read(owner.UID)
if !found || !ownerNode.isDeletingDependents() {
continue
}
// this is to let attempToDeleteItem check if all the owner's
// dependents are deleted, if so, the owner will be deleted.
gb.attemptToDelete.Add(ownerNode)
}
}
}
return true
}
// partitionDependents divides the provided dependents into a list which have an ownerReference matching the provided identity,
// and ones which have an ownerReference for the given uid that do not match the provided identity.
// Note that a dependent with multiple ownerReferences for the target uid can end up in both lists.
func partitionDependents(dependents []*node, matchOwnerIdentity objectReference) (matching, nonmatching []*node) {
ownerIsNamespaced := len(matchOwnerIdentity.Namespace) > 0
for i := range dependents {
dep := dependents[i]
foundMatch := false
foundMismatch := false
// if the dep namespace matches or the owner is cluster scoped ...
if ownerIsNamespaced && matchOwnerIdentity.Namespace != dep.identity.Namespace {
// all references to the parent do not match, since the dependent namespace does not match the owner
foundMismatch = true
} else {
for _, ownerRef := range dep.owners {
// ... find the ownerRef with a matching uid ...
if ownerRef.UID == matchOwnerIdentity.UID {
// ... and check if it matches all coordinates
if ownerReferenceMatchesCoordinates(ownerRef, matchOwnerIdentity.OwnerReference) {
foundMatch = true
} else {
foundMismatch = true
}
}
}
}
if foundMatch {
matching = append(matching, dep)
}
if foundMismatch {
nonmatching = append(nonmatching, dep)
}
}
return matching, nonmatching
}
func referenceLessThan(a, b objectReference) bool {
// kind/apiVersion are more significant than namespace,
// so that we get coherent ordering between kinds
// regardless of whether they are cluster-scoped or namespaced
if a.Kind != b.Kind {
return a.Kind < b.Kind
}
if a.APIVersion != b.APIVersion {
return a.APIVersion < b.APIVersion
}
// namespace is more significant than name
if a.Namespace != b.Namespace {
return a.Namespace < b.Namespace
}
// name is more significant than uid
if a.Name != b.Name {
return a.Name < b.Name
}
// uid is included for completeness, but is expected to be identical
// when getting alternate identities for an owner since they are keyed by uid
if a.UID != b.UID {
return a.UID < b.UID
}
return false
}
// getAlternateOwnerIdentity searches deps for owner references which match
// verifiedAbsentIdentity.UID but differ in apiVersion/kind/name or namespace.
// The first that follows verifiedAbsentIdentity (according to referenceLessThan) is returned.
// If none follow verifiedAbsentIdentity, the first (according to referenceLessThan) is returned.
// If no alternate identities are found, nil is returned.
func getAlternateOwnerIdentity(deps []*node, verifiedAbsentIdentity objectReference) *objectReference {
absentIdentityIsClusterScoped := len(verifiedAbsentIdentity.Namespace) == 0
seenAlternates := map[objectReference]bool{verifiedAbsentIdentity: true}
// keep track of the first alternate reference (according to referenceLessThan)
var first *objectReference
// keep track of the first reference following verifiedAbsentIdentity (according to referenceLessThan)
var firstFollowing *objectReference
for _, dep := range deps {
for _, ownerRef := range dep.owners {
if ownerRef.UID != verifiedAbsentIdentity.UID {
// skip references that aren't the uid we care about
continue
}
if ownerReferenceMatchesCoordinates(ownerRef, verifiedAbsentIdentity.OwnerReference) {
if absentIdentityIsClusterScoped || verifiedAbsentIdentity.Namespace == dep.identity.Namespace {
// skip references that exactly match verifiedAbsentIdentity
continue
}
}
ref := objectReference{OwnerReference: ownerReferenceCoordinates(ownerRef), Namespace: dep.identity.Namespace}
if absentIdentityIsClusterScoped && ref.APIVersion == verifiedAbsentIdentity.APIVersion && ref.Kind == verifiedAbsentIdentity.Kind {
// we know this apiVersion/kind is cluster-scoped because of verifiedAbsentIdentity,
// so clear the namespace from the alternate identity
ref.Namespace = ""
}
if seenAlternates[ref] {
// skip references we've already seen
continue
}
seenAlternates[ref] = true
if first == nil || referenceLessThan(ref, *first) {
// this alternate comes first lexically
first = &ref
}
if referenceLessThan(verifiedAbsentIdentity, ref) && (firstFollowing == nil || referenceLessThan(ref, *firstFollowing)) {
// this alternate is the first following verifiedAbsentIdentity lexically
firstFollowing = &ref
}
}
}
// return the first alternate identity following the verified absent identity, if there is one
if firstFollowing != nil {
return firstFollowing
}
// otherwise return the first alternate identity
return first
}