/
handler.go
112 lines (91 loc) · 3.34 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package errorshandler
import (
"encoding/json"
"fmt"
"time"
"github.com/codex-team/hawk.collector/pkg/accounts"
"github.com/codex-team/hawk.collector/pkg/broker"
"github.com/codex-team/hawk.collector/pkg/redis"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
const DefaultQueueName = "errors/default"
// Handler of error messages
type Handler struct {
Broker *broker.Broker
JwtSecret string
// Maximum POST body size in bytes for error messages
MaxErrorCatcherMessageSize int
ErrorsBlockedByLimit prometheus.Counter
ErrorsProcessed prometheus.Counter
RedisClient *redis.RedisClient
AccountsMongoDBClient *accounts.AccountsMongoDBClient
NonDefaultQueues map[string]bool
}
func (handler *Handler) process(body []byte) ResponseMessage {
// Check if the body is a valid JSON with the Message structure
message := CatcherMessage{}
err := json.Unmarshal(body, &message)
if err != nil {
return ResponseMessage{400, true, "Invalid JSON format"}
}
if len(message.Payload) == 0 {
return ResponseMessage{400, true, "Payload is empty"}
}
if message.Token == "" {
return ResponseMessage{400, true, "Token is empty"}
}
if message.CatcherType == "" {
return ResponseMessage{400, true, "CatcherType is empty"}
}
projectId, ok := handler.AccountsMongoDBClient.ValidTokens[message.Token]
if !ok {
log.Debugf("Token %s is not in the accounts cache", message.Token)
return ResponseMessage{400, true, fmt.Sprintf("Integration token invalid: %s", message.Token)}
}
log.Debugf("Found project with ID %s for integration token %s", projectId, message.Token)
if handler.RedisClient.IsBlocked(projectId) {
handler.ErrorsBlockedByLimit.Inc()
return ResponseMessage{402, true, "Project has exceeded the events limit"}
}
// Validate if message is a valid JSON
stringMessage := string(message.Payload)
if !gjson.Valid(stringMessage) {
return ResponseMessage{400, true, "Invalid payload JSON format"}
}
modifiedMessage, err := sjson.Set(stringMessage, "timestamp", time.Now().Unix())
if err != nil {
return ResponseMessage{400, true, fmt.Sprintf("%s", err)}
}
// convert message to JSON format
messageToSend := BrokerMessage{ProjectId: projectId, Payload: []byte(modifiedMessage), CatcherType: message.CatcherType}
rawMessage, err := json.Marshal(messageToSend)
if err != nil {
log.Errorf("Message marshalling error: %v", err)
return ResponseMessage{400, true, "Cannot encode message to JSON"}
}
// send serialized message to a broker
brokerMessage := broker.Message{Payload: rawMessage, Route: handler.determineQueue(message.CatcherType)}
log.Debugf("Send to queue: %s", brokerMessage)
handler.Broker.Chan <- brokerMessage
// increment processed errors counter
handler.ErrorsProcessed.Inc()
return ResponseMessage{200, false, "OK"}
}
// determineQueue - determine RabbitMQ route from catcherType
func (handler *Handler) determineQueue(catcherType string) string {
if _, ok := handler.NonDefaultQueues[catcherType]; ok {
return catcherType
}
return DefaultQueueName
}
// GetQueueCache - construct searching set from array of queue names
func GetQueueCache(nonDefaultQueues []string) map[string]bool {
cache := make(map[string]bool)
for _, queue := range nonDefaultQueues {
cache[fmt.Sprintf("errors/%s", queue)] = true
}
return cache
}