-
Notifications
You must be signed in to change notification settings - Fork 3
/
micro.go
309 lines (252 loc) · 8.67 KB
/
micro.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
package micro
import (
"context"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
)
// Service bundles a gRPC server with the HTTP gateway that fronts it, plus
// all the settings needed to start and stop both.
type Service struct {
	// Exported server handles.
	GRPCServer *grpc.Server
	HTTPServer *http.Server

	// HTTP gateway settings.
	httpHandler  HTTPHandlerFunc          // wraps the gateway mux into the final http.Handler
	errorHandler runtime.ErrorHandlerFunc // installed on the mux via runtime.WithErrorHandler
	annotators   []AnnotatorFunc          // installed on the mux via runtime.WithMetadata
	redoc        *RedocOpts               // API docs endpoint, only served when redoc.Up is true
	staticDir    string                   // base directory for ServeFile; empty means the working directory
	muxOptions   []runtime.ServeMuxOption
	mux          *runtime.ServeMux
	routes       []Route // extra HTTP routes applied to the mux when the gateway starts

	// gRPC interceptor chains.
	streamInterceptors []grpc.StreamServerInterceptor
	unaryInterceptors  []grpc.UnaryServerInterceptor

	// Shutdown behaviour.
	shutdownFunc                      func() // registered on the HTTP server via RegisterOnShutdown
	shutdownTimeout, preShutdownDelay time.Duration
	interruptSignals                  []os.Signal // signals that make Start call Stop

	grpcServerOptions []grpc.ServerOption
	grpcDialOptions   []grpc.DialOption // handed to the reverse-proxy callback

	logger Logger
}
// Default timings applied by defaultService.
const (
	// defaultShutdownTimeout bounds how long the HTTP server may take to
	// shut down gracefully.
	defaultShutdownTimeout = 30 * time.Second
	// defaultPreShutdownDelay is how long Stop waits for running goroutines
	// to finish their jobs before the shutdown starts.
	defaultPreShutdownDelay = time.Second
)
// ReverseProxyFunc is the callback the caller implements to set up the
// reverse proxy from HTTP/1 requests to gRPC. It receives the gateway mux,
// the gRPC server address and the dial options, and reports any setup
// failure as an error.
type ReverseProxyFunc func(
	ctx context.Context,
	mux *runtime.ServeMux,
	grpcHostAndPort string,
	opts []grpc.DialOption,
) error
// HTTPHandlerFunc is an HTTP middleware constructor: given the gateway mux,
// it returns the handler that the HTTP server will actually serve.
type HTTPHandlerFunc func(mux *runtime.ServeMux) http.Handler
// AnnotatorFunc extracts metadata from an incoming HTTP request so that it
// can be injected into the gRPC context.
type AnnotatorFunc func(ctx context.Context, req *http.Request) metadata.MD
// DefaultHTTPHandler is the default HTTP handler: it adds no middleware and
// simply forwards every request to the gateway mux.
func DefaultHTTPHandler(mux *runtime.ServeMux) http.Handler {
	return http.HandlerFunc(mux.ServeHTTP)
}
// defaultService builds a Service pre-populated with the package defaults:
// pass-through HTTP handler, default gateway error handler, no-op shutdown
// hook, default timings, redoc disabled, the package-level interrupt signals,
// the validator/prometheus/recovery interceptors, and a GET /metrics route.
func defaultService() *Service {
	// /metrics HTTP/1 endpoint backed by the Prometheus handler
	metricsRoute := Route{
		Method: "GET",
		Path:   "/metrics",
		Handler: func(w http.ResponseWriter, r *http.Request, _ map[string]string) {
			promhttp.Handler().ServeHTTP(w, r)
		},
	}

	return &Service{
		httpHandler:      DefaultHTTPHandler,
		errorHandler:     runtime.DefaultHTTPErrorHandler,
		shutdownFunc:     func() {},
		shutdownTimeout:  defaultShutdownTimeout,
		preShutdownDelay: defaultPreShutdownDelay,
		logger:           dummyLogger,
		redoc:            &RedocOpts{Up: false},
		// default interrupt signals to catch; the InterruptSignal option can append more
		interruptSignals: InterruptSignals,
		// interceptor order: validator, then prometheus, then the panic
		// handler that turns panics into gRPC errors
		streamInterceptors: []grpc.StreamServerInterceptor{
			grpc_validator.StreamServerInterceptor(),
			grpc_prometheus.StreamServerInterceptor,
			grpc_recovery.StreamServerInterceptor(),
		},
		unaryInterceptors: []grpc.UnaryServerInterceptor{
			grpc_validator.UnaryServerInterceptor(),
			grpc_prometheus.UnaryServerInterceptor,
			grpc_recovery.UnaryServerInterceptor(),
		},
		routes: []Route{metricsRoute},
	}
}
// NewService builds a microservice from the package defaults and the given
// options, then creates the gateway mux, the gRPC server and (unless an
// option already supplied one) the HTTP server.
func NewService(opts ...Option) *Service {
	svc := defaultService()
	svc.apply(opts...)

	// fall back to an insecure connection when no dial option was supplied
	if len(svc.grpcDialOptions) == 0 {
		svc.grpcDialOptions = append(svc.grpcDialOptions, grpc.WithInsecure())
	}

	// gateway mux: error handler first, then one metadata option per annotator
	svc.muxOptions = append(svc.muxOptions, runtime.WithErrorHandler(svc.errorHandler))
	for i := range svc.annotators {
		svc.muxOptions = append(svc.muxOptions, runtime.WithMetadata(svc.annotators[i]))
	}
	svc.mux = runtime.NewServeMux(svc.muxOptions...)

	// gRPC server with the stream and unary interceptor chains
	svc.grpcServerOptions = append(
		svc.grpcServerOptions,
		grpc_middleware.WithStreamServerChain(svc.streamInterceptors...),
		grpc_middleware.WithUnaryServerChain(svc.unaryInterceptors...),
	)
	svc.GRPCServer = grpc.NewServer(svc.grpcServerOptions...)

	if svc.HTTPServer == nil {
		svc.HTTPServer = new(http.Server)
	}

	return svc
}
// Getpid returns the process id of the running server process.
func (s *Service) Getpid() int {
	pid := os.Getpid()
	return pid
}
// Start launches the gRPC server on grpcPort and the HTTP gateway on
// httpPort, each in its own goroutine, and then blocks until one of them
// returns or one of the configured interrupt signals arrives.
//
// reverseProxyFunc is called while the gateway starts so that the caller can
// register its reverse-proxy handlers on the gateway mux.
//
// It returns whatever the first server to stop returned, or nil once an
// interrupt signal has been received and Stop has finished.
func (s *Service) Start(httpPort uint, grpcPort uint, reverseProxyFunc ReverseProxyFunc) error {
	// intercept interrupt signals
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, s.interruptSignals...)
	// Undo the registration when Start returns. Without this the process
	// keeps diverting these signals into a channel nobody reads any more, so
	// their default action (e.g. terminating on SIGINT) stays suppressed.
	defer signal.Stop(sigChan)

	// channels to receive the servers' results; buffered so that the
	// goroutines can always deliver and exit even after Start has returned
	errChan1 := make(chan error, 1)
	errChan2 := make(chan error, 1)

	// start gRPC server
	go func() {
		s.logger.Printf("Starting gPRC server listening on %d", grpcPort)
		errChan1 <- s.startGRPCServer(grpcPort)
	}()

	// start HTTP/1 gateway server
	go func() {
		s.logger.Printf("Starting http server listening on %d", httpPort)
		errChan2 <- s.startGRPCGateway(httpPort, grpcPort, reverseProxyFunc)
	}()

	// wait for a server to stop or for a shutdown signal
	select {
	// the gRPC server failed to start or stopped
	case err := <-errChan1:
		return err

	// the HTTP server failed to start or stopped
	case err := <-errChan2:
		return err

	// an interrupt signal was received
	case sig := <-sigChan:
		s.logger.Printf("Interrupt signal received: %v", sig)
		s.Stop()
		return nil
	}
}
// startGRPCServer registers the reflection service on the gRPC server, opens
// a TCP listener on grpcPort (all interfaces) and serves on it. It returns
// the listen error, or otherwise whatever Serve returns.
func (s *Service) startGRPCServer(grpcPort uint) error {
	// register reflection service on gRPC server
	reflection.Register(s.GRPCServer)

	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort))
	if err != nil {
		return err
	}

	return s.GRPCServer.Serve(listener)
}
// startGRPCGateway prepares the HTTP gateway and serves it on httpPort.
//
// When redoc is enabled it first adds the docs route plus a file route for
// every local (leading "/") spec URL that has no route yet. It then lets
// reverseProxyFunc register the reverse-proxy handlers against the gRPC
// server at localhost:grpcPort, applies all collected routes to the mux, and
// finally configures and runs the HTTP server.
//
// It returns the reverse-proxy setup error, a route registration error, or
// whatever ListenAndServe returns.
func (s *Service) startGRPCGateway(httpPort uint, grpcPort uint, reverseProxyFunc ReverseProxyFunc) error {
	if s.redoc.Up {
		s.redoc.EnsureDefaults()

		// add redoc endpoint for api docs
		routeDocs := Route{
			Method: "GET",
			Path:   s.redoc.Route,
			Handler: func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
				s.redoc.Serve(w, r, pathParams)
			},
		}
		s.AddRoutes(routeDocs)

		// host local spec files if not set yet
		for _, specURL := range s.redoc.SpecURLs {
			if !strings.HasPrefix(specURL, "/") {
				continue
			}
			fileRoute := Route{
				Method:  "GET",
				Path:    specURL,
				Handler: s.ServeFile,
			}
			if !s.HasRoute(fileRoute) {
				s.AddRoutes(fileRoute)
			}
		}
	}

	err := reverseProxyFunc(context.Background(), s.mux, fmt.Sprintf("localhost:%d", grpcPort), s.grpcDialOptions)
	if err != nil {
		return err
	}

	// apply routes; HandlePath reports invalid patterns, and dropping that
	// error would leave the route silently unregistered
	for _, route := range s.routes {
		if err := s.mux.HandlePath(route.Method, route.Path, route.Handler); err != nil {
			return fmt.Errorf("registering route %s %q: %w", route.Method, route.Path, err)
		}
	}

	s.HTTPServer.Addr = fmt.Sprintf(":%d", httpPort)
	s.HTTPServer.Handler = s.httpHandler(s.mux)
	s.HTTPServer.RegisterOnShutdown(s.shutdownFunc)

	return s.HTTPServer.ListenAndServe()
}
// Stop shuts the microservice down gracefully.
//
// It disables HTTP keep-alives, waits preShutdownDelay (if positive) so that
// running goroutines can finish their jobs, gracefully stops the gRPC server,
// and then shuts the HTTP server down with shutdownTimeout as the deadline.
// A failed or timed-out HTTP shutdown is reported through the logger.
func (s *Service) Stop() {
	// disable keep-alives on existing connections
	s.HTTPServer.SetKeepAlivesEnabled(false)

	// wait for a duration of preShutdownDelay for running goroutines to finish their jobs
	if s.preShutdownDelay > 0 {
		s.logger.Printf("Waiting for %v before shutdown starts", s.preShutdownDelay)
		time.Sleep(s.preShutdownDelay)
	}

	// gracefully stop gRPC server first
	s.GRPCServer.GracefulStop()

	ctx, cancel := context.WithTimeout(context.Background(), s.shutdownTimeout)
	defer cancel()

	// gracefully stop http server; surface the error instead of dropping it
	// so that an incomplete shutdown is visible
	if err := s.HTTPServer.Shutdown(ctx); err != nil {
		s.logger.Printf("HTTP server shutdown did not complete cleanly: %v", err)
	}
}
// AddRoutes appends the given routes, in order, to the service's route list.
func (s *Service) AddRoutes(routes ...Route) {
	for _, route := range routes {
		s.routes = append(s.routes, route)
	}
}
// HasRoute reports whether a route with the same method and path as the
// given one has already been added. The handler is not compared.
func (s *Service) HasRoute(route Route) bool {
	for i := range s.routes {
		existing := &s.routes[i]
		if existing.Method != route.Method || existing.Path != route.Path {
			continue
		}
		return true
	}
	return false
}
// ServeFile serves the regular file that the request path names inside the
// service's static directory (or the current working directory when no
// static directory is configured).
//
// It answers 404 when the path cannot be stat'ed for any reason or names a
// directory, so directory listings are never shown, and 500 when the working
// directory is needed but cannot be determined. pathParams is unused; it is
// present so that the method matches the route handler signature.
func (s *Service) ServeFile(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
	dir := s.staticDir
	if dir == "" {
		wd, err := os.Getwd()
		if err != nil {
			// Without a base directory the request path would be resolved
			// against the filesystem root, so refuse instead.
			http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
			return
		}
		dir = wd
	}

	// Root and clean the request path first so that ".." segments cannot
	// make the joined path leave dir.
	path := filepath.Join(dir, filepath.Clean("/"+r.URL.Path))

	// Treat every stat failure as "not found": on errors other than
	// not-exist (e.g. a path component that is a regular file) fileInfo is
	// nil and must not be dereferenced. Directories are forbidden as well.
	fileInfo, err := os.Stat(path)
	if err != nil || fileInfo.IsDir() {
		http.NotFound(w, r)
		return
	}

	http.ServeFile(w, r, path)
}