-
Notifications
You must be signed in to change notification settings - Fork 2
/
dogu_requeue_handler.go
169 lines (137 loc) · 5.13 KB
/
dogu_requeue_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package controllers
import (
"context"
"errors"
"fmt"
"time"
k8sv1 "github.com/cloudogu/k8s-dogu-operator/api/v1"
"github.com/hashicorp/go-multierror"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// requeuableError is implemented by errors that can tell the operator whether the failed dogu operation
// should be requeued. shouldRequeue looks for this interface in the error chain.
type requeuableError interface {
// Requeue returns true when the error should produce a requeue for the current dogu resource operation.
Requeue() bool
}
// requeuableErrorWithTime is a requeuableError that additionally provides the time to wait before the dogu is
// reconciled again. getRequeueTime prefers this value over the requeue time derived from the dogu status.
type requeuableErrorWithTime interface {
requeuableError
// GetRequeueTime returns the time to wait before the next reconciliation. The constant ExponentialRequeueTime indicates
// that the requeue time increases exponentially.
GetRequeueTime() time.Duration
}
// requeuableErrorWithState indicates that the current error requires the operator to requeue the dogu and to set
// a state in the dogu status. The handler writes the returned state to the RequeuePhase field of the dogu status.
type requeuableErrorWithState interface {
requeuableErrorWithTime
// GetState returns the current state of the reconciled resource.
// In most cases it can be empty if no async state mechanism is used.
GetState() string
}
// doguRequeueHandler is responsible for requeueing a dogu resource after one of its operations failed.
type doguRequeueHandler struct {
// client is passed to the dogu resource when its updated status is persisted.
client client.Client
// nonCacheClient is required to list all events while filtering them by their fields.
// It is also used to delete the previously fired requeue events.
nonCacheClient kubernetes.Interface
// namespace is the namespace in which requeue events are listed and deleted.
namespace string
// recorder emits the requeue event for the dogu resource.
recorder record.EventRecorder
}
// NewDoguRequeueHandler builds a requeue handler for dogu resources.
//
// Besides the given client and event recorder, the handler gets its own kubernetes clientset that is created
// from the cluster configuration resolved by controller-runtime. An error is returned if the configuration
// cannot be loaded or the clientset cannot be created.
func NewDoguRequeueHandler(client client.Client, recorder record.EventRecorder, namespace string) (*doguRequeueHandler, error) {
	restConfig, configErr := ctrl.GetConfig()
	if configErr != nil {
		return nil, fmt.Errorf("failed to load cluster configuration: %w", configErr)
	}

	uncachedClient, clientErr := kubernetes.NewForConfig(restConfig)
	if clientErr != nil {
		return nil, fmt.Errorf("cannot create kubernetes client: %w", clientErr)
	}

	handler := &doguRequeueHandler{
		client:         client,
		nonCacheClient: uncachedClient,
		namespace:      namespace,
		recorder:       recorder,
	}

	return handler, nil
}
// Handle takes an error and handles the requeue process for the current dogu operation.
//
// contextMessage is used as prefix of the log message that is written when the dogu is requeued. onRequeue may be
// nil; otherwise it is called with the dogu resource before the dogu is updated. The returned result carries the
// requeue time if the error requires a requeue and is empty otherwise.
func (d *doguRequeueHandler) Handle(ctx context.Context, contextMessage string, doguResource *k8sv1.Dogu, err error, onRequeue func(dogu *k8sv1.Dogu)) (ctrl.Result, error) {
return d.handleRequeue(ctx, contextMessage, doguResource, err, onRequeue)
}
// handleRequeue decides whether originalErr requires the dogu to be reconciled again and, if so, prepares the
// requeue.
//
// If originalErr is nil or not requeueable, an empty result and a nil error are returned. Otherwise the requeue
// time is determined, the optional onRequeue callback is invoked with the dogu resource, the requeue phase is
// written to the dogu status and the dogu is updated. Afterwards a requeue event is fired and the original error
// is logged together with contextMessage.
//
// The returned result contains the requeue time. A non-nil error is returned only if updating the dogu or firing
// the requeue event failed; in that case the result is empty.
func (d *doguRequeueHandler) handleRequeue(ctx context.Context, contextMessage string, doguResource *k8sv1.Dogu, originalErr error, onRequeue func(dogu *k8sv1.Dogu)) (ctrl.Result, error) {
	if !shouldRequeue(originalErr) {
		return ctrl.Result{}, nil
	}

	// The requeue time is determined before the callback runs so that the callback cannot influence it.
	requeueTime := getRequeueTime(doguResource, originalErr)
	if onRequeue != nil {
		onRequeue(doguResource)
	}

	doguResource.Status.RequeuePhase = getRequeuePhase(originalErr)
	if updateErr := doguResource.Update(ctx, d.client); updateErr != nil {
		return ctrl.Result{}, fmt.Errorf("failed to update dogu status: %w", updateErr)
	}

	result := ctrl.Result{RequeueAfter: requeueTime}
	if eventErr := d.fireRequeueEvent(ctx, doguResource, result); eventErr != nil {
		return ctrl.Result{}, eventErr
	}

	// Log the error that caused the requeue. The duration already prints its own unit, so no unit is appended.
	log.FromContext(ctx).Error(originalErr, fmt.Sprintf("%s: requeue in %s because of: %s", contextMessage, requeueTime, originalErr.Error()))

	return result, nil
}
// getRequeuePhase extracts the state from the first error in the chain that implements requeuableErrorWithState.
// An empty string is returned if the chain contains no such error.
func getRequeuePhase(err error) string {
	var stateful requeuableErrorWithState
	if !errors.As(err, &stateful) {
		return ""
	}

	return stateful.GetState()
}
// getRequeueTime determines how long to wait before the next reconciliation. The time provided by the first
// error in the chain that implements requeuableErrorWithTime wins; without such an error the next requeue time
// of the dogu status is used.
func getRequeueTime(dogu *k8sv1.Dogu, err error) time.Duration {
	var timed requeuableErrorWithTime
	if !errors.As(err, &timed) {
		return dogu.Status.NextRequeue()
	}

	return timed.GetRequeueTime()
}
// shouldRequeue reports whether the given error asks for a requeue. This is the case as soon as one of the errors
// returned by getAllErrorsFromChain contains a requeuableError whose Requeue method returns true. A nil error
// never leads to a requeue.
func shouldRequeue(err error) bool {
	if err == nil {
		return false
	}

	for _, candidate := range getAllErrorsFromChain(err) {
		var requeueable requeuableError
		if errors.As(candidate, &requeueable) && requeueable.Requeue() {
			return true
		}
	}

	return false
}
// getAllErrorsFromChain returns the errors collected in err if err itself is a *multierror.Error. Any other
// error is returned as the only element of a new slice.
func getAllErrorsFromChain(err error) []error {
	if collected, isMultiError := err.(*multierror.Error); isMultiError {
		return collected.Errors
	}

	return []error{err}
}
// fireRequeueEvent replaces the requeue events of the given dogu with a fresh one.
//
// All events in the handler's namespace whose reason is RequeueEventReason and whose involved object carries the
// dogu's name are deleted. Afterwards a new normal event with the requeue time of result is recorded. An error is
// returned if the old events cannot be listed or if one of them cannot be deleted; no new event is recorded then.
func (d *doguRequeueHandler) fireRequeueEvent(ctx context.Context, doguResource *k8sv1.Dogu, result ctrl.Result) error {
	listOptions := metav1.ListOptions{
		FieldSelector: fmt.Sprintf("reason=%s,involvedObject.name=%s", RequeueEventReason, doguResource.Name),
	}

	staleEvents, listErr := d.nonCacheClient.CoreV1().Events(d.namespace).List(ctx, listOptions)
	if listErr != nil {
		return fmt.Errorf("failed to get all requeue errors: %w", listErr)
	}

	for i := range staleEvents.Items {
		eventName := staleEvents.Items[i].Name
		if deleteErr := d.nonCacheClient.CoreV1().Events(d.namespace).Delete(ctx, eventName, metav1.DeleteOptions{}); deleteErr != nil {
			return fmt.Errorf("failed to delete old requeue event: %w", deleteErr)
		}
	}

	d.recorder.Eventf(doguResource, v1.EventTypeNormal, RequeueEventReason, "Trying again in %s.", result.RequeueAfter.String())

	return nil
}