This repository has been archived by the owner on Oct 22, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
statefulset_rollout_controller.go
81 lines (70 loc) · 2.85 KB
/
statefulset_rollout_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package statefulset
import (
"context"
"fmt"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
"code.cloudfoundry.org/quarks-utils/pkg/config"
"code.cloudfoundry.org/quarks-utils/pkg/ctxlog"
"code.cloudfoundry.org/quarks-utils/pkg/monitorednamespace"
)
// AddStatefulSetRollout creates a new statefulset rollout controller and adds it to the manager.
// The purpose of this controller is to remove the partition of the statefulset if the canary succeeds.
func AddStatefulSetRollout(ctx context.Context, config *config.Config, mgr manager.Manager) error {
ctx = ctxlog.NewContextWithRecorder(ctx, "statefulset-rollout-reconciler", mgr.GetEventRecorderFor("statefulset-rollout-recorder"))
r := NewStatefulSetRolloutReconciler(ctx, config, mgr)
// Create a new controller
c, err := controller.New("statefulset-rollout-controller", mgr, controller.Options{
Reconciler: r,
MaxConcurrentReconciles: config.MaxQuarksStatefulSetWorkers,
})
if err != nil {
return errors.Wrap(err, "Adding StatefulSet rollout controller to manager failed.")
}
nsPred := monitorednamespace.NewNSPredicate(ctx, mgr.GetClient(), config.MonitoredID)
// Trigger when annotation is set
statefulSetPredicates := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return false },
DeleteFunc: func(e event.DeleteEvent) bool { return false },
GenericFunc: func(e event.GenericEvent) bool { return false },
UpdateFunc: func(e event.UpdateEvent) bool {
if CheckUpdate(e) {
ctxlog.NewPredicateEvent(e.ObjectNew).Debug(
ctx, e.ObjectNew, "StatefulSet",
fmt.Sprintf("Update predicate passed for '%s/%s' for statefulset rollout", e.ObjectNew.GetNamespace(), e.ObjectNew.GetName()),
)
return true
}
return false
},
}
err = c.Watch(&source.Kind{Type: &appsv1.StatefulSet{}}, &handler.EnqueueRequestForObject{}, nsPred, statefulSetPredicates)
if err != nil {
return errors.Wrapf(err, "Watching StatefulSet failed in StatefulSet rollout controller.")
}
return nil
}
// CheckUpdate checks if update event should be processed
func CheckUpdate(e event.UpdateEvent) bool {
newSts := e.ObjectNew.(*appsv1.StatefulSet)
state, ok := newSts.Annotations[AnnotationCanaryRollout]
if !ok || state == rolloutStateDone || state == rolloutStateFailed {
return false
}
if state == rolloutStatePending {
return true
}
oldSts := e.ObjectOld.(*appsv1.StatefulSet)
if oldSts.Status.ReadyReplicas == newSts.Status.ReadyReplicas &&
oldSts.Status.UpdatedReplicas == newSts.Status.UpdatedReplicas &&
oldSts.Status.Replicas == newSts.Status.Replicas {
return false
}
return true
}