forked from grafana/loki
/
validator.go
134 lines (114 loc) · 5.05 KB
/
validator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package distributor
import (
"errors"
"net/http"
"strings"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/httpgrpc"
"github.com/frelon/loki/v2/pkg/logproto"
"github.com/frelon/loki/v2/pkg/validation"
)
const (
// timeFormat is the layout used to render timestamps that are embedded in
// validation error messages, so every message formats times the same way.
timeFormat = time.RFC3339
)
// Validator validates log entries and stream labels against per-tenant limits.
// It embeds Limits, so the limit lookups are available directly on the
// Validator value.
type Validator struct {
Limits
}
// NewValidator builds a Validator backed by the given Limits.
// It returns an error when l is nil, since every validation needs limits to
// consult.
func NewValidator(l Limits) (*Validator, error) {
	if l == nil {
		return nil, errors.New("nil Limits")
	}

	validator := &Validator{Limits: l}
	return validator, nil
}
// validationContext is a snapshot of one tenant's limits, resolved once by
// getValidationContextForTime so that individual validations do not have to
// look the limits up again.
type validationContext struct {
// rejectOldSample enables the "entry too old" check in ValidateEntry.
rejectOldSample bool
// rejectOldSampleMaxAge is the cutoff as a Unix timestamp in nanoseconds
// (reference time minus the tenant's max sample age); older entries are rejected.
rejectOldSampleMaxAge int64
// creationGracePeriod is the upper bound as a Unix timestamp in nanoseconds
// (reference time plus the tenant's grace period); newer entries are rejected.
creationGracePeriod int64

// maxLineSize is the maximum accepted line length in bytes; 0 disables the check.
maxLineSize int
// maxLineSizeTruncate is copied from the tenant limits; it is not read by
// the validation functions in this file.
maxLineSizeTruncate bool

// maxLabelNamesPerSeries is the maximum number of labels allowed on a stream.
maxLabelNamesPerSeries int
// maxLabelNameLength is the maximum accepted length of a label name in bytes.
maxLabelNameLength int
// maxLabelValueLength is the maximum accepted length of a label value in bytes.
maxLabelValueLength int

// userID identifies the tenant; it is used as a label on the discard metrics.
userID string
}
// getValidationContextForTime resolves the limits of the given tenant into a
// validationContext. The two time-based limits are turned into absolute Unix
// nanosecond bounds relative to now, so that entries can later be checked with
// plain integer comparisons.
func (v Validator) getValidationContextForTime(now time.Time, userID string) validationContext {
	var vc validationContext

	vc.userID = userID
	vc.rejectOldSample = v.RejectOldSamples(userID)

	// Oldest timestamp still accepted: now - max sample age.
	oldestAllowed := now.Add(-v.RejectOldSamplesMaxAge(userID))
	vc.rejectOldSampleMaxAge = oldestAllowed.UnixNano()

	// Newest timestamp still accepted: now + creation grace period.
	newestAllowed := now.Add(v.CreationGracePeriod(userID))
	vc.creationGracePeriod = newestAllowed.UnixNano()

	vc.maxLineSize = v.MaxLineSize(userID)
	vc.maxLineSizeTruncate = v.MaxLineSizeTruncate(userID)

	vc.maxLabelNamesPerSeries = v.MaxLabelNamesPerSeries(userID)
	vc.maxLabelNameLength = v.MaxLabelNameLength(userID)
	vc.maxLabelValueLength = v.MaxLabelValueLength(userID)

	return vc
}
// ValidateEntry returns an error if the entry is invalid.
//
// The checks run in this order and the first failure wins:
//  1. the entry is older than ctx.rejectOldSampleMaxAge (only when
//     ctx.rejectOldSample is set),
//  2. the entry is newer than ctx.creationGracePeriod,
//  3. the line is longer than ctx.maxLineSize (a limit of 0 disables the check).
//
// On failure the discarded-samples and discarded-bytes metrics are updated for
// the matching reason and tenant, and an httpgrpc error carrying
// http.StatusBadRequest is returned. labels is only used to build the error
// message. A nil return means the entry passed all checks.
func (v Validator) ValidateEntry(ctx validationContext, labels string, entry logproto.Entry) error {
	ts := entry.Timestamp.UnixNano()

	if ctx.rejectOldSample && ts < ctx.rejectOldSampleMaxAge {
		// Timestamps are formatted only on the rejection paths: formatting
		// allocates, and this function runs for every ingested entry.
		// Using timeFormat keeps the times in error messages consistent.
		formattedEntryTime := entry.Timestamp.Format(timeFormat)
		formattedRejectMaxAgeTime := time.Unix(0, ctx.rejectOldSampleMaxAge).Format(timeFormat)

		validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc()
		validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line)))
		return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, labels, formattedEntryTime, formattedRejectMaxAgeTime)
	}

	if ts > ctx.creationGracePeriod {
		formattedEntryTime := entry.Timestamp.Format(timeFormat)

		validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc()
		validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line)))
		return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, labels, formattedEntryTime)
	}

	if maxSize := ctx.maxLineSize; maxSize != 0 && len(entry.Line) > maxSize {
		// I wish we didn't return httpgrpc errors here as it seems
		// an orthogonal concept (we need not use ValidateLabels in this context)
		// but the upstream cortex_validation pkg uses it, so we keep this
		// for parity.
		validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, ctx.userID).Inc()
		validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, ctx.userID).Add(float64(len(entry.Line)))
		return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
	}

	return nil
}
// ValidateLabels returns an error if the labels are invalid.
//
// A stream is rejected when it has no labels, when it has more labels than
// ctx.maxLabelNamesPerSeries, when a label name or value exceeds its
// configured length, or when two consecutive labels share the same name.
// On failure the discard metrics are updated for the matching reason and
// tenant, and an httpgrpc error carrying http.StatusBadRequest is returned.
// ls holds the parsed labels; stream supplies the raw label string for error
// messages and the entries whose bytes are counted as discarded.
func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error {
	if len(ls) == 0 {
		// Only the sample counter is incremented for this reason; no
		// discarded bytes are recorded.
		validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID).Inc()
		return httpgrpc.Errorf(http.StatusBadRequest, validation.MissingLabelsErrorMsg)
	}

	numLabelNames := len(ls)
	if numLabelNames > ctx.maxLabelNamesPerSeries {
		updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream)
		return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
	}

	// Duplicates are detected by comparing each name with the previous one
	// only. NOTE(review): this presumably relies on ls being sorted by name;
	// confirm against the caller.
	lastLabelName := ""
	for _, l := range ls {
		switch {
		case len(l.Name) > ctx.maxLabelNameLength:
			updateMetrics(validation.LabelNameTooLong, ctx.userID, stream)
			return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name)
		case len(l.Value) > ctx.maxLabelValueLength:
			updateMetrics(validation.LabelValueTooLong, ctx.userID, stream)
			return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value)
		case strings.Compare(lastLabelName, l.Name) == 0:
			updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream)
			return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name)
		}
		lastLabelName = l.Name
	}

	return nil
}
// updateMetrics records a rejected stream in the discard metrics for the given
// reason and tenant: the discarded-samples counter is incremented once, and the
// discarded-bytes counter grows by the combined length of all entry lines in
// the stream.
func updateMetrics(reason, userID string, stream logproto.Stream) {
	validation.DiscardedSamples.WithLabelValues(reason, userID).Inc()

	var discarded int
	for i := range stream.Entries {
		discarded += len(stream.Entries[i].Line)
	}

	validation.DiscardedBytes.WithLabelValues(reason, userID).Add(float64(discarded))
}