-
Notifications
You must be signed in to change notification settings - Fork 0
/
discovery.go
283 lines (239 loc) · 8.96 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
package messenger
import (
"context"
"errors"
"net"
"sync"
"time"
"github.com/spf13/viper"
"github.com/dnerochain/dnero/common"
cn "github.com/dnerochain/dnero/p2p/connection"
"github.com/dnerochain/dnero/p2p/netutil"
pr "github.com/dnerochain/dnero/p2p/peer"
p2ptypes "github.com/dnerochain/dnero/p2p/types"
)
//
// PeerDiscoveryManager manages the peer discovery process. It owns the three
// discovery components (seed peer connector, peer discovery message handler,
// and inbound peer listener) and performs the handshake that admits a peer
// into the peer table.
//
type PeerDiscoveryManager struct {
messenger *Messenger // attaches message handlers to handshaked peers; handshakeAndAddPeer tolerates nil
//addrBook *AddrBook
peerTable *pr.PeerTable // peers are added here after a successful handshake and removed on error
nodeInfo *p2ptypes.NodeInfo // passed to peer.Handshake
seedPeers map[string]*pr.Peer // handshaked seed peers keyed by peer ID; guarded by mutex
mutex *sync.Mutex // protects seedPeers
seedPeerOnly bool // read from common.CfgP2PSeedPeerOnly at creation; when true, Start skips peerDiscMsgHandler
// Three mechanisms for peer discovery
seedPeerConnector SeedPeerConnector // pro-actively connect to seed peers
peerDiscMsgHandler PeerDiscoveryMessageHandler // pro-actively connect to peer candidates obtained from connected peers
inboundPeerListener InboundPeerListener // listen to incoming peering requests
// Life cycle
wg *sync.WaitGroup // waited on last by Wait
quit chan struct{} // NOTE(review): not referenced anywhere in the visible code
ctx context.Context // derived from the context given to Start; passed to peer.Start
cancel context.CancelFunc // cancels ctx; invoked by Stop, nil until Start is called
stopped bool // NOTE(review): not referenced anywhere in the visible code
}
//
// PeerDiscoveryManagerConfig specifies the configuration for PeerDiscoveryManager
//
type PeerDiscoveryManagerConfig struct {
MaxNumPeers int // default comes from common.CfgP2PMaxNumPeers (see GetDefaultPeerDiscoveryManagerConfig)
SufficientNumPeers uint // default comes from common.CfgP2PMinNumPeers, converted to uint
}
// CreatePeerDiscoveryManager builds a PeerDiscoveryManager and wires up its
// three discovery components: the seed peer connector, the peer discovery
// message handler, and the inbound peer listener. A callback that logs the
// outcome of every inbound connection is installed on the listener.
//
// If any component cannot be created, the partially initialized manager is
// returned together with the error. The addrBookFilePath, routabilityRestrict
// and config arguments are accepted but not used by this function.
func CreatePeerDiscoveryManager(msgr *Messenger, nodeInfo *p2ptypes.NodeInfo, addrBookFilePath string,
	routabilityRestrict bool, seedPeerNetAddresses []string,
	networkProtocol string, localNetworkAddr string, externalPort int, skipUPNP bool, peerTable *pr.PeerTable,
	config PeerDiscoveryManagerConfig) (*PeerDiscoveryManager, error) {

	mgr := new(PeerDiscoveryManager)
	mgr.messenger = msgr
	mgr.nodeInfo = nodeInfo
	mgr.peerTable = peerTable
	mgr.seedPeers = map[string]*pr.Peer{}
	mgr.mutex = new(sync.Mutex)
	mgr.seedPeerOnly = viper.GetBool(common.CfgP2PSeedPeerOnly)
	mgr.wg = new(sync.WaitGroup)

	//mgr.addrBook = NewAddrBook(addrBookFilePath, routabilityRestrict)

	var err error

	// Mechanism 1: pro-actively connect to the configured seed peers.
	if mgr.seedPeerConnector, err = createSeedPeerConnector(mgr, localNetworkAddr, seedPeerNetAddresses); err != nil {
		return mgr, err
	}

	// Mechanism 2: connect to peer candidates learned from connected peers.
	if mgr.peerDiscMsgHandler, err = createPeerDiscoveryMessageHandler(mgr, localNetworkAddr); err != nil {
		return mgr, err
	}

	// Mechanism 3: accept incoming peering requests.
	mgr.inboundPeerListener, err = createInboundPeerListener(
		mgr, networkProtocol, localNetworkAddr, externalPort, skipUPNP, GetDefaultInboundPeerListenerConfig())
	if err != nil {
		return mgr, err
	}

	mgr.inboundPeerListener.SetInboundCallback(func(peer *pr.Peer, err error) {
		if err != nil {
			logger.Errorf("Inbound peer listener error: %v", err)
			return
		}
		logger.Infof("Inbound peer connected, ID: %v, from: %v", peer.ID(), peer.GetConnection().GetNetconn().RemoteAddr())
	})

	return mgr, nil
}
// GetDefaultPeerDiscoveryManagerConfig returns the default config for the
// PeerDiscoveryManager. Both values are read from viper: the maximum number of
// peers from common.CfgP2PMaxNumPeers and the sufficient number of peers from
// common.CfgP2PMinNumPeers (converted to uint).
func GetDefaultPeerDiscoveryManagerConfig() PeerDiscoveryManagerConfig {
	maxPeers := viper.GetInt(common.CfgP2PMaxNumPeers)
	minPeers := viper.GetInt(common.CfgP2PMinNumPeers)

	var cfg PeerDiscoveryManagerConfig
	cfg.MaxNumPeers = maxPeers
	cfg.SufficientNumPeers = uint(minPeers)
	return cfg
}
// SetMessenger sets the Messenger for the PeerDiscoveryManager. The messenger
// is used by handshakeAndAddPeer to attach message handlers to each peer; when
// it is nil, a warning is logged there and no handlers are attached.
func (discMgr *PeerDiscoveryManager) SetMessenger(msgr *Messenger) {
discMgr.messenger = msgr
}
// Start is called when the PeerDiscoveryManager starts. It derives a
// cancellable context from ctx, stores it on the manager, and starts the seed
// peer connector and the inbound peer listener. The peer discovery message
// handler is started as well unless the manager runs in seed-peer-only mode.
//
// If any component fails to start, the derived context is cancelled so that
// the components already started are signalled to shut down, and the error is
// returned.
func (discMgr *PeerDiscoveryManager) Start(ctx context.Context) error {
	c, cancel := context.WithCancel(ctx)
	discMgr.ctx = c
	discMgr.cancel = cancel

	if err := discMgr.seedPeerConnector.Start(c); err != nil {
		cancel() // nothing else is running yet, but release the context
		return err
	}

	if err := discMgr.inboundPeerListener.Start(c); err != nil {
		cancel() // signal the seed peer connector to stop
		return err
	}

	if discMgr.seedPeerOnly {
		return nil // if seed peer only, we don't need to start the peer discovery message handler
	}

	if err := discMgr.peerDiscMsgHandler.Start(c); err != nil {
		cancel() // signal the connector and the listener to stop
		return err
	}

	return nil
}
// Stop is called when the PeerDiscoveryManager stops. It cancels the context
// created in Start, which is the same context handed to the discovery
// components and to every peer started by this manager. Calling Stop before
// Start is a no-op.
func (discMgr *PeerDiscoveryManager) Stop() {
	if discMgr.cancel == nil {
		return // Start was never called, so there is nothing to cancel
	}
	discMgr.cancel()
}
// Wait suspends the caller goroutine until the wait groups of the seed peer
// connector, the inbound peer listener, the peer discovery message handler,
// and the manager itself are done, waiting on them in that order.
func (discMgr *PeerDiscoveryManager) Wait() {
discMgr.seedPeerConnector.wg.Wait()
discMgr.inboundPeerListener.wg.Wait()
// Waited on even in seed-peer-only mode, where Start never starts this handler.
discMgr.peerDiscMsgHandler.wg.Wait()
discMgr.wg.Wait()
}
// HandlePeerWithErrors handles peers that are in the error state.
//
// The peer is removed from the peer table and stopped, provided the table
// still holds an entry for the same peer ID with the same remote address.
// A reconnect is then attempted only when
//   - the node runs in seed-peer-only mode and the peer is a seed, or
//   - the node does not run in seed-peer-only mode and the peer is not a seed
//     (this avoids bombarding the seed nodes),
//
// and only for outbound peers. Inbound peers are expected to re-establish the
// connection themselves, so nothing is dialed for them.
func (discMgr *PeerDiscoveryManager) HandlePeerWithErrors(peer *pr.Peer) {
	const (
		maxReconnectAttempts = 3               // retry up to 3 times
		reconnectInterval    = 3 * time.Second // pause between two attempts
	)

	peerRemoteAddress := peer.GetConnection().GetNetconn().RemoteAddr().String()
	lookedUpPeer := discMgr.peerTable.GetPeer(peer.ID())
	if lookedUpPeer == nil {
		logger.Errorf("HandlePeerWithErrors cannot find the peer: %v", peer.ID())
		return // Should not happen
	}

	lookedUpPeerRemoteAddress := lookedUpPeer.GetConnection().GetNetconn().RemoteAddr().String()
	logger.Infof("HandlePeerWithErrors, peerRemoteAddress: %v", peerRemoteAddress)
	logger.Infof("HandlePeerWithErrors, lookedUpPeerRemoteAddress: %v", lookedUpPeerRemoteAddress)
	if peerRemoteAddress != lookedUpPeerRemoteAddress {
		// lookedUpPeer might be created by the inbound connection. A scenario is that
		// the peer restarted and established a new connection with us. In this case,
		// we should not proceed to reconnect
		logger.Warnf("Will not reconnect, since peerRemoteAddress and lookedUpPeerRemoteAddress are not the same")
		return
	}

	discMgr.peerTable.DeletePeer(peer.ID())
	peer.Stop() // TODO: may need to stop peer regardless of the remote address comparison

	seedPeerOnly := viper.GetBool(common.CfgP2PSeedPeerOnly)
	//shouldRetry := seedPeerOnly && peer.IsPersistent()
	shouldRetry := (seedPeerOnly && peer.IsSeed()) || (!seedPeerOnly && !peer.IsSeed()) // avoid bombarding the seed nodes
	if !shouldRetry {
		return
	}

	if !peer.IsOutbound() {
		// For now not to retry connecting to the inbound peer, since that peer will
		// retry to establish the connection. Returning here avoids reporting a
		// successful re-connection that never took place.
		logger.Infof("Lost connection to inbound peer %v with IP address %v, waiting for the peer to re-connect", peer.ID(), peer.NetAddress().String())
		return
	}

	logger.Infof("Lost connection to peer %v with IP address %v, trying to re-connect", peer.ID(), peer.NetAddress().String())
	var err error
	for attempt := 1; attempt <= maxReconnectAttempts; attempt++ {
		if _, err = discMgr.connectToOutboundPeer(peer.NetAddress(), true); err == nil {
			logger.Infof("Successfully re-connected to peer %v", peer.NetAddress().String())
			return
		}
		if attempt < maxReconnectAttempts {
			time.Sleep(reconnectInterval) // no point in sleeping after the final attempt
		}
	}
	logger.Warnf("Failed to re-connect to peer %v with IP address %v: %v", peer.ID(), peer.NetAddress().String(), err)
}
// connectToOutboundPeer creates an outbound peer for peerNetAddress using the
// default peer and connection configs, marks it with the given persistency,
// then handshakes with it and adds it to the peer table.
//
// It returns (nil, err) when the peer cannot be created. When the handshake or
// the registration fails, the created peer is returned along with the error.
func (discMgr *PeerDiscoveryManager) connectToOutboundPeer(peerNetAddress *netutil.NetAddress, persistent bool) (*pr.Peer, error) {
	logger.Debugf("Connecting to outbound peer: %v...", peerNetAddress)

	outPeer, err := pr.CreateOutboundPeer(peerNetAddress, pr.GetDefaultPeerConfig(), cn.GetDefaultConnectionConfig())
	if err != nil {
		logger.Debugf("Failed to create outbound peer: %v", peerNetAddress)
		return nil, err
	}

	outPeer.SetPersistency(persistent)
	if err = discMgr.handshakeAndAddPeer(outPeer); err != nil {
		return outPeer, err
	}
	return outPeer, nil
}
// connectWithInboundPeer wraps an accepted network connection in an inbound
// peer using the default peer and connection configs, marks it with the given
// persistency, then handshakes with it and adds it to the peer table.
//
// It returns (nil, err) when the peer cannot be created. When the handshake or
// the registration fails, the created peer is returned along with the error.
func (discMgr *PeerDiscoveryManager) connectWithInboundPeer(netconn net.Conn, persistent bool) (*pr.Peer, error) {
	logger.Infof("Connecting with inbound peer: %v...", netconn.RemoteAddr())

	inPeer, err := pr.CreateInboundPeer(netconn, pr.GetDefaultPeerConfig(), cn.GetDefaultConnectionConfig())
	if err != nil {
		logger.Errorf("Failed to create inbound peer: %v", netconn.RemoteAddr())
		return nil, err
	}

	inPeer.SetPersistency(persistent)
	if err = discMgr.handshakeAndAddPeer(inPeer); err != nil {
		return inPeer, err
	}
	return inPeer, nil
}
// handshakeAndAddPeer performs the handshake with a peer. Upon a successful
// handshake it flags the peer as seed or non-seed, attaches the messenger's
// message handlers (when a messenger is set), starts the peer, and saves it to
// the peer table. Seed peers are additionally recorded in seedPeers.
//
// An error is returned when the handshake fails, when the peer cannot be
// started, or when the peer table rejects the peer.
func (discMgr *PeerDiscoveryManager) handshakeAndAddPeer(peer *pr.Peer) error {
	const (
		startFailedMsg = "Failed to start peer"
		addFailedMsg   = "Failed to add peer to the peerTable"
	)

	err := peer.Handshake(discMgr.nodeInfo)
	if err != nil {
		logger.Errorf("Failed to handshake with peer, error: %v", err)
		return err
	}

	seed := discMgr.seedPeerConnector.isASeedPeer(peer.NetAddress())
	peer.SetSeed(seed)
	if seed {
		logger.Infof("Handshaked with a seed peer: %v, isOutbound: %v", peer.NetAddress(), peer.IsOutbound())
	}

	if discMgr.messenger == nil {
		logger.Warnf("discMgr.messenger not set, cannot attach message handlers")
	} else {
		discMgr.messenger.AttachMessageHandlersToPeer(peer)
	}

	if started := peer.Start(discMgr.ctx); !started {
		logger.Errorf(startFailedMsg)
		return errors.New(startFailedMsg)
	}

	if added := discMgr.peerTable.AddPeer(peer); !added {
		logger.Errorf(addFailedMsg)
		return errors.New(addFailedMsg)
	}

	//discMgr.addrBook.AddAddress(peer.NetAddress(), peer.NetAddress())
	//discMgr.addrBook.Save()

	if !peer.IsSeed() {
		return nil
	}

	// Remember the seed peer so that isSeedPeer can answer by peer ID.
	discMgr.mutex.Lock()
	defer discMgr.mutex.Unlock()
	discMgr.seedPeers[peer.ID()] = peer
	return nil
}
// isSeedPeer reports whether the peer with the given ID has been recorded as a
// seed peer by handshakeAndAddPeer. The lookup is done under the manager's mutex.
func (discMgr *PeerDiscoveryManager) isSeedPeer(pid string) bool {
	discMgr.mutex.Lock()
	defer discMgr.mutex.Unlock()

	_, recorded := discMgr.seedPeers[pid]
	return recorded
}