diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 9cc12382b..15dfceca8 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -149,7 +149,9 @@ type connection struct { pendingLock sync.Mutex pendingReqs map[uint64]*request - listeners map[uint64]ConnectionListener + + listenersLock sync.RWMutex + listeners map[uint64]ConnectionListener consumerHandlersLock sync.RWMutex consumerHandlers map[uint64]ConsumerHandler @@ -343,7 +345,8 @@ func (c *connection) run() { go c.reader.readFromConnection() go c.runPingCheck() - c.log.Debugf("Connection run start channel %+v, requestLength %d", c, len(c.incomingRequestsCh)) + c.log.Debugf("Connection run starting with request capacity=%d queued=%d", + cap(c.incomingRequestsCh), len(c.incomingRequestsCh)) defer func() { // all the accesses to the pendingReqs should be happened in this run loop thread, @@ -631,9 +634,9 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) { func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) { producerID := response.GetProducerId() - c.Lock() + c.listenersLock.RLock() producer, ok := c.listeners[producerID] - c.Unlock() + c.listenersLock.RUnlock() if ok { producer.ReceivedSendReceipt(response) @@ -712,9 +715,6 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) consumerID := closeConsumer.GetConsumerId() c.log.Infof("Broker notification of Closed consumer: %d", consumerID) - c.Lock() - defer c.Unlock() - if consumer, ok := c.consumerHandler(consumerID); ok { consumer.ConnectionClosed() c.DeleteConsumeHandler(consumerID) @@ -727,31 +727,36 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer) c.log.Infof("Broker notification of Closed producer: %d", closeProducer.GetProducerId()) producerID := closeProducer.GetProducerId() - c.Lock() - defer c.Unlock() - if producer, ok := c.listeners[producerID]; ok { - producer.ConnectionClosed() + c.listenersLock.Lock() + producer, ok := c.listeners[producerID] + if ok { delete(c.listeners, producerID) + } + c.listenersLock.Unlock() + + // did we find a producer? + if ok { + producer.ConnectionClosed() } else { c.log.WithField("producerID", producerID).Warn("Producer with ID not found while closing producer") } } func (c *connection) RegisterListener(id uint64, listener ConnectionListener) { - c.Lock() - defer c.Unlock() + c.listenersLock.Lock() + defer c.listenersLock.Unlock() c.listeners[id] = listener } func (c *connection) UnregisterListener(id uint64) { - c.Lock() - defer c.Unlock() + c.listenersLock.Lock() + defer c.listenersLock.Unlock() delete(c.listeners, id) } -// Triggers the connection close by forcing the socket to close and +// TriggerClose the connection close by forcing the socket to close and // broadcasting the notification on the close channel func (c *connection) TriggerClose() { c.closeOnce.Do(func() { @@ -781,7 +786,14 @@ func (c *connection) Close() { c.pingTicker.Stop() c.pingCheckTicker.Stop() - for _, listener := range c.listeners { + listeners := make(map[uint64]ConnectionListener) + c.listenersLock.RLock() + for id, listener := range c.listeners { + listeners[id] = listener + } + c.listenersLock.RUnlock() + + for _, listener := range listeners { listener.ConnectionClosed() }