-
Notifications
You must be signed in to change notification settings - Fork 469
/
transport_handler.go
executable file
·121 lines (103 loc) · 3.47 KB
/
transport_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package handler
import (
"time"
"github.com/go-chassis/go-chassis/client/rest"
"github.com/go-chassis/go-chassis/core/client"
"github.com/go-chassis/go-chassis/core/common"
"github.com/go-chassis/go-chassis/core/config"
"github.com/go-chassis/go-chassis/core/invocation"
"github.com/go-chassis/go-chassis/core/lager"
"github.com/go-chassis/go-chassis/core/loadbalancer"
"github.com/go-chassis/go-chassis/session"
)
// TransportHandler is the handler that carries out the transport step of an
// invocation. It holds no state.
type TransportHandler struct{}

// Name reports the identifier of this handler, which is always "transport".
func (th *TransportHandler) Name() string {
	const handlerName = "transport"
	return handlerName
}
// errNotNill reports a client-lookup failure: it logs err and then passes it
// to cb wrapped in an invocation.Response whose Err field is set.
func errNotNill(err error, cb invocation.ResponseCallBack) {
	lager.Logger.Error("GetClient got Error", err)
	cb(&invocation.Response{Err: err})
}
// Handle performs the remote call described by the invocation and reports the
// outcome through cb exactly once.
//
// It looks up a client for i.Protocol / i.MicroServiceName and calls
// i.Endpoint with it. If the lookup fails, the error is delivered via cb and
// the call is not attempted. If the call fails, the error is delivered via cb;
// under the session-stickiness strategy the session bookkeeping
// (ProcessSpecialProtocol, ProcessSuccessiveFailure) runs first. On success
// the elapsed time is recorded for the latency strategy, the session is saved
// for the session-stickiness strategy, and i.Reply is delivered as the result.
func (th *TransportHandler) Handle(chain *Chain, i *invocation.Invocation, cb invocation.ResponseCallBack) {
	c, err := client.GetClient(i.Protocol, i.MicroServiceName)
	if err != nil {
		errNotNill(err, cb)
		// Stop here: cb has already been invoked with the error and c must
		// not be used. Falling through would call c.Call on an unusable
		// client and invoke cb a second time.
		return
	}

	r := &invocation.Response{}

	//taking the time elapsed to check for latency aware strategy
	timeBefore := time.Now()

	err = c.Call(i.Ctx, i.Endpoint, i, i.Reply)
	if err != nil {
		r.Err = err
		lager.Logger.Errorf(err, "Call got Error")
		if i.Strategy == loadbalancer.StrategySessionStickiness {
			ProcessSpecialProtocol(i)
			ProcessSuccessiveFailure(i)
		}
		cb(r)
		return
	}

	if i.Strategy == loadbalancer.StrategyLatency {
		timeAfter := time.Since(timeBefore)
		loadbalancer.SetLatency(timeAfter, i.Endpoint, i.MicroServiceName, i.RouteTags, i.Protocol)
	}

	if i.Strategy == loadbalancer.StrategySessionStickiness {
		ProcessSpecialProtocol(i)
	}

	r.Result = i.Reply
	cb(r)
}
// ProcessSpecialProtocol saves the session ID for the invocation's endpoint,
// using the mechanism that matches the invocation's protocol.
//
// For the REST protocol it does so from the HTTP response/request pair, and
// only when both inv.Reply and inv.Args are set. For the highway protocol it
// does so through the context and stores the returned context back into
// inv.Ctx. Any other protocol is left untouched.
func ProcessSpecialProtocol(inv *invocation.Invocation) {
	if inv.Protocol == common.ProtocolRest {
		if inv.Reply == nil || inv.Args == nil {
			return
		}
		httpReply := inv.Reply.(*rest.Response)
		httpReq := inv.Args.(*rest.Request)
		timeout := config.GetSessionTimeout(inv.SourceMicroService, inv.MicroServiceName)
		session.SaveSessionIDFromHTTP(inv.Endpoint, timeout, httpReply.GetResponse(), httpReq.GetRequest())
		return
	}

	if inv.Protocol == common.ProtocolHighway {
		timeout := config.GetSessionTimeout(inv.SourceMicroService, inv.MicroServiceName)
		inv.Ctx = session.SaveSessionIDFromContext(inv.Ctx, inv.Endpoint, timeout)
	}
}
// ProcessSuccessiveFailure records one more failure against the session
// cookie of the invocation and, once the count equals the configured limit
// (config.StrategySuccessiveFailedTimes), removes the session key and resets
// the counter.
//
// For the REST protocol the cookie is read from the HTTP response held in
// i.Reply; for every other protocol it is read from i.Ctx. If no cookie can
// be obtained the function does nothing.
func ProcessSuccessiveFailure(i *invocation.Invocation) {
	var cookie string
	// dropSession removes the session key once the failure limit is reached;
	// the REST branch below replaces it with a response-aware variant.
	dropSession := func() { session.DeletingKeySuccessiveFailure(nil) }

	switch i.Protocol {
	case common.ProtocolRest:
		// Without a reply (or args) there is no response to read a cookie
		// from. Previously this path went on to call GetResponse on a nil
		// *rest.Response.
		if i.Reply == nil || i.Args == nil {
			return
		}
		reply, ok := i.Reply.(*rest.Response)
		if !ok || reply == nil {
			// Not a usable REST response; nothing to account for.
			return
		}
		cookie = session.GetSessionCookie(nil, reply.GetResponse())
		dropSession = func() { session.DeletingKeySuccessiveFailure(reply.GetResponse()) }
	default:
		cookie = session.GetSessionCookie(i.Ctx, nil)
	}

	if cookie == "" {
		return
	}

	loadbalancer.IncreaseSuccessiveFailureCount(cookie)
	errCount := loadbalancer.GetSuccessiveFailureCount(cookie)
	if errCount == config.StrategySuccessiveFailedTimes(i.SourceServiceID, i.MicroServiceName) {
		dropSession()
		loadbalancer.DeleteSuccessiveFailureCount(cookie)
	}
}
// newTransportHandler builds a fresh TransportHandler and returns it as a
// Handler.
func newTransportHandler() Handler {
	return new(TransportHandler)
}