Skip to content

Commit

Permalink
Issue 613 tcp dynamic (#626)
Browse files Browse the repository at this point in the history
* moved tcp dynamic

* setting .gitignore and Makefile back to master

* added features md for tcp-dynamic-proxy

* udpated features doco

* updated log message for cert source

* added split check for route

* added : check for tcp-dynamic

* fixed dial timeout error
  • Loading branch information
murphymj25 authored and Aaron Hurt committed Dec 2, 2019
1 parent 08b493a commit 3584cbb
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 5 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Listen struct {
TLSCiphers []uint16
ProxyProto bool
ProxyHeaderTimeout time.Duration
Refresh time.Duration
}

type UI struct {
Expand Down
12 changes: 9 additions & 3 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w
case "proto":
l.Proto = v
switch l.Proto {
case "tcp", "tcp+sni", "http", "https", "grpc", "grpcs":
case "tcp", "tcp+sni", "tcp-dynamic", "http", "https", "grpc", "grpcs":
// ok
default:
return Listen{}, fmt.Errorf("unknown protocol %q", v)
Expand Down Expand Up @@ -435,6 +435,12 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w
return Listen{}, err
}
l.ProxyHeaderTimeout = d
case "refresh":
d, err := time.ParseDuration(v)
if err != nil {
return Listen{}, err
}
l.Refresh = d
}
}

Expand All @@ -444,8 +450,8 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w
if l.Addr == "" {
return Listen{}, fmt.Errorf("need listening host:port")
}
if csName != "" && l.Proto != "https" && l.Proto != "tcp" && l.Proto != "grpcs" {
return Listen{}, fmt.Errorf("cert source requires proto 'https', 'tcp' or 'grpcs'")
if csName != "" && l.Proto != "https" && l.Proto != "tcp" && l.Proto != "tcp-dynamic" && l.Proto != "grpcs" {
return Listen{}, fmt.Errorf("cert source requires proto 'https', 'tcp', 'tcp-dynamic' or 'grpcs'")
}
if csName == "" && l.Proto == "https" {
return Listen{}, fmt.Errorf("proto 'https' requires cert source")
Expand Down
7 changes: 7 additions & 0 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func TestLoad(t *testing.T) {
return cfg
},
},
{
args: []string{"-proxy.addr", ":5555;proto=tcp-dynamic"},
cfg: func(cfg *Config) *Config {
cfg.Listen = []Listen{{Addr: ":5555", Proto: "tcp-dynamic"}}
return cfg
},
},
{
desc: "-proxy.addr with tls configs",
args: []string{"-proxy.addr", `:5555;rt=1s;wt=2s;tlsmin=0x0300;tlsmax=0x305;tlsciphers="0x123,0x456"`},
Expand Down
17 changes: 17 additions & 0 deletions docs/content/feature/tcp-dynamic-proxy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
title: "TCP Dynamic Proxy"
---

The TCP dynamic proxy is similar to the TCP Proxy, but the listener is started from the Consul urlprefix tag.
Also, the service is defined with IP and port, so that multiple services can be defined on the load balancer using
the same TCP port. Connections are forwarded to services based on the combination of ip:port

To use TCP Dynamic proxy support the service needs to advertise `urlprefix-127.0.0.1:1234 proto=tcp` in
Consul. In addition, fabio needs to be configured with a placeholder for the proxy.addr.:

```
fabio -proxy.addr '0.0.0.0:0;proto=tcp-dynamic;refresh=5s'
```

The TCP listener is started for the given TCP ports. To use IP addressing to separate the services, matching IP
addressed would need to be added to the loopback interface on the host.
7 changes: 7 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@
# * https for HTTPS based protocols
# * tcp for a raw TCP proxy with or witout TLS support
# * tcp+sni for an SNI aware TCP proxy
# * tcp-dynamic for a consul driven TCP proxy
#
# If no 'proto' option is specified then the protocol
# is either 'http' or 'https' depending on whether a
Expand Down Expand Up @@ -240,6 +241,9 @@
# pxytimeout: Sets PROXY protocol header read timeout as a duration (e.g. '250ms').
# This defaults to 250ms if not set when 'pxyproto' is enabled.
#
# refresh: Sets the refresh interval to check the route table for updates.
# Used when 'tcp-dynamic' is enabled.
#
# TLS options:
#
# tlsmin: Sets the minimum TLS version for the handshake. This value
Expand Down Expand Up @@ -280,6 +284,9 @@
# # TCP listener on port 443 with SNI routing
# proxy.addr = :443;proto=tcp+sni
#
# # TCP listeners using consul for config with 5 second refresh interval
# proxy.addr = 0.0.0.0:0;proto=tcp-dynamic;refresh=5s
#
# The default is
#
# proxy.addr = :9999
Expand Down
68 changes: 68 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,52 @@ func startServers(cfg *config.Config) {
exit.Fatal("[FATAL] ", err)
}
}()
case "tcp-dynamic":
go func() {
var buffer strings.Builder
for {
time.Sleep(l.Refresh)
table := route.GetTable()
ports := []string{}
for target, rts := range table {
if strings.Contains(target, ":") {
buffer.WriteString(":")
buffer.WriteString(strings.Split(target, ":")[1])

schemes := tableSchemes(rts)
if len(schemes) == 1 && schemes[0] == "tcp" {
ports = append(ports, buffer.String())
}
buffer.Reset()
}
ports = unique(ports)
}
for _, port := range ports {
l := l
port := port
conn, err := net.Listen("tcp", port)
if err != nil {
log.Printf("[DEBUG] Dynamic TCP port %s in use", port)
continue
}
conn.Close()
log.Printf("[INFO] Starting dynamic TCP listener on port %s ", port)
go func() {
h := &tcp.DynamicProxy{
DialTimeout: cfg.Proxy.DialTimeout,
Lookup: lookupHostFn(cfg),
Conn: metrics.DefaultRegistry.GetCounter("tcp.conn"),
ConnFail: metrics.DefaultRegistry.GetCounter("tcp.connfail"),
Noroute: metrics.DefaultRegistry.GetCounter("tcp.noroute"),
}
l.Addr = port
if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil {
exit.Fatal("[FATAL] ", err)
}
}()
}
}
}()
default:
exit.Fatal("[FATAL] Invalid protocol ", l.Proto)
}
Expand Down Expand Up @@ -551,3 +597,25 @@ func toJSON(v interface{}) string {
}
return string(data)
}

func unique(strSlice []string) []string {
keys := make(map[string]bool)
list := []string{}
for _, entry := range strSlice {
if _, value := keys[entry]; !value {
keys[entry] = true
list = append(list, entry)
}
}
return list
}

func tableSchemes(r route.Routes) []string {
schemes := []string{}
for _, rt := range r {
for _, target := range rt.Targets {
schemes = append(schemes, target.URL.Scheme)
}
}
return unique(schemes)
}
82 changes: 82 additions & 0 deletions proxy/tcp/tcp_dynamic_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package tcp

import (
"io"
"log"
"net"
"time"

"github.com/fabiolb/fabio/metrics"
"github.com/fabiolb/fabio/route"
)

// Proxy implements a generic TCP proxying handler.
type DynamicProxy struct {
// DialTimeout sets the timeout for establishing the outbound
// connection.
DialTimeout time.Duration

// Lookup returns a target host for the given request.
// The proxy will panic if this value is nil.
Lookup func(host string) *route.Target

// Conn counts the number of connections.
Conn metrics.Counter

// ConnFail counts the failed upstream connection attempts.
ConnFail metrics.Counter

// Noroute counts the failed Lookup() calls.
Noroute metrics.Counter
}

func (p *DynamicProxy) ServeTCP(in net.Conn) error {
defer in.Close()

if p.Conn != nil {
p.Conn.Inc(1)
}
target := in.LocalAddr().String()
t := p.Lookup(target)
if t == nil {
if p.Noroute != nil {
p.Noroute.Inc(1)
}
return nil
}
addr := t.URL.Host
log.Printf("[DEBUG] Connection: %s incoming %s to %s: ", in.RemoteAddr(), target, addr)

if t.AccessDeniedTCP(in) {
return nil
}

out, err := net.DialTimeout("tcp", addr, p.DialTimeout)
if err != nil {
log.Print("[WARN] tcp: cannot connect to upstream ", addr)
if p.ConnFail != nil {
p.ConnFail.Inc(1)
}
return err
}
defer out.Close()

errc := make(chan error, 2)
cp := func(dst io.Writer, src io.Reader, c metrics.Counter) {
errc <- copyBuffer(dst, src, c)
}

// rx measures the traffic to the upstream server (in <- out)
// tx measures the traffic from the upstream server (out <- in)
rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx")
tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx")

go cp(in, out, rx)
go cp(out, in, tx)
err = <-errc
if err != nil && err != io.EOF {
log.Print("[WARN]: tcp: ", err)
return err
}
return nil
}
32 changes: 32 additions & 0 deletions proxy/tcp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,38 @@ var echoHandler tcp.HandlerFunc = func(c net.Conn) error {
return err
}

// TestTCPDynamicProxy tests proxying an unencrypted TCP connection
// to a TCP upstream server.
func TestTCPDyanmicProxy(t *testing.T) {
srv := tcptest.NewServer(echoHandler)
defer srv.Close()

// start proxy
proxyAddr := "127.0.0.1:57778"
go func() {
h := &tcp.DynamicProxy{
Lookup: func(h string) *route.Target {
tbl, _ := route.NewTable("route add srv 127.0.0.1:57778 tcp://" + srv.Addr)
return tbl.LookupHost(h, route.Picker["rr"])
},
}
l := config.Listen{Addr: proxyAddr}
if err := ListenAndServeTCP(l, h, nil); err != nil {
t.Log("ListenAndServeTCP: ", err)
}
}()
defer Close()

// connect to proxy
out, err := tcptest.NewRetryDialer().Dial("tcp", proxyAddr)
if err != nil {
t.Fatalf("net.Dial: %#v", err)
}
defer out.Close()

testRoundtrip(t, out)
}

// TestTCPProxy tests proxying an unencrypted TCP connection
// to a TCP upstream server.
func TestTCPProxy(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions registry/consul/routecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func parseURLPrefixTag(s, prefix string, env map[string]string) (route, opts str
return s, opts, true
}

if !strings.Contains(s, "/") {
return s, opts, true
}
// prefix is "host/path"
p = strings.SplitN(s, "/", 2)
if len(p) == 1 {
Expand Down
5 changes: 3 additions & 2 deletions registry/consul/routecmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ func TestParseTag(t *testing.T) {
ok bool
}{
{tag: "p", route: "", ok: false},
{tag: "p-", route: "", ok: false},
{tag: "p- ", route: "", ok: false},
{tag: "p-", route: "", ok: true},
{tag: "p- ", route: "", ok: true},
{tag: "p-/", route: "/", ok: true},
{tag: " p-/", route: "/", ok: true},
{tag: "p-/ ", route: "/", ok: true},
{tag: "p- / ", route: "/", ok: true},
{tag: "p-/foo", route: "/foo", ok: true},
{tag: "p- /foo", route: "/foo", ok: true},
{tag: "p-1.1.1.1:999", route: "1.1.1.1:999", ok: true},
{tag: "p-bar/foo", route: "bar/foo", ok: true},
{tag: "p-bar/foo/foo", route: "bar/foo/foo", ok: true},
{tag: "p-www.bar.com/foo/foo", route: "www.bar.com/foo/foo", ok: true},
Expand Down

0 comments on commit 3584cbb

Please sign in to comment.