-
-
Notifications
You must be signed in to change notification settings - Fork 352
/
sleeper.go
151 lines (125 loc) · 3.1 KB
/
sleeper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package utils
import (
"context"
"fmt"
mr "math/rand"
"reflect"
"sync"
"time"
)
// Sleep pauses the calling goroutine for the given number of seconds.
// Fractional values are accepted, for example 2.3 means 2.3 seconds.
func Sleep(seconds float64) {
	// Convert seconds to nanoseconds, the unit time.Duration counts in.
	nanos := seconds * float64(time.Second)
	time.Sleep(time.Duration(nanos))
}
// Sleeper sleeps the current goroutine for some time and returns the reason it woke up.
// When ctx is done, a sleeper should stop waiting and release its resources.
type Sleeper func(context.Context) error
// ErrMaxSleepCount reports that the maximum sleep count was exceeded.
type ErrMaxSleepCount struct {
	// Max is the sleep count limit included in the error message.
	Max int
}

// Error implements the error interface.
func (e *ErrMaxSleepCount) Error() string {
	const format = "max sleep count %d exceeded"
	return fmt.Sprintf(format, e.Max)
}

// Is reports whether err has the same dynamic type as e, so that errors.Is
// matches any *ErrMaxSleepCount regardless of its Max value.
func (e *ErrMaxSleepCount) Is(err error) bool {
	own, other := reflect.TypeOf(e), reflect.TypeOf(err)
	return own == other
}
// CountSleeper returns a sleeper that never blocks: each call wakes immediately.
// The first max calls return nil; once the call count equals max, every further
// call returns *ErrMaxSleepCount. If ctx is already done, ctx.Err() is returned
// instead and the call is not counted.
func CountSleeper(max int) Sleeper {
	var (
		mu    sync.Mutex
		calls int
	)

	return func(ctx context.Context) error {
		// The counter is shared by every caller of the returned sleeper.
		mu.Lock()
		defer mu.Unlock()

		if err := ctx.Err(); err != nil {
			return err
		}

		if calls != max {
			calls++
			return nil
		}
		return &ErrMaxSleepCount{Max: max}
	}
}
// DefaultBackoff returns interval multiplied by a random factor in [1.9, 2.1),
// i.e. the recurrence A(n) = A(n-1) * random[1.9, 2.1).
func DefaultBackoff(interval time.Duration) time.Duration {
	// mr.Float64 is in [0, 1), so jitter is in [-0.1, 0.1).
	jitter := (mr.Float64() - 0.5) * 0.2
	factor := 2 + jitter
	next := float64(interval) * factor
	return time.Duration(next)
}
// BackoffSleeper returns a sleeper that sleeps in a backoff manner every time it gets called.
//
// Each call sleeps for algorithm(previous interval), where the previous interval
// starts at init, and the result is capped at maxInterval so that no single sleep
// is ever longer than maxInterval. Once the interval has reached maxInterval the
// algorithm is no longer consulted and every call sleeps for maxInterval.
//
// If algorithm is nil, DefaultBackoff will be used.
// Set init and maxInterval to the same value to make it a constant sleeper.
// If maxInterval is not greater than 0, the sleeper will wake immediately.
//
// The returned sleeper returns nil after a completed sleep, or ctx.Err() if ctx
// is done first; an interrupted sleep does not advance the backoff interval.
func BackoffSleeper(init, maxInterval time.Duration, algorithm func(time.Duration) time.Duration) Sleeper {
	l := sync.Mutex{}

	if algorithm == nil {
		algorithm = DefaultBackoff
	}

	return func(ctx context.Context) error {
		// init is shared state advanced by every completed sleep, so callers are
		// serialized; the lock is held for the whole duration of the sleep.
		l.Lock()
		defer l.Unlock()

		// wake immediately
		if maxInterval <= 0 {
			return nil
		}

		interval := maxInterval
		if init < maxInterval {
			interval = algorithm(init)
			// The algorithm may overshoot the limit (e.g. doubling 0.8s with a
			// 1s maximum), so never sleep longer than maxInterval.
			if interval > maxInterval {
				interval = maxInterval
			}
		}

		t := time.NewTimer(interval)
		defer t.Stop()

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			// Only a completed sleep moves the backoff forward.
			init = interval
		}

		return nil
	}
}
// EachSleepers returns a sleeper that runs every sleeper in list in order and
// wakes up once each of them has woken up.
// If a sleeper returns an error, the remaining sleepers are skipped and that
// error is returned immediately.
func EachSleepers(list ...Sleeper) Sleeper {
	return func(ctx context.Context) error {
		for i := range list {
			if err := list[i](ctx); err != nil {
				return err
			}
		}
		return nil
	}
}
// RaceSleepers returns a sleeper that wakes up when one of the sleepers wakes.
//
// All sleepers in list are started concurrently with a context derived from ctx.
// The result of the first sleeper to finish is returned, and the derived context
// is then cancelled so the remaining sleepers can stop waiting.
//
// If list is empty there is nothing to race, so the returned sleeper waits until
// ctx is done and returns ctx.Err().
func RaceSleepers(list ...Sleeper) Sleeper {
	return func(ctx context.Context) error {
		// Without any sleeper nothing would ever be sent on the result channel;
		// wait on the caller's context instead of blocking unconditionally.
		if len(list) == 0 {
			<-ctx.Done()
			return ctx.Err()
		}

		ctx, cancel := context.WithCancel(ctx)
		// Wakes the losing sleepers and releases the derived context.
		defer cancel()

		// Buffered so that losing sleepers can deliver their result and exit
		// even though nobody receives it.
		done := make(chan error, len(list))
		for _, s := range list {
			go func(s Sleeper) {
				done <- s(ctx)
			}(s)
		}

		return <-done
	}
}
// Retry calls fn repeatedly, invoking the sleeper s between attempts, until fn
// returns stop == true or s returns an error.
// When fn stops, the error fn returned alongside it is the result. An error
// returned by fn with stop == false is discarded and fn is tried again.
// When s fails, its error is the result.
func Retry(ctx context.Context, s Sleeper, fn func() (stop bool, err error)) error {
	for {
		if done, fnErr := fn(); done {
			return fnErr
		}

		if sleepErr := s(ctx); sleepErr != nil {
			return sleepErr
		}
	}
}