-
Notifications
You must be signed in to change notification settings - Fork 1
/
server.go
167 lines (140 loc) · 3.94 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package grpc
import (
"context"
"fmt"
"github.com/byorty/enterprise-application/pkg/common/adapter/application"
"github.com/byorty/enterprise-application/pkg/common/adapter/log"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"go.uber.org/fx"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"net"
"net/http"
"reflect"
"sort"
)
// Middleware is one prioritized extension point for the server. A value may
// carry a gRPC unary interceptor, a gateway mux option, or both; NewFxServer
// skips whichever option is nil.
type Middleware struct {
// Priority controls ordering: ByPriority places larger values first.
Priority int
// GrpcOption is the unary interceptor added to the gRPC server's
// interceptor chain when non-nil.
GrpcOption grpc.UnaryServerInterceptor
// MuxOption is the option passed to runtime.NewServeMux when non-nil.
MuxOption runtime.ServeMuxOption
}
// ByPriority adapts a slice of Middleware to sort.Interface, ordering the
// elements from the highest Priority down to the lowest.
type ByPriority []Middleware

// Len reports how many middlewares the slice holds.
func (p ByPriority) Len() int {
	count := len(p)
	return count
}

// Swap exchanges the middlewares stored at positions i and j.
func (p ByPriority) Swap(i, j int) {
	atJ, atI := p[j], p[i]
	p[i], p[j] = atJ, atI
}

// Less reports whether the middleware at i must come before the one at j,
// which is the case when its Priority is strictly greater.
func (p ByPriority) Less(i, j int) bool {
	first, second := p[i].Priority, p[j].Priority
	return second < first
}
// MiddlewareOut is an fx result struct (it embeds fx.Out) that contributes
// one Middleware to each of the two value groups consumed by FxServerIn.
type MiddlewareOut struct {
fx.Out
// GrpcMiddleware is published to the "grpc_middleware" group.
GrpcMiddleware Middleware `group:"grpc_middleware"`
// MuxMiddleware is published to the "mux_middleware" group.
MuxMiddleware Middleware `group:"mux_middleware"`
}
// Server is the public surface of the combined gRPC server and HTTP gateway
// created by NewFxServer.
type Server interface {
// Register attaches the service described by descriptor to the server
// and returns any error reported while doing so.
Register(descriptor Descriptor) error
// Start runs the server. In the implementation in this file it blocks
// until the gRPC server or the HTTP gateway reports a result and returns
// that result.
Start() error
}
// Descriptor describes one service to be attached through Server.Register.
type Descriptor struct {
// Server is the service implementation; Register passes it as the second
// argument to GRPCRegistrar.
Server interface{}
// GRPCRegistrar is invoked through reflection with the *grpc.Server and
// Server as its two arguments, so it has to be a function accepting them.
GRPCRegistrar interface{}
// GRPCGatewayRegistrar is optional. When non-nil, Register calls it with
// the server context, the gateway mux, the "host:port" address of the
// gRPC server and the dial options.
GRPCGatewayRegistrar func(context.Context, *runtime.ServeMux, string, []grpc.DialOption) error
// MethodDescriptors is not read anywhere in this file; MethodDescriptor is
// declared elsewhere in the package.
MethodDescriptors []MethodDescriptor
}
// FxServerIn is the fx parameter struct (it embeds fx.In) listing everything
// NewFxServer needs.
type FxServerIn struct {
fx.In
// Ctx is stored on the server and later handed to gateway registrars.
Ctx context.Context
// Logger is the base logger; NewFxServer derives a child named "grpc".
Logger log.Logger
// ConfigProvider supplies the "server" configuration section.
ConfigProvider application.Provider
// GrpcMiddlewares collects the "grpc_middleware" group; only the
// GrpcOption of each entry is used.
GrpcMiddlewares []Middleware `group:"grpc_middleware"`
// MuxMiddlewares collects the "mux_middleware" group; only the MuxOption
// of each entry is used.
MuxMiddlewares []Middleware `group:"mux_middleware"`
}
// NewFxServer builds the gRPC server and the grpc-gateway mux from the
// "server" configuration section and the middlewares collected by fx.
//
// Each middleware group is ordered by descending Priority (see ByPriority).
// The sort is stable, so entries sharing a Priority keep the relative order
// in which they were supplied instead of an order chosen by the sort
// algorithm. Entries whose relevant option is nil are skipped. Note that the
// slices held by in are sorted in place.
//
// An error is returned when the "server" section cannot be populated; it
// wraps the provider's error so errors.Is / errors.As keep working.
func NewFxServer(
	in FxServerIn,
) (Server, error) {
	var cfg Config
	if err := in.ConfigProvider.PopulateByKey("server", &cfg); err != nil {
		return nil, fmt.Errorf("populate server config: %w", err)
	}

	sort.Stable(ByPriority(in.MuxMiddlewares))
	sort.Stable(ByPriority(in.GrpcMiddlewares))

	// Collect the non-nil unary interceptors in priority order.
	interceptors := make([]grpc.UnaryServerInterceptor, 0, len(in.GrpcMiddlewares))
	for _, middleware := range in.GrpcMiddlewares {
		if middleware.GrpcOption != nil {
			interceptors = append(interceptors, middleware.GrpcOption)
		}
	}

	// Collect the non-nil gateway mux options in priority order.
	serverMuxOptions := make([]runtime.ServeMuxOption, 0, len(in.MuxMiddlewares))
	for _, middleware := range in.MuxMiddlewares {
		if middleware.MuxOption != nil {
			serverMuxOptions = append(serverMuxOptions, middleware.MuxOption)
		}
	}

	srv := &server{
		ctx:    in.Ctx,
		cfg:    cfg,
		logger: in.Logger.Named("grpc"),
		grpcServer: grpc.NewServer(
			grpc.MaxRecvMsgSize(cfg.MaxReceiveMessageLength),
			grpc.MaxSendMsgSize(cfg.MaxSendMessageLength),
			grpc.ChainUnaryInterceptor(interceptors...),
		),
		mux: runtime.NewServeMux(
			serverMuxOptions...,
		),
		// The gateway dials the gRPC server without transport security and
		// with the same message size limits the server itself enforces.
		opts: []grpc.DialOption{
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(cfg.MaxReceiveMessageLength),
				grpc.MaxCallSendMsgSize(cfg.MaxSendMessageLength),
			),
		},
		errors: make(chan error, 1),
	}

	return srv, nil
}
// server is the Server implementation returned by NewFxServer. It owns a
// gRPC server and an HTTP gateway that forwards to it.
type server struct {
// ctx is passed to gateway registrars in Register.
ctx context.Context
// logger is the "grpc" child of the injected logger.
logger log.Logger
// cfg is the populated "server" configuration section.
cfg Config
// grpcServer serves the services attached through Register.
grpcServer *grpc.Server
// gatewayServer is the HTTP server for mux; it is created in Start.
gatewayServer *http.Server
// mux is the grpc-gateway multiplexer used as the HTTP handler.
mux *runtime.ServeMux
// opts are the dial options handed to gateway registrars.
opts []grpc.DialOption
// errors receives the results of the serving goroutines; NewFxServer
// creates it with a buffer of one.
errors chan error
}
// Register attaches the service described by descriptor to both servers.
//
// descriptor.GRPCRegistrar is invoked through reflection as
// GRPCRegistrar(grpcServer, descriptor.Server). When
// descriptor.GRPCGatewayRegistrar is set, it is then called with the server
// context, the gateway mux, the "host:port" address of the gRPC server and
// the dial options, and its error is returned.
//
// Rather than letting reflect panic, Register returns an error when
// GRPCRegistrar is not a non-nil function, when descriptor.Server is a nil
// interface, or when the two arguments do not fit the registrar's
// parameters.
func (s *server) Register(descriptor Descriptor) error {
	registrar := reflect.ValueOf(descriptor.GRPCRegistrar)
	// Kind is Invalid for a nil interface, so this also rejects a missing
	// registrar; IsNil is only reached for function values.
	if registrar.Kind() != reflect.Func || registrar.IsNil() {
		return fmt.Errorf("grpc registrar must be a non-nil function, got %T", descriptor.GRPCRegistrar)
	}

	service := reflect.ValueOf(descriptor.Server)
	if !service.IsValid() {
		return fmt.Errorf("grpc registrar %T: service implementation is nil", descriptor.GRPCRegistrar)
	}

	args := []reflect.Value{reflect.ValueOf(s.grpcServer), service}
	signature := registrar.Type()
	// Variadic registrars are left to reflect's own argument packing; for
	// the ordinary two-parameter form, check what Call would otherwise
	// panic on.
	if !signature.IsVariadic() {
		if signature.NumIn() != len(args) {
			return fmt.Errorf(
				"grpc registrar %T must accept %d arguments, accepts %d",
				descriptor.GRPCRegistrar, len(args), signature.NumIn(),
			)
		}
		for i, arg := range args {
			if want := signature.In(i); !arg.Type().AssignableTo(want) {
				return fmt.Errorf(
					"grpc registrar %T: argument %d of type %s is not assignable to %s",
					descriptor.GRPCRegistrar, i, arg.Type(), want,
				)
			}
		}
	}
	registrar.Call(args)

	if descriptor.GRPCGatewayRegistrar == nil {
		return nil
	}

	endpoint := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.GrpcPort)
	return descriptor.GRPCGatewayRegistrar(s.ctx, s.mux, endpoint, s.opts)
}
// Start runs the gRPC server and the HTTP gateway, each in its own
// goroutine, and blocks until one of them stops serving.
//
// The first result reported by either goroutine is returned. Before
// returning, the other server is shut down and its goroutine is awaited, so
// a failed Start does not leave a half-running server, a leaked listener or
// a stale value in the errors channel.
func (s *server) Start() error {
	grpcAddress := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.GrpcPort)
	gatewayAddress := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.HttpPort)

	// Build the gateway server before any goroutine starts so the field is
	// not written concurrently with the cleanup below.
	// NOTE(review): no read/write timeouts are configured on the gateway;
	// consider setting ReadHeaderTimeout if it is exposed to untrusted
	// clients.
	s.gatewayServer = &http.Server{
		Addr:    gatewayAddress,
		Handler: s.mux,
	}

	go func(logger log.Logger) {
		logger.Infof("start server at %s", grpcAddress)
		socket, err := net.Listen("tcp", grpcAddress)
		if err != nil {
			s.errors <- err
			return
		}
		s.errors <- s.grpcServer.Serve(socket)
	}(s.logger.Named("grpc_server"))

	go func(logger log.Logger) {
		logger.Infof("start gateway at %s", gatewayAddress)
		s.errors <- s.gatewayServer.ListenAndServe()
	}(s.logger.Named("http_server"))

	err := <-s.errors

	// One side is down: stop the other one as well. Both calls are safe
	// regardless of whether the corresponding server has started serving.
	s.grpcServer.Stop()
	// The close error is deliberately dropped: the result that ended Start
	// is the one worth reporting.
	_ = s.gatewayServer.Close()

	// Each goroutine sends exactly once; receive the second result so the
	// goroutine has finished and the channel is empty when Start returns.
	<-s.errors

	return err
}