forked from hashicorp/vault
/
request_forwarding.go
377 lines (317 loc) · 11 KB
/
request_forwarding.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
package vault
import (
"bytes"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"os"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/vault/helper/forwarding"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"google.golang.org/grpc"
)
const (
// clusterListenerAcceptDeadline is the deadline applied to each Accept call
// on a cluster listener. When it expires Accept returns an error and the
// listener loop re-checks its shutdown flag, so it also bounds how long a
// listener takes to notice a shutdown request.
clusterListenerAcceptDeadline = 500 * time.Millisecond
)
// startForwarding starts the listeners and servers necessary to handle
// forwarded requests.
//
// It first drops any client-side forwarding state, fetches the cluster TLS
// configuration, creates the gRPC server, and then launches one accept-loop
// goroutine per entry in c.clusterListenerAddrs. Each accepted connection is
// handshaked and served in its own goroutine so that a slow or silent peer
// can never stall the accept loop (and, with it, shutdown). A final goroutine
// waits on c.clusterListenerShutdownCh, stops the RPC server, flags the
// listeners to exit, waits for them, and then signals
// c.clusterListenerShutdownSuccessCh.
//
// The only error returned is a failure to obtain the TLS configuration;
// failures to bind an individual listener are logged from that listener's
// goroutine and are not reported to the caller.
func (c *Core) startForwarding() error {
	// Upper bound on how long a freshly accepted connection may take to
	// complete its TLS handshake before it is dropped.
	const handshakeTimeout = 30 * time.Second

	// Clean up in case we have transitioned from a client to a server
	c.clearForwardingClients()

	// Get our base handler (for our RPC server) and our wrapped handler (for
	// straight HTTP/2 forwarding)
	baseHandler, wrappedHandler := c.clusterHandlerSetupFunc()

	// Get our TLS config
	tlsConfig, err := c.ClusterTLSConfig()
	if err != nil {
		c.logger.Error("core/startClusterListener: failed to get tls configuration", "error", err)
		return err
	}

	// The server supports all of the possible protos
	tlsConfig.NextProtos = []string{"h2", "req_fw_sb-act_v1"}

	// Create our RPC server and register the request handler server
	c.rpcServer = grpc.NewServer()
	RegisterRequestForwardingServer(c.rpcServer, &forwardedRequestRPCServer{
		core:    c,
		handler: baseHandler,
	})

	// Create the HTTP/2 server that will be shared by both RPC and regular
	// duties. Doing it this way instead of listening via the server and gRPC
	// allows us to re-use the same port via ALPN. We can just tell the server
	// to serve a given conn and which handler to use.
	fws := &http2.Server{}

	// serveConn completes the TLS handshake on an accepted connection and
	// hands it to the handler matching the negotiated ALPN protocol. It runs
	// in its own goroutine per connection and blocks until the connection is
	// done being served.
	serveConn := func(conn net.Conn) {
		// Type assert to TLS connection and handshake to populate the
		// connection state
		tlsConn := conn.(*tls.Conn)

		// Bound the handshake so a peer that connects and then goes silent
		// cannot pin this goroutine forever.
		if err := tlsConn.SetDeadline(time.Now().Add(handshakeTimeout)); err != nil {
			if c.logger.IsDebug() {
				c.logger.Debug("core/startClusterListener/Accept: error setting handshake deadline", "error", err)
			}
			tlsConn.Close()
			return
		}

		if err := tlsConn.Handshake(); err != nil {
			if c.logger.IsDebug() {
				c.logger.Debug("core/startClusterListener/Accept: error handshaking", "error", err)
			}
			tlsConn.Close()
			return
		}

		// The handshake is done; lift the deadline so long-lived forwarding
		// connections are not cut off.
		if err := tlsConn.SetDeadline(time.Time{}); err != nil {
			if c.logger.IsDebug() {
				c.logger.Debug("core/startClusterListener/Accept: error clearing handshake deadline", "error", err)
			}
			tlsConn.Close()
			return
		}

		switch tlsConn.ConnectionState().NegotiatedProtocol {
		case "h2":
			c.logger.Debug("core/startClusterListener/Accept: got h2 connection")
			fws.ServeConn(tlsConn, &http2.ServeConnOpts{
				Handler: wrappedHandler,
			})

		case "req_fw_sb-act_v1":
			c.logger.Debug("core/startClusterListener/Accept: got req_fw_sb-act_v1 connection")
			fws.ServeConn(tlsConn, &http2.ServeConnOpts{
				Handler: c.rpcServer,
			})

		default:
			c.logger.Debug("core/startClusterListener/Accept: unknown negotiated protocol")
			tlsConn.Close()
		}
	}

	// Shutdown coordination logic
	var shutdown uint32
	shutdownWg := &sync.WaitGroup{}

	for _, addr := range c.clusterListenerAddrs {
		shutdownWg.Add(1)

		// Force a local resolution to avoid data races
		laddr := addr

		// Start our listening loop
		go func() {
			defer shutdownWg.Done()

			c.logger.Info("core/startClusterListener: starting listener")

			// Create a TCP listener. We do this separately and specifically
			// with TCP so that we can set deadlines.
			tcpLn, err := net.ListenTCP("tcp", laddr)
			if err != nil {
				c.logger.Error("core/startClusterListener: error starting listener", "error", err)
				return
			}

			// Wrap the listener with TLS; closing it also closes tcpLn.
			tlsLn := tls.NewListener(tcpLn, tlsConfig)
			defer tlsLn.Close()

			if c.logger.IsInfo() {
				c.logger.Info("core/startClusterListener: serving cluster requests", "cluster_listen_address", tlsLn.Addr())
			}

			for atomic.LoadUint32(&shutdown) == 0 {
				// Set the deadline for the accept call. If it passes we'll get
				// an error, causing us to check the shutdown flag again.
				tcpLn.SetDeadline(time.Now().Add(clusterListenerAcceptDeadline))

				// Accept the connection
				conn, err := tlsLn.Accept()
				if err != nil {
					if conn != nil {
						conn.Close()
					}
					continue
				}

				// Handshake and serve off the accept loop so this listener
				// keeps accepting and keeps noticing shutdown.
				go serveConn(conn)
			}
		}()
	}

	// This is in its own goroutine so that we don't block the main thread, and
	// thus we use atomic and channels to coordinate
	go func() {
		// If we get told to shut down...
		<-c.clusterListenerShutdownCh

		// Stop the RPC server
		c.rpcServer.Stop()
		c.logger.Info("core/startClusterListener: shutting down listeners")

		// Set the shutdown flag. This will cause the listeners to shut down
		// within the deadline in clusterListenerAcceptDeadline
		atomic.StoreUint32(&shutdown, 1)

		// Wait for them all to shut down
		shutdownWg.Wait()
		c.logger.Info("core/startClusterListener: listeners successfully shut down")

		// Tell the main thread that shutdown is done.
		c.clusterListenerShutdownSuccessCh <- struct{}{}
	}()

	return nil
}
// refreshRequestForwardingConnection ensures that the client/transport are
// alive and that the current active address value matches the most
// recently-known address.
//
// It takes c.requestForwardingConnectionLock for writing. Any existing
// forwarding client (HTTP/2 transport or gRPC connection) is always torn down
// first so that nothing is leaked when it is replaced, and so that an empty
// clusterAddr disables forwarding in both modes. When clusterAddr is
// non-empty a new client is built: plain HTTP/2 forwarding when the
// VAULT_USE_GRPC_REQUEST_FORWARDING environment variable is empty, gRPC
// otherwise.
//
// It returns an error if clusterAddr cannot be parsed, the cluster TLS
// configuration cannot be fetched, or the gRPC dial fails; in those cases no
// forwarding client is left configured.
func (c *Core) refreshRequestForwardingConnection(clusterAddr string) error {
	c.requestForwardingConnectionLock.Lock()
	defer c.requestForwardingConnectionLock.Unlock()

	// NOTE: We don't fast path the case where we have a connection because the
	// address is the same, because the cert/key could have changed if the
	// active node ended up being the same node. Before we hit this function in
	// Leader() we'll have done a hash on the advertised info to ensure that we
	// won't hit this function unnecessarily anyways.

	// Clean things up first. This is a no-op when nothing is set, and it
	// covers the gRPC client as well as the HTTP/2 transport.
	c.clearForwardingClients()

	// Disabled, potentially, so there is nothing to connect to.
	if clusterAddr == "" {
		return nil
	}

	clusterURL, err := url.Parse(clusterAddr)
	if err != nil {
		c.logger.Error("core/refreshRequestForwardingConnection: error parsing cluster address", "error", err)
		return err
	}

	switch os.Getenv("VAULT_USE_GRPC_REQUEST_FORWARDING") {
	case "":
		// Set up normal HTTP forwarding handling
		tlsConfig, err := c.ClusterTLSConfig()
		if err != nil {
			c.logger.Error("core/refreshRequestForwardingConnection: error fetching cluster tls configuration", "error", err)
			return err
		}
		tp := &http2.Transport{
			TLSClientConfig: tlsConfig,
		}
		c.requestForwardingConnection = &activeConnection{
			transport:   tp,
			clusterAddr: clusterAddr,
		}

	default:
		// Set up grpc forwarding handling
		// It's not really insecure, but we have to dial manually to get the
		// ALPN header right. It's just "insecure" because GRPC isn't managing
		// the TLS state.
		ctx, cancelFunc := context.WithCancel(context.Background())
		conn, err := grpc.DialContext(ctx, clusterURL.Host, grpc.WithDialer(c.getGRPCDialer()), grpc.WithInsecure())
		if err != nil {
			// Release the context; nothing was published on c yet.
			cancelFunc()
			c.logger.Error("core/refreshRequestForwardingConnection: err setting up rpc client", "error", err)
			return err
		}
		c.rpcClientConnCancelFunc = cancelFunc
		c.rpcClientConn = conn
		c.rpcForwardingClient = NewRequestForwardingClient(conn)
	}

	return nil
}
// clearForwardingClients tears down whatever request-forwarding client state
// is currently held on the Core: the HTTP/2 forwarding connection (its idle
// connections are closed), the gRPC forwarding client, the gRPC dial
// context's cancel function (invoked before being dropped), and the gRPC
// client connection (closed before being dropped). Every field is left nil.
//
// It does not acquire requestForwardingConnectionLock itself.
func (c *Core) clearForwardingClients() {
	if active := c.requestForwardingConnection; active != nil {
		active.transport.CloseIdleConnections()
		c.requestForwardingConnection = nil
	}

	c.rpcForwardingClient = nil

	if cancel := c.rpcClientConnCancelFunc; cancel != nil {
		cancel()
		c.rpcClientConnCancelFunc = nil
	}

	if rpcConn := c.rpcClientConn; rpcConn != nil {
		rpcConn.Close()
		c.rpcClientConn = nil
	}
}
// ForwardRequest forwards a given request to the active node and returns the
// response as (status code, headers, body, error).
//
// The forwarding connection read lock is held for the duration of the call.
// When the VAULT_USE_GRPC_REQUEST_FORWARDING environment variable is empty
// the request is sent over the HTTP/2 transport to the active node's
// /cluster/local/forwarded-request endpoint; otherwise it is sent through the
// gRPC forwarding client. ErrCannotForward is returned when the client for
// the selected mode is not configured.
func (c *Core) ForwardRequest(req *http.Request) (int, http.Header, []byte, error) {
	c.requestForwardingConnectionLock.RLock()
	defer c.requestForwardingConnectionLock.RUnlock()

	if os.Getenv("VAULT_USE_GRPC_REQUEST_FORWARDING") == "" {
		// Plain HTTP/2 forwarding.
		active := c.requestForwardingConnection
		if active == nil || active.clusterAddr == "" {
			return 0, nil, nil, ErrCannotForward
		}

		target := active.clusterAddr + "/cluster/local/forwarded-request"
		outbound, err := forwarding.GenerateForwardedHTTPRequest(req, target)
		if err != nil {
			c.logger.Error("core/ForwardRequest: error creating forwarded request", "error", err)
			return 0, nil, nil, fmt.Errorf("error creating forwarding request")
		}

		httpResp, err := active.transport.RoundTrip(outbound)
		if err != nil {
			return 0, nil, nil, err
		}
		defer httpResp.Body.Close()

		// Buffer the whole body so it can be written back out to the
		// original requestor.
		var body bytes.Buffer
		if _, err := body.ReadFrom(httpResp.Body); err != nil {
			return 0, nil, nil, err
		}

		return httpResp.StatusCode, httpResp.Header, body.Bytes(), nil
	}

	// gRPC forwarding.
	if c.rpcForwardingClient == nil {
		return 0, nil, nil, ErrCannotForward
	}

	rpcReq, err := forwarding.GenerateForwardedRequest(req)
	if err != nil {
		c.logger.Error("core/ForwardRequest: error creating forwarding RPC request", "error", err)
		return 0, nil, nil, fmt.Errorf("error creating forwarding RPC request")
	}
	if rpcReq == nil {
		c.logger.Error("core/ForwardRequest: got nil forwarding RPC request")
		return 0, nil, nil, fmt.Errorf("got nil forwarding RPC request")
	}

	rpcResp, err := c.rpcForwardingClient.HandleRequest(context.Background(), rpcReq, grpc.FailFast(true))
	if err != nil {
		c.logger.Error("core/ForwardRequest: error during forwarded RPC request", "error", err)
		return 0, nil, nil, fmt.Errorf("error during forwarding RPC request")
	}

	// Rebuild an http.Header from the RPC header entries; it stays nil when
	// the response carried no entries map at all.
	var respHeader http.Header
	if rpcResp.HeaderEntries != nil {
		respHeader = make(http.Header)
		for name, entry := range rpcResp.HeaderEntries {
			for _, value := range entry.Values {
				respHeader.Add(name, value)
			}
		}
	}

	return int(rpcResp.StatusCode), respHeader, rpcResp.Body, nil
}
// getGRPCDialer is used to return a dialer that has the correct TLS
// configuration. Otherwise gRPC tries to be helpful and stomps all over our
// NextProtos.
//
// The returned function fetches the cluster TLS configuration on every dial,
// restricts ALPN to "req_fw_sb-act_v1", and dials addr over TCP with the
// supplied timeout. On any failure it returns a nil net.Conn together with
// the error.
func (c *Core) getGRPCDialer() func(string, time.Duration) (net.Conn, error) {
	return func(addr string, timeout time.Duration) (net.Conn, error) {
		tlsConfig, err := c.ClusterTLSConfig()
		if err != nil {
			c.logger.Error("core/getGRPCDialer: failed to get tls configuration", "error", err)
			return nil, err
		}

		tlsConfig.NextProtos = []string{"req_fw_sb-act_v1"}
		dialer := &net.Dialer{
			Timeout: timeout,
		}

		conn, err := tls.DialWithDialer(dialer, "tcp", addr, tlsConfig)
		if err != nil {
			// Return a literal nil: passing the nil *tls.Conn straight through
			// would yield a non-nil net.Conn interface wrapping a nil pointer.
			return nil, err
		}
		return conn, nil
	}
}
// forwardedRequestRPCServer is the receiver registered with the gRPC server
// to answer forwarded requests. Its HandleRequest method replays each
// forwarded request through handler.
type forwardedRequestRPCServer struct {
// core is the Core this server was registered for.
core *Core
// handler is the HTTP handler that forwarded requests are served by.
handler http.Handler
}
// HandleRequest serves a single forwarded request received over RPC. It
// rebuilds an http.Request from freq, runs it through the server's HTTP
// handler using an in-memory response writer, and packages the recorded
// status code, body, and headers into a forwarding.Response. A failure to
// parse freq is returned as the error.
func (s *forwardedRequestRPCServer) HandleRequest(ctx context.Context, freq *forwarding.Request) (*forwarding.Response, error) {
	// Parse an http.Request out of the forwarded payload
	httpReq, err := forwarding.ParseForwardedRequest(freq)
	if err != nil {
		return nil, err
	}

	// A very dummy response writer that doesn't follow normal semantics, just
	// lets you write a status code (last written wins) and a body. But it
	// meets the interface requirements.
	recorder := forwarding.NewRPCResponseWriter()
	s.handler.ServeHTTP(recorder, httpReq)

	out := &forwarding.Response{
		StatusCode: uint32(recorder.StatusCode()),
		Body:       recorder.Body().Bytes(),
	}

	// Copy headers over only when the writer actually has a header map.
	if hdr := recorder.Header(); hdr != nil {
		entries := make(map[string]*forwarding.HeaderEntry, len(hdr))
		for name, values := range hdr {
			entries[name] = &forwarding.HeaderEntry{
				Values: values,
			}
		}
		out.HeaderEntries = entries
	}

	return out, nil
}