-
Notifications
You must be signed in to change notification settings - Fork 2k
/
raft_rpc.go
134 lines (115 loc) · 3.01 KB
/
raft_rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package nomad
import (
"context"
"fmt"
"net"
"sync"
"time"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/raft"
)
// RaftLayer implements the raft.StreamLayer interface,
// so that we can use a single RPC layer for Raft and Nomad.
//
// Incoming connections are not accepted from a socket here: they are pushed
// in through Handoff and surfaced to Raft through Accept. Outgoing
// connections are created by Dial.
type RaftLayer struct {
	// addr is the listener address reported by Addr.
	addr net.Addr

	// connCh carries connections from Handoff (sender) to Accept (receiver).
	connCh chan net.Conn

	// tlsWrap is the TLS wrapper applied to outgoing connections in Dial.
	// A nil wrapper means Dial does not switch the connection to TLS.
	// It is read via getTLSWrapper and replaced via ReloadTLS, both of
	// which hold tlsWrapLock.
	tlsWrap     tlsutil.Wrapper
	tlsWrapLock sync.RWMutex

	// closed records whether Close has already run, so that closeCh is
	// closed at most once. closeCh is closed by Close to unblock Handoff
	// and Accept. closeLock guards closed.
	closed    bool
	closeCh   chan struct{}
	closeLock sync.Mutex
}
// NewRaftLayer builds a RaftLayer that reports addr as its listener address
// and can be used as a StreamLayer for Raft.
//
// tlsWrap is stored as the layer's TLS wrapper. When it is non-nil, Dial
// switches outgoing connections to TLS through it; when it is nil, Dial
// leaves them unwrapped.
func NewRaftLayer(addr net.Addr, tlsWrap tlsutil.Wrapper) *RaftLayer {
	// Both channels are unbuffered: connCh hands a connection directly from
	// Handoff to Accept, and closeCh is only ever closed, never sent on.
	return &RaftLayer{
		addr:    addr,
		tlsWrap: tlsWrap,
		connCh:  make(chan net.Conn),
		closeCh: make(chan struct{}),
	}
}
// Handoff passes the connection c to the RaftLayer so that a pending or
// future call to Accept can return it. It blocks until one of the following
// happens:
//
//   - the connection is taken from connCh: returns nil;
//   - the layer is closed: returns a "Raft RPC layer closed" error;
//   - ctx is done: returns nil.
//
// NOTE(review): when ctx is done the connection has NOT been handed off, yet
// the result is nil, so the caller cannot distinguish this from success by
// the return value alone.
func (l *RaftLayer) Handoff(ctx context.Context, c net.Conn) error {
	var result error

	select {
	case <-ctx.Done():
		// Cancellation is deliberately not reported as an error.
	case <-l.closeCh:
		result = fmt.Errorf("Raft RPC layer closed")
	case l.connCh <- c:
		// Delivered.
	}

	return result
}
// Accept blocks until a connection handed over via Handoff is available and
// returns it for use with the Raft layer. If the layer is closed first, it
// returns a nil connection and a "Raft RPC layer closed" error.
func (l *RaftLayer) Accept() (conn net.Conn, err error) {
	select {
	case <-l.closeCh:
		err = fmt.Errorf("Raft RPC layer closed")
	case conn = <-l.connCh:
	}
	return conn, err
}
// Close stops the layer from serving Raft connections by closing closeCh,
// which unblocks any Handoff or Accept waiting on it. It is safe to call
// more than once: only the first call closes the channel. It always
// returns nil.
func (l *RaftLayer) Close() error {
	l.closeLock.Lock()
	defer l.closeLock.Unlock()

	// Already closed: closing closeCh a second time would panic.
	if l.closed {
		return nil
	}

	l.closed = true
	close(l.closeCh)
	return nil
}
// getTLSWrapper returns the TLS wrapper currently installed on the layer,
// reading it under the read lock so it does not race with ReloadTLS.
// The result may be nil.
func (l *RaftLayer) getTLSWrapper() tlsutil.Wrapper {
	l.tlsWrapLock.RLock()
	current := l.tlsWrap
	l.tlsWrapLock.RUnlock()

	return current
}
// ReloadTLS replaces the layer's TLS wrapper with tlsWrap under the write
// lock. This is useful when upgrading or downgrading TLS connections; the
// new wrapper is picked up by subsequent reads through getTLSWrapper.
func (l *RaftLayer) ReloadTLS(tlsWrap tlsutil.Wrapper) {
	l.tlsWrapLock.Lock()
	l.tlsWrap = tlsWrap
	l.tlsWrapLock.Unlock()
}
// Addr returns the listener address the layer was constructed with.
// The field is returned without locking.
func (l *RaftLayer) Addr() net.Addr {
	return l.addr
}
// Dial creates a new outgoing Raft connection to address, using timeout for
// establishing the TCP connection.
//
// If a TLS wrapper is currently installed, the pool.RpcTLS byte is written
// first to switch the connection into TLS mode and the connection is then
// wrapped in a TLS client. In all cases the pool.RpcRaft byte is written
// last to set the connection's mode to Raft.
//
// On any failure after the TCP connection has been established, that
// connection is closed before the error is returned, so no socket is leaked.
// The returned connection is nil whenever the error is non-nil.
func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
	conn, err := net.DialTimeout("tcp", string(address), timeout)
	if err != nil {
		return nil, err
	}

	// Check for tls mode
	if tlsWrapper := l.getTLSWrapper(); tlsWrapper != nil {
		// Switch the connection into TLS mode
		if _, err := conn.Write([]byte{byte(pool.RpcTLS)}); err != nil {
			conn.Close()
			return nil, err
		}

		// Wrap the connection in a TLS client. The wrapped connection is
		// kept in a separate variable so the raw connection can still be
		// closed if wrapping fails. The close is best-effort: a wrapper
		// may already have closed it, in which case the extra Close only
		// returns an error that is ignored here.
		tlsConn, err := tlsWrapper(conn)
		if err != nil {
			conn.Close()
			return nil, err
		}
		conn = tlsConn
	}

	// Write the Raft byte to set the mode
	if _, err := conn.Write([]byte{byte(pool.RpcRaft)}); err != nil {
		conn.Close()
		return nil, err
	}
	return conn, nil
}