-
Notifications
You must be signed in to change notification settings - Fork 25
/
notifiersink.go
111 lines (87 loc) · 3.39 KB
/
notifiersink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.
package eventbroadcaster
import (
"context"
"fmt"
"github.com/DataDog/chaos-controller/api/v1beta1"
"github.com/DataDog/chaos-controller/eventnotifier"
notifTypes "github.com/DataDog/chaos-controller/eventnotifier/types"
"go.uber.org/zap"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// NotifierSink is an event sink that forwards events concerning Disruption objects
// to a single notifier. One instance is registered on the broadcaster per notifier.
type NotifierSink struct {
// client fetches the Disruption referenced by an event's involved object.
client client.Client
// notifier receives the notification built from each handled event.
notifier eventnotifier.Notifier
// logger reports errors raised while notifying.
logger *zap.SugaredLogger
}
// RegisterNotifierSinks builds one NotifierSink per configured notifier and registers
// them on the given broadcaster, then registers the Kubernetes core/v1 event sink on
// the same broadcaster.
//
// Notifier registration is best-effort: every notifier returned by
// eventnotifier.GetNotifiers is registered even when that call also returns an error,
// and that error is handed back to the caller once registration is done.
//
// If the core/v1 client cannot be built from the manager's config, the Kubernetes
// event sink is not registered and the returned error wraps that failure (mentioning
// any notifier error as well).
func RegisterNotifierSinks(mgr ctrl.Manager, broadcaster record.EventBroadcaster, notifiersConfig eventnotifier.NotifiersConfig, logger *zap.SugaredLogger) (err error) {
	// Named k8sClient so the imported controller-runtime "client" package is not shadowed.
	k8sClient := mgr.GetClient()

	// err is the named result: it is kept and returned after registration completes.
	notifiers, err := eventnotifier.GetNotifiers(notifiersConfig, logger)

	for _, notifier := range notifiers {
		logger.Infof("notifier %s enabled", notifier.GetNotifierName())
		broadcaster.StartRecordingToSink(&NotifierSink{client: k8sClient, notifier: notifier, logger: logger})
	}

	corev1Client, clientErr := corev1client.NewForConfig(mgr.GetConfig())
	if clientErr != nil {
		// Without a client there is nothing to back the Kubernetes sink; report the
		// failure instead of registering a sink built on a nil client.
		if err != nil {
			return fmt.Errorf("creating core/v1 client for the kubernetes event sink: %w (notifier errors: %v)", clientErr, err)
		}

		return fmt.Errorf("creating core/v1 client for the kubernetes event sink: %w", clientErr)
	}

	broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: corev1Client.Events("")})

	return err
}
// Create looks up the Disruption the event refers to and forwards the event to the
// notifier. Failures are never propagated: a failed lookup (including an event whose
// involved object is not a Disruption) is dropped silently, and a notification error
// is only logged. The event is always returned as received, with a nil error.
func (s *NotifierSink) Create(event *corev1.Event) (*corev1.Event, error) {
	disruption, lookupErr := s.getDisruption(event)
	if lookupErr == nil {
		if notifyErr := s.parseEventToNotifier(event, disruption); notifyErr != nil {
			s.logger.Error(notifyErr)
		}
	}

	return event, nil
}
// Update is a no-op: it sends no notification and returns the given event as
// received, with a nil error.
func (s *NotifierSink) Update(event *corev1.Event) (*corev1.Event, error) {
return event, nil
}
// Patch is a no-op: the patch data is ignored, no notification is sent, and
// oldEvent is returned as received, with a nil error.
func (s *NotifierSink) Patch(oldEvent *corev1.Event, data []byte) (*corev1.Event, error) {
return oldEvent, nil
}
// getDisruption returns the Disruption referenced by the event's involved object,
// read through the sink's controller-runtime client.
//
// It returns an error, along with a zero-value Disruption, when the involved object
// is not of kind Disruption or when the client lookup fails.
func (s *NotifierSink) getDisruption(event *corev1.Event) (v1beta1.Disruption, error) {
	involved := &event.InvolvedObject

	// Only events attached to a Disruption are of interest to the notifiers.
	if involved.Kind != v1beta1.DisruptionKind {
		return v1beta1.Disruption{}, fmt.Errorf("eventnotifier: not a disruption")
	}

	key := types.NamespacedName{
		Namespace: involved.Namespace,
		Name:      involved.Name,
	}

	var found v1beta1.Disruption

	err := s.client.Get(context.Background(), key, &found)
	if err != nil {
		return v1beta1.Disruption{}, err
	}

	return found, nil
}
// parseEventToNotifier contains the event parsing and notification logic
func (s *NotifierSink) parseEventToNotifier(event *corev1.Event, dis v1beta1.Disruption) (err error) {
switch event.Type {
case corev1.EventTypeWarning:
err = s.notifier.Notify(dis, *event, notifTypes.NotificationWarning)
case corev1.EventTypeNormal:
if v1beta1.IsNotifiableEvent(*event) {
switch {
case v1beta1.IsRecoveryEvent(*event):
err = s.notifier.Notify(dis, *event, notifTypes.NotificationSuccess)
case v1beta1.IsCompletionEvent(*event):
err = s.notifier.Notify(dis, *event, notifTypes.NotificationCompletion)
default:
err = s.notifier.Notify(dis, *event, notifTypes.NotificationInfo)
}
} else {
err = nil
}
default:
err = fmt.Errorf("notifier: not a notifiable event")
}
return
}