forked from grafana/loki
/
limit.go
145 lines (124 loc) · 4 KB
/
limit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package stages
import (
"context"
"fmt"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/util"
"github.com/go-kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"golang.org/x/time/rate"
)
// Error messages and bounds used by the limit stage.
const (
// ErrLimitStageInvalidRateOrBurst is the message of the error returned by
// validateLimitConfig when rate or burst is not strictly positive.
ErrLimitStageInvalidRateOrBurst = "limit stage failed to parse rate or burst"
// ErrLimitStageByLabelMustDrop is the message of the error returned by
// validateLimitConfig when by_label_name is set but drop is false.
ErrLimitStageByLabelMustDrop = "When ratelimiting by label, drop must be true"
// MinReasonableMaxDistinctLabels is the lower bound that newLimitStage
// enforces on max_distinct_labels when rate limiting by label.
MinReasonableMaxDistinctLabels = 10000 // 80bytes per rate.Limiter ~ 1MiB memory
)
// ratelimitDropReason is the label value passed to the dropCount metric each
// time this stage drops an entry.
var ratelimitDropReason = "ratelimit_drop_stage"
// LimitConfig holds the settings of the limit stage, decoded from the
// pipeline configuration via mapstructure.
type LimitConfig struct {
// Rate is converted to a rate.Limit for the limiter; must be > 0.
Rate float64 `mapstructure:"rate"`
// Burst is the burst size handed to the limiter; must be > 0.
Burst int `mapstructure:"burst"`
// Drop selects what happens once the limit is hit: true drops the entry,
// false blocks the stage until the limiter allows it through.
Drop bool `mapstructure:"drop"`
// ByLabelName, when non-empty, enables one limiter per distinct value of
// this label. Validation requires Drop to be true in that mode.
ByLabelName string `mapstructure:"by_label_name"`
// MaxDistinctLabels is the size passed to the per-label limiter map. It is
// raised to MinReasonableMaxDistinctLabels when lower and ByLabelName is set.
MaxDistinctLabels int `mapstructure:"max_distinct_labels"`
}
// newLimitStage decodes config into a LimitConfig, validates it and builds the
// limit stage.
//
// When by_label_name is set, the stage keeps one rate limiter per label value
// in a generational map and max_distinct_labels is raised to
// MinReasonableMaxDistinctLabels if it is lower (a warning is logged).
// Otherwise a single limiter is shared by all entries.
//
// It returns the decode or validation error unchanged when either step fails.
func newLimitStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
	cfg := &LimitConfig{}
	if err := mapstructure.WeakDecode(config, cfg); err != nil {
		return nil, err
	}
	if err := validateLimitConfig(cfg); err != nil {
		return nil, err
	}

	logger = log.With(logger, "component", "stage", "type", "limit")

	byLabel := cfg.ByLabelName != ""
	if byLabel && cfg.MaxDistinctLabels < MinReasonableMaxDistinctLabels {
		msg := fmt.Sprintf("max_distinct_labels was adjusted up to the minimal reasonable value of %d", MinReasonableMaxDistinctLabels)
		level.Warn(logger).Log("msg", msg)
		cfg.MaxDistinctLabels = MinReasonableMaxDistinctLabels
	}

	stage := &limitStage{
		logger:    logger,
		cfg:       cfg,
		dropCount: getDropCountMetric(registerer),
	}

	// Simple mode: one limiter shared by every entry.
	if !byLabel {
		stage.rateLimiter = rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst)
		return stage, nil
	}

	// Per-label mode: limiters are created on demand, one per label value.
	stage.dropCountByLabel = getDropCountByLabelMetric(registerer)
	makeLimiter := func() *rate.Limiter {
		return rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst)
	}
	// Callback handed to the generational map; it clears the per-label drop
	// counters. NOTE(review): presumably invoked when the map garbage-collects
	// a generation — confirm against util.NewGenMap.
	resetDropsByLabel := func() { stage.dropCountByLabel.Reset() }
	stage.rateLimiterByLabel = util.NewGenMap[model.LabelValue, *rate.Limiter](cfg.MaxDistinctLabels, makeLimiter, resetDropsByLabel)
	return stage, nil
}
// validateLimitConfig checks that cfg describes a usable limit stage.
//
// It returns an error carrying ErrLimitStageInvalidRateOrBurst when rate or
// burst is not strictly positive, and one carrying
// ErrLimitStageByLabelMustDrop when by_label_name is set while drop is false.
// It returns nil when the configuration is valid.
func validateLimitConfig(cfg *LimitConfig) error {
	// errors.New is used rather than Errorf: the messages are plain text, not
	// format strings, so they must never be interpreted for '%' verbs.
	if cfg.Rate <= 0 || cfg.Burst <= 0 {
		return errors.New(ErrLimitStageInvalidRateOrBurst)
	}
	if cfg.ByLabelName != "" && !cfg.Drop {
		return errors.New(ErrLimitStageByLabelMustDrop)
	}
	return nil
}
// limitStage is a pipeline stage that rate limits the entries passing through
// it, using either a single shared limiter or one limiter per value of the
// label named by cfg.ByLabelName.
type limitStage struct {
// logger is the stage logger, tagged with component and type in newLimitStage.
logger log.Logger
// cfg is the validated stage configuration.
cfg *LimitConfig
// rateLimiter is the shared limiter; set only when cfg.ByLabelName is empty.
rateLimiter *rate.Limiter
// rateLimiterByLabel holds one limiter per label value; set only when
// cfg.ByLabelName is non-empty.
rateLimiterByLabel util.GenerationalMap[model.LabelValue, *rate.Limiter]
// dropCount counts dropped entries, labelled with ratelimitDropReason.
dropCount *prometheus.CounterVec
// dropCountByLabel counts dropped entries per label name and value; set only
// when cfg.ByLabelName is non-empty.
dropCountByLabel *prometheus.CounterVec
// NOTE(review): byLabelName is never assigned or read in this file;
// shouldThrottle uses cfg.ByLabelName instead — confirm whether it is needed.
byLabelName model.LabelName
}
// Run consumes entries from in and forwards the ones that are not throttled to
// the returned channel. Throttled entries are discarded. The returned channel
// is closed once in has been closed and drained.
func (m *limitStage) Run(in chan Entry) chan Entry {
	out := make(chan Entry)
	go func() {
		defer close(out)
		for entry := range in {
			if m.shouldThrottle(entry.Labels) {
				// Over the limit: the entry is dropped, never forwarded.
				continue
			}
			out <- entry
		}
	}()
	return out
}
// shouldThrottle reports whether an entry carrying labels must be dropped.
//
// Three modes exist, selected by the stage configuration:
//   - by label: the limiter for the entry's label value decides; entries
//     without the label are never limited;
//   - drop: the shared limiter decides;
//   - otherwise: the call blocks until the shared limiter admits the entry and
//     then returns false.
//
// Every drop increments dropCount, and in by-label mode dropCountByLabel too.
func (m *limitStage) shouldThrottle(labels model.LabelSet) bool {
	labelName := m.cfg.ByLabelName

	switch {
	case labelName != "":
		value, present := labels[model.LabelName(labelName)]
		if !present {
			return false // if no label found, dont ratelimit
		}
		if m.rateLimiterByLabel.GetOrCreate(value).Allow() {
			return false
		}
		m.dropCount.WithLabelValues(ratelimitDropReason).Inc()
		m.dropCountByLabel.WithLabelValues(labelName, string(value)).Inc()
		return true

	case m.cfg.Drop:
		if m.rateLimiter.Allow() {
			return false
		}
		m.dropCount.WithLabelValues(ratelimitDropReason).Inc()
		return true

	default:
		// The error is deliberately discarded: a background context is never
		// cancelled and carries no deadline, so waiting cannot be interrupted.
		_ = m.rateLimiter.Wait(context.Background())
		return false
	}
}
// Name implements Stage. It returns the stage type identifier StageTypeLimit.
func (m *limitStage) Name() string {
return StageTypeLimit
}
// getDropCountByLabelMetric registers, through util.RegisterCounterVec, the
// counter vector that tracks dropped lines per label name and label value, and
// returns it.
func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
	const (
		namespace  = "logentry"
		metricName = "dropped_lines_by_label_total"
		help       = "A count of all log lines dropped as a result of a pipeline stage"
	)
	labelNames := []string{"label_name", "label_value"}
	return util.RegisterCounterVec(registerer, namespace, metricName, help, labelNames)
}