-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
wanfed.go
163 lines (138 loc) · 4.06 KB
/
wanfed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package wanfed
import (
"context"
"encoding/binary"
"errors"
"fmt"
"net"
"strings"
"time"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/tlsutil"
)
const (
	// GossipPacketMaxIdleTime is the length of time an idle connection to a
	// server is kept open.
	//
	// Conceptually similar to: agent/consul/server.go:serverRPCCache
	GossipPacketMaxIdleTime = 2 * time.Minute

	// GossipPacketMaxByteSize caps the size of a single packet forwarded via
	// wanfed at 4MB. That is meant to be far more than serf or memberlist
	// would allow practically, so the cap should never be hit in practice.
	GossipPacketMaxByteSize = 4 << 20
)
// MeshGatewayResolver is a function that takes a datacenter name and returns
// a string. A Transport passes its resolver to pool.DialRPCViaMeshGateway
// whenever it dials a node in another datacenter.
//
// NOTE(review): the returned string is presumably the address of a mesh
// gateway to use for that datacenter — confirm against the implementations.
type MeshGatewayResolver func(datacenter string) string
// IngestionAwareTransport is a memberlist.NodeAwareTransport that additionally
// exposes IngestPacket and IngestStream. It is the kind of transport that a
// wanfed Transport wraps.
//
// NOTE(review): the Ingest* methods presumably allow packets and streams that
// arrived by some other route to be handed to the transport — confirm against
// the implementation.
type IngestionAwareTransport interface {
memberlist.NodeAwareTransport
IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error
IngestStream(conn net.Conn) error
}
// NewTransport wraps transport in a Transport bound to the local datacenter
// named by datacenter.
//
// tlsConfigurator supplies the outgoing ALPN wrapper and gwResolver is handed
// to the mesh gateway dialer whenever a node in another datacenter is dialed.
// The returned Transport owns a connection pool whose idle timeout is
// GossipPacketMaxIdleTime.
//
// An error is returned if tlsConfigurator, gwResolver or transport is nil, or
// if the connection pool cannot be created.
func NewTransport(
	tlsConfigurator *tlsutil.Configurator,
	transport IngestionAwareTransport,
	datacenter string,
	gwResolver MeshGatewayResolver,
) (*Transport, error) {
	if tlsConfigurator == nil {
		return nil, errors.New("wanfed: tlsConfigurator is nil")
	}
	if gwResolver == nil {
		return nil, errors.New("wanfed: gwResolver is nil")
	}
	// The transport is embedded in the result and receives every
	// local-datacenter call as well as Shutdown, so a nil value would
	// otherwise only surface later as a nil-interface panic.
	if transport == nil {
		return nil, errors.New("wanfed: transport is nil")
	}

	cp, err := newConnPool(GossipPacketMaxIdleTime)
	if err != nil {
		return nil, err
	}

	t := &Transport{
		IngestionAwareTransport: transport,
		tlsConfigurator:         tlsConfigurator,
		datacenter:              datacenter,
		gwResolver:              gwResolver,
		pool:                    cp,
	}
	return t, nil
}
// Transport wraps an IngestionAwareTransport. Writes and dials addressed to a
// node whose name encodes a datacenter other than this Transport's own are
// sent through pool.DialRPCViaMeshGateway; everything else is delegated to the
// embedded transport.
type Transport struct {
// The wrapped transport: handles local-datacenter traffic and every method
// that Transport does not override.
IngestionAwareTransport
// tlsConfigurator provides the outgoing ALPN wrapper used when dialing.
tlsConfigurator *tlsutil.Configurator
// datacenter is compared against the datacenter encoded in a target node
// name to decide whether the target is local.
datacenter string
// gwResolver is passed through to pool.DialRPCViaMeshGateway.
gwResolver MeshGatewayResolver
// pool supplies the connections WriteToAddress uses for other datacenters;
// it is closed by Shutdown.
pool *connPool
}
// Compile-time assertion that *Transport satisfies
// memberlist.NodeAwareTransport.
var _ memberlist.NodeAwareTransport = (*Transport)(nil)
// Shutdown implements memberlist.Transport.
//
// The connection pool is closed first and the wrapped transport is shut down
// second; both steps always run. When both fail, the wrapped transport's
// error is the one reported.
func (t *Transport) Shutdown() error {
	poolErr := t.pool.Close()
	innerErr := t.IngestionAwareTransport.Shutdown()

	switch {
	case innerErr != nil:
		// The wrapped transport's error is the more important of the two.
		return innerErr
	case poolErr != nil:
		return poolErr
	default:
		return nil
	}
}
// WriteToAddress implements memberlist.NodeAwareTransport.
//
// addr.Name must be in the form accepted by SplitNodeName. A packet for the
// local datacenter is handed to the wrapped transport unchanged. A packet for
// any other datacenter is written to a pooled connection as a big-endian
// uint32 length followed by the payload; on success the time at which the
// write finished is returned.
func (t *Transport) WriteToAddress(b []byte, addr memberlist.Address) (time.Time, error) {
	shortName, targetDC, err := SplitNodeName(addr.Name)
	if err != nil {
		return time.Time{}, err
	}

	// Local target: let the wrapped transport deal with it.
	if targetDC == t.datacenter {
		return t.IngestionAwareTransport.WriteToAddress(b, addr)
	}

	pc, err := t.pool.AcquireOrDial(addr.Name, func() (net.Conn, error) {
		return t.dial(targetDC, shortName, pool.ALPN_WANGossipPacket)
	})
	if err != nil {
		return time.Time{}, err
	}
	defer pc.ReturnOrClose()

	// Length header goes out ahead of the payload.
	size := uint32(len(b))
	if err := binary.Write(pc, binary.BigEndian, size); err != nil {
		pc.MarkFailed()
		return time.Time{}, err
	}

	if _, err = pc.Write(b); err != nil {
		pc.MarkFailed()
		return time.Time{}, err
	}

	return time.Now(), nil
}
// DialAddressTimeout implements memberlist.NodeAwareTransport.
//
// addr.Name must be in the form accepted by SplitNodeName. A node in the
// local datacenter is dialed by the wrapped transport. A node in any other
// datacenter is dialed through pool.DialRPCViaMeshGateway using the
// pool.ALPN_WANGossipStream protocol, with the dial attempt bounded by
// timeout when timeout is positive.
func (t *Transport) DialAddressTimeout(addr memberlist.Address, timeout time.Duration) (net.Conn, error) {
	node, dc, err := SplitNodeName(addr.Name)
	if err != nil {
		return nil, err
	}

	if dc == t.datacenter {
		return t.IngestionAwareTransport.DialAddressTimeout(addr, timeout)
	}

	// A non-positive timeout is treated as "no caller-imposed deadline";
	// context.WithTimeout would otherwise yield an already-expired context.
	if timeout <= 0 {
		return t.dial(dc, node, pool.ALPN_WANGossipStream)
	}

	// Honor the caller's timeout instead of silently dropping it.
	// NOTE(review): assumes DialRPCViaMeshGateway consults ctx only while
	// establishing the connection, so cancelling on return does not affect
	// the returned conn — confirm against the pool package.
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return t.dialContext(ctx, dc, node, pool.ALPN_WANGossipStream)
}

// dial opens a connection to nodeName in datacenter dc for the given ALPN
// protocol without any caller-imposed deadline.
func (t *Transport) dial(dc, nodeName, nextProto string) (net.Conn, error) {
	return t.dialContext(context.Background(), dc, nodeName, nextProto)
}

// dialContext opens a connection to nodeName in datacenter dc through
// pool.DialRPCViaMeshGateway, using this Transport's outgoing ALPN wrapper and
// gateway resolver. The second value returned by the pool dialer is not
// needed here and is discarded.
func (t *Transport) dialContext(ctx context.Context, dc, nodeName, nextProto string) (net.Conn, error) {
	conn, _, err := pool.DialRPCViaMeshGateway(
		ctx,
		dc,
		nodeName,
		nil, // TODO(rb): thread source address through here?
		t.tlsConfigurator.OutgoingALPNRPCWrapper(),
		nextProto,
		true,
		t.gwResolver,
	)
	return conn, err
}
// SplitNodeName breaks apart a node name as serf/memberlist represents it in
// the WAN pool, i.e. "<short-node-name>.<datacenter>". For example
// "nyc-web42.dc5" yields "nyc-web42" and "dc5".
//
// A name that does not contain exactly one "." is rejected with an error.
func SplitNodeName(nodeName string) (shortName, dc string, err error) {
	if strings.Count(nodeName, ".") != 1 {
		return "", "", fmt.Errorf("node name does not encode a datacenter: %s", nodeName)
	}

	dot := strings.IndexByte(nodeName, '.')
	return nodeName[:dot], nodeName[dot+1:], nil
}