-
Notifications
You must be signed in to change notification settings - Fork 485
/
handler.go
134 lines (111 loc) · 3.6 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package receiver
import (
	"crypto/subtle"
	"encoding/json"
	"errors"
	"net/http"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/agent/component/faro/receiver/internal/payload"
	"github.com/grafana/agent/pkg/flow/logging/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/rs/cors"
	"golang.org/x/time/rate"
)
// apiKeyHeader is the name of the request header that carries the client's
// API key. It is compared against the configured key when one is set, and it
// is listed as an allowed header when CORS is enabled.
const apiKeyHeader = "x-api-key"
// handler is the HTTP entry point of the receiver: it rate-limits,
// authenticates, and decodes incoming payloads, then forwards each payload to
// every configured exporter.
type handler struct {
	log       log.Logger
	exporters []exporter

	// errorsTotal counts failed exports, labelled by exporter name.
	errorsTotal *prometheus.CounterVec

	// rateLimiter is consulted once per request and reconfigured by Update.
	rateLimiter *rate.Limiter

	// argsMut is write-locked by Update while it replaces args and cors, and
	// read-locked by ServeHTTP for the duration of each request.
	argsMut sync.RWMutex
	args    ServerArguments
	cors    *cors.Cors // nil when CORS handling is disabled.
}
// Compile-time check that *handler implements http.Handler.
var _ http.Handler = (*handler)(nil)
// newHandler builds a handler that forwards payloads to the given exporters.
//
// It registers the "faro_receiver_exporter_errors_total" counter with reg and
// panics (via MustRegister) if registration fails. The returned handler starts
// with an unlimited rate limiter and no CORS handling; call Update to apply
// server arguments.
func newHandler(l log.Logger, reg prometheus.Registerer, exporters []exporter) *handler {
	counterOpts := prometheus.CounterOpts{
		Name: "faro_receiver_exporter_errors_total",
		Help: "Total number of errors produced by a receiver exporter",
	}
	exporterErrors := prometheus.NewCounterVec(counterOpts, []string{"exporter"})
	reg.MustRegister(exporterErrors)

	h := &handler{
		log:         l,
		exporters:   exporters,
		errorsTotal: exporterErrors,
	}
	// rate.Inf lets every request through until Update configures a limit.
	h.rateLimiter = rate.NewLimiter(rate.Inf, 0)
	return h
}
// Update replaces the handler's arguments and reconfigures the rate limiter
// and CORS handling to match them.
//
// It takes the write lock on argsMut, so it waits for in-flight requests
// (which hold the read lock in ServeHTTP) before applying the new settings.
func (h *handler) Update(args ServerArguments) {
	h.argsMut.Lock()
	defer h.argsMut.Unlock()

	h.args = args

	if args.RateLimiting.Enabled {
		// Applying the new limit and burst as of time.Now() would not top up
		// the token bucket, so requests arriving right after an update could
		// be rejected. To let requests pass through immediately, the change
		// is back-dated by the time needed to accumulate a full burst at the
		// new rate: BurstSize tokens at Rate tokens per second take
		// BurstSize / Rate seconds.
		var backdate time.Duration
		if r, b := args.RateLimiting.Rate, args.RateLimiting.BurstSize; r > 0 && b > 0 {
			// Clamp so that the float-to-Duration conversion cannot overflow
			// for very small rates or very large bursts.
			const maxBackdate = time.Duration(1<<63 - 1)
			if ns := b / r * float64(time.Second); ns < float64(maxBackdate) {
				backdate = time.Duration(ns)
			} else {
				backdate = maxBackdate
			}
		}

		t := time.Now().Add(-backdate)
		h.rateLimiter.SetLimitAt(t, rate.Limit(args.RateLimiting.Rate))
		h.rateLimiter.SetBurstAt(t, int(args.RateLimiting.BurstSize))
	} else {
		// Set to infinite rate limit.
		h.rateLimiter.SetLimit(rate.Inf)
		h.rateLimiter.SetBurst(0) // 0 burst is ignored when using rate.Inf.
	}

	if len(args.CORSAllowedOrigins) > 0 {
		h.cors = cors.New(cors.Options{
			AllowedOrigins: args.CORSAllowedOrigins,
			AllowedHeaders: []string{apiKeyHeader, "content-type", "x-faro-session-id"},
		})
	} else {
		h.cors = nil // Disable cors.
	}
}
// ServeHTTP implements http.Handler.
//
// The read lock on argsMut is held for the whole request so that Update cannot
// swap the arguments or the CORS configuration while a request is being
// processed. When CORS is configured the request goes through the CORS
// middleware before reaching handleRequest; otherwise handleRequest is called
// directly.
func (h *handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	h.argsMut.RLock()
	defer h.argsMut.RUnlock()

	if h.cors == nil {
		h.handleRequest(rw, req)
		return
	}
	h.cors.ServeHTTP(rw, req, h.handleRequest)
}
// handleRequest rate-limits, authenticates, and decodes a single request, then
// sends the decoded payload to every exporter and waits for them to finish.
//
// It reads h.args without locking; the caller must hold argsMut (ServeHTTP
// holds the read lock while calling it).
//
// Responses:
//   - 429 when the rate limiter rejects the request.
//   - 401 when an API key is configured and the x-api-key header does not
//     match it.
//   - 413 when the body is larger than MaxAllowedPayloadSize.
//   - 400 when the body cannot be decoded as a JSON payload.
//   - 202 with body "ok" otherwise. Exporter failures are logged and counted
//     but do not change the response.
func (h *handler) handleRequest(rw http.ResponseWriter, req *http.Request) {
	if !h.rateLimiter.Allow() {
		http.Error(rw, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
		return
	}

	// If an API key is configured, ensure the request has a matching key.
	// The comparison is constant-time so it does not leak key contents.
	if len(h.args.APIKey) > 0 {
		apiHeader := req.Header.Get(apiKeyHeader)

		if subtle.ConstantTimeCompare([]byte(apiHeader), []byte(h.args.APIKey)) != 1 {
			http.Error(rw, "API key not provided or incorrect", http.StatusUnauthorized)
			return
		}
	}

	// Validate the payload size.
	if h.args.MaxAllowedPayloadSize > 0 {
		limit := int64(h.args.MaxAllowedPayloadSize)

		// Fast path: reject based on the declared length without reading.
		if req.ContentLength > limit {
			http.Error(rw, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
			return
		}

		// ContentLength is -1 when the length is unknown (e.g. chunked
		// transfer encoding), so the check above is not sufficient on its
		// own. Cap the number of bytes that can actually be read.
		req.Body = http.MaxBytesReader(rw, req.Body, limit)
	}

	var p payload.Payload
	if err := json.NewDecoder(req.Body).Decode(&p); err != nil {
		// Distinguish "body too large" from malformed JSON.
		var tooLarge *http.MaxBytesError
		if errors.As(err, &tooLarge) {
			http.Error(rw, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
			return
		}
		http.Error(rw, err.Error(), http.StatusBadRequest)
		return
	}

	// Fan out to all exporters concurrently and wait for every one of them
	// before responding.
	var wg sync.WaitGroup
	for _, exp := range h.exporters {
		wg.Add(1)
		go func(exp exporter) {
			defer wg.Done()

			if err := exp.Export(req.Context(), p); err != nil {
				level.Error(h.log).Log("msg", "exporter failed with error", "exporter", exp.Name(), "err", err)
				h.errorsTotal.WithLabelValues(exp.Name()).Inc()
			}
		}(exp)
	}
	wg.Wait()

	rw.WriteHeader(http.StatusAccepted)
	_, _ = rw.Write([]byte("ok")) // Nothing useful can be done if the client is gone.
}