forked from ava-labs/coreth
/
trie_sync_tasks.go
150 lines (127 loc) · 4.77 KB
/
trie_sync_tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// (c) 2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package statesync
import (
"fmt"
"github.com/MetalBlockchain/coreth/core/rawdb"
"github.com/MetalBlockchain/coreth/core/types"
"github.com/MetalBlockchain/coreth/ethdb"
"github.com/MetalBlockchain/coreth/sync/syncutils"
"github.com/MetalBlockchain/coreth/trie"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
var (
_ syncTask = &mainTrieTask{}
_ syncTask = &storageTrieTask{}
)
type syncTask interface {
// IterateLeafs should return an iterator over
// trie leafs already persisted to disk for this
// trie. Used for restoring progress in case of an
// interrupted sync and for hashing segments.
IterateLeafs(seek common.Hash) ethdb.Iterator
// callbacks used to form a LeafSyncTask
OnStart() (bool, error)
OnLeafs(db ethdb.KeyValueWriter, keys, vals [][]byte) error
OnFinish() error
}
type mainTrieTask struct {
sync *stateSync
}
func NewMainTrieTask(sync *stateSync) syncTask {
return &mainTrieTask{
sync: sync,
}
}
func (m *mainTrieTask) IterateLeafs(seek common.Hash) ethdb.Iterator {
snapshot := m.sync.snapshot
return &syncutils.AccountIterator{AccountIterator: snapshot.AccountIterator(seek)}
}
// OnStart always returns false since the main trie task cannot be skipped.
func (m *mainTrieTask) OnStart() (bool, error) {
return false, nil
}
func (m *mainTrieTask) OnFinish() error {
return m.sync.onMainTrieFinished()
}
func (m *mainTrieTask) OnLeafs(db ethdb.KeyValueWriter, keys, vals [][]byte) error {
codeHashes := make([]common.Hash, 0)
// loop over the keys, decode them as accounts, then check for any
// storage or code we need to sync as well.
for i, key := range keys {
var acc types.StateAccount
accountHash := common.BytesToHash(key)
if err := rlp.DecodeBytes(vals[i], &acc); err != nil {
return fmt.Errorf("could not decode main trie as account, key=%s, valueLen=%d, err=%w", accountHash, len(vals[i]), err)
}
// persist the account data
writeAccountSnapshot(db, accountHash, acc)
// check if this account has storage root that we need to fetch
if acc.Root != (common.Hash{}) && acc.Root != types.EmptyRootHash {
m.sync.trieQueue.RegisterStorageTrie(acc.Root, accountHash)
}
// check if this account has code and add it to codeHashes to fetch
// at the end of this loop.
codeHash := common.BytesToHash(acc.CodeHash)
if codeHash != (common.Hash{}) && codeHash != types.EmptyCodeHash {
codeHashes = append(codeHashes, codeHash)
}
}
// Add collected code hashes to the code syncer.
return m.sync.codeSyncer.addCode(codeHashes)
}
type storageTrieTask struct {
sync *stateSync
root common.Hash
accounts []common.Hash
}
func NewStorageTrieTask(sync *stateSync, root common.Hash, accounts []common.Hash) syncTask {
return &storageTrieTask{
sync: sync,
root: root,
accounts: accounts,
}
}
func (s *storageTrieTask) IterateLeafs(seek common.Hash) ethdb.Iterator {
snapshot := s.sync.snapshot
it, _ := snapshot.StorageIterator(s.accounts[0], seek)
return &syncutils.StorageIterator{StorageIterator: it}
}
func (s *storageTrieTask) OnStart() (bool, error) {
// check if this storage root is on disk
var firstAccount common.Hash
if len(s.accounts) > 0 {
firstAccount = s.accounts[0]
}
storageTrie, err := trie.New(trie.StorageTrieID(s.sync.root, s.root, firstAccount), s.sync.trieDB)
if err != nil {
return false, nil
}
// If the storage trie is already on disk, we only need to populate the storage snapshot for [accountHash]
// with the trie contents. There is no need to re-sync the trie, since it is already present.
for _, account := range s.accounts {
if err := writeAccountStorageSnapshotFromTrie(s.sync.db.NewBatch(), s.sync.batchSize, account, storageTrie); err != nil {
// If the storage trie cannot be iterated (due to an incomplete trie from pruning this storage trie in the past)
// then we re-sync it here. Therefore, this error is not fatal and we can safely continue here.
log.Info("could not populate storage snapshot from trie with existing root, syncing from peers instead", "account", account, "root", s.root, "err", err)
return false, nil
}
}
// Populating the snapshot from the existing storage trie succeeded,
// return true to skip this task.
return true, s.sync.onStorageTrieFinished(s.root)
}
func (s *storageTrieTask) OnFinish() error {
return s.sync.onStorageTrieFinished(s.root)
}
func (s *storageTrieTask) OnLeafs(db ethdb.KeyValueWriter, keys, vals [][]byte) error {
// persists the trie leafs to the snapshot for all accounts associated with this root
for _, account := range s.accounts {
for i, key := range keys {
rawdb.WriteStorageSnapshot(db, account, common.BytesToHash(key), vals[i])
}
}
return nil
}