/
generic_controller.go
295 lines (249 loc) · 10.3 KB
/
generic_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/
package controllers
import (
"context"
"fmt"
"time"
"github.com/benbjohnson/clock"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime/schema"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/Azure/azure-service-operator/v2/internal/config"
"github.com/Azure/azure-service-operator/v2/internal/genericarmclient"
. "github.com/Azure/azure-service-operator/v2/internal/logging"
"github.com/Azure/azure-service-operator/v2/internal/reconcilers"
"github.com/Azure/azure-service-operator/v2/internal/util/kubeclient"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime/conditions"
)
type (
ARMClientFactory func(genruntime.MetaObject) *genericarmclient.GenericClient
LoggerFactory func(genruntime.MetaObject) logr.Logger
)
// GenericReconciler reconciles resources
type GenericReconciler struct {
LoggerFactory LoggerFactory
ARMClientFactory ARMClientFactory
KubeClient *kubeclient.Client
ResourceResolver *genruntime.Resolver
Recorder record.EventRecorder
Name string
Config config.Values
GVK schema.GroupVersionKind
RequeueDelayOverride time.Duration
PositiveConditions *conditions.PositiveConditionBuilder
}
var _ reconcile.Reconciler = &GenericReconciler{} // GenericReconciler is a reconcile.Reconciler
type Options struct {
controller.Options
// options specific to our controller
RequeueDelay time.Duration
Config config.Values
LoggerFactory func(obj metav1.Object) logr.Logger
}
func (options *Options) setDefaults() {
// default logger to the controller-runtime logger
if options.Log == nil {
options.Log = ctrl.Log
}
}
func RegisterWebhooks(mgr ctrl.Manager, objs []client.Object) error {
var errs []error
for _, obj := range objs {
if err := registerWebhook(mgr, obj); err != nil {
errs = append(errs, err)
}
}
return kerrors.NewAggregate(errs)
}
func registerWebhook(mgr ctrl.Manager, obj client.Object) error {
_, err := conversion.EnforcePtr(obj)
if err != nil {
return errors.Wrap(err, "obj was expected to be ptr but was not")
}
return ctrl.NewWebhookManagedBy(mgr).For(obj).Complete()
}
func RegisterAll(mgr ctrl.Manager, clientFactory ARMClientFactory, objs []client.Object, options Options) error {
options.setDefaults()
reconciledResourceLookup, err := MakeResourceGVKLookup(mgr, objs)
if err != nil {
return err
}
var errs []error
for _, obj := range objs {
if err := register(mgr, reconciledResourceLookup, clientFactory, obj, options); err != nil {
errs = append(errs, err)
}
}
return kerrors.NewAggregate(errs)
}
func register(
mgr ctrl.Manager,
reconciledResourceLookup map[schema.GroupKind]schema.GroupVersionKind,
clientFactory ARMClientFactory,
obj client.Object,
options Options) error {
v, err := conversion.EnforcePtr(obj)
if err != nil {
return errors.Wrap(err, "obj was expected to be ptr but was not")
}
t := v.Type()
controllerName := fmt.Sprintf("%sController", t.Name())
// Use the provided GVK to construct a new runtime object of the desired concrete type.
gvk, err := apiutil.GVKForObject(obj, mgr.GetScheme())
if err != nil {
return errors.Wrapf(err, "creating GVK for obj %T", obj)
}
options.Log.V(Status).Info("Registering", "GVK", gvk)
// TODO: Do we need to add any index fields here? DavidJ's controller index's status.id - see its usage
// TODO: of IndexField
kubeClient := kubeclient.NewClient(mgr.GetClient(), mgr.GetScheme())
loggerFactory := func(mo genruntime.MetaObject) logr.Logger {
result := options.Log
if options.LoggerFactory != nil {
if factoryResult := options.LoggerFactory(mo); factoryResult != nil {
result = factoryResult
}
}
return result.WithName(controllerName)
}
reconciler := &GenericReconciler{
ARMClientFactory: clientFactory,
KubeClient: kubeClient,
ResourceResolver: genruntime.NewResolver(kubeClient, reconciledResourceLookup),
Name: t.Name(),
Config: options.Config,
LoggerFactory: loggerFactory,
Recorder: mgr.GetEventRecorderFor(controllerName),
GVK: gvk,
RequeueDelayOverride: options.RequeueDelay,
PositiveConditions: conditions.NewPositiveConditionBuilder(clock.New()),
}
err = ctrl.NewControllerManagedBy(mgr).
For(obj).
// Note: These predicates prevent status updates from triggering a reconcile.
// to learn more look at https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/predicate#GenerationChangedPredicate
WithEventFilter(predicate.GenerationChangedPredicate{}).
WithOptions(options.Options).
Complete(reconciler)
if err != nil {
return errors.Wrap(err, "unable to build controllers / reconciler")
}
return nil
}
// MakeResourceGVKLookup creates a map of schema.GroupKind to schema.GroupVersionKind. This can be used to look up
// the version of a GroupKind that is being reconciled.
func MakeResourceGVKLookup(mgr ctrl.Manager, objs []client.Object) (map[schema.GroupKind]schema.GroupVersionKind, error) {
result := make(map[schema.GroupKind]schema.GroupVersionKind)
for _, obj := range objs {
gvk, err := apiutil.GVKForObject(obj, mgr.GetScheme())
if err != nil {
return nil, errors.Wrapf(err, "creating GVK for obj %T", obj)
}
groupKind := schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}
if existing, ok := result[groupKind]; ok {
return nil, errors.Errorf("somehow group: %q, kind: %q was already registered with version %q", gvk.Group, gvk.Kind, existing.Version)
}
result[groupKind] = gvk
}
return result, nil
}
// NamespaceAnnotation defines the annotation name to use when marking
// a resource with the namespace of the managing operator.
const NamespaceAnnotation = "serviceoperator.azure.com/operator-namespace"
// Reconcile will take state in K8s and apply it to Azure
func (gr *GenericReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
obj, err := gr.KubeClient.GetObjectOrDefault(ctx, req.NamespacedName, gr.GVK)
if err != nil {
return ctrl.Result{}, err
}
if obj == nil {
// This means that the resource doesn't exist
return ctrl.Result{}, nil
}
// Always operate on a copy rather than the object from the client, as per
// https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md, which says:
// Never mutate original objects! Caches are shared across controllers, this means that if you mutate your "copy"
// (actually a reference or shallow copy) of an object, you'll mess up other controllers (not just your own).
obj = obj.DeepCopyObject().(client.Object)
// The Go type for the Kubernetes object must understand how to
// convert itself to/from the corresponding Azure types.
metaObj, ok := obj.(genruntime.MetaObject)
if !ok {
return ctrl.Result{}, errors.Errorf("object is not a genruntime.MetaObject, found type: %T", obj)
}
log := gr.LoggerFactory(metaObj).WithValues("name", req.Name, "namespace", req.Namespace, "azureName", metaObj.AzureName())
log.V(Verbose).Info(
"Reconcile invoked",
"kind", fmt.Sprintf("%T", obj),
"resourceVersion", obj.GetResourceVersion(),
"generation", obj.GetGeneration())
// Ensure the resource is tagged with the operator's namespace.
annotations := metaObj.GetAnnotations()
reconcilerNamespace := annotations[NamespaceAnnotation]
if reconcilerNamespace != gr.Config.PodNamespace && reconcilerNamespace != "" {
// We don't want to get into a fight with another operator -
// so if we see another operator already has this object leave
// it alone. This will do the right thing in the case of two
// operators trying to manage the same namespace. It makes
// moving objects between namespaces or changing which
// operator owns a namespace fiddlier (since you'd need to
// remove the annotation) but those operations are likely to
// be rare.
message := fmt.Sprintf("Operators in %q and %q are both configured to manage this resource", gr.Config.PodNamespace, reconcilerNamespace)
gr.Recorder.Event(obj, corev1.EventTypeWarning, "Overlap", message)
return ctrl.Result{}, nil
} else if reconcilerNamespace == "" && gr.Config.PodNamespace != "" {
genruntime.AddAnnotation(metaObj, NamespaceAnnotation, gr.Config.PodNamespace)
return ctrl.Result{Requeue: true}, gr.KubeClient.Client.Update(ctx, obj)
}
// TODO: We need some factory-lookup here
reconciler := reconcilers.NewAzureDeploymentReconciler(
metaObj,
log,
gr.ARMClientFactory(metaObj),
gr.Recorder,
gr.KubeClient,
gr.ResourceResolver,
gr.PositiveConditions)
result, err := reconciler.Reconcile(ctx)
if err != nil {
return ctrl.Result{}, err
}
// If we have a requeue delay override, set it for all situations where
// we are requeueing.
hasRequeueDelayOverride := gr.RequeueDelayOverride != time.Duration(0)
isRequeueing := result.Requeue || result.RequeueAfter > time.Duration(0)
if hasRequeueDelayOverride && isRequeueing {
result.RequeueAfter = gr.RequeueDelayOverride
result.Requeue = true
}
return result, nil
}
// NewRateLimiter creates a new workqueue.Ratelimiter for use controlling the speed of reconciliation.
// It throttles individual requests exponentially and also controls for multiple requests.
func NewRateLimiter(minBackoff time.Duration, maxBackoff time.Duration) workqueue.RateLimiter {
return workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(minBackoff, maxBackoff),
// TODO: We could have an azure global (or per subscription) bucket rate limiter to prevent running into subscription
// TODO: level throttling. For now though just stay with the default that client-go uses.
// 10 rps, 100 bucket (spike) size. This is across all requests (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}