/
server.go
211 lines (179 loc) · 6.46 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Copyright © 2016 Asteris, LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpc
import (
"context"
"net"
"net/http"
"net/url"
"golang.org/x/sync/errgroup"
"github.com/asteris-llc/converge/helpers/logging"
"github.com/asteris-llc/converge/rpc/pb"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/pkg/errors"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
)
// Server represents the configuration for a Converge RPC server. A single
// Server backs both the GRPC services and the REST gateway started by Listen.
type Server struct {
// Security supplies the GRPC server options, the REST gateway's client dial
// options, the optional JWT token and the optional secure listener wrapper.
// The methods of Server dereference it unconditionally, so it must be
// non-nil.
Security *Security
// Serving
//
// ResourceRoot is handed to the resource host service as its root.
// NOTE(review): presumably the location resources are served from; confirm
// against resourceHost.
ResourceRoot string
// EnableBinaryDownload is passed through unchanged to the resource host
// service. NOTE(review): presumably toggles a binary download endpoint;
// confirm against resourceHost.
EnableBinaryDownload bool
}
// newGRPC builds the GRPC server, configured with the options provided by
// s.Security.Server(), and registers the executor, grapher, resource host and
// info services on it.
//
// The error result is currently always nil; it is part of the signature so
// callers already handle a construction failure.
func (s *Server) newGRPC() (*grpc.Server, error) {
	// The resource host is the only service that carries configuration from
	// the Server; the others are registered with their zero values.
	host := &resourceHost{
		root:                 s.ResourceRoot,
		enableBinaryDownload: s.EnableBinaryDownload,
	}

	srv := grpc.NewServer(s.Security.Server()...)

	pb.RegisterExecutorServer(srv, &executor{})
	pb.RegisterGrapherServer(srv, &grapher{})
	pb.RegisterResourceHostServer(srv, host)
	pb.RegisterInfoServer(srv, &infoServer{})

	return srv, nil
}
// newREST builds the HTTP server that fronts the REST gateway.
//
// Every gateway handler is registered as a client of the GRPC endpoint at
// addr.Host, dialing with the options returned by s.Security.Client(). When
// s.Security.Token is non-empty the gateway is wrapped with
// NewJWTAuth(token).Protect. The returned server has only its Handler set;
// the caller chooses the listener it serves on.
//
// An error is returned if the client security options cannot be generated or
// if any handler fails to register.
func (s *Server) newREST(ctx context.Context, addr *url.URL) (*http.Server, error) {
	dialOpts, err := s.Security.Client()
	if err != nil {
		return nil, errors.Wrap(err, "could not generate REST gateway security options")
	}

	gateway := runtime.NewServeMux(
		runtime.WithMarshalerOption("text/plain", newContentMarshaler()),
	)
	endpoint := addr.Host

	// Handlers are registered in the order listed; the first failure aborts
	// construction with its own message.
	registrations := []struct {
		failure  string
		register func() error
	}{
		{
			failure: "could not register executor",
			register: func() error {
				return pb.RegisterExecutorHandlerFromEndpoint(ctx, gateway, endpoint, dialOpts)
			},
		},
		{
			failure: "could not register resource host",
			register: func() error {
				return pb.RegisterResourceHostHandlerFromEndpoint(ctx, gateway, endpoint, dialOpts)
			},
		},
		{
			failure: "could not register grapher",
			register: func() error {
				return pb.RegisterGrapherHandlerFromEndpoint(ctx, gateway, endpoint, dialOpts)
			},
		},
		{
			failure: "could not register info server",
			register: func() error {
				return pb.RegisterInfoHandlerFromEndpoint(ctx, gateway, endpoint, dialOpts)
			},
		},
	}
	for _, r := range registrations {
		if regErr := r.register(); regErr != nil {
			return nil, errors.Wrap(regErr, r.failure)
		}
	}

	var handler http.Handler = gateway
	if token := s.Security.Token; token != "" {
		handler = NewJWTAuth(token).Protect(handler)
	}

	return &http.Server{Handler: handler}, nil
}
// Listen binds addr.Host and serves both the GRPC API and the REST gateway on
// that single port.
//
// It blocks until every worker goroutine has returned, which happens once ctx
// is cancelled or one of the workers fails, and returns the first non-nil
// error produced by a worker. Errors that only report that a listener was
// closed are treated as a clean shutdown. Failures to open or secure the
// listener are returned immediately, before any goroutine is started.
func (s *Server) Listen(ctx context.Context, addr *url.URL) error {
	logger := logging.GetLogger(ctx).WithField("addr", addr)

	// set up listeners
	//
	// We'll start with a regular net.Listener. This is going to be our entry
	// point into the whole system. If we're using TLS/SSL, we'll wrap the
	// original listener in a tls listener implementing the same interface.
	// s.Security takes care of this.
	//
	// We need to care about it here because wrapping the listener here means
	// that we can terminate SSL at a single point.
	//
	// One caveat: the REST interface is actually an automatically-generated
	// client of the GRPC interface. This means that we have to require both
	// server and client configuration to use the server. On the other hand, it
	// means that *all* communication is secured when any of it is. Anything
	// that talks to either server component will be encrypted over the wire.
	lis, err := net.Listen("tcp", addr.Host)
	if err != nil {
		return errors.Wrap(err, "failed to listen")
	}

	// Wrap before any goroutine captures lis, so the closer below always sees
	// the final listener and a failed wrap can neither leak the socket nor
	// leave a goroutine waiting on a listener that no longer exists.
	if s.Security.UseSSL {
		logger.Debug("wrapping insecure listener in secure listener")

		secureLis, wrapErr := s.Security.WrapListener(lis)
		if wrapErr != nil {
			// Best-effort cleanup: nothing is serving yet, and the wrap
			// failure is the error worth reporting to the caller.
			_ = lis.Close()
			return errors.Wrap(wrapErr, "could not initialize secure listener")
		}
		lis = secureLis
	}

	// set up a context within the waitgroup. This is done only after the
	// early-return paths above so the derived context is always released by
	// wg.Wait below.
	wg, ctx := errgroup.WithContext(ctx)

	wg.Go(func() error {
		logger.Debug("waiting to close listener")
		<-ctx.Done()

		logger.Info("closing listener")
		return lis.Close()
	})

	mux := cmux.New(lis)

	// Register both matchers here, on the calling goroutine, before
	// mux.Serve runs. Registering them from the workers would race with
	// Serve reading the matcher list and would make matcher order depend on
	// goroutine scheduling.
	grpcLis := mux.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
	restLis := mux.Match(cmux.HTTP1())

	// start the GRPC listener and server
	//
	// Each of the tasks in the workers must handle if the errors they received
	// are any form of use-after-close error. This happens on shutdown for
	// cleanup purposes. In most of these cases, receiving an error means we're
	// already cleaned up so we just need to check which error it is.
	wg.Go(func() error {
		grpcSrv, err := s.newGRPC()
		if err != nil {
			return errors.Wrap(err, "failed to create grpc server")
		}

		logger.Info("serving GRPC")
		err = grpcSrv.Serve(grpcLis)
		logger.Debug("finished serving GRPC")

		if err != nil && err != cmux.ErrListenerClosed {
			return errors.Wrap(err, "failed to serve GRPC")
		}
		return nil
	})

	// start the REST gateway listener and server
	//
	// Same cancellation semantics as GRPC listeners.
	wg.Go(func() error {
		restSrv, err := s.newREST(ctx, addr)
		if err != nil {
			return errors.Wrap(err, "failed to create REST server")
		}

		logger.Info("serving REST")
		err = restSrv.Serve(restLis)
		logger.Debug("finished serving REST")

		if err != nil && err != cmux.ErrListenerClosed {
			return errors.Wrap(err, "failed to serve REST")
		}
		return nil
	})

	// start our cmux listener
	//
	// This is the "master start" switch. If our mux isn't serving, no traffic
	// will flow to either GRPC or the REST gateway.
	wg.Go(func() error {
		logger.Debug("multiplexing")

		err := mux.Serve()
		if err != nil && !IsClosedNetworkConnErr(err) {
			return errors.Wrap(err, "failed to multiplex")
		}
		return nil
	})

	// wait for all listeners to return. Reminder: the semantics of errgroup
	// mean that the *first* error that returns will be returned here. As of
	// the time of this comment, the server will immediately log a fatal line
	// and exit if it receives an error here, so all the error handling possible
	// should be done in this method.
	return wg.Wait()
}
// IsClosedNetworkConnErr reports whether err is a *net.OpError whose inner
// error is the "use of closed network connection" error.
//
// It returns false for a nil error, for any error that is not a
// *net.OpError, and for a *net.OpError that is nil or carries no inner error.
// Only the outermost error is inspected; wrapped errors are not unwrapped.
func IsClosedNetworkConnErr(err error) bool {
	opErr, ok := err.(*net.OpError)
	// Guard the dereferences: a typed-nil *net.OpError or an OpError with a
	// nil Err would otherwise panic instead of simply not matching.
	if !ok || opErr == nil || opErr.Err == nil {
		return false
	}

	// TODO: this feels brittle, but it seems to be the only way since net
	// doesn't export a similar method.
	return opErr.Err.Error() == "use of closed network connection"
}