-
Notifications
You must be signed in to change notification settings - Fork 28
/
timeoutexecutor.go
56 lines (48 loc) · 1.93 KB
/
timeoutexecutor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package timeout
import (
"errors"
"sync/atomic"
"time"
"github.com/failsafe-go/failsafe-go"
"github.com/failsafe-go/failsafe-go/common"
"github.com/failsafe-go/failsafe-go/internal"
"github.com/failsafe-go/failsafe-go/policy"
)
// timeoutExecutor is a policy.Executor that handles failures according to a Timeout.
type timeoutExecutor[R any] struct {
*policy.BaseExecutor[R]
*timeout[R]
}
var _ policy.Executor[any] = &timeoutExecutor[any]{}
func (e *timeoutExecutor[R]) Apply(innerFn func(failsafe.Execution[R]) *common.PolicyResult[R]) func(failsafe.Execution[R]) *common.PolicyResult[R] {
// This func sets up a race between a timeout and the innerFn returning
return func(exec failsafe.Execution[R]) *common.PolicyResult[R] {
execInternal := exec.(policy.ExecutionInternal[R])
// Create child context
execInternal = execInternal.CopyForCancellable().(policy.ExecutionInternal[R])
var result atomic.Pointer[common.PolicyResult[R]]
timer := time.AfterFunc(e.config.timeLimit, func() {
timeoutResult := internal.FailureResult[R](ErrExceeded)
if result.CompareAndSwap(nil, timeoutResult) {
// Sets the timeoutResult, overwriting any previously set result for the execution. This is correct, because while an
// execution may have completed, inner policies such as fallbacks may still be processing that result, in which case
// it's still important to interrupt them with a timeout.
execInternal.Cancel(timeoutResult)
if e.config.onTimeoutExceeded != nil {
e.config.onTimeoutExceeded(failsafe.ExecutionDoneEvent[R]{
ExecutionStats: execInternal,
Error: ErrExceeded,
})
}
}
})
// Store result and ctxCancel timeout context if needed
if result.CompareAndSwap(nil, innerFn(execInternal)) {
timer.Stop()
}
return e.PostExecute(execInternal, result.Load())
}
}
func (e *timeoutExecutor[R]) IsFailure(_ R, err error) bool {
return err != nil && errors.Is(err, ErrExceeded)
}