-
Notifications
You must be signed in to change notification settings - Fork 2
/
reconcile.go
253 lines (215 loc) · 7.02 KB
/
reconcile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package v1
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/darkowlzz/operator-toolkit/constant"
tkctrl "github.com/darkowlzz/operator-toolkit/controller"
"github.com/darkowlzz/operator-toolkit/controller/stateless-action/v1/action"
"github.com/darkowlzz/operator-toolkit/telemetry"
)
// instrumentationName is the instrumentation name this package passes to
// telemetry.NewInstrumentationWithProviders: the toolkit library name with the
// stateless-action controller path appended.
const instrumentationName = constant.LibraryName + "/controller/stateless-action"
// Reconciler is the StatelessAction reconciler. It fetches objects through its
// Controller and, when the Controller reports that an action is required,
// runs the actions of the Controller's action manager in background
// goroutines. Configure it with Init and ReconcilerOption values.
type Reconciler struct {
	// name prefixes span names and is attached to the logger as the
	// "reconciler" value by WithInstrumentation.
	name string
	// ctrlr fetches objects, decides whether an action is required and
	// builds the action manager.
	ctrlr Controller
	// client and scheme are taken from the manager in Init when a manager is
	// given; scheme can also be set with WithScheme.
	client client.Client
	scheme *runtime.Scheme
	// actionRetryPeriod is the wait between action checks in RunAction.
	actionRetryPeriod time.Duration
	// actionTimeout bounds the lifetime of the context each action runs with.
	actionTimeout time.Duration
	// inst provides spans and the logger used throughout reconciliation.
	inst *telemetry.Instrumentation
}
// ReconcilerOption is used to configure Reconciler. Init applies options in
// the order they are given, after copying the manager's client and scheme.
type ReconcilerOption func(*Reconciler)
// WithName returns an option that sets the Reconciler's name. The name
// prefixes the Reconciler's span names and, when set before
// WithInstrumentation is applied, labels its log output.
func WithName(name string) ReconcilerOption {
	setName := func(rec *Reconciler) {
		rec.name = name
	}
	return setName
}
// WithActionRetryPeriod returns an option that sets how long RunAction waits
// between action checks, and therefore between the re-runs those checks
// trigger.
func WithActionRetryPeriod(duration time.Duration) ReconcilerOption {
	apply := func(rec *Reconciler) {
		rec.actionRetryPeriod = duration
	}
	return apply
}
// WithActionTimeout sets the maximum time a single action may take.
// RunAction creates each action's context with this timeout. Once the
// timeout expires, the action's check/retry loop stops and the action
// manager's Defer step runs.
func WithActionTimeout(duration time.Duration) ReconcilerOption {
	return func(r *Reconciler) {
		r.actionTimeout = duration
	}
}
// WithScheme returns an option that sets the runtime Scheme of the
// Reconciler. Because Init applies options after reading the manager, this
// overrides the manager's scheme.
func WithScheme(scheme *runtime.Scheme) ReconcilerOption {
	return func(rec *Reconciler) { rec.scheme = scheme }
}
// WithInstrumentation configures the instrumentation of the Reconciler from
// the given tracer provider, meter provider and logger.
//
// If the Reconciler already has a name when this option is applied, the
// logger is annotated with a "reconciler" value carrying that name. Options
// run in order, so pass WithName before WithInstrumentation for the name to
// be picked up.
//
// The captured logger is never modified, so the same option value can be
// applied to several Reconcilers without their names accumulating on it.
func WithInstrumentation(tp trace.TracerProvider, mp metric.MeterProvider, log logr.Logger) ReconcilerOption {
	return func(r *Reconciler) {
		// Use a copy: reassigning the captured parameter would make every
		// later application of this option inherit earlier "reconciler"
		// values.
		logger := log
		if logger != nil && r.name != "" {
			logger = logger.WithValues("reconciler", r.name)
		}
		r.inst = telemetry.NewInstrumentationWithProviders(instrumentationName, tp, mp, logger)
	}
}
// Init wires the Reconciler to its controller and applies opts.
//
// When mgr is non-nil, its client and scheme are used. With a nil manager
// (helpful in tests), client and scheme are left as they are, so they can be
// provided explicitly. Options are then applied in order and may override
// these values. If no option configured instrumentation, a default one is
// created from ctrl.Log.
//
// Durations left at zero (or set to a negative value) are replaced with
// fallbacks:
//   - a zero action timeout would expire every action's context before the
//     action starts;
//   - a zero retry period would make the action check loop spin without
//     pausing.
func (r *Reconciler) Init(mgr ctrl.Manager, ctrlr Controller, opts ...ReconcilerOption) {
	// Fallbacks for durations not configured through options. Callers that
	// need specific values should use WithActionRetryPeriod and
	// WithActionTimeout.
	const (
		defaultActionRetryPeriod = 5 * time.Second
		defaultActionTimeout     = 5 * time.Minute
	)

	r.ctrlr = ctrlr

	// Use manager if provided. This is helpful in tests to provide explicit
	// client and scheme without a manager.
	if mgr != nil {
		r.client = mgr.GetClient()
		r.scheme = mgr.GetScheme()
	}

	// Run the options to override the manager-provided values.
	for _, opt := range opts {
		opt(r)
	}

	// Guard against unusable durations after all options have run, so an
	// explicit zero from an option is also covered.
	if r.actionRetryPeriod <= 0 {
		r.actionRetryPeriod = defaultActionRetryPeriod
	}
	if r.actionTimeout <= 0 {
		r.actionTimeout = defaultActionTimeout
	}

	// If instrumentation is nil, create a new instrumentation with default
	// providers. This runs after the options so the name from WithName is
	// attached to the logger.
	if r.inst == nil {
		WithInstrumentation(nil, nil, ctrl.Log)(r)
	}
}
// Reconcile handles a single request. It fetches the target object through
// the controller, asks the controller whether an action is required and, if
// so, starts the action manager for the object.
//
// A "not found" error from GetObject (as recognized by
// client.IgnoreNotFound) and a nil object both end the reconciliation
// without error. The returned ctrl.Result is always empty. The named results
// are read by the deferred LogReconcileFinish call.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, reterr error) {
	ctx, span, _, log := r.inst.Start(ctx, r.name+": Reconcile")
	defer span.End()

	begin := time.Now()
	defer tkctrl.LogReconcileFinish(log, "reconciliation finished", begin, &result, &reterr)

	span.SetAttributes(attribute.String("object-key", req.NamespacedName.String()))

	// The object may come from any backend, so only k8s apimachinery
	// "not found" errors can be recognized here. Other backends signal
	// "not found" by returning a nil object instead.
	obj, getErr := r.ctrlr.GetObject(ctx, req.NamespacedName)
	if getErr != nil {
		fatal := client.IgnoreNotFound(getErr)
		if fatal != nil {
			span.RecordError(fatal)
		}
		return ctrl.Result{}, fatal
	}

	if obj == nil {
		span.AddEvent("empty object")
		return ctrl.Result{}, nil
	}

	needed, checkErr := r.ctrlr.RequireAction(ctx, obj)
	if checkErr != nil {
		span.RecordError(checkErr)
		return ctrl.Result{}, checkErr
	}
	if !needed {
		return ctrl.Result{}, nil
	}

	span.AddEvent("Action required, running action manager")
	if runErr := r.RunActionManager(ctx, obj); runErr != nil {
		span.RecordError(runErr)
		return ctrl.Result{}, runErr
	}
	return ctrl.Result{}, nil
}
// RunActionManager builds an action manager for o, asks it for the objects to
// act on, and starts one goroutine running RunAction per object.
//
// It returns as soon as the goroutines are launched. Failures inside them
// are only logged, never returned. A non-nil error means the manager could
// not be built or could not list its objects.
func (r *Reconciler) RunActionManager(ctx context.Context, o interface{}) error {
	ctx, span, _, log := r.inst.Start(ctx, r.name+": run action manager")
	defer span.End()

	span.AddEvent("Build action manager")
	manager, buildErr := r.ctrlr.BuildActionManager(o)
	if buildErr != nil {
		span.RecordError(buildErr)
		return errors.Wrap(buildErr, "failed to build action manager")
	}

	targets, listErr := manager.GetObjects(ctx)
	if listErr != nil {
		span.RecordError(listErr)
		return errors.Wrap(listErr, "failed to get objects from action manager")
	}

	span.AddEvent(fmt.Sprintf("Running actions for %d objects", len(targets)))
	for i := range targets {
		// Copy the element so each goroutine gets its own object.
		target := targets[i]
		go func() {
			if err := r.RunAction(manager, target); err != nil {
				log.Error(err, "failed to run action")
			}
		}()
	}
	return nil
}
// RunAction runs the action for a single object o, then keeps checking it
// until the check reports that no further run is needed or the action
// timeout expires.
//
// The action runs with its own context, derived from context.Background()
// with a deadline of r.actionTimeout, so it is independent of the request
// that started it. actmgr.Run is called once up front. After that,
// actmgr.Check is called every r.actionRetryPeriod:
//   - true means run the action again;
//   - false means the action is done;
//   - a check error is recorded and the check is retried.
//
// Once the name lookup succeeds, actmgr.Defer always runs before RunAction
// returns, and its error (if any) becomes the returned error. Reaching the
// timeout is logged but is not returned as an error.
func (r *Reconciler) RunAction(actmgr action.Manager, o interface{}) (retErr error) {
	name, err := actmgr.GetName(o)
	if err != nil {
		return errors.Wrap(err, "failed to get action manager name")
	}

	// Create a context with timeout to be able to cancel the action if it
	// can't be completed within the given time.
	ctx, cancel := context.WithTimeout(context.Background(), r.actionTimeout)
	defer cancel()

	ctx, span, _, log := r.inst.Start(ctx, r.name+": run action")
	defer span.End()

	// Set action info in the logger and the span.
	log = log.WithValues("action", name)
	span.SetAttributes(
		attribute.String("actionName", name),
		attribute.Int64("timeout", int64(r.actionTimeout)),
		attribute.Int64("retryPeriod", int64(r.actionRetryPeriod)),
	)

	// Deferred calls run LIFO, so this runs before span.End() and cancel()
	// and can still record on the span.
	// NOTE(review): when the loop below exits because ctx expired, ctx is
	// already done here, so context-aware work inside actmgr.Defer will
	// fail; consider giving Defer a separate, fresh context.
	defer func() {
		if deferErr := actmgr.Defer(ctx, o); deferErr != nil {
			span.RecordError(deferErr)
			retErr = errors.Wrap(deferErr, "failed to run deferred action")
		}
	}()

	// First run. A failure here is not fatal; the check loop below decides
	// whether to retry.
	span.AddEvent("First action run")
	if runErr := actmgr.Run(ctx, o); runErr != nil {
		span.RecordError(runErr)
		log.Info("action run failed, will retry", "error", runErr)
	}

	// Reuse a single timer rather than allocating one per iteration with
	// time.After. Like the original, each wait starts after the previous
	// check/run has finished.
	retry := time.NewTimer(r.actionRetryPeriod)
	defer retry.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Info("context cancelled, terminating action")
			return nil
		case <-retry.C:
		}

		checkResult, checkErr := actmgr.Check(ctx, o)
		switch {
		case checkErr != nil:
			span.RecordError(checkErr)
			log.Error(checkErr, "failed to perform action check, retrying")
		case !checkResult:
			// Action successful, end the action.
			log.V(6).Info("action successful", "object", o)
			return nil
		default:
			span.AddEvent("Check result true, rerun action")
			if runErr := actmgr.Run(ctx, o); runErr != nil {
				span.RecordError(runErr)
				log.Error(runErr, "action run retry failed")
			}
		}

		// The timer fired and its channel was drained by the select above,
		// so Reset is safe here.
		retry.Reset(r.actionRetryPeriod)
	}
}