Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2821d44
Fix bug in setting the default value for client.ReceiveBufferSize
mostafa Dec 18, 2022
db8ac13
Return the amount of data sent to the server (database)
mostafa Dec 18, 2022
03827c7
Handle io.EOF when reading from the server (database)
mostafa Dec 18, 2022
7fe1244
Don't reset the receive buffer size on close
mostafa Dec 18, 2022
c6102a3
Add Client.IsConnected function to check if the client's underlying c…
mostafa Dec 18, 2022
8f57718
Fix tests for client.go
mostafa Dec 18, 2022
2ef25c4
Set default buffer size to a larger value: 16 MB
mostafa Dec 18, 2022
dd01896
Set default value for read/write buffer caps and socket buffer lengths
mostafa Dec 18, 2022
4454af8
Update dependencies to latest versions
mostafa Dec 18, 2022
397720f
Add cap'd pool
mostafa Dec 18, 2022
98053bb
Fix tests for pool.go
mostafa Dec 18, 2022
38ac9ca
Set default value for buffers and commented them out
mostafa Dec 18, 2022
87e0261
Update code to reflect the changes in the pool (cap'd)
mostafa Dec 18, 2022
3c337bd
Fix bugs in server.go
mostafa Dec 18, 2022
060f743
Fix connection management in proxy
mostafa Dec 18, 2022
a32cfd6
Fix tests for proxy.go
mostafa Dec 18, 2022
9b4fe4e
Fix linter errors
mostafa Dec 18, 2022
a402abc
Ignore wrapcheck linter errors for now (#50)
mostafa Dec 19, 2022
ea7d3d8
Remove python test file
mostafa Dec 19, 2022
4cd9138
Clean up CI envs
mostafa Dec 19, 2022
9bbe639
TryReconnect should be revisited
mostafa Dec 19, 2022
7199e41
Increase pool size to 10
mostafa Dec 19, 2022
ea6e4d9
Temporary hack to fix zero bytes response from database
mostafa Dec 19, 2022
e4770cb
Reconnect if the connection is lost
mostafa Dec 19, 2022
708ed9f
Revert "Temporary hack to fix zero bytes response from database"
mostafa Dec 19, 2022
c4d1b41
Use default buffer size if not given
mostafa Dec 19, 2022
83848e4
Fixes to make concurrent connections work
mostafa Dec 19, 2022
514cd14
Fix linter errors
mostafa Dec 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,10 @@ jobs:
run: |
sudo apt-get update
sudo apt-get install --yes --no-install-recommends postgresql-client
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -c "CREATE DATABASE gatewayd_test;"
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "CREATE TABLE test_table (id serial PRIMARY KEY, name varchar(255));"
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "INSERT INTO test_table (name) VALUES ('test');"
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "SELECT * FROM test_table;" | grep test
psql ${PGURL1} -c "CREATE DATABASE gatewayd_test;"
psql ${PGURL2} -c "CREATE TABLE test_table (id serial PRIMARY KEY, name varchar(255));"
psql ${PGURL2} -c "INSERT INTO test_table (name) VALUES ('test');"
psql ${PGURL2} -c "SELECT * FROM test_table;" | grep test || exit 1
env:
DBNAME: gatewayd_test
PGUSER: postgres
PGPASSWORD: postgres
PGHOST: localhost
PGPORT: 15432
PGURL1: postgresql://postgres:postgres@localhost:15432/postgres
PGURL2: postgresql://postgres:postgres@localhost:15432/gatewayd_test
76 changes: 0 additions & 76 deletions client_test.py

This file was deleted.

32 changes: 28 additions & 4 deletions cmd/config_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func proxyConfig() (bool, bool, *network.Client) {
address := globalConfig.String(ref + ".address")
receiveBufferSize := globalConfig.Int(ref + ".receiveBufferSize")

if receiveBufferSize <= 0 {
receiveBufferSize = network.DefaultBufferSize
}

return elastic, reuseElasticClients, &network.Client{
Network: net,
Address: address,
Expand Down Expand Up @@ -181,6 +185,26 @@ func getTCPNoDelay() gnet.TCPSocketOpt {
}

func serverConfig() *ServerConfig {
readBufferCap := globalConfig.Int("server.readBufferCap")
if readBufferCap <= 0 {
readBufferCap = network.DefaultBufferSize
}

writeBufferCap := globalConfig.Int("server.writeBufferCap")
if writeBufferCap <= 0 {
writeBufferCap = network.DefaultBufferSize
}

socketRecvBuffer := globalConfig.Int("server.socketRecvBuffer")
if socketRecvBuffer <= 0 {
socketRecvBuffer = network.DefaultBufferSize
}

socketSendBuffer := globalConfig.Int("server.socketSendBuffer")
if socketSendBuffer <= 0 {
socketSendBuffer = network.DefaultBufferSize
}

return &ServerConfig{
Network: globalConfig.String("server.network"),
Address: globalConfig.String("server.address"),
Expand All @@ -191,10 +215,10 @@ func serverConfig() *ServerConfig {
MultiCore: globalConfig.Bool("server.multiCore"),
LockOSThread: globalConfig.Bool("server.lockOSThread"),
LoadBalancer: getLoadBalancer(globalConfig.String("server.loadBalancer")),
ReadBufferCap: globalConfig.Int("server.readBufferCap"),
WriteBufferCap: globalConfig.Int("server.writeBufferCap"),
SocketRecvBuffer: globalConfig.Int("server.socketRecvBuffer"),
SocketSendBuffer: globalConfig.Int("server.socketSendBuffer"),
ReadBufferCap: readBufferCap,
WriteBufferCap: writeBufferCap,
SocketRecvBuffer: socketRecvBuffer,
SocketSendBuffer: socketSendBuffer,
ReuseAddress: globalConfig.Bool("server.reuseAddress"),
ReusePort: globalConfig.Bool("server.reusePort"),
TCPKeepAlive: globalConfig.Duration("server.tcpKeepAlive"),
Expand Down
7 changes: 5 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ var runCmd = &cobra.Command{
}

// Create and initialize a pool of connections
pool := pool.NewPool()
poolSize, clientConfig := poolConfig()
pool := pool.NewPool(poolSize)

// Add clients to the pool
for i := 0; i < poolSize; i++ {
Expand Down Expand Up @@ -146,7 +146,10 @@ var runCmd = &cobra.Command{
}
}

pool.Put(client.ID, client)
err = pool.Put(client.ID, client)
if err != nil {
logger.Error().Err(err).Msg("Failed to add client to the pool")
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ var (

ErrPluginNotFound = errors.New("plugin not found")
ErrPluginNotReady = errors.New("plugin is not ready")

ErrClientReceiveFailed = errors.New("couldn't receive data from the server")
ErrClientSendFailed = errors.New("couldn't send data to the server")

ErrPutFailed = errors.New("failed to put in pool")

ErrCastFailed = errors.New("failed to cast")
)

const (
Expand Down
12 changes: 6 additions & 6 deletions gatewayd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ clients:
client1:
network: tcp
address: localhost:5432
receiveBufferSize: 4096
# receiveBufferSize: 16777216

# Pool config
pool:
# Use the logger config passed here
# i.e. don't assume it's the same as the logger config above
logger: loggers.logger
size: 2
size: 10
# Database configs for the connection pool
client: clients.client1

Expand Down Expand Up @@ -52,10 +52,10 @@ server:
multiCore: True
lockOSThread: False
loadBalancer: roundrobin
readBufferCap: 4096
writeBufferCap: 4096
socketRecvBuffer: 4096
socketSendBuffer: 4096
# readBufferCap: 16777216
# writeBufferCap: 16777216
# socketRecvBuffer: 16777216
# socketSendBuffer: 16777216
reuseAddress: True
reusePort: True
tcpKeepAlive: 3s # seconds
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ require (
github.com/hashicorp/go-plugin v1.4.8
github.com/knadh/koanf v1.4.4
github.com/mitchellh/mapstructure v1.5.0
github.com/panjf2000/gnet/v2 v2.2.0
github.com/panjf2000/gnet/v2 v2.2.1
github.com/rs/zerolog v1.28.0
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.1
golang.org/x/exp v0.0.0-20221212164502-fae10dda9338
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand All @@ -36,7 +36,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/sys v0.3.0 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ github.com/fergusstrange/embedded-postgres v1.19.0 h1:NqDufJHeA03U7biULlPHZ0pZ10
github.com/fergusstrange/embedded-postgres v1.19.0/go.mod h1:0B+3bPsMvcNgR9nN+bdM2x9YaNYDnf3ksUqYp1OAub0=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
Expand Down Expand Up @@ -222,8 +224,11 @@ github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/panjf2000/ants/v2 v2.4.8 h1:JgTbolX6K6RreZ4+bfctI0Ifs+3mrE5BIHudQxUDQ9k=
github.com/panjf2000/ants/v2 v2.4.8/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/panjf2000/ants/v2 v2.7.0 h1:Y3Bgpfo9HDkBoHNVFbMfY5mAvi5TAA17y3HbzQ74p5Y=
github.com/panjf2000/gnet/v2 v2.2.0 h1:+6itXhRlHJpv5UGAyN1DebHzK1l0GbZMOsg2Spb1VS0=
github.com/panjf2000/gnet/v2 v2.2.0/go.mod h1:unWr2B4jF0DQPJH3GsXBGQiDcAamM6+Pf5FiK705kc4=
github.com/panjf2000/gnet/v2 v2.2.1 h1:HJVK3vmD6rBgOeTnYkG4czW6jphVHygxLLWTEBU3nqU=
github.com/panjf2000/gnet/v2 v2.2.1/go.mod h1:y8xWR1EEK6pGDuAQ6XULY/WWmPv0Pgbsq2Q4lbXJ6JA=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIGuI=
Expand Down Expand Up @@ -304,6 +309,8 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
Expand All @@ -316,6 +323,10 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20221212164502-fae10dda9338 h1:OvjRkcNHnf6/W5FZXSxODbxwD+X7fspczG7Jn/xQVD4=
golang.org/x/exp v0.0.0-20221212164502-fae10dda9338/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20221215174704-0915cd710c24 h1:6w3iSY8IIkp5OQtbYj8NeuKG1jS9d+kYaubXqsoOiQ8=
golang.org/x/exp v0.0.0-20221215174704-0915cd710c24/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 h1:5oN1Pz/eDhCpbMbLstvIPa0b/BEQo6g6nwV3pLjfM6w=
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down Expand Up @@ -391,6 +402,7 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220224120231-95c6836cb0e7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
53 changes: 40 additions & 13 deletions network/client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package network

import (
"fmt"
"errors"
"io"
"net"

gerr "github.com/gatewayd-io/gatewayd/errors"
"github.com/rs/zerolog"
)

Expand Down Expand Up @@ -58,33 +60,37 @@ func NewClient(network, address string, receiveBufferSize int, logger zerolog.Lo
}

client.Conn = conn
if client.ReceiveBufferSize == 0 {
if receiveBufferSize <= 0 {
client.ReceiveBufferSize = DefaultBufferSize
} else {
client.ReceiveBufferSize = receiveBufferSize
}

logger.Debug().Msgf("New client created: %s", client.Address)
client.ID = GetID(conn.LocalAddr().Network(), conn.LocalAddr().String(), DefaultSeed, logger)

return &client
}

func (c *Client) Send(data []byte) error {
if _, err := c.Write(data); err != nil {
func (c *Client) Send(data []byte) (int, error) {
sent, err := c.Conn.Write(data)
if err != nil {
c.logger.Error().Err(err).Msgf("Couldn't send data to the server: %s", err)
return fmt.Errorf("couldn't send data to the server: %w", err)
// TODO: Wrap the original error
return 0, gerr.ErrClientSendFailed
}
c.logger.Debug().Msgf("Sent %d bytes to %s", len(data), c.Address)
return nil
return sent, nil
}

func (c *Client) Receive() (int, []byte, error) {
buf := make([]byte, c.ReceiveBufferSize)
read, err := c.Read(buf)
if err != nil {
c.logger.Error().Err(err).Msgf("Couldn't receive data from the server: %s", err)
return 0, nil, fmt.Errorf("couldn't receive data from the server: %w", err)
received, err := c.Conn.Read(buf)
if err != nil && errors.Is(err, io.EOF) {
c.logger.Error().Err(err).Msg("Couldn't receive data from the server")
return 0, nil, err //nolint:wrapcheck
}
c.logger.Debug().Msgf("Received %d bytes from %s", read, c.Address)
return read, buf, nil
return received, buf, err //nolint:wrapcheck
}

func (c *Client) Close() {
Expand All @@ -96,5 +102,26 @@ func (c *Client) Close() {
c.Conn = nil
c.Address = ""
c.Network = ""
c.ReceiveBufferSize = 0
}

// Go returns io.EOF when the server closes the connection.
// So, if I read 0 bytes and the error is io.EOF or net.ErrClosed, I should reconnect.
func (c *Client) IsConnected() bool {
if c == nil {
return false
}

if c != nil && c.Conn == nil || c.ID == "" {
c.Close()
return false
}

buf := make([]byte, 0)
if _, err := c.Read(buf); errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
c.logger.Debug().Msgf("Connection to %s is closed", c.Address)
c.Close()
return false
}

return true
}
Loading