-
Notifications
You must be signed in to change notification settings - Fork 1
/
mpt.go
178 lines (159 loc) · 4.88 KB
/
mpt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mpt
import (
"github.com/assetcloud/chain/common"
clog "github.com/assetcloud/chain/common/log"
log "github.com/assetcloud/chain/common/log/log15"
"github.com/assetcloud/chain/queue"
drivers "github.com/assetcloud/chain/system/store"
"github.com/assetcloud/chain/types"
mpt "github.com/assetcloud/plugin/plugin/store/mpt/db"
lru "github.com/hashicorp/golang-lru"
)
// mlog is the logger used throughout this package; it is created with the
// context pair "module"="mpt".
var mlog = log.New("module", "mpt")
// SetLogLevel sets the log level by forwarding level to clog.SetLogLevel.
// NOTE(review): the call goes through the shared common/log package rather
// than mlog, so it presumably affects more than this module — confirm.
func SetLogLevel(level string) {
clog.SetLogLevel(level)
}
// DisableLog disable log
func DisableLog() {
mlog.SetHandler(log.DiscardHandler())
}
// Store is the mpt store module. It embeds *drivers.BaseStore and keeps two
// in-memory collections of tries, both keyed by string(root hash).
type Store struct {
*drivers.BaseStore
// trees holds the tries produced by MemSet until a successful Commit or a
// Rollback removes them.
trees map[string]*mpt.TrieEx
// cache is an LRU of tries that Get opened from the db.
cache *lru.Cache
}
// init registers New through drivers.Reg under the name "mpt".
func init() {
drivers.Reg("mpt", New)
}
// New builds the mpt store module on top of a BaseStore created from cfg and
// registers the new Store as that BaseStore's child.
//
// sub and chaincfg are accepted to satisfy the driver constructor signature
// but are not used here. The returned module starts with an empty set of
// pending tries and an LRU cache sized for 10 tries.
func New(cfg *types.Store, sub []byte, chaincfg *types.ChainConfig) queue.Module {
	base := drivers.NewBaseStore(cfg)
	// lru.New only reports an error for a non-positive size, so with the
	// constant 10 the error can be dropped.
	trieCache, _ := lru.New(10)
	store := &Store{
		BaseStore: base,
		trees:     make(map[string]*mpt.TrieEx),
		cache:     trieCache,
	}
	base.SetChild(store)
	return store
}
// Close shuts down the embedded BaseStore and logs that the mpt store has
// been closed.
func (mpts *Store) Close() {
	mpts.BaseStore.Close()
	// The message previously said "mavl", which mislabelled this driver in logs.
	mlog.Info("store mpt closed")
}
// Set writes the key/value pairs in datas to the mpt store db through
// mpt.SetKVPair and returns the hash that call produces. sync is passed
// through unchanged; true requests a synchronous write.
//
// On failure the error is logged and returned with a nil hash.
func (mpts *Store) Set(datas *types.StoreSet, sync bool) ([]byte, error) {
	root, err := mpt.SetKVPair(mpts.GetDB(), datas, sync)
	if err == nil {
		return root, nil
	}
	mlog.Error("mpt store error", "err", err)
	return nil, err
}
// Get looks up datas.Keys in the trie identified by datas.StateHash and
// returns one value per key, in key order.
//
// The trie is resolved from the LRU cache first, then from the pending tries
// created by MemSet, and finally by opening it from the db (in which case it
// is added to the LRU cache). If the trie cannot be opened, or a single key
// lookup fails, the corresponding entries of the result are left nil; the
// returned slice always has len(datas.Keys) elements.
func (mpts *Store) Get(datas *types.StoreGet) [][]byte {
	values := make([][]byte, len(datas.Keys))
	search := string(datas.StateHash)

	var tree *mpt.TrieEx
	if cached, hit := mpts.cache.Get(search); hit {
		tree = cached.(*mpt.TrieEx)
	} else if pending, found := mpts.trees[search]; found {
		tree = pending
	} else {
		loaded, err := mpt.NewEx(common.BytesToHash(datas.StateHash), mpt.NewDatabase(mpts.GetDB()))
		if err != nil {
			mlog.Error("Store get can not find a trie")
		} else {
			mpts.cache.Add(search, loaded)
		}
		mlog.Debug("store mpt get tree", "err", err, "StateHash", common.ToHex(datas.StateHash))
		if err != nil {
			// No trie to query: every value stays nil.
			return values
		}
		tree = loaded
	}

	for i, key := range datas.Keys {
		// A failed lookup is not reported; that slot simply stays nil.
		if value, err := tree.TryGet(key); err == nil {
			values[i] = value
		}
	}
	return values
}
// MemSet opens the trie rooted at datas.StateHash, applies every pair in
// datas.KV to it, commits the result and remembers the trie in mpts.trees
// under its new root hash so that Commit or Rollback can find it later.
//
// It returns the new root hash, or a nil hash and the error if the trie
// cannot be opened or committed. sync is not used by this method. When more
// than 1000 tries are pending an error is logged, but nothing is evicted.
func (mpts *Store) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
	base := common.BytesToHash(datas.StateHash)
	tree, err := mpt.NewEx(base, mpt.NewDatabase(mpts.GetDB()))
	if err != nil {
		mlog.Info("MemSet create a new trie", "err", err)
		return nil, err
	}

	for _, kv := range datas.KV {
		tree.Update(kv.Key, kv.Value)
	}

	root, err := tree.Commit(nil)
	if err != nil {
		mlog.Error("MemSet Commit to memory trie fail")
		return nil, err
	}

	hash := root[:]
	mpts.trees[string(hash)] = tree
	if len(mpts.trees) > 1000 {
		mlog.Error("too many trees in cache")
	}
	return hash, nil
}
// Commit writes the pending in-memory trie registered under req.Hash to the
// storage db and then removes it from the pending set.
//
// It returns req.Hash on success. It returns types.ErrHashNotFound when no
// trie is pending under that hash, and types.ErrDataBaseDamage when
// Commit2Db fails; in the latter case the trie stays in the pending set.
func (mpts *Store) Commit(req *types.ReqHash) ([]byte, error) {
	key := string(req.Hash)
	tree, ok := mpts.trees[key]
	if !ok {
		mlog.Error("store mpt commit", "err", types.ErrHashNotFound)
		return nil, types.ErrHashNotFound
	}
	if err := tree.Commit2Db(common.BytesToHash(req.Hash), true); err != nil {
		// Log the real failure: the caller only receives the generic
		// ErrDataBaseDamage, so this is the one place the cause is recorded.
		mlog.Error("store mpt commit", "err", err)
		return nil, types.ErrDataBaseDamage
	}
	delete(mpts.trees, key)
	return req.Hash, nil
}
// MemSetUpgrade is not supported by the mpt store. It ignores its arguments
// and always returns a nil hash together with a nil error.
func (mpts *Store) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, error) {
// Not supported: nothing is written.
return nil, nil
}
// CommitUpgrade is not supported by the mpt store. It ignores req and always
// returns a nil hash together with a nil error.
func (mpts *Store) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
// Not supported: nothing is committed.
return nil, nil
}
// Rollback discards the pending in-memory mpt trie registered under req.Hash.
//
// It returns req.Hash on success, or types.ErrHashNotFound when no trie is
// pending under that hash.
func (mpts *Store) Rollback(req *types.ReqHash) ([]byte, error) {
	key := string(req.Hash)
	if _, ok := mpts.trees[key]; !ok {
		// The message previously said "mavl", which mislabelled this driver in logs.
		mlog.Error("store mpt rollback", "err", types.ErrHashNotFound)
		return nil, types.ErrHashNotFound
	}
	delete(mpts.trees, key)
	return req.Hash, nil
}
// Del is not supported by the mpt store. It ignores req and always returns a
// nil hash together with a nil error.
func (mpts *Store) Del(req *types.StoreDel) ([]byte, error) {
// Not supported: nothing is deleted.
return nil, nil
}
// IterateRangeByStateHash iterates over a key range of the state identified
// by statehash, delegating to mpt.IterateRangeByStateHash on this store's db.
//
// statehash is the current state hash, start is the key to start searching
// from, end is the key to stop at, ascending selects ascending or descending
// order, and fn is the iteration callback.
func (mpts *Store) IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool) {
mpt.IterateRangeByStateHash(mpts.GetDB(), statehash, start, end, ascending, fn)
}
// ProcEvent handles no queue messages: every non-nil message is answered
// with types.ErrActionNotSupport, and a nil message is ignored.
func (mpts *Store) ProcEvent(msg *queue.Message) {
	if msg != nil {
		msg.ReplyErr("Store", types.ErrActionNotSupport)
	}
}