-
Notifications
You must be signed in to change notification settings - Fork 713
/
http-events-handler.go
82 lines (68 loc) · 2.9 KB
/
http-events-handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/*
Copyright 2018 BlackRock, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sensors
import (
"fmt"
"io/ioutil"
"net/http"
"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/sensor"
)
// HttpEventProtocol starts the HTTP server on which this sensor receives
// events from gateways.
//
// The server listens on the port configured in
// sec.sensor.Spec.EventProtocol.Http.Port and routes every request path to
// sec.httpEventHandler. The call blocks inside ListenAndServe; once the server
// stops, the error is logged and a K8s escalation event is requested through
// common.GenerateK8sEvent.
func (sec *sensorExecutionCtx) HttpEventProtocol() {
	port := sec.sensor.Spec.EventProtocol.Http.Port

	// create a http server. this server listens for events from gateway.
	sec.server = &http.Server{Addr: fmt.Sprintf(":%s", port)}

	// add a handler to handle incoming events
	// NOTE(review): this registers on the process-wide http.DefaultServeMux
	// (the server has no Handler of its own), so calling HttpEventProtocol a
	// second time in the same process would panic on the duplicate "/"
	// pattern. Consider a dedicated http.ServeMux if nothing else relies on
	// the default mux.
	http.HandleFunc("/", sec.httpEventHandler)

	sec.log.WithField(common.LabelPort, port).Info("sensor started listening")

	// ListenAndServe blocks until the server stops and always returns a
	// non-nil error.
	// NOTE(review): a graceful Shutdown/Close yields http.ErrServerClosed,
	// which is escalated here like any other failure — confirm that is intended.
	if err := sec.server.ListenAndServe(); err != nil {
		sec.log.WithError(err).Error("sensor server stopped")

		// escalate error
		labels := map[string]string{
			common.LabelEventType:  string(common.EscalationEventType),
			common.LabelSensorName: sec.sensor.Name,
			common.LabelOperation:  "server_shutdown",
		}
		// The reason is a constant string, so no fmt.Sprintf is needed. A
		// distinct error name avoids shadowing the ListenAndServe error above.
		if eventErr := common.GenerateK8sEvent(sec.kubeClient, "sensor server stopped", common.EscalationEventType,
			"server shutdown", sec.sensor.Name, sec.sensor.Namespace, sec.controllerInstanceID, sensor.Kind, labels); eventErr != nil {
			sec.log.WithError(eventErr).Error("failed to create K8s event to log server shutdown error")
		}
	}
}
// httpEventHandler handles events received from gateways sent over http.
//
// It reads the request body, parses it into an event with sec.parseEvent and
// passes the event to sec.sendEventToInternalQueue. A success response is
// written when that call reports true; otherwise the event is treated as
// coming from an unknown source and an error response is written. If reading
// or parsing fails, an error response is written and the handler returns
// without processing the request any further.
func (sec *sensorExecutionCtx) httpEventHandler(w http.ResponseWriter, r *http.Request) {
	var response string

	sec.log.Info("received an event from gateway")

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		response = "failed to read request body"
		sec.log.WithError(err).Error(response)
		common.SendErrorResponse(w, response)
		// Stop here: without a body there is nothing to parse, and the
		// response has already been written.
		return
	}

	event, err := sec.parseEvent(body)
	if err != nil {
		response = "failed to parse request into event"
		sec.log.WithError(err).Error(response)
		common.SendErrorResponse(w, response)
		// Stop here: the event is not usable after a parse failure, and
		// continuing would dereference it and write a second response.
		return
	}

	// validate whether the event is from gateway that this sensor is watching and send event over internal queue if valid
	if sec.sendEventToInternalQueue(event, w) {
		response = "message successfully sent over internal queue"
		sec.log.WithField(common.LabelEventSource, event.Context.Source.Host).Info(response)
		common.SendSuccessResponse(w, response)
		return
	}

	response = "event is from unknown source"
	sec.log.WithField(common.LabelEventSource, event.Context.Source.Host).Warn(response)
	common.SendErrorResponse(w, response)
}