-
Notifications
You must be signed in to change notification settings - Fork 114
/
event.go
139 lines (122 loc) · 3.4 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package event
import (
"context"
"errors"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
libClient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/akuity/kargo/internal/logging"
)
// recorder is the internal backend for the EventRecorder returned by
// NewRecorder. It persists every event it is handed to the cluster via
// its sink, retrying failed writes with backoff.
type recorder struct {
// backoff governs how failed event writes are retried.
backoff wait.Backoff
// sink is the destination events are written to (the Kubernetes API).
sink record.EventSink
logger *log.Entry
// newEventHandlerFn produces the closure that persists a single event;
// indirected as a field presumably so tests can substitute it — TODO confirm.
newEventHandlerFn func(event *corev1.Event) func() error
}
// NewRecorder returns a new record.EventRecorder that records all events
// without aggregation, even if the given event is correlated.
//
// NOTE: This recorder must be used with caution as it creates a new Event
// on every event without throttling / spam filtering features - which are
// included in the event recorder from k8s.io/client-go. This could lead
// to performance issues with the Kubernetes API server.
func NewRecorder(
ctx context.Context,
scheme *runtime.Scheme,
client libClient.Client,
name string,
) record.EventRecorder {
rec := newRecorder(ctx, client, logging.LoggerFromContext(ctx))
broadcaster := record.NewBroadcaster()
broadcaster.StartEventWatcher(rec.handleEvent)
source := corev1.EventSource{Component: name}
return broadcaster.NewRecorder(scheme, source)
}
// newRecorder constructs the internal recorder that writes events to the
// cluster through a sink bound to ctx and client, retrying failed writes
// with a default backoff.
func newRecorder(
ctx context.Context,
client libClient.Client,
logger *log.Entry,
) *recorder {
rec := recorder{
// TODO: Make it configurable
backoff: retry.DefaultRetry,
sink:    newSink(ctx, client),
logger:  logger,
}
rec.newEventHandlerFn = rec.createEvent
return &rec
}
// handleEvent persists the given event via the handler produced by
// newEventHandlerFn, retrying with the configured backoff while the
// retry decider deems the error transient. Errors that survive all
// retries are logged and dropped.
func (r *recorder) handleEvent(event *corev1.Event) {
if err := retry.OnError(
r.backoff,
r.newRetryDecider(event),
r.newEventHandlerFn(event),
); err != nil {
// logrus takes structured data via fields, not trailing key/value
// arguments: the previous Error("...", "event", event) call merely
// fmt.Sprint-concatenated the extra args into the message text.
r.logger.
WithError(err).
WithField("event", event).
Error("Unable to handle event")
}
}
// createEvent returns a closure that writes the given event to the sink
// as a brand-new Event object.
func (r *recorder) createEvent(event *corev1.Event) func() error {
return func() error {
// Always create event instead of patching correlated events
if _, err := r.sink.Create(event); err != nil {
return err
}
return nil
}
}
// newRetryDecider returns a function that decides whether
// to re-record event or not on given error.
//
// Rejections that can never succeed — the event already exists, or its
// namespace is terminating — are not retried; everything else is.
func (r *recorder) newRetryDecider(event *corev1.Event) func(error) bool {
return func(err error) bool {
l := r.logger.
WithError(err).
WithField("event", event)
var statusErr *apierrors.StatusError
isStatusErr := errors.As(err, &statusErr)
terminal := isStatusErr &&
(apierrors.IsAlreadyExists(err) ||
apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause))
if terminal {
l.Info("Server rejected event (will not retry!)")
return false
}
// Retry on any other error, status or otherwise.
l.Error("Unable to write event (may retry after backoff)")
return true
}
}
// Compile-time assertion that *sink satisfies record.EventSink.
var (
_ record.EventSink = &sink{}
)
// sink is a record.EventSink that writes Events directly to the
// Kubernetes API using a controller-runtime client.
type sink struct {
// ctx is stored on the struct — normally discouraged in Go — because
// the record.EventSink interface methods take no context parameter.
ctx context.Context
client libClient.Client
}
// newSink returns a sink that writes events with the given client,
// using ctx for every API call it makes.
func newSink(
ctx context.Context,
client libClient.Client,
) *sink {
s := sink{ctx: ctx, client: client}
return &s
}
// Create writes the event to the cluster as a new object, returning the
// event along with any API error.
func (e *sink) Create(event *corev1.Event) (*corev1.Event, error) {
return event, e.client.Create(e.ctx, event)
}
// Update replaces the stored event with the given one, returning the
// event along with any API error.
func (e *sink) Update(event *corev1.Event) (*corev1.Event, error) {
return event, e.client.Update(e.ctx, event)
}
// Patch applies data to the stored event as a strategic merge patch,
// returning the event along with any API error.
func (e *sink) Patch(event *corev1.Event, data []byte) (*corev1.Event, error) {
patch := libClient.RawPatch(types.StrategicMergePatchType, data)
return event, e.client.Patch(e.ctx, event, patch)
}