forked from failsafe-go/failsafe-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
circuitbreaker.go
348 lines (288 loc) · 10.1 KB
/
circuitbreaker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
package circuitbreaker
import (
"errors"
"sync"
"github.com/failsafe-go/failsafe-go"
"github.com/failsafe-go/failsafe-go/policy"
)
// ErrCircuitBreakerOpen is returned when an execution is attempted against a circuit breaker that is open.
// It is a sentinel value, so callers can detect it with errors.Is(err, ErrCircuitBreakerOpen).
var ErrCircuitBreakerOpen = errors.New("circuit breaker open")
// State identifies which of the three phases a CircuitBreaker is currently in.
type State int

const (
	// ClosedState indicates the circuit is closed and fully functional, allowing executions to occur.
	ClosedState State = iota
	// OpenState indicates the circuit is opened and not allowing executions to occur.
	OpenState
	// HalfOpenState indicates the circuit is temporarily allowing executions to occur.
	HalfOpenState
)

// String returns the lowercase, human readable name of the state. Any value that is not one of the
// declared State constants is reported as "unknown".
func (s State) String() string {
	// Indexed by the State constants so each label sits next to the state it describes.
	labels := [...]string{
		ClosedState:   "closed",
		OpenState:     "open",
		HalfOpenState: "half-open",
	}
	if s < 0 || int(s) >= len(labels) {
		return "unknown"
	}
	return labels[s]
}
/*
CircuitBreaker is a policy that temporarily blocks execution when a configured number of failures are exceeded. Circuit
breakers have three states: closed, open, and half-open. When a circuit breaker is in the ClosedState (default),
executions are allowed. If a configurable number of failures occur, optionally over some time period, the circuit
breaker transitions to OpenState. In the OpenState a circuit breaker will fail executions with ErrCircuitBreakerOpen.
After a configurable delay, the circuit breaker will transition to HalfOpenState. In the HalfOpenState a configurable
number of trial executions will be allowed, after which the circuit breaker will transition to either ClosedState or
OpenState depending on how many were successful.

A circuit breaker can be count based or time based:

  - Count based circuit breakers will transition between states when recent execution results exceed a threshold.
  - Time based circuit breakers will transition between states when recent execution results exceed a threshold within a time
    period. A minimum number of executions must be performed in order for a state transition to occur. Time based circuit breakers
    use a sliding window to aggregate execution results. The window is divided into 10 time slices, each representing 1/10th
    of the failureThresholdingPeriod. As time progresses, statistics for old time slices are gradually discarded, which
    smooths the calculation of success and failure rates.

This type is concurrency safe.
*/
type CircuitBreaker[R any] interface {
failsafe.Policy[R]
// Open opens the CircuitBreaker.
Open()
// HalfOpen half-opens the CircuitBreaker.
HalfOpen()
// Close closes the CircuitBreaker.
Close()
// IsOpen returns whether the CircuitBreaker is open.
IsOpen() bool
// IsHalfOpen returns whether the CircuitBreaker is half-open.
IsHalfOpen() bool
// IsClosed returns whether the CircuitBreaker is closed.
IsClosed() bool
// State returns the State of the CircuitBreaker.
State() State
// Metrics returns metrics for the CircuitBreaker.
Metrics() Metrics
// TryAcquirePermit tries to acquire a permit to use the circuit breaker and returns whether a permit was acquired.
// Permission will be automatically released when a result or failure is recorded.
TryAcquirePermit() bool
// RecordResult records an execution result as a success or failure based on the failure handling configuration.
RecordResult(result R)
// RecordError records an error as a success or failure based on the failure handling configuration.
RecordError(err error)
// RecordSuccess records an execution success.
RecordSuccess()
// RecordFailure records an execution failure.
RecordFailure()
}
// Metrics exposes the execution statistics recorded by a CircuitBreaker. All counts and rates are
// reported as unsigned integers; rates are percentages from 0 to 100.
type Metrics interface {
// Executions returns the number of executions recorded in the current state when the state is ClosedState or
// HalfOpenState. When the state is OpenState, this returns the executions recorded during the previous ClosedState.
//
// For count based thresholding, the max number of executions is limited to the execution threshold. For time based
// thresholds, the number of executions may vary within the thresholding period.
Executions() uint
// Failures returns the number of failures recorded in the current state when in a ClosedState or HalfOpenState. When
// in OpenState, this returns the failures recorded during the previous ClosedState.
//
// For count based thresholds, the max number of failures is based on the failure threshold. For time based thresholds,
// the number of failures may vary within the failure thresholding period.
Failures() uint
// FailureRate returns the percentage rate of failed executions, from 0 to 100, in the current state when in a
// ClosedState or HalfOpenState. When in OpenState, this returns the rate recorded during the previous ClosedState.
//
// The rate is based on the configured failure thresholding capacity.
FailureRate() uint
// Successes returns the number of successes recorded in the current state when in a ClosedState or HalfOpenState.
// When in OpenState, this returns the successes recorded during the previous ClosedState.
//
// The max number of successes is based on the success threshold.
Successes() uint
// SuccessRate returns the percentage rate of successful executions, from 0 to 100, in the current state when in a
// ClosedState or HalfOpenState. When in OpenState, this returns the rate recorded during the previous ClosedState.
//
// The rate is based on the configured success thresholding capacity.
SuccessRate() uint
}
// StateChangedEvent indicates a CircuitBreaker's state has changed. It is passed to state change
// listeners after the transition has taken place.
type StateChangedEvent struct {
// OldState is the state the circuit breaker was in before the transition.
OldState State
// NewState is the state the circuit breaker transitioned to.
NewState State
}
// circuitBreaker is the CircuitBreaker implementation defined in this file. It also serves as its
// own Metrics implementation. Exported methods acquire mtx; unexported helpers marked
// "Requires external locking" expect the caller to already hold it.
type circuitBreaker[R any] struct {
// config supplies the delay settings, failure classification and listeners used by the breaker.
config *circuitBreakerConfig[R]
// mtx serializes all access to state.
mtx sync.Mutex
// Guarded by mtx
// state is the current state object; it is replaced on every state transition.
state circuitState[R]
}
// TryAcquirePermit asks the current state for a permit while holding mtx and reports whether one
// was granted.
func (cb *circuitBreaker[R]) TryAcquirePermit() bool {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	acquired := cb.tryAcquirePermit()
	return acquired
}
// Open transitions the circuit breaker to OpenState while holding mtx.
func (cb *circuitBreaker[R]) Open() {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	// A manual open has no execution associated with it, so nil is passed for the delay computation.
	cb.open(nil)
}
// HalfOpen transitions the circuit breaker to HalfOpenState while holding mtx.
func (cb *circuitBreaker[R]) HalfOpen() {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	cb.halfOpen()
}
// Close transitions the circuit breaker to ClosedState while holding mtx.
func (cb *circuitBreaker[R]) Close() {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	cb.close()
}
// State returns the State reported by the current state object, read while holding mtx.
func (cb *circuitBreaker[R]) State() State {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	current := cb.state.getState()
	return current
}
// Metrics returns a live view of the circuit breaker's statistics.
//
// The circuitBreaker is its own Metrics implementation, so the receiver is returned directly.
// No lock is taken here: returning the receiver reads no state guarded by mtx, and each Metrics
// accessor (Executions, Failures, FailureRate, Successes, SuccessRate) acquires mtx itself.
func (cb *circuitBreaker[R]) Metrics() Metrics {
	return cb
}
// IsOpen reports whether the circuit breaker is currently in OpenState. Locking is handled by State.
func (cb *circuitBreaker[R]) IsOpen() bool {
	current := cb.State()
	return current == OpenState
}
// IsHalfOpen reports whether the circuit breaker is currently in HalfOpenState. Locking is handled
// by State.
func (cb *circuitBreaker[R]) IsHalfOpen() bool {
	current := cb.State()
	return current == HalfOpenState
}
// IsClosed reports whether the circuit breaker is currently in ClosedState. Locking is handled by
// State.
func (cb *circuitBreaker[R]) IsClosed() bool {
	current := cb.State()
	return current == ClosedState
}
// Executions returns the execution count held by the current state's stats, read while holding mtx.
func (cb *circuitBreaker[R]) Executions() uint {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	stats := cb.state.getStats()
	return stats.getExecutionCount()
}
// Failures returns the failure count held by the current state's stats, read while holding mtx.
func (cb *circuitBreaker[R]) Failures() uint {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	stats := cb.state.getStats()
	return stats.getFailureCount()
}
// FailureRate returns the failure rate held by the current state's stats, read while holding mtx.
func (cb *circuitBreaker[R]) FailureRate() uint {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	stats := cb.state.getStats()
	return stats.getFailureRate()
}
// Successes returns the success count held by the current state's stats, read while holding mtx.
func (cb *circuitBreaker[R]) Successes() uint {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	stats := cb.state.getStats()
	return stats.getSuccessCount()
}
// SuccessRate returns the success rate held by the current state's stats, read while holding mtx.
func (cb *circuitBreaker[R]) SuccessRate() uint {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	stats := cb.state.getStats()
	return stats.getSuccessRate()
}
// RecordFailure records an execution failure while holding mtx.
func (cb *circuitBreaker[R]) RecordFailure() {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	// A manually recorded failure has no execution associated with it, so nil is passed.
	cb.recordFailure(nil)
}
// RecordError records err as a success or failure while holding mtx. Since there is no result to
// go with the error, the zero value of R is supplied as the result.
func (cb *circuitBreaker[R]) RecordError(err error) {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	var zero R
	cb.recordResult(zero, err)
}
// RecordResult records result as a success or failure while holding mtx. The result is recorded
// with a nil error.
func (cb *circuitBreaker[R]) RecordResult(result R) {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	var noErr error
	cb.recordResult(result, noErr)
}
// RecordSuccess records an execution success while holding mtx.
func (cb *circuitBreaker[R]) RecordSuccess() {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	cb.recordSuccess()
}
// ToExecutor builds a circuitBreakerExecutor bound to this circuit breaker for the given policy
// index. The second parameter is unused.
func (cb *circuitBreaker[R]) ToExecutor(policyIndex int, _ R) any {
	base := &policy.BaseExecutor[R]{
		BaseFailurePolicy: cb.config.BaseFailurePolicy,
		PolicyIndex:       policyIndex,
	}
	executor := &circuitBreakerExecutor[R]{
		BaseExecutor:   base,
		circuitBreaker: cb,
	}
	// The executor is stored in its own Executor field; presumably this lets the base executor
	// dispatch back to the circuit breaker specific behavior (confirm against policy.BaseExecutor).
	executor.Executor = executor
	return executor
}
// transitionTo moves the circuit breaker into newState and then notifies listeners. It does
// nothing, and notifies nobody, when the breaker is already in newState.
//
// When opening, the delay is taken from cb.config.ComputeDelay(exec); a result of -1 means the
// configured cb.config.Delay is used instead. After the state has been replaced, the configured
// stateChangedListener is called first, followed by listener; either may be nil.
//
// Requires external locking.
func (cb *circuitBreaker[R]) transitionTo(newState State, exec failsafe.Execution[R], listener func(StateChangedEvent)) {
	previous := cb.state.getState()
	if previous == newState {
		return
	}

	switch newState {
	case ClosedState:
		cb.state = newClosedState(cb)
	case OpenState:
		delay := cb.config.ComputeDelay(exec)
		if delay == -1 {
			delay = cb.config.Delay
		}
		// The state being left is handed to the new open state.
		cb.state = newOpenState(cb, cb.state, delay)
	case HalfOpenState:
		cb.state = newHalfOpenState(cb)
	}

	event := StateChangedEvent{OldState: previous, NewState: newState}
	if notify := cb.config.stateChangedListener; notify != nil {
		notify(event)
	}
	if listener != nil {
		listener(event)
	}
}
// tryAcquirePermit asks the current state for a permit and reports whether one was granted.
//
// Requires external locking.
func (cb *circuitBreaker[R]) tryAcquirePermit() bool {
	granted := cb.state.tryAcquirePermit()
	return granted
}
// open transitions to OpenState and notifies the configured open listener. The execution, which
// may be nil, is considered when computing the delay before the circuit breaker will transition
// to half open.
//
// Requires external locking.
func (cb *circuitBreaker[R]) open(execution failsafe.Execution[R]) {
	onOpen := cb.config.openListener
	cb.transitionTo(OpenState, execution, onOpen)
}
// close transitions to ClosedState and notifies the configured close listener.
//
// Requires external locking.
func (cb *circuitBreaker[R]) close() {
	onClose := cb.config.closeListener
	cb.transitionTo(ClosedState, nil, onClose)
}
// halfOpen transitions to HalfOpenState and notifies the configured half-open listener.
//
// Requires external locking.
func (cb *circuitBreaker[R]) halfOpen() {
	onHalfOpen := cb.config.halfOpenListener
	cb.transitionTo(HalfOpenState, nil, onHalfOpen)
}
// recordResult classifies the result and error with cb.config.IsFailure and records the outcome
// as either a failure or a success. Failures recorded here carry no execution.
//
// Requires external locking.
func (cb *circuitBreaker[R]) recordResult(result R, err error) {
	if !cb.config.IsFailure(result, err) {
		cb.recordSuccess()
		return
	}
	cb.recordFailure(nil)
}
// recordSuccess adds a success to the current state's stats, then lets the state check its
// threshold and release the permit.
//
// Requires external locking.
func (cb *circuitBreaker[R]) recordSuccess() {
	stats := cb.state.getStats()
	stats.recordSuccess()
	cb.state.checkThresholdAndReleasePermit(nil)
}
// recordFailure adds a failure to the current state's stats, then lets the state check its
// threshold and release the permit. The execution, which may be nil, is passed along to the
// threshold check.
//
// Requires external locking.
func (cb *circuitBreaker[R]) recordFailure(exec failsafe.Execution[R]) {
	stats := cb.state.getStats()
	stats.recordFailure()
	cb.state.checkThresholdAndReleasePermit(exec)
}
// Reset closes the circuit breaker and then resets the stats of the resulting state.
//
// Like the other exported methods, Reset acquires mtx itself: it both replaces and reads
// cb.state, which is guarded by mtx, so running it unlocked would race with concurrent
// executions. Because sync.Mutex is not reentrant, Reset must not be called by code that
// already holds mtx.
func (cb *circuitBreaker[R]) Reset() {
	cb.mtx.Lock()
	defer cb.mtx.Unlock()

	cb.close()
	// Closing is a no-op when the breaker is already closed, so the stats are reset explicitly.
	cb.state.getStats().reset()
}