-
Notifications
You must be signed in to change notification settings - Fork 3
/
controller.go
148 lines (126 loc) · 5.36 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package rollout
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/manager"
)
type controller struct {
client client.Client
cooldown time.Duration
}
// NewController re-synthesizes compositions when their synthesizer has changed while honoring a cooldown period.
func NewController(mgr ctrl.Manager, cooldown time.Duration) error {
c := &controller{
client: mgr.GetClient(),
cooldown: cooldown,
}
return ctrl.NewControllerManagedBy(mgr).
For(&apiv1.Synthesizer{}).
Watches(&apiv1.Composition{}, newCompositionHandler()).
// TODO: Filter some events?
WithLogConstructor(manager.NewLogConstructor(mgr, "synthesizerRolloutController")).
Complete(c)
}
func (c *controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := logr.FromContextOrDiscard(ctx)
syn := &apiv1.Synthesizer{}
err := c.client.Get(ctx, req.NamespacedName, syn)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(fmt.Errorf("gettting synthesizer: %w", err))
}
logger = logger.WithValues("synthesizerName", syn.Name, "synthesizerNamespace", syn.Namespace, "synthesizerGeneration", syn.Generation)
compList := &apiv1.CompositionList{}
err = c.client.List(ctx, compList, client.MatchingFields{
manager.IdxCompositionsBySynthesizer: syn.Name,
})
if err != nil {
return ctrl.Result{}, fmt.Errorf("listing compositions: %w", err)
}
var latestRollout time.Time
for _, comp := range compList.Items {
if comp.Status.CurrentSynthesis != nil && comp.Status.PreviousSynthesis != nil && comp.Status.CurrentSynthesis.Synthesized != nil && comp.Status.CurrentSynthesis.Synthesized.Time.After(latestRollout) {
latestRollout = comp.Status.CurrentSynthesis.Synthesized.Time
}
}
if delta := time.Since(latestRollout); delta < c.cooldown {
return ctrl.Result{RequeueAfter: c.cooldown - delta}, nil
}
// randomize list to avoid always rolling out changes in the same order
rand.Shuffle(len(compList.Items), func(i, j int) { compList.Items[i] = compList.Items[j] })
for _, comp := range compList.Items {
comp := comp
logger := logger.WithValues("compositionName", comp.Name, "compositionNamespace", comp.Namespace, "compositionGeneration", comp.Generation)
// Compositions aren't eligible to receive an updated synthesizer when:
// - They haven't ever been synthesized (they'll use the latest inputs anyway)
// - They are currently being synthesized
// - They are already in sync with the latest inputs
if comp.Status.CurrentSynthesis == nil || comp.Status.CurrentSynthesis.Synthesized == nil || isInSync(&comp, syn) {
continue
}
swapStates(&comp)
err = c.client.Status().Update(ctx, &comp)
if err != nil {
return ctrl.Result{}, fmt.Errorf("swapping compisition state: %w", err)
}
logger.V(1).Info("advancing rollout process")
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, nil
}
func swapStates(comp *apiv1.Composition) {
// If the previous state has been synthesized but not the current, keep the previous to avoid orphaning deleted resources
if comp.Status.CurrentSynthesis != nil && comp.Status.CurrentSynthesis.Synthesized != nil {
comp.Status.PreviousSynthesis = comp.Status.CurrentSynthesis
}
comp.Status.CurrentSynthesis = &apiv1.Synthesis{
ObservedCompositionGeneration: comp.Generation,
}
}
func isInSync(comp *apiv1.Composition, syn *apiv1.Synthesizer) bool {
return comp.Status.CurrentSynthesis.ObservedSynthesizerGeneration >= syn.Generation
}
func newCompositionHandler() handler.EventHandler {
return &handler.Funcs{
CreateFunc: func(ctx context.Context, ce event.CreateEvent, rli workqueue.RateLimitingInterface) {
// No need to handle creation events since the status will always be nil.
},
DeleteFunc: func(ctx context.Context, de event.DeleteEvent, rli workqueue.RateLimitingInterface) {
// We don't handle deletes on purpose, since a composition being deleted can only ever
// result in the cooldown period being shortened i.e. we lose track of a more recent
// rollout event.
//
// It's okay that this state can be lost, since it falls within the promised semantics
// of this controller. But ideally we can avoid it when possible.
},
UpdateFunc: func(ctx context.Context, ue event.UpdateEvent, rli workqueue.RateLimitingInterface) {
newComp, ok := ue.ObjectNew.(*apiv1.Composition)
if !ok {
logr.FromContextOrDiscard(ctx).V(0).Info("unexpected type given to newCompositionToSynthesizerHandler")
return
}
oldComp, ok := ue.ObjectOld.(*apiv1.Composition)
if !ok {
rli.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: newComp.Spec.Synthesizer.Name}})
return
}
// Nothing we care about has changed
if oldComp.Spec.Synthesizer.Name == newComp.Spec.Synthesizer.Name &&
oldComp.Status.CurrentSynthesis != nil && newComp.Status.CurrentSynthesis != nil &&
oldComp.Status.CurrentSynthesis.UUID == newComp.Status.CurrentSynthesis.UUID {
return
}
rli.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: newComp.Spec.Synthesizer.Name}})
},
}
}