-
Notifications
You must be signed in to change notification settings - Fork 272
/
webhook.go
209 lines (178 loc) · 5.42 KB
/
webhook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package webhook
import (
"bytes"
"fmt"
"net/http"
"net/url"
"strings"
"text/template"
"time"
"github.com/AliyunContainerService/kube-eventer/common/filters"
"github.com/AliyunContainerService/kube-eventer/common/kubernetes"
"github.com/AliyunContainerService/kube-eventer/core"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
)
// Identifiers shared by the webhook sink.
const (
	// SinkName is the value returned by WebHookSink.Name.
	SinkName = "webHook"

	// Warning is the event level NewWebHookSink filters on when the sink URI
	// carries no "level" option.
	Warning = "Warning"

	// Normal is not referenced in this file; presumably the other event
	// level accepted by the "level" option (verify against callers).
	Normal = "Normal"
)
var (
	// defaultBodyTemplate is the text/template source used to render the
	// request body when no custom body template is configured. It is executed
	// against the event being sent and is meant to produce a JSON object, so
	// every member except the last must end with a comma.
	defaultBodyTemplate = `
{
"EventType": "{{ .Type }}",
"EventKind": "{{ .InvolvedObject.Kind }}",
"EventReason": "{{ .Reason }}",
"EventTime": "{{ .EventTime }}",
"EventMessage": "{{ .Message }}"
}`
)
// WebHookSink holds the configuration needed to post events to a generic
// HTTP webhook. Instances are built by NewWebHookSink.
type WebHookSink struct {
	// Request target: full endpoint URL, HTTP method and extra headers
	// applied to every outgoing request.
	endpoint  string
	method    string
	headerMap map[string]string

	// bodyTemplate is the text/template source rendered for each event.
	// bodyConfigMapName and bodyConfigMapNamespace record the ConfigMap a
	// custom template was requested from, if any.
	bodyTemplate           string
	bodyConfigMapName      string
	bodyConfigMapNamespace string

	// filters maps a filter name to the filter every event must pass
	// before it is sent.
	filters map[string]filters.Filter
}
// Name reports the sink's identifier, which is always SinkName.
func (*WebHookSink) Name() string { return SinkName }
// ExportEvents pushes each event of batch through Send in order. A failed
// send is logged as a warning and does not stop the remaining events.
func (ws *WebHookSink) ExportEvents(batch *core.EventBatch) {
	// Fixed pause applied after every event, whether or not it was sent.
	const pause = 50 * time.Millisecond

	for _, ev := range batch.Events {
		if sendErr := ws.Send(ev); sendErr != nil {
			klog.Warningf("Failed to send event to WebHook sink,because of %v", sendErr)
		}
		time.Sleep(pause)
	}
}
// Send renders event with the sink's body template and delivers it to the
// configured endpoint using the configured HTTP method and headers.
//
// An event rejected by any of the sink's filters is skipped and nil is
// returned. A non-nil error is returned when the body cannot be rendered,
// the request cannot be created or sent, or the endpoint answers with a
// status code other than 200 OK.
func (ws *WebHookSink) Send(event *v1.Event) (err error) {
	for _, f := range ws.filters {
		if !f.Filter(event) {
			return nil
		}
	}

	body, err := ws.RenderBodyTemplate(event)
	if err != nil {
		klog.Errorf("Failed to RenderBodyTemplate,because of %v", err)
		return err
	}

	req, err := http.NewRequest(ws.method, ws.endpoint, strings.NewReader(body))
	if err != nil {
		// req is nil on failure, so this check has to come before any use
		// of req.Header.
		klog.Errorf("Failed to create request,because of %v", err)
		return err
	}

	// Ranging over a nil or empty map is a no-op, so no guard is needed.
	for k, v := range ws.headerMap {
		req.Header.Set(k, v)
	}

	// NOTE(review): http.DefaultClient has no timeout, so an unresponsive
	// endpoint blocks this call indefinitely.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		klog.Errorf("Failed to send event to sink,because of %v", err)
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		klog.Errorf("failed to send msg to sink, because the response code is %d", resp.StatusCode)
		// Report the rejection to the caller instead of returning nil.
		return fmt.Errorf("webhook endpoint responded with status code %d", resp.StatusCode)
	}
	return nil
}
// RenderBodyTemplate parses the sink's bodyTemplate as a text/template and
// executes it against event. It returns the rendered text, or an empty
// string and the error when parsing or execution fails; both failures are
// also logged.
func (ws *WebHookSink) RenderBodyTemplate(event *v1.Event) (body string, err error) {
	parsed, parseErr := template.New("body").Parse(ws.bodyTemplate)
	if parseErr != nil {
		klog.Errorf("Failed to parse template,because of %v", parseErr)
		return "", parseErr
	}

	rendered := new(bytes.Buffer)
	execErr := parsed.Execute(rendered, event)
	if execErr != nil {
		klog.Errorf("Failed to renderTemplate,because of %v", execErr)
		return "", execErr
	}

	return rendered.String(), nil
}
// Stop does nothing; shutdown handling is not implemented for this sink.
func (ws *WebHookSink) Stop() {}
// NewWebHookSink builds a WebHookSink from the sink URI.
//
// The URI must have a non-empty host; its full string form (query included)
// becomes the endpoint. Recognised query options:
//
//	method                           HTTP method, defaults to GET
//	header                           repeatable "key=value" request header
//	level                            event type to keep, defaults to Warning
//	namespaces                       namespace filter (no regexp support)
//	kinds                            involved-object kind filter
//	reason                           reason filter (regexp supported)
//	custom_body_configmap            ConfigMap whose "content" key holds the
//	                                 body template
//	custom_body_configmap_namespace  namespace of that ConfigMap, defaults to
//	                                 "default"
//
// An error is returned only for an empty host. Any failure while loading the
// custom body template is logged and the default template is kept.
func NewWebHookSink(uri *url.URL) (*WebHookSink, error) {
	if len(uri.Host) == 0 {
		klog.Errorf("uri host's length is 0 and pls check your uri: %v", uri)
		return nil, fmt.Errorf("uri host is not valid.url: %v", uri)
	}

	s := &WebHookSink{
		endpoint: uri.String(),
		// default http method
		method:       http.MethodGet,
		bodyTemplate: defaultBodyTemplate,
		filters:      make(map[string]filters.Filter),
	}

	opts := uri.Query()

	if len(opts["method"]) >= 1 {
		s.method = opts["method"][0]
	}

	// set header of webHook
	s.headerMap = parseHeaders(opts["header"])

	level := Warning
	if len(opts["level"]) >= 1 {
		level = opts["level"][0]
	}
	s.filters["LevelFilter"] = filters.NewGenericFilter("Type", []string{level}, false)

	// namespace filter doesn't support regexp
	namespaces := filters.GetValues(opts["namespaces"])
	s.filters["NamespacesFilter"] = filters.NewGenericFilter("Namespace", namespaces, false)

	// such as node,pod,component and so on
	// kinds:https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#lists-and-simple-kinds
	kinds := filters.GetValues(opts["kinds"])
	s.filters["KindsFilter"] = filters.NewGenericFilter("Kind", kinds, false)

	// reason filter supports regexp
	reasons := opts["reason"]
	s.filters["ReasonsFilter"] = filters.NewGenericFilter("Reason", reasons, true)

	if len(opts["custom_body_configmap"]) == 0 {
		return s, nil
	}

	// From here on s.bodyTemplate already holds the default template, so each
	// failure path only needs to log and return.
	s.bodyConfigMapName = opts["custom_body_configmap"][0]
	s.bodyConfigMapNamespace = "default"
	if ns := opts["custom_body_configmap_namespace"]; len(ns) >= 1 {
		s.bodyConfigMapNamespace = ns[0]
	}

	client, err := kubernetes.GetKubernetesClient(nil)
	if err != nil {
		klog.Warningf("Failed to get kubernetes client and use default bodyTemplate instead,because of %v", err)
		return s, nil
	}

	configmap, err := client.CoreV1().ConfigMaps(s.bodyConfigMapNamespace).Get(s.bodyConfigMapName, metav1.GetOptions{})
	if err != nil {
		klog.Warningf("Failed to get configMap %s in namespace %s and use default bodyTemplate instead,because of %v", s.bodyConfigMapName, s.bodyConfigMapNamespace, err)
		return s, nil
	}

	content, ok := configmap.Data["content"]
	if !ok {
		// err is nil on this path, so name the actual cause instead of
		// formatting it.
		klog.Warningf("Failed to get configMap content and use default bodyTemplate instead,because key \"content\" is missing in configMap %s in namespace %s", s.bodyConfigMapName, s.bodyConfigMapNamespace)
		return s, nil
	}
	s.bodyTemplate = content

	return s, nil
}
// parseHeaders converts "key=value" strings into a header map.
//
// Each entry is split at its first "=" only, so values that themselves
// contain "=" (for example base64-encoded credentials) are kept intact.
// Entries without "=" or with an empty key are ignored. When a key occurs
// more than once the last value wins. The returned map is never nil.
func parseHeaders(headers []string) map[string]string {
	headerMap := make(map[string]string, len(headers))
	for _, h := range headers {
		kv := strings.SplitN(h, "=", 2)
		// An empty header name is not a valid field name; net/http is
		// expected to reject a request carrying one, so skip it here.
		if len(kv) != 2 || kv[0] == "" {
			continue
		}
		headerMap[kv[0]] = kv[1]
	}
	return headerMap
}