/
proxy.go
312 lines (279 loc) · 10 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
// Copyright 2014 The Serviced Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package container
import (
"crypto/tls"
"fmt"
"io"
"math/rand"
"net"
"strconv"
"strings"
"time"
"github.com/control-center/serviced/auth"
"github.com/control-center/serviced/utils"
"github.com/zenoss/glog"
)
// init seeds the package-global math/rand generator with the current time in
// nanoseconds. SetNewAddresses draws its shuffle from that generator.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
/*
The 'prxy' service implemented here provides both a prxy for outbound
service requests and a multiplexer for inbound requests. The diagram below
illustrates one way proxies interoperate.
      proxy A                   proxy B
      +-----------+             +-----------+
    22250         |     +---->22250 ---------------+
      |           |     |       |           |      |
 +-->3306 --------------+       |           |      |
 +-->4369 --------------+       |           |      |
 |    |           |             |           |      |
 |    +-----------+             +-----------+      |
 |                                                 |
 +----zensvc                    mysql/3306 <-------+
                                rabbitmq/4369 <----+
proxy A exposes MySQL and RabbitMQ ports, 3306 and 4369 respectively, to its
zensvc. When zensvc connects to those ports proxy A forwards the resulting
traffic to the appropriate remote services via the TCPMux port exposed by
proxy B.
Start the service from the command line by typing
prxy [OPTIONS] SERVICE_ID
-certfile="": path to public certificate file (defaults to compiled in public cert)
-endpoint="127.0.0.1:4979": serviced endpoint address
-keyfile="": path to private key file (defaults to compiled in private key)
-mux=true: enable port multiplexing
-muxport=22250: multiplexing port to use
tls is always enabled
To terminate the prxy service connect to it via port 4321 and it will exit.
The netcat (nc) command is particularly useful for this:
nc 127.0.0.1 4321
*/
// addressTuple describes one instance of a remote service. prxy combines host
// with the mux port to build the remote dial target, and either packs
// containerAddr into the mux header or dials it directly when the container is
// treated as local.
type addressTuple struct {
host string // IP of the host on which the container is running
containerAddr string // Container IP:port of the remote service
}
// proxy accepts connections on listener and forwards each one to one of the
// entries in addresses, chosen round robin by listenAndproxy. The address set
// is replaced at runtime through the newAddresses channel, and the proxy is
// stopped through the closing channel.
type proxy struct {
name string // Name of the remote service
tenantEndpointID string // Tenant endpoint ID
addresses []addressTuple // Public/container IP:Port of the remote service; reassigned by listenAndproxy on each update
tcpMuxPort uint16 // the port to use for TCP Muxing, 0 is disabled. NOTE(review): prxy overwrites a 0 here with 22250 rather than disabling muxing
useTLS bool // use encryption over mux port
closing chan chan error // internal shutdown signal: Close sends a reply channel and waits for listenAndproxy to answer on it
newAddresses chan []addressTuple // a stream of updates to the addresses
listener net.Listener // handle on the listening socket
allowDirectConn bool // allow container to container connections
}
// newProxy creates a proxy for the named remote service and starts its
// listenAndproxy loop on a new goroutine, so the proxy begins accepting
// connections on listener before newProxy returns to the caller.
//
// The proxy starts with an empty address list; destinations are supplied
// later through SetNewAddresses.
//
// It returns an error, and starts nothing, when name is empty or listener is
// nil.
func newProxy(name, tenantEndpointID string, tcpMuxPort uint16, useTLS bool, listener net.Listener, allowDirectConn bool) (p *proxy, err error) {
	if len(name) == 0 {
		return nil, fmt.Errorf("prxy: name can not be empty")
	}
	// A nil listener would panic inside the background accept goroutine,
	// where the caller has no chance to handle it; reject it here instead.
	if listener == nil {
		return nil, fmt.Errorf("prxy: listener can not be nil")
	}
	p = &proxy{
		name:             name,
		tenantEndpointID: tenantEndpointID,
		addresses:        make([]addressTuple, 0),
		tcpMuxPort:       tcpMuxPort,
		useTLS:           useTLS,
		listener:         listener,
		allowDirectConn:  allowDirectConn,
		// closing must be allocated: a send on a nil channel blocks forever,
		// which would make Close hang and the shutdown case in the
		// listenAndproxy select unreachable.
		closing: make(chan chan error),
		// Small buffer so a couple of address updates can be queued without
		// blocking the caller while the loop is busy.
		newAddresses: make(chan []addressTuple, 2),
	}
	go p.listenAndproxy()
	return p, nil
}
// Name reports the name of the remote service this proxy was created for.
func (p *proxy) Name() string {
	serviceName := p.name
	return serviceName
}
// String renders the proxy as "proxy[<name>; <listen address>]=><addresses>"
// for log messages.
//
// The listener is shown by its Addr() rather than formatted directly:
// net.Listener has no String method, so formatting it with %s yields
// "%!s(...)" noise instead of anything readable. A proxy without a listener
// prints "<nil>" in that position.
func (p *proxy) String() string {
	var listenAddr interface{} = "<nil>"
	if p.listener != nil {
		listenAddr = p.listener.Addr()
	}
	return fmt.Sprintf("proxy[%s; %v]=>%v", p.name, listenAddr, p.addresses)
}
// TCPMuxPort reports the TCP port this proxy uses for muxing, as currently
// stored on the proxy.
func (p *proxy) TCPMuxPort() uint16 {
	muxPort := p.tcpMuxPort
	return muxPort
}
// UseTLS reports whether connections to the mux port are made over TLS.
func (p *proxy) UseTLS() bool {
	encrypted := p.useTLS
	return encrypted
}
// SetNewAddresses queues a replacement set of destination addresses for the
// proxy. The caller's slice is left untouched: a shuffled copy is sent on the
// newAddresses channel, so that separate proxy instances do not all walk the
// destinations in the same order.
func (p *proxy) SetNewAddresses(addresses []addressTuple) {
	count := len(addresses)
	shuffled := make([]addressTuple, count)
	// rand.Perm yields each index exactly once, so every input entry lands
	// in a distinct slot of the copy.
	for src, dst := range rand.Perm(count) {
		shuffled[dst] = addresses[src]
	}
	p.newAddresses <- shuffled
}
// Close shuts the proxy down; it can not be restarted afterwards. It closes
// the listening socket, hands a reply channel to the listenAndproxy loop via
// the closing channel, and returns whatever that loop sends back.
func (p *proxy) Close() error {
	p.listener.Close()
	reply := make(chan error)
	p.closing <- reply
	result := <-reply
	return result
}
// listenAndproxy is the proxy's main loop. A helper goroutine accepts
// connections on the proxy's listener and hands them to this loop, which
//
//   - starts a prxy goroutine for each connection, picking the destination
//     round robin from the current address list (connections that arrive
//     while the list is empty are closed immediately),
//   - swaps in a new address list whenever one arrives on newAddresses, and
//   - on a request from the closing channel, closes the listener, reports nil
//     to the requester and returns.
//
// An Accept failure caused by the listener having been closed ends the accept
// goroutine quietly; any other Accept failure is still fatal to the process.
func (p *proxy) listenAndproxy() {
	connections := make(chan net.Conn)

	// done is closed when this loop returns, so the accept goroutine never
	// blocks forever trying to hand over a connection nobody will receive.
	done := make(chan struct{})
	defer close(done)

	go func(lsocket net.Listener, conns chan<- net.Conn) {
		for {
			conn, err := lsocket.Accept()
			if err != nil {
				// Closing the listener is how the proxy is shut down; that
				// must not take the whole process down via glog.Fatal. The
				// message text is matched because this file predates
				// net.ErrClosed.
				if strings.Contains(err.Error(), "use of closed network connection") {
					glog.V(1).Infof("Listener closed; no longer accepting connections for %s", p.name)
					return
				}
				glog.Fatal("Error (net.Accept): ", err)
			}
			select {
			case conns <- conn:
			case <-done:
				// The main loop is gone; release the socket instead of
				// leaking it together with this goroutine.
				conn.Close()
				return
			}
		}
	}(p.listener, connections)

	i := 0
	for {
		select {
		case conn := <-connections:
			if len(p.addresses) == 0 {
				glog.Warningf("No remote services available for prxying %v", p)
				conn.Close()
				continue
			}
			i++
			// round robin connections to list of addresses
			glog.V(1).Infof("choosing address from %v", p.addresses)
			go p.prxy(conn, p.addresses[i%len(p.addresses)])
		case p.addresses = <-p.newAddresses:
			// The received list replaces the current one wholesale.
		case errc := <-p.closing:
			p.listener.Close()
			errc <- nil
			return
		}
	}
}
// getPort extracts the numeric port from an address of the form "host:port".
// The port is whatever follows the last ':' in addr, so bracketed IPv6
// addresses such as "[::1]:80" work too.
//
// It returns an error when addr contains no ':' at all, so a bare host such
// as "127" is not misread as a port number, or when the text after the last
// ':' is not a decimal integer.
func getPort(addr string) (int, error) {
	sep := strings.LastIndex(addr, ":")
	if sep < 0 {
		return 0, fmt.Errorf("could not determine port from %v", addr)
	}
	return strconv.Atoi(addr[sep+1:])
}
// prxy takes an established local connection, dials the remote endpoint
// described by address, and then copies data in both directions between the
// two connections until one side ends.
//
// The dial target is chosen as follows:
//   - If allowDirectConn is set and isLocalAddress reports address.host as
//     local (and it is not a loopback/localhost name, which would loop back
//     into the proxy), the container is dialed directly. When the container
//     address itself is loopback-style, the container's port on address.host
//     is dialed instead.
//   - Otherwise the mux port on address.host is dialed, over TLS when useTLS
//     is set, and a signed mux header carrying the packed container address
//     and an authentication token is written to it first.
//
// prxy returns as soon as the two copy goroutines are started; they close
// both connections when their copy ends. If setup fails, the failure is
// logged and local is closed before returning.
func (p *proxy) prxy(local net.Conn, address addressTuple) {
	var (
		remote net.Conn
		err    error
	)

	// Ownership of local passes to the copy goroutines only once every setup
	// step has succeeded. Until then each early return must close it, or the
	// accepted socket stays open with nothing servicing it.
	handedOff := false
	defer func() {
		if !handedOff {
			local.Close()
		}
	}()

	glog.V(2).Infof("Setting up proxy for %#v", address)

	isLocalContainer := false
	localAddr := address.containerAddr
	if p.allowDirectConn {
		//check if the host for the container is running on the same host
		isLocalContainer = isLocalAddress(address.host)
		glog.V(4).Infof("Checking is local for %s %t in %#v", address.host, isLocalContainer, hostIPs)

		// don't proxy localhost addresses, we'll end up in a loop
		if isLocalContainer {
			switch {
			case strings.HasPrefix(address.host, "127"):
				isLocalContainer = false
			case address.host == "localhost":
				isLocalContainer = false
			case strings.HasPrefix(address.containerAddr, "127") || strings.HasPrefix(address.containerAddr, "localhost:"):
				//if the host is local and the container has a local style addr
				//then container is exposing port directly on host; go to host and use container port
				if containerPort, err := getPort(address.containerAddr); err != nil {
					glog.Warningf("could not get port %v", err)
					isLocalContainer = false
				} else {
					localAddr = fmt.Sprintf("%s:%d", address.host, containerPort)
				}
			}
		}
	}

	if p.tcpMuxPort == 0 {
		// TODO: Do this properly
		// NOTE(review): prxy runs on its own goroutine per connection, so
		// this write to a shared field is unsynchronised; confirm whether
		// callers rely on TCPMuxPort() reflecting the default before
		// switching to a local variable.
		glog.Errorf("Mux port is unspecified. Using default of 22250.")
		p.tcpMuxPort = 22250
	}

	muxAddr := fmt.Sprintf("%s:%d", address.host, p.tcpMuxPort)

	// Build the authentication header before dialing the connection, so the
	// connection isn't sitting open waiting for an authentication token to be
	// loaded.
	var (
		muxAddrPacked []byte
		token         string
		tokenTimeout  = 30 * time.Second
	)

	if !isLocalContainer {
		muxAddrPacked, err = utils.PackTCPAddressString(address.containerAddr)
		if err != nil {
			glog.Errorf("Container address is invalid. Can't create proxy: %s", address.containerAddr)
			return
		}
		select {
		case token = <-auth.AuthToken(nil):
		case <-time.After(tokenTimeout):
			glog.Error("Unable to retrieve authentication token with 30 seconds")
			return
		}
	}

	// Dial the target connection, which will be either a local container
	// address or a mux port on a remote host.
	switch {
	case isLocalContainer:
		glog.V(2).Infof("dialing local addr=> %s", localAddr)
		remote, err = net.Dial("tcp4", localAddr)
		if err != nil {
			glog.Errorf("Error Local (net.Dial): %s", err)
			return
		}
	case p.useTLS:
		glog.V(2).Infof("dialing remote tls => %s", muxAddr)
		// Certificate verification is disabled for the mux connection.
		config := tls.Config{InsecureSkipVerify: true}
		tlsConn, err := tls.Dial("tcp4", muxAddr, &config)
		if err != nil {
			glog.Errorf("Error TLS (net.Dial): %s", err)
			return
		}
		remote = tlsConn // cast it to the net.Conn interface
		cipher := tlsConn.ConnectionState().CipherSuite
		glog.V(2).Infof("Proxy connected to mux with TLS cipher=%s (%d)", utils.GetCipherName(cipher), cipher)
	default:
		glog.V(2).Infof("dialing remote => %s", muxAddr)
		remote, err = net.Dial("tcp4", muxAddr)
		if err != nil {
			glog.Errorf("Error Remote (net.Dial): %s", err)
			return
		}
	}

	// If this is not a local container, write the mux header
	// NOTE(review): any result from AddSignedMuxHeader is discarded here;
	// if it returns an error, a failed header write goes unnoticed — confirm
	// against the auth package.
	if token != "" && len(muxAddrPacked) > 0 {
		auth.AddSignedMuxHeader(remote, muxAddrPacked, token)
	}

	glog.V(2).Infof("Using hostAgent:%v to prxy %v<->%v<->%v<->%v",
		remote.RemoteAddr(), local.LocalAddr(), local.RemoteAddr(), remote.LocalAddr(), address)

	// From here on the copy goroutines own both connections and close them.
	handedOff = true

	// remote -> local
	go func(address string) {
		defer local.Close()
		defer remote.Close()
		io.Copy(local, remote)
		glog.V(2).Infof("Closing hostAgent:%v to prxy %v<->%v<->%v<->%v",
			remote.RemoteAddr(), local.LocalAddr(), local.RemoteAddr(), remote.LocalAddr(), address)
	}(address.containerAddr)

	// local -> remote
	go func(address string) {
		defer local.Close()
		defer remote.Close()
		io.Copy(remote, local)
		glog.V(2).Infof("closing hostAgent:%v to prxy %v<->%v<->%v<->%v",
			remote.RemoteAddr(), local.LocalAddr(), local.RemoteAddr(), remote.LocalAddr(), address)
	}(address.containerAddr)
}