Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package network

import (
"context"
"errors"
"io"

gerr "github.com/gatewayd-io/gatewayd/errors"
"github.com/gatewayd-io/gatewayd/plugin"
Expand Down Expand Up @@ -93,7 +95,7 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError {

client, err := pr.TryReconnect(client)
if err != nil {
pr.logger.Error().Err(err).Msgf("Failed to connect to the client")
pr.logger.Error().Err(err).Msg("Failed to connect to the client")
}

if err := pr.busyConnections.Put(gconn, client); err != nil {
Expand All @@ -120,13 +122,13 @@ func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) *gerr.GatewayDError {
if !client.IsConnected() {
_, err := pr.TryReconnect(client)
if err != nil {
pr.logger.Error().Err(err).Msgf("Failed to reconnect to the client")
pr.logger.Error().Err(err).Msg("Failed to reconnect to the client")
}
}
// If the client is not in the pool, put it back
err := pr.availableConnections.Put(client.ID, client)
if err != nil {
pr.logger.Error().Err(err).Msgf("Failed to put the client back in the pool")
pr.logger.Error().Err(err).Msg("Failed to put the client back in the pool")
}
} else {
return gerr.ErrClientNotConnected
Expand Down Expand Up @@ -223,7 +225,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
// Send the query to the server
sent, err := client.Send(request)
if err != nil {
pr.logger.Error().Err(err).Msgf("Error sending data to database")
pr.logger.Error().Err(err).Msg("Error sending data to database")
}
pr.logger.Debug().Fields(
map[string]interface{}{
Expand All @@ -245,6 +247,33 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
},
).Msg("Received data from database")

// The connection to the server is closed, so we MUST reconnect,
// otherwise the client will be stuck.
if received == 0 && err != nil && errors.Is(err.Unwrap(), io.EOF) {
pr.logger.Debug().Fields(
map[string]interface{}{
"function": "proxy.passthrough",
"local": client.Conn.LocalAddr().String(),
"remote": client.Conn.RemoteAddr().String(),
}).Msgf("Client disconnected")

client.Close()
client = NewClient(
pr.ClientConfig.Network,
pr.ClientConfig.Address,
pr.ClientConfig.ReceiveBufferSize,
pr.ClientConfig.ReceiveChunkSize,
pr.ClientConfig.ReceiveDeadline,
pr.ClientConfig.SendDeadline,
pr.logger,
)
pr.busyConnections.Remove(gconn)
if err := pr.busyConnections.Put(gconn, client); err != nil {
// This should never happen
return err
}
}

egress := map[string]interface{}{
"response": response[:received], // Will be converted to base64-encoded string
"error": "",
Expand Down Expand Up @@ -286,7 +315,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
return err
})
if origErr != nil {
pr.logger.Error().Err(err).Msgf("Error writing to client")
pr.logger.Error().Err(err).Msg("Error writing to client")
return gerr.ErrServerSendFailed.Wrap(err)
}

Expand Down
6 changes: 3 additions & 3 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (s *Server) OnTraffic(gconn gnet.Conn) gnet.Action {
case errors.Is(err, gerr.ErrClientNotConnected):
case errors.Is(err, gerr.ErrClientSendFailed):
case errors.Is(err, gerr.ErrClientReceiveFailed):
case errors.Is(err, io.EOF):
case errors.Is(err.Unwrap(), io.EOF):
return gnet.Close
}
}
Expand Down Expand Up @@ -333,15 +333,15 @@ func NewServer(

if hardLimit == 0 {
server.HardLimit = limits.Max
logger.Debug().Msgf("Hard limit is not set, using the current system hard limit")
logger.Debug().Msg("Hard limit is not set, using the current system hard limit")
} else {
server.HardLimit = hardLimit
logger.Debug().Msgf("Hard limit is set to %d", hardLimit)
}

if tickInterval == 0 {
server.TickInterval = DefaultTickInterval
logger.Debug().Msgf("Tick interval is not set, using the default value")
logger.Debug().Msg("Tick interval is not set, using the default value")
} else {
server.TickInterval = tickInterval
}
Expand Down
6 changes: 3 additions & 3 deletions plugin/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (h *HookConfig) Run(
"hookType": hookType,
"priority": prio,
},
).Msgf("Hook returned invalid value, ignoring")
).Msg("Hook returned invalid value, ignoring")
if idx == 0 {
returnVal = params
}
Expand All @@ -161,7 +161,7 @@ func (h *HookConfig) Run(
"hookType": hookType,
"priority": prio,
},
).Msgf("Hook returned invalid value, aborting")
).Msg("Hook returned invalid value, aborting")
if idx == 0 {
return args, nil
}
Expand All @@ -173,7 +173,7 @@ func (h *HookConfig) Run(
"hookType": hookType,
"priority": prio,
},
).Msgf("Hook returned invalid value, removing")
).Msg("Hook returned invalid value, removing")
removeList = append(removeList, prio)
if idx == 0 {
returnVal = params
Expand Down