From 3584cbb23602342144413ff99230f9dc8ce0b4a4 Mon Sep 17 00:00:00 2001 From: Michael Murphy <36900417+murphymj25@users.noreply.github.com> Date: Mon, 2 Dec 2019 12:22:37 -0600 Subject: [PATCH] Issue 613 tcp dynamic (#626) * moved tcp dynamic * setting .gitignore and Makefile back to master * added features md for tcp-dynamic-proxy * udpated features doco * updated log message for cert source * added split check for route * added : check for tcp-dynamic * fixed dial timeout error --- config/config.go | 1 + config/load.go | 12 +++- config/load_test.go | 7 ++ docs/content/feature/tcp-dynamic-proxy.md | 17 +++++ fabio.properties | 7 ++ main.go | 68 +++++++++++++++++++ proxy/tcp/tcp_dynamic_proxy.go | 82 +++++++++++++++++++++++ proxy/tcp_integration_test.go | 32 +++++++++ registry/consul/routecmd.go | 3 + registry/consul/routecmd_test.go | 5 +- 10 files changed, 229 insertions(+), 5 deletions(-) create mode 100644 docs/content/feature/tcp-dynamic-proxy.md create mode 100644 proxy/tcp/tcp_dynamic_proxy.go diff --git a/config/config.go b/config/config.go index cf97eb7ea..e7434b384 100644 --- a/config/config.go +++ b/config/config.go @@ -45,6 +45,7 @@ type Listen struct { TLSCiphers []uint16 ProxyProto bool ProxyHeaderTimeout time.Duration + Refresh time.Duration } type UI struct { diff --git a/config/load.go b/config/load.go index 5b2978ead..d0305c5f3 100644 --- a/config/load.go +++ b/config/load.go @@ -380,7 +380,7 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w case "proto": l.Proto = v switch l.Proto { - case "tcp", "tcp+sni", "http", "https", "grpc", "grpcs": + case "tcp", "tcp+sni", "tcp-dynamic", "http", "https", "grpc", "grpcs": // ok default: return Listen{}, fmt.Errorf("unknown protocol %q", v) @@ -435,6 +435,12 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w return Listen{}, err } l.ProxyHeaderTimeout = d + case "refresh": + d, err := time.ParseDuration(v) + if err != nil { + return Listen{}, err + } + l.Refresh = d } } @@ -444,8 +450,8 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w if l.Addr == "" { return Listen{}, fmt.Errorf("need listening host:port") } - if csName != "" && l.Proto != "https" && l.Proto != "tcp" && l.Proto != "grpcs" { - return Listen{}, fmt.Errorf("cert source requires proto 'https', 'tcp' or 'grpcs'") + if csName != "" && l.Proto != "https" && l.Proto != "tcp" && l.Proto != "tcp-dynamic" && l.Proto != "grpcs" { + return Listen{}, fmt.Errorf("cert source requires proto 'https', 'tcp', 'tcp-dynamic' or 'grpcs'") } if csName == "" && l.Proto == "https" { return Listen{}, fmt.Errorf("proto 'https' requires cert source") diff --git a/config/load_test.go b/config/load_test.go index 5b4b6989d..958c70e41 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -105,6 +105,13 @@ func TestLoad(t *testing.T) { return cfg }, }, + { + args: []string{"-proxy.addr", ":5555;proto=tcp-dynamic"}, + cfg: func(cfg *Config) *Config { + cfg.Listen = []Listen{{Addr: ":5555", Proto: "tcp-dynamic"}} + return cfg + }, + }, { desc: "-proxy.addr with tls configs", args: []string{"-proxy.addr", `:5555;rt=1s;wt=2s;tlsmin=0x0300;tlsmax=0x305;tlsciphers="0x123,0x456"`}, diff --git a/docs/content/feature/tcp-dynamic-proxy.md b/docs/content/feature/tcp-dynamic-proxy.md new file mode 100644 index 000000000..c362509c8 --- /dev/null +++ b/docs/content/feature/tcp-dynamic-proxy.md @@ -0,0 +1,17 @@ +--- +title: "TCP Dynamic Proxy" +--- + +The TCP dynamic proxy is similar to the TCP Proxy, but the listener is started from the Consul urlprefix tag. +Also, the service is defined with IP and port, so that multiple services can be defined on the load balancer using +the same TCP port. Connections are forwarded to services based on the combination of ip:port + +To use TCP Dynamic proxy support the service needs to advertise `urlprefix-127.0.0.1:1234 proto=tcp` in +Consul. In addition, fabio needs to be configured with a placeholder for the proxy.addr.: + +``` +fabio -proxy.addr '0.0.0.0:0;proto=tcp-dynamic;refresh=5s' +``` + +The TCP listener is started for the given TCP ports. To use IP addressing to separate the services, matching IP +addressed would need to be added to the loopback interface on the host. \ No newline at end of file diff --git a/fabio.properties b/fabio.properties index 0794dbdb7..a32bf10d3 100644 --- a/fabio.properties +++ b/fabio.properties @@ -206,6 +206,7 @@ # * https for HTTPS based protocols # * tcp for a raw TCP proxy with or witout TLS support # * tcp+sni for an SNI aware TCP proxy +# * tcp-dynamic for a consul driven TCP proxy # # If no 'proto' option is specified then the protocol # is either 'http' or 'https' depending on whether a @@ -240,6 +241,9 @@ # pxytimeout: Sets PROXY protocol header read timeout as a duration (e.g. '250ms'). # This defaults to 250ms if not set when 'pxyproto' is enabled. # +# refresh: Sets the refresh interval to check the route table for updates. +# Used when 'tcp-dynamic' is enabled. +# # TLS options: # # tlsmin: Sets the minimum TLS version for the handshake. This value @@ -280,6 +284,9 @@ # # TCP listener on port 443 with SNI routing # proxy.addr = :443;proto=tcp+sni # +# # TCP listeners using consul for config with 5 second refresh interval +# proxy.addr = 0.0.0.0:0;proto=tcp-dynamic;refresh=5s +# # The default is # # proxy.addr = :9999 diff --git a/main.go b/main.go index 199407bd2..b417ebe35 100644 --- a/main.go +++ b/main.go @@ -343,6 +343,52 @@ func startServers(cfg *config.Config) { exit.Fatal("[FATAL] ", err) } }() + case "tcp-dynamic": + go func() { + var buffer strings.Builder + for { + time.Sleep(l.Refresh) + table := route.GetTable() + ports := []string{} + for target, rts := range table { + if strings.Contains(target, ":") { + buffer.WriteString(":") + buffer.WriteString(strings.Split(target, ":")[1]) + + schemes := tableSchemes(rts) + if len(schemes) == 1 && schemes[0] == "tcp" { + ports = append(ports, buffer.String()) + } + buffer.Reset() + } + ports = unique(ports) + } + for _, port := range ports { + l := l + port := port + conn, err := net.Listen("tcp", port) + if err != nil { + log.Printf("[DEBUG] Dynamic TCP port %s in use", port) + continue + } + conn.Close() + log.Printf("[INFO] Starting dynamic TCP listener on port %s ", port) + go func() { + h := &tcp.DynamicProxy{ + DialTimeout: cfg.Proxy.DialTimeout, + Lookup: lookupHostFn(cfg), + Conn: metrics.DefaultRegistry.GetCounter("tcp.conn"), + ConnFail: metrics.DefaultRegistry.GetCounter("tcp.connfail"), + Noroute: metrics.DefaultRegistry.GetCounter("tcp.noroute"), + } + l.Addr = port + if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { + exit.Fatal("[FATAL] ", err) + } + }() + } + } + }() default: exit.Fatal("[FATAL] Invalid protocol ", l.Proto) } @@ -551,3 +597,25 @@ func toJSON(v interface{}) string { } return string(data) } + +func unique(strSlice []string) []string { + keys := make(map[string]bool) + list := []string{} + for _, entry := range strSlice { + if _, value := keys[entry]; !value { + keys[entry] = true + list = append(list, entry) + } + } + return list +} + +func tableSchemes(r route.Routes) []string { + schemes := []string{} + for _, rt := range r { + for _, target := range rt.Targets { + schemes = append(schemes, target.URL.Scheme) + } + } + return unique(schemes) +} \ No newline at end of file diff --git a/proxy/tcp/tcp_dynamic_proxy.go b/proxy/tcp/tcp_dynamic_proxy.go new file mode 100644 index 000000000..2cd4ad8b3 --- /dev/null +++ b/proxy/tcp/tcp_dynamic_proxy.go @@ -0,0 +1,82 @@ +package tcp + +import ( + "io" + "log" + "net" + "time" + + "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/route" +) + +// Proxy implements a generic TCP proxying handler. +type DynamicProxy struct { + // DialTimeout sets the timeout for establishing the outbound + // connection. + DialTimeout time.Duration + + // Lookup returns a target host for the given request. + // The proxy will panic if this value is nil. + Lookup func(host string) *route.Target + + // Conn counts the number of connections. + Conn metrics.Counter + + // ConnFail counts the failed upstream connection attempts. + ConnFail metrics.Counter + + // Noroute counts the failed Lookup() calls. + Noroute metrics.Counter +} + +func (p *DynamicProxy) ServeTCP(in net.Conn) error { + defer in.Close() + + if p.Conn != nil { + p.Conn.Inc(1) + } + target := in.LocalAddr().String() + t := p.Lookup(target) + if t == nil { + if p.Noroute != nil { + p.Noroute.Inc(1) + } + return nil + } + addr := t.URL.Host + log.Printf("[DEBUG] Connection: %s incoming %s to %s: ", in.RemoteAddr(), target, addr) + + if t.AccessDeniedTCP(in) { + return nil + } + + out, err := net.DialTimeout("tcp", addr, p.DialTimeout) + if err != nil { + log.Print("[WARN] tcp: cannot connect to upstream ", addr) + if p.ConnFail != nil { + p.ConnFail.Inc(1) + } + return err + } + defer out.Close() + + errc := make(chan error, 2) + cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + errc <- copyBuffer(dst, src, c) + } + + // rx measures the traffic to the upstream server (in <- out) + // tx measures the traffic from the upstream server (out <- in) + rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") + tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + + go cp(in, out, rx) + go cp(out, in, tx) + err = <-errc + if err != nil && err != io.EOF { + log.Print("[WARN]: tcp: ", err) + return err + } + return nil +} diff --git a/proxy/tcp_integration_test.go b/proxy/tcp_integration_test.go index 069de7818..ec5923f67 100644 --- a/proxy/tcp_integration_test.go +++ b/proxy/tcp_integration_test.go @@ -32,6 +32,38 @@ var echoHandler tcp.HandlerFunc = func(c net.Conn) error { return err } +// TestTCPDynamicProxy tests proxying an unencrypted TCP connection +// to a TCP upstream server. +func TestTCPDyanmicProxy(t *testing.T) { + srv := tcptest.NewServer(echoHandler) + defer srv.Close() + + // start proxy + proxyAddr := "127.0.0.1:57778" + go func() { + h := &tcp.DynamicProxy{ + Lookup: func(h string) *route.Target { + tbl, _ := route.NewTable("route add srv 127.0.0.1:57778 tcp://" + srv.Addr) + return tbl.LookupHost(h, route.Picker["rr"]) + }, + } + l := config.Listen{Addr: proxyAddr} + if err := ListenAndServeTCP(l, h, nil); err != nil { + t.Log("ListenAndServeTCP: ", err) + } + }() + defer Close() + + // connect to proxy + out, err := tcptest.NewRetryDialer().Dial("tcp", proxyAddr) + if err != nil { + t.Fatalf("net.Dial: %#v", err) + } + defer out.Close() + + testRoundtrip(t, out) +} + // TestTCPProxy tests proxying an unencrypted TCP connection // to a TCP upstream server. func TestTCPProxy(t *testing.T) { diff --git a/registry/consul/routecmd.go b/registry/consul/routecmd.go index 2f2707b7a..7a17ef0be 100644 --- a/registry/consul/routecmd.go +++ b/registry/consul/routecmd.go @@ -134,6 +134,9 @@ func parseURLPrefixTag(s, prefix string, env map[string]string) (route, opts str return s, opts, true } + if !strings.Contains(s, "/") { + return s, opts, true + } // prefix is "host/path" p = strings.SplitN(s, "/", 2) if len(p) == 1 { diff --git a/registry/consul/routecmd_test.go b/registry/consul/routecmd_test.go index 2c906e476..d6b19b031 100644 --- a/registry/consul/routecmd_test.go +++ b/registry/consul/routecmd_test.go @@ -64,14 +64,15 @@ func TestParseTag(t *testing.T) { ok bool }{ {tag: "p", route: "", ok: false}, - {tag: "p-", route: "", ok: false}, - {tag: "p- ", route: "", ok: false}, + {tag: "p-", route: "", ok: true}, + {tag: "p- ", route: "", ok: true}, {tag: "p-/", route: "/", ok: true}, {tag: " p-/", route: "/", ok: true}, {tag: "p-/ ", route: "/", ok: true}, {tag: "p- / ", route: "/", ok: true}, {tag: "p-/foo", route: "/foo", ok: true}, {tag: "p- /foo", route: "/foo", ok: true}, + {tag: "p-1.1.1.1:999", route: "1.1.1.1:999", ok: true}, {tag: "p-bar/foo", route: "bar/foo", ok: true}, {tag: "p-bar/foo/foo", route: "bar/foo/foo", ok: true}, {tag: "p-www.bar.com/foo/foo", route: "www.bar.com/foo/foo", ok: true},