-
Notifications
You must be signed in to change notification settings - Fork 0
/
grpc.go
113 lines (101 loc) · 3.24 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package grpc
import (
"context"
"net"
"net/http"
"time"
grpcutils "github.com/ecumenos-social/grpc-utils"
pbv1 "github.com/ecumenos-social/schemas/proto/gen/networkwarden/v1"
"github.com/ecumenos-social/toolkitfx"
"github.com/ecumenos-social/toolkitfx/fxgrpc"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/heptiolabs/healthcheck"
"go.uber.org/fx"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
func NewGRPCServer(lc fx.Lifecycle, config *fxgrpc.Config, sn toolkitfx.ServiceName) *fxgrpc.GRPCServer {
handler := NewHandler()
grpcServer := fxgrpc.GRPCServer{}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
server := grpcutils.NewServer(string(sn), net.JoinHostPort(config.GRPC.Host, config.GRPC.Port))
server.Init(
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: config.GRPC.KeepAliveEnforcementMinTime,
PermitWithoutStream: config.GRPC.KeepAliveEnforcementPermitWithoutStream,
}),
grpcutils.ValidatorServerOption(),
grpcutils.RecoveryServerOption(),
grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: config.GRPC.MaxConnectionAge}),
)
pbv1.RegisterAdminServiceServer(server.Server, handler)
grpcServer.Server = server
return nil
},
OnStop: func(ctx context.Context) error {
return nil
},
})
return &grpcServer
}
func NewGatewayHandler() *fxgrpc.HTTPGatewayHandler {
return &fxgrpc.HTTPGatewayHandler{
// TODO: uncomment when endpoints are added
// Handler: pbv1.RegisterAdminServiceHandler,
}
}
func NewLivenessGateway() *fxgrpc.LivenessGatewayHandler {
health := healthcheck.NewHandler()
health.AddLivenessCheck("healthcheck", func() error { return nil })
return &fxgrpc.LivenessGatewayHandler{Handler: health}
}
func NewHTTPGateway(
lc fx.Lifecycle,
s fx.Shutdowner,
logger *zap.Logger,
cfg *fxgrpc.Config,
g *fxgrpc.HTTPGatewayHandler,
) error {
httpAddr := net.JoinHostPort(cfg.HTTPGateway.Host, cfg.HTTPGateway.Port)
mux := runtime.NewServeMux()
conn := grpcutils.NewClientConnection(net.JoinHostPort(cfg.GRPC.Host, cfg.GRPC.Port))
zapConf := zap.NewProductionConfig()
zapConf.Level.SetLevel(zap.ErrorLevel)
errLogger, err := zapConf.Build()
if err != nil {
errLogger = logger.With()
}
_ = conn.Dial(grpcutils.DefaultDialOpts(errLogger)...)
if err := g.Handler(context.Background(), mux, conn.Connection); err != nil {
logger.Error("failed to register mapping service handler", zap.Error(err))
}
var httpServer *http.Server
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
go func() {
httpServer = &http.Server{Addr: httpAddr, Handler: mux}
logger.Info("starting HTTP gateway...", zap.String("addr", httpAddr))
err = httpServer.ListenAndServe()
if err != nil {
logger.Error("failed to start http server", zap.Error(err))
_ = s.Shutdown()
}
}()
return nil
},
OnStop: func(ctx context.Context) error {
_ = conn.CleanUp()
if httpServer != nil {
timeout, can := context.WithTimeout(context.Background(), 10*time.Second)
defer can()
if err := httpServer.Shutdown(timeout); err != nil {
logger.Error("stopped http server after gRPC failure", zap.Error(err))
}
}
return nil
},
})
return nil
}