-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
filter_and.go
127 lines (112 loc) · 2.62 KB
/
filter_and.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package logstorage
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// filterAnd contains filters joined by AND opertor.
//
// It is expressed as `f1 AND f2 ... AND fN` in LogsQL.
type filterAnd struct {
filters []filter
msgTokensOnce sync.Once
msgTokens []string
}
func (fa *filterAnd) String() string {
filters := fa.filters
a := make([]string, len(filters))
for i, f := range filters {
s := f.String()
switch f.(type) {
case *filterOr:
s = "(" + s + ")"
}
a[i] = s
}
return strings.Join(a, " ")
}
func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) {
if !fa.matchMessageBloomFilter(bs) {
// Fast path - fa doesn't match _msg bloom filter.
bm.resetBits()
return
}
// Slow path - verify every filter separately.
for _, f := range fa.filters {
f.apply(bs, bm)
if bm.isZero() {
// Shortcut - there is no need in applying the remaining filters,
// since the result will be zero anyway.
return
}
}
}
func (fa *filterAnd) matchMessageBloomFilter(bs *blockSearch) bool {
tokens := fa.getMessageTokens()
if len(tokens) == 0 {
return true
}
v := bs.csh.getConstColumnValue("_msg")
if v != "" {
return matchStringByAllTokens(v, tokens)
}
ch := bs.csh.getColumnHeader("_msg")
if ch == nil {
return false
}
if ch.valueType == valueTypeDict {
return matchDictValuesByAllTokens(ch.valuesDict.values, tokens)
}
return matchBloomFilterAllTokens(bs, ch, tokens)
}
func (fa *filterAnd) getMessageTokens() []string {
fa.msgTokensOnce.Do(fa.initMsgTokens)
return fa.msgTokens
}
func (fa *filterAnd) initMsgTokens() {
var a []string
for _, f := range fa.filters {
switch t := f.(type) {
case *filterPhrase:
if isMsgFieldName(t.fieldName) {
a = append(a, t.getTokens()...)
}
case *filterSequence:
if isMsgFieldName(t.fieldName) {
a = append(a, t.getTokens()...)
}
case *filterExact:
if isMsgFieldName(t.fieldName) {
a = append(a, t.getTokens()...)
}
case *filterExactPrefix:
if isMsgFieldName(t.fieldName) {
a = append(a, t.getTokens()...)
}
case *filterPrefix:
if isMsgFieldName(t.fieldName) {
a = append(a, t.getTokens()...)
}
}
}
fa.msgTokens = a
}
func matchStringByAllTokens(v string, tokens []string) bool {
for _, token := range tokens {
if !matchPhrase(v, token) {
return false
}
}
return true
}
func matchDictValuesByAllTokens(dictValues, tokens []string) bool {
bb := bbPool.Get()
for _, v := range dictValues {
bb.B = append(bb.B, v...)
bb.B = append(bb.B, ',')
}
v := bytesutil.ToUnsafeString(bb.B)
ok := matchStringByAllTokens(v, tokens)
bbPool.Put(bb)
return ok
}