From 2821d446918e2ab5d9e98338ea907cec9effc892 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 18 Dec 2022 23:49:35 +0100 Subject: [PATCH 01/28] Fix bug in setting the default value for client.ReceiveBufferSize --- network/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/network/client.go b/network/client.go index 1ac2200d..2f81716c 100644 --- a/network/client.go +++ b/network/client.go @@ -58,9 +58,12 @@ func NewClient(network, address string, receiveBufferSize int, logger zerolog.Lo } client.Conn = conn - if client.ReceiveBufferSize == 0 { + if receiveBufferSize <= 0 { client.ReceiveBufferSize = DefaultBufferSize + } else { + client.ReceiveBufferSize = receiveBufferSize } + logger.Debug().Msgf("New client created: %s", client.Address) client.ID = GetID(conn.LocalAddr().Network(), conn.LocalAddr().String(), DefaultSeed, logger) From db8ac1399d6d0a375cea607131140d52aedee0de Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 18 Dec 2022 23:51:55 +0100 Subject: [PATCH 02/28] Return the amount of data sent to the server (database) --- network/client.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/network/client.go b/network/client.go index 2f81716c..368b017e 100644 --- a/network/client.go +++ b/network/client.go @@ -70,13 +70,15 @@ func NewClient(network, address string, receiveBufferSize int, logger zerolog.Lo return &client } -func (c *Client) Send(data []byte) error { - if _, err := c.Write(data); err != nil { +func (c *Client) Send(data []byte) (int, error) { + if sent, err := c.Conn.Write(data); err != nil { c.logger.Error().Err(err).Msgf("Couldn't send data to the server: %s", err) - return fmt.Errorf("couldn't send data to the server: %w", err) + // TODO: Wrap the original error + return 0, gerr.ErrClientSendFailed + } else { + c.logger.Debug().Msgf("Sent %d bytes to %s", len(data), c.Address) + return sent, nil } - c.logger.Debug().Msgf("Sent %d bytes to %s", len(data), c.Address) - return nil } func (c *Client) Receive() (int, []byte, error) { From 03827c79d9fbf1af3a6492e596bb659d29a7ec5f Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 18 Dec 2022 23:53:07 +0100 Subject: [PATCH 03/28] Handle io.EOF when reading from the server (database) --- network/client.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/network/client.go b/network/client.go index 368b017e..66a9e567 100644 --- a/network/client.go +++ b/network/client.go @@ -83,13 +83,12 @@ func (c *Client) Send(data []byte) (int, error) { func (c *Client) Receive() (int, []byte, error) { buf := make([]byte, c.ReceiveBufferSize) - read, err := c.Read(buf) - if err != nil { - c.logger.Error().Err(err).Msgf("Couldn't receive data from the server: %s", err) - return 0, nil, fmt.Errorf("couldn't receive data from the server: %w", err) + received, err := c.Conn.Read(buf) + if err != nil && err != io.EOF { + c.logger.Error().Err(err).Msg("Couldn't receive data from the server") + return 0, nil, err } - c.logger.Debug().Msgf("Received %d bytes from %s", read, c.Address) - return read, buf, nil + return received, buf, err } func (c *Client) Close() { From 7fe1244ed4999549f4536940c14ad81f92295aaf Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 18 Dec 2022 23:53:53 +0100 Subject: [PATCH 04/28] Don't reset the receive buffer size on close --- network/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/network/client.go b/network/client.go index 66a9e567..a497c4e1 100644 --- a/network/client.go +++ b/network/client.go @@ -100,5 +100,4 @@ func (c *Client) Close() { c.Conn = nil c.Address = "" c.Network = "" - c.ReceiveBufferSize = 0 } From c6102a3719c7af9d9ca30a9e170494ca13c4f821 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 18 Dec 2022 23:54:36 +0100 Subject: [PATCH 05/28] Add Client.IsConnected function to check if the client's underlying connection is alive --- network/client.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/network/client.go b/network/client.go index a497c4e1..5e2d12df 100644 --- a/network/client.go +++ b/network/client.go @@ -1,9 +1,11 @@ package network import ( - "fmt" + "errors" + "io" "net" + gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/rs/zerolog" ) @@ -101,3 +103,26 @@ func (c *Client) Close() { c.Address = "" c.Network = "" } + +// Go returns io.EOF when the server closes the connection. +// So, if I read 0 bytes and the error is io.EOF or net.ErrClosed, I should reconnect. +func (c *Client) IsConnected() bool { + if c == nil { + return false + } + + if c != nil && c.Conn == nil || c.ID == "" { + c.Close() + return false + } + + buf := make([]byte, 0) + _, err := c.Read(buf) + if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { + c.logger.Debug().Msgf("Connection to %s is closed", c.Address) + c.Close() + return false + } + + return true +} From 8f57718d80505a6ab3c8f2fcadb9275c9e80d9eb Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:03:02 +0100 Subject: [PATCH 06/28] Fix tests for client.go --- network/client_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/network/client_test.go b/network/client_test.go index 3ae2b697..d30e1a09 100644 --- a/network/client_test.go +++ b/network/client_test.go @@ -65,8 +65,10 @@ func TestSend(t *testing.T) { defer client.Close() assert.NotNil(t, client) - err := client.Send(CreatePostgreSQLPacket('Q', []byte("select 1;"))) + packet := CreatePostgreSQLPacket('Q', []byte("select 1;")) + sent, err := client.Send(packet) assert.Nil(t, err) + assert.Equal(t, len(packet), sent) } func TestReceive(t *testing.T) { @@ -94,8 +96,10 @@ func TestReceive(t *testing.T) { defer client.Close() assert.NotNil(t, client) - err := client.Send(CreatePgStartupPacket()) + packet := CreatePgStartupPacket() + sent, err := client.Send(packet) assert.Nil(t, err) + assert.Equal(t, len(packet), sent) size, data, err := client.Receive() msg := "\x00\x00\x00\x03" @@ -136,5 +140,5 @@ func TestClose(t *testing.T) { assert.Equal(t, "", client.Network) assert.Equal(t, "", client.Address) assert.Nil(t, client.Conn) - assert.Equal(t, 0, client.ReceiveBufferSize) + assert.Equal(t, DefaultBufferSize, client.ReceiveBufferSize) } From 2ef25c447eefe51a10b31a8a33eaeb758798101b Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:03:56 +0100 Subject: [PATCH 07/28] Set default buffer size to a larger value: 16 MB --- network/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/server.go b/network/server.go index c91fa310..a4ab5391 100644 --- a/network/server.go +++ b/network/server.go @@ -21,7 +21,7 @@ const ( DefaultTickInterval = 5 * time.Second DefaultPoolSize = 10 MinimumPoolSize = 2 - DefaultBufferSize = 4096 + DefaultBufferSize = 1 << 24 // 16777216 bytes ) type Server struct { From dd01896e115fca25d035b4f116fb43b4e9dfac1f Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:06:18 +0100 Subject: [PATCH 08/28] Set default value for read/write buffer caps and socket buffer lengths --- cmd/config_parser.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/cmd/config_parser.go b/cmd/config_parser.go index fdd29585..bdb341c2 100644 --- a/cmd/config_parser.go +++ b/cmd/config_parser.go @@ -181,6 +181,26 @@ func getTCPNoDelay() gnet.TCPSocketOpt { } func serverConfig() *ServerConfig { + readBufferCap := globalConfig.Int("server.readBufferCap") + if readBufferCap <= 0 { + readBufferCap = network.DefaultBufferSize + } + + writeBufferCap := globalConfig.Int("server.writeBufferCap") + if writeBufferCap <= 0 { + writeBufferCap = network.DefaultBufferSize + } + + socketRecvBuffer := globalConfig.Int("server.socketRecvBuffer") + if socketRecvBuffer <= 0 { + socketRecvBuffer = network.DefaultBufferSize + } + + socketSendBuffer := globalConfig.Int("server.socketSendBuffer") + if socketSendBuffer <= 0 { + socketSendBuffer = network.DefaultBufferSize + } + return &ServerConfig{ Network: globalConfig.String("server.network"), Address: globalConfig.String("server.address"), @@ -191,10 +211,10 @@ func serverConfig() *ServerConfig { MultiCore: globalConfig.Bool("server.multiCore"), LockOSThread: globalConfig.Bool("server.lockOSThread"), LoadBalancer: getLoadBalancer(globalConfig.String("server.loadBalancer")), - ReadBufferCap: globalConfig.Int("server.readBufferCap"), - WriteBufferCap: globalConfig.Int("server.writeBufferCap"), - SocketRecvBuffer: globalConfig.Int("server.socketRecvBuffer"), - SocketSendBuffer: globalConfig.Int("server.socketSendBuffer"), + ReadBufferCap: readBufferCap, + WriteBufferCap: writeBufferCap, + SocketRecvBuffer: socketRecvBuffer, + SocketSendBuffer: socketSendBuffer, ReuseAddress: globalConfig.Bool("server.reuseAddress"), ReusePort: globalConfig.Bool("server.reusePort"), TCPKeepAlive: globalConfig.Duration("server.tcpKeepAlive"), From 4454af8d354111f8fcceba882dbc13d1a106b24a Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:07:25 +0100 Subject: [PATCH 09/28] Update dependencies to latest versions --- go.mod | 8 ++++---- go.sum | 12 ++++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index d8bd6539..e81c0ed6 100644 --- a/go.mod +++ b/go.mod @@ -9,11 +9,11 @@ require ( github.com/hashicorp/go-plugin v1.4.8 github.com/knadh/koanf v1.4.4 github.com/mitchellh/mapstructure v1.5.0 - github.com/panjf2000/gnet/v2 v2.2.0 + github.com/panjf2000/gnet/v2 v2.2.1 github.com/rs/zerolog v1.28.0 github.com/spf13/cobra v1.6.1 github.com/stretchr/testify v1.8.1 - golang.org/x/exp v0.0.0-20221212164502-fae10dda9338 + golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 google.golang.org/grpc v1.51.0 google.golang.org/protobuf v1.28.1 ) @@ -21,7 +21,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/fsnotify/fsnotify v1.4.9 // indirect + github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -36,7 +36,7 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.8.0 // indirect + go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/net v0.4.0 // indirect golang.org/x/sys v0.3.0 // indirect diff --git a/go.sum b/go.sum index 2da9cb9b..09fffbda 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,8 @@ github.com/fergusstrange/embedded-postgres v1.19.0 h1:NqDufJHeA03U7biULlPHZ0pZ10 github.com/fergusstrange/embedded-postgres v1.19.0/go.mod h1:0B+3bPsMvcNgR9nN+bdM2x9YaNYDnf3ksUqYp1OAub0= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -222,8 +224,11 @@ github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= github.com/panjf2000/ants/v2 v2.4.8 h1:JgTbolX6K6RreZ4+bfctI0Ifs+3mrE5BIHudQxUDQ9k= github.com/panjf2000/ants/v2 v2.4.8/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= +github.com/panjf2000/ants/v2 v2.7.0 h1:Y3Bgpfo9HDkBoHNVFbMfY5mAvi5TAA17y3HbzQ74p5Y= github.com/panjf2000/gnet/v2 v2.2.0 h1:+6itXhRlHJpv5UGAyN1DebHzK1l0GbZMOsg2Spb1VS0= github.com/panjf2000/gnet/v2 v2.2.0/go.mod h1:unWr2B4jF0DQPJH3GsXBGQiDcAamM6+Pf5FiK705kc4= +github.com/panjf2000/gnet/v2 v2.2.1 h1:HJVK3vmD6rBgOeTnYkG4czW6jphVHygxLLWTEBU3nqU= +github.com/panjf2000/gnet/v2 v2.2.1/go.mod h1:y8xWR1EEK6pGDuAQ6XULY/WWmPv0Pgbsq2Q4lbXJ6JA= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIGuI= @@ -304,6 +309,8 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= @@ -316,6 +323,10 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20221212164502-fae10dda9338 h1:OvjRkcNHnf6/W5FZXSxODbxwD+X7fspczG7Jn/xQVD4= golang.org/x/exp v0.0.0-20221212164502-fae10dda9338/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20221215174704-0915cd710c24 h1:6w3iSY8IIkp5OQtbYj8NeuKG1jS9d+kYaubXqsoOiQ8= +golang.org/x/exp v0.0.0-20221215174704-0915cd710c24/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 h1:5oN1Pz/eDhCpbMbLstvIPa0b/BEQo6g6nwV3pLjfM6w= +golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -391,6 +402,7 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220224120231-95c6836cb0e7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= From 397720fb939e39f69402a0ff71ef59c2f08d16b6 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:11:11 +0100 Subject: [PATCH 10/28] Add cap'd pool Don't swallow errors Add Cap getter function Return ErrPoolExhausted error if the pool is cap'd and the capacity is full NewPool now accepts either an EmptyPoolCapacity (0) or a positive integer to make an uncap'd or cap'd pool --- pool/pool.go | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index a823ccfa..c6585209 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -2,6 +2,12 @@ package pool import ( "sync" + + gerr "github.com/gatewayd-io/gatewayd/errors" +) + +const ( + EmptyPoolCapacity = 0 ) type Callback func(key, value interface{}) bool @@ -9,17 +15,19 @@ type Callback func(key, value interface{}) bool type Pool interface { ForEach(Callback) Pool() *sync.Map - Put(key, value interface{}) + Put(key, value interface{}) error Get(key interface{}) interface{} - GetOrPut(key, value interface{}) (interface{}, bool) + GetOrPut(key, value interface{}) (interface{}, bool, error) Pop(key interface{}) interface{} Remove(key interface{}) Size() int Clear() + Cap() int } type Impl struct { pool sync.Map + cap int } var _ Pool = &Impl{} @@ -32,32 +40,46 @@ func (p *Impl) Pool() *sync.Map { return &p.pool } -func (p *Impl) Put(key, value interface{}) { +func (p *Impl) Put(key, value interface{}) error { + if p.cap > 0 && p.Size() >= p.cap { + return gerr.ErrPoolExhausted + } p.pool.Store(key, value) + return nil } func (p *Impl) Get(key interface{}) interface{} { if value, ok := p.pool.Load(key); ok { return value } - return nil } -func (p *Impl) GetOrPut(key, value interface{}) (interface{}, bool) { - return p.pool.LoadOrStore(key, value) +func (p *Impl) GetOrPut(key, value interface{}) (interface{}, bool, error) { + if p.cap > 0 && p.Size() >= p.cap { + return nil, false, gerr.ErrPoolExhausted + } + val, loaded := p.pool.LoadOrStore(key, value) + return val, loaded, nil } func (p *Impl) Pop(key interface{}) interface{} { + if p.Size() == 0 { + return nil + } if value, ok := p.pool.LoadAndDelete(key); ok { return value } - return nil } func (p *Impl) Remove(key interface{}) { - p.pool.Delete(key) + if p.Size() == 0 { + return + } + if _, ok := p.pool.Load(key); ok { + p.pool.Delete(key) + } } func (p *Impl) Size() int { @@ -74,6 +96,10 @@ func (p *Impl) Clear() { p.pool = sync.Map{} } -func NewPool() *Impl { - return &Impl{pool: sync.Map{}} +func (p *Impl) Cap() int { + return p.cap +} + +func NewPool(cap int) *Impl { + return &Impl{pool: sync.Map{}, cap: cap} } From 98053bb53286f67a45930f3f8c18a3474a5d5970 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:14:48 +0100 Subject: [PATCH 11/28] Fix tests for pool.go --- pool/pool_test.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/pool/pool_test.go b/pool/pool_test.go index 34a8ac0e..926656c7 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -7,7 +7,7 @@ import ( ) func TestNewPool(t *testing.T) { - pool := NewPool() + pool := NewPool(EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -15,7 +15,7 @@ func TestNewPool(t *testing.T) { } func TestPool_Put(t *testing.T) { - pool := NewPool() + pool := NewPool(EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -28,7 +28,7 @@ func TestPool_Put(t *testing.T) { //nolint:dupl func TestPool_Pop(t *testing.T) { - pool := NewPool() + pool := NewPool(EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -52,7 +52,7 @@ func TestPool_Pop(t *testing.T) { } func TestPool_Clear(t *testing.T) { - pool := NewPool() + pool := NewPool(EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -66,7 +66,7 @@ func TestPool_Clear(t *testing.T) { } func TestPool_ForEach(t *testing.T) { - pool := NewPool() + pool := NewPool(EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -85,7 +85,7 @@ func TestPool_ForEach(t *testing.T) { //nolint:dupl func TestPool_Get(t *testing.T) { - pool := NewPool() + pool := NewPool(EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -109,7 +109,7 @@ func TestPool_Get(t *testing.T) { } func TestPool_GetOrPut(t *testing.T) { - pool := NewPool() + pool := NewPool(EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -118,7 +118,7 @@ func TestPool_GetOrPut(t *testing.T) { assert.Equal(t, 1, pool.Size()) pool.Put("client2.ID", "client2") assert.Equal(t, 2, pool.Size()) - c1, loaded := pool.GetOrPut("client1.ID", "client1") + c1, loaded, err := pool.GetOrPut("client1.ID", "client1") assert.True(t, loaded) if c1, ok := c1.(string); !ok { assert.Equal(t, c1, "client1") @@ -126,7 +126,8 @@ func TestPool_GetOrPut(t *testing.T) { assert.Equal(t, "client1", c1) assert.Equal(t, 2, pool.Size()) } - c2, loaded := pool.GetOrPut("client2.ID", "client2") + assert.Nil(t, err) + c2, loaded, err := pool.GetOrPut("client2.ID", "client2") assert.True(t, loaded) if c2, ok := c2.(string); !ok { assert.Equal(t, c2, "client2") @@ -134,10 +135,11 @@ func TestPool_GetOrPut(t *testing.T) { assert.Equal(t, "client2", c2) assert.Equal(t, 2, pool.Size()) } + assert.Nil(t, err) } func TestPool_Remove(t *testing.T) { - pool := NewPool() + pool := NewPool(EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -153,7 +155,7 @@ func TestPool_Remove(t *testing.T) { } func TestPool_GetClientIDs(t *testing.T) { - pool := NewPool() + pool := NewPool(EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) From 38ac9ca76039a2b5f0dcb29f13ce595b92088927 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:15:29 +0100 Subject: [PATCH 12/28] Set default value for buffers and commented them out --- gatewayd.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/gatewayd.yaml b/gatewayd.yaml index 1463b0c6..64fa8596 100644 --- a/gatewayd.yaml +++ b/gatewayd.yaml @@ -15,7 +15,7 @@ clients: client1: network: tcp address: localhost:5432 - receiveBufferSize: 4096 + # receiveBufferSize: 16777216 # Pool config pool: @@ -52,10 +52,10 @@ server: multiCore: True lockOSThread: False loadBalancer: roundrobin - readBufferCap: 4096 - writeBufferCap: 4096 - socketRecvBuffer: 4096 - socketSendBuffer: 4096 + # readBufferCap: 16777216 + # writeBufferCap: 16777216 + # socketRecvBuffer: 16777216 + # socketSendBuffer: 16777216 reuseAddress: True reusePort: True tcpKeepAlive: 3s # seconds From 87e0261a70cfa3eb4d6188ded1501d093e64d573 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:17:19 +0100 Subject: [PATCH 13/28] Update code to reflect the changes in the pool (cap'd) --- plugin/hooks.go | 1 + plugin/registry.go | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/plugin/hooks.go b/plugin/hooks.go index 079ace8d..f870c095 100644 --- a/plugin/hooks.go +++ b/plugin/hooks.go @@ -93,6 +93,7 @@ func (h *HookConfig) Run( verification Policy, opts ...grpc.CallOption, ) (*structpb.Struct, error) { + // TODO: accept args as map[string]interface{} and convert to structpb.Struct if ctx == nil { ctx = context.Background() } diff --git a/plugin/registry.go b/plugin/registry.go index 6311b2b8..d2349d78 100644 --- a/plugin/registry.go +++ b/plugin/registry.go @@ -17,6 +17,7 @@ const ( DefaultMinPort uint = 50000 DefaultMaxPort uint = 60000 PluginPriorityStart uint = 1000 + EmptyPoolCapacity int = 0 LoggerName string = "plugin" ) @@ -44,11 +45,15 @@ type RegistryImpl struct { var _ Registry = &RegistryImpl{} func NewRegistry(hooksConfig *HookConfig) *RegistryImpl { - return &RegistryImpl{plugins: pool.NewPool(), hooksConfig: hooksConfig} + return &RegistryImpl{plugins: pool.NewPool(EmptyPoolCapacity), hooksConfig: hooksConfig} } func (reg *RegistryImpl) Add(plugin *Impl) bool { - _, loaded := reg.plugins.GetOrPut(plugin.ID, plugin) + _, loaded, err := reg.plugins.GetOrPut(plugin.ID, plugin) + if err != nil { + reg.hooksConfig.Logger.Error().Err(err).Msg("Failed to add plugin to registry") + return false + } return loaded } From 3c337bd3aec53dd9aadc0e3f0ddec22ea0acca47 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:23:48 +0100 Subject: [PATCH 14/28] Fix bugs in server.go Add new errors messages Log when the pool is exhausted Clarify log messages and add more comments Flush connection to ensure all the data is sent Server shutdown should happen before disconnections to avoid pool panics Fix tests for server.go --- cmd/run.go | 2 +- errors/errors.go | 7 +++++++ network/server.go | 19 +++++++++++++++---- network/server_test.go | 5 +++-- 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 3dc3fe15..5b7063c4 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -114,8 +114,8 @@ var runCmd = &cobra.Command{ } // Create and initialize a pool of connections - pool := pool.NewPool() poolSize, clientConfig := poolConfig() + pool := pool.NewPool(poolSize) // Add clients to the pool for i := 0; i < poolSize; i++ { diff --git a/errors/errors.go b/errors/errors.go index eb541e56..63b75c21 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -10,6 +10,13 @@ var ( ErrPluginNotFound = errors.New("plugin not found") ErrPluginNotReady = errors.New("plugin is not ready") + + ErrClientReceiveFailed = errors.New("couldn't receive data from the server") + ErrClientSendFailed = errors.New("couldn't send data to the server") + + ErrPutFailed = errors.New("failed to put in pool") + + ErrCastFailed = errors.New("failed to cast") ) const ( diff --git a/network/server.go b/network/server.go index a4ab5391..7fb4c86a 100644 --- a/network/server.go +++ b/network/server.go @@ -2,10 +2,12 @@ package network import ( "context" + "errors" "fmt" "os" "time" + gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/gatewayd-io/gatewayd/plugin" "github.com/panjf2000/gnet/v2" "github.com/rs/zerolog" @@ -113,6 +115,11 @@ func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) { } if err := s.proxy.Connect(gconn); err != nil { + if !errors.Is(err, gerr.ErrPoolExhausted) { + // This should never happen + // TODO: Send error to client or retry connection + s.logger.Error().Err(err).Msg("Failed to connect to proxy") + } return nil, gnet.Close } @@ -160,14 +167,16 @@ func (s *Server) OnClose(gconn gnet.Conn, err error) gnet.Action { } } - if err := s.proxy.Disconnect(gconn); err != nil { - s.logger.Error().Err(err).Msg("Failed to disconnect from the client") - } - + // Shutdown the server if there are no more connections and the server is stopped if uint64(s.engine.CountConnections()) == 0 && s.Status == Stopped { return gnet.Shutdown } + if err := s.proxy.Disconnect(gconn); err != nil { + s.logger.Error().Err(err).Msg("Failed to disconnect the server connection") + return gnet.Close + } + data = map[string]interface{}{ "client": map[string]interface{}{ "local": gconn.LocalAddr().String(), @@ -215,6 +224,8 @@ func (s *Server) OnTraffic(gconn gnet.Conn) gnet.Action { // TODO: Close the connection *gracefully* return gnet.Close } + // Flush the connection to make sure all data is sent + gconn.Flush() return gnet.None } diff --git a/network/server_test.go b/network/server_test.go index b5f08bb8..078cfe96 100644 --- a/network/server_test.go +++ b/network/server_test.go @@ -90,7 +90,7 @@ func TestRunServer(t *testing.T) { hooksConfig.Add(plugin.OnEgressTraffic, 1, onEgressTraffic) // Create a connection pool - pool := pool.NewPool() + pool := pool.NewPool(2) client1 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) pool.Put(client1.ID, client1) client2 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) @@ -140,8 +140,9 @@ func TestRunServer(t *testing.T) { defer client.Close() assert.NotNil(t, client) - err := client.Send(CreatePgStartupPacket()) + sent, err := client.Send(CreatePgStartupPacket()) assert.Nil(t, err) + assert.Equal(t, len(CreatePgStartupPacket()), sent) // The server should respond with a 'R' packet size, data, err := client.Receive() From 060f7437d4d386afa3ee6629c7fc2a60fec95f62 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:41:05 +0100 Subject: [PATCH 15/28] Fix connection management in proxy Rename Reconnect to TryReconnect to check for pool exhaustion and client disconnection Add IsExhausted method to check if the available client pool is exhausted Fix comments and add more structured logs --- network/proxy.go | 189 +++++++++++++++++++++++++++++------------------ 1 file changed, 117 insertions(+), 72 deletions(-) diff --git a/network/proxy.go b/network/proxy.go index b0506136..f6e7d6ac 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -2,9 +2,7 @@ package network import ( "context" - "errors" - "fmt" - "io" + "time" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/gatewayd-io/gatewayd/plugin" @@ -15,12 +13,17 @@ import ( "google.golang.org/protobuf/types/known/structpb" ) +const ( + EmptyPoolCapacity int = 0 +) + type Proxy interface { Connect(gconn gnet.Conn) error Disconnect(gconn gnet.Conn) error PassThrough(gconn gnet.Conn) error - Reconnect(cl *Client) *Client + TryReconnect(cl *Client) (*Client, error) Shutdown() + IsExhausted() bool } type ProxyImpl struct { @@ -45,7 +48,7 @@ func NewProxy( ) *ProxyImpl { return &ProxyImpl{ availableConnections: p, - busyConnections: pool.NewPool(), + busyConnections: pool.NewPool(EmptyPoolCapacity), logger: logger, hookConfig: hookConfig, Elastic: elastic, @@ -66,8 +69,8 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error { }) var client *Client - if pr.availableConnections.Size() == 0 { - // Pool is exhausted + if pr.IsExhausted() { + // Pool is exhausted or is elastic if pr.Elastic { // Create a new client client = NewClient( @@ -81,45 +84,59 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error { return gerr.ErrPoolExhausted } } else { - // Get a client from the pool - pr.logger.Debug().Msgf("Available clients: %v", pr.availableConnections.Size()) + // Get the client from the pool with the given clientID if cl, ok := pr.availableConnections.Pop(clientID).(*Client); ok { client = cl } } - if clientID != "" || client.ID != "" { - pr.busyConnections.Put(gconn, client) - pr.logger.Debug().Msgf("Client %s has been assigned to %s", client.ID, gconn.RemoteAddr().String()) - } else { - return gerr.ErrClientNotConnected + client, err := pr.TryReconnect(client) + if err != nil { + return err } - pr.logger.Debug().Msgf("[C] There are %d clients in the pool", pr.availableConnections.Size()) - pr.logger.Debug().Msgf("[C] There are %d clients in use", pr.busyConnections.Size()) + if err := pr.busyConnections.Put(gconn, client); err != nil { + // This should never happen + return gerr.ErrPutFailed + } + pr.logger.Debug().Msgf( + "Client %s has been assigned to %s", client.ID, gconn.RemoteAddr().String()) + + pr.logger.Debug().Str("function", "Proxy.Connect").Msgf( + "There are %d available clients", pr.availableConnections.Size()) + pr.logger.Debug().Str("function", "Proxy.Connect").Msgf( + "There are %d busy clients", pr.busyConnections.Size()) return nil } func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) error { - var client *Client - if cl, ok := pr.busyConnections.Pop(gconn).(*Client); !ok { - client = cl - } - - // TODO: The connection is unstable when I put the client back in the pool - // If the client is not in the pool, put it back - if pr.Elastic && pr.ReuseElasticClients || !pr.Elastic { - client = pr.Reconnect(client) - if client != nil && client.ID != "" { - pr.availableConnections.Put(client.ID, client) + client := pr.busyConnections.Pop(gconn) + if client != nil { + if cl, ok := client.(*Client); ok { + if (pr.Elastic && pr.ReuseElasticClients) || !pr.Elastic { + cl, err := pr.TryReconnect(cl) + if err != nil { + return err + } + // If the client is not in the pool, put it back + pr.availableConnections.Put(cl.ID, cl) + } else { + return gerr.ErrClientNotConnected + } + } else { + // This should never happen, but if it does, + // then there are some serious issues with the pool + return gerr.ErrCastFailed } } else { - client.Close() + return gerr.ErrClientNotFound } - pr.logger.Debug().Msgf("[D] There are %d clients in the pool", pr.availableConnections.Size()) - pr.logger.Debug().Msgf("[D] There are %d clients in use", pr.busyConnections.Size()) + pr.logger.Debug().Str("function", "Proxy.Disconnect").Msgf( + "There are %d available clients", pr.availableConnections.Size()) + pr.logger.Debug().Str("function", "Proxy.Disconnect").Msgf( + "There are %d busy clients", pr.busyConnections.Size()) return nil } @@ -133,10 +150,14 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) error { // that listens for data from the server and sends it to the client var client *Client + if pr.busyConnections.Get(gconn) == nil { + return gerr.ErrClientNotFound + } + if cl, ok := pr.busyConnections.Get(gconn).(*Client); ok { client = cl } else { - return gerr.ErrClientNotFound + return gerr.ErrCastFailed } // buf contains the data from the client (, length, query) @@ -144,6 +165,13 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) error { if err != nil { pr.logger.Error().Err(err).Msgf("Error reading from client: %v", err) } + pr.logger.Debug().Fields( + map[string]interface{}{ + "length": len(buf), + "local": gconn.LocalAddr().String(), + "remote": gconn.RemoteAddr().String(), + }, + ).Msg("Received data from client") addresses := map[string]interface{}{ "client": map[string]interface{}{ @@ -188,21 +216,37 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) error { } } - // TODO: This is a very basic implementation of the gateway - // and it is synchronous. I should make it asynchronous. - pr.logger.Debug().Msgf("Received %d bytes from %s", len(buf), gconn.RemoteAddr().String()) - // Send the query to the server - err = client.Send(buf) + sent, err := client.Send(buf) if err != nil { - return err + // return err + pr.logger.Error().Err(err).Msgf("Error sending data to database") } + pr.logger.Debug().Fields( + map[string]interface{}{ + "function": "Proxy.PassThrough", + "length": sent, + "local": client.Conn.LocalAddr().String(), + "remote": client.Conn.RemoteAddr().String(), + }, + ).Msg("Sent data to database") + + // This is a hack to make sure the server has time to respond + time.Sleep(time.Millisecond * 100) // Receive the response from the server - size, response, err := client.Receive() + received, response, err := client.Receive() + pr.logger.Debug().Fields( + map[string]interface{}{ + "function": "Proxy.PassThrough", + "length": received, + "local": client.Conn.LocalAddr().String(), + "remote": client.Conn.RemoteAddr().String(), + }, + ).Msg("Received data from database") egress := map[string]interface{}{ - "response": response[:size], // Will be converted to base64-encoded string + "response": response[:received], // Will be converted to base64-encoded string "error": "", } if err != nil { @@ -233,46 +277,39 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) error { } } - //nolint:gocritic - if err != nil && errors.Is(err, io.EOF) { - // The server has closed the connection - pr.logger.Error().Err(err).Msg("The client is not connected to the server anymore") - // Either the client is not connected to the server anymore or - // server forceful closed the connection - // Reconnect the client - client = pr.Reconnect(client) - // Put the client in the busy connections pool, effectively replacing the old one - pr.busyConnections.Put(gconn, client) + err = gconn.AsyncWrite(response[:received], func(gconn gnet.Conn, err error) error { + pr.logger.Debug().Fields( + map[string]interface{}{ + "function": "Proxy.PassThrough", + "length": received, + "local": gconn.LocalAddr().String(), + "remote": gconn.RemoteAddr().String(), + }, + ).Msg("Sent data to client") + return err + }) + if err != nil { + pr.logger.Error().Err(err).Msgf("Error writing to client") return err - } else if err != nil { - // Write the error to the client - _, err := gconn.Write(response[:size]) - if err != nil { - pr.logger.Error().Err(err).Msgf("Error writing the error to client: %v", err) - } - return fmt.Errorf("error receiving data from server: %w", err) - } else { - // Write the response to the incoming connection - _, err = gconn.Write(response[:size]) - if err != nil { - pr.logger.Error().Err(err).Msgf("Error writing to client: %v", err) - } } return nil } -func (pr *ProxyImpl) Reconnect(cl *Client) *Client { - // Close the client - if cl != nil && cl.ID != "" { - cl.Close() +func (pr *ProxyImpl) TryReconnect(cl *Client) (*Client, error) { + // TODO: try retriable connection? + + if pr.IsExhausted() { + pr.logger.Error().Msg("No more available connections :: TryReconnect") + return cl, gerr.ErrPoolExhausted + } + + if !cl.IsConnected() { + pr.logger.Error().Msg("Client is disconnected") + return cl, gerr.ErrClientNotConnected } - return NewClient( - pr.ClientConfig.Network, - pr.ClientConfig.Address, - pr.ClientConfig.ReceiveBufferSize, - pr.logger, - ) + + return cl, nil } func (pr *ProxyImpl) Shutdown() { @@ -297,3 +334,11 @@ func (pr *ProxyImpl) Shutdown() { pr.busyConnections.Clear() pr.logger.Debug().Msg("All busy connections have been closed") } + +func (pr *ProxyImpl) IsExhausted() bool { + if pr.Elastic { + return false + } + + return pr.availableConnections.Size() == 0 && pr.availableConnections.Cap() > 0 +} From a32cfd68052c1aa8db67b6badd5e0836c2d85528 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:45:29 +0100 Subject: [PATCH 16/28] Fix tests for proxy.go --- network/proxy.go | 4 ---- network/proxy_test.go | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/network/proxy.go b/network/proxy.go index f6e7d6ac..f1ddf530 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -2,7 +2,6 @@ package network import ( "context" - "time" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/gatewayd-io/gatewayd/plugin" @@ -231,9 +230,6 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) error { }, ).Msg("Sent data to database") - // This is a hack to make sure the server has time to respond - time.Sleep(time.Millisecond * 100) - // Receive the response from the server received, response, err := client.Receive() pr.logger.Debug().Fields( diff --git a/network/proxy_test.go b/network/proxy_test.go index 86ad32ed..ac6939c8 100644 --- a/network/proxy_test.go +++ b/network/proxy_test.go @@ -33,7 +33,7 @@ func TestNewProxy(t *testing.T) { logger := logging.NewLogger(cfg) // Create a connection pool - pool := pool.NewPool() + pool := pool.NewPool(EmptyPoolCapacity) client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) pool.Put(client.ID, client) @@ -63,7 +63,7 @@ func TestNewProxyElastic(t *testing.T) { logger := logging.NewLogger(cfg) // Create a connection pool - pool := pool.NewPool() + pool := pool.NewPool(EmptyPoolCapacity) // Create a proxy with an elastic buffer pool proxy := NewProxy(pool, plugin.NewHookConfig(), true, false, &Client{ From 9b4fe4ebfd5948ec0696c9dd9727e258afa13ac0 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 00:57:07 +0100 Subject: [PATCH 17/28] Fix linter errors --- cmd/run.go | 5 ++++- network/client.go | 13 ++++++------ network/proxy.go | 16 ++++++++------ network/proxy_test.go | 3 ++- network/server_test.go | 6 ++++-- pool/pool.go | 1 + pool/pool_test.go | 48 ++++++++++++++++++++++++++++-------------- 7 files changed, 59 insertions(+), 33 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 5b7063c4..5e917ece 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -146,7 +146,10 @@ var runCmd = &cobra.Command{ } } - pool.Put(client.ID, client) + err = pool.Put(client.ID, client) + if err != nil { + logger.Error().Err(err).Msg("Failed to add client to the pool") + } } } diff --git a/network/client.go b/network/client.go index 5e2d12df..d5557c3c 100644 --- a/network/client.go +++ b/network/client.go @@ -73,20 +73,20 @@ func NewClient(network, address string, receiveBufferSize int, logger zerolog.Lo } func (c *Client) Send(data []byte) (int, error) { - if sent, err := c.Conn.Write(data); err != nil { + sent, err := c.Conn.Write(data) + if err != nil { c.logger.Error().Err(err).Msgf("Couldn't send data to the server: %s", err) // TODO: Wrap the original error return 0, gerr.ErrClientSendFailed - } else { - c.logger.Debug().Msgf("Sent %d bytes to %s", len(data), c.Address) - return sent, nil } + c.logger.Debug().Msgf("Sent %d bytes to %s", len(data), c.Address) + return sent, nil } func (c *Client) Receive() (int, []byte, error) { buf := make([]byte, c.ReceiveBufferSize) received, err := c.Conn.Read(buf) - if err != nil && err != io.EOF { + if err != nil && errors.Is(err, io.EOF) { c.logger.Error().Err(err).Msg("Couldn't receive data from the server") return 0, nil, err } @@ -117,8 +117,7 @@ func (c *Client) IsConnected() bool { } buf := make([]byte, 0) - _, err := c.Read(buf) - if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { + if _, err := c.Read(buf); errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { c.logger.Debug().Msgf("Connection to %s is closed", c.Address) c.Close() return false diff --git a/network/proxy.go b/network/proxy.go index f1ddf530..be96dc2b 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -111,6 +111,7 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error { func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) error { client := pr.busyConnections.Pop(gconn) + //nolint:nestif if client != nil { if cl, ok := client.(*Client); ok { if (pr.Elastic && pr.ReuseElasticClients) || !pr.Elastic { @@ -119,7 +120,10 @@ func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) error { return err } // If the client is not in the pool, put it back - pr.availableConnections.Put(cl.ID, cl) + err = pr.availableConnections.Put(cl.ID, cl) + if err != nil { + return err + } } else { return gerr.ErrClientNotConnected } @@ -292,20 +296,20 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) error { return nil } -func (pr *ProxyImpl) TryReconnect(cl *Client) (*Client, error) { +func (pr *ProxyImpl) TryReconnect(client *Client) (*Client, error) { // TODO: try retriable connection? if pr.IsExhausted() { pr.logger.Error().Msg("No more available connections :: TryReconnect") - return cl, gerr.ErrPoolExhausted + return client, gerr.ErrPoolExhausted } - if !cl.IsConnected() { + if !client.IsConnected() { pr.logger.Error().Msg("Client is disconnected") - return cl, gerr.ErrClientNotConnected + return client, gerr.ErrClientNotConnected } - return cl, nil + return client, nil } func (pr *ProxyImpl) Shutdown() { diff --git a/network/proxy_test.go b/network/proxy_test.go index ac6939c8..e6ff4484 100644 --- a/network/proxy_test.go +++ b/network/proxy_test.go @@ -35,7 +35,8 @@ func TestNewProxy(t *testing.T) { // Create a connection pool pool := pool.NewPool(EmptyPoolCapacity) client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - pool.Put(client.ID, client) + err := pool.Put(client.ID, client) + assert.Nil(t, err) // Create a proxy with a fixed buffer pool proxy := NewProxy(pool, plugin.NewHookConfig(), false, false, nil, logger) diff --git a/network/server_test.go b/network/server_test.go index 078cfe96..e5bc6b7c 100644 --- a/network/server_test.go +++ b/network/server_test.go @@ -92,9 +92,11 @@ func TestRunServer(t *testing.T) { // Create a connection pool pool := pool.NewPool(2) client1 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - pool.Put(client1.ID, client1) + err := pool.Put(client1.ID, client1) + assert.Nil(t, err) client2 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) - pool.Put(client2.ID, client2) + err = pool.Put(client2.ID, client2) + assert.Nil(t, err) // Create a proxy with a fixed buffer pool proxy := NewProxy(pool, hooksConfig, false, false, &Client{ diff --git a/pool/pool.go b/pool/pool.go index c6585209..202a4392 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -100,6 +100,7 @@ func (p *Impl) Cap() int { return p.cap } +//nolint:predeclared func NewPool(cap int) *Impl { return &Impl{pool: sync.Map{}, cap: cap} } diff --git a/pool/pool_test.go b/pool/pool_test.go index 926656c7..834ad69c 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -20,10 +20,12 @@ func TestPool_Put(t *testing.T) { assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) - pool.Put("client1.ID", "client1") + err := pool.Put("client1.ID", "client1") + assert.Nil(t, err) assert.Equal(t, 1, pool.Size()) - pool.Put("client2.ID", "client2") + err = pool.Put("client2.ID", "client2") assert.Equal(t, 2, pool.Size()) + assert.Nil(t, err) } //nolint:dupl @@ -33,9 +35,11 @@ func TestPool_Pop(t *testing.T) { assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) - pool.Put("client1.ID", "client1") + err := pool.Put("client1.ID", "client1") + assert.Nil(t, err) assert.Equal(t, 1, pool.Size()) - pool.Put("client2.ID", "client2") + err = pool.Put("client2.ID", "client2") + assert.Nil(t, err) assert.Equal(t, 2, pool.Size()) if c1, ok := pool.Pop("client1.ID").(string); !ok { assert.Equal(t, c1, "client1") @@ -57,9 +61,11 @@ func TestPool_Clear(t *testing.T) { assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) - pool.Put("client1.ID", "client1") + err := pool.Put("client1.ID", "client1") + assert.Nil(t, err) assert.Equal(t, 1, pool.Size()) - pool.Put("client2.ID", "client2") + err = pool.Put("client2.ID", "client2") + assert.Nil(t, err) assert.Equal(t, 2, pool.Size()) pool.Clear() assert.Equal(t, 0, pool.Size()) @@ -71,9 +77,11 @@ func TestPool_ForEach(t *testing.T) { assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) - pool.Put("client1.ID", "client1") + err := pool.Put("client1.ID", "client1") + assert.Nil(t, err) assert.Equal(t, 1, pool.Size()) - pool.Put("client2.ID", "client2") + err = pool.Put("client2.ID", "client2") + assert.Nil(t, err) assert.Equal(t, 2, pool.Size()) pool.ForEach(func(key, value interface{}) bool { if c, ok := value.(string); ok { @@ -90,9 +98,11 @@ func TestPool_Get(t *testing.T) { assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) - pool.Put("client1.ID", "client1") + err := pool.Put("client1.ID", "client1") + assert.Nil(t, err) assert.Equal(t, 1, pool.Size()) - pool.Put("client2.ID", "client2") + err = pool.Put("client2.ID", "client2") + assert.Nil(t, err) assert.Equal(t, 2, pool.Size()) if c1, ok := pool.Get("client1.ID").(string); !ok { assert.Equal(t, c1, "client1") @@ -114,9 +124,11 @@ func TestPool_GetOrPut(t *testing.T) { assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) - pool.Put("client1.ID", "client1") + err := pool.Put("client1.ID", "client1") + assert.Nil(t, err) assert.Equal(t, 1, pool.Size()) - pool.Put("client2.ID", "client2") + err = pool.Put("client2.ID", "client2") + assert.Nil(t, err) assert.Equal(t, 2, pool.Size()) c1, loaded, err := pool.GetOrPut("client1.ID", "client1") assert.True(t, loaded) @@ -144,9 +156,11 @@ func TestPool_Remove(t *testing.T) { assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) - pool.Put("client1.ID", "client1") + err := pool.Put("client1.ID", "client1") + assert.Nil(t, err) assert.Equal(t, 1, pool.Size()) - pool.Put("client2.ID", "client2") + err = pool.Put("client2.ID", "client2") + assert.Nil(t, err) assert.Equal(t, 2, pool.Size()) pool.Remove("client1.ID") assert.Equal(t, 1, pool.Size()) @@ -160,9 +174,11 @@ func TestPool_GetClientIDs(t *testing.T) { assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) assert.Equal(t, 0, pool.Size()) - pool.Put("client1.ID", "client1") + err := pool.Put("client1.ID", "client1") + assert.Nil(t, err) assert.Equal(t, 1, pool.Size()) - pool.Put("client2.ID", "client2") + err = pool.Put("client2.ID", "client2") + assert.Nil(t, err) assert.Equal(t, 2, pool.Size()) var ids []string From a402abca94b2b85f4a7b1c2e3caf6866a3180728 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 01:04:59 +0100 Subject: [PATCH 18/28] Ignore wrapcheck linter errors for now (#50) --- network/client.go | 4 ++-- network/proxy.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/network/client.go b/network/client.go index d5557c3c..435e223f 100644 --- a/network/client.go +++ b/network/client.go @@ -88,9 +88,9 @@ func (c *Client) Receive() (int, []byte, error) { received, err := c.Conn.Read(buf) if err != nil && errors.Is(err, io.EOF) { c.logger.Error().Err(err).Msg("Couldn't receive data from the server") - return 0, nil, err + return 0, nil, err //nolint:wrapcheck } - return received, buf, err + return received, buf, err //nolint:wrapcheck } func (c *Client) Close() { diff --git a/network/proxy.go b/network/proxy.go index be96dc2b..8c2d6baf 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -122,7 +122,7 @@ func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) error { // If the client is not in the pool, put it back err = pr.availableConnections.Put(cl.ID, cl) if err != nil { - return err + return err //nolint:wrapcheck } } else { return gerr.ErrClientNotConnected @@ -290,7 +290,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) error { }) if err != nil { pr.logger.Error().Err(err).Msgf("Error writing to client") - return err + return err //nolint:wrapcheck } return nil From ea7d3d8aa6dec6a818875184c0080f49888d3e40 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 01:07:24 +0100 Subject: [PATCH 19/28] Remove python test file --- client_test.py | 76 -------------------------------------------------- 1 file changed, 76 deletions(-) delete mode 100644 client_test.py diff --git a/client_test.py b/client_test.py deleted file mode 100644 index 9ffae34b..00000000 --- a/client_test.py +++ /dev/null @@ -1,76 +0,0 @@ -import os -from pprint import pprint -from concurrent.futures import ThreadPoolExecutor -import psycopg - - -def create_table(): - conn = None - try: - conn = psycopg.connect(host="localhost", port=15432, dbname="test", - user="postgres", password="postgres", sslmode="disable") - - conn.execute( - "CREATE TABLE IF NOT EXISTS test (id serial PRIMARY KEY, num integer, data varchar);") - conn.close() - except KeyboardInterrupt: - if conn: - conn.close() - os._exit(0) - except Exception as e: - print("Worker %s: %s" % (id, e)) - - return - - -def writer(id): - conn = None - try: - conn = psycopg.connect(host="localhost", port=15432, dbname="test", - user="postgres", password="postgres", sslmode="disable") - - conn.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (id, "abc'def")) - - conn.close() - except KeyboardInterrupt: - if conn: - conn.close() - os._exit(0) - except Exception as e: - print("Worker %s: %s" % (id, e)) - - return - - -def reader(): - conn = None - try: - conn = psycopg.connect(host="localhost", port=15432, dbname="test", - user="postgres", password="postgres", sslmode="disable") - - for row in conn.execute("SELECT * FROM test;"): - print("ID=%s, NUM=%s, DATA=%s" % row) - # conn.execute("DROP TABLE test;") - conn.close() - except KeyboardInterrupt: - if conn: - conn.close() - os._exit(0) - except Exception as e: - print("Worker %s: %s" % (id, e)) - - return - - -if __name__ == '__main__': - with ThreadPoolExecutor(max_workers=10) as executor: - # Create 11 connections to the server and run queries in parallel - # This will cause the server to crash - executor.submit(create_table) - - for i in range(10): - executor.submit(writer, i) - - # Wait for all threads to finish - executor.submit(reader) - executor.shutdown(wait=True) From 4cd91381eeb9251b49cb9fb78560559053842f23 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 01:13:14 +0100 Subject: [PATCH 20/28] Clean up CI envs --- .github/workflows/test.yaml | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 87f75e80..cc380338 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -92,13 +92,10 @@ jobs: run: | sudo apt-get update sudo apt-get install --yes --no-install-recommends postgresql-client - psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -c "CREATE DATABASE gatewayd_test;" - psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "CREATE TABLE test_table (id serial PRIMARY KEY, name varchar(255));" - psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "INSERT INTO test_table (name) VALUES ('test');" - psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "SELECT * FROM test_table;" | grep test + psql ${PGURL1} -c "CREATE DATABASE gatewayd_test;" + psql ${PGURL2} -c "CREATE TABLE test_table (id serial PRIMARY KEY, name varchar(255));" + psql ${PGURL2} -c "INSERT INTO test_table (name) VALUES ('test');" + psql ${PGURL2} -c "SELECT * FROM test_table;" | grep test || exit 1 env: - DBNAME: gatewayd_test - PGUSER: postgres - PGPASSWORD: postgres - PGHOST: localhost - PGPORT: 15432 + PGURL1: postgresql://postgres:postgres@localhost:15432/postgres + PGURL2: postgresql://postgres:postgres@localhost:15432/gatewayd_test From 9bbe639b1e5a46c1fb68543c0d7a3b31d26730cd Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 01:23:40 +0100 Subject: [PATCH 21/28] TryReconnect should be revisited --- network/proxy.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/network/proxy.go b/network/proxy.go index 8c2d6baf..5351ac08 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -301,12 +301,6 @@ func (pr *ProxyImpl) TryReconnect(client *Client) (*Client, error) { if pr.IsExhausted() { pr.logger.Error().Msg("No more available connections :: TryReconnect") - return client, gerr.ErrPoolExhausted - } - - if !client.IsConnected() { - pr.logger.Error().Msg("Client is disconnected") - return client, gerr.ErrClientNotConnected } return client, nil From 7199e4182e220cceaa2bb78a7eb8ecb580de8e5d Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 01:28:51 +0100 Subject: [PATCH 22/28] Increase pool size to 10 --- gatewayd.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gatewayd.yaml b/gatewayd.yaml index 64fa8596..2de46d2f 100644 --- a/gatewayd.yaml +++ b/gatewayd.yaml @@ -22,7 +22,7 @@ pool: # Use the logger config passed here # i.e. don't assume it's the same as the logger config above logger: loggers.logger - size: 2 + size: 10 # Database configs for the connection pool client: clients.client1 From ea6e4d9bb6dd322577f4c302ae07653b48b353f4 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 01:39:28 +0100 Subject: [PATCH 23/28] Temporary hack to fix zero bytes response from database --- network/proxy.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/network/proxy.go b/network/proxy.go index 5351ac08..11bdab0a 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -277,6 +277,15 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) error { } } + // If the database didn't respond, send an error message to the client + // TODO: Figure out how to handle this better + if received == 0 { + pr.logger.Warn().Msg("Received 0 bytes from database") + _, err := gconn.Write([]byte("No response from database")) + return err //nolint:wrapcheck + } + + // Send the actual response to the client asynchronously err = gconn.AsyncWrite(response[:received], func(gconn gnet.Conn, err error) error { pr.logger.Debug().Fields( map[string]interface{}{ From e4770cb45e47d730bedbc610e22f0d96ecc45c30 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 01:47:50 +0100 Subject: [PATCH 24/28] Reconnect if the connection is lost --- network/proxy.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/network/proxy.go b/network/proxy.go index 11bdab0a..a886c4e7 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -312,6 +312,17 @@ func (pr *ProxyImpl) TryReconnect(client *Client) (*Client, error) { pr.logger.Error().Msg("No more available connections :: TryReconnect") } + if !client.IsConnected() { + client.Close() + + client = NewClient( + pr.ClientConfig.Network, + pr.ClientConfig.Address, + pr.ClientConfig.ReceiveBufferSize, + client.logger, + ) + } + return client, nil } From 708ed9f9b1e8bafe22af418273afa7b8dd687df9 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 02:10:13 +0100 Subject: [PATCH 25/28] Revert "Temporary hack to fix zero bytes response from database" This reverts commit ea6e4d9bb6dd322577f4c302ae07653b48b353f4. --- network/proxy.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/network/proxy.go b/network/proxy.go index a886c4e7..57632c64 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -277,15 +277,6 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) error { } } - // If the database didn't respond, send an error message to the client - // TODO: Figure out how to handle this better - if received == 0 { - pr.logger.Warn().Msg("Received 0 bytes from database") - _, err := gconn.Write([]byte("No response from database")) - return err //nolint:wrapcheck - } - - // Send the actual response to the client asynchronously err = gconn.AsyncWrite(response[:received], func(gconn gnet.Conn, err error) error { pr.logger.Debug().Fields( map[string]interface{}{ From c4d1b419020974b7e91e1b339e00a046d86e0c97 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 02:57:31 +0100 Subject: [PATCH 26/28] Use default buffer size if not given --- cmd/config_parser.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/config_parser.go b/cmd/config_parser.go index bdb341c2..98126cc4 100644 --- a/cmd/config_parser.go +++ b/cmd/config_parser.go @@ -131,6 +131,10 @@ func proxyConfig() (bool, bool, *network.Client) { address := globalConfig.String(ref + ".address") receiveBufferSize := globalConfig.Int(ref + ".receiveBufferSize") + if receiveBufferSize <= 0 { + receiveBufferSize = network.DefaultBufferSize + } + return elastic, reuseElasticClients, &network.Client{ Network: net, Address: address, From 83848e46f9e45dc84b02ea2e196e722f4200383f Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 02:58:17 +0100 Subject: [PATCH 27/28] Fixes to make concurrent connections work --- network/proxy.go | 25 ++++++++++--------------- network/server.go | 18 +++++++++++++++--- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/network/proxy.go b/network/proxy.go index 57632c64..5a0356c9 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -91,7 +91,7 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) error { client, err := pr.TryReconnect(client) if err != nil { - return err + pr.logger.Error().Err(err).Msgf("Failed to connect to the client") } if err := pr.busyConnections.Put(gconn, client); err != nil { @@ -115,14 +115,16 @@ func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) error { if client != nil { if cl, ok := client.(*Client); ok { if (pr.Elastic && pr.ReuseElasticClients) || !pr.Elastic { - cl, err := pr.TryReconnect(cl) - if err != nil { - return err + if !cl.IsConnected() { + _, err := pr.TryReconnect(cl) + if err != nil { + pr.logger.Error().Err(err).Msgf("Failed to reconnect to the client") + } } // If the client is not in the pool, put it back - err = pr.availableConnections.Put(cl.ID, cl) + err := pr.availableConnections.Put(cl.ID, cl) if err != nil { - return err //nolint:wrapcheck + pr.logger.Error().Err(err).Msgf("Failed to put the client back in the pool") } } else { return gerr.ErrClientNotConnected @@ -222,7 +224,6 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) error { // Send the query to the server sent, err := client.Send(buf) if err != nil { - // return err pr.logger.Error().Err(err).Msgf("Error sending data to database") } pr.logger.Debug().Fields( @@ -301,17 +302,11 @@ func (pr *ProxyImpl) TryReconnect(client *Client) (*Client, error) { if pr.IsExhausted() { pr.logger.Error().Msg("No more available connections :: TryReconnect") + return client, gerr.ErrPoolExhausted } if !client.IsConnected() { - client.Close() - - client = NewClient( - pr.ClientConfig.Network, - pr.ClientConfig.Address, - pr.ClientConfig.ReceiveBufferSize, - client.logger, - ) + pr.logger.Error().Msg("Client is disconnected") } return client, nil diff --git a/network/server.go b/network/server.go index 7fb4c86a..2675cf2f 100644 --- a/network/server.go +++ b/network/server.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "os" "time" @@ -115,12 +116,14 @@ func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) { } if err := s.proxy.Connect(gconn); err != nil { - if !errors.Is(err, gerr.ErrPoolExhausted) { + if errors.Is(err, gerr.ErrPoolExhausted) { + return nil, gnet.Close + } else { // This should never happen // TODO: Send error to client or retry connection s.logger.Error().Err(err).Msg("Failed to connect to proxy") + return nil, gnet.None } - return nil, gnet.Close } onOpenedData, err := structpb.NewStruct(map[string]interface{}{ @@ -222,7 +225,16 @@ func (s *Server) OnTraffic(gconn gnet.Conn) gnet.Action { if err := s.proxy.PassThrough(gconn); err != nil { s.logger.Error().Err(err).Msg("Failed to pass through traffic") // TODO: Close the connection *gracefully* - return gnet.Close + switch { + case errors.Is(err, gerr.ErrPoolExhausted): + case errors.Is(err, gerr.ErrCastFailed): + case errors.Is(err, gerr.ErrClientNotFound): + case errors.Is(err, gerr.ErrClientNotConnected): + case errors.Is(err, gerr.ErrClientSendFailed): + case errors.Is(err, gerr.ErrClientReceiveFailed): + case errors.Is(err, io.EOF): + return gnet.Close + } } // Flush the connection to make sure all data is sent gconn.Flush() From 514cd14e79ad1bfbaa6f09de17168365aa9b56cd Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 19 Dec 2022 03:00:46 +0100 Subject: [PATCH 28/28] Fix linter errors --- network/proxy.go | 8 ++++---- network/server.go | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/network/proxy.go b/network/proxy.go index 5a0356c9..e0738d98 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -113,16 +113,16 @@ func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) error { client := pr.busyConnections.Pop(gconn) //nolint:nestif if client != nil { - if cl, ok := client.(*Client); ok { + if client, ok := client.(*Client); ok { if (pr.Elastic && pr.ReuseElasticClients) || !pr.Elastic { - if !cl.IsConnected() { - _, err := pr.TryReconnect(cl) + if !client.IsConnected() { + _, err := pr.TryReconnect(client) if err != nil { pr.logger.Error().Err(err).Msgf("Failed to reconnect to the client") } } // If the client is not in the pool, put it back - err := pr.availableConnections.Put(cl.ID, cl) + err := pr.availableConnections.Put(client.ID, client) if err != nil { pr.logger.Error().Err(err).Msgf("Failed to put the client back in the pool") } diff --git a/network/server.go b/network/server.go index 2675cf2f..bc4e8244 100644 --- a/network/server.go +++ b/network/server.go @@ -118,12 +118,12 @@ func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) { if err := s.proxy.Connect(gconn); err != nil { if errors.Is(err, gerr.ErrPoolExhausted) { return nil, gnet.Close - } else { - // This should never happen - // TODO: Send error to client or retry connection - s.logger.Error().Err(err).Msg("Failed to connect to proxy") - return nil, gnet.None } + + // This should never happen + // TODO: Send error to client or retry connection + s.logger.Error().Err(err).Msg("Failed to connect to proxy") + return nil, gnet.None } onOpenedData, err := structpb.NewStruct(map[string]interface{}{