apply_controller.go
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/
package workv1alpha1

import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"time"

	"go.uber.org/atomic"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
	"k8s.io/utils/ptr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"

	"go.goms.io/fleet/pkg/metrics"
	"go.goms.io/fleet/pkg/utils"
)

const (
	workFieldManagerName = "work-api-agent"
)

// ApplyWorkReconciler reconciles a Work object.
type ApplyWorkReconciler struct {
	client             client.Client
	spokeDynamicClient dynamic.Interface
	spokeClient        client.Client
	restMapper         meta.RESTMapper
	recorder           record.EventRecorder
	concurrency        int
	workNameSpace      string
	joined             *atomic.Bool
}
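
// NewApplyWorkReconciler returns an ApplyWorkReconciler. A minimal wiring sketch follows; it is
// illustrative only: the manager construction, the spoke clients, and the literal values are
// assumptions, not part of the original file.
//
//	mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
//	r := NewApplyWorkReconciler(mgr.GetClient(), spokeDynamicClient, spokeClient,
//		mgr.GetRESTMapper(), mgr.GetEventRecorderFor(workFieldManagerName), 5, "fleet-member-a")
//	_ = r.SetupWithManager(mgr)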
func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic.Interface, spokeClient client.Client,
	restMapper meta.RESTMapper, recorder record.EventRecorder, concurrency int, workNameSpace string) *ApplyWorkReconciler {
	return &ApplyWorkReconciler{
		client:             hubClient,
		spokeDynamicClient: spokeDynamicClient,
		spokeClient:        spokeClient,
		restMapper:         restMapper,
		recorder:           recorder,
		concurrency:        concurrency,
		workNameSpace:      workNameSpace,
		joined:             atomic.NewBool(false),
	}
}

// applyAction represents the action we take to apply the manifest.
// +enum
type applyAction string

const (
	// ManifestCreatedAction indicates that we created the manifest for the first time.
	ManifestCreatedAction applyAction = "ManifestCreated"

	// ManifestThreeWayMergePatchAction indicates that we updated the manifest using a three-way merge patch.
	ManifestThreeWayMergePatchAction applyAction = "ManifestThreeWayMergePatched"

	// ManifestServerSideAppliedAction indicates that we updated the manifest using server-side apply.
	ManifestServerSideAppliedAction applyAction = "ManifestServerSideApplied"

	// ManifestNoChangeAction indicates that we don't need to change the manifest.
	ManifestNoChangeAction applyAction = "ManifestNoChange"
)

// applyResult contains the result of a manifest being applied.
type applyResult struct {
	identifier workv1alpha1.ResourceIdentifier
	generation int64
	action     applyAction
	err        error
}

// Reconcile implements the control loop logic for the Work object.
func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	if !r.joined.Load() {
		klog.V(2).InfoS("work controller is not started yet, requeue the request", "work", req.NamespacedName)
		return ctrl.Result{RequeueAfter: time.Second * 5}, nil
	}
	klog.InfoS("work apply controller reconcile loop triggered.", "work", req.NamespacedName)

	// Fetch the work resource.
	work := &workv1alpha1.Work{}
	err := r.client.Get(ctx, req.NamespacedName, work)
	switch {
	case apierrors.IsNotFound(err):
		klog.V(2).InfoS("the work resource is deleted", "work", req.NamespacedName)
		return ctrl.Result{}, nil
	case err != nil:
		klog.ErrorS(err, "failed to retrieve the work", "work", req.NamespacedName)
		return ctrl.Result{}, err
	}
	logObjRef := klog.KObj(work)

	// Handle a deleting work: garbage collect the applied resources.
	if !work.DeletionTimestamp.IsZero() {
		klog.V(2).InfoS("resource is in the process of being deleted", work.Kind, logObjRef)
		return r.garbageCollectAppliedWork(ctx, work)
	}

	// Ensure that the appliedWork and the finalizer exist.
	appliedWork, err := r.ensureAppliedWork(ctx, work)
	if err != nil {
		return ctrl.Result{}, err
	}
	owner := metav1.OwnerReference{
		APIVersion:         workv1alpha1.GroupVersion.String(),
		Kind:               workv1alpha1.AppliedWorkKind,
		Name:               appliedWork.GetName(),
		UID:                appliedWork.GetUID(),
		BlockOwnerDeletion: ptr.To(false),
	}

	// Apply the manifests to the member cluster.
	results := r.applyManifests(ctx, work.Spec.Workload.Manifests, owner)

	// Collect the latency from the work update time to now.
	lastUpdateTime, ok := work.GetAnnotations()[utils.LastWorkUpdateTimeAnnotationKey]
	if ok {
		workUpdateTime, parseErr := time.Parse(time.RFC3339, lastUpdateTime)
		if parseErr != nil {
			klog.ErrorS(parseErr, "failed to parse the last work update time", "work", logObjRef)
		} else {
			latency := time.Since(workUpdateTime)
			metrics.WorkApplyTime.WithLabelValues(work.GetName()).Observe(latency.Seconds())
			klog.V(2).InfoS("work is applied", "work", work.GetName(), "latency", latency.Milliseconds())
		}
	} else {
		klog.V(2).InfoS("work has no last update time", "work", work.GetName())
	}

	// Generate the work condition based on the manifest apply results.
	errs := r.generateWorkCondition(results, work)

	// Update the work status.
	if err = r.client.Status().Update(ctx, work, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "failed to update work status", "work", logObjRef)
		return ctrl.Result{}, err
	}
	if len(errs) == 0 {
		klog.InfoS("successfully applied the work to the cluster", "work", logObjRef)
		r.recorder.Event(work, v1.EventTypeNormal, "ApplyWorkSucceed", "apply the work successfully")
	}

	// Sync the status from work to appliedWork regardless of whether the apply succeeded.
	newRes, staleRes, genErr := r.generateDiff(ctx, work, appliedWork)
	if genErr != nil {
		klog.ErrorS(genErr, "failed to generate the diff between work status and appliedWork status", work.Kind, logObjRef)
		return ctrl.Result{}, genErr
	}

	// Delete all the manifests that should no longer be in the cluster.
	if err = r.deleteStaleManifest(ctx, staleRes, owner); err != nil {
		klog.ErrorS(err, "resource garbage-collection incomplete; some Work owned resources could not be deleted", work.Kind, logObjRef)
		// We can't proceed to update the appliedWork status if the stale manifests are not deleted.
		return ctrl.Result{}, err
	} else if len(staleRes) > 0 {
		klog.V(2).InfoS("successfully garbage-collected all stale manifests", work.Kind, logObjRef, "number of GCed res", len(staleRes))
		for _, res := range staleRes {
			klog.V(2).InfoS("successfully garbage-collected a stale manifest", work.Kind, logObjRef, "res", res)
		}
	}

	// Update the appliedWork with the new resources after the stale ones are deleted.
	appliedWork.Status.AppliedResources = newRes
	if err = r.spokeClient.Status().Update(ctx, appliedWork, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "failed to update appliedWork status", appliedWork.Kind, appliedWork.GetName())
		return ctrl.Result{}, err
	}
	err = utilerrors.NewAggregate(errs)
	if err != nil {
		klog.ErrorS(err, "manifest apply incomplete; the message is queued again for reconciliation",
			"work", logObjRef)
	}

	// We periodically reconcile the work even when reconciliation succeeds, to make sure the member
	// cluster state stays in sync with the work in case resources on the member cluster are removed or changed.
	return ctrl.Result{RequeueAfter: time.Minute * 5}, err
}

// garbageCollectAppliedWork deletes the appliedWork and all the manifests associated with it from the cluster.
func (r *ApplyWorkReconciler) garbageCollectAppliedWork(ctx context.Context, work *workv1alpha1.Work) (ctrl.Result, error) {
	deletePolicy := metav1.DeletePropagationBackground
	if !controllerutil.ContainsFinalizer(work, workFinalizer) {
		return ctrl.Result{}, nil
	}
	// Delete the appliedWork, which will remove all the manifests associated with it.
	// TODO: allow orphaned manifests
	appliedWork := workv1alpha1.AppliedWork{
		ObjectMeta: metav1.ObjectMeta{Name: work.Name},
	}
	err := r.spokeClient.Delete(ctx, &appliedWork, &client.DeleteOptions{PropagationPolicy: &deletePolicy})
	switch {
	case apierrors.IsNotFound(err):
		klog.V(2).InfoS("the appliedWork is already deleted", "appliedWork", work.Name)
	case err != nil:
		klog.ErrorS(err, "failed to delete the appliedWork", "appliedWork", work.Name)
		return ctrl.Result{}, err
	default:
		klog.InfoS("successfully deleted the appliedWork", "appliedWork", work.Name)
	}
	controllerutil.RemoveFinalizer(work, workFinalizer)
	return ctrl.Result{}, r.client.Update(ctx, work, &client.UpdateOptions{})
}
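
// Note: DeletePropagationBackground above lets the delete call return as soon as the appliedWork is
// marked for deletion, while the Kubernetes garbage collector removes the manifests it owns; the work
// finalizer is removed only after the delete is accepted, so a failed delete keeps the work (and a
// retry) alive.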

// ensureAppliedWork makes sure that an associated appliedWork and a finalizer on the work resource exist on the cluster.
func (r *ApplyWorkReconciler) ensureAppliedWork(ctx context.Context, work *workv1alpha1.Work) (*workv1alpha1.AppliedWork, error) {
	workRef := klog.KObj(work)
	appliedWork := &workv1alpha1.AppliedWork{}
	hasFinalizer := false
	if controllerutil.ContainsFinalizer(work, workFinalizer) {
		hasFinalizer = true
		err := r.spokeClient.Get(ctx, types.NamespacedName{Name: work.Name}, appliedWork)
		switch {
		case apierrors.IsNotFound(err):
			klog.ErrorS(err, "appliedWork resource does not exist even though the finalizer is set, it will be recreated", "appliedWork", workRef.Name)
		case err != nil:
			klog.ErrorS(err, "failed to retrieve the appliedWork", "appliedWork", workRef.Name)
			return nil, err
		default:
			return appliedWork, nil
		}
	}
	// We create the appliedWork before setting the finalizer, so it should always exist unless it's deleted behind our back.
	appliedWork = &workv1alpha1.AppliedWork{
		ObjectMeta: metav1.ObjectMeta{
			Name: work.Name,
		},
		Spec: workv1alpha1.AppliedWorkSpec{
			WorkName:      work.Name,
			WorkNamespace: work.Namespace,
		},
	}
	if err := r.spokeClient.Create(ctx, appliedWork); err != nil && !apierrors.IsAlreadyExists(err) {
		klog.ErrorS(err, "appliedWork create failed", "appliedWork", workRef.Name)
		return nil, err
	}
	if !hasFinalizer {
		klog.InfoS("add the finalizer to the work", "work", workRef)
		work.Finalizers = append(work.Finalizers, workFinalizer)
		return appliedWork, r.client.Update(ctx, work, &client.UpdateOptions{})
	}
	klog.InfoS("recreated the appliedWork resource", "appliedWork", workRef.Name)
	return appliedWork, nil
}

// applyManifests processes a given set of manifests by: setting ownership, validating each manifest, and passing it on for application to the cluster.
func (r *ApplyWorkReconciler) applyManifests(ctx context.Context, manifests []workv1alpha1.Manifest, owner metav1.OwnerReference) []applyResult {
	var appliedObj *unstructured.Unstructured

	results := make([]applyResult, len(manifests))
	for index, manifest := range manifests {
		var result applyResult
		gvr, rawObj, err := r.decodeManifest(manifest)
		switch {
		case err != nil:
			result.err = err
			result.identifier = workv1alpha1.ResourceIdentifier{
				Ordinal: index,
			}
			if rawObj != nil {
				result.identifier.Group = rawObj.GroupVersionKind().Group
				result.identifier.Version = rawObj.GroupVersionKind().Version
				result.identifier.Kind = rawObj.GroupVersionKind().Kind
				result.identifier.Namespace = rawObj.GetNamespace()
				result.identifier.Name = rawObj.GetName()
			}
		default:
			addOwnerRef(owner, rawObj)
			appliedObj, result.action, result.err = r.applyUnstructured(ctx, gvr, rawObj)
			result.identifier = buildResourceIdentifier(index, rawObj, gvr)
			logObjRef := klog.ObjectRef{
				Name:      result.identifier.Name,
				Namespace: result.identifier.Namespace,
			}
			if result.err == nil {
				result.generation = appliedObj.GetGeneration()
				klog.V(2).InfoS("apply manifest succeeded", "gvr", gvr, "manifest", logObjRef,
					"apply action", result.action, "new ObservedGeneration", result.generation)
			} else {
				klog.ErrorS(result.err, "manifest upsert failed", "gvr", gvr, "manifest", logObjRef)
			}
		}
		results[index] = result
	}
	return results
}

// decodeManifest decodes the manifest into usable structs.
func (r *ApplyWorkReconciler) decodeManifest(manifest workv1alpha1.Manifest) (schema.GroupVersionResource, *unstructured.Unstructured, error) {
	unstructuredObj := &unstructured.Unstructured{}
	err := unstructuredObj.UnmarshalJSON(manifest.Raw)
	if err != nil {
		return schema.GroupVersionResource{}, nil, fmt.Errorf("failed to decode object: %w", err)
	}
	mapping, err := r.restMapper.RESTMapping(unstructuredObj.GroupVersionKind().GroupKind(), unstructuredObj.GroupVersionKind().Version)
	if err != nil {
		return schema.GroupVersionResource{}, unstructuredObj, fmt.Errorf("failed to find group/version/resource from restmapping: %w", err)
	}
	return mapping.Resource, unstructuredObj, nil
}
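
// For illustration (assumed input, not from the original file): a manifest whose Raw bytes decode to
//
//	{"apiVersion": "apps/v1", "kind": "Deployment", "metadata": {"name": "web", "namespace": "default"}}
//
// maps through the RESTMapper to the GVR {Group: "apps", Version: "v1", Resource: "deployments"},
// which is what the dynamic client needs for the create/patch/apply calls below.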

// applyUnstructured determines if an unstructured manifest object can and should be applied. It first
// checks the size of the last modified annotation on the manifest; if the size crosses the annotation
// size threshold, it removes the annotation and creates/updates the resource on the cluster using
// server-side apply instead of a three-way merge patch.
func (r *ApplyWorkReconciler) applyUnstructured(ctx context.Context, gvr schema.GroupVersionResource,
	manifestObj *unstructured.Unstructured) (*unstructured.Unstructured, applyAction, error) {
	manifestRef := klog.ObjectRef{
		Name:      manifestObj.GetName(),
		Namespace: manifestObj.GetNamespace(),
	}
	// Compute the hash without taking the last applied annotation into account.
	if err := setManifestHashAnnotation(manifestObj); err != nil {
		return nil, ManifestNoChangeAction, err
	}

	// Extract the common create procedure to reuse.
	var createFunc = func() (*unstructured.Unstructured, applyAction, error) {
		// Record the raw manifest with the hash annotation in the manifest.
		if _, err := setModifiedConfigurationAnnotation(manifestObj); err != nil {
			return nil, ManifestNoChangeAction, err
		}
		actual, err := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()).Create(
			ctx, manifestObj, metav1.CreateOptions{FieldManager: workFieldManagerName})
		if err == nil {
			klog.V(2).InfoS("successfully created the manifest", "gvr", gvr, "manifest", manifestRef)
			return actual, ManifestCreatedAction, nil
		}
		return nil, ManifestNoChangeAction, err
	}

	// Support resources with a generated name.
	if manifestObj.GetName() == "" && manifestObj.GetGenerateName() != "" {
		klog.InfoS("create the resource with generated name regardless", "gvr", gvr, "manifest", manifestRef)
		return createFunc()
	}

	// Get the current object and create one if not found.
	curObj, err := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()).Get(ctx, manifestObj.GetName(), metav1.GetOptions{})
	switch {
	case apierrors.IsNotFound(err):
		return createFunc()
	case err != nil:
		return nil, ManifestNoChangeAction, err
	}

	// Check if the existing manifest is managed by the work.
	if !isManifestManagedByWork(curObj.GetOwnerReferences()) {
		err = fmt.Errorf("resource is not managed by the work controller")
		klog.ErrorS(err, "skip applying a not managed manifest", "gvr", gvr, "obj", manifestRef)
		return nil, ManifestNoChangeAction, err
	}

	// We only try to update the object if its spec hash value has changed.
	if manifestObj.GetAnnotations()[manifestHashAnnotation] != curObj.GetAnnotations()[manifestHashAnnotation] {
		// We need to merge the owner references of the current object and the manifest since we support
		// one manifest belonging to multiple works, so the result contains the union of all the appliedWorks.
		manifestObj.SetOwnerReferences(mergeOwnerReference(curObj.GetOwnerReferences(), manifestObj.GetOwnerReferences()))
		// Record the raw manifest with the hash annotation in the manifest.
		isModifiedConfigAnnotationNotEmpty, err := setModifiedConfigurationAnnotation(manifestObj)
		if err != nil {
			return nil, ManifestNoChangeAction, err
		}
		if !isModifiedConfigAnnotationNotEmpty {
			klog.V(2).InfoS("using server side apply for manifest", "gvr", gvr, "manifest", manifestRef)
			return r.applyObject(ctx, gvr, manifestObj)
		}
		klog.V(2).InfoS("using three way merge for manifest", "gvr", gvr, "manifest", manifestRef)
		return r.patchCurrentResource(ctx, gvr, manifestObj, curObj)
	}

	return curObj, ManifestNoChangeAction, nil
}
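
// The decision flow above, for reference:
//
//	name empty, generateName set   -> create (ManifestCreatedAction)
//	object not found               -> create (ManifestCreatedAction)
//	not owned by an appliedWork    -> error, skip (ManifestNoChangeAction)
//	hash annotation unchanged      -> no-op (ManifestNoChangeAction)
//	modified-config annotation empty (e.g. too large to record) -> server-side apply
//	otherwise                      -> three-way merge patch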

// applyObject uses server-side apply to apply the manifest.
func (r *ApplyWorkReconciler) applyObject(ctx context.Context, gvr schema.GroupVersionResource,
	manifestObj *unstructured.Unstructured) (*unstructured.Unstructured, applyAction, error) {
	manifestRef := klog.ObjectRef{
		Name:      manifestObj.GetName(),
		Namespace: manifestObj.GetNamespace(),
	}
	options := metav1.ApplyOptions{
		FieldManager: workFieldManagerName,
		Force:        true,
	}
	manifestObj, err := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()).Apply(ctx, manifestObj.GetName(), manifestObj, options)
	if err != nil {
		klog.ErrorS(err, "failed to apply object", "gvr", gvr, "manifest", manifestRef)
		return nil, ManifestNoChangeAction, err
	}
	klog.V(2).InfoS("manifest apply succeeded", "gvr", gvr, "manifest", manifestRef)
	return manifestObj, ManifestServerSideAppliedAction, nil
}
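
// Force: true resolves server-side apply field conflicts in favor of this agent, i.e. fields owned by
// other field managers are taken over rather than failing the apply, which is consistent with treating
// the manifest from the hub as the source of truth.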

// patchCurrentResource uses a three-way merge to patch the current resource with the new manifest we get from the work.
func (r *ApplyWorkReconciler) patchCurrentResource(ctx context.Context, gvr schema.GroupVersionResource,
	manifestObj, curObj *unstructured.Unstructured) (*unstructured.Unstructured, applyAction, error) {
	manifestRef := klog.ObjectRef{
		Name:      manifestObj.GetName(),
		Namespace: manifestObj.GetNamespace(),
	}
	klog.V(2).InfoS("manifest is modified", "gvr", gvr, "manifest", manifestRef,
		"new hash", manifestObj.GetAnnotations()[manifestHashAnnotation],
		"existing hash", curObj.GetAnnotations()[manifestHashAnnotation])
	// Create the three-way merge patch between the current, original and manifest, similar to how kubectl apply does it.
	patch, err := threeWayMergePatch(curObj, manifestObj)
	if err != nil {
		klog.ErrorS(err, "failed to generate the three way patch", "gvr", gvr, "manifest", manifestRef)
		return nil, ManifestNoChangeAction, err
	}
	data, err := patch.Data(manifestObj)
	if err != nil {
		klog.ErrorS(err, "failed to generate the three way patch data", "gvr", gvr, "manifest", manifestRef)
		return nil, ManifestNoChangeAction, err
	}
	// Use client-side apply to patch the resource on the member cluster.
	manifestObj, patchErr := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()).
		Patch(ctx, manifestObj.GetName(), patch.Type(), data, metav1.PatchOptions{FieldManager: workFieldManagerName})
	if patchErr != nil {
		klog.ErrorS(patchErr, "failed to patch the manifest", "gvr", gvr, "manifest", manifestRef)
		return nil, ManifestNoChangeAction, patchErr
	}
	klog.V(2).InfoS("manifest patch succeeded", "gvr", gvr, "manifest", manifestRef)
	return manifestObj, ManifestThreeWayMergePatchAction, nil
}
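
// A note on the three inputs (threeWayMergePatch is defined elsewhere in this package; this summary
// assumes it mirrors kubectl apply): "original" is the configuration recorded on the live object at the
// last apply, "modified" is the incoming manifest, and "current" is the live object. Fields that were
// present in the original but dropped from the manifest are therefore deleted on the cluster instead of
// being silently left behind.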

// generateWorkCondition constructs the work condition based on the apply results.
func (r *ApplyWorkReconciler) generateWorkCondition(results []applyResult, work *workv1alpha1.Work) []error {
	var errs []error
	// Update the manifest conditions based on the results.
	manifestConditions := make([]workv1alpha1.ManifestCondition, len(results))
	for index, result := range results {
		if result.err != nil {
			errs = append(errs, result.err)
		}
		appliedCondition := buildManifestAppliedCondition(result.err, result.action, result.generation)
		manifestCondition := workv1alpha1.ManifestCondition{
			Identifier: result.identifier,
			Conditions: []metav1.Condition{appliedCondition},
		}
		foundManifestCondition := findManifestConditionByIdentifier(result.identifier, work.Status.ManifestConditions)
		if foundManifestCondition != nil {
			manifestCondition.Conditions = foundManifestCondition.Conditions
			meta.SetStatusCondition(&manifestCondition.Conditions, appliedCondition)
		}
		manifestConditions[index] = manifestCondition
	}

	work.Status.ManifestConditions = manifestConditions
	workCond := generateWorkAppliedCondition(manifestConditions, work.Generation)
	work.Status.Conditions = []metav1.Condition{workCond}
	return errs
}

// Join marks the reconciler as joined so that it starts to reconcile.
func (r *ApplyWorkReconciler) Join(_ context.Context) error {
	if !r.joined.Load() {
		klog.InfoS("mark the apply work reconciler joined")
	}
	r.joined.Store(true)
	return nil
}

// Leave marks the reconciler as left and removes the work finalizers so that the works in the member
// cluster namespace are no longer managed by this agent.
func (r *ApplyWorkReconciler) Leave(ctx context.Context) error {
	var works workv1alpha1.WorkList
	if r.joined.Load() {
		klog.InfoS("mark the apply work reconciler left")
	}
	r.joined.Store(false)
	// List all the work objects we created in the member cluster namespace.
	listOpts := []client.ListOption{
		client.InNamespace(r.workNameSpace),
	}
	if err := r.client.List(ctx, &works, listOpts...); err != nil {
		klog.ErrorS(err, "failed to list all the work objects", "clusterNS", r.workNameSpace)
		return client.IgnoreNotFound(err)
	}
	// We leave the resources on the member cluster for now.
	for _, work := range works.Items {
		staleWork := work.DeepCopy()
		if controllerutil.ContainsFinalizer(staleWork, workFinalizer) {
			controllerutil.RemoveFinalizer(staleWork, workFinalizer)
			if updateErr := r.client.Update(ctx, staleWork, &client.UpdateOptions{}); updateErr != nil {
				klog.ErrorS(updateErr, "failed to remove the work finalizer from the work",
					"clusterNS", r.workNameSpace, "work", klog.KObj(staleWork))
				return updateErr
			}
		}
	}
	klog.V(2).InfoS("successfully removed all the work finalizers in the cluster namespace",
		"clusterNS", r.workNameSpace, "number of works", len(works.Items))
	return nil
}
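
// Note that Leave only strips the work finalizers and stops reconciliation; the applied resources and
// the appliedWork objects are intentionally left on the member cluster (see the comment above), so
// leaving does not disrupt running workloads.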

// SetupWithManager wires up the controller.
func (r *ApplyWorkReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		WithOptions(controller.Options{
			MaxConcurrentReconciles: r.concurrency,
		}).
		For(&workv1alpha1.Work{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Complete(r)
}
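
// Because of GenerationChangedPredicate, events that do not bump metadata.generation (notably the
// status updates this controller writes back) do not trigger another reconcile; drift on the member
// cluster is instead caught by the periodic 5-minute RequeueAfter in Reconcile.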

// computeManifestHash returns a hash of the unstructured object, computed after we remove all the
// fields we have modified and the fields that only exist on the live object.
func computeManifestHash(obj *unstructured.Unstructured) (string, error) {
	manifest := obj.DeepCopy()
	// Remove the last applied annotation to avoid unlimited recursion.
	annotation := manifest.GetAnnotations()
	if annotation != nil {
		delete(annotation, manifestHashAnnotation)
		delete(annotation, lastAppliedConfigAnnotation)
		if len(annotation) == 0 {
			manifest.SetAnnotations(nil)
		} else {
			manifest.SetAnnotations(annotation)
		}
	}
	// Strip the live-object related fields just in case.
	manifest.SetResourceVersion("")
	manifest.SetGeneration(0)
	manifest.SetUID("")
	manifest.SetSelfLink("")
	manifest.SetDeletionTimestamp(nil)
	manifest.SetManagedFields(nil)
	unstructured.RemoveNestedField(manifest.Object, "metadata", "creationTimestamp")
	unstructured.RemoveNestedField(manifest.Object, "status")
	// Compute the sha256 hash of the remaining data.
	jsonBytes, err := json.Marshal(manifest.Object)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", sha256.Sum256(jsonBytes)), nil
}
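
// A minimal sketch of what the hash covers (hypothetical object, for illustration only):
//
//	obj := &unstructured.Unstructured{Object: map[string]interface{}{
//		"apiVersion": "v1", "kind": "ConfigMap",
//		"metadata":   map[string]interface{}{"name": "cm", "resourceVersion": "42"},
//		"data":       map[string]interface{}{"k": "v"},
//	}}
//	hash, _ := computeManifestHash(obj)
//
// The resourceVersion is stripped before hashing, so two copies of the same manifest taken at
// different resource versions produce the same hash.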

// isManifestManagedByWork determines if an object is managed by the work controller.
func isManifestManagedByWork(ownerRefs []metav1.OwnerReference) bool {
	// An object is managed by the work if any of its owner references is of the appliedWork kind.
	for _, ownerRef := range ownerRefs {
		if ownerRef.APIVersion == workv1alpha1.GroupVersion.String() && ownerRef.Kind == workv1alpha1.AppliedWorkKind {
			return true
		}
	}
	return false
}

// findManifestConditionByIdentifier returns a ManifestCondition by identifier:
// 1. Find the manifest condition with the whole identifier.
// 2. If the identifier only has the ordinal, and a match cannot be found, return nil.
// 3. Try to find a match on the properties other than the ordinal within the identifier.
func findManifestConditionByIdentifier(identifier workv1alpha1.ResourceIdentifier, manifestConditions []workv1alpha1.ManifestCondition) *workv1alpha1.ManifestCondition {
	for _, manifestCondition := range manifestConditions {
		if identifier == manifestCondition.Identifier {
			return &manifestCondition
		}
	}
	if identifier == (workv1alpha1.ResourceIdentifier{Ordinal: identifier.Ordinal}) {
		return nil
	}
	identifierCopy := identifier.DeepCopy()
	for _, manifestCondition := range manifestConditions {
		identifierCopy.Ordinal = manifestCondition.Identifier.Ordinal
		if *identifierCopy == manifestCondition.Identifier {
			return &manifestCondition
		}
	}
	return nil
}
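
// For example (hypothetical identifiers): looking up {Ordinal: 2, Kind: "Deployment", Name: "web"}
// first tries an exact match; failing that, it matches a stored condition whose identifier differs only
// in Ordinal, so a manifest that moved positions in the work's manifest list keeps its condition history.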

// setManifestHashAnnotation computes the hash of the provided manifest and sets an annotation of the
// hash on the provided unstructured object.
func setManifestHashAnnotation(manifestObj *unstructured.Unstructured) error {
	manifestHash, err := computeManifestHash(manifestObj)
	if err != nil {
		return err
	}
	annotation := manifestObj.GetAnnotations()
	if annotation == nil {
		annotation = map[string]string{}
	}
	annotation[manifestHashAnnotation] = manifestHash
	manifestObj.SetAnnotations(annotation)
	return nil
}

// buildResourceIdentifier builds a resource identifier for a given unstructured.Unstructured object.
func buildResourceIdentifier(index int, object *unstructured.Unstructured, gvr schema.GroupVersionResource) workv1alpha1.ResourceIdentifier {
	return workv1alpha1.ResourceIdentifier{
		Ordinal:   index,
		Group:     object.GroupVersionKind().Group,
		Version:   object.GroupVersionKind().Version,
		Kind:      object.GroupVersionKind().Kind,
		Namespace: object.GetNamespace(),
		Name:      object.GetName(),
		Resource:  gvr.Resource,
	}
}

// buildManifestAppliedCondition returns the applied condition for a single manifest based on the apply result.
func buildManifestAppliedCondition(err error, action applyAction, observedGeneration int64) metav1.Condition {
	if err != nil {
		return metav1.Condition{
			Type:               ConditionTypeApplied,
			Status:             metav1.ConditionFalse,
			ObservedGeneration: observedGeneration,
			LastTransitionTime: metav1.Now(),
			Reason:             "appliedManifestFailed",
			Message:            fmt.Sprintf("Failed to apply manifest: %v", err),
		}
	}
	return metav1.Condition{
		Type:               ConditionTypeApplied,
		Status:             metav1.ConditionTrue,
		LastTransitionTime: metav1.Now(),
		ObservedGeneration: observedGeneration,
		Reason:             string(action),
		Message:            string(action),
	}
}

// generateWorkAppliedCondition generates the applied status condition for the work.
// If any of the manifests failed to apply on the spoke, the applied status condition of the work is false.
func generateWorkAppliedCondition(manifestConditions []workv1alpha1.ManifestCondition, observedGeneration int64) metav1.Condition {
	for _, manifestCond := range manifestConditions {
		if meta.IsStatusConditionFalse(manifestCond.Conditions, ConditionTypeApplied) {
			return metav1.Condition{
				Type:               ConditionTypeApplied,
				Status:             metav1.ConditionFalse,
				LastTransitionTime: metav1.Now(),
				Reason:             "appliedWorkFailed",
				Message:            "Failed to apply work",
				ObservedGeneration: observedGeneration,
			}
		}
	}
	return metav1.Condition{
		Type:               ConditionTypeApplied,
		Status:             metav1.ConditionTrue,
		LastTransitionTime: metav1.Now(),
		Reason:             "appliedWorkComplete",
		Message:            "Apply work complete",
		ObservedGeneration: observedGeneration,
	}
}
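
// Example rollup (illustrative): if manifests 0 and 2 report Applied=True but manifest 1 reports
// Applied=False, the work-level Applied condition is False with reason "appliedWorkFailed"; only when
// no manifest condition is False does the work report "appliedWorkComplete".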