Skip to content

Commit

Permalink
MB-54959 - adjust log level for watcher on valid closes
Browse files Browse the repository at this point in the history
Background: we periodically create a metadata provider object on all indexer nodes;
this starts a watcher for each node. Upon cleanup, the peer pipe logs an error
that the connection is terminated, but this is expected behaviour. This error
log is creating alerts in customer log monitors.

exp: if we are killing the peer pipe connection on a valid condition,
don't log it as an error.

stubs: in my experiments, when we kill a server (with the -9/-15 flag),
the connection is still gracefully terminated and Go returns io.EOF.
Because of this, we cannot differentiate between an indexer crash and a conn close
at the gometa level. The metadata provider will still be aware of whether the conn was
expected to die or was closed abruptly.

assumptions: -
Change-Id: I015ac0e036a1ad07ffacb20d3ef760149047d45d
  • Loading branch information
NightWing1998 committed Jan 25, 2023
1 parent 2a14cff commit e041ad6
Showing 1 changed file with 33 additions and 3 deletions.
36 changes: 33 additions & 3 deletions common/peerPipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"sync"

Expand Down Expand Up @@ -212,9 +213,26 @@ func (p *PeerPipe) doReceive() {
// read packet len
lenBuf, err := p.readBytes(8, nil)
if err != nil {
// if encountering an error, kill the pipe.
log.Current.Errorf("PeerPipe.doRecieve() : ecounter error when received mesasage from Peer %s. Error = %s. Kill Pipe.",
// err indicates conn termination. Conditions of termination:
// 1. when the other peer gracefully closes the connection, we get io.EOF (an indexer restart closes the network gracefully)
// 2. if we close the connection, we get "use of closed network connection" as the read error
// 3. any network error (e.g. read: operation timed out)
// To differentiate between 2 & 3, we read isClosed from PeerPipe under its lock.

isClosed := func () bool {
p.mutex.Lock()
defer p.mutex.Unlock()
return p.isClosed
}()

if err == io.EOF || isClosed {
log.Current.Infof("PeerPipe.doRecieve() : connection terminated for %s. Reason = %s. Cleaning Pipe.",
p.GetAddr(), err.Error())
} else {
// if encountering an error, kill the pipe.
log.Current.Errorf("PeerPipe.doRecieve() : ecounter error when received mesasage from Peer %s. Error = %s. Kill Pipe.",
p.GetAddr(), err.Error())
}
return
}
size := binary.BigEndian.Uint64(lenBuf)
Expand All @@ -230,8 +248,20 @@ func (p *PeerPipe) doReceive() {
buf, err := p.readBytes(size, readahead)
if err != nil {
// if encountering an error, kill the pipe.
log.Current.Errorf("PeerPipe.doRecieve() : ecounter error when received mesasage from Peer %s. Error = %s. Kill Pipe.",
isClosed := func () bool {
p.mutex.Lock()
defer p.mutex.Unlock()
return p.isClosed
}()

if err == io.EOF || isClosed {
log.Current.Infof("PeerPipe.doRecieve() : connection terminated for %s. Reason = %s. Cleaning Pipe.",
p.GetAddr(), err.Error())
} else {
// if encountering an error, kill the pipe.
log.Current.Errorf("PeerPipe.doRecieve() : ecounter error when received mesasage from Peer %s. Error = %s. Kill Pipe.",
p.GetAddr(), err.Error())
}
return
}
// unmarshall the content and put it in the channel
Expand Down

0 comments on commit e041ad6

Please sign in to comment.