/
block_manager.go
485 lines (401 loc) · 12.8 KB
/
block_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
package node
import (
"fmt"
"sync"
"time"
"github.com/ellcrys/elld/params"
"github.com/ellcrys/elld/node/common"
"github.com/ellcrys/elld/util/cache"
"gopkg.in/oleiade/lane.v1"
"github.com/ellcrys/elld/miner"
"github.com/ellcrys/elld/types"
"github.com/ellcrys/elld/types/core"
"github.com/ellcrys/elld/util/logger"
"github.com/jinzhu/copier"
"github.com/olebedev/emitter"
"github.com/shopspring/decimal"
)
// unprocessedBlock represents a block that requires processing.
// Entries are appended to BlockManager.unprocessed and consumed
// by handleUnprocessedBlocks.
type unprocessedBlock struct {
	// block is the block that needs to be processed
	block types.Block
	// done, when non-nil, receives the outcome of processing
	// the block: the processing error on failure, nil on success.
	done chan error
}
// processedBlock is a queue entry for a block announced through
// core.EventNewBlock. Entries are appended to
// BlockManager.processedBlocks and consumed by handleProcessedBlocks.
type processedBlock struct {
	// block is the processed block
	block types.Block
	// atSyncTime indicates that the block
	// was processed during a sync session. It is
	// captured from IsSyncing() when the entry is queued.
	atSyncTime bool
}
// BlockManager is responsible for handling
// incoming, mined or processed blocks in a
// concurrency safe way. It queues blocks for processing,
// performs post-processing on processed blocks and
// synchronizes the local chain with candidate peers.
type BlockManager struct {
	// evt is the global event emitter
	evt *emitter.Emitter
	// syncMtx is the mutex taken by sync, IsSyncing and
	// addSyncCandidate when they touch the sync state
	// (syncing, syncCandidate, bestSyncCandidate).
	syncMtx *sync.RWMutex
	// syncing indicates that block syncing is in progress
	syncing bool
	// log is the logger used by this module
	log logger.Logger
	// bChain is the blockchain manager. It is used to
	// process queued blocks.
	bChain types.Blockchain
	// miner is the CPU miner. It is not set by NewBlockManager;
	// it is assigned through SetMiner.
	miner *miner.Miner
	// engine is the node that owns this block manager
	engine *Node
	// syncCandidate holds the candidate peers to
	// sync blocks with, keyed by peer ID.
	syncCandidate map[string]*types.SyncPeerChainInfo
	// bestSyncCandidate is the current best sync
	// candidate to perform block synchronization with.
	// It is nil when no sync session is active.
	bestSyncCandidate *types.SyncPeerChainInfo
	// unprocessed holds blocks that are yet to be processed
	unprocessed *lane.Deque
	// processedBlocks holds blocks that have been processed
	// and await post-processing
	processedBlocks *lane.Deque
	// mined holds the hex-encoded hashes of blocks mined by the client
	mined *cache.Cache
}
// NewBlockManager builds a BlockManager bound to the given node.
// The logger, event emitter and blockchain are taken from the node;
// the queues, the mined-block cache and the sync candidate map
// start out empty. The miner is left unset (see SetMiner).
func NewBlockManager(node *Node) *BlockManager {
	return &BlockManager{
		// Dependencies borrowed from the node.
		engine: node,
		bChain: node.bChain,
		evt:    node.event,
		log:    node.log,

		// Sync state.
		syncMtx:       &sync.RWMutex{},
		syncCandidate: make(map[string]*types.SyncPeerChainInfo),

		// Block queues and the cache of locally mined block hashes.
		unprocessed:     lane.NewDeque(),
		processedBlocks: lane.NewDeque(),
		mined:           cache.NewCache(100),
	}
}
// SetMiner sets a reference of the CPU miner.
// NewBlockManager does not assign a miner, and
// handleProcessedBlocks calls RestartWorkers on it.
// NOTE(review): presumably this must be called before Manage
// starts consuming processed blocks — confirm against callers.
func (bm *BlockManager) SetMiner(m *miner.Miner) {
	bm.miner = m
}
// Manage starts the background goroutines that drive the block
// manager: one listener per block related event topic and one
// ticker-driven consumer per block queue. It returns immediately;
// the ticker-driven consumers never exit.
func (bm *BlockManager) Manage() {
	// listen spawns a goroutine that passes the arguments of
	// every event received on topic to handle.
	listen := func(topic string, handle func(args []interface{})) {
		go func() {
			for evt := range bm.evt.On(topic) {
				handle(evt.Args)
			}
		}()
	}

	// drain spawns a goroutine that, on every tick of the queue
	// processor interval, calls handle until empty reports true.
	drain := func(empty func() bool, handle func() error) {
		go func() {
			ticker := time.NewTicker(params.QueueProcessorInterval)
			for range ticker.C {
				for !empty() {
					// The result is intentionally dropped; a failed
					// block is logged by the handler itself.
					_ = handle()
				}
			}
		}()
	}

	// A block found by the local miner: remember its hash and
	// queue it for processing together with its result channel.
	listen(core.EventFoundBlock, func(args []interface{}) {
		found := args[0].(*miner.FoundBlock)
		bm.mined.Add(found.Block.GetHash().HexStr(), struct{}{})
		bm.unprocessed.Append(&unprocessedBlock{
			block: found.Block,
			done:  args[1].(chan error),
		})
	})

	// A new block event: queue the block for post-processing,
	// recording whether a sync session was active at that moment.
	listen(core.EventNewBlock, func(args []interface{}) {
		bm.processedBlocks.Append(&processedBlock{
			block:      args[0].(*core.Block),
			atSyncTime: bm.IsSyncing(),
		})
	})

	// An orphan block: ask its broadcaster for the parent.
	listen(core.EventOrphanBlock, func(args []interface{}) {
		bm.handleOrphan(args[0].(*core.Block))
	})

	// A block that must be processed: queue it without a
	// result channel.
	listen(core.EventProcessBlock, func(args []interface{}) {
		bm.unprocessed.Append(&unprocessedBlock{
			block: args[0].(*core.Block),
		})
	})

	// Chain information from a peer: if the peer is ahead of the
	// local chain, register it as a candidate and synchronize.
	listen(core.EventPeerChainInfo, func(args []interface{}) {
		info := args[0].(*types.SyncPeerChainInfo)
		if !bm.isSyncCandidate(info) {
			return
		}
		bm.addSyncCandidate(info)
		if err := bm.sync(); err == nil {
			bm.log.Info("Block synchronization complete")
		}
	})

	// Periodically consume both block queues.
	drain(bm.processedBlocks.Empty, bm.handleProcessedBlocks)
	drain(bm.unprocessed.Empty, bm.handleUnprocessedBlocks)
}
// handleProcessedBlocks takes the next entry off the processed
// block queue and performs the post-append work for it: clearing
// the block's transactions from the pool, restarting the miner
// workers when the block was not mined locally and relaying the
// block to peers. It returns nil in all cases, including when the
// queue is empty.
func (bm *BlockManager) handleProcessedBlocks() error {
	item := bm.processedBlocks.Shift()
	if item == nil {
		return nil
	}

	entry := item.(*processedBlock)
	block := entry.block.(*core.Block)

	// The block's transactions no longer belong in the pool.
	bm.engine.txsPool.Remove(block.GetTransactions()...)

	// The miner restarts itself when it finds a block. A block
	// that is not in the mined cache came from a remote peer, so
	// the restart has to be forced here.
	minedLocally := bm.mined.Has(block.GetHash().HexStr())
	if !minedLocally {
		bm.miner.RestartWorkers()
	}

	// Relay only blocks that are not the genesis block, were not
	// processed during a sync session, and only while sync mode
	// has not been disabled.
	shouldRelay := block.GetNumber() > 1 &&
		!entry.atSyncTime &&
		!bm.engine.syncMode.IsDisabled()
	if shouldRelay {
		_ = bm.engine.Gossip().BroadcastBlock(block, bm.engine.PM().GetAcquaintedPeers())
	}

	return nil
}
// handleUnprocessedBlocks takes the next entry off the unprocessed
// block queue and hands its block to the blockchain for processing.
// The outcome is emitted as a core.EventBlockProcessed event and,
// when the entry carries a result channel, sent on that channel.
// It returns the processing error, or nil on success or when the
// queue is empty.
func (bm *BlockManager) handleUnprocessedBlocks() error {
	item := bm.unprocessed.Shift()
	if item == nil {
		return nil
	}

	entry := item.(*unprocessedBlock)
	block, done := entry.block, entry.done

	// report delivers the outcome to whoever queued the block,
	// if a result channel was supplied.
	report := func(result error) {
		if done != nil {
			done <- result
		}
	}

	if _, err := bm.bChain.ProcessBlock(block); err != nil {
		go bm.evt.Emit(core.EventBlockProcessed, block, err)
		bm.log.Debug("Failed to process block", "Err", err.Error())
		report(err)
		return err
	}

	go bm.evt.Emit(core.EventBlockProcessed, block, nil)
	bm.log.Info("Block has been processed",
		"BlockNo", block.GetNumber(),
		"BlockHash", block.GetHash().SS())
	report(nil)

	return nil
}
// handleOrphan asks the peer that broadcast an orphaned block
// for the block's parent by sending it a RequestBlock message.
// Blocks without a broadcaster are ignored.
func (bm *BlockManager) handleOrphan(b *core.Block) {
	// A block without a broadcaster was most likely created by
	// the local node and orphaned by a reorganization that
	// removed its parent; there is nobody to ask.
	origin := b.Broadcaster
	if origin == nil {
		return
	}

	missingParent := b.GetHeader().GetParentHash()

	bm.log.Debug("Requesting orphan parent block from broadcaster",
		"BlockNo", b.GetNumber(),
		"ParentBlockHash", missingParent.SS())

	bm.engine.gossipMgr.RequestBlock(origin, missingParent)
}
// isSyncCandidate checks whether a peer is a
// valid sync candidate based on its chain state
// information.
//
// A peer is a valid candidate if the total difficulty
// of its best block is greater than that of the local best
// block. When the local best block cannot be read, the
// failure is logged and the peer is not considered a
// candidate.
func (bm *BlockManager) isSyncCandidate(info *types.SyncPeerChainInfo) bool {
	localBestBlock, err := bm.engine.GetBlockchain().ChainReader().Current()
	if err != nil {
		// Without a local best block there is nothing to compare
		// against; dereferencing it would panic.
		bm.log.Debug("Failed to get local best block", "Err", err.Error())
		return false
	}

	localTD := localBestBlock.GetHeader().GetTotalDifficulty()
	if localTD.Cmp(info.PeerChainTD) != -1 {
		return false
	}

	bm.log.Info("Local blockchain is behind peer",
		"ChainHeight", localBestBlock.GetNumber(),
		"LocalTD", localTD,
		"PeerID", info.PeerIDShort,
		"PeerChainHeight", info.PeerChainHeight,
		"PeerTD", info.PeerChainTD)

	return true
}
// pickBestSyncCandidate scans the registered sync candidates and
// returns the one whose chain has the highest total difficulty.
// It returns nil when no candidate is registered.
// Note: Not thread safe.
func (bm *BlockManager) pickBestSyncCandidate() *types.SyncPeerChainInfo {
	var best *types.SyncPeerChainInfo

	for _, contender := range bm.syncCandidate {
		// The first contender wins by default; afterwards only a
		// strictly higher total difficulty displaces the leader.
		if best == nil || best.PeerChainTD.Cmp(contender.PeerChainTD) == -1 {
			best = contender
		}
	}

	return best
}
// sync runs sync sessions with the available candidates,
// starting with the best candidate. The best candidate is
// the one with the highest total difficulty. Each round
// requests a batch of blocks from the best candidate and
// queues them for processing; rounds repeat until no
// candidate is left.
//
// If there is a failure in connection or a failure in
// requesting for sync objects, or the candidate is no longer
// ahead of the local chain, the candidate is removed before
// the next round.
//
// It returns core.ErrAbortedDueToSyncDisablement when sync is
// disabled on the engine, an error when another sync session
// is already running, and nil once no candidate remains. These
// conditions are re-checked before every round, so an abort
// that happens after the first round is reported to the caller
// as well.
func (bm *BlockManager) sync() error {
	// Rounds are driven by a loop rather than by re-entering
	// sync, so a long synchronization does not grow the stack.
	for {
		// Abort synchronization if disabled on the engine
		if bm.engine.syncMode.IsDisabled() {
			return core.ErrAbortedDueToSyncDisablement
		}

		started, err := bm.startSyncSession()
		if err != nil {
			return err
		}
		if !started {
			// No candidate left to synchronize with.
			return nil
		}

		bm.syncWithBestCandidate()
		bm.endSyncSession()
	}
}

// startSyncSession marks a sync session as active and selects the
// best candidate for it. It reports false when there is no
// candidate, and an error when a session is already active. The
// check and the update happen under a single lock so two callers
// cannot both start a session.
func (bm *BlockManager) startSyncSession() (bool, error) {
	bm.syncMtx.Lock()
	defer bm.syncMtx.Unlock()

	// Abort synchronization if an existing sync
	// session is on-going.
	if bm.syncing {
		return false, fmt.Errorf("aborted. Synchronization is ongoing")
	}

	if len(bm.syncCandidate) == 0 {
		return false, nil
	}

	// Choose the best candidate peer and
	// set it as the current sync peer
	bm.syncing = true
	bm.bestSyncCandidate = bm.pickBestSyncCandidate()

	return true, nil
}

// endSyncSession clears the active session flag and the current
// best candidate.
func (bm *BlockManager) endSyncSession() {
	bm.syncMtx.Lock()
	bm.syncing = false
	bm.bestSyncCandidate = nil
	bm.syncMtx.Unlock()
}

// removeSyncCandidate drops the candidate with the given peer ID.
// The candidate map is also written by addSyncCandidate, so the
// removal is done under the same lock.
func (bm *BlockManager) removeSyncCandidate(peerID string) {
	bm.syncMtx.Lock()
	delete(bm.syncCandidate, peerID)
	bm.syncMtx.Unlock()
}

// syncWithBestCandidate performs one sync round with the current
// best candidate: it requests block hashes and bodies from the
// peer and queues the received blocks for processing. The
// candidate is removed when the peer is unknown, when a request
// fails or when the peer is no longer ahead of the local chain.
func (bm *BlockManager) syncWithBestCandidate() {
	bm.syncMtx.RLock()
	candidate := bm.bestSyncCandidate
	bm.syncMtx.RUnlock()

	peer := bm.engine.peerManager.GetPeer(candidate.PeerID)
	if peer == nil {
		bm.log.Debug("best candidate not found in peer list", "PeerID", candidate.PeerID)
		bm.removeSyncCandidate(candidate.PeerID)
		return
	}

	// Request block hashes from the peer
	blockHashes, err := bm.engine.gossipMgr.SendGetBlockHashes(peer, nil,
		candidate.LastBlockSent)
	if err != nil {
		bm.log.Debug("Failed to get block hashes", "Err", err.Error())
		bm.removeSyncCandidate(candidate.PeerID)
		return
	}

	// Request for block bodies
	blockBodies, err := bm.engine.gossipMgr.SendGetBlockBodies(peer, blockHashes.Hashes)
	if err != nil {
		bm.log.Debug("Failed to get block bodies", "Err", err.Error())
		bm.removeSyncCandidate(candidate.PeerID)
		return
	}

	bm.log.Debug("Received block bodies",
		"PeerID", candidate.PeerID,
		"NumBlockBodies", len(blockBodies.Blocks))

	// Queue the block bodies for appending to the blockchain
	for _, bb := range blockBodies.Blocks {
		var block core.Block
		if err := copier.Copy(&block, bb); err != nil {
			// A body that cannot be copied would be queued as an
			// incomplete block; skip it instead.
			bm.log.Debug("Failed to copy block body", "Err", err.Error())
			continue
		}

		// Set the broadcaster
		block.SetBroadcaster(peer)
		candidate.LastBlockSent = block.GetHash()

		// Record the block in the history cache, keyed by the
		// block hash and the peer it came from.
		hk := common.KeyBlock2(block.GetHashAsHex(), candidate.PeerID)
		bm.engine.GetHistory().AddMulti(cache.Sec(600), hk...)

		// Process the block
		bm.unprocessed.Append(&unprocessedBlock{
			block: &block,
		})
	}

	// Let's check if the candidate is still a viable
	// sync candidate. If it is not, remove it as a
	// sync candidate so the next round proceeds with
	// another peer.
	if !bm.isSyncCandidate(candidate) {
		bm.removeSyncCandidate(candidate.PeerID)
		return
	}

	if syncStatus := bm.GetSyncStat(); syncStatus != nil {
		bm.log.Info("Current synchronization status",
			"TargetTD", syncStatus.TargetTD,
			"CurTD", syncStatus.CurrentTD,
			"TargetChainHeight", syncStatus.TargetChainHeight,
			"CurChainHeight", syncStatus.CurrentChainHeight,
			"Progress(%)", syncStatus.ProgressPercent)
	}
}
// IsSyncing reports whether a block sync session is currently
// active. The flag is read under the sync read lock.
func (bm *BlockManager) IsSyncing() bool {
	bm.syncMtx.RLock()
	active := bm.syncing
	bm.syncMtx.RUnlock()
	return active
}
// GetSyncStat returns progress information about
// the current blockchain synchronization session.
//
// It returns nil when no sync session is active, when no best
// candidate is set or when the local best block cannot be read.
// ProgressPercent is the ratio of the local chain height to the
// target chain height, as a percentage rounded to one decimal
// place; it is left at zero when the target height is zero.
func (bm *BlockManager) GetSyncStat() *core.SyncStateInfo {
	// Take the flag and the candidate in one critical section.
	// sync resets bestSyncCandidate to nil under this lock, so a
	// separate nil check followed by field reads could dereference
	// nil.
	bm.syncMtx.RLock()
	syncing, target := bm.syncing, bm.bestSyncCandidate
	bm.syncMtx.RUnlock()

	if !syncing || target == nil {
		return nil
	}

	// Get the current local best chain
	localBestBlock, err := bm.engine.GetBlockchain().ChainReader().Current()
	if err != nil {
		bm.log.Debug("Failed to get local best block", "Err", err.Error())
		return nil
	}

	syncState := &core.SyncStateInfo{
		TargetTD:           target.PeerChainTD,
		TargetChainHeight:  target.PeerChainHeight,
		CurrentTD:          localBestBlock.GetHeader().GetTotalDifficulty(),
		CurrentChainHeight: localBestBlock.GetNumber(),
	}

	// compute progress percentage based
	// on block height differences. A zero target height
	// would make the ratio Inf or NaN, which decimal.NewFromFloat
	// is documented to panic on, so the percentage is skipped.
	if syncState.TargetChainHeight != 0 {
		pct := float64(100) * (float64(syncState.CurrentChainHeight) /
			float64(syncState.TargetChainHeight))
		// The second result only reports whether the conversion
		// was exact; it is not needed for a display value.
		syncState.ProgressPercent, _ = decimal.NewFromFloat(pct).
			Round(1).Float64()
	}

	return syncState
}
// addSyncCandidate registers a sync candidate under its peer ID.
// If the peer is already registered with the same total
// difficulty, the existing entry is kept. If the total difficulty
// differs, the entry is replaced and the last block sent by the
// peer is carried over to the new entry.
func (bm *BlockManager) addSyncCandidate(candidate *types.SyncPeerChainInfo) {
	bm.syncMtx.Lock()
	defer bm.syncMtx.Unlock()

	known, found := bm.syncCandidate[candidate.PeerID]

	switch {
	case !found:
		bm.log.Debug("Added new sync candidate", "PeerID", candidate.PeerID)

	case known.PeerChainTD.Cmp(candidate.PeerChainTD) == 0:
		// Nothing new to record for this peer.
		return

	default:
		// Keep the position already reached with this peer.
		candidate.LastBlockSent = known.LastBlockSent
		bm.log.Debug("Updated sync candidate", "PeerID", candidate.PeerID)
	}

	bm.syncCandidate[candidate.PeerID] = candidate
}