-
Notifications
You must be signed in to change notification settings - Fork 11
/
batch.go
114 lines (95 loc) · 2.18 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package storage
import (
"github.com/deepfabric/beehive/pb"
"github.com/deepfabric/beehive/pb/raftcmdpb"
bhstorage "github.com/deepfabric/beehive/storage"
bhutil "github.com/deepfabric/beehive/util"
"github.com/deepfabric/busybee/pkg/pb/rpcpb"
"github.com/fagongzi/goetty"
"github.com/fagongzi/log"
)
const (
opAdd = iota
opRemove
opClear
opDel
opSet
)
type batchType interface {
support() []rpcpb.Type
addReq(*raftcmdpb.Request, *raftcmdpb.Response, *batch, map[string]interface{})
exec(bhstorage.DataStorage, *batch) error
reset()
}
type batch struct {
shard uint64
group uint64
writtenBytes uint64
changedBytes int64
bs *beeStorage
wb *bhutil.WriteBatch
keys []goetty.Slice
values [][]byte
types []batchType
fn map[rpcpb.Type]batchType
}
func newBatch(bs *beeStorage, types ...batchType) *batch {
b := &batch{
bs: bs,
types: types,
fn: make(map[rpcpb.Type]batchType),
wb: bhutil.NewWriteBatch(),
}
for _, tp := range types {
for _, t := range tp.support() {
b.fn[t] = tp
}
}
return b
}
func (b *batch) Add(shard uint64, req *raftcmdpb.Request, attrs map[string]interface{}) (bool, *raftcmdpb.Response, error) {
if b.shard != 0 && b.shard != shard {
log.Fatalf("BUG: diffent shard opts in a batch, %d, %d",
b.shard,
shard)
}
b.shard = shard
b.group = req.Group
resp := pb.AcquireResponse()
if tp, ok := b.fn[rpcpb.Type(req.CustemType)]; ok {
tp.addReq(req, resp, b, attrs)
return true, resp, nil
}
return false, nil, nil
}
func (b *batch) get(key []byte) ([]byte, error) {
return b.bs.getStoreByGroup(b.group).Get(key)
}
func (b *batch) Execute() (uint64, int64, error) {
s := b.bs.getStoreByGroup(b.group)
for _, tp := range b.types {
err := tp.exec(s, b)
if err != nil {
return 0, 0, err
}
}
for idx := range b.keys {
b.wb.Set(b.keys[idx].Data(), b.values[idx])
}
err := s.Write(b.wb, false)
if err != nil {
return 0, 0, err
}
return b.writtenBytes, b.changedBytes, err
}
func (b *batch) Reset() {
b.shard = 0
b.writtenBytes = 0
b.changedBytes = 0
b.keys = b.keys[:0]
b.values = b.values[:0]
b.wb.Reset()
for _, tp := range b.types {
tp.reset()
}
}