-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
pipe_filter.go
108 lines (88 loc) · 2.31 KB
/
pipe_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package logstorage
import (
"fmt"
"unsafe"
)
// pipeFilter implements the '| filter ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe
type pipeFilter struct {
	// f is the filter applied to every block written to the pipe;
	// only rows matching f are passed further.
	f filter
}
// String returns the textual form of the pipe: the "filter " keyword
// followed by the string representation of the wrapped filter.
func (pf *pipeFilter) String() string {
	filterStr := pf.f.String()
	return "filter " + filterStr
}
// updateNeededFields adjusts the field sets so that the fields used by pf.f stay available.
//
// If neededFields doesn't contain "*", the filter updates neededFields directly.
// Otherwise the fields used by the filter are collected into a temporary set
// and each of them is removed from unneededFields.
func (pf *pipeFilter) updateNeededFields(neededFields, unneededFields fieldsSet) {
	if !neededFields.contains("*") {
		pf.f.updateNeededFields(neededFields)
		return
	}

	// All the fields are needed, so make sure the filter's fields aren't marked as unneeded.
	filterFields := newFieldsSet()
	pf.f.updateNeededFields(filterFields)
	for name := range filterFields {
		unneededFields.remove(name)
	}
}
// newPipeProcessor returns a processor, which applies pf to the written blocks
// and passes the matching rows to ppBase.
//
// A separate shard is allocated per each of workersCount workers.
// The stop channel and the cancel func are unused by this pipe.
func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
	return &pipeFilterProcessor{
		pf:     pf,
		ppBase: ppBase,
		shards: make([]pipeFilterProcessorShard, workersCount),
	}
}
// pipeFilterProcessor is the pipeProcessor created by pipeFilter.newPipeProcessor.
type pipeFilterProcessor struct {
	// pf is the pipe definition holding the filter to apply.
	pf *pipeFilter

	// ppBase is the next processor, which receives the rows that pass the filter.
	ppBase pipeProcessor

	// shards holds per-worker scratch state; it is indexed by workerID in writeBlock.
	shards []pipeFilterProcessorShard
}
// pipeFilterProcessorShard is per-worker state padded up to a multiple of 128 bytes.
type pipeFilterProcessorShard struct {
	pipeFilterProcessorShardNopad

	// The padding prevents false sharing between adjacent shards
	// on widespread platforms where 128 is a multiple of the cache line size.
	_ [128 - unsafe.Sizeof(pipeFilterProcessorShardNopad{})%128]byte
}
// pipeFilterProcessorShardNopad contains the actual per-worker state without the padding.
type pipeFilterProcessorShardNopad struct {
	// br is a reusable block, which receives the rows remaining after partial filtering.
	br blockResult

	// bm is a reusable bitmap marking the rows of the current block that pass the filter.
	bm bitmap
}
// writeBlock applies the filter to br and passes the matching rows to the base pipe.
//
// workerID selects the shard with the scratch bitmap and the scratch block.
// Empty blocks and blocks without matching rows are dropped.
func (pfp *pipeFilterProcessor) writeBlock(workerID uint, br *blockResult) {
	rowsCount := len(br.timestamps)
	if rowsCount == 0 {
		return
	}

	shard := &pfp.shards[workerID]

	// Start with all the rows selected and let the filter clear bits for non-matching rows.
	mask := &shard.bm
	mask.init(rowsCount)
	mask.setBits()
	pfp.pf.f.applyToBlockResult(br, mask)

	switch {
	case mask.areAllBitsSet():
		// Fast path - every row matched, so br is forwarded to the base pipe as is.
		pfp.ppBase.writeBlock(workerID, br)
	case mask.isZero():
		// No rows matched - there is nothing to forward.
	default:
		// Slow path - copy the matching rows from br into shard.br before forwarding them.
		shard.br.initFromFilterAllColumns(br, mask)
		pfp.ppBase.writeBlock(workerID, &shard.br)
	}
}
// flush is a no-op, since writeBlock forwards the matching rows to the base pipe immediately.
//
// It always returns nil.
func (pfp *pipeFilterProcessor) flush() error {
	return nil
}
// parsePipeFilter parses the 'filter ...' pipe from lex.
//
// The current token in lex must be the 'filter' keyword; it is consumed
// and the rest is parsed as a filter expression via parseFilter.
// An error is returned if the keyword is missing or the filter cannot be parsed.
func parsePipeFilter(lex *lexer) (*pipeFilter, error) {
	if !lex.isKeyword("filter") {
		return nil, fmt.Errorf("expecting 'filter'; got %q", lex.token)
	}
	lex.nextToken()

	parsed, err := parseFilter(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'filter': %w", err)
	}

	return &pipeFilter{f: parsed}, nil
}