-
Notifications
You must be signed in to change notification settings - Fork 13
/
reactor.go
427 lines (367 loc) · 13.7 KB
/
reactor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
package pex
import (
"context"
"fmt"
"time"
sync "github.com/sasha-s/go-deadlock"
"github.com/dashpay/tenderdash/internal/p2p"
"github.com/dashpay/tenderdash/internal/p2p/conn"
"github.com/dashpay/tenderdash/libs/log"
"github.com/dashpay/tenderdash/libs/service"
p2pproto "github.com/dashpay/tenderdash/proto/tendermint/p2p"
"github.com/dashpay/tenderdash/types"
)
// Compile-time assertion that *Reactor satisfies the service.Service
// interface.
var (
_ service.Service = (*Reactor)(nil)
)
const (
// PexChannel is the ID of the p2p channel that carries PEX messages.
PexChannel = 0x00
// maxAddressSize is an over-estimate of the maximum NetAddress size:
// hexID (40) + IP (16) + Port (2) + Name (100) ...
// NOTE: don't use a massive DNS name.
maxAddressSize = 256
// maxGetSelection is the maximum number of addresses returned by
// GetSelection.
// NOTE: maxMsgSize is derived from this value, so the two must stay in sync.
maxGetSelection = 250
// maxMsgSize is the largest PEX message accepted on the channel; it is used
// as the channel's RecvMessageCapacity.
// NOTE: amplification factor!
// A small request results in a response of up to maxMsgSize bytes.
maxMsgSize = maxAddressSize * maxGetSelection
// minReceiveRequestInterval is the minimum time that must elapse between two
// PEX requests received from the same peer. It is also the initial interval,
// and the lower bound, for the requests this reactor sends out.
minReceiveRequestInterval = 100 * time.Millisecond
// maxAddresses is the maximum number of addresses that can be included in a
// response, both for responses we send and for responses we accept.
maxAddresses = 100
// noAvailablePeersWaitPeriod is how long to wait before trying again when
// there are no peers available to query.
noAvailablePeersWaitPeriod = 1 * time.Second
// fullCapacityInterval is the request interval of the PEX reactor when the
// peer store is (nearly) full. The reactor should still look to add new
// peers in order to flush out low scoring peers that are still in the peer
// store.
fullCapacityInterval = 10 * time.Minute
// pexSendTimeout bounds how long sending a PEX response to a peer may take.
pexSendTimeout = 30 * time.Second
)
// ChannelDescriptor returns a freshly allocated descriptor for the p2p channel
// used by the PEX reactor.
//
// TODO: Decide whether channel descriptors should stay with their reactors (as
// they do now) or, given that a reactor doesn't really need to care about
// them, be housed in the node module instead.
func ChannelDescriptor() *conn.ChannelDescriptor {
	desc := conn.ChannelDescriptor{
		Name:                "pex",
		ID:                  PexChannel,
		Priority:            1,
		SendQueueCapacity:   10,
		RecvBufferCapacity:  128,
		RecvMessageCapacity: maxMsgSize,
	}
	return &desc
}
// Reactor is the peer exchange (PEX) reactor. It supports the peer manager by
// sending requests to other peers for addresses that can be given to the peer
// manager, and at the same time advertises addresses to peers that need more.
//
// The reactor is able to tweak the intensity of its search by decreasing or
// increasing the interval between each request. It tracks connected peers in
// a set: a peer is taken out of the set when a request is sent to it and is
// put back once its response is received.
type Reactor struct {
service.BaseService
logger log.Logger
peerManager *p2p.PeerManager
chCreator p2p.ChannelCreator
peerEvents p2p.PeerEventSubscriber
// availablePeers is the set of peers that a PEX request may currently be
// sent to.
availablePeers map[types.NodeID]struct{}
// mtx guards availablePeers, requestsSent, lastReceivedRequests and
// totalPeers.
mtx sync.RWMutex
// requestsSent keeps track of which peers the PEX reactor has sent requests
// to. This lets the reactor reject responses it never asked for.
// NOTE: If a node never responds, it will remain in this map until a
// peer down status update is received.
requestsSent map[types.NodeID]struct{}
// lastReceivedRequests keeps track of when peers send a request to prevent
// peers from sending requests too often (as defined by
// minReceiveRequestInterval).
lastReceivedRequests map[types.NodeID]time.Time
// totalPeers is the total number of unique peers added so far.
totalPeers int
}
// NewReactor builds a PEX reactor around the given logger, peer manager,
// channel factory and peer-event subscriber. All bookkeeping maps start out
// empty and the embedded BaseService is registered under the name "PEX".
func NewReactor(
	logger log.Logger,
	peerManager *p2p.PeerManager,
	channelCreator p2p.ChannelCreator,
	peerEvents p2p.PeerEventSubscriber,
) *Reactor {
	reactor := new(Reactor)

	// Collaborators.
	reactor.logger = logger
	reactor.peerManager = peerManager
	reactor.chCreator = channelCreator
	reactor.peerEvents = peerEvents

	// Per-peer state.
	reactor.availablePeers = make(map[types.NodeID]struct{})
	reactor.requestsSent = make(map[types.NodeID]struct{})
	reactor.lastReceivedRequests = make(map[types.NodeID]time.Time)

	reactor.BaseService = *service.NewBaseService(logger, "PEX", reactor)
	return reactor
}
// OnStart opens the PEX channel, subscribes to peer events under the name
// "pex", and launches two goroutines: one running the PEX message loop and one
// consuming peer updates. Both goroutines return when ctx is canceled. An
// error is returned only if the channel cannot be created, in which case
// nothing is started.
func (r *Reactor) OnStart(ctx context.Context) error {
	pexCh, err := r.chCreator(ctx, ChannelDescriptor())
	if err != nil {
		return err
	}

	updates := r.peerEvents(ctx, "pex")

	go r.processPexCh(ctx, pexCh)
	go r.processPeerUpdates(ctx, updates)

	return nil
}
// OnStop is a no-op. It neither signals nor waits for the goroutines started
// by OnStart; those goroutines return on their own once the context that was
// passed to OnStart is canceled.
func (r *Reactor) OnStop() {}
// processPexCh is the blocking PEX event loop. It multiplexes three sources:
// context cancellation, a timer that triggers outbound requests for peer
// addresses, and envelopes arriving on pexCh. It returns when ctx is canceled,
// when the incoming envelope stream ends, or when a peer error cannot be
// reported on the channel.
func (r *Reactor) processPexCh(ctx context.Context, pexCh p2p.Channel) {
	// Pump envelopes from the channel iterator into a plain Go channel so they
	// can take part in the select below. The pump closes the channel on exit.
	envelopes := make(chan *p2p.Envelope)
	go func() {
		defer close(envelopes)

		for it := pexCh.Receive(ctx); it.Next(ctx); {
			select {
			case <-ctx.Done():
				return
			case envelopes <- it.Envelope():
			}
		}
	}()

	// Start by requesting peers quickly in order to bootstrap. The interval is
	// raised as knowledge of the network grows.
	interval := minReceiveRequestInterval
	requestTimer := time.NewTimer(interval)
	defer requestTimer.Stop()

	for {
		select {
		case <-ctx.Done():
			return

		case <-requestTimer.C:
			// Time to ask some peer for more addresses.
			if err := r.sendRequestForPeers(ctx, pexCh); err != nil {
				r.logger.Debug("request for peers failed, retrying", "error", err)
			}
			requestTimer.Reset(interval)

		case env, open := <-envelopes:
			if !open {
				r.logger.Error("incoming PEX channel closed")
				return
			}

			began := time.Now()

			// Either a request from a peer or a response to one of ours.
			dur, err := r.handlePexMessage(ctx, env, pexCh)
			switch {
			case err != nil:
				r.logger.Error("failed to process message", "ch_id", env.ChannelID, "envelope", env, "err", err)
				peerErr := p2p.PeerError{
					NodeID: env.From,
					Err:    err,
				}
				if serr := pexCh.SendError(ctx, peerErr); serr != nil {
					r.logger.Error("cannot send error to PEX channel", "error", serr, "previous_error", err)
					return
				}

			case dur >= minReceiveRequestInterval:
				// A useful result: adopt the new polling interval. It takes
				// effect the next time the timer is reset.
				interval = dur
			}

			r.logger.Trace("handled incoming PEX message", "peer", env.From, "envelope", env, "dur", dur.String(), "took", time.Since(began))
		}
	}
}
// processPeerUpdates is a blocking loop that hands every PeerUpdate received
// from peerUpdates to processPeerUpdate. It returns once ctx is canceled.
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
	// Successive calls to Done return the same channel, so fetch it once.
	done := ctx.Done()

	for {
		select {
		case <-done:
			return
		case update := <-peerUpdates.Updates():
			r.processPeerUpdate(update)
		}
	}
}
// handlePexMessage handles an envelope received from a peer on the PexChannel.
//
// For a PexRequest, the sender is rate-limited via markPeerRequest, up to
// maxAddresses addresses are fetched from the peer manager, and a PexResponse
// is sent back with a deadline of pexSendTimeout. The returned duration is 0
// and the error is that of the rate limiter or of the send.
//
// For a PexResponse, the response must match a pending request (see
// markPeerResponse) and must not carry more than maxAddresses addresses.
// Addresses that fail to parse or that the peer manager rejects are logged and
// skipped. The returned duration is the new polling interval computed from the
// number of addresses that were newly added.
//
// Any other message type yields a zero duration and an error.
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh p2p.Channel) (time.Duration, error) {
	logger := r.logger.With("peer", envelope.From)

	switch msg := envelope.Message.(type) {
	case *p2pproto.PexRequest:
		// Verify that this peer hasn't sent us another request too recently.
		if err := r.markPeerRequest(envelope.From); err != nil {
			return 0, err
		}

		// Fetch peers from the peer manager, convert NodeAddresses into URL
		// strings, and send them back to the caller.
		nodeAddresses := r.peerManager.Advertise(envelope.From, maxAddresses)
		pexAddresses := make([]p2pproto.PexAddress, len(nodeAddresses))
		for idx, addr := range nodeAddresses {
			pexAddresses[idx] = p2pproto.PexAddress{
				URL: addr.String(),
			}
		}

		// Bound the send so that a slow peer cannot stall the caller forever.
		chCtx, cancel := context.WithTimeout(ctx, pexSendTimeout)
		defer cancel()

		return 0, pexCh.Send(chCtx, p2p.Envelope{
			To:      envelope.From,
			Message: &p2pproto.PexResponse{Addresses: pexAddresses},
		})

	case *p2pproto.PexResponse:
		// Verify that this response corresponds to one of our pending requests.
		if err := r.markPeerResponse(envelope.From); err != nil {
			return 0, err
		}

		// Verify that the response does not exceed the safety limit.
		if len(msg.Addresses) > maxAddresses {
			return 0, fmt.Errorf("peer sent too many addresses (%d > maxiumum %d)",
				len(msg.Addresses), maxAddresses)
		}

		var numAdded int
		for _, pexAddress := range msg.Addresses {
			peerAddress, err := p2p.ParseNodeAddress(pexAddress.URL)
			if err != nil {
				// The parsed value is meaningless on error, so log the raw URL
				// that the peer sent instead.
				logger.Error("failed to parse PEX address", "address", pexAddress.URL, "err", err)
				continue
			}

			added, err := r.peerManager.Add(peerAddress)
			if err != nil {
				logger.Error("failed to add PEX address", "address", peerAddress, "err", err)
				continue
			}
			if added {
				numAdded++
				logger.Debug("added PEX address", "address", peerAddress)
			}
		}

		return r.calculateNextRequestTime(numAdded), nil

	default:
		return 0, fmt.Errorf("received unknown message: %T", msg)
	}
}
// processPeerUpdate applies a single PeerUpdate to the reactor's bookkeeping.
// A peer that came up is added to the set of available peers; a peer that went
// down is dropped from every per-peer map. Any other status is ignored.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
	r.logger.Debug("received PEX peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

	r.mtx.Lock()
	defer r.mtx.Unlock()

	id := peerUpdate.NodeID

	if peerUpdate.Status == p2p.PeerStatusUp {
		r.availablePeers[id] = struct{}{}
		return
	}

	if peerUpdate.Status == p2p.PeerStatusDown {
		delete(r.availablePeers, id)
		delete(r.requestsSent, id)
		delete(r.lastReceivedRequests, id)
	}
}
// selectPeerToSendMsg picks an arbitrary peer from the available set, moves it
// into requestsSent, and returns its ID. When no peer is available it returns
// an empty ID. The returned error is always nil.
func (r *Reactor) selectPeerToSendMsg() (types.NodeID, error) {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	// Map iteration order is unspecified, so the first key is an arbitrary pick.
	for candidate := range r.availablePeers {
		// Move the peer from available to pending. Even if sending fails, we
		// don't want to retry.
		delete(r.availablePeers, candidate)
		r.requestsSent[candidate] = struct{}{}
		return candidate, nil
	}

	// No peers are available.
	return "", nil
}
// sendRequestForPeers picks one of the available peers and sends it a
// PexRequest. The chosen peer has already been moved into requestsSent by the
// time the request goes out, so it is not contacted again until it replies or
// its status changes. When no peer is available the call logs that fact and
// returns nil.
func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh p2p.Channel) error {
	target, err := r.selectPeerToSendMsg()
	switch {
	case err != nil:
		return err
	case target == "":
		r.logger.Debug("no available peers to send a PEX request to (retrying)")
		return nil
	}

	request := p2p.Envelope{
		To:      target,
		Message: &p2pproto.PexRequest{},
	}

	began := time.Now()
	if sendErr := pexCh.Send(ctx, request); sendErr != nil {
		return sendErr
	}

	r.logger.Trace("sent PEX request", "envelope", request, "peer", target, "took", time.Since(began))
	return nil
}
// calculateNextRequestTime decides how long to wait before sending the next
// request for peer addresses, given how many addresses the latest response
// newly added.
//
// It is a simplified proportional controller: poll often while our picture of
// the network is incomplete, and back off as it fills in. Knowledge is
// estimated from the share of newly added peers relative to the total observed
// so far; that share is close to 1 right after joining and tends toward zero
// as more peers are discovered.
//
// Two cases short-circuit the estimate: a nearly full peer store yields
// fullCapacityInterval, and an empty set of available peers yields
// noAvailablePeersWaitPeriod. Otherwise the result is always greater than or
// equal to minReceiveRequestInterval, so we never ask a peer more often than
// we would tolerate being asked ourselves.
func (r *Reactor) calculateNextRequestTime(added int) time.Duration {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	r.totalPeers += added

	// A nearly full peer store means we wait the maximum interval.
	fill := r.peerManager.PeerRatio()
	if fill >= 0.95 {
		r.logger.Debug("Peer manager is nearly full",
			"sleep_period", fullCapacityInterval,
			"ratio", fill)
		return fullCapacityInterval
	}

	// With nobody to query, poll less aggressively.
	numAvailable := len(r.availablePeers)
	if numAvailable == 0 {
		r.logger.Debug("No available peers to send a PEX request",
			"sleep_period", noAvailablePeersWaitPeriod)
		return noAvailablePeersWaitPeriod
	}

	// There are peers to query and room left in the store: scale the interval
	// by the square of (total seen / newly added).
	perPeer := float64(minReceiveRequestInterval) / float64(numAvailable)
	staleness := float64(r.totalPeers+1) / float64(added+1) // +1 to avert zero division
	return time.Duration(perPeer*staleness*staleness) + minReceiveRequestInterval
}
// markPeerRequest records that peer has just sent us a PEX request. It returns
// an error, leaving the recorded time untouched, if the previous request from
// the same peer arrived less than minReceiveRequestInterval ago.
func (r *Reactor) markPeerRequest(peer types.NodeID) error {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	previous, seen := r.lastReceivedRequests[peer]
	if seen {
		elapsed := time.Since(previous)
		if elapsed < minReceiveRequestInterval {
			return fmt.Errorf("peer %v sent PEX request too soon (%v < minimum %v)",
				peer, elapsed, minReceiveRequestInterval)
		}
	}

	r.lastReceivedRequests[peer] = time.Now()
	return nil
}
// markPeerResponse records that peer has answered a PEX request. It returns an
// error if no request to that peer is pending; otherwise the pending entry is
// cleared and the peer becomes available for future requests again.
func (r *Reactor) markPeerResponse(peer types.NodeID) error {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	// Reject responses that we never asked for.
	_, pending := r.requestsSent[peer]
	if !pending {
		return fmt.Errorf("peer sent a PEX response when none was requested (%v)", peer)
	}

	// Move the peer from pending back to the available set.
	delete(r.requestsSent, peer)
	r.availablePeers[peer] = struct{}{}
	return nil
}