forked from ethereum/go-ethereum
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sync.go
3112 lines (2836 loc) · 106 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snap
import (
"bytes"
"encoding/json"
"errors"
"fmt"
gomath "math"
"math/big"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/msgrate"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"golang.org/x/crypto/sha3"
)
const (
// minRequestSize is the minimum number of bytes to request from a remote peer.
// This number is used as the low cap for account and storage range requests.
// Bytecode and trienode are limited inherently by item count (1).
minRequestSize = 64 * 1024
// maxRequestSize is the maximum number of bytes to request from a remote peer.
// This number is used as the high cap for account and storage range requests.
// Bytecode and trienode are limited more explicitly by the caps below.
maxRequestSize = 512 * 1024
// maxCodeRequestCount is the maximum number of bytecode blobs to request in a
// single query. If this number is too low, we're not filling responses fully
// and waste round trip times. If it's too high, we're capping responses and
// waste bandwidth.
//
// Deployed bytecodes are currently capped at 24KB, so the minimum request
// size should be maxRequestSize / 24K. Assuming that most contracts do not
// come close to that, requesting 4x should be a good approximation.
//
// Note: this is integer constant arithmetic, so with the current caps it
// evaluates to (512KB / 24KB = 21) * 4 = 84.
maxCodeRequestCount = maxRequestSize / (24 * 1024) * 4
// maxTrieRequestCount is the maximum number of trie node blobs to request in
// a single query. If this number is too low, we're not filling responses fully
// and waste round trip times. If it's too high, we're capping responses and
// waste bandwidth.
//
// With the current maxRequestSize this evaluates to 1024.
maxTrieRequestCount = maxRequestSize / 512
// trienodeHealRateMeasurementImpact is the impact a single measurement has on
// the local node's trienode processing capacity. A value closer to 0 reacts
// slower to sudden changes, but it is also more stable against temporary hiccups.
trienodeHealRateMeasurementImpact = 0.005
// minTrienodeHealThrottle is the minimum divisor for throttling trie node
// heal requests to avoid overloading the local node and excessively expanding
// the state trie breadth wise.
minTrienodeHealThrottle = 1
// maxTrienodeHealThrottle is the maximum divisor for throttling trie node
// heal requests to avoid overloading the local node and excessively expanding
// the state trie breadth wise. It is also the value a new Syncer starts with.
maxTrienodeHealThrottle = maxTrieRequestCount
// trienodeHealThrottleIncrease is the multiplier for the throttle when the
// rate of arriving data is higher than the rate of processing it.
trienodeHealThrottleIncrease = 1.33
// trienodeHealThrottleDecrease is the divisor for the throttle when the
// rate of arriving data is lower than the rate of processing it.
trienodeHealThrottleDecrease = 1.25
)
var (
// accountConcurrency is the number of chunks to split the account trie into
// to allow concurrent retrievals.
//
// NOTE(review): declared as a variable rather than a constant, presumably so
// it can be overridden (e.g. by tests) - confirm.
accountConcurrency = 16
// storageConcurrency is the number of chunks to split a large contract
// storage trie into to allow concurrent retrievals.
storageConcurrency = 16
)
// ErrCancelled is returned from snap syncing if the operation was prematurely
// terminated, i.e. when the cancel channel handed to Sync fires while the sync
// loop is waiting for events.
var ErrCancelled = errors.New("sync cancelled")
// accountRequest tracks a pending account range request to ensure responses are
// to actual requests and to validate any security constraints.
//
// Concurrency note: account requests and responses are handled concurrently from
// the main runloop to allow Merkle proof verifications on the peer's thread and
// to drop on invalid response. The request struct must contain all the data to
// construct the response without accessing runloop internals (i.e. task). The
// task pointer is only included to allow the runloop to match a response to the
// task being synced without having yet another set of maps.
type accountRequest struct {
peer string // Peer to which this request is assigned
id uint64 // Request ID of this request
time time.Time // Timestamp when the request was sent
deliver chan *accountResponse // Channel to deliver successful response on
revert chan *accountRequest // Channel to deliver request failure on
cancel chan struct{} // Channel to track sync cancellation
timeout *time.Timer // Timer to track delivery timeout
stale chan struct{} // Channel to signal the request was dropped
origin common.Hash // First account requested to allow continuation checks
limit common.Hash // Last account requested to allow non-overlapping chunking
task *accountTask // Task which this request is filling (only access fields through the runloop!!)
}
// accountResponse is an already Merkle-verified remote response to an account
// range request. It carries the account hashes and the expanded accounts of the
// returned range, together with a flag signalling whether the range has a
// continuation beyond what was delivered.
//
// hashes and accounts are parallel slices (NOTE(review): presumably index i of
// one corresponds to index i of the other - confirm against the producer).
type accountResponse struct {
task *accountTask // Task which this request is filling
hashes []common.Hash // Account hashes in the returned range
accounts []*types.StateAccount // Expanded accounts in the returned range
cont bool // Whether the account range has a continuation
}
// bytecodeRequest tracks a pending bytecode request to ensure responses are to
// actual requests and to validate any security constraints.
//
// Concurrency note: bytecode requests and responses are handled concurrently from
// the main runloop to allow Keccak256 hash verifications on the peer's thread and
// to drop on invalid response. The request struct must contain all the data to
// construct the response without accessing runloop internals (i.e. task). The
// task pointer is only included to allow the runloop to match a response to the
// task being synced without having yet another set of maps.
type bytecodeRequest struct {
peer string // Peer to which this request is assigned
id uint64 // Request ID of this request
time time.Time // Timestamp when the request was sent
deliver chan *bytecodeResponse // Channel to deliver successful response on
revert chan *bytecodeRequest // Channel to deliver request failure on
cancel chan struct{} // Channel to track sync cancellation
timeout *time.Timer // Timer to track delivery timeout
stale chan struct{} // Channel to signal the request was dropped
hashes []common.Hash // Bytecode hashes to validate responses
task *accountTask // Task which this request is filling (only access fields through the runloop!!)
}
// bytecodeResponse is an already verified remote response to a bytecode request
// issued during the sync phase (it refers back to an accountTask).
type bytecodeResponse struct {
task *accountTask // Task which this request is filling
hashes []common.Hash // Hashes of the bytecode to avoid double hashing
codes [][]byte // Actual bytecodes to store into the database (nil = missing)
}
// storageRequest tracks a pending storage ranges request to ensure responses are
// to actual requests and to validate any security constraints.
//
// Concurrency note: storage requests and responses are handled concurrently from
// the main runloop to allow Merkle proof verifications on the peer's thread and
// to drop on invalid response. The request struct must contain all the data to
// construct the response without accessing runloop internals (i.e. tasks). The
// task pointers are only included to allow the runloop to match a response to
// the task being synced without having yet another set of maps.
type storageRequest struct {
peer string // Peer to which this request is assigned
id uint64 // Request ID of this request
time time.Time // Timestamp when the request was sent
deliver chan *storageResponse // Channel to deliver successful response on
revert chan *storageRequest // Channel to deliver request failure on
cancel chan struct{} // Channel to track sync cancellation
timeout *time.Timer // Timer to track delivery timeout
stale chan struct{} // Channel to signal the request was dropped
accounts []common.Hash // Account hashes to validate responses
roots []common.Hash // Storage roots to validate responses
origin common.Hash // First storage slot requested to allow continuation checks
limit common.Hash // Last storage slot requested to allow non-overlapping chunking
mainTask *accountTask // Task which this response belongs to (only access fields through the runloop!!)
subTask *storageTask // Task which this response is filling (only access fields through the runloop!!)
}
// storageResponse is an already Merkle-verified remote response to a storage
// range request. It carries the requested account hashes and storage roots, the
// slot hashes and slot values of the returned ranges, and a flag signalling
// whether the last returned range has a continuation.
//
// hashes and slots are two-dimensional (NOTE(review): presumably the outer
// index selects the account and the inner index the slot - confirm against the
// producer).
type storageResponse struct {
mainTask *accountTask // Task which this response belongs to
subTask *storageTask // Task which this response is filling
accounts []common.Hash // Account hashes requested, may be only partially filled
roots []common.Hash // Storage roots requested, may be only partially filled
hashes [][]common.Hash // Storage slot hashes in the returned range
slots [][][]byte // Storage slot values in the returned range
cont bool // Whether the last storage range has a continuation
}
// trienodeHealRequest tracks a pending state trie request to ensure responses
// are to actual requests and to validate any security constraints.
//
// Concurrency note: trie node requests and responses are handled concurrently from
// the main runloop to allow Keccak256 hash verifications on the peer's thread and
// to drop on invalid response. The request struct must contain all the data to
// construct the response without accessing runloop internals (i.e. task). The
// task pointer is only included to allow the runloop to match a response to the
// task being synced without having yet another set of maps.
type trienodeHealRequest struct {
peer string // Peer to which this request is assigned
id uint64 // Request ID of this request
time time.Time // Timestamp when the request was sent
deliver chan *trienodeHealResponse // Channel to deliver successful response on
revert chan *trienodeHealRequest // Channel to deliver request failure on
cancel chan struct{} // Channel to track sync cancellation
timeout *time.Timer // Timer to track delivery timeout
stale chan struct{} // Channel to signal the request was dropped
paths []string // Trie node paths for identifying trie node
hashes []common.Hash // Trie node hashes to validate responses
task *healTask // Task which this request is filling (only access fields through the runloop!!)
}
// trienodeHealResponse is an already verified remote response to a trie node
// request issued during the healing phase.
type trienodeHealResponse struct {
task *healTask // Task which this request is filling
paths []string // Paths of the trie nodes
hashes []common.Hash // Hashes of the trie nodes to avoid double hashing
nodes [][]byte // Actual trie nodes to store into the database (nil = missing)
}
// bytecodeHealRequest tracks a pending bytecode request issued during the
// healing phase, to ensure responses are to actual requests and to validate any
// security constraints.
//
// Concurrency note: bytecode requests and responses are handled concurrently from
// the main runloop to allow Keccak256 hash verifications on the peer's thread and
// to drop on invalid response. The request struct must contain all the data to
// construct the response without accessing runloop internals (i.e. task). The
// task pointer is only included to allow the runloop to match a response to the
// task being synced without having yet another set of maps.
type bytecodeHealRequest struct {
peer string // Peer to which this request is assigned
id uint64 // Request ID of this request
time time.Time // Timestamp when the request was sent
deliver chan *bytecodeHealResponse // Channel to deliver successful response on
revert chan *bytecodeHealRequest // Channel to deliver request failure on
cancel chan struct{} // Channel to track sync cancellation
timeout *time.Timer // Timer to track delivery timeout
stale chan struct{} // Channel to signal the request was dropped
hashes []common.Hash // Bytecode hashes to validate responses
task *healTask // Task which this request is filling (only access fields through the runloop!!)
}
// bytecodeHealResponse is an already verified remote response to a bytecode
// request issued during the healing phase (it refers back to a healTask).
type bytecodeHealResponse struct {
task *healTask // Task which this request is filling
hashes []common.Hash // Hashes of the bytecode to avoid double hashing
codes [][]byte // Actual bytecodes to store into the database (nil = missing)
}
// accountTask represents the sync task for a chunk of the account snapshot.
//
// The exported fields are the persisted part of the task; the unexported ones
// are runtime-only bookkeeping.
type accountTask struct {
// These fields get serialized to leveldb on shutdown
Next common.Hash // Next account to sync in this interval
Last common.Hash // Last account to sync in this interval
SubTasks map[common.Hash][]*storageTask // Storage intervals needing fetching for large contracts
// These fields are internals used during runtime
req *accountRequest // Pending request to fill this task
res *accountResponse // Validated response filling this task
pend int // Number of pending subtasks for this round
needCode []bool // Flags whether the filling accounts need code retrieval
needState []bool // Flags whether the filling accounts need storage retrieval
needHeal []bool // Flags whether the filling accounts' state was chunked and needs healing
codeTasks map[common.Hash]struct{} // Code hashes that need retrieval
stateTasks map[common.Hash]common.Hash // Account hashes->roots that need full state retrieval
genBatch ethdb.Batch // Batch used by the node generator
genTrie *trie.StackTrie // Node generator from storage slots
done bool // Flag whether the task can be removed
}
// storageTask represents the sync task for a chunk of the storage snapshot.
//
// NOTE(review): the Next/Last comments originally said "account"; since this
// task covers a storage range they presumably denote storage slot hashes
// (likely a copy-paste from accountTask) - confirm.
type storageTask struct {
Next common.Hash // Next storage slot to sync in this interval
Last common.Hash // Last storage slot to sync in this interval
// These fields are internals used during runtime
root common.Hash // Storage root hash for this instance
req *storageRequest // Pending request to fill this task
genBatch ethdb.Batch // Batch used by the node generator
genTrie *trie.StackTrie // Node generator from storage slots
done bool // Flag whether the task can be removed
}
// healTask represents the sync task for healing the snap-synced chunk boundaries.
// Sync creates a fresh healTask (with empty task sets) for every sync cycle.
type healTask struct {
scheduler *trie.Sync // State trie sync scheduler defining the tasks
trieTasks map[string]common.Hash // Set of trie node tasks currently queued for retrieval, indexed by node path
codeTasks map[common.Hash]struct{} // Set of byte code tasks currently queued for retrieval, indexed by code hash
}
// SyncProgress is a database entry to allow suspending and resuming a snapshot state
// sync. Opposed to full and fast sync, there is no way to restart a suspended
// snap sync without prior knowledge of the suspension point.
//
// The counter fields double as the externally visible progress report: the
// Sync loop publishes a fresh SyncProgress holding only the counters (no Tasks)
// on every scheduling iteration.
type SyncProgress struct {
Tasks []*accountTask // The suspended account tasks (contract tasks within)
// Status report during syncing phase
AccountSynced uint64 // Number of accounts downloaded
AccountBytes common.StorageSize // Number of account trie bytes persisted to disk
BytecodeSynced uint64 // Number of bytecodes downloaded
BytecodeBytes common.StorageSize // Number of bytecode bytes downloaded
StorageSynced uint64 // Number of storage slots downloaded
StorageBytes common.StorageSize // Number of storage trie bytes persisted to disk
// Status report during healing phase
TrienodeHealSynced uint64 // Number of state trie nodes downloaded
TrienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
BytecodeHealSynced uint64 // Number of bytecodes downloaded
BytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk
}
// SyncPending is analogous to SyncProgress, but it's used to report on pending
// ephemeral sync progress that doesn't get persisted into the database. Both
// counters relate to the healing phase.
type SyncPending struct {
TrienodeHeal uint64 // Number of state trie nodes pending
BytecodeHeal uint64 // Number of bytecodes pending
}
// SyncPeer abstracts out the methods required for a peer to be synced against
// with the goal of allowing the construction of mock peers without the full
// blown networking.
//
// Every Request* method takes the request id chosen by the caller and a bytes
// argument (NOTE(review): presumably a soft cap on the response size - confirm
// against the snap protocol specification).
type SyncPeer interface {
// ID retrieves the peer's unique identifier.
ID() string
// RequestAccountRange fetches a batch of accounts rooted in a specific account
// trie, starting with the origin.
RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error
// RequestStorageRanges fetches a batch of storage slots belonging to one or
// more accounts. If slots from only one account are requested, an origin marker
// may also be used to retrieve from there.
RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error
// RequestByteCodes fetches a batch of bytecodes by hash.
RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error
// RequestTrieNodes fetches a batch of account or storage trie nodes rooted in
// a specific state trie.
RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error
// Log retrieves the peer's own contextual logger.
Log() log.Logger
}
// Syncer is an Ethereum account and storage trie syncer based on snapshots and
// the snap protocol. Its purpose is to download all the accounts and storage
// slots from remote peers and reassemble chunks of the state trie, on top of
// which a state sync can be run to fix any gaps / overlaps.
//
// Every network request has a variety of failure events:
// - The peer disconnects after task assignment, failing to send the request
// - The peer disconnects after sending the request, before delivering on it
// - The peer remains connected, but does not deliver a response in time
// - The peer delivers a stale response after a previous timeout
// - The peer delivers a refusal to serve the requested state
//
// A Syncer must be created through NewSyncer, which allocates the peer, idler
// and request tracking maps. It embeds a mutex, a wait group and an atomic
// counter, so it must only be handled by pointer and never copied.
type Syncer struct {
db ethdb.KeyValueStore // Database to store the trie nodes into (and dedup)
scheme string // Node scheme used in node database
root common.Hash // Current state trie root being synced
tasks []*accountTask // Current account task set being synced
snapped bool // Flag to signal that snap phase is done
healer *healTask // Current state healing task being executed
update chan struct{} // Notification channel for possible sync progression
peers map[string]SyncPeer // Currently active peers to download from
peerJoin *event.Feed // Event feed to react to peers joining
peerDrop *event.Feed // Event feed to react to peers dropping
rates *msgrate.Trackers // Message throughput rates for peers
// Request tracking during syncing phase
statelessPeers map[string]struct{} // Peers that failed to deliver state data (reset at the start of every Sync)
accountIdlers map[string]struct{} // Peers that aren't serving account requests
bytecodeIdlers map[string]struct{} // Peers that aren't serving bytecode requests
storageIdlers map[string]struct{} // Peers that aren't serving storage requests
accountReqs map[uint64]*accountRequest // Account requests currently running
bytecodeReqs map[uint64]*bytecodeRequest // Bytecode requests currently running
storageReqs map[uint64]*storageRequest // Storage requests currently running
accountSynced uint64 // Number of accounts downloaded
accountBytes common.StorageSize // Number of account trie bytes persisted to disk
bytecodeSynced uint64 // Number of bytecodes downloaded
bytecodeBytes common.StorageSize // Number of bytecode bytes downloaded
storageSynced uint64 // Number of storage slots downloaded
storageBytes common.StorageSize // Number of storage trie bytes persisted to disk
extProgress *SyncProgress // Progress that can be exposed to an external caller
// Request tracking during healing phase
trienodeHealIdlers map[string]struct{} // Peers that aren't serving trie node requests
bytecodeHealIdlers map[string]struct{} // Peers that aren't serving bytecode requests
trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running
trienodeHealRate float64 // Average heal rate for processing trie node data
trienodeHealPend atomic.Uint64 // Number of trie nodes currently pending for processing
trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated
trienodeHealSynced uint64 // Number of state trie nodes downloaded
trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
trienodeHealDups uint64 // Number of state trie nodes already processed
trienodeHealNops uint64 // Number of state trie nodes not requested
bytecodeHealSynced uint64 // Number of bytecodes downloaded
bytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk
bytecodeHealDups uint64 // Number of bytecodes already processed
bytecodeHealNops uint64 // Number of bytecodes not requested
stateWriter ethdb.Batch // Shared batch writer used for persisting raw states
accountHealed uint64 // Number of accounts downloaded during the healing stage
accountHealedBytes common.StorageSize // Number of raw account bytes persisted to disk during the healing stage
storageHealed uint64 // Number of storage slots downloaded during the healing stage
storageHealedBytes common.StorageSize // Number of raw storage bytes persisted to disk during the healing stage
startTime time.Time // Time instance when snapshot sync started
logTime time.Time // Time instance when status was last reported
pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root)
}
// NewSyncer constructs a snapshot syncer that downloads the Ethereum state over
// the snap protocol, persisting it into db using the given node scheme.
//
// The returned syncer has empty peer, idler and request tracking sets, a fresh
// state batch obtained from db, and an empty external progress report.
func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer {
	syncer := new(Syncer)

	// Backing storage.
	syncer.db = db
	syncer.scheme = scheme

	// Peer management and runloop notification.
	syncer.peers = make(map[string]SyncPeer)
	syncer.peerJoin = new(event.Feed)
	syncer.peerDrop = new(event.Feed)
	syncer.rates = msgrate.NewTrackers(log.New("proto", "snap"))
	syncer.update = make(chan struct{}, 1)

	// Request tracking for the sync phase.
	syncer.accountIdlers = make(map[string]struct{})
	syncer.storageIdlers = make(map[string]struct{})
	syncer.bytecodeIdlers = make(map[string]struct{})
	syncer.accountReqs = make(map[uint64]*accountRequest)
	syncer.storageReqs = make(map[uint64]*storageRequest)
	syncer.bytecodeReqs = make(map[uint64]*bytecodeRequest)

	// Request tracking for the heal phase.
	syncer.trienodeHealIdlers = make(map[string]struct{})
	syncer.bytecodeHealIdlers = make(map[string]struct{})
	syncer.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
	syncer.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)

	// Begin fully throttled: tune downward instead of insta-filling with junk.
	syncer.trienodeHealThrottle = maxTrienodeHealThrottle

	syncer.stateWriter = db.NewBatch()
	syncer.extProgress = new(SyncProgress)

	return syncer
}
// Register adds peer to the set of data sources the syncer may download from.
// It returns an error if a peer with the same ID is already registered.
//
// A newly registered peer is flagged idle for every request type, even if no
// sync is running. Once the syncer lock has been released, the peer's ID is
// published on the peer-join feed so an active sync can assign it work.
func (s *Syncer) Register(peer SyncPeer) error {
	id := peer.ID()

	// Mutate the registry inside a closure so that the lock is released on
	// every path, and always before the join notification goes out.
	err := func() error {
		s.lock.Lock()
		defer s.lock.Unlock()

		if _, known := s.peers[id]; known {
			log.Error("Snap peer already registered", "id", id)
			return errors.New("already registered")
		}
		s.peers[id] = peer
		s.rates.Track(id, msgrate.NewTracker(s.rates.MeanCapacities(), s.rates.MedianRoundTrip()))

		// Mark the peer as idle for all request kinds.
		for _, idlers := range []map[string]struct{}{
			s.accountIdlers,
			s.storageIdlers,
			s.bytecodeIdlers,
			s.trienodeHealIdlers,
			s.bytecodeHealIdlers,
		} {
			idlers[id] = struct{}{}
		}
		return nil
	}()
	if err != nil {
		return err
	}
	// Let any active sync know that a new peer can be handed tasks.
	s.peerJoin.Send(id)
	return nil
}
// Unregister removes the peer with the given id from the syncer's peerset.
// It returns an error if no such peer is registered.
//
// All status markers of the peer (stateless and idle flags) are dropped, even
// if no sync is running. Once the syncer lock has been released, the id is
// published on the peer-drop feed so an active sync can revert the peer's
// pending requests.
func (s *Syncer) Unregister(id string) error {
	// Mutate the registry inside a closure so that the lock is released on
	// every path, and always before the drop notification goes out.
	err := func() error {
		s.lock.Lock()
		defer s.lock.Unlock()

		if _, known := s.peers[id]; !known {
			log.Error("Snap peer not registered", "id", id)
			return errors.New("not registered")
		}
		delete(s.peers, id)
		s.rates.Untrack(id)

		// Clear every status marker; deleting from a nil or non-containing
		// map is a no-op, so no presence checks are needed.
		for _, markers := range []map[string]struct{}{
			s.statelessPeers,
			s.accountIdlers,
			s.storageIdlers,
			s.bytecodeIdlers,
			s.trienodeHealIdlers,
			s.bytecodeHealIdlers,
		} {
			delete(markers, id)
		}
		return nil
	}()
	if err != nil {
		return err
	}
	// Let any active sync know that pending requests need to be reverted.
	s.peerDrop.Send(id)
	return nil
}
// Sync starts (or resumes a previous) sync cycle to iterate over a state trie
// with the given root and reconstruct the nodes based on the snapshot leaves.
// Previously downloaded segments will not be redownloaded or fixed, rather any
// errors will be healed after the leaves are fully accumulated.
func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
// Move the trie root from any previous value, revert stateless markers for
// any peers and initialize the syncer if it was not yet run
s.lock.Lock()
s.root = root
s.healer = &healTask{
scheduler: state.NewStateSync(root, s.db, s.onHealState, s.scheme),
trieTasks: make(map[string]common.Hash),
codeTasks: make(map[common.Hash]struct{}),
}
s.statelessPeers = make(map[string]struct{})
s.lock.Unlock()
if s.startTime == (time.Time{}) {
s.startTime = time.Now()
}
// Retrieve the previous sync status from LevelDB and abort if already synced
s.loadSyncStatus()
if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
log.Debug("Snapshot sync already completed")
return nil
}
defer func() { // Persist any progress, independent of failure
for _, task := range s.tasks {
s.forwardAccountTask(task)
}
s.cleanAccountTasks()
s.saveSyncStatus()
}()
log.Debug("Starting snapshot sync cycle", "root", root)
// Flush out the last committed raw states
defer func() {
if s.stateWriter.ValueSize() > 0 {
s.stateWriter.Write()
s.stateWriter.Reset()
}
}()
defer s.report(true)
// commit any trie- and bytecode-healing data.
defer s.commitHealer(true)
// Whether sync completed or not, disregard any future packets
defer func() {
log.Debug("Terminating snapshot sync cycle", "root", root)
s.lock.Lock()
s.accountReqs = make(map[uint64]*accountRequest)
s.storageReqs = make(map[uint64]*storageRequest)
s.bytecodeReqs = make(map[uint64]*bytecodeRequest)
s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)
s.lock.Unlock()
}()
// Keep scheduling sync tasks
peerJoin := make(chan string, 16)
peerJoinSub := s.peerJoin.Subscribe(peerJoin)
defer peerJoinSub.Unsubscribe()
peerDrop := make(chan string, 16)
peerDropSub := s.peerDrop.Subscribe(peerDrop)
defer peerDropSub.Unsubscribe()
// Create a set of unique channels for this sync cycle. We need these to be
// ephemeral so a data race doesn't accidentally deliver something stale on
// a persistent channel across syncs (yup, this happened)
var (
accountReqFails = make(chan *accountRequest)
storageReqFails = make(chan *storageRequest)
bytecodeReqFails = make(chan *bytecodeRequest)
accountResps = make(chan *accountResponse)
storageResps = make(chan *storageResponse)
bytecodeResps = make(chan *bytecodeResponse)
trienodeHealReqFails = make(chan *trienodeHealRequest)
bytecodeHealReqFails = make(chan *bytecodeHealRequest)
trienodeHealResps = make(chan *trienodeHealResponse)
bytecodeHealResps = make(chan *bytecodeHealResponse)
)
for {
// Remove all completed tasks and terminate sync if everything's done
s.cleanStorageTasks()
s.cleanAccountTasks()
if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
return nil
}
// Assign all the data retrieval tasks to any free peers
s.assignAccountTasks(accountResps, accountReqFails, cancel)
s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel)
s.assignStorageTasks(storageResps, storageReqFails, cancel)
if len(s.tasks) == 0 {
// Sync phase done, run heal phase
s.assignTrienodeHealTasks(trienodeHealResps, trienodeHealReqFails, cancel)
s.assignBytecodeHealTasks(bytecodeHealResps, bytecodeHealReqFails, cancel)
}
// Update sync progress
s.lock.Lock()
s.extProgress = &SyncProgress{
AccountSynced: s.accountSynced,
AccountBytes: s.accountBytes,
BytecodeSynced: s.bytecodeSynced,
BytecodeBytes: s.bytecodeBytes,
StorageSynced: s.storageSynced,
StorageBytes: s.storageBytes,
TrienodeHealSynced: s.trienodeHealSynced,
TrienodeHealBytes: s.trienodeHealBytes,
BytecodeHealSynced: s.bytecodeHealSynced,
BytecodeHealBytes: s.bytecodeHealBytes,
}
s.lock.Unlock()
// Wait for something to happen
select {
case <-s.update:
// Something happened (new peer, delivery, timeout), recheck tasks
case <-peerJoin:
// A new peer joined, try to schedule it new tasks
case id := <-peerDrop:
s.revertRequests(id)
case <-cancel:
return ErrCancelled
case req := <-accountReqFails:
s.revertAccountRequest(req)
case req := <-bytecodeReqFails:
s.revertBytecodeRequest(req)
case req := <-storageReqFails:
s.revertStorageRequest(req)
case req := <-trienodeHealReqFails:
s.revertTrienodeHealRequest(req)
case req := <-bytecodeHealReqFails:
s.revertBytecodeHealRequest(req)
case res := <-accountResps:
s.processAccountResponse(res)
case res := <-bytecodeResps:
s.processBytecodeResponse(res)
case res := <-storageResps:
s.processStorageResponse(res)
case res := <-trienodeHealResps:
s.processTrienodeHealResponse(res)
case res := <-bytecodeHealResps:
s.processBytecodeHealResponse(res)
}
// Report stats if something meaningful happened
s.report(false)
}
}
// loadSyncStatus restores the state of a previously interrupted sync cycle from
// the database. If no status blob is stored, or the stored blob fails to decode,
// all progress counters are zeroed and the account hash space is chunked into
// accountConcurrency brand new retrieval tasks.
func (s *Syncer) loadSyncStatus() {
	var progress SyncProgress

	if blob := rawdb.ReadSnapshotSyncStatus(s.db); blob != nil {
		err := json.Unmarshal(blob, &progress)
		if err == nil {
			for _, t := range progress.Tasks {
				log.Debug("Scheduled account sync task", "from", t.Next, "last", t.Last)
			}
			s.tasks = progress.Tasks

			// Attach a fresh write batch and stack trie to every restored
			// account task and to each of its storage sub-tasks.
			for _, t := range s.tasks {
				acct := t // dedicated variable per iteration, captured by the stacktrie writer callback

				acct.genBatch = ethdb.HookedBatch{
					Batch: s.db.NewBatch(),
					OnPut: func(key []byte, value []byte) {
						s.accountBytes += common.StorageSize(len(key) + len(value))
					},
				}
				acct.genTrie = trie.NewStackTrie(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
					rawdb.WriteTrieNode(acct.genBatch, owner, path, hash, val, s.scheme)
				})
				for accountHash, chunks := range acct.SubTasks {
					for _, c := range chunks {
						chunk := c // dedicated variable per iteration, captured by the stacktrie writer callback

						chunk.genBatch = ethdb.HookedBatch{
							Batch: s.db.NewBatch(),
							OnPut: func(key []byte, value []byte) {
								s.storageBytes += common.StorageSize(len(key) + len(value))
							},
						}
						chunk.genTrie = trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
							rawdb.WriteTrieNode(chunk.genBatch, owner, path, hash, val, s.scheme)
						}, accountHash)
					}
				}
			}
			// Restore the counters from the decoded status under the lock
			s.lock.Lock()
			defer s.lock.Unlock()

			// No account tasks left over means the snapshot phase had finished
			s.snapped = len(s.tasks) == 0

			s.accountSynced, s.accountBytes = progress.AccountSynced, progress.AccountBytes
			s.bytecodeSynced, s.bytecodeBytes = progress.BytecodeSynced, progress.BytecodeBytes
			s.storageSynced, s.storageBytes = progress.StorageSynced, progress.StorageBytes
			s.trienodeHealSynced, s.trienodeHealBytes = progress.TrienodeHealSynced, progress.TrienodeHealBytes
			s.bytecodeHealSynced, s.bytecodeHealBytes = progress.BytecodeHealSynced, progress.BytecodeHealBytes
			return
		}
		log.Error("Failed to decode snap sync status", "err", err)
	}
	// Reaching this point means there was no stored status or it could not be
	// decoded. Begin from scratch: wipe the counters and split the account
	// range into equally sized chunks scheduled for retrieval.
	s.tasks = nil
	s.accountSynced, s.accountBytes = 0, 0
	s.bytecodeSynced, s.bytecodeBytes = 0, 0
	s.storageSynced, s.storageBytes = 0, 0
	s.trienodeHealSynced, s.trienodeHealBytes = 0, 0
	s.bytecodeHealSynced, s.bytecodeHealBytes = 0, 0

	// step = 2^256 / accountConcurrency - 1, the distance between the first
	// and the last hash of a single chunk
	step := new(big.Int).Exp(common.Big2, common.Big256, nil)
	step.Div(step, big.NewInt(int64(accountConcurrency)))
	step.Sub(step, common.Big1)

	var next common.Hash
	for i := 0; i < accountConcurrency; i++ {
		var last common.Hash
		if i < accountConcurrency-1 {
			last = common.BigToHash(new(big.Int).Add(next.Big(), step))
		} else {
			// The final chunk always ends at the maximum hash, so nothing is
			// missed or overflowed if the step is not a proper divisor
			last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
		}
		batch := ethdb.HookedBatch{
			Batch: s.db.NewBatch(),
			OnPut: func(key []byte, value []byte) {
				s.accountBytes += common.StorageSize(len(key) + len(value))
			},
		}
		fresh := &accountTask{
			Next:     next,
			Last:     last,
			SubTasks: make(map[common.Hash][]*storageTask),
			genBatch: batch,
			genTrie: trie.NewStackTrie(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
				rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme)
			}),
		}
		s.tasks = append(s.tasks, fresh)
		log.Debug("Created account sync task", "from", next, "last", last)

		// The following chunk starts right after the end of this one
		next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
	}
}
// saveSyncStatus flushes the pending write batches of all remaining account
// tasks and storage sub-tasks, then JSON-encodes the remaining tasks together
// with the progress counters and stores the result in the database.
func (s *Syncer) saveSyncStatus() {
	// Flush partial progress first; a failed batch write is only logged
	for _, acct := range s.tasks {
		if err := acct.genBatch.Write(); err != nil {
			log.Error("Failed to persist account slots", "err", err)
		}
		for _, chunks := range acct.SubTasks {
			for _, chunk := range chunks {
				if err := chunk.genBatch.Write(); err != nil {
					log.Error("Failed to persist storage slots", "err", err)
				}
			}
		}
	}
	// Encode the task list and the progress markers
	blob, err := json.Marshal(&SyncProgress{
		Tasks:              s.tasks,
		AccountSynced:      s.accountSynced,
		AccountBytes:       s.accountBytes,
		BytecodeSynced:     s.bytecodeSynced,
		BytecodeBytes:      s.bytecodeBytes,
		StorageSynced:      s.storageSynced,
		StorageBytes:       s.storageBytes,
		TrienodeHealSynced: s.trienodeHealSynced,
		TrienodeHealBytes:  s.trienodeHealBytes,
		BytecodeHealSynced: s.bytecodeHealSynced,
		BytecodeHealBytes:  s.bytecodeHealBytes,
	})
	if err != nil {
		panic(err) // This can only fail during implementation
	}
	rawdb.WriteSnapshotSyncStatus(s.db, blob)
}
// Progress returns the externally visible snap sync progress statistics along
// with the number of trie node and bytecode heal tasks currently held by the
// healer. The pending counters stay zero when no healer is set.
func (s *Syncer) Progress() (*SyncProgress, *SyncPending) {
	s.lock.Lock()
	defer s.lock.Unlock()

	pending := &SyncPending{}
	if healer := s.healer; healer != nil {
		pending.TrienodeHeal = uint64(len(healer.trieTasks))
		pending.BytecodeHeal = uint64(len(healer.codeTasks))
	}
	return s.extProgress, pending
}
// cleanAccountTasks drops every account range retrieval task that is flagged as
// done. If that empties a previously non-empty task list, the syncer is marked
// as snapped and a final sync progress report is pushed.
func (s *Syncer) cleanAccountTasks() {
	// An empty task list means the sync already finished earlier, nothing to do
	if len(s.tasks) == 0 {
		return
	}
	// Cut finished tasks out of the list. The index only advances when the
	// current slot is kept, because a removal shifts the next task into it.
	for i := 0; i < len(s.tasks); {
		if !s.tasks[i].done {
			i++
			continue
		}
		s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
	}
	// Some tasks are still outstanding, the account phase is not over yet
	if len(s.tasks) > 0 {
		return
	}
	// Everything was just finalized, flag the snapshot phase as completed
	s.lock.Lock()
	s.snapped = true
	s.lock.Unlock()

	// Push the final sync report
	s.reportSyncProgress(true)
}
// cleanStorageTasks walks all account tasks and prunes the storage sub-tasks
// that are flagged as done. Once an account has no sub-tasks left, its entry is
// removed, its needState flag is cleared and the pending counter of the owning
// account task is decremented; when that counter hits zero the account task is
// forwarded.
func (s *Syncer) cleanStorageTasks() {
	for _, task := range s.tasks {
		for account, subtasks := range task.SubTasks {
			// Cut finished storage chunks out of the list. The index only
			// advances when the current slot is kept, because a removal
			// shifts the next chunk into it.
			for j := 0; j < len(subtasks); {
				if subtasks[j].done {
					subtasks = append(subtasks[:j], subtasks[j+1:]...)
				} else {
					j++
				}
			}
			// Chunks remain for this account: store the shortened list and move on
			if len(subtasks) != 0 {
				task.SubTasks[account] = subtasks
				continue
			}
			// Every storage chunk is done, so the account no longer needs state
			for idx := range task.res.hashes {
				if task.res.hashes[idx] == account {
					task.needState[idx] = false
				}
			}
			delete(task.SubTasks, account)

			// If this was the last pending item, forward the account task
			task.pend--
			if task.pend == 0 {
				s.forwardAccountTask(task)
			}
		}
	}
}
// assignAccountTasks attempts to match idle peers to pending account range
// retrievals.
func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *accountRequest, cancel chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
// Sort the peers by download capacity to use faster ones if many available
idlers := &capacitySort{
ids: make([]string, 0, len(s.accountIdlers)),
caps: make([]int, 0, len(s.accountIdlers)),
}
targetTTL := s.rates.TargetTimeout()
for id := range s.accountIdlers {
if _, ok := s.statelessPeers[id]; ok {
continue
}
idlers.ids = append(idlers.ids, id)
idlers.caps = append(idlers.caps, s.rates.Capacity(id, AccountRangeMsg, targetTTL))
}
if len(idlers.ids) == 0 {
return
}
sort.Sort(sort.Reverse(idlers))
// Iterate over all the tasks and try to find a pending one
for _, task := range s.tasks {
// Skip any tasks already filling
if task.req != nil || task.res != nil {
continue
}
// Task pending retrieval, try to find an idle peer. If no such peer
// exists, we probably assigned tasks for all (or they are stateless).
// Abort the entire assignment mechanism.
if len(idlers.ids) == 0 {
return
}
var (
idle = idlers.ids[0]
peer = s.peers[idle]
cap = idlers.caps[0]
)
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
// Matched a pending task to an idle peer, allocate a unique request id
var reqid uint64
for {
reqid = uint64(rand.Int63())
if reqid == 0 {
continue
}
if _, ok := s.accountReqs[reqid]; ok {
continue
}
break
}
// Generate the network query and send it to the peer
req := &accountRequest{
peer: idle,
id: reqid,
time: time.Now(),
deliver: success,
revert: fail,
cancel: cancel,
stale: make(chan struct{}),
origin: task.Next,
limit: task.Last,
task: task,
}
req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
peer.Log().Debug("Account range request timed out", "reqid", reqid)
s.rates.Update(idle, AccountRangeMsg, 0, 0)
s.scheduleRevertAccountRequest(req)
})