Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions cmd/config_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,18 @@ func poolConfig() (int, *network.Client) {
receiveChunkSize := globalConfig.Int(ref + ".receiveChunkSize")
receiveDeadline := globalConfig.Duration(ref + ".receiveDeadline")
sendDeadline := globalConfig.Duration(ref + ".sendDeadline")
tcpKeepAlive := globalConfig.Bool(ref + ".tcpKeepAlive")
tcpKeepAlivePeriod := globalConfig.Duration(ref + ".tcpKeepAlivePeriod")

return poolSize, &network.Client{
Network: net,
Address: address,
ReceiveBufferSize: receiveBufferSize,
ReceiveChunkSize: receiveChunkSize,
ReceiveDeadline: receiveDeadline,
SendDeadline: sendDeadline,
Network: net,
Address: address,
TCPKeepAlive: tcpKeepAlive,
TCPKeepAlivePeriod: tcpKeepAlivePeriod,
ReceiveBufferSize: receiveBufferSize,
ReceiveChunkSize: receiveChunkSize,
ReceiveDeadline: receiveDeadline,
SendDeadline: sendDeadline,
}
}

Expand All @@ -141,15 +145,33 @@ func proxyConfig() (bool, bool, *network.Client) {
net := globalConfig.String(ref + ".network")
address := globalConfig.String(ref + ".address")
receiveBufferSize := globalConfig.Int(ref + ".receiveBufferSize")
receiveChunkSize := globalConfig.Int(ref + ".receiveChunkSize")
receiveDeadline := globalConfig.Duration(ref + ".receiveDeadline")
sendDeadline := globalConfig.Duration(ref + ".sendDeadline")
tcpKeepAlive := globalConfig.Bool(ref + ".tcpKeepAlive")
tcpKeepAlivePeriod := globalConfig.Duration(ref + ".tcpKeepAlivePeriod")

if receiveBufferSize <= 0 {
receiveBufferSize = network.DefaultBufferSize
}

if receiveChunkSize <= 0 {
receiveChunkSize = network.DefaultChunkSize
}

if tcpKeepAlive && tcpKeepAlivePeriod <= 0 {
tcpKeepAlivePeriod = network.DefaultTCPKeepAlivePeriod
}

return elastic, reuseElasticClients, &network.Client{
Network: net,
Address: address,
ReceiveBufferSize: receiveBufferSize,
Network: net,
Address: address,
TCPKeepAlive: tcpKeepAlive,
TCPKeepAlivePeriod: tcpKeepAlivePeriod,
ReceiveBufferSize: receiveBufferSize,
ReceiveChunkSize: receiveChunkSize,
ReceiveDeadline: receiveDeadline,
SendDeadline: sendDeadline,
}
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ var runCmd = &cobra.Command{
clientConfig.ReceiveChunkSize,
clientConfig.ReceiveDeadline,
clientConfig.SendDeadline,
clientConfig.TCPKeepAlive,
clientConfig.TCPKeepAlivePeriod,
logger,
)

Expand Down
44 changes: 33 additions & 11 deletions network/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
)

const (
DefaultSeed = 1000
DefaultChunkSize = 4096
DefaultReceiveDeadline = 0 // 0 means no deadline (timeout)
DefaultSendDeadline = 0
DefaultSeed = 1000
DefaultChunkSize = 4096
DefaultReceiveDeadline = 0 // 0 means no deadline (timeout)
DefaultSendDeadline = 0
DefaultTCPKeepAlivePeriod = 30 * time.Second
)

type ClientInterface interface {
Expand All @@ -28,13 +29,15 @@ type Client struct {

logger zerolog.Logger

ReceiveBufferSize int
ReceiveChunkSize int
ReceiveDeadline time.Duration
SendDeadline time.Duration
ID string
Network string // tcp/udp/unix
Address string
TCPKeepAlive bool
TCPKeepAlivePeriod time.Duration
ReceiveBufferSize int
ReceiveChunkSize int
ReceiveDeadline time.Duration
SendDeadline time.Duration
ID string
Network string // tcp/udp/unix
Address string
}

var _ ClientInterface = &Client{}
Expand All @@ -48,6 +51,7 @@ func NewClient(
network, address string,
receiveBufferSize, receiveChunkSize int,
receiveDeadline, sendDeadline time.Duration,
tcpKeepAlive bool, tcpKeepAlivePeriod time.Duration,
logger zerolog.Logger,
) *Client {
var client Client
Expand Down Expand Up @@ -84,6 +88,24 @@ func NewClient(

client.Conn = conn

// Set the TCP keep alive.
client.TCPKeepAlive = tcpKeepAlive
if tcpKeepAlivePeriod <= 0 {
client.TCPKeepAlivePeriod = DefaultTCPKeepAlivePeriod
} else {
client.TCPKeepAlivePeriod = tcpKeepAlivePeriod
}

if c, ok := client.Conn.(*net.TCPConn); ok {
if err := c.SetKeepAlive(client.TCPKeepAlive); err != nil {
logger.Error().Err(err).Msg("Failed to set keep alive")
} else {
if err := c.SetKeepAlivePeriod(client.TCPKeepAlivePeriod); err != nil {
logger.Error().Err(err).Msg("Failed to set keep alive period")
}
}
}

// Set the receive deadline (timeout).
if receiveDeadline <= 0 {
client.ReceiveDeadline = DefaultReceiveDeadline
Expand Down
8 changes: 8 additions & 0 deletions network/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func TestNewClient(t *testing.T) {
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
false,
DefaultTCPKeepAlivePeriod,
logger)
defer client.Close()

Expand Down Expand Up @@ -77,6 +79,8 @@ func TestSend(t *testing.T) {
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
false,
DefaultTCPKeepAlivePeriod,
logger)
defer client.Close()

Expand Down Expand Up @@ -116,6 +120,8 @@ func TestReceive(t *testing.T) {
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
false,
DefaultTCPKeepAlivePeriod,
logger)
defer client.Close()

Expand Down Expand Up @@ -165,6 +171,8 @@ func TestClose(t *testing.T) {
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
false,
DefaultTCPKeepAlivePeriod,
logger)
assert.NotNil(t, client)
client.Close()
Expand Down
4 changes: 4 additions & 0 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError {
pr.ClientConfig.ReceiveChunkSize,
pr.ClientConfig.ReceiveDeadline,
pr.ClientConfig.SendDeadline,
pr.ClientConfig.TCPKeepAlive,
pr.ClientConfig.TCPKeepAlivePeriod,
pr.logger,
)
pr.logger.Debug().Str("id", client.ID[:7]).Msg("Reused the client connection")
Expand Down Expand Up @@ -276,6 +278,8 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
pr.ClientConfig.ReceiveChunkSize,
pr.ClientConfig.ReceiveDeadline,
pr.ClientConfig.SendDeadline,
pr.ClientConfig.TCPKeepAlive,
pr.ClientConfig.TCPKeepAlivePeriod,
pr.logger,
)
pr.busyConnections.Remove(gconn)
Expand Down
2 changes: 2 additions & 0 deletions network/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func TestNewProxy(t *testing.T) {
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
false,
DefaultTCPKeepAlivePeriod,
logger)
err := pool.Put(client.ID, client)
assert.Nil(t, err)
Expand Down
6 changes: 6 additions & 0 deletions network/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func TestRunServer(t *testing.T) {
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
false,
DefaultTCPKeepAlivePeriod,
logger)
err := pool.Put(client1.ID, client1)
assert.Nil(t, err)
Expand All @@ -110,6 +112,8 @@ func TestRunServer(t *testing.T) {
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
false,
DefaultTCPKeepAlivePeriod,
logger)
err = pool.Put(client2.ID, client2)
assert.Nil(t, err)
Expand Down Expand Up @@ -166,6 +170,8 @@ func TestRunServer(t *testing.T) {
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
false,
DefaultTCPKeepAlivePeriod,
logger)
defer client.Close()

Expand Down