forked from ava-labs/coreth
/
code_syncer.go
255 lines (219 loc) · 8.23 KB
/
code_syncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
// (c) 2021-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package statesync
import (
"context"
"errors"
"fmt"
"sync"
"github.com/dim4egster/coreth/core/rawdb"
"github.com/dim4egster/coreth/ethdb"
"github.com/dim4egster/coreth/params"
statesyncclient "github.com/dim4egster/coreth/sync/client"
"github.com/dim4egster/qmallgo/ids"
"github.com/ethereum/go-ethereum/common"
)
const (
	// DefaultMaxOutstandingCodeHashes is a default value for
	// CodeSyncerConfig.MaxOutstandingCodeHashes (the buffer size of the code
	// hash queue); presumably applied by whoever builds the config.
	DefaultMaxOutstandingCodeHashes = 5000

	// DefaultNumCodeFetchingWorkers is a default value for
	// CodeSyncerConfig.NumCodeFetchingWorkers (the number of worker goroutines
	// that fetch code from the network).
	DefaultNumCodeFetchingWorkers = 5
)

var (
	// errFailedToAddCodeHashesToQueue is returned by addHashesToQueue when the
	// syncer is stopped before every requested hash could be queued.
	errFailedToAddCodeHashesToQueue = errors.New("failed to add code hashes to queue")
)
// CodeSyncerConfig defines the configuration of the code syncer.
type CodeSyncerConfig struct {
	// Maximum number of outstanding code hashes in the queue before the code syncer should block.
	// newCodeSyncer uses it as the buffer size of the code hash channel.
	MaxOutstandingCodeHashes int
	// Number of worker threads to fetch code from the network.
	// start launches this many worker goroutines.
	NumCodeFetchingWorkers int
	// Client for fetching code from the network.
	Client statesyncclient.Client
	// Database for the code syncer to use. It receives the fetched code and the
	// code-to-fetch markers that let an interrupted sync pick up where it left off.
	DB ethdb.Database
}
// codeSyncer syncs code bytes from the network in a separate thread.
// Tracks outstanding requests in the DB, so that it will still fulfill them if interrupted.
type codeSyncer struct {
	// lock guards outstandingCodeHashes, which is read and modified both by
	// addCode and by fulfillCodeRequest running on worker goroutines.
	lock sync.Mutex

	CodeSyncerConfig

	outstandingCodeHashes ids.Set          // Set of code hashes that we need to fetch from the network.
	codeHashes            chan common.Hash // Channel of incoming code hash requests; closed by notifyAccountTrieCompleted.

	// Used to set terminal error or pass nil to [errChan] if successful.
	// errChan is created with capacity 1 so the single send in setError does not block.
	errOnce sync.Once
	errChan chan error

	// Passed in details from the context used to start the codeSyncer.
	// Both are assigned in start; cancel is invoked by setError.
	cancel context.CancelFunc
	done   <-chan struct{}
}
// newCodeSyncer builds a code syncer that will sync code bytes from the network
// in a separate thread once start is called. The code hash queue is buffered to
// config.MaxOutstandingCodeHashes, the outstanding-hash set starts out empty,
// and the error channel has room for exactly one final result.
func newCodeSyncer(config CodeSyncerConfig) *codeSyncer {
	syncer := &codeSyncer{CodeSyncerConfig: config}
	syncer.codeHashes = make(chan common.Hash, config.MaxOutstandingCodeHashes)
	syncer.outstandingCodeHashes = ids.NewSet(0)
	syncer.errChan = make(chan error, 1)
	return syncer
}
// start launches NumCodeFetchingWorkers worker goroutines and then requeues any
// code-to-fetch markers left in the database by a previous sync.
//
// The call blocks until those leftover requests have been queued for fetching
// (or the syncer's context is cancelled). The final outcome is reported through
// setError: the first worker/requeue error wins, otherwise nil is sent once
// every worker has returned.
func (c *codeSyncer) start(ctx context.Context) {
	workerCtx, cancel := context.WithCancel(ctx)
	c.cancel = cancel
	c.done = workerCtx.Done()

	var workers sync.WaitGroup
	runWorker := func() {
		defer workers.Done()
		if err := c.work(workerCtx); err != nil {
			c.setError(err)
		}
	}
	for n := 0; n < c.NumCodeFetchingWorkers; n++ {
		workers.Add(1)
		go runWorker()
	}

	if err := c.addCodeToFetchFromDBToQueue(); err != nil {
		c.setError(err)
	}

	// Report success only after every worker has exited. If a worker (or the
	// requeue above) already reported an error, this setError call is a no-op.
	go func() {
		workers.Wait()
		c.setError(nil)
	}()
}
// addCodeToFetchFromDBToQueue scans the code-to-fetch markers stored in the
// database. Markers whose code is already present locally are stale and get
// deleted; the deletions are flushed whenever the batch grows past
// ethdb.IdealBatchSize and once more at the end. Every hash whose code is still
// missing is passed to addCode so it is queued for fetching.
func (c *codeSyncer) addCodeToFetchFromDBToQueue() error {
	it := rawdb.NewCodeToFetchIterator(c.DB)
	defer it.Release()

	cleanup := c.DB.NewBatch()
	writeCleanup := func() error {
		if err := cleanup.Write(); err != nil {
			return fmt.Errorf("failed to write batch removing old code markers: %w", err)
		}
		return nil
	}

	prefixLen := len(rawdb.CodeToFetchPrefix)
	var missing []common.Hash
	for it.Next() {
		codeHash := common.BytesToHash(it.Key()[prefixLen:])
		if !rawdb.HasCode(c.DB, codeHash) {
			// Code is still absent: keep it for queueing after the scan.
			missing = append(missing, codeHash)
			continue
		}

		// Code is already on disk, so the marker is no longer needed.
		rawdb.DeleteCodeToFetch(cleanup, codeHash)
		if cleanup.ValueSize() <= ethdb.IdealBatchSize {
			continue
		}
		if err := writeCleanup(); err != nil {
			return err
		}
		cleanup.Reset()
	}
	if err := it.Error(); err != nil {
		return fmt.Errorf("failed to iterate code entries to fetch: %w", err)
	}

	if cleanup.ValueSize() > 0 {
		if err := writeCleanup(); err != nil {
			return err
		}
	}
	return c.addCode(missing)
}
// work is the body of a single fetching worker. It reads code hashes from the
// queue and gathers them into batches of params.MaxCodeHashesPerRequest. Each
// full batch is passed to fulfillCodeRequest, which fetches the code and stores
// it in the database.
//
// When the queue is closed, any partial batch is fulfilled and the worker
// returns. If ctx is cancelled first, ctx.Err() is returned and unsent hashes
// are dropped; their markers remain in the database.
func (c *codeSyncer) work(ctx context.Context) error {
	pending := make([]common.Hash, 0, params.MaxCodeHashesPerRequest)
	for {
		var (
			codeHash common.Hash
			open     bool
		)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case codeHash, open = <-c.codeHashes:
		}

		if !open {
			// No more hashes will arrive: flush what is left, if anything.
			if len(pending) == 0 {
				return nil
			}
			return c.fulfillCodeRequest(ctx, pending)
		}

		pending = append(pending, codeHash)
		if len(pending) >= params.MaxCodeHashesPerRequest {
			if err := c.fulfillCodeRequest(ctx, pending); err != nil {
				return err
			}
			// Reuse the backing array for the next batch.
			pending = pending[:0]
		}
	}
}
// fulfillCodeRequest sends a request for [codeHashes] and writes the result to
// the database. Each code blob is stored, and its code-to-fetch marker deleted,
// in a single batch. Only then is the work marked complete by removing the
// hashes from outstandingCodeHashes.
//
// codeHashes should not be empty or contain duplicate hashes. The client is
// expected to return exactly one code byte slice per requested hash, in the same
// order. A length mismatch is reported as an error instead of panicking on an
// out-of-range index.
//
// The outstanding set is updated only after the batch has been written. This
// means a concurrent addCode never sees a hash that is neither outstanding nor
// present on disk, so it cannot queue a redundant second fetch.
//
// Returns an error if one is encountered, signaling the worker thread to terminate.
func (c *codeSyncer) fulfillCodeRequest(ctx context.Context, codeHashes []common.Hash) error {
	codeByteSlices, err := c.Client.GetCode(ctx, codeHashes)
	if err != nil {
		return err
	}
	if len(codeByteSlices) != len(codeHashes) {
		return fmt.Errorf("received %d code byte slices for %d requested code hashes", len(codeByteSlices), len(codeHashes))
	}

	// The batch is local to this call, so building it does not need the lock.
	batch := c.DB.NewBatch()
	for i, codeHash := range codeHashes {
		rawdb.DeleteCodeToFetch(batch, codeHash)
		rawdb.WriteCode(batch, codeHash, codeByteSlices[i])
	}
	if err := batch.Write(); err != nil {
		return fmt.Errorf("failed to write batch for fulfilled code requests: %w", err)
	}

	// Hold the lock while modifying outstandingCodeHashes.
	c.lock.Lock()
	for _, codeHash := range codeHashes {
		c.outstandingCodeHashes.Remove(ids.ID(codeHash))
	}
	c.lock.Unlock()
	return nil
}
// addCode queues each hash in [codeHashes] for fetching, unless it is already
// outstanding or its code is already stored in the database. Every newly
// selected hash is added to outstandingCodeHashes under the lock, and a
// code-to-fetch marker is written for it before it is pushed onto the queue.
// Assumes that [codeHashes] are valid non-empty code hashes.
func (c *codeSyncer) addCode(codeHashes []common.Hash) error {
	markers := c.DB.NewBatch()
	toQueue := make([]common.Hash, 0, len(codeHashes))

	c.lock.Lock()
	for _, codeHash := range codeHashes {
		id := ids.ID(codeHash)
		// Skip hashes that are already queued or whose code we already have.
		if c.outstandingCodeHashes.Contains(id) || rawdb.HasCode(c.DB, codeHash) {
			continue
		}
		c.outstandingCodeHashes.Add(id)
		rawdb.AddCodeToFetch(markers, codeHash)
		toQueue = append(toQueue, codeHash)
	}
	c.lock.Unlock()

	if err := markers.Write(); err != nil {
		return fmt.Errorf("failed to write batch of code to fetch markers due to: %w", err)
	}
	return c.addHashesToQueue(toQueue)
}
// notifyAccountTrieCompleted tells the code syncer that the account trie sync
// will not produce any more code hashes, so only the already-queued work
// remains. It closes the code hash queue. Once the queue drains, each worker
// flushes its last partial batch and returns a nil error.
// It must be called at most once: closing an already-closed channel panics.
func (c *codeSyncer) notifyAccountTrieCompleted() {
	close(c.codeHashes)
}
// addHashesToQueue pushes every hash in [codeHashes] onto the work queue,
// blocking while the queue is full. If the syncer's done channel closes first,
// it stops and returns errFailedToAddCodeHashesToQueue. Callers are expected to
// finish their other bookkeeping for these hashes beforehand; addCode, for
// example, writes the code-to-fetch markers before calling this.
// Note: c.done is assigned in start; before that it is nil, and only the send
// can proceed.
func (c *codeSyncer) addHashesToQueue(codeHashes []common.Hash) error {
	for i := range codeHashes {
		select {
		case <-c.done:
			return errFailedToAddCodeHashesToQueue
		case c.codeHashes <- codeHashes[i]:
		}
	}
	return nil
}
// setError records the terminal outcome of the code syncer exactly once. The
// first call cancels the syncer's context and sends [err] on errChan, where nil
// means code syncing finished successfully. Later calls do nothing.
// Relies on start having assigned c.cancel.
func (c *codeSyncer) setError(err error) {
	report := func() {
		c.cancel()
		c.errChan <- err
	}
	c.errOnce.Do(report)
}
// Done exposes the channel on which the code syncer reports its final status.
// Exactly one value is sent: nil on success, or the first error encountered.
func (c *codeSyncer) Done() <-chan error {
	return c.errChan
}