package leveldb_shard

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "path/filepath"
    "strings"
    "sync"

    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/syndtr/goleveldb/leveldb"
    "github.com/syndtr/goleveldb/leveldb/comparer"
    "github.com/syndtr/goleveldb/leveldb/filter"
    "github.com/syndtr/goleveldb/leveldb/iterator"
    "github.com/syndtr/goleveldb/leveldb/opt"
    "github.com/syndtr/goleveldb/leveldb/util"
)
// LeveldbShard presents several LevelDB instances as a single ethdb key-value
// store; every key is routed to exactly one of the underlying databases.
type LeveldbShard struct {
    dbs     []*leveldb.DB
    dbCount uint32
}

// shardIdxKey is a reserved key written into every shard. It records the
// shard's own index so that a database opened from the wrong directory is rejected.
var shardIdxKey = []byte("__DB_SHARED_INDEX__")
func NewLeveldbShard(savePath string, diskCount int, diskShards int) (shard *LeveldbShard, err error) {
    shard = &LeveldbShard{
        dbs:     make([]*leveldb.DB, diskCount*diskShards),
        dbCount: uint32(diskCount * diskShards),
    }

    // Clean up any databases that were already opened if initialization fails.
    defer func() {
        if err != nil {
            for _, db := range shard.dbs {
                if db != nil {
                    _ = db.Close()
                }
            }
            shard = nil
        }
    }()

    levelDBOptions := &opt.Options{
        OpenFilesCacheCapacity: 128,
        WriteBuffer:            8 << 20,  // 8MB; total write-buffer memory can reach about 8MB * 2 * diskCount * diskShards
        BlockCacheCapacity:     16 << 20, // 16MB
        Filter:                 filter.NewBloomFilter(8),
        DisableSeeksCompaction: true,
    }
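
    // Rough sizing sketch (hypothetical shard counts, not from the original file):
    // goleveldb may keep up to two write buffers per open database, so with 8 disks
    // and 4 shards per disk the write buffers alone can reach about
    // 8MB * 2 * 32 = 512MB, plus a 16MB block cache per shard (another 512MB).
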
    // Open every shard concurrently. The named return value err is shared by the
    // opening goroutines, so writes to it are serialized through a mutex.
    var errMu sync.Mutex
    fail := func(e error) {
        errMu.Lock()
        err = e
        errMu.Unlock()
    }

    wg := sync.WaitGroup{}
    for i := 0; i < diskCount; i++ {
        for j := 0; j < diskShards; j++ {
            shardPath := filepath.Join(savePath, fmt.Sprintf("disk%02d", i), fmt.Sprintf("block%02d", j))
            dbIndex := i*diskShards + j

            wg.Add(1)
            go func() {
                defer wg.Done()

                ldb, openErr := leveldb.OpenFile(shardPath, levelDBOptions)
                if openErr != nil {
                    fail(openErr)
                    return
                }
                // Register the handle immediately so the deferred cleanup also closes
                // shards that fail the index check below.
                shard.dbs[dbIndex] = ldb

                // Each shard persists its own index; refuse to reuse a database that
                // was created for a different slot.
                indexByte := make([]byte, 8)
                binary.BigEndian.PutUint64(indexByte, uint64(dbIndex))

                inDBIndex, getErr := ldb.Get(shardIdxKey, nil)
                if getErr != nil {
                    if getErr == leveldb.ErrNotFound {
                        // Fresh database: persist this shard's index.
                        if putErr := ldb.Put(shardIdxKey, indexByte, nil); putErr != nil {
                            fail(putErr)
                            return
                        }
                    } else {
                        fail(getErr)
                        return
                    }
                } else if !bytes.Equal(indexByte, inDBIndex) {
                    fail(fmt.Errorf("db shard index mismatch, want %v, got %v", indexByte, inDBIndex))
                    return
                }
            }()
        }
    }
    wg.Wait()

    return shard, err
}
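
// A minimal usage sketch (hypothetical path and shard counts, not part of the
// original file): opening 2 disks with 4 shards each creates 8 LevelDB
// instances under savePath/disk00..disk01/block00..block03.
//
//    shard, err := NewLeveldbShard("/data/chaindata", 2, 4)
//    if err != nil {
//        return err
//    }
//    defer shard.Close()
//    if err := shard.Put([]byte("key"), []byte("value")); err != nil {
//        return err
//    }
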
// mapDB returns the shard database responsible for the given key, using
// mapDBIndex (defined elsewhere in this package) to map the key onto one of
// the underlying databases.
func (l *LeveldbShard) mapDB(key []byte) *leveldb.DB {
    return l.dbs[mapDBIndex(key, l.dbCount)]
}
// Has reports whether the key is present in the key-value data store.
func (l *LeveldbShard) Has(key []byte) (bool, error) {
    return l.mapDB(key).Has(key, nil)
}

// Get retrieves the given key if it's present in the key-value data store.
func (l *LeveldbShard) Get(key []byte) ([]byte, error) {
    return l.mapDB(key).Get(key, nil)
}

// Put inserts the given value into the key-value data store.
func (l *LeveldbShard) Put(key []byte, value []byte) error {
    return l.mapDB(key).Put(key, value, nil)
}

// Delete removes the key from the key-value data store.
func (l *LeveldbShard) Delete(key []byte) error {
    return l.mapDB(key).Delete(key, nil)
}

// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called.
func (l *LeveldbShard) NewBatch() ethdb.Batch {
    return NewLeveldbShardBatch(l)
}
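
// A batching sketch (the batch type is defined elsewhere in this package; this
// only relies on standard ethdb.Batch semantics): changes are buffered and
// reach the underlying databases when Write is called.
//
//    batch := shard.NewBatch()
//    _ = batch.Put([]byte("a"), []byte("1"))
//    _ = batch.Delete([]byte("b"))
//    if err := batch.Write(); err != nil {
//        return err
//    }
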
// NewIterator creates a binary-alphabetical iterator over the entire keyspace
// contained within the key-value database.
func (l *LeveldbShard) NewIterator() ethdb.Iterator {
    return l.iterator(nil)
}

// NewIteratorWithStart creates a binary-alphabetical iterator over a subset of
// database content starting at a particular initial key (or after, if it does
// not exist).
func (l *LeveldbShard) NewIteratorWithStart(start []byte) ethdb.Iterator {
    return l.iterator(&util.Range{Start: start})
}

// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix.
func (l *LeveldbShard) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
    return l.iterator(util.BytesPrefix(prefix))
}
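
// An iteration sketch (hypothetical prefix): the returned ethdb.Iterator merges
// the per-shard iterators, so keys come back as a single sorted stream.
//
//    it := shard.NewIteratorWithPrefix([]byte("h"))
//    defer it.Release()
//    for it.Next() {
//        _ = it.Key()
//        _ = it.Value()
//    }
//    if err := it.Error(); err != nil {
//        return err
//    }
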
// iterator opens one iterator per shard over the given key range and merges
// them into a single, globally ordered iterator.
func (l *LeveldbShard) iterator(slice *util.Range) ethdb.Iterator {
    iters := make([]iterator.Iterator, l.dbCount)
    for i, db := range l.dbs {
        iters[i] = db.NewIterator(slice, nil)
    }

    return iterator.NewMergedIterator(iters, comparer.DefaultComparer, true)
}
// Stat returns a particular internal stat of the database, concatenating the
// value reported by every shard.
func (l *LeveldbShard) Stat(property string) (string, error) {
    sb := strings.Builder{}

    for i, db := range l.dbs {
        getProperty, err := db.GetProperty(property)
        if err != nil {
            return "", err
        }

        sb.WriteString(fmt.Sprintf("=== shard %02d ===\n", i))
        sb.WriteString(getProperty)
        sb.WriteString("\n")
    }

    return sb.String(), nil
}
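
// A stats sketch ("leveldb.stats" is a goleveldb property name, used here as an
// illustrative choice): the result contains one "=== shard NN ===" block per shard.
//
//    report, err := shard.Stat("leveldb.stats")
//    if err != nil {
//        return err
//    }
//    fmt.Println(report)
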
// Compact flattens the underlying data store for the given key range. In essence,
// deleted and overwritten versions are discarded, and the data is rearranged to
// reduce the cost of operations needed to access them.
//
// A nil start is treated as a key before all keys in the data store; a nil limit
// is treated as a key after all keys in the data store. If both are nil, the
// entire data store is compacted. Every shard is compacted in parallel.
func (l *LeveldbShard) Compact(start []byte, limit []byte) (err error) {
    return parallelRunAndReturnErr(int(l.dbCount), func(i int) error {
        return l.dbs[i].CompactRange(util.Range{Start: start, Limit: limit})
    })
}
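
// A compaction sketch: passing nil for both bounds compacts every shard
// (parallelRunAndReturnErr is defined elsewhere in this package).
//
//    if err := shard.Compact(nil, nil); err != nil {
//        return err
//    }
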
// Close closes every underlying shard database.
func (l *LeveldbShard) Close() error {
    for _, db := range l.dbs {
        if err := db.Close(); err != nil {
            return err
        }
    }
    return nil
}