-
Notifications
You must be signed in to change notification settings - Fork 2k
/
pool.go
553 lines (466 loc) · 12.5 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
package pool
import (
"container/list"
"fmt"
"io"
"log"
"net"
"net/rpc"
"sync"
"sync/atomic"
"time"
hclog "github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
)
// NewClientCodec returns a new rpc.ClientCodec to be used to make RPC calls.
func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec {
return msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle)
}
// NewServerCodec returns a new rpc.ServerCodec to be used to handle RPCs.
func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
return msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle)
}
// streamClient is used to wrap a stream with an RPC client
type StreamClient struct {
stream net.Conn
codec rpc.ClientCodec
}
func (sc *StreamClient) Close() {
sc.stream.Close()
sc.codec.Close()
}
// Conn is a pooled connection to a Nomad server
type Conn struct {
refCount int32
shouldClose int32
addr net.Addr
session *yamux.Session
lastUsed atomic.Pointer[time.Time]
pool *ConnPool
clients *list.List
clientLock sync.Mutex
}
// markForUse does all the bookkeeping required to ready a connection for use,
// and ensure that active connections don't get reaped.
func (c *Conn) markForUse() {
now := time.Now()
c.lastUsed.Store(&now)
atomic.AddInt32(&c.refCount, 1)
}
// releaseUse is the complement of `markForUse`, to free up the reference count
func (c *Conn) releaseUse() {
refCount := atomic.AddInt32(&c.refCount, -1)
if refCount == 0 && atomic.LoadInt32(&c.shouldClose) == 1 {
c.Close()
}
}
func (c *Conn) Close() error {
return c.session.Close()
}
// getClient is used to get a cached or new client
func (c *Conn) getRPCClient() (*StreamClient, error) {
// Check for cached client
c.clientLock.Lock()
front := c.clients.Front()
if front != nil {
c.clients.Remove(front)
}
c.clientLock.Unlock()
if front != nil {
return front.Value.(*StreamClient), nil
}
// Open a new session
stream, err := c.session.Open()
if err != nil {
return nil, err
}
if _, err := stream.Write([]byte{byte(RpcNomad)}); err != nil {
stream.Close()
return nil, err
}
// Create a client codec
codec := NewClientCodec(stream)
// Return a new stream client
sc := &StreamClient{
stream: stream,
codec: codec,
}
return sc, nil
}
// returnClient is used when done with a stream
// to allow re-use by a future RPC
func (c *Conn) returnClient(client *StreamClient) {
didSave := false
c.clientLock.Lock()
if c.clients.Len() < c.pool.maxStreams && atomic.LoadInt32(&c.shouldClose) == 0 {
c.clients.PushFront(client)
didSave = true
// If this is a Yamux stream, shrink the internal buffers so that
// we can GC the idle memory
if ys, ok := client.stream.(*yamux.Stream); ok {
ys.Shrink()
}
}
c.clientLock.Unlock()
if !didSave {
client.Close()
}
}
func (c *Conn) IsClosed() bool {
return c.session.IsClosed()
}
func (c *Conn) AcceptStream() (net.Conn, error) {
s, err := c.session.AcceptStream()
if err != nil {
return nil, err
}
c.markForUse()
return &incomingStream{
Stream: s,
parent: c,
}, nil
}
// incomingStream wraps yamux.Stream but frees the underlying yamux.Session
// when closed
type incomingStream struct {
*yamux.Stream
parent *Conn
}
func (s *incomingStream) Close() error {
err := s.Stream.Close()
// always release parent even if error
s.parent.releaseUse()
return err
}
// ConnPool is used to maintain a connection pool to other
// Nomad servers. This is used to reduce the latency of
// RPC requests between servers. It is only used to pool
// connections in the rpcNomad mode. Raft connections
// are pooled separately.
type ConnPool struct {
sync.Mutex
// logger is the logger to be used
logger *log.Logger
// The maximum time to keep a connection open
maxTime time.Duration
// The maximum number of open streams to keep
maxStreams int
// Pool maps an address to a open connection
pool map[string]*Conn
// limiter is used to throttle the number of connect attempts
// to a given address. The first thread will attempt a connection
// and put a channel in here, which all other threads will wait
// on to close.
limiter map[string]chan struct{}
// TLS wrapper
tlsWrap tlsutil.RegionWrapper
// Used to indicate the pool is shutdown
shutdown bool
shutdownCh chan struct{}
// connListener is used to notify a potential listener of a new connection
// being made.
connListener chan<- *Conn
}
// NewPool is used to make a new connection pool
// Maintain at most one connection per host, for up to maxTime.
// Set maxTime to 0 to disable reaping. maxStreams is used to control
// the number of idle streams allowed.
// If TLS settings are provided outgoing connections use TLS.
func NewPool(logger hclog.Logger, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.RegionWrapper) *ConnPool {
pool := &ConnPool{
logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
maxTime: maxTime,
maxStreams: maxStreams,
pool: make(map[string]*Conn),
limiter: make(map[string]chan struct{}),
tlsWrap: tlsWrap,
shutdownCh: make(chan struct{}),
}
if maxTime > 0 {
go pool.reap()
}
return pool
}
// Shutdown is used to close the connection pool
func (p *ConnPool) Shutdown() error {
p.Lock()
defer p.Unlock()
for _, conn := range p.pool {
conn.Close()
}
p.pool = make(map[string]*Conn)
if p.shutdown {
return nil
}
if p.connListener != nil {
close(p.connListener)
p.connListener = nil
}
p.shutdown = true
close(p.shutdownCh)
return nil
}
// ReloadTLS reloads TLS configuration on the fly
func (p *ConnPool) ReloadTLS(tlsWrap tlsutil.RegionWrapper) {
p.Lock()
defer p.Unlock()
oldPool := p.pool
for _, conn := range oldPool {
conn.Close()
}
p.pool = make(map[string]*Conn)
p.tlsWrap = tlsWrap
}
// SetConnListener is used to listen to new connections being made. The
// channel will be closed when the conn pool is closed or a new listener is set.
func (p *ConnPool) SetConnListener(l chan<- *Conn) {
p.Lock()
defer p.Unlock()
// Close the old listener
if p.connListener != nil {
close(p.connListener)
}
// Store the new listener
p.connListener = l
}
// Acquire is used to get a connection that is
// pooled or to return a new connection
func (p *ConnPool) acquire(region string, addr net.Addr) (*Conn, error) {
// Check to see if there's a pooled connection available. This is up
// here since it should the vastly more common case than the rest
// of the code here.
p.Lock()
c := p.pool[addr.String()]
if c != nil {
c.markForUse()
p.Unlock()
return c, nil
}
// If not (while we are still locked), set up the throttling structure
// for this address, which will make everyone else wait until our
// attempt is done.
var wait chan struct{}
var ok bool
if wait, ok = p.limiter[addr.String()]; !ok {
wait = make(chan struct{})
p.limiter[addr.String()] = wait
}
isLeadThread := !ok
p.Unlock()
// If we are the lead thread, make the new connection and then wake
// everybody else up to see if we got it.
if isLeadThread {
c, err := p.getNewConn(region, addr)
p.Lock()
delete(p.limiter, addr.String())
close(wait)
if err != nil {
p.Unlock()
return nil, err
}
p.pool[addr.String()] = c
// If there is a connection listener, notify them of the new connection.
if p.connListener != nil {
select {
case p.connListener <- c:
default:
}
}
p.Unlock()
return c, nil
}
// Otherwise, wait for the lead thread to attempt the connection
// and use what's in the pool at that point.
select {
case <-p.shutdownCh:
return nil, fmt.Errorf("rpc error: shutdown")
case <-wait:
}
// See if the lead thread was able to get us a connection.
p.Lock()
if c := p.pool[addr.String()]; c != nil {
c.markForUse()
p.Unlock()
return c, nil
}
p.Unlock()
return nil, fmt.Errorf("rpc error: lead thread didn't get connection")
}
// getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(region string, addr net.Addr) (*Conn, error) {
// Try to dial the conn
conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
if err != nil {
return nil, err
}
// Cast to TCPConn
if tcp, ok := conn.(*net.TCPConn); ok {
tcp.SetKeepAlive(true)
tcp.SetNoDelay(true)
}
// Check if TLS is enabled
if p.tlsWrap != nil {
// Switch the connection into TLS mode
if _, err := conn.Write([]byte{byte(RpcTLS)}); err != nil {
conn.Close()
return nil, err
}
// Wrap the connection in a TLS client
tlsConn, err := p.tlsWrap(region, conn)
if err != nil {
conn.Close()
return nil, err
}
conn = tlsConn
}
// Write the multiplex byte to set the mode
if _, err := conn.Write([]byte{byte(RpcMultiplexV2)}); err != nil {
conn.Close()
return nil, err
}
// Setup the logger
conf := yamux.DefaultConfig()
conf.LogOutput = nil
conf.Logger = p.logger
// Create a multiplexed session
session, err := yamux.Client(conn, conf)
if err != nil {
conn.Close()
return nil, err
}
// Wrap the connection
c := &Conn{
refCount: 1,
addr: addr,
session: session,
clients: list.New(),
lastUsed: atomic.Pointer[time.Time]{},
pool: p,
}
now := time.Now()
c.lastUsed.Store(&now)
return c, nil
}
// clearConn is used to clear any cached connection, potentially in response to
// an error
func (p *ConnPool) clearConn(conn *Conn) {
// Ensure returned streams are closed
atomic.StoreInt32(&conn.shouldClose, 1)
// Clear from the cache
p.Lock()
if c, ok := p.pool[conn.addr.String()]; ok && c == conn {
delete(p.pool, conn.addr.String())
}
p.Unlock()
// Close down immediately if idle
if refCount := atomic.LoadInt32(&conn.refCount); refCount == 0 {
conn.Close()
}
}
// getClient is used to get a usable client for an address
func (p *ConnPool) getRPCClient(region string, addr net.Addr) (*Conn, *StreamClient, error) {
retries := 0
START:
// Try to get a conn first
conn, err := p.acquire(region, addr)
if err != nil {
return nil, nil, fmt.Errorf("failed to get conn: %v", err)
}
// Get a client
client, err := conn.getRPCClient()
if err != nil {
p.clearConn(conn)
conn.releaseUse()
// Try to redial, possible that the TCP session closed due to timeout
if retries == 0 {
retries++
goto START
}
return nil, nil, fmt.Errorf("failed to start stream: %v", err)
}
return conn, client, nil
}
// StreamingRPC is used to make an streaming RPC call. Callers must
// close the connection when done.
func (p *ConnPool) StreamingRPC(region string, addr net.Addr) (net.Conn, error) {
conn, err := p.acquire(region, addr)
if err != nil {
return nil, fmt.Errorf("failed to get conn: %v", err)
}
s, err := conn.session.Open()
if err != nil {
return nil, fmt.Errorf("failed to open a streaming connection: %v", err)
}
if _, err := s.Write([]byte{byte(RpcStreaming)}); err != nil {
conn.Close()
return nil, err
}
return s, nil
}
// RPC is used to make an RPC call to a remote host
func (p *ConnPool) RPC(region string, addr net.Addr, method string, args interface{}, reply interface{}) error {
// Get a usable client
conn, sc, err := p.getRPCClient(region, addr)
if err != nil {
return fmt.Errorf("rpc error: %w", err)
}
defer conn.releaseUse()
// Make the RPC call
err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
if err != nil {
sc.Close()
// If we read EOF, the session is toast. Clear it and open a
// new session next time
// See https://github.com/hashicorp/consul/blob/v1.6.3/agent/pool/pool.go#L471-L477
if helper.IsErrEOF(err) {
p.clearConn(conn)
}
// If the error is an RPC Coded error
// return the coded error without wrapping
if structs.IsErrRPCCoded(err) {
return err
}
// TODO wrap with RPCCoded error instead
return fmt.Errorf("rpc error: %w", err)
}
// Done with the connection
conn.returnClient(sc)
return nil
}
// Reap is used to close conns open over maxTime
func (p *ConnPool) reap() {
for {
// Sleep for a while
select {
case <-p.shutdownCh:
return
case <-time.After(time.Second):
}
// Reap all old conns
p.Lock()
var removed []string
now := time.Now()
for host, conn := range p.pool {
// Skip recently used connections
if now.Sub(*conn.lastUsed.Load()) < p.maxTime {
continue
}
// Skip connections with active streams
if atomic.LoadInt32(&conn.refCount) > 0 {
continue
}
// Close the conn
conn.Close()
// Remove from pool
removed = append(removed, host)
}
for _, host := range removed {
delete(p.pool, host)
}
p.Unlock()
}
}