forked from grafana/loki
/
limit.go
90 lines (77 loc) · 1.91 KB
/
limit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package stages
import (
"context"
"github.com/go-kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)
const (
ErrLimitStageInvalidRateOrBurst = "limit stage failed to parse rate or burst"
)
var ratelimitDropReason = "ratelimit_drop_stage"
type LimitConfig struct {
Rate float64 `mapstructure:"rate"`
Burst int `mapstructure:"burst"`
Drop bool `mapstructure:"drop"`
}
func newLimitStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &LimitConfig{}
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateLimitConfig(cfg)
if err != nil {
return nil, err
}
r := &limitStage{
logger: log.With(logger, "component", "stage", "type", "limit"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
rateLimiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst),
}
return r, nil
}
func validateLimitConfig(cfg *LimitConfig) error {
if cfg.Rate <= 0 || cfg.Burst <= 0 {
return errors.Errorf(ErrLimitStageInvalidRateOrBurst)
}
return nil
}
// limitStage applies Label matchers to determine if the include stages should be run
type limitStage struct {
logger log.Logger
cfg *LimitConfig
rateLimiter *rate.Limiter
dropCount *prometheus.CounterVec
}
func (m *limitStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
if !m.shouldThrottle() {
out <- e
continue
}
}
}()
return out
}
func (m *limitStage) shouldThrottle() bool {
if m.cfg.Drop {
if m.rateLimiter.Allow() {
return false
}
m.dropCount.WithLabelValues(ratelimitDropReason).Inc()
return true
}
_ = m.rateLimiter.Wait(context.Background())
return false
}
// Name implements Stage
func (m *limitStage) Name() string {
return StageTypeLimit
}