-
Notifications
You must be signed in to change notification settings - Fork 41
/
retries.go
261 lines (228 loc) · 6.31 KB
/
retries.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
package retries
import (
"context"
"errors"
"fmt"
"math/rand"
"strings"
"time"
"github.com/databricks/databricks-sdk-go/logger"
)
// Info pairs a pointer to a value of type T with a timeout. Option functions
// receive a pointer to it and may read or modify either field.
//
// Deprecated: use return types from non-*AndWait methods
type Info[T any] struct {
	// Info points at the value handed to OnPoll callbacks; it may be nil.
	Info *T
	// Timeout is the duration stored by the Timeout option.
	Timeout time.Duration
}

// Option is a function that inspects or adjusts an Info.
//
// Deprecated: use return types from non-*AndWait methods
type Option[T any] func(*Info[T])

// Timeout returns an Option that stores dur in the Timeout field of the Info
// it is applied to.
//
// Deprecated: use return types from non-*AndWait methods
func Timeout[T any](dur time.Duration) Option[T] {
	setTimeout := func(target *Info[T]) {
		target.Timeout = dur
	}
	return setTimeout
}

// OnPoll returns an Option that invokes callback with the Info's value. The
// callback is skipped when the Info holds no value (its Info field is nil).
//
// Deprecated: use return types from non-*AndWait methods
func OnPoll[T any](callback func(*T)) Option[T] {
	notify := func(target *Info[T]) {
		if current := target.Info; current != nil {
			callback(current)
		}
	}
	return notify
}
// Err is the error type returned by the callbacks passed to Wait and Poll.
// It wraps an underlying error together with a flag telling the retrier
// whether to stop retrying.
type Err struct {
	// Err is the underlying error.
	Err error
	// Halt is true when the operation must not be retried.
	Halt bool
}

// Error returns the message of the wrapped error.
func (e *Err) Error() string {
	return e.Err.Error()
}

// Unwrap returns the wrapped error, making it visible to errors.Is and
// errors.As.
func (e *Err) Unwrap() error {
	return e.Err
}

// Halt wraps err in an Err that stops further retries.
func Halt(err error) *Err {
	return &Err{Err: err, Halt: true}
}

// Continue wraps err in an Err that allows further retries.
func Continue(err error) *Err {
	return &Err{Err: err, Halt: false}
}

// Continues is like Continue, but creates the error from msg. The message is
// used verbatim: it is not interpreted as a format string, so a literal '%'
// in msg is preserved.
func Continues(msg string) *Err {
	return Continue(errors.New(msg))
}

// Continuef is like Continue, but builds the error with fmt.Errorf. err is
// passed as the first formatting operand, followed by args, so format must
// contain a verb for err (use %w to keep it unwrappable) ahead of the verbs
// for args.
func Continuef(format string, err error, args ...interface{}) *Err {
	// Spread err and args as individual operands; passing the slice itself
	// would hand fmt.Errorf a single []interface{} operand.
	operands := make([]interface{}, 0, len(args)+1)
	operands = append(operands, err)
	operands = append(operands, args...)
	return Continue(fmt.Errorf(format, operands...))
}
// Bounds used by backoff: maxWait caps the linear part of the delay, and the
// jitter is drawn from the inclusive range [minJitter, maxJitter].
var (
	maxWait   = 10 * time.Second
	minJitter = 50 * time.Millisecond
	maxJitter = 750 * time.Millisecond
)

// backoff returns the delay to apply after the given attempt: attempt
// seconds, capped at maxWait, plus a random jitter between minJitter and
// maxJitter (both inclusive).
func backoff(attempt int) time.Duration {
	wait := time.Duration(attempt) * time.Second
	if wait > maxWait {
		wait = maxWait
	}
	// Draw the jitter from a private generator instead of calling rand.Seed
	// on every invocation: rand.Seed is deprecated, re-seeding the shared
	// source clobbers any seed chosen by the host application, and the call
	// is a no-op on recent Go releases.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	span := int64(maxJitter - minJitter)
	jitter := minJitter + time.Duration(rng.Int63n(span+1))
	return wait + jitter
}
// ErrTimedOut is the error a retrier returns when its context is done before
// the retried function succeeds. It wraps the last error that function
// returned.
type ErrTimedOut struct {
	// err is the last error seen before giving up.
	err error
}

// Error returns the wrapped error's message prefixed with "timed out: ".
func (e *ErrTimedOut) Error() string {
	cause := e.err
	return fmt.Sprintf("timed out: %s", cause)
}

// Unwrap returns the last error seen before giving up.
func (e *ErrTimedOut) Unwrap() error {
	return e.err
}
// RetryOption is a function that sets part of the retry configuration for a
// retrier.
type RetryOption func(*RetryConfig)

// RetryConfig is the configuration for a retrier. Every field is optional;
// the accessor methods substitute a default for any field left at its zero
// value.
type RetryConfig struct {
	// timeout bounds the whole retry loop; zero selects the default.
	timeout time.Duration
	// shouldRetry reports whether an error is worth another attempt.
	shouldRetry func(error) bool
	// backoff maps an attempt number to the delay before the next attempt.
	backoff func(int) time.Duration
}

// Timeout returns the configured timeout, or 20 minutes when none was set.
func (r RetryConfig) Timeout() time.Duration {
	if r.timeout != 0 {
		return r.timeout
	}
	return 20 * time.Minute
}

// Backoff returns the delay to wait after the given attempt. It uses the
// configured backoff function when there is one and the package-level
// backoff function otherwise.
func (r RetryConfig) Backoff(attempt int) time.Duration {
	if custom := r.backoff; custom != nil {
		return custom(attempt)
	}
	return backoff(attempt)
}

// ShouldRetry reports whether err should be retried. It defers to the
// configured predicate when there is one; otherwise every non-nil error is
// retried.
func (r RetryConfig) ShouldRetry(err error) bool {
	if decide := r.shouldRetry; decide != nil {
		return decide(err)
	}
	return err != nil
}
// WithTimeout sets the timeout for the retrier. The returned option stores
// timeout in the configuration it is applied to.
func WithTimeout(timeout time.Duration) RetryOption {
	apply := func(cfg *RetryConfig) {
		cfg.timeout = timeout
	}
	return apply
}
// OnErrors sets the errors that should be retried. An error is retried when
// errors.Is matches it against at least one of the given errors; any other
// error stops the retrier.
func OnErrors(on ...error) RetryOption {
	// matchesAny reports whether err matches one of the listed errors.
	matchesAny := func(err error) bool {
		for i := range on {
			if errors.Is(err, on[i]) {
				return true
			}
		}
		return false
	}
	return func(cfg *RetryConfig) {
		cfg.shouldRetry = matchesAny
	}
}
// WithRetryFunc sets the function that decides whether the retrier keeps
// going after an error. Despite the parameter name, the function must return
// true for errors that should be retried and false for errors that should
// halt the retrier.
func WithRetryFunc(halt func(error) bool) RetryOption {
	apply := func(cfg *RetryConfig) {
		cfg.shouldRetry = halt
	}
	return apply
}
// Retrier is a struct that can retry an operation until it succeeds or the
// timeout is reached. The empty struct indicates that the retrier should run
// for 20 minutes and retry on any non-nil error. The type parameter is the
// return type of the Run() method. When using the Wait() method, this can be
// struct{}.
//
// Example:
//
//	r := retries.New[struct{}](retries.WithTimeout(5 * time.Minute), retries.OnErrors(apierr.ErrResourceConflict))
//	err := r.Wait(ctx, func(ctx context.Context) error {
//		return a.Workspaces.Delete(ctx, provisioning.DeleteWorkspaceRequest{
//			WorkspaceId: workspace.WorkspaceId,
//		})
//	})
type Retrier[T any] struct {
	// config holds the options collected by New.
	config RetryConfig
}

// New creates a new retrier with the given configuration. Options are
// applied in the order given.
// If no timeout is specified, the default is 20 minutes. If the timeout is
// negative, the retrier will run indefinitely.
// If no retry function is specified, the default is to retry on all errors.
func New[T any](configOpts ...RetryOption) Retrier[T] {
	var cfg RetryConfig
	for i := range configOpts {
		configOpts[i](&cfg)
	}
	return Retrier[T]{config: cfg}
}
// Wait runs the given function until it succeeds or the timeout is reached.
// On success, it returns nil. On timeout, it returns an error wrapping the
// last error returned by the function. It is Run for functions that produce
// no result.
func (r Retrier[T]) Wait(ctx context.Context, fn func(ctx context.Context) error) error {
	// Adapt fn to the signature Run expects; the result is always nil.
	noResult := func(ctx context.Context) (*T, error) {
		return nil, fn(ctx)
	}
	_, err := r.Run(ctx, noResult)
	return err
}
// Run runs the given function until it succeeds or the timeout is reached,
// returning the result.
//
// When the configured timeout is positive, fn receives a context bounded by
// it. An error rejected by the retry predicate is returned as is. Otherwise
// Run sleeps for the configured backoff and tries again. If the context is
// done while sleeping (timeout, or the parent context ending), Run returns
// an *ErrTimedOut wrapping the last error returned by the function.
func (r Retrier[T]) Run(ctx context.Context, fn func(context.Context) (*T, error)) (*T, error) {
	if limit := r.config.Timeout(); limit > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, limit)
		defer cancel()
	}
	for attempt := 1; ; attempt++ {
		result, err := fn(ctx)
		switch {
		case err == nil:
			return result, nil
		case !r.config.ShouldRetry(err):
			logger.Debugf(ctx, "non-retriable error: %s", err)
			return nil, err
		}
		pause := r.config.Backoff(attempt)
		timer := time.NewTimer(pause)
		logger.Tracef(ctx, "%s. Sleeping %s",
			strings.TrimSuffix(err.Error(), "."),
			pause.Round(time.Millisecond))
		select {
		case <-timer.C:
			// Backoff elapsed; go around for the next attempt.
		case <-ctx.Done():
			// Stop when either this or the parent context is done.
			timer.Stop()
			return nil, &ErrTimedOut{err}
		}
	}
}
// shouldRetry is the retry predicate used by the package-level Wait and
// Poll. It reports true only when err is (or wraps) a non-nil *Err whose
// Halt flag is unset. A nil error, a nil *Err, or an error of any other type
// is not retried.
func shouldRetry(err error) bool {
	// errors.As instead of a bare type assertion: an error that is not a
	// *Err must stop the retrier rather than panic.
	var e *Err
	if !errors.As(err, &e) || e == nil {
		return false
	}
	return !e.Halt
}
// Wait calls fn repeatedly until it returns nil, returns an error that
// shouldRetry rejects (such as one created with Halt), or the retrier gives
// up because timeout elapsed or ctx is done. It returns nil on success and
// the resulting error otherwise.
func Wait(ctx context.Context, timeout time.Duration, fn func() *Err) error {
	retrier := New[struct{}](WithTimeout(timeout), WithRetryFunc(shouldRetry))
	return retrier.Wait(ctx, func(context.Context) error {
		// Return an untyped nil on success: a nil *Err stored in an error
		// interface would compare as non-nil.
		if failure := fn(); failure != nil {
			return failure
		}
		return nil
	})
}
// Poll calls fn repeatedly until it returns a nil *Err, returns an error that
// shouldRetry rejects (such as one created with Halt), or the retrier gives
// up because timeout elapsed or ctx is done. On success it returns the value
// produced by fn.
func Poll[T any](ctx context.Context, timeout time.Duration, fn func() (*T, *Err)) (*T, error) {
	retrier := New[T](WithTimeout(timeout), WithRetryFunc(shouldRetry))
	return retrier.Run(ctx, func(context.Context) (*T, error) {
		value, failure := fn()
		// Return an untyped nil on success: a nil *Err stored in an error
		// interface would compare as non-nil.
		if failure == nil {
			return value, nil
		}
		return value, failure
	})
}