Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ var runCmd = &cobra.Command{
// Get client config from the config file.
clientConfig := gConfig.Clients[config.Default]

// Add clients to the pool
// Add clients to the pool.
for i := 0; i < poolSize; i++ {
client := network.NewClient(&clientConfig, logger)

Expand All @@ -178,10 +178,10 @@ var runCmd = &cobra.Command{
"address": client.Address,
"receiveBufferSize": client.ReceiveBufferSize,
"receiveChunkSize": client.ReceiveChunkSize,
"receiveDeadline": client.ReceiveDeadline.Seconds(),
"sendDeadline": client.SendDeadline.Seconds(),
"receiveDeadline": client.ReceiveDeadline.String(),
"sendDeadline": client.SendDeadline.String(),
"tcpKeepAlive": client.TCPKeepAlive,
"tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.Seconds(),
"tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.String(),
}
_, err := hooksConfig.Run(
context.Background(),
Expand All @@ -199,7 +199,7 @@ var runCmd = &cobra.Command{
}
}

// Verify that the pool is properly populated
// Verify that the pool is properly populated.
logger.Info().Str("count", fmt.Sprint(pool.Size())).Msg(
"There are clients available in the pool")
if pool.Size() != poolSize {
Expand All @@ -223,21 +223,30 @@ var runCmd = &cobra.Command{
// Create a prefork proxy with the pool of clients.
elastic := gConfig.Proxy[config.Default].Elastic
reuseElasticClients := gConfig.Proxy[config.Default].ReuseElasticClients
healthCheckPeriod := gConfig.Proxy[config.Default].HealthCheckPeriod
proxy := network.NewProxy(
pool, hooksConfig, elastic, reuseElasticClients, &clientConfig, logger)
pool,
hooksConfig,
elastic,
reuseElasticClients,
healthCheckPeriod,
&clientConfig,
logger,
)

proxyCfg := map[string]interface{}{
"elastic": elastic,
"reuseElasticClients": reuseElasticClients,
"healthCheckPeriod": healthCheckPeriod.String(),
"clientConfig": map[string]interface{}{
"network": clientConfig.Network,
"address": clientConfig.Address,
"receiveBufferSize": clientConfig.ReceiveBufferSize,
"receiveChunkSize": clientConfig.ReceiveChunkSize,
"receiveDeadline": clientConfig.ReceiveDeadline.Seconds(),
"sendDeadline": clientConfig.SendDeadline.Seconds(),
"receiveDeadline": clientConfig.ReceiveDeadline.String(),
"sendDeadline": clientConfig.SendDeadline.String(),
"tcpKeepAlive": clientConfig.TCPKeepAlive,
"tcpKeepAlivePeriod": clientConfig.TCPKeepAlivePeriod.Seconds(),
"tcpKeepAlivePeriod": clientConfig.TCPKeepAlivePeriod.String(),
},
}
_, err = hooksConfig.Run(
Expand Down Expand Up @@ -288,7 +297,7 @@ var runCmd = &cobra.Command{
"address": gConfig.Server.Address,
"softLimit": gConfig.Server.SoftLimit,
"hardLimit": gConfig.Server.HardLimit,
"tickInterval": gConfig.Server.TickInterval.Seconds(),
"tickInterval": gConfig.Server.TickInterval.String(),
"multiCore": gConfig.Server.MultiCore,
"lockOSThread": gConfig.Server.LockOSThread,
"enableTicker": gConfig.Server.EnableTicker,
Expand All @@ -299,7 +308,7 @@ var runCmd = &cobra.Command{
"socketSendBuffer": gConfig.Server.SocketSendBuffer,
"reuseAddress": gConfig.Server.ReuseAddress,
"reusePort": gConfig.Server.ReusePort,
"tcpKeepAlive": gConfig.Server.TCPKeepAlive.Seconds(),
"tcpKeepAlive": gConfig.Server.TCPKeepAlive.String(),
"tcpNoDelay": gConfig.Server.TCPNoDelay,
}
_, err = hooksConfig.Run(
Expand Down
17 changes: 7 additions & 10 deletions config/constants.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package config

import (
"time"
)

type (
Status uint
Policy uint
Expand Down Expand Up @@ -63,19 +59,20 @@ const (
DefaultChunkSize = 4096
DefaultReceiveDeadline = 0 // 0 means no deadline (timeout)
DefaultSendDeadline = 0
DefaultTCPKeepAlivePeriod = 30 * time.Second
DefaultTCPKeepAlivePeriod = "30s"

// Pool constants.
EmptyPoolCapacity = 0
DefaultPoolSize = 10
MinimumPoolSize = 2
EmptyPoolCapacity = 0
DefaultPoolSize = 10
MinimumPoolSize = 2
DefaultHealthCheckPeriod = "60s" // This must match PostgreSQL authentication timeout.

// Server constants.
DefaultListenNetwork = "tcp"
DefaultListenAddress = "0.0.0.0:15432"
DefaultTickInterval = 5 * time.Second
DefaultTickInterval = "5s"
DefaultBufferSize = 1 << 24 // 16777216 bytes
DefaultTCPKeepAlive = 3 * time.Second
DefaultTCPKeepAlive = "3s"
DefaultLoadBalancer = "roundrobin"

// Utility constants.
Expand Down
5 changes: 3 additions & 2 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ type Pool struct {
}

type Proxy struct {
Elastic bool `koanf:"elastic"`
ReuseElasticClients bool `koanf:"reuseElasticClients"`
Elastic bool `koanf:"elastic"`
ReuseElasticClients bool `koanf:"reuseElasticClients"`
HealthCheckPeriod time.Duration `koanf:"healthCheckPeriod"`
}

type Server struct {
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/Masterminds/semver/v3 v3.2.0
github.com/fergusstrange/embedded-postgres v1.19.0
github.com/go-co-op/gocron v1.18.0
github.com/google/go-cmp v0.5.9
github.com/hashicorp/go-hclog v1.4.0
github.com/hashicorp/go-plugin v1.4.8
Expand Down Expand Up @@ -33,12 +34,14 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/oklog/run v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20230104163317-caabf589fcbf // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-co-op/gocron v1.18.0 h1:SxTyJ5xnSN4byCq7b10LmmszFdxQlSQJod8s3gbnXxA=
github.com/go-co-op/gocron v1.18.0/go.mod h1:sD/a0Aadtw5CpflUJ/lpP9Vfdk979Wl1Sg33HPHg0FY=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
Expand Down Expand Up @@ -255,6 +257,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.28.0 h1:MirSo27VyNi7RJYP3078AA1+Cyzd2GB66qy3aUHvsWY=
Expand Down Expand Up @@ -350,6 +354,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
8 changes: 6 additions & 2 deletions network/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func NewClient(clientConfig *config.Client, logger zerolog.Logger) *Client {
// Set the TCP keep alive.
client.TCPKeepAlive = clientConfig.TCPKeepAlive
if clientConfig.TCPKeepAlivePeriod <= 0 {
client.TCPKeepAlivePeriod = config.DefaultTCPKeepAlivePeriod
if keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod); err == nil {
client.TCPKeepAlivePeriod = keepAlive
} else {
logger.Error().Err(err).Msg("Failed to parse TCP keep alive period")
}
} else {
client.TCPKeepAlivePeriod = clientConfig.TCPKeepAlivePeriod
}
Expand All @@ -105,7 +109,7 @@ func NewClient(clientConfig *config.Client, logger zerolog.Logger) *Client {
if err := client.Conn.SetReadDeadline(time.Now().Add(client.ReceiveDeadline)); err != nil {
logger.Error().Err(err).Msg("Failed to set receive deadline")
} else {
logger.Debug().Str("duration", fmt.Sprint(client.ReceiveDeadline.Seconds())).Msg(
logger.Debug().Str("duration", fmt.Sprint(client.ReceiveDeadline.String())).Msg(
"Set receive deadline")
}
}
Expand Down
22 changes: 18 additions & 4 deletions network/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package network

import (
"testing"
"time"

embeddedpostgres "github.com/fergusstrange/embedded-postgres"
"github.com/gatewayd-io/gatewayd/config"
"github.com/gatewayd-io/gatewayd/logging"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestNewClient tests the NewClient function.
Expand All @@ -32,6 +34,9 @@ func TestNewClient(t *testing.T) {

logger := logging.NewLogger(cfg)

keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod)
require.NoError(t, err)

client := NewClient(
&config.Client{
Network: "tcp",
Expand All @@ -41,7 +46,7 @@ func TestNewClient(t *testing.T) {
ReceiveDeadline: config.DefaultReceiveDeadline,
SendDeadline: config.DefaultSendDeadline,
TCPKeepAlive: false,
TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod,
TCPKeepAlivePeriod: keepAlive,
},
logger)
defer client.Close()
Expand Down Expand Up @@ -75,6 +80,9 @@ func TestSend(t *testing.T) {

logger := logging.NewLogger(cfg)

keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod)
require.NoError(t, err)

client := NewClient(
&config.Client{
Network: "tcp",
Expand All @@ -84,7 +92,7 @@ func TestSend(t *testing.T) {
ReceiveDeadline: config.DefaultReceiveDeadline,
SendDeadline: config.DefaultSendDeadline,
TCPKeepAlive: false,
TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod,
TCPKeepAlivePeriod: keepAlive,
},
logger)
defer client.Close()
Expand Down Expand Up @@ -118,6 +126,9 @@ func TestReceive(t *testing.T) {

logger := logging.NewLogger(cfg)

keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod)
require.NoError(t, err)

client := NewClient(
&config.Client{
Network: "tcp",
Expand All @@ -127,7 +138,7 @@ func TestReceive(t *testing.T) {
ReceiveDeadline: config.DefaultReceiveDeadline,
SendDeadline: config.DefaultSendDeadline,
TCPKeepAlive: false,
TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod,
TCPKeepAlivePeriod: keepAlive,
},
logger)
defer client.Close()
Expand Down Expand Up @@ -171,6 +182,9 @@ func TestClose(t *testing.T) {

logger := logging.NewLogger(cfg)

keepAlive, err := time.ParseDuration(config.DefaultTCPKeepAlivePeriod)
require.NoError(t, err)

client := NewClient(
&config.Client{
Network: "tcp",
Expand All @@ -180,7 +194,7 @@ func TestClose(t *testing.T) {
ReceiveDeadline: config.DefaultReceiveDeadline,
SendDeadline: config.DefaultSendDeadline,
TCPKeepAlive: false,
TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod,
TCPKeepAlivePeriod: keepAlive,
},
logger)
assert.NotNil(t, client)
Expand Down
56 changes: 53 additions & 3 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package network

import (
"context"
"time"

"github.com/gatewayd-io/gatewayd/config"
gerr "github.com/gatewayd-io/gatewayd/errors"
"github.com/gatewayd-io/gatewayd/plugin/hook"
"github.com/gatewayd-io/gatewayd/pool"
"github.com/go-co-op/gocron"
"github.com/panjf2000/gnet/v2"
"github.com/rs/zerolog"
)
Expand All @@ -25,9 +27,11 @@ type Proxy struct {
busyConnections pool.IPool
logger zerolog.Logger
hookConfig *hook.Config
scheduler *gocron.Scheduler

Elastic bool
ReuseElasticClients bool
HealthCheckPeriod time.Duration

// ClientConfig is used for elastic proxy and reconnection
ClientConfig *config.Client
Expand All @@ -37,19 +41,65 @@ var _ IProxy = &Proxy{}

// NewProxy creates a new proxy.
func NewProxy(
p pool.IPool, hookConfig *hook.Config,
connPool pool.IPool, hookConfig *hook.Config,
elastic, reuseElasticClients bool,
healthCheckPeriod time.Duration,
clientConfig *config.Client, logger zerolog.Logger,
) *Proxy {
return &Proxy{
availableConnections: p,
proxy := Proxy{
availableConnections: connPool,
busyConnections: pool.NewPool(config.EmptyPoolCapacity),
logger: logger,
hookConfig: hookConfig,
scheduler: gocron.NewScheduler(time.UTC),
Elastic: elastic,
ReuseElasticClients: reuseElasticClients,
ClientConfig: clientConfig,
}

if proxy.HealthCheckPeriod == 0 {
if healthCheck, err := time.ParseDuration(config.DefaultHealthCheckPeriod); err == nil {
proxy.HealthCheckPeriod = healthCheck
} else {
logger.Error().Err(err).Msg("Failed to parse the health check period")
}
} else {
proxy.HealthCheckPeriod = healthCheckPeriod
}

startDelay := time.Now().Add(proxy.HealthCheckPeriod)
// Schedule the client health check.
if _, err := proxy.scheduler.Every(proxy.HealthCheckPeriod).SingletonMode().StartAt(startDelay).Do(
func() {
logger.Debug().Msg("Running the client health check, and might recycle connection(s).")
proxy.availableConnections.ForEach(func(_, value interface{}) bool {
if client, ok := value.(*Client); ok {
// Connection is probably dead by now.
proxy.availableConnections.Remove(client.ID)
client.Close()
// Create a new client.
client = NewClient(proxy.ClientConfig, proxy.logger)
if err := proxy.availableConnections.Put(client.ID, client); err != nil {
proxy.logger.Err(err).Msg("Failed to update the client connection")
}
}
return true
})
},
); err != nil {
proxy.logger.Error().Err(err).Msg("Failed to schedule the client health check")
}

// Start the scheduler.
proxy.scheduler.StartAsync()
logger.Debug().Fields(
map[string]interface{}{
"startDelay": startDelay,
"healthCheckPeriod": proxy.HealthCheckPeriod.String(),
},
).Msg("Started the client health check scheduler")

return &proxy
}

// Connect maps a server connection from the available connection pool to a incoming connection.
Expand Down
Loading