diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index eb3546dd..7d734036 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -556,7 +556,9 @@ void MultiTopicsConsumerImpl::messageReceived(const Consumer& consumer, const Me void MultiTopicsConsumerImpl::internalListener(const Consumer& consumer) { Message m; - incomingMessages_.pop(m); + if (!incomingMessages_.pop(m)) { + return; + } try { Consumer self{get_shared_this_ptr()}; messageProcessed(m); diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index b4c72611..dfbc2765 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1519,4 +1519,45 @@ TEST(ConsumerTest, testDuplicatedTopics) { } } +TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) { + Client client(lookupUrl); + + const int MSG_COUNT = 100; + std::string topicName = "persistent://public/default/my-topic-" + std::to_string(time(nullptr)); + + // 1. Create producer send 100 msgs + Producer producer; + ProducerConfiguration producerConfig; + producerConfig.setBatchingEnabled(false); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer)); + for (int i = 0; i < MSG_COUNT; ++i) { + std::string msg = "my-message-" + std::to_string(i); + Message message = MessageBuilder().setContent(msg).build(); + ASSERT_EQ(ResultOk, producer.send(message)); + } + + // 2. Create consumer with listener + Consumer consumer; + ConsumerConfiguration consumerConfig; + consumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest); + Latch latchFirstReceiveMsg(1); + Latch latchAfterClosed(1); + consumerConfig.setMessageListener( + [&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const Message& msg) { + latchFirstReceiveMsg.countdown(); + LOG_INFO("Consume message: " << msg.getDataAsString()); + latchAfterClosed.wait(); + }); + auto result = client.subscribe(topicName, "test-sub", consumerConfig, consumer); + ASSERT_EQ(ResultOk, result); + + // 3. wait first message consumed in listener and then close consumer. + latchFirstReceiveMsg.wait(); + ASSERT_EQ(ResultOk, consumer.close()); + latchAfterClosed.countdown(); + + ASSERT_EQ(ResultOk, producer.close()); + ASSERT_EQ(ResultOk, client.close()); +} + } // namespace pulsar