-
Notifications
You must be signed in to change notification settings - Fork 338
/
breaker.go
158 lines (126 loc) · 3.59 KB
/
breaker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package breaker
import (
"errors"
"sync"
"time"
)
// Package defaults, used by newBreakerWithCurrentTimeFn for Options fields
// that are left unset.
const (
	// limit is the default number of consecutive failures after which the breaker closes.
	limit = 100
	// failInterval is the default interval within which failures are counted as consecutive.
	failInterval = 30 * time.Minute
	// maxBackoff is the default upper bound for the backoff duration.
	maxBackoff = time.Hour
	// backoff is the default initial backoff duration.
	backoff = 2 * time.Minute
)
var (
	// Compile-time assertion that *breaker implements Interface.
	_ Interface = (*breaker)(nil)
	// ErrClosed is the sentinel error returned by Execute while the breaker is
	// closed, i.e. while it is not executing functions.
	ErrClosed = errors.New("breaker closed")
)
// Interface is the breaker API: it guards function calls and refuses to run
// them after too many consecutive failures.
type Interface interface {
	// Execute runs f() if the limit number of consecutive failed calls is not reached within fail interval.
	// f() call is not locked so it can still be executed concurrently.
	// Returns `ErrClosed` if the limit is reached or f() result otherwise.
	Execute(f func() error) error
	// ClosedUntil returns the timestamp when the breaker will become open again.
	// The implementation in this package returns the current time when the
	// breaker is not closed.
	ClosedUntil() time.Time
}
// currentTimeFn returns the current time. The breaker reads the clock through
// it (instead of calling time.Now directly) so tests can supply a
// deterministic clock.
type currentTimeFn = func() time.Time
// breaker is the Interface implementation. It counts consecutive failures of
// the executed functions and, once limit is reached, rejects further calls
// until the current backoff has elapsed.
type breaker struct {
	// breaker will not execute any more tasks after limit number of consecutive failures happen
	limit int
	// current number of consecutive fails
	consFailedCalls int
	// time of the first failure in the current run of consecutive failures;
	// the zero time when no failure is being counted
	firstFailedTimestamp time.Time
	// time at which consFailedCalls reached limit
	closedTimestamp time.Time
	// how long the breaker stays closed after closedTimestamp; starts at the
	// initial backoff duration and is doubled, capped at maxBackoff, every
	// time the breaker reopens after a backoff period
	backoff time.Duration
	// upper bound for backoff
	maxBackoff time.Duration
	// consecutive failures are counted if they happen within this interval
	failInterval time.Duration
	// clock used for all time readings
	currentTimeFn currentTimeFn
	// mtx is held by ClosedUntil, beforef and afterf while they access the fields above
	mtx sync.Mutex
}
// Options configures a breaker created by NewBreaker. Any field left at its
// zero value is replaced by the package default.
type Options struct {
	// Limit is the number of consecutive failures after which the breaker closes.
	Limit int
	// FailInterval is the interval within which failures are counted as consecutive.
	FailInterval time.Duration
	// StartBackoff is the initial duration for which the breaker stays closed.
	StartBackoff time.Duration
	// MaxBackoff is the upper bound the backoff duration can grow to.
	MaxBackoff time.Duration
}
// NewBreaker returns a breaker configured by o that uses time.Now as its
// clock. Option fields left at their zero value fall back to the package
// defaults.
func NewBreaker(o Options) Interface {
	return newBreakerWithCurrentTimeFn(o, time.Now)
}
// newBreakerWithCurrentTimeFn builds a breaker from o that reads the time
// through currentTimeFn, so tests can drive the clock deterministically.
//
// Every option that is zero or negative is replaced by the package default.
// Negative values are never meaningful and used to break the breaker
// silently: with a negative Limit the check "consecutive failures >= limit"
// already holds for a fresh breaker, so every Execute returned ErrClosed
// forever, and negative durations disabled the fail interval and the backoff.
func newBreakerWithCurrentTimeFn(o Options, currentTimeFn currentTimeFn) Interface {
	// Start from the defaults and override only with usable (positive) values.
	b := &breaker{
		limit:         limit,
		backoff:       backoff,
		maxBackoff:    maxBackoff,
		failInterval:  failInterval,
		currentTimeFn: currentTimeFn,
	}

	if o.Limit > 0 {
		b.limit = o.Limit
	}
	if o.FailInterval > 0 {
		b.failInterval = o.FailInterval
	}
	if o.MaxBackoff > 0 {
		b.maxBackoff = o.MaxBackoff
	}
	if o.StartBackoff > 0 {
		b.backoff = o.StartBackoff
	}

	return b
}
// Execute calls f unless the breaker refuses the call, in which case the
// error from beforef (ErrClosed) is returned and f is not invoked. Otherwise
// the outcome of f is recorded and returned unchanged.
func (b *breaker) Execute(f func() error) error {
	err := b.beforef()
	if err != nil {
		return err
	}

	// The mutex is not held here, so concurrent Execute calls may run f in
	// parallel.
	result := f()

	return b.afterf(result)
}
// ClosedUntil reports when the breaker will accept calls again: the moment it
// closed plus the current backoff. If the failure limit has not been reached
// the breaker is not closed and the current time is returned.
func (b *breaker) ClosedUntil() time.Time {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	if b.consFailedCalls < b.limit {
		return b.currentTimeFn()
	}

	return b.closedTimestamp.Add(b.backoff)
}
// beforef decides whether the next function may run. It returns ErrClosed
// while the failure limit is reached and the backoff has not yet elapsed, and
// nil otherwise. As a side effect it reopens the breaker after a backoff
// (doubling the next backoff, capped at maxBackoff) and forgets failures that
// are older than failInterval.
func (b *breaker) beforef() error {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	// The clock is read through currentTimeFn rather than time.Since so that
	// tests can mock it deterministically.
	if b.consFailedCalls >= b.limit {
		stillClosed := b.closedTimestamp.IsZero() ||
			b.currentTimeFn().Sub(b.closedTimestamp) < b.backoff
		if stillClosed {
			return ErrClosed
		}

		// Backoff elapsed: reopen, and make the next closed period longer.
		b.resetFailed()
		b.backoff *= 2
		if b.backoff > b.maxBackoff {
			b.backoff = b.maxBackoff
		}
	}

	// No failure is currently being counted, nothing can expire.
	if b.firstFailedTimestamp.IsZero() {
		return nil
	}

	// Failures that started at least failInterval ago no longer count as
	// consecutive.
	if age := b.currentTimeFn().Sub(b.firstFailedTimestamp); age >= b.failInterval {
		b.resetFailed()
	}

	return nil
}
// afterf records the outcome of an executed function and returns err
// unchanged. A nil err clears the failure count; a non-nil err increments it,
// remembering when the run of failures started and, at the moment the count
// hits limit, when the breaker closed.
func (b *breaker) afterf(err error) error {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	// Success ends any run of consecutive failures.
	if err == nil {
		b.resetFailed()
		return nil
	}

	// First failure of a new run: remember when it started.
	if b.consFailedCalls == 0 {
		b.firstFailedTimestamp = b.currentTimeFn()
	}

	b.consFailedCalls++

	// The limit has just been reached: this is the moment the breaker closes.
	if b.consFailedCalls == b.limit {
		b.closedTimestamp = b.currentTimeFn()
	}

	return err
}
// resetFailed clears the consecutive-failure bookkeeping: the counter goes
// back to zero and the start-of-run timestamp to the zero time. It does not
// lock mtx itself; the callers in this file hold it.
func (b *breaker) resetFailed() {
	var unset time.Time

	b.firstFailedTimestamp = unset
	b.consFailedCalls = 0
}