/
blockchain.go
648 lines (578 loc) · 25.7 KB
/
blockchain.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
package core
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/exports/syncx"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
miveconsensus "github.com/ethereum-mive/mive/consensus"
miverawdb "github.com/ethereum-mive/mive/core/rawdb"
mivetypes "github.com/ethereum-mive/mive/core/types"
miveparams "github.com/ethereum-mive/mive/params"
)
var (
headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
headFinalizedBlockGauge = metrics.NewRegisteredGauge("chain/head/finalized", nil)
headSafeBlockGauge = metrics.NewRegisteredGauge("chain/head/safe", nil)
chainInfoGauge = metrics.NewRegisteredGaugeInfo("chain/info", nil)
accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil)
accountCommitTimer = metrics.NewRegisteredTimer("chain/account/commits", nil)
storageReadTimer = metrics.NewRegisteredTimer("chain/storage/reads", nil)
storageHashTimer = metrics.NewRegisteredTimer("chain/storage/hashes", nil)
storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil)
storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil)
snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/account/reads", nil)
snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)
triedbCommitTimer = metrics.NewRegisteredTimer("chain/triedb/commits", nil)
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
errInsertionInterrupted = errors.New("insertion is interrupted")
errChainStopped = errors.New("blockchain is stopped")
errInvalidOldChain = errors.New("invalid old chain")
errInvalidNewChain = errors.New("invalid new chain")
)
const (
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
)
// triedbConfig derives the configures for trie database.
func triedbConfig(c *core.CacheConfig) *trie.Config {
config := &trie.Config{Preimages: c.Preimages}
if c.StateScheme == rawdb.HashScheme {
config.HashDB = &hashdb.Config{
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
}
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
}
}
return config
}
type BlockChain struct {
chainConfig *miveparams.ChainConfig // Chain & network configuration
cacheConfig *core.CacheConfig // Cache configuration for pruning
db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
scope event.SubscriptionScope
genesisHeader *mivetypes.Header
// This mutex synchronizes chain write operations.
// Readers don't need to take it, they can just read the database.
chainmu *syncx.ClosableMutex
currentBlock atomic.Pointer[mivetypes.Header] // Current head of the chain
currentSnapBlock atomic.Pointer[mivetypes.Header] // Current head of snap-sync
// Introduced by the Beacon Chain
// https://www.alchemy.com/overviews/ethereum-commitment-levels
currentFinalBlock atomic.Pointer[mivetypes.Header] // Latest (consensus) finalized block
currentSafeBlock atomic.Pointer[mivetypes.Header] // Latest (consensus) safe block
receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
blockCache *lru.Cache[common.Hash, *types.Block]
// future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block]
wg sync.WaitGroup //
quit chan struct{} // shutdown signal, closed in Stop.
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing
engine miveconsensus.Engine
validator core.Validator // Block and state validator interface
prefetcher core.Prefetcher
processor core.Processor // Block transaction processor interface
vmConfig vm.Config
ethClient *ethclient.Client
ctx context.Context
ctxCancel context.CancelFunc
}
func NewBlockChain(db ethdb.Database, cacheConfig *core.CacheConfig, genesis *Genesis, overrides *core.ChainOverrides, engine miveconsensus.Engine, vmConfig vm.Config, ethClient *ethclient.Client) (*BlockChain, error) {
// Open trie database with provided config
triedb := trie.NewDatabase(db, triedbConfig(cacheConfig))
ctx, ctxCancel := context.WithCancel(context.Background())
chainConfig, genesisHash, genesisErr := SetupGenesisBlockWithOverride(ctx, db, triedb, genesis, overrides, ethClient)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
ctxCancel()
return nil, genesisErr
}
_ = genesisHash
log.Info("")
log.Info(strings.Repeat("-", 153))
for _, line := range strings.Split(chainConfig.Description(), "\n") {
log.Info(line)
}
log.Info(strings.Repeat("-", 153))
log.Info("")
bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
triegc: prque.New[int64, common.Hash](nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
engine: engine,
vmConfig: vmConfig,
ethClient: ethClient,
ctx: ctx,
ctxCancel: ctxCancel,
}
bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit))
bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)
var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped)
if err != nil {
return nil, err
}
bc.genesisHeader = bc.GetHeaderByNumber(chainConfig.Mive.GenesisBlock.Uint64())
if bc.genesisHeader == nil {
return nil, core.ErrNoGenesis
}
bc.currentBlock.Store(nil)
bc.currentSnapBlock.Store(nil)
bc.currentFinalBlock.Store(nil)
bc.currentSafeBlock.Store(nil)
// If Mive is initialized with an external ancient store, re-initialize the
// missing chain indexes and chain flags. This procedure can survive crash
// and can be resumed in next restart since chain flags are updated in last step.
if bc.empty() {
rawdb.InitDatabaseFromFreezer(bc.db)
}
return bc, nil
}
// empty returns an indicator whether the blockchain is empty.
// Note, it's a special case that we connect a non-empty ancient
// database with an empty node, so that we can plugin the ancient
// into node seamlessly.
func (bc *BlockChain) empty() bool {
genesis := bc.genesisHeader.Hash
for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db), rawdb.ReadHeadHeaderHash(bc.db), rawdb.ReadHeadFastBlockHash(bc.db)} {
if hash != genesis {
return false
}
}
return true
}
// loadLastState loads the last known chain state from the database. This method
// assumes that the chain manager mutex is held.
func (bc *BlockChain) loadLastState() error {
// Restore the last known head block
head := rawdb.ReadHeadBlockHash(bc.db)
if head == (common.Hash{}) {
// Corrupt or empty database, init from scratch
log.Warn("Empty database, resetting chain")
return bc.Reset()
}
// Make sure the entire head header is available
headBlock := bc.GetBlockByHash(head)
if headBlock == nil {
// Corrupt or empty database, init from scratch
log.Warn("Head block missing, resetting chain", "hash", head)
return bc.Reset()
}
headHeader := bc.GetHeaderByHash(head)
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(headHeader)
headBlockGauge.Update(int64(headHeader.NumberU64()))
// Restore the last known head snap block
bc.currentSnapBlock.Store(headHeader)
headFastBlockGauge.Update(int64(headHeader.NumberU64()))
// Restore the last known head header
if head := rawdb.ReadHeadHeaderHash(bc.db); head != (common.Hash{}) {
if header := bc.GetHeaderByHash(head); header != nil {
headHeader = header
}
}
bc.hc.SetCurrentHeader(headHeader)
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if header := bc.GetHeaderByHash(head); header != nil {
bc.currentSnapBlock.Store(header)
headFastBlockGauge.Update(int64(header.NumberU64()))
}
}
// Restore the last known finalized block and safe block
// Note: the safe block is not stored on disk and it is set to the last
// known finalized block on startup
if head := rawdb.ReadFinalizedBlockHash(bc.db); head != (common.Hash{}) {
if header := bc.GetHeaderByHash(head); header != nil {
bc.currentFinalBlock.Store(header)
headFinalizedBlockGauge.Update(int64(header.NumberU64()))
bc.currentSafeBlock.Store(header)
headSafeBlockGauge.Update(int64(header.NumberU64()))
}
}
// Issue a status log for the user
currentSnapBlock := bc.CurrentSnapBlock()
currentFinalBlock := bc.CurrentFinalBlock()
if headHeader.Hash != headBlock.Hash() {
log.Info("Loaded most recent local header",
"number", headHeader.Number,
"hash", headHeader.Hash,
"age", common.PrettyAge(time.Unix(int64(headHeader.Time), 0)))
}
log.Info("Loaded most recent local block",
"number", headBlock.Number(),
"hash", headBlock.Hash(),
"age", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0)))
if headBlock.Hash() != currentSnapBlock.Hash {
log.Info("Loaded most recent local snap block",
"number", currentSnapBlock.Number,
"hash", currentSnapBlock.Hash,
"age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
}
if currentFinalBlock != nil {
log.Info("Loaded most recent local finalized block",
"number", currentFinalBlock.Number,
"hash", currentFinalBlock.Hash,
"age", common.PrettyAge(time.Unix(int64(currentFinalBlock.Time), 0)))
}
if pivot := rawdb.ReadLastPivotNumber(bc.db); pivot != nil {
log.Info("Loaded last snap-sync pivot marker", "number", *pivot)
}
return nil
}
// Reset purges the entire blockchain, restoring it to its genesis state.
func (bc *BlockChain) Reset() error {
return bc.ResetWithGenesisBlock(bc.genesisHeader)
}
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
// specified genesis state.
func (bc *BlockChain) ResetWithGenesisBlock(genesis *mivetypes.Header) error {
// Dump the entire blockchain and purge the caches
if err := bc.SetHead(bc.genesisHeader.NumberU64()); err != nil {
return err
}
if !bc.chainmu.TryLock() {
return errChainStopped
}
defer bc.chainmu.Unlock()
// Prepare the genesis block and reinitialise the chain
batch := bc.db.NewBatch()
miverawdb.WriteHeader(batch, genesis)
if err := batch.Write(); err != nil {
log.Crit("Failed to write genesis block", "err", err)
}
bc.writeHeadBlock(genesis)
// Last update all in-memory chain markers
bc.genesisHeader = genesis
bc.currentBlock.Store(bc.genesisHeader)
headBlockGauge.Update(int64(bc.genesisHeader.NumberU64()))
bc.hc.SetGenesis(bc.genesisHeader)
bc.hc.SetCurrentHeader(bc.genesisHeader)
bc.currentSnapBlock.Store(bc.genesisHeader)
headFastBlockGauge.Update(int64(bc.genesisHeader.NumberU64()))
return nil
}
// SetHead rewinds the local chain to a new head. Depending on whether the node
// was snap synced or full synced and in which state, the method will try to
// delete minimal data from disk whilst retaining chain consistency.
func (bc *BlockChain) SetHead(head uint64) error {
if _, err := bc.setHeadBeyondRoot(head, 0, common.Hash{}, false); err != nil {
return err
}
// Send chain head event to update the transaction pool
header := bc.CurrentBlock()
block := bc.GetBlock(header.Hash, header.NumberU64())
if block == nil {
// This should never happen. In practice, previously currentBlock
// contained the entire block whereas now only a "marker", so there
// is an ever so slight chance for a race we should handle.
log.Error("Current block not found in database", "block", header.Number, "hash", header.Hash)
return fmt.Errorf("current block missing: #%d [%x..]", header.Number, header.Hash.Bytes()[:4])
}
bc.chainHeadFeed.Send(core.ChainHeadEvent{Block: block})
return nil
}
// SetHeadWithTimestamp rewinds the local chain to a new head that has at max
// the given timestamp. Depending on whether the node was snap synced or full
// synced and in which state, the method will try to delete minimal data from
// disk whilst retaining chain consistency.
func (bc *BlockChain) SetHeadWithTimestamp(timestamp uint64) error {
if _, err := bc.setHeadBeyondRoot(0, timestamp, common.Hash{}, false); err != nil {
return err
}
// Send chain head event to update the transaction pool
header := bc.CurrentBlock()
block := bc.GetBlock(header.Hash, header.NumberU64())
if block == nil {
// This should never happen. In practice, previously currentBlock
// contained the entire block whereas now only a "marker", so there
// is an ever so slight chance for a race we should handle.
log.Error("Current block not found in database", "block", header.Number, "hash", header.Hash)
return fmt.Errorf("current block missing: #%d [%x..]", header.Number, header.Hash.Bytes()[:4])
}
bc.chainHeadFeed.Send(core.ChainHeadEvent{Block: block})
return nil
}
// SetFinalized sets the finalized block.
func (bc *BlockChain) SetFinalized(header *mivetypes.Header) {
bc.currentFinalBlock.Store(header)
if header != nil {
rawdb.WriteFinalizedBlockHash(bc.db, header.Hash)
headFinalizedBlockGauge.Update(int64(header.NumberU64()))
} else {
rawdb.WriteFinalizedBlockHash(bc.db, common.Hash{})
headFinalizedBlockGauge.Update(0)
}
}
// SetSafe sets the safe block.
func (bc *BlockChain) SetSafe(header *mivetypes.Header) {
bc.currentSafeBlock.Store(header)
if header != nil {
headSafeBlockGauge.Update(int64(header.NumberU64()))
} else {
headSafeBlockGauge.Update(0)
}
}
func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Hash, repair bool) (uint64, error) {
if !bc.chainmu.TryLock() {
return 0, errChainStopped
}
defer bc.chainmu.Unlock()
// Track the block number of the requested root hash
var rootNumber uint64 // (no root == always 0) TODO
// Retrieve the last pivot block to short circuit rollbacks beyond it and the
// current freezer limit to start nuking id underflown
pivot := rawdb.ReadLastPivotNumber(bc.db)
frozen, _ := bc.db.Ancients()
updateFn := func(db ethdb.KeyValueWriter, header *mivetypes.Header) (*mivetypes.Header, bool) {
// Rewind the blockchain, ensuring we don't end up with a stateless head
// block. Note, depth equality is permitted to allow using SetHead as a
// chain reparation mechanism without deleting any data!
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.NumberU64() <= currentBlock.NumberU64() {
newHeadBlock := bc.GetHeader(header.Hash, header.NumberU64())
if newHeadBlock == nil {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash)
newHeadBlock = bc.genesisHeader
} else {
// Block exists. Keep rewinding until either we find one with state
// or until we exceed the optional threshold root hash
beyondRoot := (root == common.Hash{}) // Flag whether we're beyond the requested root (no root, always true)
for {
// If a root threshold was requested but not yet crossed, check
if root != (common.Hash{}) && !beyondRoot && newHeadBlock.Root == root {
beyondRoot, rootNumber = true, newHeadBlock.NumberU64()
}
if !bc.HasState(newHeadBlock.Root) && !bc.stateRecoverable(newHeadBlock.Root) {
log.Trace("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash)
if pivot == nil || newHeadBlock.NumberU64() > *pivot {
parent := bc.GetHeader(newHeadBlock.ParentHash, newHeadBlock.NumberU64()-1)
if parent != nil {
newHeadBlock = parent
continue
}
log.Error("Missing block in the middle, aiming genesis", "number", newHeadBlock.NumberU64()-1, "hash", newHeadBlock.ParentHash)
newHeadBlock = bc.genesisHeader
} else {
log.Trace("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash, "pivot", *pivot)
newHeadBlock = bc.genesisHeader
}
}
if beyondRoot || newHeadBlock.NumberU64() == bc.genesisHeader.NumberU64() {
if !bc.HasState(newHeadBlock.Root) && bc.stateRecoverable(newHeadBlock.Root) {
// Rewind to a block with recoverable state. If the state is
// missing, run the state recovery here.
if err := bc.triedb.Recover(newHeadBlock.Root); err != nil {
log.Crit("Failed to rollback state", "err", err) // Shouldn't happen
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash)
}
break
}
log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash, "root", newHeadBlock.Root)
newHeadBlock = bc.GetHeader(newHeadBlock.ParentHash, newHeadBlock.NumberU64()-1) // Keep rewinding
}
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash)
// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of SetHead is from high
// to low, so it's safe to update in-memory markers directly.
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
// The head state is missing, which is only possible in the path-based
// scheme. This situation occurs when the chain head is rewound below
// the pivot point. In this scenario, there is no possible recovery
// approach except for rerunning a snap sync. Do nothing here until the
// state syncer picks it up.
if !bc.HasState(newHeadBlock.Root) {
log.Info("Chain is stateless, wait state sync", "number", newHeadBlock.Number, "hash", newHeadBlock.Hash)
}
}
// Rewind the snap block in a simpleton way to the target head
if currentSnapBlock := bc.CurrentSnapBlock(); currentSnapBlock != nil && header.NumberU64() < currentSnapBlock.NumberU64() {
newHeadSnapBlock := bc.GetHeader(header.Hash, header.NumberU64())
// If either blocks reached nil, reset to the genesis state
if newHeadSnapBlock == nil {
newHeadSnapBlock = bc.genesisHeader
}
rawdb.WriteHeadFastBlockHash(db, newHeadSnapBlock.Hash)
// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of SetHead is from high
// to low, so it's safe the update in-memory markers directly.
bc.currentSnapBlock.Store(newHeadSnapBlock)
headFastBlockGauge.Update(int64(newHeadSnapBlock.NumberU64()))
}
var (
headHeader = bc.CurrentBlock()
headNumber = headHeader.NumberU64()
)
// If setHead underflown the freezer threshold and the block processing
// intent afterwards is full block importing, delete the chain segment
// between the stateful-block and the sethead target.
var wipe bool
if headNumber+1 < frozen {
wipe = pivot == nil || headNumber >= *pivot
}
return headHeader, wipe // Only force wipe if full synced
}
// Rewind the header chain, deleting all block bodies until then
delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
// Ignore the error here since light client won't hit this path
frozen, _ := bc.db.Ancients()
if num+1 <= frozen {
// Truncate all relative data(header, receipt
// and canonical hash) from ancient store.
if _, err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash)
} else {
// Remove relative receipts from the active store.
// The header and canonical hash will be
// removed in the hc.SetHead function.
rawdb.DeleteReceipts(db, hash, num)
}
// Todo(rjl493456442) txlookup, bloombits, etc
}
// If SetHead was only called as a chain reparation method, try to skip
// touching the header chain altogether, unless the freezer is broken
if repair {
if target, force := updateFn(bc.db, bc.CurrentBlock()); force {
bc.hc.SetHead(target.NumberU64(), updateFn, delFn)
}
} else {
// Rewind the chain to the requested head and keep going backwards until a
// block with a state is found or snap sync pivot is passed
if time > 0 {
log.Warn("Rewinding blockchain to timestamp", "target", time)
bc.hc.SetHeadWithTimestamp(time, updateFn, delFn)
} else {
log.Warn("Rewinding blockchain to block", "target", head)
bc.hc.SetHead(head, updateFn, delFn)
}
}
// Clear out any stale content from the caches
bc.receiptsCache.Purge()
bc.blockCache.Purge()
bc.futureBlocks.Purge()
// Clear safe block, finalized block if needed
if safe := bc.CurrentSafeBlock(); safe != nil && head < safe.NumberU64() {
log.Warn("SetHead invalidated safe block")
bc.SetSafe(nil)
}
if finalized := bc.CurrentFinalBlock(); finalized != nil && head < finalized.NumberU64() {
log.Error("SetHead invalidated finalized block")
bc.SetFinalized(nil)
}
return rootNumber, bc.loadLastState()
}
// writeHeadBlock injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
// header and the head snap sync block to this very same block if they are older
// or if they are on a different side chain.
//
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) writeHeadBlock(header *mivetypes.Header) {
// Add the block to the canonical chain number scheme and mark as the head
batch := bc.db.NewBatch()
rawdb.WriteHeadHeaderHash(batch, header.Hash)
rawdb.WriteHeadFastBlockHash(batch, header.Hash)
rawdb.WriteCanonicalHash(batch, header.Hash, header.NumberU64())
rawdb.WriteHeadBlockHash(batch, header.Hash)
// Flush the whole batch into the disk, exit the node if failed
if err := batch.Write(); err != nil {
log.Crit("Failed to update chain indexes and markers", "err", err)
}
// Update all in-memory chain markers in the last step
bc.hc.SetCurrentHeader(header)
bc.currentSnapBlock.Store(header)
headFastBlockGauge.Update(int64(header.NumberU64()))
bc.currentBlock.Store(header)
headBlockGauge.Update(int64(header.NumberU64()))
}
func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) {
// If the chain is terminating, don't even bother starting up.
if bc.insertStopped() {
return 0, nil
}
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
core.SenderCacher.RecoverFromBlocks(types.MakeSigner(bc.chainConfig.Eth, chain[0].Number(), chain[0].Time()), chain)
return 0, nil
}
// StopInsert interrupts all insertion methods, causing them to return
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
// calling this method.
func (bc *BlockChain) StopInsert() {
bc.procInterrupt.Store(true)
}
// insertStopped returns true after StopInsert has been called.
func (bc *BlockChain) insertStopped() bool {
return bc.procInterrupt.Load()
}