/
wal_database.go
115 lines (101 loc) · 2.99 KB
/
wal_database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package wal
import (
"github.com/PhoenixGlobal/Phoenix-Chain-SDK/libs/log"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
type IWALDatabase interface {
Put(key []byte, value []byte, wo *opt.WriteOptions) error
Delete(key []byte) error
Get(key []byte) ([]byte, error)
Has(key []byte) (bool, error)
NewIterator(key []byte, wo *opt.ReadOptions) iterator.Iterator
Close()
}
type WALDatabase struct {
fn string // filename for reporting
db *leveldb.DB // LevelDB instance
log log.Logger // Contextual logger tracking the database path
}
func createWalDB(file string) (IWALDatabase, error) {
if file == "" {
return nil, errors.New("create waldb error,file is empty")
}
db, err := openDatabase(file)
if err != nil {
return nil, err
}
return db, nil
}
func openDatabase(file string) (IWALDatabase, error) {
db, err := newWALDatabase(file, 0, 0)
if err != nil {
return nil, err
}
return db, nil
}
func newWALDatabase(file string, cache int, handles int) (*WALDatabase, error) {
logger := log.New("Wal_database", file)
// Ensure we have some minimal caching and file guarantees
if cache < 16 {
cache = 16
}
if handles < 16 {
handles = 16
}
logger.Info("Allocated cache and file handles", "cache", cache, "handles", handles)
// Open the db and recover any potential corruptions
db, err := leveldb.OpenFile(file, &opt.Options{
OpenFilesCacheCapacity: handles,
BlockCacheCapacity: cache / 2 * opt.MiB,
WriteBuffer: cache / 4 * opt.MiB, // Two of these are used internally
Filter: filter.NewBloomFilter(10),
})
if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
db, err = leveldb.RecoverFile(file, nil)
}
// (Re)check for errors and abort if opening of the db failed
if err != nil {
return nil, err
}
return &WALDatabase{
fn: file,
db: db,
log: logger,
}, nil
}
// Put puts the given key / value to the queue
func (db *WALDatabase) Put(key []byte, value []byte, wo *opt.WriteOptions) error {
return db.db.Put(key, value, wo)
}
func (db *WALDatabase) Has(key []byte) (bool, error) {
return db.db.Has(key, nil)
}
func (db *WALDatabase) NewIterator(key []byte, wo *opt.ReadOptions) iterator.Iterator {
return db.db.NewIterator(util.BytesPrefix(key), wo)
}
// Get returns the given key if it's present.
func (db *WALDatabase) Get(key []byte) ([]byte, error) {
dat, err := db.db.Get(key, nil)
if err != nil {
return nil, err
}
return dat, nil
}
// Delete deletes the key from the queue and database
func (db *WALDatabase) Delete(key []byte) error {
return db.db.Delete(key, nil)
}
func (db *WALDatabase) Close() {
// Stop the metrics collection to avoid internal database races
err := db.db.Close()
if err == nil {
db.log.Info("Database closed")
} else {
db.log.Error("Failed to close database", "err", err)
}
}