-
Notifications
You must be signed in to change notification settings - Fork 9
/
cmux.go
115 lines (91 loc) · 2.23 KB
/
cmux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package cmux
import (
"context"
"net"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"github.com/air-go/rpc/library/logger"
"github.com/air-go/rpc/server"
serverGRPC "github.com/air-go/rpc/server/grpc"
)
type (
RegisterHTTP func(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error
)
type Option struct {
logger logger.Logger
httpHandler http.Handler
}
type OptionFunc func(*Option)
func WithHTTPHandler(h http.Handler) OptionFunc {
return func(s *Option) { s.httpHandler = h }
}
func defaultOption() *Option {
return &Option{httpHandler: http.NotFoundHandler()}
}
type MuxServer struct {
*Option
ctx context.Context
endpoint string
grpcRegisters []serverGRPC.RegisterGRPC
tcpMux cmux.CMux
}
var _ server.Server = (*MuxServer)(nil)
func NewMux(ctx context.Context, endpoint string, grpcRegisters []serverGRPC.RegisterGRPC, opts ...OptionFunc) *MuxServer {
if len(grpcRegisters) < 1 {
panic("len(registers) < 1")
}
option := defaultOption()
for _, o := range opts {
o(option)
}
s := &MuxServer{
Option: option,
ctx: ctx,
grpcRegisters: grpcRegisters,
endpoint: endpoint,
}
return s
}
func (s *MuxServer) Start() (err error) {
listener, err := net.Listen("tcp", s.endpoint)
if err != nil {
return
}
s.tcpMux = cmux.New(listener)
go s.startGRPC()
go s.startHTTP()
return s.tcpMux.Serve()
}
func (s *MuxServer) startGRPC() {
grpcServer := grpc.NewServer(serverGRPC.NewServerOption(serverGRPC.ServerOptionLogger(s.logger))...)
for _, r := range s.grpcRegisters {
r(grpcServer)
}
serverGRPC.RegisterTools(grpcServer)
listener := s.tcpMux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldPrefixSendSettings("content-type", "application/grpc"))
if err := grpcServer.Serve(listener); err != nil {
panic(err)
}
}
func (s *MuxServer) startHTTP() {
var err error
defer func() {
if err != nil {
panic(err)
}
}()
httpServer := &http.Server{
Addr: s.endpoint,
Handler: s.httpHandler,
}
listener := s.tcpMux.Match(cmux.HTTP1Fast())
if err = httpServer.Serve(listener); err != nil {
return
}
}
func (s *MuxServer) Close() (err error) {
s.tcpMux.Close()
return
}