-
Notifications
You must be signed in to change notification settings - Fork 787
/
fallback.go
139 lines (123 loc) · 4.76 KB
/
fallback.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/*
* Copyright 2023 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package fallback ...
package fallback
import (
"context"
"errors"
"reflect"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/utils"
)
// ErrorFallback is to build fallback policy for error.
func ErrorFallback(ef Func) *Policy {
return &Policy{fallbackFunc: func(ctx context.Context, req utils.KitexArgs, resp utils.KitexResult, err error) (fbErr error) {
if err == nil {
return err
}
return ef(ctx, req, resp, err)
}}
}
// TimeoutAndCBFallback is to build fallback policy for rpc timeout and circuit breaker error.
// Kitex will filter the errors, only timeout and circuit breaker can trigger the ErrorFunc to execute.
func TimeoutAndCBFallback(ef Func) *Policy {
return &Policy{fallbackFunc: func(ctx context.Context, req utils.KitexArgs, resp utils.KitexResult, err error) (fbErr error) {
if err == nil {
return err
}
if kerrors.IsTimeoutError(err) || errors.Is(err, kerrors.ErrCircuitBreak) {
return ef(ctx, req, resp, err)
}
return err
}}
}
// NewFallbackPolicy is to build a fallback policy.
func NewFallbackPolicy(fb Func) *Policy {
return &Policy{
fallbackFunc: fb,
}
}
// Policy is the definition for fallback.
// - fallbackFunc is fallback func.
// - reportAsFallback is used to decide whether to report Metric according to the Fallback result.
type Policy struct {
fallbackFunc Func
reportAsFallback bool
}
func (p *Policy) EnableReportAsFallback() *Policy {
p.reportAsFallback = true
return p
}
// IsPolicyValid to check if the Fallback policy is valid.
func IsPolicyValid(p *Policy) bool {
return p != nil && p.fallbackFunc != nil
}
// UnwrapHelper helps to get the real request and response.
// Therefor, the RealReqRespFunc only need to process the real rpc req and resp but not the XXXArgs and XXXResult.
// eg:
//
// `client.WithFallback(fallback.NewFallbackPolicy(fallback.UnwrapHelper(yourRealReqRespFunc)))`
func UnwrapHelper(userFB RealReqRespFunc) Func {
return func(ctx context.Context, args utils.KitexArgs, result utils.KitexResult, err error) error {
req, resp := args.GetFirstArgument(), result.GetResult()
fbResp, fbErr := userFB(ctx, req, resp, err)
if fbResp != nil {
result.SetSuccess(fbResp)
}
return fbErr
}
}
// Func is the definition for fallback func, which can do fallback both for error and resp.
// Notice !! The args and result are not the real rpc req and resp, are respectively XXXArgs and XXXResult of generated code.
// setup eg: client.WithFallback(fallback.NewFallbackPolicy(yourFunc))
type Func func(ctx context.Context, args utils.KitexArgs, result utils.KitexResult, err error) (fbErr error)
// RealReqRespFunc is the definition for fallback func with real rpc req as param, and must return the real rpc resp.
// setup eg: client.WithFallback(fallback.TimeoutAndCBFallback(fallback.UnwrapHelper(yourRealReqRespFunc)))
type RealReqRespFunc func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error)
// DoIfNeeded do fallback
func (p *Policy) DoIfNeeded(ctx context.Context, ri rpcinfo.RPCInfo, args, result interface{}, err error) (fbErr error, reportAsFallback bool) {
if p == nil {
return err, false
}
ka, kaOK := args.(utils.KitexArgs)
kr, krOK := result.(utils.KitexResult)
if !kaOK || !krOK {
klog.Warn("KITEX: fallback cannot be supported, the args and result must be KitexArgs and KitexResult")
return err, false
}
err, allowReportAsFB := getBizErrIfExist(ri, err)
reportAsFallback = allowReportAsFB && p.reportAsFallback
fbErr = p.fallbackFunc(ctx, ka, kr, err)
if fbErr == nil && reflect.ValueOf(kr.GetResult()).IsNil() {
klog.Warnf("KITEX: both fallback resp and error are nil, return original err=%v, to_service=%s to_method=%s",
err, ri.To().ServiceName(), ri.To().Method())
return err, false
}
return fbErr, reportAsFallback
}
func getBizErrIfExist(ri rpcinfo.RPCInfo, err error) (error, bool) {
if err == nil {
if bizErr := ri.Invocation().BizStatusErr(); bizErr != nil {
// biz error also as error passed to fallback
err = bizErr
}
// if err is nil, reportAsFallback always be false even if user set true
return err, false
}
return err, true
}