controller.go
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"time"
etcdv1 "github.com/aws/etcdadm-controller/api/v1beta1"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// EtcdadmClusterReconciler reconciles an EtcdadmCluster object
type EtcdadmClusterReconciler struct {
controller controller.Controller
client.Client
recorder record.EventRecorder
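// uncachedClient reads directly from the API server (mgr.GetAPIReader), bypassing the manager's
// cache; it is used when listing the cluster's etcd Machines.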
uncachedClient client.Reader
Log logr.Logger
Scheme *runtime.Scheme
etcdHealthCheckConfig etcdHealthCheckConfig
}
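// SetupWithManager registers this reconciler with the manager: it reconciles EtcdadmCluster
// objects, watches Machines owned by them as well as the owner Cluster (enqueued via
// ClusterToEtcdadmCluster), skips paused resources, and starts the etcd healthcheck loop in a
// background goroutine.
//
// A minimal wiring sketch (hypothetical main.go; the scheme variable and error handling are
// assumptions, and the real setup may differ):
//
//	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
//	if err != nil {
//		// handle error
//	}
//	r := &EtcdadmClusterReconciler{
//		Client: mgr.GetClient(),
//		Log:    ctrl.Log.WithName("controllers").WithName("EtcdadmCluster"),
//		Scheme: mgr.GetScheme(),
//	}
//	ctx := ctrl.SetupSignalHandler()
//	if err := r.SetupWithManager(ctx, mgr, ctx.Done()); err != nil {
//		// handle error
//	}
//	if err := mgr.Start(ctx); err != nil {
//		// handle error
//	}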
func (r *EtcdadmClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, done <-chan struct{}) error {
c, err := ctrl.NewControllerManagedBy(mgr).
For(&etcdv1.EtcdadmCluster{}).
Owns(&clusterv1.Machine{}).
WithEventFilter(predicates.ResourceNotPaused(r.Log)).
Build(r)
if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
}
err = c.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
handler.EnqueueRequestsFromMapFunc(r.ClusterToEtcdadmCluster),
predicates.ClusterUnpausedAndInfrastructureReady(r.Log),
)
if err != nil {
return errors.Wrap(err, "failed adding Watch for Clusters to controller manager")
}
r.controller = c
r.recorder = mgr.GetEventRecorderFor("etcdadm-cluster-controller")
r.uncachedClient = mgr.GetAPIReader()
go r.startHealthCheckLoop(ctx, done)
return nil
}
// +kubebuilder:rbac:groups=etcdcluster.cluster.x-k8s.io,resources=etcdadmclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=etcdcluster.cluster.x-k8s.io,resources=etcdadmclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters;clusters/status,verbs=get;list;watch
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines;machines/status,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=bootstrap.cluster.x-k8s.io,resources=etcdadmconfigs;etcdadmconfigs/status,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=configmaps;events;secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=*,verbs=get;list;watch;create;update;patch;delete
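// Reconcile drives an EtcdadmCluster toward its desired state. It fetches the owning CAPI Cluster,
// waits for the infrastructure to be ready, manages the finalizer, and then delegates to reconcile
// or reconcileDelete depending on whether the object is being deleted.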
func (r *EtcdadmClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, reterr error) {
log := r.Log.WithValues("etcdadmcluster", req.NamespacedName)
// Look up the EtcdadmCluster object for this request
etcdCluster := &etcdv1.EtcdadmCluster{}
if err := r.Client.Get(ctx, req.NamespacedName, etcdCluster); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
log.Error(err, "Failed to get etcdadm cluster")
return ctrl.Result{}, err
}
// Fetch the CAPI Cluster.
cluster, err := util.GetOwnerCluster(ctx, r.Client, etcdCluster.ObjectMeta)
if err != nil {
log.Error(err, "Failed to retrieve owner Cluster from the API Server")
return ctrl.Result{}, err
}
if cluster == nil {
log.Info("Cluster Controller has not yet set OwnerRef on etcd")
return ctrl.Result{}, nil
}
if !cluster.Status.InfrastructureReady {
log.Info("Infrastructure cluster is not yet ready")
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
if annotations.IsPaused(cluster, etcdCluster) {
log.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}
// Initialize the patch helper.
patchHelper, err := patch.NewHelper(etcdCluster, r.Client)
if err != nil {
log.Error(err, "Failed to configure the patch helper")
return ctrl.Result{Requeue: true}, nil
}
// Add finalizer first if it does not exist to avoid the race condition between init and delete
if !controllerutil.ContainsFinalizer(etcdCluster, etcdv1.EtcdadmClusterFinalizer) {
controllerutil.AddFinalizer(etcdCluster, etcdv1.EtcdadmClusterFinalizer)
// Patch and return right away instead of relying on the main defer,
// because the main defer may take too long to compute the cluster status
patchOpts := []patch.Option{patch.WithStatusObservedGeneration{}}
if err := patchHelper.Patch(ctx, etcdCluster, patchOpts...); err != nil {
log.Error(err, "Failed to patch EtcdadmCluster to add finalizer")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
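// This deferred function runs after every reconciliation: it refreshes the owned etcd machines,
// updates their etcd-ready labels and the EtcdadmCluster status, patches the object, and requeues
// after 20 seconds while the cluster is not yet ready.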
defer func() {
etcdMachines, err := r.checkOwnedMachines(ctx, log, etcdCluster, cluster)
if err != nil {
reterr = kerrors.NewAggregate([]error{reterr, err})
return
} else {
if err := r.updateMachinesEtcdReadyLabel(ctx, log, etcdMachines); err != nil {
log.Error(err, "Failed to update etcd ready labels in machines")
reterr = kerrors.NewAggregate([]error{reterr, err})
}
// Always attempt to update status.
if err := r.updateStatus(ctx, etcdCluster, cluster, etcdMachines); err != nil {
log.Error(err, "Failed to update EtcdadmCluster Status")
reterr = kerrors.NewAggregate([]error{reterr, err})
}
}
if conditions.IsFalse(etcdCluster, etcdv1.EtcdMachinesSpecUpToDateCondition) &&
conditions.GetReason(etcdCluster, etcdv1.EtcdMachinesSpecUpToDateCondition) == etcdv1.EtcdRollingUpdateInProgressReason {
// Set Ready to false so that the CAPI cluster controller pauses KCP instead of repeatedly checking whether the etcd endpoints have been updated
etcdCluster.Status.Ready = false
}
// Always attempt to Patch the EtcdadmCluster object and status after each reconciliation.
if err := patchEtcdCluster(ctx, patchHelper, etcdCluster); err != nil {
log.Error(err, "Failed to patch EtcdadmCluster")
reterr = kerrors.NewAggregate([]error{reterr, err})
}
if reterr == nil && !res.Requeue && !(res.RequeueAfter > 0) && etcdCluster.ObjectMeta.DeletionTimestamp.IsZero() {
if !etcdCluster.Status.Ready {
res = ctrl.Result{RequeueAfter: 20 * time.Second}
}
}
}()
if !etcdCluster.ObjectMeta.DeletionTimestamp.IsZero() {
// Handle deletion reconciliation loop.
return r.reconcileDelete(ctx, etcdCluster, cluster)
}
return r.reconcile(ctx, etcdCluster, cluster)
}
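// reconcile handles the non-delete flow: it reconciles the external infrastructure reference,
// removes outdated etcd members once the controlplane upgrade has completed, rolls out machines
// whose spec is out of date, and otherwise initializes or scales the etcd cluster toward the
// desired number of replicas.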
func (r *EtcdadmClusterReconciler) reconcile(ctx context.Context, etcdCluster *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster) (ctrl.Result, error) {
log := r.Log.WithName(etcdCluster.Name)
var desiredReplicas int
// Reconcile the external infrastructure reference.
if err := r.reconcileExternalReference(ctx, cluster, etcdCluster.Spec.InfrastructureTemplate); err != nil {
return ctrl.Result{}, err
}
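// List all etcd machines for this cluster directly from the API server and keep only those owned
// by this EtcdadmCluster; machines that are still etcd members but no longer owned are treated as
// outdated below.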
etcdMachines, err := collections.GetFilteredMachinesForCluster(ctx, r.uncachedClient, cluster, EtcdClusterMachines(cluster.Name, etcdCluster.Name))
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "Error filtering machines for etcd cluster")
}
ownedMachines := etcdMachines.Filter(collections.OwnedMachines(etcdCluster))
ep, err := NewEtcdPlane(ctx, r.Client, cluster, etcdCluster, ownedMachines)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "Error initializing internal object EtcdPlane")
}
if len(ownedMachines) != len(etcdMachines) {
if conditions.IsUnknown(etcdCluster, etcdv1.EtcdClusterHasNoOutdatedMembersCondition) || conditions.IsTrue(etcdCluster, etcdv1.EtcdClusterHasNoOutdatedMembersCondition) {
conditions.MarkFalse(etcdCluster, etcdv1.EtcdClusterHasNoOutdatedMembersCondition, etcdv1.EtcdClusterHasOutdatedMembersReason, clusterv1.ConditionSeverityInfo, "%d etcd members have outdated spec", len(etcdMachines.Difference(ownedMachines)))
}
/* These are the out-of-date etcd machines that are still members of the current etcd cluster but are no longer owned by the EtcdadmCluster object.
During a cluster upgrade, etcd machines are upgraded first so that the new etcd endpoints become available. The outdated controlplane machines, however,
keep trying to connect to the etcd members they were originally configured with, so the older etcd members cannot be deleted until the controlplane
rollout has finished. Hence this situation only arises after an upgrade, and these machines are deleted only once the controlplane upgrade has completed. */
if _, ok := etcdCluster.Annotations[clusterv1.ControlPlaneUpgradeCompletedAnnotation]; ok {
outdatedMachines := etcdMachines.Difference(ownedMachines)
log.Info(fmt.Sprintf("Controlplane upgrade has completed, deleting older outdated etcd members: %v", outdatedMachines.Names()))
for _, outdatedMachine := range outdatedMachines {
outdatedMachineAddress := getEtcdMachineAddress(outdatedMachine)
if err := r.removeEtcdMachine(ctx, etcdCluster, cluster, outdatedMachine, outdatedMachineAddress); err != nil {
return ctrl.Result{}, err
}
}
// Requeue so the controller reconciles again after the last outdated machine is deleted and the "EtcdClusterHasNoOutdatedMembersCondition" can be marked true
return ctrl.Result{Requeue: true}, nil
}
} else {
if _, ok := etcdCluster.Annotations[clusterv1.ControlPlaneUpgradeCompletedAnnotation]; ok {
log.Info("Outdated etcd members deleted, removing controlplane-upgrade complete annotation")
delete(etcdCluster.Annotations, clusterv1.ControlPlaneUpgradeCompletedAnnotation)
}
if conditions.IsFalse(etcdCluster, etcdv1.EtcdClusterHasNoOutdatedMembersCondition) {
log.Info(fmt.Sprintf("Outdated etcd members deleted, setting %s to true", etcdv1.EtcdClusterHasNoOutdatedMembersCondition))
conditions.MarkTrue(etcdCluster, etcdv1.EtcdClusterHasNoOutdatedMembersCondition)
}
}
// This aggregates the state of all machines
conditions.SetAggregate(etcdCluster, etcdv1.EtcdMachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef(), conditions.WithStepCounterIf(false))
numCurrentMachines := len(ownedMachines)
desiredReplicas = int(*etcdCluster.Spec.Replicas)
// Rolling out etcd machines due to configuration changes (e.g. upgrades) takes precedence over other operations.
needRollout := ep.MachinesNeedingRollout()
switch {
case len(needRollout) > 0:
log.Info("Rolling out Etcd machines", "needRollout", needRollout.Names())
if conditions.IsFalse(ep.EC, etcdv1.EtcdMachinesSpecUpToDateCondition) && len(ep.UpToDateMachines()) > 0 {
// An update is already in progress: some machines have been rolled out with the new spec
newestUpToDateMachine := ep.NewestUpToDateMachine()
newestUpToDateMachineCreationTime := newestUpToDateMachine.CreationTimestamp.Time
nextMachineUpdateTime := newestUpToDateMachineCreationTime.Add(time.Duration(minEtcdMemberReadySeconds) * time.Second)
if nextMachineUpdateTime.After(time.Now()) {
// The latest machine with the updated spec should get more time for etcd data sync,
// so requeue until that wait has elapsed
after := time.Until(nextMachineUpdateTime)
log.Info(fmt.Sprintf("Requeueing etcdadm cluster for updating next machine after %s", after.String()))
return ctrl.Result{RequeueAfter: after}, nil
}
// otherwise, if the minimum time to wait between successive machine updates has passed,
// check that the latest etcd member is ready
address := getEtcdMachineAddress(newestUpToDateMachine)
if address == "" {
return ctrl.Result{}, nil
}
// If the member passes the healthcheck, data sync has completed and the upgrade can proceed
if err := r.performEndpointHealthCheck(ctx, cluster, getMemberClientURL(address), true); err != nil {
return ctrl.Result{}, err
}
}
conditions.MarkFalse(ep.EC, etcdv1.EtcdMachinesSpecUpToDateCondition, etcdv1.EtcdRollingUpdateInProgressReason, clusterv1.ConditionSeverityWarning, "Rolling %d replicas with outdated spec (%d replicas up to date)", len(needRollout), len(ep.Machines)-len(needRollout))
return r.upgradeEtcdCluster(ctx, cluster, etcdCluster, ep, needRollout)
default:
// make sure the last upgrade operation is marked as completed.
// NOTE: we check that the condition already exists in order to avoid setting it on the first
// reconciliation, i.e. before a rolling upgrade has actually started.
if conditions.Has(ep.EC, etcdv1.EtcdMachinesSpecUpToDateCondition) {
conditions.MarkTrue(ep.EC, etcdv1.EtcdMachinesSpecUpToDateCondition)
_, hasUpgradeAnnotation := etcdCluster.Annotations[etcdv1.UpgradeInProgressAnnotation]
if hasUpgradeAnnotation {
delete(etcdCluster.Annotations, etcdv1.UpgradeInProgressAnnotation)
}
}
}
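// Reconcile the number of etcd machines toward the desired replica count: create the first
// machine to initialize the cluster, wait for initialization to complete, then scale up or down
// as needed.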
switch {
case numCurrentMachines < desiredReplicas && numCurrentMachines == 0:
// Create first etcd machine to run etcdadm init
log.Info("Initializing etcd cluster", "Desired", desiredReplicas, "Existing", numCurrentMachines)
conditions.MarkFalse(etcdCluster, etcdv1.InitializedCondition, etcdv1.WaitingForEtcdadmInitReason, clusterv1.ConditionSeverityInfo, "")
conditions.MarkFalse(etcdCluster, etcdv1.EtcdEndpointsAvailable, etcdv1.WaitingForEtcdadmEndpointsToPassHealthcheckReason, clusterv1.ConditionSeverityInfo, "")
return r.intializeEtcdCluster(ctx, etcdCluster, cluster, ep)
case numCurrentMachines > 0 && conditions.IsFalse(etcdCluster, etcdv1.InitializedCondition):
// As soon as the first etcd machine is up, etcdadm init runs on it to initialize the etcd cluster; update the condition once that has happened
if !etcdCluster.Status.Initialized {
// the deferred function in Reconcile will requeue after 20 seconds
return ctrl.Result{}, nil
}
// the etcd cluster has been initialized, so mark the condition true
conditions.MarkTrue(etcdCluster, etcdv1.InitializedCondition)
case numCurrentMachines < desiredReplicas && numCurrentMachines > 0:
log.Info("Scaling up etcd cluster", "Desired", desiredReplicas, "Existing", numCurrentMachines)
return r.scaleUpEtcdCluster(ctx, etcdCluster, cluster, ep)
case numCurrentMachines > desiredReplicas:
log.Info("Scaling down etcd cluster", "Desired", desiredReplicas, "Existing", numCurrentMachines)
// The last parameter is the set of Machines that need to be rolled out (e.g. during an upgrade); it should always be empty here.
return r.scaleDownEtcdCluster(ctx, etcdCluster, cluster, ep, collections.Machines{})
}
return ctrl.Result{}, nil
}
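// reconcileDelete handles EtcdadmCluster deletion: it deletes all owned etcd Machines and removes
// the finalizer once none remain.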
func (r *EtcdadmClusterReconciler) reconcileDelete(ctx context.Context, etcdCluster *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx, "cluster", cluster.Name)
log.Info("Reconcile EtcdadmCluster deletion")
etcdMachines, err := collections.GetFilteredMachinesForCluster(ctx, r.uncachedClient, cluster, EtcdClusterMachines(cluster.Name, etcdCluster.Name))
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "Error filtering machines for etcd cluster")
}
ownedMachines := etcdMachines.Filter(collections.OwnedMachines(etcdCluster))
if len(ownedMachines) == 0 {
// If no etcd machines are left, remove the finalizer
controllerutil.RemoveFinalizer(etcdCluster, etcdv1.EtcdadmClusterFinalizer)
return ctrl.Result{}, nil
}
// This aggregates the state of all machines
conditions.SetAggregate(etcdCluster, etcdv1.EtcdMachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef(), conditions.WithStepCounterIf(false))
// Delete etcd machines
machinesToDelete := ownedMachines.Filter(collections.Not(collections.HasDeletionTimestamp))
var errs []error
for _, m := range machinesToDelete {
logger := log.WithValues("machine", m)
if err := r.Client.Delete(ctx, m); err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "Failed to cleanup owned machine")
errs = append(errs, err)
}
}
if len(errs) > 0 {
err := kerrors.NewAggregate(errs)
r.recorder.Eventf(etcdCluster, corev1.EventTypeWarning, "FailedDelete",
"Failed to delete etcd Machines for cluster %s/%s: %v", cluster.Namespace, cluster.Name, err)
return ctrl.Result{}, err
}
conditions.MarkFalse(etcdCluster, etcdv1.EtcdClusterResizeCompleted, clusterv1.DeletingReason, clusterv1.ConditionSeverityInfo, "")
// Requeue so a later reconciliation can verify the machines are gone and remove the finalizer
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
// ClusterToEtcdadmCluster is a handler.MapFunc used to enqueue reconcile requests for an
// EtcdadmCluster whenever its owner Cluster is updated.
func (r *EtcdadmClusterReconciler) ClusterToEtcdadmCluster(o client.Object) []ctrl.Request {
c, ok := o.(*clusterv1.Cluster)
if !ok {
panic(fmt.Sprintf("Expected a Cluster but got a %T", o))
}
etcdRef := c.Spec.ManagedExternalEtcdRef
if etcdRef != nil && etcdRef.Kind == "EtcdadmCluster" {
return []ctrl.Request{{NamespacedName: client.ObjectKey{Namespace: etcdRef.Namespace, Name: etcdRef.Name}}}
}
return nil
}
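// patchEtcdCluster computes the summary Ready condition and patches the EtcdadmCluster object and
// its status, declaring ownership of the conditions managed by this controller.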
func patchEtcdCluster(ctx context.Context, patchHelper *patch.Helper, ec *etcdv1.EtcdadmCluster) error {
// SetSummary sets the Ready condition on the EtcdadmCluster as an aggregate of all the conditions defined on it
conditions.SetSummary(ec,
conditions.WithConditions(
etcdv1.EtcdMachinesSpecUpToDateCondition,
etcdv1.EtcdCertificatesAvailableCondition,
etcdv1.EtcdMachinesReadyCondition,
etcdv1.EtcdClusterResizeCompleted,
etcdv1.InitializedCondition,
etcdv1.EtcdClusterHasNoOutdatedMembersCondition,
etcdv1.EtcdEndpointsAvailable,
),
)
// patch the EtcdadmCluster conditions based on current values at the end of every reconcile
return patchHelper.Patch(
ctx,
ec,
patch.WithOwnedConditions{Conditions: []clusterv1.ConditionType{
clusterv1.ReadyCondition,
etcdv1.EtcdMachinesSpecUpToDateCondition,
etcdv1.EtcdCertificatesAvailableCondition,
etcdv1.EtcdMachinesReadyCondition,
etcdv1.EtcdClusterResizeCompleted,
etcdv1.InitializedCondition,
etcdv1.EtcdClusterHasNoOutdatedMembersCondition,
etcdv1.EtcdEndpointsAvailable,
}},
patch.WithStatusObservedGeneration{},
)
}