forked from filecoin-project/go-fil-markets
-
Notifications
You must be signed in to change notification settings - Fork 2
/
retrystream.go
103 lines (84 loc) · 2.84 KB
/
retrystream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package shared
import (
"context"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/jpillora/backoff"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"golang.org/x/xerrors"
)
var log = logging.Logger("data_transfer_network")
// The max number of attempts to open a stream
const defaultMaxStreamOpenAttempts = 5
// The min backoff time between retries
const defaultMinAttemptDuration = 1 * time.Second
// The max backoff time between retries
const defaultMaxAttemptDuration = 5 * time.Minute
// The multiplier in the backoff time for each retry
const defaultBackoffFactor = 5
type StreamOpener interface {
NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error)
}
type RetryStreamOption func(*RetryStream)
// RetryParameters changes the default parameters around connection reopening
func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64, backoffFactor float64) RetryStreamOption {
return func(impl *RetryStream) {
impl.maxStreamOpenAttempts = attempts
impl.minAttemptDuration = minDuration
impl.maxAttemptDuration = maxDuration
}
}
type RetryStream struct {
opener StreamOpener
backoffFactor float64
maxStreamOpenAttempts float64
minAttemptDuration time.Duration
maxAttemptDuration time.Duration
}
func NewRetryStream(opener StreamOpener, options ...RetryStreamOption) *RetryStream {
impl := &RetryStream{
opener: opener,
backoffFactor: defaultBackoffFactor,
maxStreamOpenAttempts: defaultMaxStreamOpenAttempts,
minAttemptDuration: defaultMinAttemptDuration,
maxAttemptDuration: defaultMaxAttemptDuration,
}
impl.SetOptions(options...)
return impl
}
func (impl *RetryStream) SetOptions(options ...RetryStreamOption) {
for _, option := range options {
option(impl)
}
}
func (impl *RetryStream) OpenStream(ctx context.Context, id peer.ID, protocols []protocol.ID) (network.Stream, error) {
b := &backoff.Backoff{
Min: impl.minAttemptDuration,
Max: impl.maxAttemptDuration,
Factor: impl.maxStreamOpenAttempts,
Jitter: true,
}
for {
s, err := impl.opener.NewStream(ctx, id, protocols...)
if err == nil {
return s, err
}
// b.Attempt() starts from zero
nAttempts := b.Attempt() + 1
if nAttempts >= impl.maxStreamOpenAttempts {
return nil, xerrors.Errorf("exhausted %d attempts but failed to open stream, err: %w", int(impl.maxStreamOpenAttempts), err)
}
duration := b.Duration()
log.Warnf("failed to open stream to %s on attempt %.0f of %.0f, waiting %s to try again, err: %s",
id, nAttempts, impl.maxStreamOpenAttempts, duration, err)
ebt := time.NewTimer(duration)
select {
case <-ctx.Done():
ebt.Stop()
return nil, xerrors.Errorf("open stream to %s canceled by context", id)
case <-ebt.C:
}
}
}