-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
bloomfilter.go
170 lines (153 loc) · 3.97 KB
/
bloomfilter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package logstorage
import (
"fmt"
"sync"
"unsafe"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// bloomFilterHashesCount is the number of different hashes to use for bloom filter.
const bloomFilterHashesCount = 6
// bloomFilterBitsPerItem is the number of bits to use per each token.
const bloomFilterBitsPerItem = 16
// bloomFilterMarshal appends marshaled bloom filter for tokens to dst and returns the result.
func bloomFilterMarshal(dst []byte, tokens []string) []byte {
bf := getBloomFilter()
bf.mustInit(tokens)
dst = bf.marshal(dst)
putBloomFilter(bf)
return dst
}
type bloomFilter struct {
bits []uint64
}
func (bf *bloomFilter) reset() {
bits := bf.bits
for i := range bits {
bits[i] = 0
}
bf.bits = bits[:0]
}
// marshal appends marshaled bf to dst and returns the result.
func (bf *bloomFilter) marshal(dst []byte) []byte {
bits := bf.bits
for _, word := range bits {
dst = encoding.MarshalUint64(dst, word)
}
return dst
}
// unmarshal unmarshals bf from src.
func (bf *bloomFilter) unmarshal(src []byte) error {
if len(src)%8 != 0 {
return fmt.Errorf("cannot unmarshal bloomFilter from src with size not multiple by 8; len(src)=%d", len(src))
}
bf.reset()
wordsCount := len(src) / 8
bits := slicesutil.SetLength(bf.bits, wordsCount)
for i := range bits {
bits[i] = encoding.UnmarshalUint64(src)
src = src[8:]
}
bf.bits = bits
return nil
}
// mustInit initializes bf with the given tokens
func (bf *bloomFilter) mustInit(tokens []string) {
bitsCount := len(tokens) * bloomFilterBitsPerItem
wordsCount := (bitsCount + 63) / 64
bits := slicesutil.SetLength(bf.bits, wordsCount)
bloomFilterAdd(bits, tokens)
bf.bits = bits
}
// bloomFilterAdd adds the given tokens to the bloom filter bits
func bloomFilterAdd(bits []uint64, tokens []string) {
maxBits := uint64(len(bits)) * 64
var buf [8]byte
hp := (*uint64)(unsafe.Pointer(&buf[0]))
for _, token := range tokens {
*hp = xxhash.Sum64(bytesutil.ToUnsafeBytes(token))
for i := 0; i < bloomFilterHashesCount; i++ {
hi := xxhash.Sum64(buf[:])
(*hp)++
idx := hi % maxBits
i := idx / 64
j := idx % 64
mask := uint64(1) << j
w := bits[i]
if (w & mask) == 0 {
bits[i] = w | mask
}
}
}
}
// containsAll returns true if bf contains all the given tokens.
func (bf *bloomFilter) containsAll(tokens []string) bool {
bits := bf.bits
if len(bits) == 0 {
return true
}
maxBits := uint64(len(bits)) * 64
var buf [8]byte
hp := (*uint64)(unsafe.Pointer(&buf[0]))
for _, token := range tokens {
*hp = xxhash.Sum64(bytesutil.ToUnsafeBytes(token))
for i := 0; i < bloomFilterHashesCount; i++ {
hi := xxhash.Sum64(buf[:])
(*hp)++
idx := hi % maxBits
i := idx / 64
j := idx % 64
mask := uint64(1) << j
w := bits[i]
if (w & mask) == 0 {
// The token is missing
return false
}
}
}
return true
}
// containsAny returns true if bf contains at least a single token from the given tokens.
func (bf *bloomFilter) containsAny(tokens []string) bool {
bits := bf.bits
if len(bits) == 0 {
return true
}
maxBits := uint64(len(bits)) * 64
var buf [8]byte
hp := (*uint64)(unsafe.Pointer(&buf[0]))
nextToken:
for _, token := range tokens {
*hp = xxhash.Sum64(bytesutil.ToUnsafeBytes(token))
for i := 0; i < bloomFilterHashesCount; i++ {
hi := xxhash.Sum64(buf[:])
(*hp)++
idx := hi % maxBits
i := idx / 64
j := idx % 64
mask := uint64(1) << j
w := bits[i]
if (w & mask) == 0 {
// The token is missing. Check the next token
continue nextToken
}
}
// It is likely the token exists in the bloom filter
return true
}
return false
}
func getBloomFilter() *bloomFilter {
v := bloomFilterPool.Get()
if v == nil {
return &bloomFilter{}
}
return v.(*bloomFilter)
}
func putBloomFilter(bf *bloomFilter) {
bf.reset()
bloomFilterPool.Put(bf)
}
var bloomFilterPool sync.Pool