/
namespaced.go
162 lines (125 loc) · 5.04 KB
/
namespaced.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// Copyright 2020-2021 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
capsulev1beta2 "github.com/clastix/capsule/api/v1beta2"
)
type Namespaced struct {
client client.Client
processor Processor
}
func (r *Namespaced) SetupWithManager(mgr ctrl.Manager) error {
r.client = mgr.GetClient()
r.processor = Processor{
client: mgr.GetClient(),
}
return ctrl.NewControllerManagedBy(mgr).
For(&capsulev1beta2.TenantResource{}).
Complete(r)
}
//nolint:dupl
func (r *Namespaced) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
log := ctrllog.FromContext(ctx)
log.Info("start processing")
// Retrieving the TenantResource
tntResource := &capsulev1beta2.TenantResource{}
if err := r.client.Get(ctx, request.NamespacedName, tntResource); err != nil {
if apierrors.IsNotFound(err) {
log.Info("Request object not found, could have been deleted after reconcile request")
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
patchHelper, err := patch.NewHelper(tntResource, r.client)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to init patch helper")
}
defer func() {
if e := patchHelper.Patch(ctx, tntResource); e != nil {
if err == nil {
err = errors.Wrap(e, "failed to patch TenantResource")
}
}
}()
// Handle deleted TenantResource
if !tntResource.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, tntResource)
}
// Handle non-deleted TenantResource
return r.reconcileNormal(ctx, tntResource)
}
func (r *Namespaced) reconcileNormal(ctx context.Context, tntResource *capsulev1beta2.TenantResource) (reconcile.Result, error) {
log := ctrllog.FromContext(ctx)
if *tntResource.Spec.PruningOnDelete {
controllerutil.AddFinalizer(tntResource, finalizer)
}
// Adding the default value for the status
if tntResource.Status.ProcessedItems == nil {
tntResource.Status.ProcessedItems = make([]capsulev1beta2.ObjectReferenceStatus, 0)
}
// Retrieving the parent of the Tenant Resource:
// can be owned, or being deployed in one of its Namespace.
tl := &capsulev1beta2.TenantList{}
if err := r.client.List(ctx, tl, client.MatchingFieldsSelector{Selector: fields.OneTermEqualSelector(".status.namespaces", tntResource.GetNamespace())}); err != nil {
log.Error(err, "unable to detect the Tenant for the given TenantResource")
return reconcile.Result{}, err
}
if len(tl.Items) == 0 {
log.Info("skipping sync, the current Namespace is not belonging to any Global")
return reconcile.Result{}, nil
}
err := new(multierror.Error)
// A TenantResource is made of several Resource sections, each one with specific options:
// the Status can be updated only in case of no errors across all of them to guarantee a valid and coherent status.
processedItems := sets.NewString()
tenantLabel, labelErr := capsulev1beta2.GetTypeLabel(&capsulev1beta2.Tenant{})
if labelErr != nil {
log.Error(labelErr, "expected label for selection")
return reconcile.Result{}, labelErr
}
for index, resource := range tntResource.Spec.Resources {
items, sectionErr := r.processor.HandleSection(ctx, tl.Items[0], false, tenantLabel, index, resource)
if sectionErr != nil {
// Upon a process error storing the last error occurred and continuing to iterate,
// avoid to block the whole processing.
err = multierror.Append(err, sectionErr)
} else {
processedItems.Insert(items...)
}
}
if err.ErrorOrNil() != nil {
log.Error(err, "unable to replicate the requested resources")
return reconcile.Result{}, err
}
if r.processor.HandlePruning(ctx, tntResource.Status.ProcessedItems.AsSet(), sets.Set[string](processedItems)) {
tntResource.Status.ProcessedItems = make([]capsulev1beta2.ObjectReferenceStatus, 0, len(processedItems))
for _, item := range processedItems.List() {
if or := (capsulev1beta2.ObjectReferenceStatus{}); or.ParseFromString(item) == nil {
tntResource.Status.ProcessedItems = append(tntResource.Status.ProcessedItems, or)
}
}
}
log.Info("processing completed")
return reconcile.Result{Requeue: true, RequeueAfter: tntResource.Spec.ResyncPeriod.Duration}, nil
}
func (r *Namespaced) reconcileDelete(ctx context.Context, tntResource *capsulev1beta2.TenantResource) (reconcile.Result, error) {
log := ctrllog.FromContext(ctx)
if *tntResource.Spec.PruningOnDelete {
r.processor.HandlePruning(ctx, tntResource.Status.ProcessedItems.AsSet(), nil)
}
controllerutil.RemoveFinalizer(tntResource, finalizer)
log.Info("processing completed")
return reconcile.Result{Requeue: true, RequeueAfter: tntResource.Spec.ResyncPeriod.Duration}, nil
}