diff --git a/network/proxy.go b/network/proxy.go index 4aaee962..84047c29 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -2,6 +2,8 @@ package network import ( "context" + "errors" + "io" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/gatewayd-io/gatewayd/plugin" @@ -93,7 +95,7 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError { client, err := pr.TryReconnect(client) if err != nil { - pr.logger.Error().Err(err).Msgf("Failed to connect to the client") + pr.logger.Error().Err(err).Msg("Failed to connect to the client") } if err := pr.busyConnections.Put(gconn, client); err != nil { @@ -120,13 +122,13 @@ func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) *gerr.GatewayDError { if !client.IsConnected() { _, err := pr.TryReconnect(client) if err != nil { - pr.logger.Error().Err(err).Msgf("Failed to reconnect to the client") + pr.logger.Error().Err(err).Msg("Failed to reconnect to the client") } } // If the client is not in the pool, put it back err := pr.availableConnections.Put(client.ID, client) if err != nil { - pr.logger.Error().Err(err).Msgf("Failed to put the client back in the pool") + pr.logger.Error().Err(err).Msg("Failed to put the client back in the pool") } } else { return gerr.ErrClientNotConnected @@ -223,7 +225,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError { // Send the query to the server sent, err := client.Send(request) if err != nil { - pr.logger.Error().Err(err).Msgf("Error sending data to database") + pr.logger.Error().Err(err).Msg("Error sending data to database") } pr.logger.Debug().Fields( map[string]interface{}{ @@ -245,6 +247,33 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError { }, ).Msg("Received data from database") + // The connection to the server is closed, so we MUST reconnect, + // otherwise the client will be stuck. + if received == 0 && err != nil && errors.Is(err.Unwrap(), io.EOF) { + pr.logger.Debug().Fields( + map[string]interface{}{ + "function": "proxy.passthrough", + "local": client.Conn.LocalAddr().String(), + "remote": client.Conn.RemoteAddr().String(), + }).Msgf("Client disconnected") + + client.Close() + client = NewClient( + pr.ClientConfig.Network, + pr.ClientConfig.Address, + pr.ClientConfig.ReceiveBufferSize, + pr.ClientConfig.ReceiveChunkSize, + pr.ClientConfig.ReceiveDeadline, + pr.ClientConfig.SendDeadline, + pr.logger, + ) + pr.busyConnections.Remove(gconn) + if err := pr.busyConnections.Put(gconn, client); err != nil { + // This should never happen + return err + } + } + egress := map[string]interface{}{ "response": response[:received], // Will be converted to base64-encoded string "error": "", @@ -286,7 +315,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError { return err }) if origErr != nil { - pr.logger.Error().Err(err).Msgf("Error writing to client") + pr.logger.Error().Err(err).Msg("Error writing to client") return gerr.ErrServerSendFailed.Wrap(err) } diff --git a/network/server.go b/network/server.go index eba7abf5..f09d7da3 100644 --- a/network/server.go +++ b/network/server.go @@ -196,7 +196,7 @@ func (s *Server) OnTraffic(gconn gnet.Conn) gnet.Action { case errors.Is(err, gerr.ErrClientNotConnected): case errors.Is(err, gerr.ErrClientSendFailed): case errors.Is(err, gerr.ErrClientReceiveFailed): - case errors.Is(err, io.EOF): + case errors.Is(err.Unwrap(), io.EOF): return gnet.Close } } @@ -333,7 +333,7 @@ func NewServer( if hardLimit == 0 { server.HardLimit = limits.Max - logger.Debug().Msgf("Hard limit is not set, using the current system hard limit") + logger.Debug().Msg("Hard limit is not set, using the current system hard limit") } else { server.HardLimit = hardLimit logger.Debug().Msgf("Hard limit is set to %d", hardLimit) @@ -341,7 +341,7 @@ func NewServer( if tickInterval == 0 { server.TickInterval = DefaultTickInterval - logger.Debug().Msgf("Tick interval is not set, using the default value") + logger.Debug().Msg("Tick interval is not set, using the default value") } else { server.TickInterval = tickInterval } diff --git a/plugin/hooks.go b/plugin/hooks.go index 7b4af199..d90a90aa 100644 --- a/plugin/hooks.go +++ b/plugin/hooks.go @@ -150,7 +150,7 @@ func (h *HookConfig) Run( "hookType": hookType, "priority": prio, }, - ).Msgf("Hook returned invalid value, ignoring") + ).Msg("Hook returned invalid value, ignoring") if idx == 0 { returnVal = params } @@ -161,7 +161,7 @@ func (h *HookConfig) Run( "hookType": hookType, "priority": prio, }, - ).Msgf("Hook returned invalid value, aborting") + ).Msg("Hook returned invalid value, aborting") if idx == 0 { return args, nil } @@ -173,7 +173,7 @@ func (h *HookConfig) Run( "hookType": hookType, "priority": prio, }, - ).Msgf("Hook returned invalid value, removing") + ).Msg("Hook returned invalid value, removing") removeList = append(removeList, prio) if idx == 0 { returnVal = params