/
leveldb.go
147 lines (126 loc) · 3.58 KB
/
leveldb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package leveldb
import (
"github.com/iden3/go-merkletree-sql"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
// Storage implements the db.Storage interface
type Storage struct {
ldb *leveldb.DB
prefix []byte
}
// StorageTx implements the db.Tx interface
type StorageTx struct {
*Storage
cache merkletree.KvMap
}
// NewLevelDbStorage returns a new Storage
func NewLevelDbStorage(path string, errorIfMissing bool) (*Storage, error) {
o := &opt.Options{
ErrorIfMissing: errorIfMissing,
}
ldb, err := leveldb.OpenFile(path, o)
if err != nil {
return nil, err
}
return &Storage{ldb, []byte{}}, nil
}
// WithPrefix implements the method WithPrefix of the interface db.Storage
func (l *Storage) WithPrefix(prefix []byte) merkletree.Storage {
return &Storage{l.ldb, merkletree.Concat(l.prefix, prefix)}
}
// NewTx implements the method NewTx of the interface db.Storage
func (l *Storage) NewTx() (merkletree.Tx, error) {
return &StorageTx{l, make(merkletree.KvMap)}, nil
}
// Get retrieves a value from a key in the db.Storage
func (l *Storage) Get(key []byte) ([]byte, error) {
v, err := l.ldb.Get(merkletree.Concat(l.prefix, key[:]), nil)
if err == errors.ErrNotFound {
return nil, merkletree.ErrNotFound
}
return v, err
}
// Iterate implements the method Iterate of the interface db.Storage
func (l *Storage) Iterate(f func([]byte, []byte) (bool, error)) error {
// FIXME: Use the prefix!
snapshot, err := l.ldb.GetSnapshot()
if err != nil {
return err
}
iter := snapshot.NewIterator(util.BytesPrefix(l.prefix), nil)
defer iter.Release()
for iter.Next() {
localKey := iter.Key()[len(l.prefix):]
if cont, err := f(localKey, iter.Value()); err != nil {
return err
} else if !cont {
break
}
}
iter.Release()
return iter.Error()
}
// Get retreives a value from a key in the interface db.Tx
func (tx *StorageTx) Get(key []byte) ([]byte, error) {
var err error
fullkey := merkletree.Concat(tx.prefix, key)
if value, ok := tx.cache.Get(fullkey); ok {
return value, nil
}
value, err := tx.ldb.Get(fullkey, nil)
if err == errors.ErrNotFound {
return nil, merkletree.ErrNotFound
}
return value, err
}
// Put saves a key:value into the db.Storage
func (tx *StorageTx) Put(k, v []byte) error {
tx.cache.Put(merkletree.Concat(tx.prefix, k[:]), v)
return nil
}
// Add implements the method Add of the interface db.Tx
func (tx *StorageTx) Add(atx merkletree.Tx) error {
ldbtx := atx.(*StorageTx)
for _, v := range ldbtx.cache {
tx.cache.Put(v.K, v.V)
}
return nil
}
// Commit implements the method Commit of the interface db.Tx
func (tx *StorageTx) Commit() error {
var batch leveldb.Batch
for _, v := range tx.cache {
batch.Put(v.K, v.V)
}
tx.cache = nil
return tx.ldb.Write(&batch, nil)
}
// Close implements the method Close of the interface db.Tx
func (tx *StorageTx) Close() {
tx.cache = nil
}
// Close implements the method Close of the interface db.Storage
func (l *Storage) Close() {
if err := l.ldb.Close(); err != nil {
panic(err)
}
}
// LevelDB is an extra method that returns the *leveldb.DB
func (l *Storage) LevelDB() *leveldb.DB {
return l.ldb
}
// List implements the method List of the interface db.Storage
func (l *Storage) List(limit int) ([]merkletree.KV, error) {
ret := []merkletree.KV{}
err := l.Iterate(func(key []byte, value []byte) (bool, error) {
ret = append(ret, merkletree.KV{K: merkletree.Clone(key), V: merkletree.Clone(value)})
if len(ret) == limit {
return false, nil
}
return true, nil
})
return ret, err
}