-
Notifications
You must be signed in to change notification settings - Fork 0
/
protocol_reactor.go
99 lines (83 loc) · 2.43 KB
/
protocol_reactor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package netsync
import (
"time"
"github.com/fatzero/mass-core/errors"
"github.com/fatzero/mass-core/logging"
"github.com/fatzero/mass-core/p2p"
"github.com/fatzero/mass-core/p2p/connection"
)
const (
handshakeTimeout = 10 * time.Second
handshakeCheckPerid = 500 * time.Millisecond
)
var (
errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
errStatusRequest = errors.New("Status request error")
)
//ProtocolReactor handles new coming protocol message.
type ProtocolReactor struct {
p2p.BaseReactor
sm *SyncManager
peers *peerSet
}
// NewProtocolReactor returns the reactor of whole blockchain.
func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
pr := &ProtocolReactor{
sm: sm,
peers: peers,
}
pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
return pr
}
// GetChannels implements Reactor
func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
return []*connection.ChannelDescriptor{
{
ID: BlockchainChannel,
Priority: 5,
SendQueueCapacity: 100,
},
}
}
// OnStart implements BaseService
func (pr *ProtocolReactor) OnStart() error {
pr.BaseReactor.OnStart()
return nil
}
// OnStop implements BaseService
func (pr *ProtocolReactor) OnStop() {
pr.BaseReactor.OnStop()
}
// AddPeer implements Reactor by sending our state to peer.
func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
return errStatusRequest
}
checkTicker := time.NewTicker(handshakeCheckPerid)
defer checkTicker.Stop()
timeoutTimer := time.NewTimer(handshakeTimeout)
for {
select {
case <-checkTicker.C:
if exist := pr.peers.getPeer(peer.Key); exist != nil {
pr.sm.syncTransactions(peer.Key)
return nil
}
case <-timeoutTimer.C:
return errProtocolHandshakeTimeout
}
}
}
// RemovePeer implements Reactor by removing peer from the pool.
func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
pr.peers.removePeer(peer.Key)
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
msgType, msg, err := DecodeMessage(msgBytes)
if err != nil {
logging.CPrint(logging.ERROR, "fail on reactor decoding message", logging.LogFormat{"err": err})
return
}
pr.sm.processMsg(src, msgType, msg)
}