This repository has been archived by the owner on Mar 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 360
/
block_fetcher.go
99 lines (84 loc) · 2.31 KB
/
block_fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package netsync
import (
log "github.com/sirupsen/logrus"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
"github.com/bytom/bytom/p2p/security"
"github.com/bytom/bytom/protocol/bc"
)
const (
maxBlockDistance = 64
maxMsgSetSize = 128
newBlockChSize = 64
)
// blockFetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
type blockFetcher struct {
chain Chain
peers *peerSet
newBlockCh chan *blockMsg
queue *prque.Prque
msgSet map[bc.Hash]*blockMsg
}
//NewBlockFetcher creates a block fetcher to retrieve blocks of the new mined.
func newBlockFetcher(chain Chain, peers *peerSet) *blockFetcher {
f := &blockFetcher{
chain: chain,
peers: peers,
newBlockCh: make(chan *blockMsg, newBlockChSize),
queue: prque.New(),
msgSet: make(map[bc.Hash]*blockMsg),
}
go f.blockProcessor()
return f
}
func (f *blockFetcher) blockProcessor() {
for {
for !f.queue.Empty() {
msg := f.queue.PopItem().(*blockMsg)
if msg.block.Height > f.chain.BestBlockHeight()+1 {
f.queue.Push(msg, -float32(msg.block.Height))
break
}
f.insert(msg)
delete(f.msgSet, msg.block.Hash())
}
f.add(<-f.newBlockCh)
}
}
func (f *blockFetcher) add(msg *blockMsg) {
bestHeight := f.chain.BestBlockHeight()
if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
return
}
blockHash := msg.block.Hash()
if _, ok := f.msgSet[blockHash]; !ok {
f.msgSet[blockHash] = msg
f.queue.Push(msg, -float32(msg.block.Height))
log.WithFields(log.Fields{
"module": logModule,
"block height": msg.block.Height,
"block hash": blockHash.String(),
}).Debug("blockFetcher receive mine block")
}
}
func (f *blockFetcher) insert(msg *blockMsg) {
isOrphan, err := f.chain.ProcessBlock(msg.block)
if err != nil {
peer := f.peers.getPeer(msg.peerID)
if peer == nil {
return
}
f.peers.ProcessIllegal(msg.peerID, security.LevelMsgIllegal, err.Error())
return
}
if isOrphan {
return
}
if err := f.peers.broadcastMinedBlock(msg.block); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockFetcher fail on broadcast new block")
return
}
}
func (f *blockFetcher) processNewBlock(msg *blockMsg) {
f.newBlockCh <- msg
}