-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
307 lines (278 loc) · 10.4 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
package grpc
import (
"context"
"fmt"
"math"
"net"
"os"
"runtime"
"sync"
"time"
"github.com/aluka-7/metacode"
"github.com/aluka-7/trace"
"github.com/aluka-7/utils"
"github.com/go-playground/validator/v10"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip" // NOTE: use grpc gzip by header grpc-accept-encoding
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)
var _abortIndex int8 = math.MaxInt8 / 2
// ServerConfig 服务器配置信息
type ServerConfig struct {
Network string `json:"network"` // 网络为rpc监听网络,默认值为 tcp
Addr string `json:"address"` // 地址是rpc监听地址,默认值为 0.0.0.0:9000
Timeout utils.Duration `json:"timeout"` // 超时是每个rpc调用的上下文超时。
IdleTimeout utils.Duration `json:"idleTimeout"` // IdleTimeout 是一段持续时间,在这段时间内可以通过发送 GoAway 关闭空闲连接。 空闲持续时间是自最近一次未完成RPC的数量变为零或建立连接以来定义的。
MaxLifeTime utils.Duration `json:"maxLife"` // MaxLifeTime 是连接通过发送GoAway关闭之前可能存在的最长时间的持续时间。 将向+/- 10%的随机抖动添加到MaxConnectionAge中以分散连接风暴.
ForceCloseWait utils.Duration `json:"closeWait"` // ForceCloseWait 是 MaxLifeTime 之后的附加时间,在此之后将强制关闭连接。
KeepAliveInterval utils.Duration `json:"keepaliveInterval"` // 如果服务器没有看到任何活动,则 KeepAliveInterval 将在此时间段之后,对客户端进行ping操作以查看传输是否仍然有效。
KeepAliveTimeout utils.Duration `json:"keepaliveTimeout"` // 进行 keepalive 检查 ping 之后,服务器将等待一段时间的超时,并且即使在关闭连接后也看不到活动。
EnableLog bool `json:"enableLog"` // 是否打开日志
}
// Server 是框架的服务器端实例,它包含RpcServer,拦截器和拦截器。
// 通过使用NewServer()创建Server的实例。
type Server struct {
conf *ServerConfig
mutex sync.RWMutex
server *grpc.Server
handlers []grpc.UnaryServerInterceptor
}
// handle为OpenTracing\Logging\LinkTimeout返回一个新的一元服务器拦截器。
func (s *Server) handle() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// 对于性能进行监测等等
s.mutex.RLock()
conf := s.conf
s.mutex.RUnlock()
// 从rpc上下文获取派生超时,与配置的看守进行比较,并使用最小值
timeout := time.Duration(conf.Timeout)
if dl, ok := ctx.Deadline(); ok {
_timeout := time.Until(dl)
if _timeout-time.Millisecond*20 > 0 {
_timeout = _timeout - time.Millisecond*20
}
if timeout > _timeout {
timeout = _timeout
}
}
var cancel func()
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
// 获取rpc元数据(trace&remote_ip&color)
var t trace.Trace
cmd := metacode.Metadata{}
if gmd, ok := metadata.FromIncomingContext(ctx); ok {
t, _ = trace.Extract(trace.GRPCFormat, gmd)
for k, v := range gmd {
if metacode.IsIncomingKey(k) {
cmd[k] = v[0]
}
}
}
if t == nil {
t = trace.New(args.FullMethod)
} else {
t.SetTitle(args.FullMethod)
}
var addr string
if pr, ok := peer.FromContext(ctx); ok {
addr = pr.Addr.String()
t.SetTag(trace.String(trace.TagAddress, addr))
}
defer t.Finish(&err)
// 使用公共元数据上下文而不是rpc上下文
ctx = metacode.NewContext(ctx, cmd)
ctx = trace.NewContext(ctx, t)
resp, err = handler(ctx, req)
return resp, FromError(err).Err()
}
}
// NewServer 带有默认服务器拦截器的新的空白Server实例。
func NewServer(conf *ServerConfig, opt ...grpc.ServerOption) (s *Server) {
s = new(Server)
if err := s.SetConfig(conf); err != nil {
panic(errors.Errorf("rpc set config failed!err: %s", err.Error()))
}
keepParam := grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: time.Duration(s.conf.IdleTimeout),
MaxConnectionAgeGrace: time.Duration(s.conf.ForceCloseWait),
Time: time.Duration(s.conf.KeepAliveInterval),
Timeout: time.Duration(s.conf.KeepAliveTimeout),
MaxConnectionAge: time.Duration(s.conf.MaxLifeTime),
})
opt = append(opt, keepParam, grpc.UnaryInterceptor(s.interceptor))
s.server = grpc.NewServer(opt...)
s.Use(s.recovery(), s.handle(), s.serverLogging(), s.validate())
return
}
// SetConfig 热重载服务器配置
func (s *Server) SetConfig(conf *ServerConfig) (err error) {
if conf.Addr == "" {
log.Warn().Msg("ServerConfig Addr is not empty")
}
if conf.Network == "" {
log.Warn().Msg("ServerConfig Network is not empty")
}
s.mutex.Lock()
s.conf = conf
s.mutex.Unlock()
return nil
}
// 拦截器是许多拦截器链中的单个拦截器。
// 执行以从左到右的顺序进行,包括传递上下文.
// 例如:ChainUnaryServer(1、2、3)将在3之前的2之前执行1,而3将看到1和2的上下文更改.
func (s *Server) interceptor(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
var i int
var chain grpc.UnaryHandler
n := len(s.handlers)
if n == 0 {
return handler(ctx, req)
}
chain = func(ic context.Context, ir interface{}) (interface{}, error) {
if i == n-1 {
return handler(ic, ir)
}
i++
return s.handlers[i](ic, ir, args, chain)
}
return s.handlers[0](ctx, req, args, chain)
}
// Server 返回用于注册服务的rpc服务器.
func (s *Server) Server() *grpc.Server {
return s.server
}
// Use 将全局拦截器附加到服务器.
// 例如:这是速率限制器或错误管理拦截器的正确位置.
func (s *Server) Use(handlers ...grpc.UnaryServerInterceptor) *Server {
finalSize := len(s.handlers) + len(handlers)
if finalSize >= int(_abortIndex) {
panic("rpc: server use too many handlers")
}
mergedHandlers := make([]grpc.UnaryServerInterceptor, finalSize)
copy(mergedHandlers, s.handlers)
copy(mergedHandlers[len(s.handlers):], handlers)
s.handlers = mergedHandlers
return s
}
// Run 运行create tcp侦听器,并启动goroutine为每个传入请求提供服务。
// 除非调用Stop或GracefulStop,否则Run将返回非nil错误。
func (s *Server) Run(addr string) error {
fmt.Printf("启动rpc监听地址: %s\n", addr)
if lis, err := net.Listen("tcp", addr); err != nil {
err = errors.WithStack(err)
return err
} else {
reflection.Register(s.server)
return s.Serve(lis)
}
}
// RunUnix 创建一个unix侦听器并启动goroutine来处理每个传入的请求.
// 除非调用Stop或GracefulStop,否则RunUnix将返回非nil错误.
func (s *Server) RunUnix(file string) error {
fmt.Printf("启动rpc监听unix文件: %s\n", file)
if lis, err := net.Listen("unix", file); err != nil {
err = errors.WithStack(err)
return err
} else {
reflection.Register(s.server)
return s.Serve(lis)
}
}
// Start 开始使用配置的listen addr创建一个新的goroutine运行服务器,如果发生任何错误,它将惊慌返回服务器本身.
func (s *Server) Start() (*Server, error) {
if _, err := s.startWithAddr(); err != nil {
return nil, err
} else {
return s, nil
}
}
// StartWithAddr 使用配置的监听地址创建一个新的goroutine运行服务器,如果发生任何错误,它将崩溃
// 返回服务器本身和实际的监听地址(如果配置的监听端口为零,则操作系统将分配一个未使用的端口)
func (s *Server) StartWithAddr() (*Server, net.Addr, error) {
if addr, err := s.startWithAddr(); err != nil {
return nil, nil, err
} else {
return s, addr, nil
}
}
func (s *Server) startWithAddr() (net.Addr, error) {
if lis, err := net.Listen(s.conf.Network, s.conf.Addr); err != nil {
return nil, err
} else {
fmt.Printf("rpc: start grpc listen addr: %v\n", lis.Addr())
reflection.Register(s.server)
go func() {
if err := s.Serve(lis); err != nil {
panic(err)
}
}()
return lis.Addr(), nil
}
}
// Serve在侦听器lis上接受传入连接,从而为每个连接创建一个新的ServerTransport和服务goroutine。
// 除非调用Stop或GracefulStop,否则Serve将返回非nil错误.
func (s *Server) Serve(lis net.Listener) error {
return s.server.Serve(lis)
}
// Shutdown可以正常停止服务器。
// 它停止服务器接受新的连接和RPC,并阻止直到所有未完成的RPC完成或到达上下文截止日期为止.
func (s *Server) Shutdown(ctx context.Context) (err error) {
ch := make(chan struct{})
go func() {
s.server.GracefulStop()
close(ch)
}()
select {
case <-ctx.Done():
s.server.Stop()
err = ctx.Err()
case <-ch:
}
return
}
var validate = validator.New()
// 验证返回一个客户端拦截器,以验证每个RPC调用的传入请求.
func (s *Server) validate() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if err = validate.Struct(req); err != nil {
err = metacode.Error(metacode.ValidateErr, err.Error())
return
}
resp, err = handler(ctx, req)
return
}
}
// RegisterValidation 将验证功能添加到由键表示的验证者的验证者映射中
// 注意:如果密钥已经存在,则先前的验证功能将被替换。
// 注意:此方法不是线程安全的,因此应在进行任何验证之前先将它们全部注册
func (s *Server) RegisterValidation(key string, fn validator.Func) error {
return validate.RegisterValidation(key, fn)
}
// recovery是从任何紧急情况中恢复的服务器拦截器。
func (s *Server) recovery() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
defer func() {
if _err := recover(); _err != nil {
const size = 64 << 10
buf := make([]byte, size)
rs := runtime.Stack(buf, false)
if rs > size {
rs = size
}
buf = buf[:rs]
pl := fmt.Sprintf("grpc server panic: %v\n%v\n%s\n", req, _err, buf)
fmt.Fprintf(os.Stderr, pl)
err = status.Errorf(codes.Unknown, metacode.ServerErr.Error())
}
}()
resp, err = handler(ctx, req)
return
}
}