Skip to content

Commit

Permalink
Updating grpc handler to gracefully close backend connections
Browse files Browse the repository at this point in the history
This should address part 1 of #807 and #912

update grpc docs
  • Loading branch information
nathanejohnson committed Nov 18, 2022
1 parent 458c701 commit 434e12e
Show file tree
Hide file tree
Showing 18 changed files with 122 additions and 71 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type Proxy struct {
AuthSchemes map[string]AuthScheme
GRPCMaxRxMsgSize int
GRPCMaxTxMsgSize int
GRPCGShutdownTimeout time.Duration
}

type STSHeader struct {
Expand Down
25 changes: 13 additions & 12 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,19 @@ var defaultConfig = &Config{
},
},
Proxy: Proxy{
MaxConn: 10000,
Strategy: "rnd",
Matcher: "prefix",
NoRouteStatus: 404,
DialTimeout: 30 * time.Second,
FlushInterval: time.Second,
GlobalFlushInterval: 0,
LocalIP: LocalIPString(),
AuthSchemes: map[string]AuthScheme{},
IdleConnTimeout: 15 * time.Second,
GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M
GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M
MaxConn: 10000,
Strategy: "rnd",
Matcher: "prefix",
NoRouteStatus: 404,
DialTimeout: 30 * time.Second,
FlushInterval: time.Second,
GlobalFlushInterval: 0,
LocalIP: LocalIPString(),
AuthSchemes: map[string]AuthScheme{},
IdleConnTimeout: 15 * time.Second,
GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M
GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M
GRPCGShutdownTimeout: time.Second * 2,
},
Registry: Registry{
Backend: "consul",
Expand Down
4 changes: 2 additions & 2 deletions config/kvslice.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (

// parseKVSlice parses a configuration string in the form
//
// key=val;key=val,key=val;key=val
// key=val;key=val,key=val;key=val
//
// into a list of string maps. maps are separated by comma and key/value
// pairs within a map are separated by semicolons. The first key/value
// pair of a map can omit the key and its value will be stored under the
// empty key. This allows support of legacy configuration formats which
// are
//
// val;opt1=val1;opt2=val2;...
// val;opt1=val1;opt2=val2;...
func parseKVSlice(in string) ([]map[string]string, error) {
var keyOrFirstVal string
maps := []map[string]string{}
Expand Down
1 change: 1 addition & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.BoolVar(&cfg.Proxy.STSHeader.Preload, "proxy.header.sts.preload", defaultConfig.Proxy.STSHeader.Preload, "direct HSTS to pass the preload directive")
f.IntVar(&cfg.Proxy.GRPCMaxRxMsgSize, "proxy.grpcmaxrxmsgsize", defaultConfig.Proxy.GRPCMaxRxMsgSize, "max grpc receive message size (in bytes)")
f.IntVar(&cfg.Proxy.GRPCMaxTxMsgSize, "proxy.grpcmaxtxmsgsize", defaultConfig.Proxy.GRPCMaxTxMsgSize, "max grpc transmit message size (in bytes)")
f.DurationVar(&cfg.Proxy.GRPCGShutdownTimeout, "proxy.grpcshutdowntimeout", defaultConfig.Proxy.GRPCGShutdownTimeout, "amount of time to wait for graceful shutdown of grpc backend")
f.StringVar(&gzipContentTypesValue, "proxy.gzip.contenttype", defaultValues.GZIPContentTypesValue, "regexp of content types to compress")
f.StringVar(&listenerValue, "proxy.addr", defaultValues.ListenerValue, "listener config")
f.StringVar(&certSourcesValue, "proxy.cs", defaultValues.CertSourcesValue, "certificate sources")
Expand Down
37 changes: 19 additions & 18 deletions demo/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,34 @@
//
// During startup the server performs the following steps:
//
// - Add a handler for each prefix which provides a unique
// response for that instance and endpoint
// - Add a `/health` handler for the consul health check
// - Register the service in consul with the listen address,
// a health check under the given name and with one `urlprefix-`
// tag per prefix
// - Install a signal handler to deregister the service on exit
// * Add a handler for each prefix which provides a unique
// response for that instance and endpoint
// * Add a `/health` handler for the consul health check
// * Register the service in consul with the listen address,
// a health check under the given name and with one `urlprefix-`
// tag per prefix
// * Install a signal handler to deregister the service on exit
//
// If the protocol is set to "ws" the registered endpoints function
// as websocket echo servers.
//
// Example:
//
// # http server
// ./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar
// ./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar
// ./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/"
// # http server
// ./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar
// ./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar
// ./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/"
//
// # https server
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true"
// # https server
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true"
//
// # websocket server
// ./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2
// # websocket server
// ./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2
//
// # tcp server
// ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234
//
// # tcp server
// ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234
package main

import (
Expand Down
10 changes: 10 additions & 0 deletions docs/content/ref/proxy.grpcmaxrxmsgsize.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
title: "proxy.grpcmaxrxmsgsize"
---

`proxy.grpcmaxrxmsgsize` configures the grpc max receive message size in bytes. The default
value is

proxy.grpcmaxrxmsgsize = 4194304

which is 4MB
10 changes: 10 additions & 0 deletions docs/content/ref/proxy.grpcmaxtxmsgsize.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
title: "proxy.grpcmaxtxmsgsize"
---

`proxy.grpcmaxtxmsgsize` configures the grpc max transmit message size in bytes. The default
value is

proxy.grpcmaxtxmsgsize = 4194304

which is 4MB
9 changes: 9 additions & 0 deletions docs/content/ref/proxy.grpcshutdowntimeout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
title: "proxy.grpcshutdowntimeout"
---

`proxy.grpcshutdowntimeout` configures the amount of time fabio will wait to attempt
to close the connection while waiting for grpc traffic to finish to a backend that's been
deregistered. The default value is

proxy.grpcshutdowntimeout = 2s
6 changes: 6 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,12 @@
# The default is
# proxy.grpcmaxtxmsgsize = 4194304
#
#
# proxy.grpcshutdowntimeout configures the amount of time fabio will wait to attempt
# to close the connection while waiting for grpc traffic to finish to a backend that's been
# deregistered. Default value is
# proxy.grpcshutdowntimeout = 2s
# setting to 0s disables the wait.

# log.access.format configures the format of the access log.
#
Expand Down
63 changes: 32 additions & 31 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,38 @@
// takes place. Text between two fields is printed verbatim. See the common
// log file formats for an example.
//
// $header.<name> - request http header (name: [a-zA-Z0-9-]+)
// $remote_addr - host:port of remote client
// $remote_host - host of remote client
// $remote_port - port of remote client
// $request - request <method> <uri> <proto>
// $request_args - request query parameters
// $request_host - request host header (aka server name)
// $request_method - request method
// $request_scheme - request scheme
// $request_uri - request URI
// $request_url - request URL
// $request_proto - request protocol
// $response_body_size - response body size in bytes
// $response_status - response status code
// $response_time_ms - response time in S.sss format
// $response_time_us - response time in S.ssssss format
// $response_time_ns - response time in S.sssssssss format
// $time_rfc3339 - log timestamp in YYYY-MM-DDTHH:MM:SSZ format
// $time_rfc3339_ms - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format
// $time_rfc3339_us - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format
// $time_rfc3339_ns - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format
// $time_unix_ms - log timestamp in unix epoch ms
// $time_unix_us - log timestamp in unix epoch us
// $time_unix_ns - log timestamp in unix epoch ns
// $time_common - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ
// $upstream_addr - host:port of upstream server
// $upstream_host - host of upstream server
// $upstream_port - port of upstream server
// $upstream_request_scheme - upstream request scheme
// $upstream_request_uri - upstream request URI
// $upstream_request_url - upstream request URL
// $header.<name> - request http header (name: [a-zA-Z0-9-]+)
// $remote_addr - host:port of remote client
// $remote_host - host of remote client
// $remote_port - port of remote client
// $request - request <method> <uri> <proto>
// $request_args - request query parameters
// $request_host - request host header (aka server name)
// $request_method - request method
// $request_scheme - request scheme
// $request_uri - request URI
// $request_url - request URL
// $request_proto - request protocol
// $response_body_size - response body size in bytes
// $response_status - response status code
// $response_time_ms - response time in S.sss format
// $response_time_us - response time in S.ssssss format
// $response_time_ns - response time in S.sssssssss format
// $time_rfc3339 - log timestamp in YYYY-MM-DDTHH:MM:SSZ format
// $time_rfc3339_ms - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format
// $time_rfc3339_us - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format
// $time_rfc3339_ns - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format
// $time_unix_ms - log timestamp in unix epoch ms
// $time_unix_us - log timestamp in unix epoch us
// $time_unix_ns - log timestamp in unix epoch ns
// $time_common - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ
// $upstream_addr - host:port of upstream server
// $upstream_host - host of upstream server
// $upstream_port - port of upstream server
// $upstream_request_scheme - upstream request scheme
// $upstream_request_uri - upstream request URI
// $upstream_request_url - upstream request URL
//
package logger

import (
Expand Down
11 changes: 9 additions & 2 deletions proxy/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,21 @@ func (p *grpcConnectionPool) cleanup() {
p.lock.Lock()
table := route.GetTable()
for tKey, cs := range p.connections {
if cs.GetState() == connectivity.Shutdown {
state := cs.GetState()
if state == connectivity.Shutdown {
delete(p.connections, tKey)
continue
}

if !hasTarget(tKey, table) {
log.Println("[DEBUG] grpc: cleaning up connection to", tKey)
cs.Close()
go func(cs *grpc.ClientConn, state connectivity.State) {
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.Proxy.GRPCGShutdownTimeout)
defer cancel()
// wait for state to change, or timeout, before closing, in case it's still handling traffic.
cs.WaitForStateChange(ctx, state)
cs.Close()
}(cs, state)
delete(p.connections, tKey)
}
}
Expand Down
1 change: 1 addition & 0 deletions proxy/http_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func addResponseHeaders(w http.ResponseWriter, r *http.Request, cfg config.Proxy
// * add X-Real-Ip, if not present
// * ClientIPHeader != "": Set header with that name to <remote ip>
// * TLS connection: Set header with name from `cfg.TLSHeader` to `cfg.TLSHeaderValue`
//
func addHeaders(r *http.Request, cfg config.Proxy, stripPath string) error {
remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion proxy/http_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
globDisabled = true
)

// Global GlobCache for Testing
//Global GlobCache for Testing
var globCache = route.NewGlobCache(1000)

func TestProxyProducesCorrectXForwardedSomethingHeader(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions registry/consul/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ const (
// consul. To wait for completion the caller should read the next value from
// the dereg channel.
//
// dereg <- true // trigger deregistration
// <-dereg // wait for completion
// dereg <- true // trigger deregistration
// <-dereg // wait for completion
//
func register(c *api.Client, service *api.AgentServiceRegistration) chan bool {
registered := func(serviceID string) bool {
if serviceID == "" {
Expand Down
2 changes: 2 additions & 0 deletions route/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,5 @@ var randIntn = func(n int) int {
}
return rand.Intn(n)
}


2 changes: 1 addition & 1 deletion route/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var oldRandInt = func(n int) int {
if n == 0 {
return 0
}
return int(time.Now().UnixNano() / int64(time.Microsecond) % int64(n))
return int(time.Now().UnixNano()/int64(time.Microsecond) % int64(n))
}

var result int // prevent compiler optimization
Expand Down
2 changes: 1 addition & 1 deletion route/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func SetTable(t Table) {
type Table map[string]Routes

// hostpath splits a 'host/path' prefix into 'host' and '/path' or it returns a
// ':port' prefix as ':port' and since there is no path component for TCP
// ':port' prefix as ':port' and '' since there is no path component for TCP
// connections.
func hostpath(prefix string) (host string, path string) {
if strings.HasPrefix(prefix, ":") {
Expand Down
2 changes: 1 addition & 1 deletion route/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
globDisabled = true
)

// Global GlobCache for Testing
//Global GlobCache for Testing
var globCache = NewGlobCache(1000)

func TestTableParse(t *testing.T) {
Expand Down

0 comments on commit 434e12e

Please sign in to comment.