Skip to content

Commit

Permalink
Fix channel data race (#558)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaolongran <xiaolongran@tencent.com>


### Motivation

In `internalSendRequest`, We will add the request to be sent to the `pendingReqs` map, even when the current connection status is `connectionClosed`, we will append the request, which will cause the current request's callback to be called twice

First:

```
func (c *connection) internalSendRequest(req *request) {
	c.pendingLock.Lock()
	if req.id != nil {
		c.pendingReqs[*req.id] = req
	}
	c.pendingLock.Unlock()
	if c.getState() == connectionClosed {
		c.log.Warnf("internalSendRequest failed for connectionClosed")
                // In Here, call req.callback *************
		if req.callback != nil {
			req.callback(req.cmd, ErrConnectionClosed)
		}
	} else {
		c.writeCommand(req.cmd)
	}
}
```

Twice:

```
func (c *connection) run() {
	// All reads come from the reader goroutine
	go c.reader.readFromConnection()
	go c.runPingCheck()

	c.log.Debugf("Connection run starting with request capacity=%d queued=%d",
		cap(c.incomingRequestsCh), len(c.incomingRequestsCh))

	defer func() {
		// all the accesses to the pendingReqs should be happened in this run loop thread,
		// including the final cleanup, to avoid the issue #239
		c.pendingLock.Lock()
		for id, req := range c.pendingReqs {
                         // In Here, call req.callback **********
			req.callback(nil, errors.New("connection closed"))
			delete(c.pendingReqs, id)
		}
		c.pendingLock.Unlock()
		c.Close()
	}()
       ....
}
```

In fact, when the current connection is in the `connectionClosed` state, we don’t need to append the request to the `pendingReqs` map, so we don’t need to process the request when it’s closed.


### Modifications

When the connection is closed, the current request to be sent is not added to the `pendingReqs` map.
  • Loading branch information
wolfstudy committed Jun 30, 2021
1 parent 0f3e504 commit ea3e054
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,17 +584,17 @@ func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
}

func (c *connection) internalSendRequest(req *request) {
c.pendingLock.Lock()
if req.id != nil {
c.pendingReqs[*req.id] = req
}
c.pendingLock.Unlock()
if c.getState() == connectionClosed {
c.log.Warnf("internalSendRequest failed for connectionClosed")
if req.callback != nil {
req.callback(req.cmd, ErrConnectionClosed)
}
} else {
c.pendingLock.Lock()
if req.id != nil {
c.pendingReqs[*req.id] = req
}
c.pendingLock.Unlock()
c.writeCommand(req.cmd)
}
}
Expand Down

0 comments on commit ea3e054

Please sign in to comment.