From 6d38dceff67f75a0ce26a34305121d8f3870eb6b Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Fri, 30 Dec 2022 22:46:10 +0100 Subject: [PATCH 1/3] Use unformatted log msg function --- network/proxy.go | 10 +++++----- network/server.go | 4 ++-- plugin/hooks.go | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/network/proxy.go b/network/proxy.go index 4aaee962..a7e12715 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -93,7 +93,7 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError { client, err := pr.TryReconnect(client) if err != nil { - pr.logger.Error().Err(err).Msgf("Failed to connect to the client") + pr.logger.Error().Err(err).Msg("Failed to connect to the client") } if err := pr.busyConnections.Put(gconn, client); err != nil { @@ -120,13 +120,13 @@ func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) *gerr.GatewayDError { if !client.IsConnected() { _, err := pr.TryReconnect(client) if err != nil { - pr.logger.Error().Err(err).Msgf("Failed to reconnect to the client") + pr.logger.Error().Err(err).Msg("Failed to reconnect to the client") } } // If the client is not in the pool, put it back err := pr.availableConnections.Put(client.ID, client) if err != nil { - pr.logger.Error().Err(err).Msgf("Failed to put the client back in the pool") + pr.logger.Error().Err(err).Msg("Failed to put the client back in the pool") } } else { return gerr.ErrClientNotConnected @@ -223,7 +223,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError { // Send the query to the server sent, err := client.Send(request) if err != nil { - pr.logger.Error().Err(err).Msgf("Error sending data to database") + pr.logger.Error().Err(err).Msg("Error sending data to database") } pr.logger.Debug().Fields( map[string]interface{}{ @@ -286,7 +286,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError { return err }) if origErr != nil { - pr.logger.Error().Err(err).Msgf("Error writing to client") + pr.logger.Error().Err(err).Msg("Error writing to client") return gerr.ErrServerSendFailed.Wrap(err) } diff --git a/network/server.go b/network/server.go index eba7abf5..a691a894 100644 --- a/network/server.go +++ b/network/server.go @@ -333,7 +333,7 @@ func NewServer( if hardLimit == 0 { server.HardLimit = limits.Max - logger.Debug().Msgf("Hard limit is not set, using the current system hard limit") + logger.Debug().Msg("Hard limit is not set, using the current system hard limit") } else { server.HardLimit = hardLimit logger.Debug().Msgf("Hard limit is set to %d", hardLimit) @@ -341,7 +341,7 @@ func NewServer( if tickInterval == 0 { server.TickInterval = DefaultTickInterval - logger.Debug().Msgf("Tick interval is not set, using the default value") + logger.Debug().Msg("Tick interval is not set, using the default value") } else { server.TickInterval = tickInterval } diff --git a/plugin/hooks.go b/plugin/hooks.go index 7b4af199..d90a90aa 100644 --- a/plugin/hooks.go +++ b/plugin/hooks.go @@ -150,7 +150,7 @@ func (h *HookConfig) Run( "hookType": hookType, "priority": prio, }, - ).Msgf("Hook returned invalid value, ignoring") + ).Msg("Hook returned invalid value, ignoring") if idx == 0 { returnVal = params } @@ -161,7 +161,7 @@ func (h *HookConfig) Run( "hookType": hookType, "priority": prio, }, - ).Msgf("Hook returned invalid value, aborting") + ).Msg("Hook returned invalid value, aborting") if idx == 0 { return args, nil } @@ -173,7 +173,7 @@ func (h *HookConfig) Run( "hookType": hookType, "priority": prio, }, - ).Msgf("Hook returned invalid value, removing") + ).Msg("Hook returned invalid value, removing") removeList = append(removeList, prio) if idx == 0 { returnVal = params From b4c4c11bd1cdaded2ce205a2a5cb8868d51e6195 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sat, 31 Dec 2022 00:04:37 +0100 Subject: [PATCH 2/3] Detect disconnects from server and reconnect --- network/proxy.go | 26 ++++++++++++++++++++++++++ network/server.go | 2 +- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/network/proxy.go b/network/proxy.go index a7e12715..b656554a 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -2,6 +2,8 @@ package network import ( "context" + "errors" + "io" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/gatewayd-io/gatewayd/plugin" @@ -245,6 +247,30 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError { }, ).Msg("Received data from database") + // The connection to the server is closed, so we MUST reconnect, + // otherwise the client will be stuck. + if received == 0 && err != nil && errors.Is(err.Unwrap(), io.EOF) { + pr.logger.Debug().Fields( + map[string]interface{}{ + "function": "proxy.passthrough", + "local": client.Conn.LocalAddr().String(), + "remote": client.Conn.RemoteAddr().String(), + }).Msgf("Client disconnected") + + client.Close() + client = NewClient( + pr.ClientConfig.Network, + pr.ClientConfig.Address, + pr.ClientConfig.ReceiveBufferSize, + pr.ClientConfig.ReceiveChunkSize, + pr.ClientConfig.ReceiveDeadline, + pr.ClientConfig.SendDeadline, + pr.logger, + ) + pr.busyConnections.Remove(gconn) + pr.busyConnections.Put(gconn, client) + } + egress := map[string]interface{}{ "response": response[:received], // Will be converted to base64-encoded string "error": "", diff --git a/network/server.go b/network/server.go index a691a894..f09d7da3 100644 --- a/network/server.go +++ b/network/server.go @@ -196,7 +196,7 @@ func (s *Server) OnTraffic(gconn gnet.Conn) gnet.Action { case errors.Is(err, gerr.ErrClientNotConnected): case errors.Is(err, gerr.ErrClientSendFailed): case errors.Is(err, gerr.ErrClientReceiveFailed): - case errors.Is(err, io.EOF): + case errors.Is(err.Unwrap(), io.EOF): return gnet.Close } } From 231129237b517adffd75ac7d999580ac5a29136b Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sat, 31 Dec 2022 00:09:18 +0100 Subject: [PATCH 3/3] Check the result of put --- network/proxy.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/network/proxy.go b/network/proxy.go index b656554a..84047c29 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -268,7 +268,10 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError { pr.logger, ) pr.busyConnections.Remove(gconn) - pr.busyConnections.Put(gconn, client) + if err := pr.busyConnections.Put(gconn, client); err != nil { + // This should never happen + return err + } } egress := map[string]interface{}{