-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
175 lines (147 loc) · 4.09 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package bhgrpcutils
import (
"fmt"
"github.com/buhuoxinxi/bh-go-grpc-utils/etcd_balancer"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"net"
"os"
"os/signal"
"runtime/debug"
"syscall"
)
func init() {
// init config
DefaultConfigFn()
}
// config
var (
config *Config // config
)
// NewServer grpc server
func NewServer() *grpc.Server {
// options
var opts []grpc.ServerOption
// ssl
if config.SSLEnable {
cred, err := credentials.NewServerTLSFromFile(config.SSLCertFile, config.SSLKeyFile)
if err != nil {
logrus.Panicf("credentials.NewServerTLSFromFile error : %v", err)
}
opts = append(opts, grpc.Creds(cred))
}
// unary interceptor
opts = append(opts, DefaultUnaryInterceptorFn())
// stream interceptor
opts = append(opts, DefaultStreamInterceptorFn())
return grpc.NewServer(opts...)
}
// RunServer start
func RunServer(server *grpc.Server) {
// tcp
lis, err := net.Listen("tcp", ":"+config.ServerPort)
if err != nil {
logrus.Panicf("RunServer net.Listen error : %v", err)
}
// server address
serverAddr := net.JoinHostPort(config.ServerHost, config.ServerPort)
logrus.Printf("server addr : %s", serverAddr)
// register server to etcd
if err := balancer.RegisterServer(serverAddr); err != nil {
logrus.Panicf("balancer.RegisterServer error : %v", err)
}
// remove server from etcd
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
s := <-ch
balancer.UnRegisterServer(serverAddr)
if i, ok := s.(syscall.Signal); ok {
os.Exit(int(i))
} else {
os.Exit(0)
}
}()
// start
if err := server.Serve(lis); err != nil {
logrus.Panicf("RunServer server.Serve error : %v", err)
}
}
// DefaultGRPCAuthorizationFn grpc auth
var DefaultGRPCAuthorizationFn = func(ctx context.Context) error {
// metadata
if md, ok := metadata.FromIncomingContext(ctx); ok {
// oauth2 authorization
if data, ok := md["authorization"]; ok {
_ = data
}
}
// status.Error(codes.Unauthenticated, "auth fail")
return nil
}
// DefaultUnaryInterceptorFn unary interceptor
var DefaultUnaryInterceptorFn = func() grpc.ServerOption {
interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// request log
go DefaultUnaryRequestLog(ctx, info.FullMethod, req)
// auth
if err := DefaultGRPCAuthorizationFn(ctx); err != nil {
return nil, err
}
// recover
defer func() {
if e := recover(); e != nil {
debug.PrintStack()
err = status.Errorf(codes.Internal, "Panic err: %v", e)
}
}()
// next
return handler(ctx, req)
}
return grpc.UnaryInterceptor(interceptor)
}
// DefaultUnaryRequestLog unary request
var DefaultUnaryRequestLog = func(ctx context.Context, method string, reqParam interface{}) {
// metadata
if md, ok := metadata.FromIncomingContext(ctx); ok {
// authorization
if data, ok := md["authorization"]; ok {
_ = data
}
}
}
// DefaultStreamInterceptorFn stream interceptor
var DefaultStreamInterceptorFn = func() grpc.ServerOption {
var interceptor = func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
// request log
go DefaultStreamRequestLog(srv, ss, info)
// auth
if err := DefaultGRPCAuthorizationFn(ss.Context()); err != nil {
return err
}
// recover
defer func() {
if e := recover(); e != nil {
debug.PrintStack()
err = status.Errorf(codes.Internal, "Panic err: %v", e)
}
}()
// next
return handler(srv, ss)
}
return grpc.StreamInterceptor(interceptor)
}
// DefaultStreamRequestLog unary request
var DefaultStreamRequestLog = func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo) {
// unary request log
DefaultUnaryRequestLog(
ss.Context(),
info.FullMethod,
fmt.Sprintf("stream interceptor; isClient(%v), isServer(%v)", info.IsClientStream, info.IsServerStream),
)
}