Skip to content

Commit

Permalink
improve reconnect listener.
Browse files Browse the repository at this point in the history
repeat failed operation
  • Loading branch information
andyollylarkin committed Sep 1, 2023
1 parent 62d27f8 commit 94ac61c
Showing 1 changed file with 68 additions and 52 deletions.
120 changes: 68 additions & 52 deletions pkg/connection/reconnectListenerWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,17 @@ func (rw *ReconnectListenerWrapper) reconnect() error {
rw.mu.Lock()
defer rw.mu.Unlock()

rw.reconnectLock()
defer rw.reconnectUnlock()

reconSucess := make(chan error, 0)

go func() {
for {
if rw.logger != nil {
rw.logger.Debug("Wait new client", nil)
}

conn, err := rw.listener.Accept()
if err != nil {
if rw.logger != nil {
Expand Down Expand Up @@ -113,88 +117,102 @@ func (rw *ReconnectListenerWrapper) reconnect() error {
// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
func (rw *ReconnectListenerWrapper) Read(b []byte) (n int, err error) {
func (rw *ReconnectListenerWrapper) Read(b []byte) (n int, err error) { //nolint:dupl
rw.mu.RLock()
defer rw.mu.RUnlock()

rw.reconnectLockWait()
rw.underlyingConnection.SetReadDeadline(time.Now().Add(rw.readWriteTimeout))
n, err = rw.underlyingConnection.Read(b)

if err != nil { //nolint: nestif
// dont reconnect when timeout happens
if internal.IsTimeoutError(err) {
if rw.logger != nil {
rw.logger.Info("Timeout", watermill.LogFields{"op": "read"})
// if a write/read error occurs, we will reconnect but lose the message we were trying to send/receive.
// Therefore, after reconnection, the cycle will continue and there will be another attempt to send / read the message.
// If this attempt is successful, then we will exit the loop.
for {
rw.underlyingConnection.SetReadDeadline(time.Now().Add(rw.readWriteTimeout))
n, err = rw.underlyingConnection.Read(b)

if err != nil { //nolint: nestif
// dont reconnect when timeout happens
if internal.IsTimeoutError(err) {
if rw.logger != nil {
rw.logger.Info("Timeout", watermill.LogFields{"op": "read"})
}

return n, watermillnet.ErrIOTimeout
}

rw.mu.RUnlock()
if rw.logger != nil {
rw.logger.Error("Unable to communicate with the remote side. attempt to reconnect", err,
watermill.LogFields{"op": "read"})
}

return n, watermillnet.ErrIOTimeout
}
err = rw.reconnect()

if rw.logger != nil {
rw.logger.Info("Unable to communicate with the remote side. Listen new connection",
watermill.LogFields{"op": "read"})
}
rw.mu.RUnlock()
if err != nil {
return n, err
}

rw.reconnectLock()
err = rw.reconnect()
rw.reconnectUnlock()
if rw.logger != nil {
rw.logger.Debug("Reread message",
watermill.LogFields{"op": "write"})
}

if err != nil {
return n, err
continue
} else {
break
}

return n, err
}

rw.mu.RUnlock()

return n, err
}

// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
func (rw *ReconnectListenerWrapper) Write(b []byte) (n int, err error) {
func (rw *ReconnectListenerWrapper) Write(b []byte) (n int, err error) { //nolint:dupl
rw.mu.RLock()
defer rw.mu.RUnlock()

rw.reconnectLockWait()
rw.underlyingConnection.SetWriteDeadline(time.Now().Add(rw.readWriteTimeout))
n, err = rw.underlyingConnection.Write(b)

if err != nil { //nolint: nestif
// dont reconnect when timeout happens
if internal.IsTimeoutError(err) {
if rw.logger != nil {
rw.logger.Info("Timeout", watermill.LogFields{"op": "write"})
// if a write/read error occurs, we will reconnect but lose the message we were trying to send/receive.
// Therefore, after reconnection, the cycle will continue and there will be another attempt to send / read the message.
// If this attempt is successful, then we will exit the loop.
for {
rw.underlyingConnection.SetWriteDeadline(time.Now().Add(rw.readWriteTimeout))
n, err = rw.underlyingConnection.Write(b)

if err != nil { //nolint: nestif
// dont reconnect when timeout happens
if internal.IsTimeoutError(err) {
if rw.logger != nil {
rw.logger.Info("Timeout", watermill.LogFields{"op": "read"})
}

return n, watermillnet.ErrIOTimeout
}

rw.mu.RUnlock()
if rw.logger != nil {
rw.logger.Error("Unable to communicate with the remote side. attempt to reconnect", err,
watermill.LogFields{"op": "write"})
}

return n, watermillnet.ErrIOTimeout
}
err = rw.reconnect()

if rw.logger != nil {
rw.logger.Info("Unable to communicate with the remote side. Listen new connection",
watermill.LogFields{"op": "write"})
}
rw.mu.RUnlock()
if err != nil {
return n, err
}

rw.reconnectLock()
err = rw.reconnect()
rw.reconnectUnlock()
if rw.logger != nil {
rw.logger.Debug("Resend message",
watermill.LogFields{"op": "write"})
}

if err != nil {
return n, err
continue
} else {
break
}

return n, err
}

rw.mu.RUnlock()

return n, err
}

Expand Down Expand Up @@ -289,9 +307,7 @@ func (rw *ReconnectListenerWrapper) Connect(addr net.Addr) error {

rw.mu.RUnlock()

rw.reconnectLock()
err = rw.reconnect()
rw.reconnectUnlock()

if err != nil {
return err
Expand Down

0 comments on commit 94ac61c

Please sign in to comment.