donthavetimeoutmgr.go

package messagequeue

import (
	"context"
	"sync"
	"time"

	"github.com/benbjohnson/clock"
	cid "github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

const (
	// dontHaveTimeout is used to simulate a DONT_HAVE when communicating with
	// a peer whose Bitswap client doesn't support the DONT_HAVE response,
	// or when the peer takes too long to respond.
	// If the peer doesn't respond to a want-block within the timeout, the
	// local node assumes that the peer doesn't have the block.
	dontHaveTimeout = 5 * time.Second

	// maxExpectedWantProcessTime is the maximum amount of time we expect a
	// peer to take to process a want and initiate sending a response to us
	maxExpectedWantProcessTime = 2 * time.Second

	// maxTimeout is the maximum allowed timeout, regardless of latency
	maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime

	// pingLatencyMultiplier is multiplied by the average ping time to
	// get an upper bound on how long we expect to wait for a peer's response
	// to arrive
	pingLatencyMultiplier = 3

	// messageLatencyAlpha is the alpha supplied to the message latency EWMA
	messageLatencyAlpha = 0.5

	// messageLatencyMultiplier gives a margin for error: the timeout is
	// calculated as messageLatencyMultiplier * message latency
	messageLatencyMultiplier = 2
)
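
// For illustration (numbers derived from the defaults above, not from any
// additional source): with a measured ping latency of 200ms, the ping-based
// timeout is maxExpectedWantProcessTime + 3*latency = 2s + 600ms = 2.6s;
// with a measured message latency of 1s, the message-based timeout is
// 2*1s = 2s. Either result is capped at maxTimeout = 7s.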

// PeerConnection is a connection to a peer that can be pinged, and whose
// average latency can be measured
type PeerConnection interface {
	// Ping the peer
	Ping(context.Context) ping.Result
	// Latency returns the average latency of all pings
	Latency() time.Duration
}
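
// For illustration only (a hedged sketch, not part of the original file): a
// minimal PeerConnection stub, similar to what a test might use, could be:
//
//	type stubConn struct{ rtt time.Duration }
//
//	func (s *stubConn) Ping(ctx context.Context) ping.Result { return ping.Result{RTT: s.rtt} }
//	func (s *stubConn) Latency() time.Duration               { return s.rtt }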

// pendingWant keeps track of a want that has been sent and for which we're
// waiting for a response or for the timeout to expire
type pendingWant struct {
	c      cid.Cid
	active bool
	sent   time.Time
}

// dontHaveTimeoutMgr simulates a DONT_HAVE message if the peer takes too long
// to respond to a message.
// The timeout is based on latency: we start with a default timeout while we
// ping the peer to estimate latency. Once we receive a response from the
// peer, we base the timeout on the measured message latency instead.
type dontHaveTimeoutMgr struct {
	clock                      clock.Clock
	ctx                        context.Context
	shutdown                   func()
	peerConn                   PeerConnection
	onDontHaveTimeout          func([]cid.Cid)
	defaultTimeout             time.Duration
	maxTimeout                 time.Duration
	pingLatencyMultiplier      int
	messageLatencyMultiplier   int
	maxExpectedWantProcessTime time.Duration

	// All variables below here must be protected by the lock
	lk sync.RWMutex
	// has the timeout manager started
	started bool
	// wants that are active (waiting for a response or timeout)
	activeWants map[cid.Cid]*pendingWant
	// queue of wants, from oldest to newest
	wantQueue []*pendingWant
	// time to wait for a response (depends on latency)
	timeout time.Duration
	// ewma of message latency (time from message sent to response received)
	messageLatency *latencyEwma
	// timer used to wait until the want at the front of the queue expires
	checkForTimeoutsTimer *clock.Timer
	// used for testing -- signalled when scheduled dont-have timeouts are
	// triggered
	timeoutsTriggered chan struct{}
}

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr.
// onDontHaveTimeout is called when pending keys expire (i.e. are not
// cancelled before the timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), clock clock.Clock) *dontHaveTimeoutMgr {
	return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
		pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock, nil)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(
	pc PeerConnection,
	onDontHaveTimeout func([]cid.Cid),
	defaultTimeout time.Duration,
	maxTimeout time.Duration,
	pingLatencyMultiplier int,
	messageLatencyMultiplier int,
	maxExpectedWantProcessTime time.Duration,
	clock clock.Clock,
	timeoutsTriggered chan struct{},
) *dontHaveTimeoutMgr {
	ctx, shutdown := context.WithCancel(context.Background())
	mqp := &dontHaveTimeoutMgr{
		clock:                      clock,
		ctx:                        ctx,
		shutdown:                   shutdown,
		peerConn:                   pc,
		activeWants:                make(map[cid.Cid]*pendingWant),
		timeout:                    defaultTimeout,
		messageLatency:             &latencyEwma{alpha: messageLatencyAlpha},
		defaultTimeout:             defaultTimeout,
		maxTimeout:                 maxTimeout,
		pingLatencyMultiplier:      pingLatencyMultiplier,
		messageLatencyMultiplier:   messageLatencyMultiplier,
		maxExpectedWantProcessTime: maxExpectedWantProcessTime,
		onDontHaveTimeout:          onDontHaveTimeout,
		timeoutsTriggered:          timeoutsTriggered,
	}
	return mqp
}
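
// For illustration (a hedged usage sketch; the callback body and variable
// names are assumptions, not taken from this file):
//
//	mgr := newDontHaveTimeoutMgr(conn, func(ks []cid.Cid) {
//		// treat each key as if the peer had responded DONT_HAVE
//	}, clock.New())
//	mgr.Start()
//	defer mgr.Shutdown()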

// Shutdown the dontHaveTimeoutMgr. Any subsequent call to Start() will be
// ignored
func (dhtm *dontHaveTimeoutMgr) Shutdown() {
	dhtm.shutdown()

	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	// Clear any pending check for timeouts
	if dhtm.checkForTimeoutsTimer != nil {
		dhtm.checkForTimeoutsTimer.Stop()
	}
}

// Start the dontHaveTimeoutMgr. This method is idempotent
func (dhtm *dontHaveTimeoutMgr) Start() {
	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	// Make sure the dont-have timeout manager hasn't already been started
	if dhtm.started {
		return
	}
	dhtm.started = true

	// If we already have a measure of latency to the peer, use it to
	// calculate a reasonable timeout
	latency := dhtm.peerConn.Latency()
	if latency.Nanoseconds() > 0 {
		dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)
		return
	}

	// Otherwise measure latency by pinging the peer
	go dhtm.measurePingLatency()
}

// UpdateMessageLatency is called when we receive a response from the peer.
// The given elapsed time is the time between sending a request and receiving
// the corresponding response.
func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	// Update the message latency and the timeout
	dhtm.messageLatency.update(elapsed)
	oldTimeout := dhtm.timeout
	dhtm.timeout = dhtm.calculateTimeoutFromMessageLatency()

	// If the timeout has decreased, check whether any pending wants are now
	// over the new timeout
	if dhtm.timeout < oldTimeout {
		dhtm.checkForTimeouts()
	}
}

// measurePingLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
	// Wait up to defaultTimeout for a response to the ping
	ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout)
	defer cancel()

	// Ping the peer
	res := dhtm.peerConn.Ping(ctx)
	if res.Error != nil {
		// If there was an error, just leave the timeout as defaultTimeout
		return
	}

	// Get the average latency to the peer
	latency := dhtm.peerConn.Latency()

	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	// If a message response arrived while we were pinging, the timeout has
	// already been set from the message latency, so keep that value
	if dhtm.messageLatency.samples > 0 {
		return
	}

	// Calculate a reasonable timeout based on latency
	dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)

	// Check if after changing the timeout there are any pending wants that
	// are now over the timeout
	dhtm.checkForTimeouts()
}

// checkForTimeouts checks pending wants to see if any are over the timeout.
// Note: this function should only be called within the lock.
func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {
	if len(dhtm.wantQueue) == 0 {
		return
	}

	// Figure out which of the blocks that were wanted were not received
	// within the timeout
	expired := make([]cid.Cid, 0, len(dhtm.activeWants))
	for len(dhtm.wantQueue) > 0 {
		pw := dhtm.wantQueue[0]

		// If the want is still active
		if pw.active {
			// The queue is in order from earliest to latest, so if we
			// didn't find an expired entry we can stop iterating
			if dhtm.clock.Since(pw.sent) < dhtm.timeout {
				break
			}

			// Add the want to the expired list
			expired = append(expired, pw.c)
			// Remove the want from the activeWants map
			delete(dhtm.activeWants, pw.c)
		}

		// Remove expired or cancelled wants from the want queue
		dhtm.wantQueue = dhtm.wantQueue[1:]
	}

	// Fire the timeout event for the expired wants
	if len(expired) > 0 {
		go dhtm.fireTimeout(expired)
	}

	if len(dhtm.wantQueue) == 0 {
		return
	}

	// Make sure the timeout manager is still running
	if dhtm.ctx.Err() != nil {
		return
	}

	// Schedule the next check for the moment when the oldest pending want
	// will time out
	oldestStart := dhtm.wantQueue[0].sent
	until := oldestStart.Add(dhtm.timeout).Sub(dhtm.clock.Now())
	if dhtm.checkForTimeoutsTimer == nil {
		dhtm.checkForTimeoutsTimer = dhtm.clock.Timer(until)
		go dhtm.consumeTimeouts()
	} else {
		dhtm.checkForTimeoutsTimer.Stop()
		dhtm.checkForTimeoutsTimer.Reset(until)
	}
}
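
// consumeTimeouts runs until shutdown, re-running checkForTimeouts each time
// checkForTimeoutsTimer fires. (Note inferred from the code, not stated in
// the original: a spurious timer fire, e.g. one queued before a Stop/Reset,
// is harmless, because checkForTimeouts finds nothing expired and simply
// re-schedules itself.)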
func (dhtm *dontHaveTimeoutMgr) consumeTimeouts() {
	for {
		select {
		case <-dhtm.ctx.Done():
			return
		case <-dhtm.checkForTimeoutsTimer.C:
			dhtm.lk.Lock()
			dhtm.checkForTimeouts()
			dhtm.lk.Unlock()
		}
	}
}

// AddPending adds the given keys, which will expire if not cancelled before
// the timeout
func (dhtm *dontHaveTimeoutMgr) AddPending(ks []cid.Cid) {
	if len(ks) == 0 {
		return
	}

	start := dhtm.clock.Now()

	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	queueWasEmpty := len(dhtm.activeWants) == 0

	// Record the start time for each key
	for _, c := range ks {
		if _, ok := dhtm.activeWants[c]; !ok {
			pw := pendingWant{
				c:      c,
				sent:   start,
				active: true,
			}
			dhtm.activeWants[c] = &pw
			dhtm.wantQueue = append(dhtm.wantQueue, &pw)
		}
	}

	// If there was already an earlier pending item in the queue, then there
	// must already be a timeout check scheduled. If there is nothing in the
	// queue then we should make sure to schedule a check.
	if queueWasEmpty {
		dhtm.checkForTimeouts()
	}
}

// CancelPending is called when we receive a response for a key
func (dhtm *dontHaveTimeoutMgr) CancelPending(ks []cid.Cid) {
	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	// Mark the wants as cancelled
	for _, c := range ks {
		if pw, ok := dhtm.activeWants[c]; ok {
			pw.active = false
			delete(dhtm.activeWants, c)
		}
	}
}
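
// For illustration (assumptions about the caller, not stated in this file):
// the owning message queue would typically call AddPending with the CIDs of
// the want-blocks it just sent, and CancelPending once a response arrives
// for those CIDs:
//
//	dhtm.AddPending(sentWantBlocks)   // after sending wants to the peer
//	dhtm.CancelPending(respondedKeys) // when HAVE / DONT_HAVE / blocks arrive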

// fireTimeout fires the onDontHaveTimeout method with the timed out keys
func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) {
	// Make sure the timeout manager has not been shut down
	if dhtm.ctx.Err() != nil {
		return
	}

	// Fire the timeout
	dhtm.onDontHaveTimeout(pending)

	// Signal that a timeout was fired (used for testing)
	if dhtm.timeoutsTriggered != nil {
		dhtm.timeoutsTriggered <- struct{}{}
	}
}

// calculateTimeoutFromPingLatency calculates a reasonable timeout derived
// from latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Duration) time.Duration {
	// The maximum expected time for a response is
	// the expected time to process the want + (latency * multiplier)
	// The multiplier is to provide some padding for variable latency.
	timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
	if timeout > dhtm.maxTimeout {
		timeout = dhtm.maxTimeout
	}
	return timeout
}

// calculateTimeoutFromMessageLatency calculates a timeout derived from the
// message latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
	timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
	if timeout > dhtm.maxTimeout {
		timeout = dhtm.maxTimeout
	}
	return timeout
}

// latencyEwma is an EWMA of message latency
type latencyEwma struct {
	alpha   float64
	samples uint64
	latency time.Duration
}

// update the EWMA with the given sample
func (le *latencyEwma) update(elapsed time.Duration) {
	le.samples++

	// Initially set alpha to be 1.0 / <the number of samples>
	alpha := 1.0 / float64(le.samples)
	if alpha < le.alpha {
		// Once we have enough samples, clamp alpha to the configured value
		alpha = le.alpha
	}

	le.latency = time.Duration(float64(elapsed)*alpha + (1-alpha)*float64(le.latency))
}