Skip to content

Commit

Permalink
Fix panic() in internal/connection when writing to a closed channel d…
Browse files Browse the repository at this point in the history
…uring close (#539)

The race is as follows:
T1 - calls SendRequestNoWait(), checks the connection state, and prepares
     to enter the select statement
T2 - calls TriggerClose() closes cnx and the closeCh
T3 - run() go-routine for processing incomingRequestsCh drops into
     case <-closeCh: and calls failLeftRequestsWhenClose() which drains
     and closes incomingRequestsCh
T1 - resumes and drops into the select where both closeCh and
     incomingRequestsCh are closed.

When two cases of a `select` are valid, the case executed is chosen at
random; see https://tour.golang.org/concurrency/5

This commit introduces a connectionClosing state and a wait group to track
writes by the SendRequest() methods.
* TriggerClose() moves the connection into the connectionClosing state
  before the closeCh is closed.
* The failLeftRequestsWhenClosed() method waits on the waitgroup for
  outstanding SendRequest() methods to complete before it closes
  the incomingRequestsCh
* The SendRequest() methods first add to the waitgroup before checking the
  connection state; if the state is either closing or closed, SendRequest()
  returns an error.

With the above it is not possible for thread to attempt to add a request
to the incomingRequestsCh without being tracked by the waitgroup, and the
incomingRequestsCh will not be closed until operations tracked by the
waitgroup have completed.

Signed-off-by: Daniel Ferstay <dferstay@splunk.com>

Co-authored-by: Daniel Ferstay <dferstay@splunk.com>
Co-authored-by: xiaolongran <xiaolongran@tencent.com>
  • Loading branch information
3 people committed Jun 30, 2021
1 parent 1a9f356 commit 8139a2c
Showing 1 changed file with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type connectionState int32
const (
connectionInit = iota
connectionReady
connectionClosing
connectionClosed
)

Expand All @@ -98,6 +99,8 @@ func (s connectionState) String() string {
return "Initializing"
case connectionReady:
return "Ready"
case connectionClosing:
return "Closing"
case connectionClosed:
return "Closed"
default:
Expand Down Expand Up @@ -142,6 +145,7 @@ type connection struct {

requestIDGenerator uint64

incomingRequestsWG sync.WaitGroup
incomingRequestsCh chan *request
incomingCmdCh chan *incomingCmd
closeCh chan interface{}
Expand Down Expand Up @@ -333,10 +337,15 @@ func (c *connection) waitUntilReady() error {
}

func (c *connection) failLeftRequestsWhenClose() {
// wait for outstanding incoming requests to complete before draining
// and closing the channel
c.incomingRequestsWG.Wait()

reqLen := len(c.incomingRequestsCh)
for i := 0; i < reqLen; i++ {
c.internalSendRequest(<-c.incomingRequestsCh)
}

close(c.incomingRequestsCh)
}

Expand Down Expand Up @@ -549,8 +558,13 @@ func (c *connection) Write(data Buffer) {

func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
callback func(command *pb.BaseCommand, err error)) {
if c.getState() == connectionClosed {
c.incomingRequestsWG.Add(1)
defer c.incomingRequestsWG.Done()

state := c.getState()
if state == connectionClosed || state == connectionClosing {
callback(req, ErrConnectionClosed)

} else {
select {
case <-c.closeCh:
Expand All @@ -566,7 +580,11 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
}

func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
if c.getState() == connectionClosed {
c.incomingRequestsWG.Add(1)
defer c.incomingRequestsWG.Done()

state := c.getState()
if state == connectionClosed || state == connectionClosing {
return ErrConnectionClosed
}

Expand Down Expand Up @@ -760,6 +778,8 @@ func (c *connection) UnregisterListener(id uint64) {
// broadcasting the notification on the close channel
func (c *connection) TriggerClose() {
c.closeOnce.Do(func() {
c.setState(connectionClosing)

cnx := c.cnx
if cnx != nil {
cnx.Close()
Expand All @@ -780,9 +800,10 @@ func (c *connection) Close() {
}

c.log.Info("Connection closed")
c.TriggerClose()
// do not use changeState() since they share the same lock
c.setState(connectionClosed)
c.TriggerClose()

c.pingTicker.Stop()
c.pingCheckTicker.Stop()

Expand Down

0 comments on commit 8139a2c

Please sign in to comment.