/
base_reactor.go
135 lines (117 loc) · 4.37 KB
/
base_reactor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package p2p
import (
"github.com/Finschia/ostracon/libs/service"
"github.com/Finschia/ostracon/p2p/conn"
)
// Reactor is responsible for handling incoming messages on one or more
// Channel. Switch calls GetChannels when reactor is added to it. When a new
// peer joins our node, InitPeer and AddPeer are called. RemovePeer is called
// when the peer is stopped. Receive is called when a message is received on a
// channel associated with this reactor.
//
// Peer#Send or Peer#TrySend should be used to send the message to a peer.
type Reactor interface {
	service.Service // Start, Stop

	// SetSwitch allows setting a switch.
	SetSwitch(*Switch)

	// GetChannels returns the list of MConnection.ChannelDescriptor. Make sure
	// that each ID is unique across all the reactors added to the switch.
	GetChannels() []*conn.ChannelDescriptor

	// InitPeer is called by the switch before the peer is started. Use it to
	// initialize data for the peer (e.g. peer state).
	//
	// NOTE: The switch won't call AddPeer nor RemovePeer if it fails to start
	// the peer. Do not store any data associated with the peer in the reactor
	// itself unless you don't want to have a state, which is never cleaned up.
	InitPeer(peer Peer) Peer

	// AddPeer is called by the switch after the peer is added and successfully
	// started. Use it to start goroutines communicating with the peer.
	AddPeer(peer Peer)

	// RemovePeer is called by the switch when the peer is stopped (due to error
	// or other reason).
	RemovePeer(peer Peer, reason interface{})

	// Receive is called by the switch when msgBytes is received from the peer.
	//
	// NOTE: the reactor cannot keep msgBytes around after Receive completes
	// without copying.
	//
	// CONTRACT: msgBytes are not nil.
	//
	// Only one of Receive or ReceiveEnvelope is called per message. If
	// ReceiveEnvelope is implemented, it will be used, otherwise the switch
	// will fall back to using Receive.
	//
	// Deprecated: Reactors looking to receive data from a peer should implement ReceiveEnvelope.
	// Receive will be deprecated in favor of ReceiveEnvelope in v0.37.
	Receive(chID byte, peer Peer, msgBytes []byte)

	// GetRecvChan returns the channel of buffered messages used for
	// asynchronous receiving. BaseReactor's implementation panics when the
	// reactor was not created in async mode.
	GetRecvChan() chan *BufferedMsg

	// RecvRoutine is the per-reactor receive loop. BaseReactor starts it in
	// its own goroutine from OnStart when the reactor is async.
	RecvRoutine()
}
// EnvelopeReceiver is the optional interface a reactor implements to receive
// messages as an Envelope instead of through Reactor.Receive.
type EnvelopeReceiver interface {
	// ReceiveEnvelope is called by the switch when an envelope is received from any connected
	// peer on any of the channels registered by the reactor.
	//
	// Only one of Receive or ReceiveEnvelope is called per message. If ReceiveEnvelope
	// is implemented, it will be used, otherwise the switch will fall back to
	// using Receive. Receive will be replaced by ReceiveEnvelope in a future version.
	ReceiveEnvelope(Envelope)
}
//--------------------------------------
// BaseReactor is a building block for reactors: it supplies no-op defaults for
// the Reactor methods and, in async mode, a buffered receive channel together
// with the routine that drains it.
type BaseReactor struct {
	service.BaseService // Provides Start, Stop, .Quit
	// Switch is the switch assigned through SetSwitch; nil until then.
	Switch *Switch
	// recvMsgBuf buffers incoming messages for RecvRoutine. NewBaseReactor
	// only allocates it when async is true, so nil means "not async".
	recvMsgBuf chan *BufferedMsg
	// impl is the concrete reactor that RecvRoutine delivers messages to.
	impl Reactor
}
// NewBaseReactor builds a BaseReactor wrapping impl.
//
// name and impl are forwarded to service.NewBaseService (with a nil first
// argument). When async is true a receive channel with capacity recvBufSize is
// allocated; otherwise the channel stays nil and recvBufSize is not used. The
// Switch field is left nil until SetSwitch is called.
func NewBaseReactor(name string, impl Reactor, async bool, recvBufSize int) *BaseReactor {
	reactor := new(BaseReactor)
	reactor.BaseService = *service.NewBaseService(nil, name, impl)
	reactor.impl = impl

	if !async {
		return reactor
	}

	reactor.recvMsgBuf = make(chan *BufferedMsg, recvBufSize)
	return reactor
}
// SetSwitch stores sw in the reactor's Switch field.
func (br *BaseReactor) SetSwitch(sw *Switch) { br.Switch = sw }
// GetChannels is the default implementation; it returns nil.
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }

// AddPeer is the default implementation; it does nothing.
func (*BaseReactor) AddPeer(peer Peer) {}

// RemovePeer is the default implementation; it does nothing.
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}

// ReceiveEnvelope is the default implementation; it discards the envelope.
func (*BaseReactor) ReceiveEnvelope(e Envelope) {}

// Receive is the default implementation; it discards the message.
func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}

// InitPeer is the default implementation; it returns peer unchanged.
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
// OnStart launches RecvRoutine in its own goroutine when the reactor has a
// receive buffer (async mode); for a reactor without one it does nothing.
// It always returns nil.
func (br *BaseReactor) OnStart() error {
	if br.recvMsgBuf == nil {
		// Not async: there is no buffer to drain.
		return nil
	}
	go br.RecvRoutine()
	return nil
}
// RecvRoutine drains the reactor's receive buffer and hands every buffered
// message to the concrete reactor, returning once the service's Quit channel
// fires.
//
// When impl also satisfies EnvelopeReceiver, each message is delivered through
// ReceiveEnvelope (channel ID, source peer and ProtoMsg); otherwise delivery
// falls back to Receive with the message's Msg field.
func (br *BaseReactor) RecvRoutine() {
	// Within this file impl is only assigned in NewBaseReactor, so resolve the
	// delivery path once instead of repeating the type assertion per message.
	envReceiver, hasEnvelope := br.impl.(EnvelopeReceiver)

	for {
		select {
		case msg := <-br.recvMsgBuf:
			if hasEnvelope {
				envReceiver.ReceiveEnvelope(Envelope{
					ChannelID: msg.ChID,
					Src:       msg.Peer,
					Message:   msg.ProtoMsg,
				})
			} else {
				br.impl.Receive(msg.ChID, msg.Peer, msg.Msg)
			}
		case <-br.Quit():
			return
		}
	}
}
// GetRecvChan returns the reactor's receive buffer channel.
//
// It panics when the reactor has no receive buffer, i.e. it was not created
// in async mode.
func (br *BaseReactor) GetRecvChan() chan *BufferedMsg {
	if ch := br.recvMsgBuf; ch != nil {
		return ch
	}
	panic("It's not async reactor, but GetRecvChan() is called ")
}