forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 1
/
type.go
125 lines (106 loc) · 3.78 KB
/
type.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package throttle
import (
"sync/atomic"
"time"
)
//------------------------------------------------------------------------------
// Type is a throttle of retries to avoid endless busy loops when a message
// fails to reach its destination.
//
// The two fields that are accessed through sync/atomic (consecutiveRetries and
// throttlePeriod) are deliberately declared first. On 32-bit platforms only the
// start of an allocated struct is guaranteed to be 64-bit aligned, and a
// pointer-sized field placed before an atomically accessed int64 would push it
// onto a 4-byte boundary, causing the atomic operations to fault.
type Type struct {
	// consecutiveRetries is the live count of consecutive retries. It is
	// read and written with sync/atomic.
	consecutiveRetries int64

	// throttlePeriod is the current throttle period in nanoseconds, by default
	// this is set to the baseThrottlePeriod. It is read and written with
	// sync/atomic.
	throttlePeriod int64

	// unthrottledRetries is the number of consecutive retries we are
	// comfortable attempting before throttling begins.
	unthrottledRetries int64

	// maxExponentialPeriod is the maximum duration, in nanoseconds, for which
	// our throttle lasts when exponentially increasing.
	maxExponentialPeriod int64

	// baseThrottlePeriod is the static duration, in nanoseconds, for which our
	// throttle lasts.
	baseThrottlePeriod int64

	// closeChan can interrupt a throttle when closed.
	closeChan <-chan struct{}
}
// New builds a throttle with the default settings (3 unthrottled retries, a
// one second throttle period, a one minute exponential ceiling and no close
// channel) and then applies each of the supplied options in order. The
// returned throttle permits a fixed number of consecutive retries before it
// starts delaying subsequent ones; a success should reset that count.
func New(options ...func(*Type)) *Type {
	base := int64(time.Second)

	throttle := &Type{
		unthrottledRetries:   3,
		baseThrottlePeriod:   base,
		throttlePeriod:       base,
		maxExponentialPeriod: int64(time.Minute),
	}

	// Options run after the defaults are in place so they can override them.
	for i := range options {
		options[i](throttle)
	}
	return throttle
}
//------------------------------------------------------------------------------
// OptMaxUnthrottledRetries returns an option that sets how many consecutive
// retries are allowed through before throttling starts.
func OptMaxUnthrottledRetries(n int64) func(*Type) {
	apply := func(target *Type) {
		target.unthrottledRetries = n
	}
	return apply
}
// OptMaxExponentPeriod returns an option that sets the ceiling for the
// throttle period when it is being increased exponentially.
func OptMaxExponentPeriod(period time.Duration) func(*Type) {
	ceiling := int64(period)
	return func(target *Type) {
		target.maxExponentialPeriod = ceiling
	}
}
// OptThrottlePeriod returns an option that sets the static throttle period.
// Both the base period and the current period are set to the given value.
func OptThrottlePeriod(period time.Duration) func(*Type) {
	nanos := int64(period)
	return func(target *Type) {
		target.baseThrottlePeriod, target.throttlePeriod = nanos, nanos
	}
}
// OptCloseChan returns an option that installs a read-only channel which,
// once closed, interrupts a retry throttle early.
func OptCloseChan(c <-chan struct{}) func(*Type) {
	install := func(target *Type) {
		target.closeChan = c
	}
	return install
}
//------------------------------------------------------------------------------
// Retry indicates that a retry is about to occur. It increments the count of
// consecutive retries and, while that count is within the unthrottled
// allowance, returns true immediately. Beyond the allowance it blocks until
// either the current throttle period has elapsed (returning true) or the close
// channel has been closed (returning false). With no close channel configured
// the wait can only end when the throttle period elapses.
func (t *Type) Retry() bool {
	if atomic.AddInt64(&t.consecutiveRetries, 1) <= t.unthrottledRetries {
		return true
	}

	// An explicit timer is used instead of time.After so that it can be
	// stopped straight away when the close channel wins the select, rather
	// than lingering until the throttle period runs out.
	timer := time.NewTimer(time.Duration(atomic.LoadInt64(&t.throttlePeriod)))
	defer timer.Stop()

	select {
	case <-timer.C:
		return true
	case <-t.closeChan:
		return false
	}
}
// ExponentialRetry is the same as Retry except that, once the count of
// consecutive retries has exceeded the unthrottled allowance, it first doubles
// the current throttle period, capping it at the configured maximum
// exponential period. It returns whatever Retry returns.
func (t *Type) ExponentialRetry() bool {
	if atomic.LoadInt64(&t.consecutiveRetries) > t.unthrottledRetries {
		limit := t.maxExponentialPeriod
		if period := atomic.LoadInt64(&t.throttlePeriod); period < limit {
			next := period * 2
			// A positive period that becomes smaller when doubled has wrapped
			// around int64. Treat that the same as exceeding the limit so a
			// very large limit cannot produce a negative (i.e. zero-wait)
			// throttle period.
			if next > limit || (period > 0 && next < period) {
				next = limit
			}
			atomic.StoreInt64(&t.throttlePeriod, next)
		}
	}
	return t.Retry()
}
// Reset zeroes the consecutive retry counter and restores the throttle period
// to the base throttle period, undoing any exponential backoff.
func (t *Type) Reset() {
	base := t.baseThrottlePeriod

	atomic.StoreInt64(&t.consecutiveRetries, 0)
	atomic.StoreInt64(&t.throttlePeriod, base)
}
//------------------------------------------------------------------------------