diff --git a/cmd/config_parser.go b/cmd/config_parser.go index 326b227a..75892c29 100644 --- a/cmd/config_parser.go +++ b/cmd/config_parser.go @@ -121,14 +121,18 @@ func poolConfig() (int, *network.Client) { receiveChunkSize := globalConfig.Int(ref + ".receiveChunkSize") receiveDeadline := globalConfig.Duration(ref + ".receiveDeadline") sendDeadline := globalConfig.Duration(ref + ".sendDeadline") + tcpKeepAlive := globalConfig.Bool(ref + ".tcpKeepAlive") + tcpKeepAlivePeriod := globalConfig.Duration(ref + ".tcpKeepAlivePeriod") return poolSize, &network.Client{ - Network: net, - Address: address, - ReceiveBufferSize: receiveBufferSize, - ReceiveChunkSize: receiveChunkSize, - ReceiveDeadline: receiveDeadline, - SendDeadline: sendDeadline, + Network: net, + Address: address, + TCPKeepAlive: tcpKeepAlive, + TCPKeepAlivePeriod: tcpKeepAlivePeriod, + ReceiveBufferSize: receiveBufferSize, + ReceiveChunkSize: receiveChunkSize, + ReceiveDeadline: receiveDeadline, + SendDeadline: sendDeadline, } } @@ -141,15 +145,33 @@ func proxyConfig() (bool, bool, *network.Client) { net := globalConfig.String(ref + ".network") address := globalConfig.String(ref + ".address") receiveBufferSize := globalConfig.Int(ref + ".receiveBufferSize") + receiveChunkSize := globalConfig.Int(ref + ".receiveChunkSize") + receiveDeadline := globalConfig.Duration(ref + ".receiveDeadline") + sendDeadline := globalConfig.Duration(ref + ".sendDeadline") + tcpKeepAlive := globalConfig.Bool(ref + ".tcpKeepAlive") + tcpKeepAlivePeriod := globalConfig.Duration(ref + ".tcpKeepAlivePeriod") if receiveBufferSize <= 0 { receiveBufferSize = network.DefaultBufferSize } + if receiveChunkSize <= 0 { + receiveChunkSize = network.DefaultChunkSize + } + + if tcpKeepAlive && tcpKeepAlivePeriod <= 0 { + tcpKeepAlivePeriod = network.DefaultTCPKeepAlivePeriod + } + return elastic, reuseElasticClients, &network.Client{ - Network: net, - Address: address, - ReceiveBufferSize: receiveBufferSize, + Network: net, + Address: address, + TCPKeepAlive: tcpKeepAlive, + TCPKeepAlivePeriod: tcpKeepAlivePeriod, + ReceiveBufferSize: receiveBufferSize, + ReceiveChunkSize: receiveChunkSize, + ReceiveDeadline: receiveDeadline, + SendDeadline: sendDeadline, } } diff --git a/cmd/run.go b/cmd/run.go index fb0a73cb..d4188801 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -124,6 +124,8 @@ var runCmd = &cobra.Command{ clientConfig.ReceiveChunkSize, clientConfig.ReceiveDeadline, clientConfig.SendDeadline, + clientConfig.TCPKeepAlive, + clientConfig.TCPKeepAlivePeriod, logger, ) diff --git a/network/client.go b/network/client.go index c3fae80b..93cf6dbc 100644 --- a/network/client.go +++ b/network/client.go @@ -10,10 +10,11 @@ import ( ) const ( - DefaultSeed = 1000 - DefaultChunkSize = 4096 - DefaultReceiveDeadline = 0 // 0 means no deadline (timeout) - DefaultSendDeadline = 0 + DefaultSeed = 1000 + DefaultChunkSize = 4096 + DefaultReceiveDeadline = 0 // 0 means no deadline (timeout) + DefaultSendDeadline = 0 + DefaultTCPKeepAlivePeriod = 30 * time.Second ) type ClientInterface interface { @@ -28,13 +29,15 @@ type Client struct { logger zerolog.Logger - ReceiveBufferSize int - ReceiveChunkSize int - ReceiveDeadline time.Duration - SendDeadline time.Duration - ID string - Network string // tcp/udp/unix - Address string + TCPKeepAlive bool + TCPKeepAlivePeriod time.Duration + ReceiveBufferSize int + ReceiveChunkSize int + ReceiveDeadline time.Duration + SendDeadline time.Duration + ID string + Network string // tcp/udp/unix + Address string } var _ ClientInterface = &Client{} @@ -48,6 +51,7 @@ func NewClient( network, address string, receiveBufferSize, receiveChunkSize int, receiveDeadline, sendDeadline time.Duration, + tcpKeepAlive bool, tcpKeepAlivePeriod time.Duration, logger zerolog.Logger, ) *Client { var client Client @@ -84,6 +88,24 @@ func NewClient( client.Conn = conn + // Set the TCP keep alive. + client.TCPKeepAlive = tcpKeepAlive + if tcpKeepAlivePeriod <= 0 { + client.TCPKeepAlivePeriod = DefaultTCPKeepAlivePeriod + } else { + client.TCPKeepAlivePeriod = tcpKeepAlivePeriod + } + + if c, ok := client.Conn.(*net.TCPConn); ok { + if err := c.SetKeepAlive(client.TCPKeepAlive); err != nil { + logger.Error().Err(err).Msg("Failed to set keep alive") + } else { + if err := c.SetKeepAlivePeriod(client.TCPKeepAlivePeriod); err != nil { + logger.Error().Err(err).Msg("Failed to set keep alive period") + } + } + } + // Set the receive deadline (timeout). if receiveDeadline <= 0 { client.ReceiveDeadline = DefaultReceiveDeadline diff --git a/network/client_test.go b/network/client_test.go index 7ddd4ed7..62d5c638 100644 --- a/network/client_test.go +++ b/network/client_test.go @@ -38,6 +38,8 @@ func TestNewClient(t *testing.T) { DefaultChunkSize, DefaultReceiveDeadline, DefaultSendDeadline, + false, + DefaultTCPKeepAlivePeriod, logger) defer client.Close() @@ -77,6 +79,8 @@ func TestSend(t *testing.T) { DefaultChunkSize, DefaultReceiveDeadline, DefaultSendDeadline, + false, + DefaultTCPKeepAlivePeriod, logger) defer client.Close() @@ -116,6 +120,8 @@ func TestReceive(t *testing.T) { DefaultChunkSize, DefaultReceiveDeadline, DefaultSendDeadline, + false, + DefaultTCPKeepAlivePeriod, logger) defer client.Close() @@ -165,6 +171,8 @@ func TestClose(t *testing.T) { DefaultChunkSize, DefaultReceiveDeadline, DefaultSendDeadline, + false, + DefaultTCPKeepAlivePeriod, logger) assert.NotNil(t, client) client.Close() diff --git a/network/proxy.go b/network/proxy.go index 1b88b908..0cb8412e 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -85,6 +85,8 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError { pr.ClientConfig.ReceiveChunkSize, pr.ClientConfig.ReceiveDeadline, pr.ClientConfig.SendDeadline, + pr.ClientConfig.TCPKeepAlive, + pr.ClientConfig.TCPKeepAlivePeriod, pr.logger, ) pr.logger.Debug().Str("id", client.ID[:7]).Msg("Reused the client connection") @@ -276,6 +278,8 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError { pr.ClientConfig.ReceiveChunkSize, pr.ClientConfig.ReceiveDeadline, pr.ClientConfig.SendDeadline, + pr.ClientConfig.TCPKeepAlive, + pr.ClientConfig.TCPKeepAlivePeriod, pr.logger, ) pr.busyConnections.Remove(gconn) diff --git a/network/proxy_test.go b/network/proxy_test.go index 4961b1ce..93a587e6 100644 --- a/network/proxy_test.go +++ b/network/proxy_test.go @@ -42,6 +42,8 @@ func TestNewProxy(t *testing.T) { DefaultChunkSize, DefaultReceiveDeadline, DefaultSendDeadline, + false, + DefaultTCPKeepAlivePeriod, logger) err := pool.Put(client.ID, client) assert.Nil(t, err) diff --git a/network/server_test.go b/network/server_test.go index 0cc698e4..353e7299 100644 --- a/network/server_test.go +++ b/network/server_test.go @@ -100,6 +100,8 @@ func TestRunServer(t *testing.T) { DefaultChunkSize, DefaultReceiveDeadline, DefaultSendDeadline, + false, + DefaultTCPKeepAlivePeriod, logger) err := pool.Put(client1.ID, client1) assert.Nil(t, err) @@ -110,6 +112,8 @@ func TestRunServer(t *testing.T) { DefaultChunkSize, DefaultReceiveDeadline, DefaultSendDeadline, + false, + DefaultTCPKeepAlivePeriod, logger) err = pool.Put(client2.ID, client2) assert.Nil(t, err) @@ -166,6 +170,8 @@ func TestRunServer(t *testing.T) { DefaultChunkSize, DefaultReceiveDeadline, DefaultSendDeadline, + false, + DefaultTCPKeepAlivePeriod, logger) defer client.Close()