-
Notifications
You must be signed in to change notification settings - Fork 11
/
loader_kv.go
executable file
·79 lines (66 loc) · 1.55 KB
/
loader_kv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package crowd
import (
"fmt"
"github.com/RoaringBitmap/roaring"
"github.com/deepfabric/busybee/pkg/pb/rpcpb"
"github.com/deepfabric/busybee/pkg/storage"
"github.com/deepfabric/busybee/pkg/util"
"github.com/fagongzi/util/protoc"
)
// limit is the value placed in the Limit field of every BMRange request
// issued while loading a bitmap from the KV store.
const limit uint64 = 80000
// kvLoader is a Loader that reads crowd bitmaps from a storage.Storage.
type kvLoader struct {
	// store is the backend every range command is executed against.
	store storage.Storage
}
// NewKVLoader builds a Loader that fetches bitmaps from the given store.
func NewKVLoader(store storage.Storage) Loader {
	loader := new(kvLoader)
	loader.store = store
	return loader
}
// Get loads the bitmap stored under key by paging through the KV store.
//
// It repeatedly executes a BMRange request carrying the key, the current
// start value and limit, adds the returned values to a bitmap obtained from
// util.AcquireBitmap, and resumes one past the largest value seen so far.
// Paging stops when the store answers with no values, or when the largest
// possible uint32 has been read (there is nothing left to resume from).
//
// If the store returns an error, Get returns a nil bitmap and that error.
func (l *kvLoader) Get(key []byte) (*roaring.Bitmap, error) {
	// maxValue is the largest value a uint32 can hold.
	const maxValue = ^uint32(0)

	bm := util.AcquireBitmap()
	resp := rpcpb.AcquireUint32SliceResponse()
	// Deferred so the pooled response is handed back on the error path too,
	// not only after a successful load.
	defer rpcpb.ReleaseUint32SliceResponse(resp)

	start := uint32(0)
	total := uint64(0)
	for {
		req := rpcpb.AcquireBMRangeRequest()
		req.Key = key
		req.Start = start
		req.Limit = limit

		value, err := l.store.ExecCommand(req)
		if err != nil {
			return nil, err
		}

		resp.Reset()
		protoc.MustUnmarshal(resp, value)
		if len(resp.Values) == 0 {
			break
		}

		total += uint64(len(resp.Values))
		bm.AddMany(resp.Values)

		last := bm.Maximum()
		if last == maxValue {
			// last+1 would wrap around to 0 and restart the scan from the
			// beginning, looping forever; the whole range has been read.
			break
		}
		start = last + 1
	}

	// NOTE(review): total is the number of values received, yet it is
	// reported below as bytes/KB/MB — confirm which was intended.
	switch {
	case total < kb:
		logger.Infof("load %d crowd from KV with key<%s>, %d bytes",
			bm.GetCardinality(),
			string(key),
			total)
	case total < mb:
		logger.Infof("load %d crowd from KV with key<%s>, %d KB",
			bm.GetCardinality(),
			string(key),
			total/kb)
	default:
		logger.Infof("load %d crowd from KV with key<%s>, %d MB",
			bm.GetCardinality(),
			string(key),
			total/mb)
	}

	return bm, nil
}
// Set is not supported by the KV loader: it ignores its arguments and always
// returns zero values together with an error.
func (l *kvLoader) Set(key, data []byte) (uint64, uint32, error) {
	err := fmt.Errorf("KV loader not support Set")
	return 0, 0, err
}