/
manager.go
371 lines (318 loc) · 11.3 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
package account
import (
"errors"
"github.com/LemoFoundationLtd/lemochain-core/chain/types"
"github.com/LemoFoundationLtd/lemochain-core/common"
"github.com/LemoFoundationLtd/lemochain-core/common/log"
"github.com/LemoFoundationLtd/lemochain-core/store"
"github.com/LemoFoundationLtd/lemochain-core/store/protocol"
"github.com/LemoFoundationLtd/lemochain-core/store/trie"
"math/big"
)
// Trie cache generation limit after which to evict trie nodes from memory.
const MaxTrieCacheGen = uint16(120)
var (
ErrRevisionNotExist = errors.New("revision cannot be reverted")
ErrNoEvents = errors.New("the times of pop event is more than push")
ErrSnapshotIsBroken = errors.New("the snapshot is broken")
)
// TxsProduct is the product of transaction execution
type TxsProduct struct {
Txs types.Transactions // The transactions executed indeed. These transactions will be packaged in a block
GasUsed uint64 // gas used by all transactions
ChangeLogs types.ChangeLogSlice
VersionRoot common.Hash
}
// Manager is used to maintain the newest and not confirmed account data. It will save all data to the db when finished a block's transactions processing.
type Manager struct {
db protocol.ChainDB
trieDb *store.TrieDatabase // used to access tire data in file
acctDb *store.AccountTrieDB
// Manager loads all data from the branch where the baseBlock is
baseBlock *types.Block
baseBlockHash common.Hash
// This map holds 'live' accounts, which will get modified while processing a state transition.
accountCache map[common.Address]*SafeAccount
processor *LogProcessor
versionTrie *trie.SecureTrie
}
// NewManager creates a new Manager. It is used to maintain account changes based on the block environment which specified by blockHash
func NewManager(blockHash common.Hash, db protocol.ChainDB) *Manager {
if db == nil {
panic("account.NewManager is called without a database")
}
manager := &Manager{
db: db,
baseBlockHash: blockHash,
accountCache: make(map[common.Address]*SafeAccount),
trieDb: db.GetTrieDatabase(),
}
if err := manager.loadBaseBlock(); err != nil {
log.Errorf("load block[%s] fail: %s\n", manager.baseBlockHash.Hex(), err.Error())
panic(err)
}
manager.acctDb, _ = db.GetActDatabase(blockHash)
manager.processor = NewLogProcessor(manager)
return manager
}
// GetAccount loads account from cache or db, or creates a new one if it's not exist.
func (am *Manager) GetAccount(address common.Address) types.AccountAccessor {
cached := am.accountCache[address]
if cached == nil {
data, _ := am.acctDb.Get(address)
account := NewAccount(am.db, address, data)
cached = NewSafeAccount(am.processor, account)
// cache it
am.accountCache[address] = cached
}
return cached
}
// GetCanonicalAccount loads an readonly account object from confirmed block in db, or creates a new one if it's not exist. The Modification of the account will not be recorded to store.
func (am *Manager) GetCanonicalAccount(address common.Address) types.AccountAccessor {
data, err := am.db.GetAccount(address)
if err != nil && err != store.ErrNotExist {
panic(err)
}
return NewAccount(am.db, address, data)
}
// getRawAccount loads an account same as GetAccount, but editing the account of this method returned is not going to generate change logs.
// This method is used for ChangeLog.Redo/Undo.
func (am *Manager) getRawAccount(address common.Address) *Account {
safeAccount := am.GetAccount(address)
// Change this account will change safeAccount. They are same pointers
return safeAccount.(*SafeAccount).rawAccount
}
// AddEvent records the event during transaction's execution.
func (am *Manager) AddEvent(event *types.Event) {
if (event.Address == common.Address{}) {
panic("account.Manager.AddEvent() is called without a Address or TxHash")
}
account := am.GetAccount(event.Address)
account.PushEvent(event)
}
// // GetEvents returns all events since last reset
func (am *Manager) GetEvents() []*types.Event {
events := make([]*types.Event, 0)
for _, v := range am.accountCache {
events = append(events, v.GetEvents()...)
}
return events
}
// GetEvents returns all events since last reset
// func (am *Manager) GetEventsByTx(txHash common.Hash) []*types.Event {
// result := make([]*types.Event, 0)
// for _, event := range am.processor.GetEvents() {
// if event.TxHash == txHash {
// result = append(result, event)
// }
// }
// return result
// }
// GetChangeLogs returns all change logs since last reset
func (am *Manager) GetChangeLogs() types.ChangeLogSlice {
return am.processor.GetChangeLogs()
}
// getVersionTrie loads version trie by the version root from baseBlockHash
func (am *Manager) getVersionTrie() *trie.SecureTrie {
if am.versionTrie == nil {
var root common.Hash
// not genesis block
if (am.baseBlockHash != common.Hash{}) {
// load last version trie root
root = am.baseBlock.Header.VersionRoot
}
var err error
am.versionTrie, err = trie.NewSecure(root, am.trieDb, MaxTrieCacheGen)
if err != nil {
panic(err)
}
}
return am.versionTrie
}
func (am *Manager) GetVersionRoot() common.Hash {
return am.getVersionTrie().Hash()
}
// clear clears all data from one block so that Manager can get ready to process another block's transactions
func (am *Manager) clear() {
am.accountCache = make(map[common.Address]*SafeAccount)
am.processor.Clear()
am.versionTrie = nil
}
// Reset clears out all data and switch state to the new block environment.
func (am *Manager) Reset(blockHash common.Hash) {
am.baseBlockHash = blockHash
if err := am.loadBaseBlock(); err != nil {
log.Errorf("Reset to block[%s] fail: %s\n", am.baseBlockHash.Hex(), err.Error())
panic(err)
}
am.acctDb, _ = am.db.GetActDatabase(blockHash)
am.clear()
}
func (am *Manager) loadBaseBlock() (err error) {
var block *types.Block
if (am.baseBlockHash != common.Hash{}) {
block, err = am.db.GetBlockByHash(am.baseBlockHash)
if err != nil {
panic("load base block err: " + err.Error())
}
}
am.baseBlock = block
return err
}
// Snapshot returns an identifier for the current revision of the state.
func (am *Manager) Snapshot() int {
return am.processor.Snapshot()
}
// RevertToSnapshot reverts all state changes made since the given revision.
func (am *Manager) RevertToSnapshot(revid int) {
am.processor.RevertToSnapshot(revid)
}
func (am *Manager) logGrouping() map[common.Address]types.ChangeLogSlice {
logsByAccount := make(map[common.Address]types.ChangeLogSlice)
logs := am.processor.changeLogs
for _, log := range logs {
logsByAccount[log.Address] = append(logsByAccount[log.Address], log)
}
return logsByAccount
}
// Finalise finalises the state, clears the change caches and update tries.
func (am *Manager) Finalise() error {
logsByAccount := am.logGrouping()
versionTrie := am.getVersionTrie()
currentHeight := am.currentBlockHeight()
for _, account := range am.accountCache {
if len(logsByAccount[account.GetAddress()]) <= 0 {
continue
}
oldStorageRoot := account.rawAccount.GetStorageRoot()
oldAssetCodeRoot := account.rawAccount.GetAssetCodeRoot()
oldAssetIdRoot := account.rawAccount.GetAssetIdRoot()
oldEquityRoot := account.rawAccount.GetEquityRoot()
// update account and contract storage
if err := account.rawAccount.Finalise(); err != nil {
return err
}
newStorageRoot := account.rawAccount.GetStorageRoot()
newAssetCodeRoot := account.rawAccount.GetAssetCodeRoot()
newAssetIdRoot := account.rawAccount.GetAssetIdRoot()
newEquityRoot := account.rawAccount.GetEquityRoot()
if newStorageRoot != oldStorageRoot {
log := NewStorageRootLog(account.GetAddress(), am.processor, oldStorageRoot, newStorageRoot)
am.processor.PushChangeLog(log)
}
if newAssetCodeRoot != oldAssetCodeRoot {
log, _ := NewAssetCodeRootLog(account.GetAddress(), am.processor, oldAssetCodeRoot, newAssetCodeRoot)
am.processor.PushChangeLog(log)
}
if newAssetIdRoot != oldAssetIdRoot {
log, _ := NewAssetIdRootLog(account.GetAddress(), am.processor, oldAssetIdRoot, newAssetIdRoot)
am.processor.PushChangeLog(log)
}
if newEquityRoot != oldEquityRoot {
log, _ := NewEquityRootLog(account.GetAddress(), am.processor, oldEquityRoot, newEquityRoot)
am.processor.PushChangeLog(log)
}
logs := logsByAccount[account.GetAddress()]
eventIndex := uint(0)
for _, changeLog := range logs {
if changeLog.LogType == AddEventLog {
newVal := changeLog.NewVal.(*types.Event)
newVal.Index = eventIndex
eventIndex = eventIndex + 1
}
nextVersion := account.rawAccount.GetVersion(changeLog.LogType) + 1
// set version record in rawAccount.data.NewestRecords
changeLog.Version = nextVersion
account.rawAccount.SetVersion(changeLog.LogType, nextVersion, currentHeight)
// update version trie
k := versionTrieKey(account.GetAddress(), changeLog.LogType)
if err := versionTrie.TryUpdate(k, big.NewInt(int64(changeLog.Version)).Bytes()); err != nil {
return err
}
}
}
return nil
}
func versionTrieKey(address common.Address, logType types.ChangeLogType) []byte {
return append(address.Bytes(), big.NewInt(int64(logType)).Bytes()...)
}
// Save writes dirty data into db.
func (am *Manager) Save(newBlockHash common.Hash) error {
logsByAccount := am.logGrouping()
acctDatabase, _ := am.db.GetActDatabase(newBlockHash)
for _, account := range am.accountCache {
if len(logsByAccount[account.GetAddress()]) <= 0 {
continue
}
if err := account.rawAccount.Save(); err != nil {
return err
}
// save accounts to db
acctDatabase.Put(account.rawAccount.data, am.currentBlockHeight())
}
am.db.CandidatesRanking(newBlockHash)
// update version trie nodes' hash
root, err := am.getVersionTrie().Commit(nil)
if err != nil {
return err
}
// save version trie
err = am.trieDb.Commit(root, false)
if err != nil {
log.Errorf("save version trie fail: %v", err)
return err
}
log.Debugf("save version trie success: %#x", root)
am.clear()
return nil
}
// GetTxProduct get the product of transaction execution
func (am *Manager) GetTxsProduct(txs types.Transactions, gasUsed uint64) *TxsProduct {
return &TxsProduct{
Txs: txs,
GasUsed: gasUsed,
ChangeLogs: am.GetChangeLogs(),
VersionRoot: am.GetVersionRoot(),
}
}
// Rebuild loads and redo all change logs to update account to the newest state.
func (am *Manager) Rebuild(address common.Address, logs types.ChangeLogSlice) error {
_, err := am.processor.Rebuild(address, logs)
if err != nil {
return err
}
return nil
// save account
// return am.db.SetAccounts(am.baseBlockHash, []*types.AccountData{account.data})
}
// MergeChangeLogs merges the change logs for same account in block. Then update the version of change logs and account.
func (am *Manager) MergeChangeLogs() {
am.processor.MergeChangeLogs()
}
func (am *Manager) Stop(graceful bool) error {
return nil
}
func (am *Manager) currentBlockHeight() uint32 {
if am.baseBlock == nil {
return 0
}
return am.baseBlock.Height() + 1
}
func (am *Manager) RebuildAll(b *types.Block) error {
am.Reset(b.ParentHash())
if b.ChangeLogs != nil {
for _, cl := range b.ChangeLogs {
if cl.LogType == StorageRootLog ||
cl.LogType == AssetIdRootLog ||
cl.LogType == AssetCodeRootLog ||
cl.LogType == EquityRootLog {
continue
}
if err := cl.Redo(am.processor); err != nil {
return err
}
am.processor.changeLogs = append(am.processor.changeLogs, cl)
}
}
return nil
}