-
Notifications
You must be signed in to change notification settings - Fork 0
/
event.go
169 lines (140 loc) · 5.38 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package watcher
import (
"context"
"fmt"
"strconv"
"github.com/rancher/rancher/pkg/controllers/user/alert/common"
"github.com/rancher/rancher/pkg/controllers/user/alert/manager"
"github.com/rancher/rancher/pkg/controllers/user/workload"
"github.com/rancher/types/apis/core/v1"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/rancher/types/config"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)
type EventWatcher struct {
eventLister v1.EventLister
clusterAlertRuleLister v3.ClusterAlertRuleLister
alertManager *manager.AlertManager
clusterName string
clusterLister v3.ClusterLister
workloadFetcher workloadFetcher
podLister v1.PodLister
}
func StartEventWatcher(ctx context.Context, cluster *config.UserContext, manager *manager.AlertManager) {
events := cluster.Core.Events("")
workloadFetcher := workloadFetcher{
workloadController: workload.NewWorkloadController(ctx, cluster.UserOnlyContext(), nil),
}
eventWatcher := &EventWatcher{
eventLister: events.Controller().Lister(),
clusterAlertRuleLister: cluster.Management.Management.ClusterAlertRules(cluster.ClusterName).Controller().Lister(),
alertManager: manager,
clusterName: cluster.ClusterName,
clusterLister: cluster.Management.Management.Clusters("").Controller().Lister(),
workloadFetcher: workloadFetcher,
podLister: cluster.Core.Pods(metav1.NamespaceAll).Controller().Lister(),
}
events.AddHandler(ctx, "cluster-event-alert-watcher", eventWatcher.Sync)
}
func (l *EventWatcher) Sync(key string, obj *corev1.Event) (runtime.Object, error) {
if l.alertManager.IsDeploy == false {
return nil, nil
}
if obj == nil {
return nil, nil
}
clusterAlerts, err := l.clusterAlertRuleLister.List("", labels.NewSelector())
if err != nil {
return nil, err
}
for _, alert := range clusterAlerts {
if alert.Status.AlertState == "inactive" || alert.Status.AlertState == "muted" || alert.Spec.EventRule == nil {
continue
}
if alert.Spec.EventRule.EventType == obj.Type && alert.Spec.EventRule.ResourceKind == obj.InvolvedObject.Kind {
ruleID := common.GetRuleID(alert.Spec.GroupName, alert.Name)
clusterDisplayName := common.GetClusterDisplayName(l.clusterName, l.clusterLister)
data := map[string]string{}
data["rule_id"] = ruleID
data["group_id"] = alert.Spec.GroupName
data["alert_name"] = alert.Spec.DisplayName
data["alert_type"] = "event"
data["event_type"] = alert.Spec.EventRule.EventType
data["resource_kind"] = alert.Spec.EventRule.ResourceKind
data["severity"] = alert.Spec.Severity
data["cluster_name"] = clusterDisplayName
data["target_name"] = obj.InvolvedObject.Name
data["target_namespace"] = obj.InvolvedObject.Namespace
data["event_count"] = strconv.Itoa(int(obj.Count))
data["event_message"] = obj.Message
data["event_firstseen"] = fmt.Sprintf("%s", obj.FirstTimestamp)
data["event_lastseen"] = fmt.Sprintf("%s", obj.LastTimestamp)
if alert.Spec.EventRule.ResourceKind == "Pod" || alert.Spec.EventRule.ResourceKind == "Deployment" || alert.Spec.EventRule.ResourceKind == "StatefulSet" || alert.Spec.EventRule.ResourceKind == "DaemonSet" {
workloadName, err := l.getWorkloadInfo(obj.InvolvedObject.Namespace, obj.InvolvedObject.Name, alert.Spec.EventRule.ResourceKind)
if err != nil {
errors.Wrap(err, "failed to fetch workload info")
}
if workloadName != "" {
data["workload_name"] = workloadName
}
}
if err := l.alertManager.SendAlert(data); err != nil {
logrus.Errorf("Failed to send alert: %v", err)
}
}
}
return nil, nil
}
func (l *EventWatcher) getWorkloadInfo(namespace, name, kind string) (string, error) {
if kind == "Pod" {
pod, err := l.podLister.Get(namespace, name)
if err != nil {
return "", errors.Wrapf(err, "failed to get pod %s:%s", namespace, name)
}
if len(pod.OwnerReferences) == 0 {
return pod.Name, nil
}
ownerRef := pod.OwnerReferences[0]
name = ownerRef.Name
kind = ownerRef.Kind
}
workloadName, err := l.workloadFetcher.getWorkloadName(namespace, name, kind)
if err != nil {
return "", errors.Wrap(err, "Failed to get workload info for alert")
}
return workloadName, nil
}
type workloadFetcher struct {
workloadController workload.CommonController
}
func (w *workloadFetcher) getWorkloadName(namespace, name, kind string) (string, error) {
if kind == "Deployment" || kind == "StatefulSet" || kind == "DaemonSet" || kind == "CronJob" {
return name, nil
}
workloadID := fmt.Sprintf("%s:%s:%s", kind, namespace, name)
workload, err := w.workloadController.GetByWorkloadID(workloadID)
if err != nil {
return "", errors.Wrapf(err, "get workload %s failed", workloadID)
}
allRef := workload.OwnerReferences
if len(allRef) == 0 {
return name, nil
}
ref := allRef[0]
refName := ref.Name
refKind := ref.Kind
if kind == "Job" && refKind != "CronJob" {
return name, nil
}
refWorkloadID := fmt.Sprintf("%s:%s:%s", refKind, namespace, refName)
refWorkload, err := w.workloadController.GetByWorkloadID(refWorkloadID)
if err != nil {
return "", errors.Wrapf(err, "get workload %s failed", workloadID)
}
return w.getWorkloadName(refWorkload.Namespace, refWorkload.Name, refWorkload.Kind)
}