Skip to content

Commit

Permalink
Fix panic() in internal/connection when writing to a closed channel d…
Browse files Browse the repository at this point in the history
…uring close

The race is as follows:
T1 - calls SendRequestNoWait(), checks the connection state, and prepares
     to enter the select statement
T2 - calls TriggerClose() closes cnx and the closeCh
T3 - run() go-routine for processing incomingRequestsCh drops into
     case <-closeCh: and calls failLeftRequestsWhenClose() which drains
     and closes incomingRequestsCh
T1 - resumes and drops into the select where both closeCh and
     incomingRequestsCh are closed.

When two cases of a `select` are valid, the case executed is chosen at
random; see https://tour.golang.org/concurrency/5

This commit introduces a connectionClosing state and a wait group to track
writes by the SendRequest() methods.
* TriggerClose() moves the connection into the connectionClosing state
  before the closeCh is closed.
* The failLeftRequestsWhenClosed() method waits on the waitgroup for
  outstanding SendRequest() methods to complete before it closes
  the incomingRequestsCh
* The SendRequest() methods first add to the waitgroup before checking the
  connection state; if the state is either closing or closed, SendRequest()
  returns an error.

With the above it is not possible for thread to attempt to add a request
to the incomingRequestsCh without being tracked by the waitgroup, and the
incomingRequestsCh will not be closed until operations tracked by the
waitgroup have completed.

Signed-off-by: Daniel Ferstay <dferstay@splunk.com>
  • Loading branch information
Daniel Ferstay committed Jun 11, 2021
1 parent 8a78d2c commit 6b3f8b0
Showing 1 changed file with 27 additions and 18 deletions.
45 changes: 27 additions & 18 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type connectionState int32
const (
connectionInit = iota
connectionReady
connectionClosing
connectionClosed
)

Expand All @@ -98,6 +99,8 @@ func (s connectionState) String() string {
return "Initializing"
case connectionReady:
return "Ready"
case connectionClosing:
return "Closing"
case connectionClosed:
return "Closed"
default:
Expand Down Expand Up @@ -142,6 +145,7 @@ type connection struct {

requestIDGenerator uint64

incomingRequestsWG sync.WaitGroup
incomingRequestsCh chan *request
incomingCmdCh chan *incomingCmd
closeCh chan interface{}
Expand Down Expand Up @@ -335,6 +339,9 @@ func (c *connection) failLeftRequestsWhenClose() {
for i := 0; i < reqLen; i++ {
c.internalSendRequest(<-c.incomingRequestsCh)
}

c.incomingRequestsWG.Wait()

close(c.incomingRequestsCh)
}

Expand Down Expand Up @@ -546,38 +553,36 @@ func (c *connection) Write(data Buffer) {

func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
callback func(command *pb.BaseCommand, err error)) {
if c.getState() == connectionClosed {
c.incomingRequestsWG.Add(1)
defer c.incomingRequestsWG.Done()

state := c.getState()
if state == connectionClosed || state == connectionClosing {
callback(req, ErrConnectionClosed)
} else {
select {
case <-c.closeCh:
callback(req, ErrConnectionClosed)

case c.incomingRequestsCh <- &request{
} else {
c.incomingRequestsCh <- &request{
id: &requestID,
cmd: req,
callback: callback,
}:
}
}
}

func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
if c.getState() == connectionClosed {
return ErrConnectionClosed
}
c.incomingRequestsWG.Add(1)
defer c.incomingRequestsWG.Done()

select {
case <-c.closeCh:
state := c.getState()
if state == connectionClosed || state == connectionClosing {
return ErrConnectionClosed

case c.incomingRequestsCh <- &request{
}
c.incomingRequestsCh <- &request{
id: nil,
cmd: req,
callback: nil,
}:
return nil
}
return nil
}

func (c *connection) internalSendRequest(req *request) {
Expand All @@ -586,7 +591,8 @@ func (c *connection) internalSendRequest(req *request) {
c.pendingReqs[*req.id] = req
}
c.pendingLock.Unlock()
if c.getState() == connectionClosed {
state := c.getState()
if state == connectionClosed || state == connectionClosing {
c.log.Warnf("internalSendRequest failed for connectionClosed")
if req.callback != nil {
req.callback(req.cmd, ErrConnectionClosed)
Expand Down Expand Up @@ -755,6 +761,8 @@ func (c *connection) UnregisterListener(id uint64) {
// broadcasting the notification on the close channel
func (c *connection) TriggerClose() {
c.closeOnce.Do(func() {
c.setState(connectionClosing)

cnx := c.cnx
if cnx != nil {
cnx.Close()
Expand All @@ -775,9 +783,10 @@ func (c *connection) Close() {
}

c.log.Info("Connection closed")
c.TriggerClose()
// do not use changeState() since they share the same lock
c.setState(connectionClosed)
c.TriggerClose()

c.pingTicker.Stop()
c.pingCheckTicker.Stop()

Expand Down

0 comments on commit 6b3f8b0

Please sign in to comment.