forked from Fantom-foundation/lachesis-base
/
leveldb.go
248 lines (210 loc) · 6.46 KB
/
leveldb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
//go:build !js
// +build !js
// Package leveldb implements the key-value database layer based on LevelDB.
package leveldb
import (
"sync"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/frenchie-foundation/lachesis-base/kvdb"
)
const (
// minCache is the lower bound that New applies to its cache argument (memory
// in bytes for leveldb read and write caching). New assigns half of the cache
// to the block cache and a quarter to the write buffer.
minCache = opt.KiB
// minHandles is the lower bound that New applies to its handles argument,
// which is used as the capacity of the open-files cache.
minHandles = 16
)
// Database is a persistent key-value store backed by LevelDB. Apart from basic
// data storage functionality it also supports batch writes and iterating over
// the keyspace in binary-alphabetical order.
type Database struct {
fn string // path given to New; returned by Path
db *leveldb.DB // LevelDB instance; set to nil by Close
quitLock sync.Mutex // held for the duration of Close
onClose func() error // optional callback run by Close
onDrop func() // optional callback run by Drop
}
// New opens the LevelDB database located at path and wraps it in a Database.
//
// cache and handles are raised to minCache and minHandles when smaller. If
// opening reports a corruption error, a recovery of the files at path is
// attempted and its result is used instead. close and drop are optional
// callbacks that the returned Database invokes from Close and Drop.
func New(path string, cache int, handles int, close func() error, drop func()) (*Database, error) {
	// Clamp the resource limits to their minimums.
	if cache < minCache {
		cache = minCache
	}
	if handles < minHandles {
		handles = minHandles
	}

	options := &opt.Options{
		OpenFilesCacheCapacity: handles,
		BlockCacheCapacity:     cache / 2,
		WriteBuffer:            cache / 4, // Two of these are used internally
		Filter:                 filter.NewBloomFilter(10),
	}
	ldb, err := leveldb.OpenFile(path, options)
	// A corrupted store gets one recovery attempt, which replaces both the
	// handle and the error.
	if _, isCorrupted := err.(*errors.ErrCorrupted); isCorrupted {
		ldb, err = leveldb.RecoverFile(path, nil)
	}
	if err != nil {
		return nil, err
	}

	return &Database{
		fn:      path,
		db:      ldb,
		onClose: close,
		onDrop:  drop,
	}, nil
}
// Close runs the onClose callback, if one was supplied to New, and then closes
// the underlying LevelDB handle.
//
// The LevelDB handle is closed even when the callback fails, so a failing
// callback does not leak the open database. The callback's error takes
// precedence; otherwise the error from closing LevelDB is returned.
//
// Close panics if the database has already been closed.
func (db *Database) Close() error {
	db.quitLock.Lock()
	defer db.quitLock.Unlock()

	if db.db == nil {
		panic("already closed")
	}
	ldb := db.db
	db.db = nil

	var hookErr error
	if db.onClose != nil {
		hookErr = db.onClose()
		db.onClose = nil
	}
	// db.db is already nil at this point, so this is the only chance to
	// release the handle; it must not be skipped when the callback failed.
	closeErr := ldb.Close()
	if hookErr != nil {
		return hookErr
	}
	return closeErr
}
// Drop runs the onDrop callback, if one was supplied to New. It may only be
// called after Close; calling it while the database is still open panics.
func (db *Database) Drop() {
	if db.db != nil {
		panic("Close database first!")
	}
	if hook := db.onDrop; hook != nil {
		hook()
	}
}
// Has reports whether key is present in the key-value store.
func (db *Database) Has(key []byte) (bool, error) {
	found, err := db.db.Has(key, nil)
	return found, err
}
// Get returns the value stored under key. A missing key is not treated as an
// error: it is reported as a nil value together with a nil error. Any other
// error from the lookup is passed through unchanged.
func (db *Database) Get(key []byte) ([]byte, error) {
	value, err := db.db.Get(key, nil)
	if err == leveldb.ErrNotFound {
		return nil, nil
	}
	return value, err
}
// Put stores value under key in the key-value store.
func (db *Database) Put(key []byte, value []byte) error {
	err := db.db.Put(key, value, nil)
	return err
}
// Delete removes key from the key-value store.
func (db *Database) Delete(key []byte) error {
	err := db.db.Delete(key, nil)
	return err
}
// NewBatch returns an empty write-only batch bound to this database. Changes
// are buffered in the batch and only reach the database when its Write method
// is called.
func (db *Database) NewBatch() kvdb.Batch {
	pending := &batch{db: db.db}
	pending.b = new(leveldb.Batch)
	return pending
}
// NewIterator returns a binary-alphabetical iterator over the keys that carry
// the given prefix, beginning at the key prefix+start (or at the next key after
// it, if that key does not exist).
func (db *Database) NewIterator(prefix []byte, start []byte) kvdb.Iterator {
	keyRange := bytesPrefixRange(prefix, start)
	return db.db.NewIterator(keyRange, nil)
}
// Stat looks up the named internal property of the underlying database and
// returns its value.
func (db *Database) Stat(property string) (string, error) {
	value, err := db.db.GetProperty(property)
	return value, err
}
// Compact flattens the underlying data store over the key range [start, limit).
// Deleted and overwritten versions are discarded and the data is rearranged to
// reduce the cost of the operations needed to access it.
//
// A nil start is treated as a key before all keys in the data store and a nil
// limit as a key after all keys in the data store. If both are nil the entire
// data store is compacted.
func (db *Database) Compact(start []byte, limit []byte) error {
	span := util.Range{
		Start: start,
		Limit: limit,
	}
	return db.db.CompactRange(span)
}
// Path returns the database directory path that was passed to New.
func (db *Database) Path() string {
	dir := db.fn
	return dir
}
// batch is a write-only leveldb batch that commits changes to its host database
// when Write is called. A batch cannot be used concurrently.
type batch struct {
db *leveldb.DB // database that Write commits the batch to
b *leveldb.Batch // buffered put and delete operations
size int // counter reported by ValueSize: grows by len(value) per Put and by one per Delete
}
// Put queues a key/value insertion in the batch and adds the length of value
// to the batch's size counter. It always returns nil.
func (b *batch) Put(key, value []byte) error {
	b.size += len(value)
	b.b.Put(key, value)
	return nil
}
// Delete queues the removal of key in the batch and bumps the batch's size
// counter by one. It always returns nil.
func (b *batch) Delete(key []byte) error {
	b.size += 1
	b.b.Delete(key)
	return nil
}
// ValueSize returns the size counter accumulated by Put and Delete since the
// batch was created or last reset.
func (b *batch) ValueSize() int {
	queued := b.size
	return queued
}
// Write commits the operations accumulated in the batch to the host database.
func (b *batch) Write() error {
	err := b.db.Write(b.b, nil)
	return err
}
// Reset clears the buffered operations and zeroes the size counter so the
// batch can be reused.
func (b *batch) Reset() {
	b.size = 0
	b.b.Reset()
}
// Replay feeds the operations buffered in the batch to w.
//
// If the underlying leveldb replay itself fails, that error is returned.
// Otherwise the first error reported by w is returned; once w has failed, the
// remaining operations are skipped rather than applied.
func (b *batch) Replay(w kvdb.Writer) error {
	r := &replayer{writer: w}
	if err := b.b.Replay(r); err != nil {
		return err
	}
	// The replay callbacks have no error result, so a failing writer is only
	// visible through the failure recorded on the replayer.
	return r.failure
}
// replayer is a small wrapper that forwards replayed batch operations to a
// kvdb.Writer. Its Put and Delete methods have no error result, so the first
// error returned by the writer is kept in failure.
type replayer struct {
writer kvdb.Writer // destination of the replayed operations
failure error // first error returned by writer; once set, later operations are skipped
}
// Put forwards the insertion to the wrapped writer and records its error.
// Nothing is forwarded once an earlier operation has failed.
func (r *replayer) Put(key, value []byte) {
	if r.failure == nil {
		r.failure = r.writer.Put(key, value)
	}
}
// Delete forwards the removal to the wrapped writer and records its error.
// Nothing is forwarded once an earlier operation has failed.
func (r *replayer) Delete(key []byte) {
	if r.failure == nil {
		r.failure = r.writer.Delete(key)
	}
}
// bytesPrefixRange returns the key range that satisfies
//   - the given prefix, and
//   - the given seek position: the lower bound of the range is prefix+start.
//
// util.BytesPrefix hands back the caller's prefix slice as the range start, so
// start is appended to a capacity-clipped view of it. This forces append to
// allocate a fresh array instead of writing start into spare capacity of the
// caller's backing array. With an empty start the range start is left as is.
func bytesPrefixRange(prefix, start []byte) *util.Range {
	r := util.BytesPrefix(prefix)
	r.Start = append(r.Start[:len(r.Start):len(r.Start)], start...)
	return r
}