From 72db34fb54665e5fbf38766a1528fe85bd62dfc0 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Wed, 11 Jan 2023 00:13:52 +0100 Subject: [PATCH 1/6] Add gocron for scheduling health checks --- go.mod | 3 +++ go.sum | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/go.mod b/go.mod index 65a1f02b..43fb45d0 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/Masterminds/semver/v3 v3.2.0 github.com/fergusstrange/embedded-postgres v1.19.0 + github.com/go-co-op/gocron v1.18.0 github.com/google/go-cmp v0.5.9 github.com/hashicorp/go-hclog v1.4.0 github.com/hashicorp/go-plugin v1.4.8 @@ -33,12 +34,14 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/oklog/run v1.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/net v0.5.0 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.4.0 // indirect golang.org/x/text v0.6.0 // indirect google.golang.org/genproto v0.0.0-20230104163317-caabf589fcbf // indirect diff --git a/go.sum b/go.sum index 0f7cf922..0bb5d3d7 100644 --- a/go.sum +++ b/go.sum @@ -58,6 +58,8 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4 github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-co-op/gocron v1.18.0 h1:SxTyJ5xnSN4byCq7b10LmmszFdxQlSQJod8s3gbnXxA= +github.com/go-co-op/gocron v1.18.0/go.mod h1:sD/a0Aadtw5CpflUJ/lpP9Vfdk979Wl1Sg33HPHg0FY= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= @@ -255,6 +257,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.28.0 h1:MirSo27VyNi7RJYP3078AA1+Cyzd2GB66qy3aUHvsWY= @@ -350,6 +354,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= From af9024d102be188cae590803c88e36f141fbf50d Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Wed, 11 Jan 2023 00:29:31 +0100 Subject: [PATCH 2/6] Add an in-process scheduler to recycle timed-out connection periodically --- config/constants.go | 17 ++++++--------- network/proxy.go | 52 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/config/constants.go b/config/constants.go index 12c6da78..b450e0ca 100644 --- a/config/constants.go +++ b/config/constants.go @@ -1,9 +1,5 @@ package config -import ( - "time" -) - type ( Status uint Policy uint @@ -63,19 +59,20 @@ const ( DefaultChunkSize = 4096 DefaultReceiveDeadline = 0 // 0 means no deadline (timeout) DefaultSendDeadline = 0 - DefaultTCPKeepAlivePeriod = 30 * time.Second + DefaultTCPKeepAlivePeriod = "30s" // Pool constants. - EmptyPoolCapacity = 0 - DefaultPoolSize = 10 - MinimumPoolSize = 2 + EmptyPoolCapacity = 0 + DefaultPoolSize = 10 + MinimumPoolSize = 2 + DefaultHealthCheckPeriod = "60s" // This must match PostgreSQL authentication timeout. // Server constants. DefaultListenNetwork = "tcp" DefaultListenAddress = "0.0.0.0:15432" - DefaultTickInterval = 5 * time.Second + DefaultTickInterval = "5s" DefaultBufferSize = 1 << 24 // 16777216 bytes - DefaultTCPKeepAlive = 3 * time.Second + DefaultTCPKeepAlive = "3s" DefaultLoadBalancer = "roundrobin" // Utility constants. diff --git a/network/proxy.go b/network/proxy.go index 1e73a761..9d354d4a 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -2,11 +2,13 @@ package network import ( "context" + "time" "github.com/gatewayd-io/gatewayd/config" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/gatewayd-io/gatewayd/plugin/hook" "github.com/gatewayd-io/gatewayd/pool" + "github.com/go-co-op/gocron" "github.com/panjf2000/gnet/v2" "github.com/rs/zerolog" ) @@ -25,9 +27,11 @@ type Proxy struct { busyConnections pool.IPool logger zerolog.Logger hookConfig *hook.Config + scheduler *gocron.Scheduler Elastic bool ReuseElasticClients bool + HealthCheckPeriod time.Duration // ClientConfig is used for elastic proxy and reconnection ClientConfig *config.Client @@ -39,17 +43,63 @@ var _ IProxy = &Proxy{} func NewProxy( p pool.IPool, hookConfig *hook.Config, elastic, reuseElasticClients bool, + healthCheckPeriod time.Duration, clientConfig *config.Client, logger zerolog.Logger, ) *Proxy { - return &Proxy{ + proxy := Proxy{ availableConnections: p, busyConnections: pool.NewPool(config.EmptyPoolCapacity), logger: logger, hookConfig: hookConfig, + scheduler: gocron.NewScheduler(time.UTC), Elastic: elastic, ReuseElasticClients: reuseElasticClients, ClientConfig: clientConfig, } + + if proxy.HealthCheckPeriod == 0 { + if healthCheck, err := time.ParseDuration(config.DefaultHealthCheckPeriod); err == nil { + proxy.HealthCheckPeriod = healthCheck + } else { + logger.Error().Err(err).Msg("Failed to parse the health check period") + } + } else { + proxy.HealthCheckPeriod = healthCheckPeriod + } + + startDelay := time.Now().Add(proxy.HealthCheckPeriod) + // Schedule the client health check. + if _, err := proxy.scheduler.Every(proxy.HealthCheckPeriod).SingletonMode().StartAt(startDelay).Do( + func() { + logger.Debug().Msg("Running the client health check, and might recycle connection(s).") + proxy.availableConnections.ForEach(func(_, value interface{}) bool { + if client, ok := value.(*Client); ok { + // Connection is probably dead by now. + proxy.availableConnections.Remove(client.ID) + client.Close() + // Create a new client. + client = NewClient(proxy.ClientConfig, proxy.logger) + if err := proxy.availableConnections.Put(client.ID, client); err != nil { + proxy.logger.Err(err).Msg("Failed to update the client connection") + } + } + return true + }) + }, + ); err != nil { + proxy.logger.Error().Err(err).Msg("Failed to schedule the client health check") + } + + // Start the scheduler. + proxy.scheduler.StartAsync() + logger.Debug().Fields( + map[string]interface{}{ + "startDelay": startDelay, + "healthCheckPeriod": proxy.HealthCheckPeriod.String(), + }, + ).Msg("Started the client health check scheduler") + + return &proxy } // Connect maps a server connection from the available connection pool to a incoming connection. From efca9f734b640aa10ae31a6d780654d07f90263d Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Wed, 11 Jan 2023 00:31:15 +0100 Subject: [PATCH 3/6] Apply changes to the NewProxy function throughout the code --- cmd/run.go | 13 +++++++++++-- config/types.go | 5 +++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 4c5e77dd..5a2e5c85 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -199,7 +199,7 @@ var runCmd = &cobra.Command{ } } - // Verify that the pool is properly populated + // Verify that the pool is properly populated. logger.Info().Str("count", fmt.Sprint(pool.Size())).Msg( "There are clients available in the pool") if pool.Size() != poolSize { @@ -223,12 +223,21 @@ var runCmd = &cobra.Command{ // Create a prefork proxy with the pool of clients. elastic := gConfig.Proxy[config.Default].Elastic reuseElasticClients := gConfig.Proxy[config.Default].ReuseElasticClients + healthCheckPeriod := gConfig.Proxy[config.Default].HealthCheckPeriod proxy := network.NewProxy( - pool, hooksConfig, elastic, reuseElasticClients, &clientConfig, logger) + pool, + hooksConfig, + elastic, + reuseElasticClients, + healthCheckPeriod, + &clientConfig, + logger, + ) proxyCfg := map[string]interface{}{ "elastic": elastic, "reuseElasticClients": reuseElasticClients, + "healthCheckPeriod": healthCheckPeriod.String(), "clientConfig": map[string]interface{}{ "network": clientConfig.Network, "address": clientConfig.Address, diff --git a/config/types.go b/config/types.go index 7da7650c..a512fa1c 100644 --- a/config/types.go +++ b/config/types.go @@ -61,8 +61,9 @@ type Pool struct { } type Proxy struct { - Elastic bool `koanf:"elastic"` - ReuseElasticClients bool `koanf:"reuseElasticClients"` + Elastic bool `koanf:"elastic"` + ReuseElasticClients bool `koanf:"reuseElasticClients"` + HealthCheckPeriod time.Duration `koanf:"healthCheckPeriod"` } type Server struct { From 00de34068eaeb672d2283f8693fdeac85b22c291 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Wed, 11 Jan 2023 00:31:58 +0100 Subject: [PATCH 4/6] Use string form of duration --- cmd/run.go | 18 +++++++++--------- network/client.go | 8 ++++++-- network/server.go | 8 ++++++-- plugin/utils/functions.go | 2 +- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 5a2e5c85..c89f4007 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -167,7 +167,7 @@ var runCmd = &cobra.Command{ // Get client config from the config file. clientConfig := gConfig.Clients[config.Default] - // Add clients to the pool + // Add clients to the pool. for i := 0; i < poolSize; i++ { client := network.NewClient(&clientConfig, logger) @@ -178,10 +178,10 @@ var runCmd = &cobra.Command{ "address": client.Address, "receiveBufferSize": client.ReceiveBufferSize, "receiveChunkSize": client.ReceiveChunkSize, - "receiveDeadline": client.ReceiveDeadline.Seconds(), - "sendDeadline": client.SendDeadline.Seconds(), + "receiveDeadline": client.ReceiveDeadline.String(), + "sendDeadline": client.SendDeadline.String(), "tcpKeepAlive": client.TCPKeepAlive, - "tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.Seconds(), + "tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.String(), } _, err := hooksConfig.Run( context.Background(), @@ -243,10 +243,10 @@ var runCmd = &cobra.Command{ "address": clientConfig.Address, "receiveBufferSize": clientConfig.ReceiveBufferSize, "receiveChunkSize": clientConfig.ReceiveChunkSize, - "receiveDeadline": clientConfig.ReceiveDeadline.Seconds(), - "sendDeadline": clientConfig.SendDeadline.Seconds(), + "receiveDeadline": clientConfig.ReceiveDeadline.String(), + "sendDeadline": clientConfig.SendDeadline.String(), "tcpKeepAlive": clientConfig.TCPKeepAlive, - "tcpKeepAlivePeriod": clientConfig.TCPKeepAlivePeriod.Seconds(), + "tcpKeepAlivePeriod": clientConfig.TCPKeepAlivePeriod.String(), }, } _, err = hooksConfig.Run( @@ -297,7 +297,7 @@ var runCmd = &cobra.Command{ "address": gConfig.Server.Address, "softLimit": gConfig.Server.SoftLimit, "hardLimit": gConfig.Server.HardLimit, - "tickInterval": gConfig.Server.TickInterval.Seconds(), + "tickInterval": gConfig.Server.TickInterval.String(), "multiCore": gConfig.Server.MultiCore, "lockOSThread": gConfig.Server.LockOSThread, "enableTicker": gConfig.Server.EnableTicker, @@ -308,7 +308,7 @@ var runCmd = &cobra.Command{ "socketSendBuffer": gConfig.Server.SocketSendBuffer, "reuseAddress": gConfig.Server.ReuseAddress, "reusePort": gConfig.Server.ReusePort, - "tcpKeepAlive": gConfig.Server.TCPKeepAlive.Seconds(), + "tcpKeepAlive": gConfig.Server.TCPKeepAlive.String(), "tcpNoDelay": gConfig.Server.TCPNoDelay, } _, err = hooksConfig.Run( diff --git a/network/client.go b/network/client.go index feb0fe99..7a149e4a 100644 --- a/network/client.go +++ b/network/client.go @@ -82,7 +82,11 @@ func NewClient(clientConfig *config.Client, logger zerolog.Logger) *Client { // Set the TCP keep alive. client.TCPKeepAlive = clientConfig.TCPKeepAlive if clientConfig.TCPKeepAlivePeriod <= 0 { - client.TCPKeepAlivePeriod = config.DefaultTCPKeepAlivePeriod + if keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod); err == nil { + client.TCPKeepAlivePeriod = keepAlive + } else { + logger.Error().Err(err).Msg("Failed to parse TCP keep alive period") + } } else { client.TCPKeepAlivePeriod = clientConfig.TCPKeepAlivePeriod } @@ -105,7 +109,7 @@ func NewClient(clientConfig *config.Client, logger zerolog.Logger) *Client { if err := client.Conn.SetReadDeadline(time.Now().Add(client.ReceiveDeadline)); err != nil { logger.Error().Err(err).Msg("Failed to set receive deadline") } else { - logger.Debug().Str("duration", fmt.Sprint(client.ReceiveDeadline.Seconds())).Msg( + logger.Debug().Str("duration", fmt.Sprint(client.ReceiveDeadline.String())).Msg( "Set receive deadline") } } diff --git a/network/server.go b/network/server.go index fdbb1d37..5c725a32 100644 --- a/network/server.go +++ b/network/server.go @@ -382,8 +382,12 @@ func NewServer( } if tickInterval == 0 { - server.TickInterval = config.DefaultTickInterval - logger.Debug().Msg("Tick interval is not set, using the default value") + if tickInterval, err := time.ParseDuration(config.DefaultTickInterval); err == nil { + server.TickInterval = tickInterval + logger.Debug().Msg("Tick interval is not set, using the default value") + } else { + logger.Error().Err(err).Msg("Failed to parse the default tick interval") + } } else { server.TickInterval = tickInterval } diff --git a/plugin/utils/functions.go b/plugin/utils/functions.go index bf06f57c..c9ecf5f5 100644 --- a/plugin/utils/functions.go +++ b/plugin/utils/functions.go @@ -72,7 +72,7 @@ func CastToPrimitiveTypes(args map[string]interface{}) map[string]interface{} { for key, value := range args { switch value := value.(type) { case time.Duration: - args[key] = value.Seconds() + args[key] = value.String() // TODO: Add more types here as needed. default: args[key] = value From 50eefe3cafa42057558c07b2ee6a42db5f3d6fee Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Wed, 11 Jan 2023 00:32:34 +0100 Subject: [PATCH 5/6] Update tests to reflect new changes to health check scheduler and duration parsing --- network/client_test.go | 22 ++++++++++++++++++---- network/proxy_test.go | 42 +++++++++++++++++++++++++++++------------- network/server_test.go | 24 +++++++++++++++++++----- 3 files changed, 66 insertions(+), 22 deletions(-) diff --git a/network/client_test.go b/network/client_test.go index facfb0ba..f54ff487 100644 --- a/network/client_test.go +++ b/network/client_test.go @@ -2,12 +2,14 @@ package network import ( "testing" + "time" embeddedpostgres "github.com/fergusstrange/embedded-postgres" "github.com/gatewayd-io/gatewayd/config" "github.com/gatewayd-io/gatewayd/logging" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // TestNewClient tests the NewClient function. @@ -32,6 +34,9 @@ func TestNewClient(t *testing.T) { logger := logging.NewLogger(cfg) + keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod) + require.NoError(t, err) + client := NewClient( &config.Client{ Network: "tcp", @@ -41,7 +46,7 @@ func TestNewClient(t *testing.T) { ReceiveDeadline: config.DefaultReceiveDeadline, SendDeadline: config.DefaultSendDeadline, TCPKeepAlive: false, - TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + TCPKeepAlivePeriod: keepAlive, }, logger) defer client.Close() @@ -75,6 +80,9 @@ func TestSend(t *testing.T) { logger := logging.NewLogger(cfg) + keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod) + require.NoError(t, err) + client := NewClient( &config.Client{ Network: "tcp", @@ -84,7 +92,7 @@ func TestSend(t *testing.T) { ReceiveDeadline: config.DefaultReceiveDeadline, SendDeadline: config.DefaultSendDeadline, TCPKeepAlive: false, - TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + TCPKeepAlivePeriod: keepAlive, }, logger) defer client.Close() @@ -118,6 +126,9 @@ func TestReceive(t *testing.T) { logger := logging.NewLogger(cfg) + keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod) + require.NoError(t, err) + client := NewClient( &config.Client{ Network: "tcp", @@ -127,7 +138,7 @@ func TestReceive(t *testing.T) { ReceiveDeadline: config.DefaultReceiveDeadline, SendDeadline: config.DefaultSendDeadline, TCPKeepAlive: false, - TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + TCPKeepAlivePeriod: keepAlive, }, logger) defer client.Close() @@ -171,6 +182,9 @@ func TestClose(t *testing.T) { logger := logging.NewLogger(cfg) + keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod) + require.NoError(t, err) + client := NewClient( &config.Client{ Network: "tcp", @@ -180,7 +194,7 @@ func TestClose(t *testing.T) { ReceiveDeadline: config.DefaultReceiveDeadline, SendDeadline: config.DefaultSendDeadline, TCPKeepAlive: false, - TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + TCPKeepAlivePeriod: keepAlive, }, logger) assert.NotNil(t, client) diff --git a/network/proxy_test.go b/network/proxy_test.go index 639a1649..000d3eed 100644 --- a/network/proxy_test.go +++ b/network/proxy_test.go @@ -2,6 +2,7 @@ package network import ( "testing" + "time" embeddedpostgres "github.com/fergusstrange/embedded-postgres" "github.com/gatewayd-io/gatewayd/config" @@ -10,6 +11,7 @@ import ( "github.com/gatewayd-io/gatewayd/pool" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // TestNewProxy tests the creation of a new proxy with a fixed connection pool. @@ -36,6 +38,10 @@ func TestNewProxy(t *testing.T) { // Create a connection pool pool := pool.NewPool(config.EmptyPoolCapacity) + + keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod) + require.NoError(t, err) + client := NewClient( &config.Client{ Network: "tcp", @@ -45,14 +51,17 @@ func TestNewProxy(t *testing.T) { ReceiveDeadline: config.DefaultReceiveDeadline, SendDeadline: config.DefaultSendDeadline, TCPKeepAlive: false, - TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + TCPKeepAlivePeriod: keepAlive, }, logger) - err := pool.Put(client.ID, client) + err = pool.Put(client.ID, client) assert.Nil(t, err) + healthCheck, err := time.ParseDuration(config.DefaultHealthCheckPeriod) + require.NoError(t, err) + // Create a proxy with a fixed buffer pool - proxy := NewProxy(pool, hook.NewHookConfig(), false, false, nil, logger) + proxy := NewProxy(pool, hook.NewHookConfig(), false, false, healthCheck, nil, logger) assert.NotNil(t, proxy) assert.Equal(t, 0, proxy.busyConnections.Size(), "Proxy should have no connected clients") @@ -80,17 +89,24 @@ func TestNewProxyElastic(t *testing.T) { // Create a connection pool pool := pool.NewPool(config.EmptyPoolCapacity) + healthCheck, err := time.ParseDuration(config.DefaultHealthCheckPeriod) + require.NoError(t, err) + + keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod) + require.NoError(t, err) + // Create a proxy with an elastic buffer pool - proxy := NewProxy(pool, hook.NewHookConfig(), true, false, &config.Client{ - Network: "tcp", - Address: "localhost:5432", - ReceiveBufferSize: config.DefaultBufferSize, - ReceiveChunkSize: config.DefaultChunkSize, - ReceiveDeadline: config.DefaultReceiveDeadline, - SendDeadline: config.DefaultSendDeadline, - TCPKeepAlive: false, - TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, - }, logger) + proxy := NewProxy(pool, hook.NewHookConfig(), true, false, healthCheck, + &config.Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: config.DefaultBufferSize, + ReceiveChunkSize: config.DefaultChunkSize, + ReceiveDeadline: config.DefaultReceiveDeadline, + SendDeadline: config.DefaultSendDeadline, + TCPKeepAlive: false, + TCPKeepAlivePeriod: keepAlive, + }, logger) assert.NotNil(t, proxy) assert.Equal(t, 0, proxy.busyConnections.Size()) diff --git a/network/server_test.go b/network/server_test.go index a72f4dd7..3c38549c 100644 --- a/network/server_test.go +++ b/network/server_test.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "errors" "testing" + "time" embeddedpostgres "github.com/fergusstrange/embedded-postgres" "github.com/gatewayd-io/gatewayd/config" @@ -14,6 +15,7 @@ import ( "github.com/panjf2000/gnet/v2" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/structpb" ) @@ -94,6 +96,9 @@ func TestRunServer(t *testing.T) { } hooksConfig.Add(hook.OnTrafficFromServer, 1, onTrafficFromServer) + keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod) + require.NoError(t, err) + clientConfig := config.Client{ Network: "tcp", Address: "localhost:5432", @@ -102,20 +107,26 @@ func TestRunServer(t *testing.T) { ReceiveDeadline: config.DefaultReceiveDeadline, SendDeadline: config.DefaultSendDeadline, TCPKeepAlive: false, - TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + TCPKeepAlivePeriod: keepAlive, } // Create a connection pool. pool := pool.NewPool(2) client1 := NewClient(&clientConfig, logger) - err := pool.Put(client1.ID, client1) + err = pool.Put(client1.ID, client1) assert.Nil(t, err) client2 := NewClient(&clientConfig, logger) err = pool.Put(client2.ID, client2) assert.Nil(t, err) + healthCheck, err := time.ParseDuration(config.DefaultHealthCheckPeriod) + require.NoError(t, err) + // Create a proxy with a fixed buffer pool. - proxy := NewProxy(pool, hooksConfig, false, false, &clientConfig, logger) + proxy := NewProxy(pool, hooksConfig, false, false, healthCheck, &clientConfig, logger) + + tickInterval, err := time.ParseDuration(config.DefaultTickInterval) + require.NoError(t, err) // Create a server. server := NewServer( @@ -123,7 +134,7 @@ func TestRunServer(t *testing.T) { "127.0.0.1:15432", 0, 0, - config.DefaultTickInterval, + tickInterval, []gnet.Option{ gnet.WithMulticore(false), gnet.WithReuseAddr(true), @@ -146,6 +157,9 @@ func TestRunServer(t *testing.T) { go func(t *testing.T, server *Server, errs chan error) { for { if server.IsRunning() { + keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod) + require.NoError(t, err) + client := NewClient( &config.Client{ Network: "tcp", @@ -155,7 +169,7 @@ func TestRunServer(t *testing.T) { ReceiveDeadline: config.DefaultReceiveDeadline, SendDeadline: config.DefaultSendDeadline, TCPKeepAlive: false, - TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + TCPKeepAlivePeriod: keepAlive, }, logger) From 52f9d8274e052d73fecd611a263355325ba83b0e Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Wed, 11 Jan 2023 00:34:31 +0100 Subject: [PATCH 6/6] Fix linter issue with short var name --- network/proxy.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/proxy.go b/network/proxy.go index 9d354d4a..373186ec 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -41,13 +41,13 @@ var _ IProxy = &Proxy{} // NewProxy creates a new proxy. func NewProxy( - p pool.IPool, hookConfig *hook.Config, + connPool pool.IPool, hookConfig *hook.Config, elastic, reuseElasticClients bool, healthCheckPeriod time.Duration, clientConfig *config.Client, logger zerolog.Logger, ) *Proxy { proxy := Proxy{ - availableConnections: p, + availableConnections: connPool, busyConnections: pool.NewPool(config.EmptyPoolCapacity), logger: logger, hookConfig: hookConfig,