Skip to content

Commit

Permalink
Stop dynamic TCP listener when upstream is no longer available (#798)
Browse files Browse the repository at this point in the history
* Stop dynamic TCP listener when upstream is no longer available

* Make `proxy.servers` a map in order to easily pick proxy to close
  • Loading branch information
fwkz committed Nov 30, 2020
1 parent 2a56cb9 commit 019ad7a
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
21 changes: 21 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func startServers(cfg *config.Config) {
case "tcp-dynamic":
go func() {
var buffer strings.Builder
lastPorts := []string{}
for {
time.Sleep(l.Refresh)
table := route.GetTable()
Expand All @@ -395,6 +396,10 @@ func startServers(cfg *config.Config) {
}
ports = unique(ports)
}
for _, port := range difference(lastPorts, ports) {
log.Printf("[DEBUG] Dynamic TCP listener on %s eligable for termination", port)
proxy.CloseProxy(port)
}
for _, port := range ports {
l := l
port := port
Expand All @@ -419,6 +424,7 @@ func startServers(cfg *config.Config) {
}
}()
}
lastPorts = ports
}
}()
case "https+tcp+sni":
Expand Down Expand Up @@ -655,6 +661,21 @@ func unique(strSlice []string) []string {
return list
}

// difference returns elements in `a` that aren't in `b`
func difference(a, b []string) []string {
mb := make(map[string]struct{}, len(b))
for _, x := range b {
mb[x] = struct{}{}
}
var diff []string
for _, x := range a {
if _, found := mb[x]; !found {
diff = append(diff, x)
}
}
return diff
}

func tableSchemes(r route.Routes) []string {
schemes := []string{}
for _, rt := range r {
Expand Down
34 changes: 26 additions & 8 deletions proxy/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"errors"
"log"
"net"
"net/http"
"sync"
Expand All @@ -28,23 +29,39 @@ var (
// mu guards servers which contains the list
// of running proxy servers.
mu sync.Mutex
servers []Server
servers = make(map[string]Server)
)

func CloseProxy(address string) error {
mu.Lock()
if srv, ok := servers[address]; ok {
err := srv.Close()
if err != nil {
return err
}
log.Printf("[INFO] Dynamic TCP listener on %s has been terminated", address)
delete(servers, address)
}
mu.Unlock()
return nil
}

func Close() {
mu.Lock()
for _, srv := range servers {
srv.Close()
}
servers = []Server{}
servers = make(map[string]Server)
mu.Unlock()
}

func Shutdown(timeout time.Duration) {
mu.Lock()
srvs := make([]Server, len(servers))
copy(srvs, servers)
servers = []Server{}
srvs := make(map[string]Server, len(servers))
for k, v := range servers {
srvs[k] = v
}
servers = make(map[string]Server)
mu.Unlock()

var wg sync.WaitGroup
Expand Down Expand Up @@ -129,8 +146,9 @@ func ListenAndServeHTTPSTCPSNI(l config.Listen, h http.Handler, p tcp.Handler, c
})

// tcpproxy creates its own listener from the configuration above so we can
// safely pass nil here.
return serve(nil, tps)
// safely pass nil here, nonetheless we are passing `httpsListener` to
// extract it's address and save server in the `servers` map.
return serve(httpsListener, tps)
}

func ListenAndServeGRPC(l config.Listen, opts []grpc.ServerOption, cfg *tls.Config) error {
Expand Down Expand Up @@ -162,7 +180,7 @@ func ListenAndServeTCP(l config.Listen, h tcp.Handler, cfg *tls.Config) error {

func serve(ln net.Listener, srv Server) error {
mu.Lock()
servers = append(servers, srv)
servers[ln.Addr().String()] = srv
mu.Unlock()
err := srv.Serve(ln)
if err != nil {
Expand Down

0 comments on commit 019ad7a

Please sign in to comment.