From 038ca5d6821015abef3915d4dea58a1e5989458c Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 16 Oct 2025 20:01:48 +0800 Subject: [PATCH 1/3] Fix nullptr after listener consumer closed --- lib/MultiTopicsConsumerImpl.cc | 4 +++- tests/ConsumerTest.cc | 42 ++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index eb3546dd..7d734036 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -556,7 +556,9 @@ void MultiTopicsConsumerImpl::messageReceived(const Consumer& consumer, const Me void MultiTopicsConsumerImpl::internalListener(const Consumer& consumer) { Message m; - incomingMessages_.pop(m); + if (!incomingMessages_.pop(m)) { + return; + } try { Consumer self{get_shared_this_ptr()}; messageProcessed(m); diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index b4c72611..941fee27 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1519,4 +1519,46 @@ TEST(ConsumerTest, testDuplicatedTopics) { } } +TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) { + Client client(lookupUrl); + + const int MSG_COUNT = 100; + std::string topicName = "persistent://public/default/my-topic-" + std::to_string(time(nullptr)); + + // 1. Create producer send 100 msgs + Producer producer; + ProducerConfiguration producerConfig; + producerConfig.setBatchingEnabled(false); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer)); + for (int i = 0; i < MSG_COUNT; ++i) { + std::string msg = "my-message-" + std::to_string(i); + Message message = MessageBuilder().setContent(msg).build(); + ASSERT_EQ(ResultOk, producer.send(message)); + } + ASSERT_EQ(ResultOk, producer.flush()); + + // 2. Create consumer with listener + Consumer consumer; + ConsumerConfiguration consumerConfig; + consumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest); + Latch latchFirstReceiveMsg(1); + Latch latchAfterClosed(1); + consumerConfig.setMessageListener([&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const Message& msg) { + latchFirstReceiveMsg.countdown(); + std::cout << "Consume message: " << msg.getDataAsString() << std::endl; + latchAfterClosed.wait(); + }); + auto result = client.subscribe(topicName, "test-sub", consumerConfig, consumer); + ASSERT_EQ(ResultOk, result); + + // 3. wait first message consumed in listener and then close consumer. + latchFirstReceiveMsg.wait(); + ASSERT_EQ(ResultOk, consumer.close()); + latchAfterClosed.countdown(); + + ASSERT_EQ(ResultOk, producer.close()); + ASSERT_EQ(ResultOk, client.close()); +} + + } // namespace pulsar From 5f2058ecbae44a90b43fdf673a9663ee60475522 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 16 Oct 2025 20:14:46 +0800 Subject: [PATCH 2/3] code format --- tests/ConsumerTest.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 941fee27..1ea24b66 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1543,11 +1543,12 @@ TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) { consumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest); Latch latchFirstReceiveMsg(1); Latch latchAfterClosed(1); - consumerConfig.setMessageListener([&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const Message& msg) { - latchFirstReceiveMsg.countdown(); - std::cout << "Consume message: " << msg.getDataAsString() << std::endl; - latchAfterClosed.wait(); - }); + consumerConfig.setMessageListener( + [&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const Message& msg) { + latchFirstReceiveMsg.countdown(); + std::cout << "Consume message: " << msg.getDataAsString() << std::endl; + latchAfterClosed.wait(); + }); auto result = client.subscribe(topicName, "test-sub", consumerConfig, consumer); ASSERT_EQ(ResultOk, result); @@ -1560,5 +1561,4 @@ TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) { ASSERT_EQ(ResultOk, client.close()); } - } // namespace pulsar From 5b7f57ce254ddb3d42083de951480f267d34fa1b Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 16 Oct 2025 20:43:25 +0800 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Yunze Xu --- tests/ConsumerTest.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 1ea24b66..dfbc2765 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1535,7 +1535,6 @@ TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) { Message message = MessageBuilder().setContent(msg).build(); ASSERT_EQ(ResultOk, producer.send(message)); } - ASSERT_EQ(ResultOk, producer.flush()); // 2. Create consumer with listener Consumer consumer; @@ -1546,7 +1545,7 @@ TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) { consumerConfig.setMessageListener( [&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const Message& msg) { latchFirstReceiveMsg.countdown(); - std::cout << "Consume message: " << msg.getDataAsString() << std::endl; + LOG_INFO("Consume message: " << msg.getDataAsString()); latchAfterClosed.wait(); }); auto result = client.subscribe(topicName, "test-sub", consumerConfig, consumer);