Skip to content

Commit

Permalink
add resend/reread message
Browse files Browse the repository at this point in the history
  • Loading branch information
andyollylarkin committed Aug 28, 2023
1 parent 0614c37 commit d03a9e3
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 48 deletions.
14 changes: 5 additions & 9 deletions internal/errors.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package internal

import "net"
import (
"errors"
"os"
)

func IsTimeoutError(err error) bool {
if err != nil {
e, ok := err.(*net.OpError)
if ok && e.Timeout() {
return true
}
}

return false
return errors.Is(err, os.ErrDeadlineExceeded)
}
105 changes: 66 additions & 39 deletions pkg/connection/reconnectWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (rw *ReconnectWrapper) reconnectUnlock() {
}

func (rw *ReconnectWrapper) reconnect() error {
rw.reconnectLock()
defer rw.reconnectUnlock()

err := retry.Do(rw.ctx, rw.backoffPolicy, rw.connectContextAdapter)
if err != nil {
return err
Expand All @@ -118,35 +121,48 @@ func (rw *ReconnectWrapper) reconnect() error {
// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
func (rw *ReconnectWrapper) Read(b []byte) (n int, err error) {
func (rw *ReconnectWrapper) Read(b []byte) (n int, err error) { //nolint:dupl
rw.mu.RLock()
defer rw.mu.RUnlock()

rw.reconnectLockWait()
rw.underlyingConnection.SetReadDeadline(time.Now().Add(rw.readWriteTimeout))
n, err = rw.underlyingConnection.Read(b)

if err != nil { //nolint: nestif
// dont reconnect when timeout happens
if internal.IsTimeoutError(err) {
// if a write/read error occurs, we will reconnect but lose the message we were trying to send/receive.
// Therefore, after reconnection, the cycle will continue and there will be another attempt to send / read the message.
// If this attempt is successful, then we will exit the loop.
for {
rw.underlyingConnection.SetReadDeadline(time.Now().Add(rw.readWriteTimeout))
n, err = rw.underlyingConnection.Read(b)

if err != nil { //nolint: nestif
// dont reconnect when timeout happens
if internal.IsTimeoutError(err) {
if rw.logger != nil {
rw.logger.Info("Timeout", watermill.LogFields{"op": "read"})
}

return n, watermillnet.ErrIOTimeout
}

if rw.logger != nil {
rw.logger.Info("Timeout", watermill.LogFields{"op": "read"})
rw.logger.Error("Unable to communicate with the remote side. attempt to reconnect", err,
watermill.LogFields{"op": "read"})
}

return n, watermillnet.ErrIOTimeout
}
err = rw.reconnect()

if rw.logger != nil {
rw.logger.Info("Unable to communicate with the remote side. attempt to reconnect",
watermill.LogFields{"op": "read"})
}
if err != nil {
return n, err
}

rw.reconnectLock()
err = rw.reconnect()
rw.reconnectUnlock()
if rw.logger != nil {
rw.logger.Debug("Reread message",
watermill.LogFields{"op": "write"})
}

if err != nil {
return n, err
continue
} else {
break
}
}

Expand All @@ -156,35 +172,48 @@ func (rw *ReconnectWrapper) Read(b []byte) (n int, err error) {
// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
func (rw *ReconnectWrapper) Write(b []byte) (n int, err error) {
func (rw *ReconnectWrapper) Write(b []byte) (n int, err error) { //nolint:dupl
rw.mu.RLock()
defer rw.mu.RUnlock()

rw.reconnectLockWait()
rw.underlyingConnection.SetWriteDeadline(time.Now().Add(rw.readWriteTimeout))
n, err = rw.underlyingConnection.Write(b)

if err != nil { //nolint: nestif
// dont reconnect when timeout happens
if internal.IsTimeoutError(err) {
// if a write/read error occurs, we will reconnect but lose the message we were trying to send/receive.
// Therefore, after reconnection, the cycle will continue and there will be another attempt to send / read the message.
// If this attempt is successful, then we will exit the loop.
for {
rw.underlyingConnection.SetWriteDeadline(time.Now().Add(rw.readWriteTimeout))
n, err = rw.underlyingConnection.Write(b)

if err != nil { //nolint: nestif
// dont reconnect when timeout happens
if internal.IsTimeoutError(err) {
if rw.logger != nil {
rw.logger.Info("Timeout", watermill.LogFields{"op": "read"})
}

return n, watermillnet.ErrIOTimeout
}

if rw.logger != nil {
rw.logger.Info("Timeout", watermill.LogFields{"op": "read"})
rw.logger.Error("Unable to communicate with the remote side. attempt to reconnect", err,
watermill.LogFields{"op": "write"})
}

return n, watermillnet.ErrIOTimeout
}
err = rw.reconnect()

if rw.logger != nil {
rw.logger.Info("Unable to communicate with the remote side. attempt to reconnect",
watermill.LogFields{"op": "write"})
}
if err != nil {
return n, err
}

rw.reconnectLock()
err = rw.reconnect()
rw.reconnectUnlock()
if rw.logger != nil {
rw.logger.Debug("Resend message",
watermill.LogFields{"op": "write"})
}

if err != nil {
return n, err
continue
} else {
break
}
}

Expand Down Expand Up @@ -277,13 +306,11 @@ func (rw *ReconnectWrapper) Connect(addr net.Addr) error {

if err != nil {
if rw.logger != nil {
rw.logger.Info("Unable to communicate with the remote side. attempt to reconnect",
rw.logger.Error("Unable to communicate with the remote side. attempt to reconnect", err,
watermill.LogFields{"op": "connect"})
}

rw.reconnectLock()
err = rw.reconnect()
rw.reconnectUnlock()

if err != nil {
return err
Expand Down

0 comments on commit d03a9e3

Please sign in to comment.