-
Notifications
You must be signed in to change notification settings - Fork 9
/
block_fetcher.go
116 lines (100 loc) · 3.04 KB
/
block_fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package consensusmgr
import (
log "github.com/sirupsen/logrus"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
"github.com/bytom/vapor/p2p/security"
"github.com/bytom/vapor/protocol/bc"
)
// Limits applied by the block fetcher when accepting and queueing block messages.
const (
// maxBlockDistance is the largest allowed gap between a received block's height
// and the chain's best block height; add drops blocks that are further ahead.
maxBlockDistance = 64
// newBlockChSize is the buffer capacity of blockFetcher.newBlockCh.
newBlockChSize = 64
// msgLimit is the limit blockProcessorLoop passes to add: once a peer has this
// many messages counted in msgCounter, further messages from it are rejected.
msgLimit = 128 // peer message number limit
)
// blockFetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
//
// Block messages enter through newBlockCh, are filtered by add, and wait in
// queue until blockProcessorLoop passes them to the chain via insert.
//
// NOTE(review): no locking is visible in this file; queue, msgSet and msgCounter
// appear to be touched only from blockProcessorLoop (directly or through add) —
// confirm before using them from another goroutine.
type blockFetcher struct {
// chain reports the best block height and processes the queued blocks.
chain Chain
// peers is used to look up peers, report illegal behaviour and broadcast messages.
peers Peers
// newBlockCh carries block messages from processNewBlock to blockProcessorLoop.
newBlockCh chan *blockMsg
queue *prque.Prque // block import priority queue; priority is the negated block height
msgSet map[bc.Hash]*blockMsg // already queued blocks, keyed by block hash
msgCounter map[string]int // per peer msg counter to prevent DOS, keyed by peer ID
}
// newBlockFetcher builds a blockFetcher bound to the given chain and peer set.
//
// The returned fetcher has an empty import queue, empty msgSet/msgCounter maps
// and a newBlockCh buffered to newBlockChSize entries. It does not start
// blockProcessorLoop; that is left to the caller.
func newBlockFetcher(chain Chain, peers Peers) *blockFetcher {
	fetcher := new(blockFetcher)
	fetcher.chain = chain
	fetcher.peers = peers
	fetcher.newBlockCh = make(chan *blockMsg, newBlockChSize)
	fetcher.queue = prque.New()
	fetcher.msgSet = make(map[bc.Hash]*blockMsg)
	fetcher.msgCounter = make(map[string]int)
	return fetcher
}
// blockProcessorLoop runs forever, alternating between two phases:
//
//  1. Drain the import queue: pop queued block messages and hand each to
//     insert, stopping at the first block whose height is more than one above
//     the chain's current best height (that block is pushed back for later).
//  2. Block until a new message arrives on newBlockCh and feed it to add.
//
// The function never returns, so it is meant to run in its own goroutine.
func (f *blockFetcher) blockProcessorLoop() {
	for {
		for !f.queue.Empty() {
			next := f.queue.PopItem().(*blockMsg)
			height := next.block.Height

			// A gap remains between the chain tip and this block: re-queue it
			// with the same priority and wait for more messages to arrive.
			if importable := f.chain.BestBlockHeight() + 1; height > importable {
				f.queue.Push(next, -float32(height))
				break
			}

			f.insert(next)
			delete(f.msgSet, next.block.Hash())

			// Release the slot this message held against its peer's quota and
			// drop the map entry once nothing is left to count.
			if remaining := f.msgCounter[next.peerID] - 1; remaining > 0 {
				f.msgCounter[next.peerID] = remaining
			} else {
				delete(f.msgCounter, next.peerID)
			}
		}

		incoming := <-f.newBlockCh
		f.add(incoming, msgLimit)
	}
}
// add queues msg for import unless one of the admission checks rejects it.
//
// A message is dropped when:
//   - accepting it would push the sending peer's counted messages above limit
//     (a warning is logged);
//   - its block is below the chain's best height, or more than
//     maxBlockDistance above it;
//   - a block with the same hash is already queued.
//
// Accepted messages are recorded in msgSet, pushed onto the priority queue
// with the negated block height as priority, and counted against the peer.
func (f *blockFetcher) add(msg *blockMsg, limit int) {
	// Per-peer quota check, to keep a single peer from flooding the queue.
	queued := f.msgCounter[msg.peerID] + 1
	if queued > limit {
		fields := log.Fields{"module": logModule, "peer": msg.peerID, "limit": limit}
		log.WithFields(fields).Warn("The number of peer messages exceeds the limit")
		return
	}

	height := msg.block.Height
	best := f.chain.BestBlockHeight()
	if height < best {
		return
	}
	// height >= best here, so the subtraction cannot wrap around.
	if height-best > maxBlockDistance {
		return
	}

	hash := msg.block.Hash()
	if _, known := f.msgSet[hash]; known {
		return
	}

	f.msgSet[hash] = msg
	f.queue.Push(msg, -float32(height))
	f.msgCounter[msg.peerID] = queued

	log.WithFields(log.Fields{
		"module":       logModule,
		"block height": height,
		"block hash":   hash.String(),
	}).Debug("blockFetcher receive propose block")
}
// insert submits msg's block to the chain and, if it is accepted as a
// non-orphan, re-broadcasts it to the other peers as a block-propose message.
//
// When the chain rejects the block, the sending peer is reported through
// ProcessIllegal with security.LevelMsgIllegal, provided the peer is still
// known. Orphan blocks are neither reported nor broadcast. Failures while
// building or broadcasting the propose message are only logged.
func (f *blockFetcher) insert(msg *blockMsg) {
	orphan, err := f.chain.ProcessBlock(msg.block)
	switch {
	case err != nil:
		// Only penalize peers that are still registered.
		if f.peers.GetPeer(msg.peerID) != nil {
			f.peers.ProcessIllegal(msg.peerID, security.LevelMsgIllegal, err.Error())
		}
		return
	case orphan:
		return
	}

	proposal, buildErr := NewBlockProposeMsg(msg.block)
	if buildErr != nil {
		log.WithFields(log.Fields{"module": logModule, "err": buildErr}).Error("failed on create BlockProposeMsg")
		return
	}

	broadcast := NewBroadcastMsg(proposal, consensusChannel)
	if sendErr := f.peers.BroadcastMsg(broadcast); sendErr != nil {
		log.WithFields(log.Fields{"module": logModule, "err": sendErr}).Error("failed on broadcast proposed block")
	}
}
// processNewBlock hands msg to the fetcher by sending it on newBlockCh, where
// blockProcessorLoop receives it and passes it to add.
//
// The send is a plain channel send, so the caller blocks once the channel's
// buffer (newBlockChSize entries) is full until the loop receives a message.
func (f *blockFetcher) processNewBlock(msg *blockMsg) {
f.newBlockCh <- msg
}