Skip to content

Commit

Permalink
Release locks before calling producer consumer response callbacks. (#542
Browse files Browse the repository at this point in the history
)
  • Loading branch information
cckellogg committed Jun 17, 2021
1 parent cb72395 commit 579984e
Showing 1 changed file with 29 additions and 17 deletions.
46 changes: 29 additions & 17 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ type connection struct {

pendingLock sync.Mutex
pendingReqs map[uint64]*request
listeners map[uint64]ConnectionListener

listenersLock sync.RWMutex
listeners map[uint64]ConnectionListener

consumerHandlersLock sync.RWMutex
consumerHandlers map[uint64]ConsumerHandler
Expand Down Expand Up @@ -343,7 +345,8 @@ func (c *connection) run() {
go c.reader.readFromConnection()
go c.runPingCheck()

c.log.Debugf("Connection run start channel %+v, requestLength %d", c, len(c.incomingRequestsCh))
c.log.Debugf("Connection run starting with request capacity=%d queued=%d",
cap(c.incomingRequestsCh), len(c.incomingRequestsCh))

defer func() {
// all the accesses to the pendingReqs should be happened in this run loop thread,
Expand Down Expand Up @@ -631,9 +634,9 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) {
func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
producerID := response.GetProducerId()

c.Lock()
c.listenersLock.RLock()
producer, ok := c.listeners[producerID]
c.Unlock()
c.listenersLock.RUnlock()

if ok {
producer.ReceivedSendReceipt(response)
Expand Down Expand Up @@ -712,9 +715,6 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer)
consumerID := closeConsumer.GetConsumerId()
c.log.Infof("Broker notification of Closed consumer: %d", consumerID)

c.Lock()
defer c.Unlock()

if consumer, ok := c.consumerHandler(consumerID); ok {
consumer.ConnectionClosed()
c.DeleteConsumeHandler(consumerID)
Expand All @@ -727,31 +727,36 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer)
c.log.Infof("Broker notification of Closed producer: %d", closeProducer.GetProducerId())
producerID := closeProducer.GetProducerId()

c.Lock()
defer c.Unlock()
if producer, ok := c.listeners[producerID]; ok {
producer.ConnectionClosed()
c.listenersLock.Lock()
producer, ok := c.listeners[producerID]
if ok {
delete(c.listeners, producerID)
}
c.listenersLock.Unlock()

// did we find a producer?
if ok {
producer.ConnectionClosed()
} else {
c.log.WithField("producerID", producerID).Warn("Producer with ID not found while closing producer")
}
}

func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
c.Lock()
defer c.Unlock()
c.listenersLock.Lock()
defer c.listenersLock.Unlock()

c.listeners[id] = listener
}

func (c *connection) UnregisterListener(id uint64) {
c.Lock()
defer c.Unlock()
c.listenersLock.Lock()
defer c.listenersLock.Unlock()

delete(c.listeners, id)
}

// Triggers the connection close by forcing the socket to close and
// TriggerClose the connection close by forcing the socket to close and
// broadcasting the notification on the close channel
func (c *connection) TriggerClose() {
c.closeOnce.Do(func() {
Expand Down Expand Up @@ -781,7 +786,14 @@ func (c *connection) Close() {
c.pingTicker.Stop()
c.pingCheckTicker.Stop()

for _, listener := range c.listeners {
listeners := make(map[uint64]ConnectionListener)
c.listenersLock.RLock()
for id, listener := range c.listeners {
listeners[id] = listener
}
c.listenersLock.RUnlock()

for _, listener := range listeners {
listener.ConnectionClosed()
}

Expand Down

0 comments on commit 579984e

Please sign in to comment.