diff --git a/pkg/connection/reconnectListenerWrapper.go b/pkg/connection/reconnectListenerWrapper.go index 79d3ed9..16c483f 100644 --- a/pkg/connection/reconnectListenerWrapper.go +++ b/pkg/connection/reconnectListenerWrapper.go @@ -75,6 +75,9 @@ func (rw *ReconnectListenerWrapper) reconnect() error { rw.mu.Lock() defer rw.mu.Unlock() + rw.reconnectLock() + defer rw.reconnectUnlock() + reconSucess := make(chan error, 0) go func() { @@ -82,6 +85,7 @@ func (rw *ReconnectListenerWrapper) reconnect() error { if rw.logger != nil { rw.logger.Debug("Wait new client", nil) } + conn, err := rw.listener.Accept() if err != nil { if rw.logger != nil { @@ -113,88 +117,102 @@ func (rw *ReconnectListenerWrapper) reconnect() error { // Read reads data from the connection. // Read can be made to time out and return an error after a fixed // time limit; see SetDeadline and SetReadDeadline. -func (rw *ReconnectListenerWrapper) Read(b []byte) (n int, err error) { +func (rw *ReconnectListenerWrapper) Read(b []byte) (n int, err error) { //nolint:dupl rw.mu.RLock() + defer rw.mu.RUnlock() rw.reconnectLockWait() - rw.underlyingConnection.SetReadDeadline(time.Now().Add(rw.readWriteTimeout)) - n, err = rw.underlyingConnection.Read(b) - if err != nil { //nolint: nestif - // dont reconnect when timeout happens - if internal.IsTimeoutError(err) { - if rw.logger != nil { - rw.logger.Info("Timeout", watermill.LogFields{"op": "read"}) + // if a write/read error occurs, we will reconnect but lose the message we were trying to send/receive. + // Therefore, after reconnection, the cycle will continue and there will be another attempt to send / read the message. + // If this attempt is successful, then we will exit the loop. + for { + rw.underlyingConnection.SetReadDeadline(time.Now().Add(rw.readWriteTimeout)) + n, err = rw.underlyingConnection.Read(b) + + if err != nil { //nolint: nestif + // dont reconnect when timeout happens + if internal.IsTimeoutError(err) { + if rw.logger != nil { + rw.logger.Info("Timeout", watermill.LogFields{"op": "read"}) + } + + return n, watermillnet.ErrIOTimeout } - rw.mu.RUnlock() + if rw.logger != nil { + rw.logger.Error("Unable to communicate with the remote side. attempt to reconnect", err, + watermill.LogFields{"op": "read"}) + } - return n, watermillnet.ErrIOTimeout - } + err = rw.reconnect() - if rw.logger != nil { - rw.logger.Info("Unable to communicate with the remote side. Listen new connection", - watermill.LogFields{"op": "read"}) - } - rw.mu.RUnlock() + if err != nil { + return n, err + } - rw.reconnectLock() - err = rw.reconnect() - rw.reconnectUnlock() + if rw.logger != nil { + rw.logger.Debug("Reread message", + watermill.LogFields{"op": "write"}) + } - if err != nil { - return n, err + continue + } else { + break } - - return n, err } - rw.mu.RUnlock() - return n, err } // Write writes data to the connection. // Write can be made to time out and return an error after a fixed // time limit; see SetDeadline and SetWriteDeadline. -func (rw *ReconnectListenerWrapper) Write(b []byte) (n int, err error) { +func (rw *ReconnectListenerWrapper) Write(b []byte) (n int, err error) { //nolint:dupl rw.mu.RLock() + defer rw.mu.RUnlock() rw.reconnectLockWait() - rw.underlyingConnection.SetWriteDeadline(time.Now().Add(rw.readWriteTimeout)) - n, err = rw.underlyingConnection.Write(b) - if err != nil { //nolint: nestif - // dont reconnect when timeout happens - if internal.IsTimeoutError(err) { - if rw.logger != nil { - rw.logger.Info("Timeout", watermill.LogFields{"op": "write"}) + // if a write/read error occurs, we will reconnect but lose the message we were trying to send/receive. + // Therefore, after reconnection, the cycle will continue and there will be another attempt to send / read the message. + // If this attempt is successful, then we will exit the loop. + for { + rw.underlyingConnection.SetWriteDeadline(time.Now().Add(rw.readWriteTimeout)) + n, err = rw.underlyingConnection.Write(b) + + if err != nil { //nolint: nestif + // dont reconnect when timeout happens + if internal.IsTimeoutError(err) { + if rw.logger != nil { + rw.logger.Info("Timeout", watermill.LogFields{"op": "read"}) + } + + return n, watermillnet.ErrIOTimeout } - rw.mu.RUnlock() + if rw.logger != nil { + rw.logger.Error("Unable to communicate with the remote side. attempt to reconnect", err, + watermill.LogFields{"op": "write"}) + } - return n, watermillnet.ErrIOTimeout - } + err = rw.reconnect() - if rw.logger != nil { - rw.logger.Info("Unable to communicate with the remote side. Listen new connection", - watermill.LogFields{"op": "write"}) - } - rw.mu.RUnlock() + if err != nil { + return n, err + } - rw.reconnectLock() - err = rw.reconnect() - rw.reconnectUnlock() + if rw.logger != nil { + rw.logger.Debug("Resend message", + watermill.LogFields{"op": "write"}) + } - if err != nil { - return n, err + continue + } else { + break } - - return n, err } - rw.mu.RUnlock() - return n, err } @@ -289,9 +307,7 @@ func (rw *ReconnectListenerWrapper) Connect(addr net.Addr) error { rw.mu.RUnlock() - rw.reconnectLock() err = rw.reconnect() - rw.reconnectUnlock() if err != nil { return err