-
Notifications
You must be signed in to change notification settings - Fork 2
/
shard_batch.go
119 lines (99 loc) · 2.64 KB
/
shard_batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package leveldb_shard
import (
"runtime"
"sync"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/syndtr/goleveldb/leveldb"
)
var batchesPool = sync.Pool{
New: func() interface{} {
return &leveldb.Batch{}
},
}
type LeveldbShardBatch struct {
shard *LeveldbShard
batches []*leveldb.Batch
batchesCount uint32
}
func NewLeveldbShardBatch(shard *LeveldbShard) *LeveldbShardBatch {
shardBatch := &LeveldbShardBatch{
batches: make([]*leveldb.Batch, shard.dbCount),
batchesCount: shard.dbCount,
shard: shard,
}
for i := uint32(0); i < shard.dbCount; i++ {
shardBatch.batches[i] = batchesPool.Get().(*leveldb.Batch)
}
runtime.SetFinalizer(shardBatch, func(o *LeveldbShardBatch) {
for _, batch := range o.batches {
batch.Reset()
batchesPool.Put(batch)
}
o.batches = nil
})
return shardBatch
}
func (l *LeveldbShardBatch) mapBatch(key []byte) *leveldb.Batch {
return l.batches[mapDBIndex(key, l.batchesCount)]
}
// Put inserts the given value into the key-value data store.
func (l *LeveldbShardBatch) Put(key []byte, value []byte) error {
l.mapBatch(key).Put(key, value)
return nil
}
// Delete removes the key from the key-value data store.
func (l *LeveldbShardBatch) Delete(key []byte) error {
l.mapBatch(key).Delete(key)
return nil
}
// ValueSize retrieves the amount of data queued up for writing.
func (l *LeveldbShardBatch) ValueSize() int {
size := 0
for _, batch := range l.batches {
size += batch.Len()
}
return size
}
// Write flushes any accumulated data to disk.
func (l *LeveldbShardBatch) Write() (err error) {
return parallelRunAndReturnErr(int(l.batchesCount), func(i int) error {
return l.shard.dbs[i].Write(l.batches[i], nil)
})
}
// Reset resets the batch for reuse.
func (l *LeveldbShardBatch) Reset() {
for _, batch := range l.batches {
batch.Reset()
}
}
// Replay replays the batch contents.
func (l *LeveldbShardBatch) Replay(w ethdb.KeyValueWriter) error {
for _, batch := range l.batches {
err := batch.Replay(&replayer{writer: w})
if err != nil {
return err
}
}
return nil
}
// replayer is a small wrapper to implement the correct replay methods.
type replayer struct {
writer ethdb.KeyValueWriter
failure error
}
// Put inserts the given value into the key-value data store.
func (r *replayer) Put(key, value []byte) {
// If the replay already failed, stop executing ops
if r.failure != nil {
return
}
r.failure = r.writer.Put(key, value)
}
// Delete removes the key from the key-value data store.
func (r *replayer) Delete(key []byte) {
// If the replay already failed, stop executing ops
if r.failure != nil {
return
}
r.failure = r.writer.Delete(key)
}