forked from zeromicro/go-zero
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
133 lines (110 loc) · 3.47 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package zrpc
import (
"log"
"time"
"github.com/hduhelp/go-zero/core/load"
"github.com/hduhelp/go-zero/core/logx"
"github.com/hduhelp/go-zero/core/stat"
"github.com/hduhelp/go-zero/zrpc/internal"
"github.com/hduhelp/go-zero/zrpc/internal/auth"
"github.com/hduhelp/go-zero/zrpc/internal/serverinterceptors"
"google.golang.org/grpc"
)
// RpcServer pairs an internal.Server with the registration callback
// that is passed to that server when Start is called.
type RpcServer struct {
	// server is the underlying implementation every method delegates to.
	server internal.Server
	// register is handed to server.Start by RpcServer.Start.
	register internal.RegisterFn
}
// MustNewServer builds a RpcServer via NewServer and returns it.
// If NewServer reports an error, the error is passed to log.Fatal,
// which terminates the process.
func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
	rpcServer, err := NewServer(c, register)
	if err != nil {
		log.Fatal(err)
	}

	return rpcServer
}
// NewServer builds a RpcServer from the given configuration.
//
// Steps, in order:
//  1. c.Validate() must succeed.
//  2. A stat.Metrics is created for c.ListenOn and passed, together with
//     c.Health, as options to the underlying server.
//  3. The underlying server comes from internal.NewRpcPubServer when
//     c.HasEtcd() is true, otherwise from internal.NewRpcServer.
//  4. The server is named c.Name and setupInterceptors installs the
//     interceptors selected by the configuration.
//  5. c.SetUp() is invoked.
//
// The first error hit by any step is returned with a nil *RpcServer.
func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error) {
	if err := c.Validate(); err != nil {
		return nil, err
	}

	metrics := stat.NewMetrics(c.ListenOn)
	opts := []internal.ServerOption{
		internal.WithMetrics(metrics),
		internal.WithRpcHealth(c.Health),
	}

	var srv internal.Server
	if !c.HasEtcd() {
		srv = internal.NewRpcServer(c.ListenOn, opts...)
	} else {
		pubServer, err := internal.NewRpcPubServer(c.Etcd, c.ListenOn, opts...)
		if err != nil {
			return nil, err
		}
		srv = pubServer
	}

	srv.SetName(c.Name)
	if err := setupInterceptors(srv, c, metrics); err != nil {
		return nil, err
	}

	if err := c.SetUp(); err != nil {
		return nil, err
	}

	return &RpcServer{
		server:   srv,
		register: register,
	}, nil
}
// AddOptions forwards the given grpc.ServerOption values to the
// underlying server.
func (rs *RpcServer) AddOptions(options ...grpc.ServerOption) {
	rs.server.AddOptions(options...)
}
// AddStreamInterceptors forwards the given stream server interceptors
// to the underlying server.
func (rs *RpcServer) AddStreamInterceptors(interceptors ...grpc.StreamServerInterceptor) {
	rs.server.AddStreamInterceptors(interceptors...)
}
// AddUnaryInterceptors forwards the given unary server interceptors
// to the underlying server.
func (rs *RpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
	rs.server.AddUnaryInterceptors(interceptors...)
}
// Start starts the RpcServer by handing the stored register function to
// the underlying server's Start.
// Graceful shutdown is enabled by default.
// Use proc.SetTimeToForceQuit to customize the graceful shutdown period.
//
// If the underlying Start returns an error, it is logged through logx
// and then Start panics with that same error.
func (rs *RpcServer) Start() {
	err := rs.server.Start(rs.register)
	if err == nil {
		return
	}

	logx.Error(err)
	panic(err)
}
// Stop is the shutdown hook of the RpcServer.
// Its body only closes the logx logger; it makes no call on the
// underlying server.
func (rs *RpcServer) Stop() {
	logx.Close()
}
// DontLogContentForMethod disables content logging for the given method
// by delegating to serverinterceptors.DontLogContentForMethod.
func DontLogContentForMethod(method string) {
	serverinterceptors.DontLogContentForMethod(method)
}
// SetServerSlowThreshold sets the server-side slow threshold by
// delegating to serverinterceptors.SetSlowThreshold.
func SetServerSlowThreshold(threshold time.Duration) {
	serverinterceptors.SetSlowThreshold(threshold)
}
// setupInterceptors installs the interceptors selected by c on server.
//
//   - c.CpuThreshold > 0: a unary shedding interceptor backed by an
//     adaptive shedder configured with that threshold and the given metrics.
//   - c.Timeout > 0: a unary timeout interceptor; c.Timeout is converted
//     to a duration in milliseconds.
//   - c.Auth: stream and unary authorize interceptors sharing one
//     authenticator built from c.Redis and c.StrictControl.
//
// Interceptors are added in the order listed above. The only error
// returned is the one from auth.NewAuthenticator.
func setupInterceptors(server internal.Server, c RpcServerConf, metrics *stat.Metrics) error {
	if c.CpuThreshold > 0 {
		thresholdOpt := load.WithCpuThreshold(c.CpuThreshold)
		adaptive := load.NewAdaptiveShedder(thresholdOpt)
		server.AddUnaryInterceptors(serverinterceptors.UnarySheddingInterceptor(adaptive, metrics))
	}

	if c.Timeout > 0 {
		timeout := time.Duration(c.Timeout) * time.Millisecond
		server.AddUnaryInterceptors(serverinterceptors.UnaryTimeoutInterceptor(timeout))
	}

	// Nothing more to install unless authentication is switched on.
	if !c.Auth {
		return nil
	}

	authenticator, err := auth.NewAuthenticator(c.Redis.NewRedis(), c.Redis.Key, c.StrictControl)
	if err != nil {
		return err
	}

	server.AddStreamInterceptors(serverinterceptors.StreamAuthorizeInterceptor(authenticator))
	server.AddUnaryInterceptors(serverinterceptors.UnaryAuthorizeInterceptor(authenticator))

	return nil
}