-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
stream_filter.go
90 lines (76 loc) · 2.17 KB
/
stream_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package logstorage
import (
"strconv"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/regexutil"
)
// StreamFilter is a filter for streams, e.g. `_stream:{...}`.
//
// Its String method renders the filter as the orFilters entries
// joined with " or " and wrapped in curly braces.
type StreamFilter struct {
// orFilters holds the filter groups; each group is a list of tag filters.
//
// NOTE(review): presumably a stream matches when at least one group matches
// (per the field name); the matching logic is not visible in this section.
orFilters []*andStreamFilter
}
// isEmpty reports whether sf contains no tag filters at all.
//
// It returns true when sf has no orFilters entries or when every entry
// has an empty tagFilters list.
func (sf *StreamFilter) isEmpty() bool {
	for i := range sf.orFilters {
		if n := len(sf.orFilters[i].tagFilters); n != 0 {
			// A single tag filter is enough to make the whole filter non-empty.
			return false
		}
	}
	return true
}
// marshalForCacheKey appends a binary representation of sf to dst
// and returns the extended slice.
//
// The layout is: the number of orFilters entries as a var-uint64, then for
// every entry the number of its tag filters as a var-uint64 followed by the
// tagName, op and value of each tag filter, each written with
// encoding.MarshalBytes.
func (sf *StreamFilter) marshalForCacheKey(dst []byte) []byte {
	groups := sf.orFilters
	dst = encoding.MarshalVarUint64(dst, uint64(len(groups)))
	for i := range groups {
		tagFilters := groups[i].tagFilters
		dst = encoding.MarshalVarUint64(dst, uint64(len(tagFilters)))
		for j := range tagFilters {
			tf := tagFilters[j]
			dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(tf.tagName))
			dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(tf.op))
			dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(tf.value))
		}
	}
	return dst
}
// String returns the text form of sf: the string forms of all the orFilters
// entries joined with " or " and wrapped in curly braces.
//
// A filter without orFilters entries is rendered as "{}".
func (sf *StreamFilter) String() string {
	groups := make([]string, 0, len(sf.orFilters))
	for _, af := range sf.orFilters {
		groups = append(groups, af.String())
	}
	return "{" + strings.Join(groups, " or ") + "}"
}
// andStreamFilter is a group of tag filters.
//
// Its String method renders the group as the tag filters joined with ",".
//
// NOTE(review): presumably all the tag filters must match for the group
// to match (per the type name); the matching logic is not visible here.
type andStreamFilter struct {
// tagFilters is the list of `tagName op value` filters in the group.
tagFilters []*streamTagFilter
}
// String returns the string forms of all the tag filters in af
// joined with "," and no surrounding delimiters.
//
// A group without tag filters is rendered as an empty string.
func (af *andStreamFilter) String() string {
	items := make([]string, 0, len(af.tagFilters))
	for _, tf := range af.tagFilters {
		items = append(items, tf.String())
	}
	return strings.Join(items, ",")
}
// streamTagFilter is a filter for `tagName op value`.
//
// The compiled regexp for value is built on first use; obtain it
// via getRegexp instead of reading the regexp field directly.
type streamTagFilter struct {
// tagName is the name for the tag to filter
tagName string
// op is operation such as `=`, `!=`, `=~` or `!~`
op string
// value is the value
value string
// regexpOnce makes sure initRegexp runs at most once when invoked via getRegexp.
regexpOnce sync.Once
// regexp is the regexp built from value by initRegexp.
// It stays nil until getRegexp is called for the first time.
regexp *regexutil.PromRegex
}
// getRegexp returns the regexp built from tf.value.
//
// The regexp is created by initRegexp on the first call; the result is stored
// in tf.regexp and returned by subsequent calls without rebuilding it.
func (tf *streamTagFilter) getRegexp() *regexutil.PromRegex {
	tf.regexpOnce.Do(func() {
		tf.initRegexp()
	})
	return tf.regexp
}
// initRegexp builds a regexutil.PromRegex from tf.value and stores it in tf.regexp.
//
// A failure to build the regexp is reported via logger.Panicf as a BUG.
// NOTE(review): presumably tf.value is validated before this point — confirm against the parser.
func (tf *streamTagFilter) initRegexp() {
	promRe, parseErr := regexutil.NewPromRegex(tf.value)
	if parseErr != nil {
		logger.Panicf("BUG: cannot parse regexp %q: %s", tf.value, parseErr)
	}
	// Store the result only after the error check above.
	tf.regexp = promRe
}
// String returns tf as the tag name, the op and the value concatenated
// without separators.
//
// The tag name is passed through quoteTokenIfNeeded, while the value
// is always quoted with strconv.Quote.
func (tf *streamTagFilter) String() string {
	name := quoteTokenIfNeeded(tf.tagName)
	quotedValue := strconv.Quote(tf.value)
	return name + tf.op + quotedValue
}