From fbe65bbf1ce472c0bbfa708cae50af7360a014f2 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 5 Nov 2025 14:08:33 +0800 Subject: [PATCH 1/5] Fix topic lookup segmentation fault after client is closed --- lib/ClientImpl.cc | 37 ++++++++++------- lib/ClientImpl.h | 8 ++++ lib/ResultUtils.h | 3 +- lib/RetryableLookupService.h | 12 +++--- lib/RetryableOperationCache.h | 9 +++- tests/LookupServiceTest.cc | 62 ++++++++++++++++++++++++++++ tests/RetryableOperationCacheTest.cc | 2 +- 7 files changed, 109 insertions(+), 24 deletions(-) diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index aa75128b..49b02492 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -78,7 +78,25 @@ typedef std::unique_lock Lock; typedef std::vector StringList; +static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl, + const ClientConfiguration& clientConfiguration, + ConnectionPool& pool, const AuthenticationPtr& auth) { + if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) { + LOG_DEBUG("Using HTTP Lookup"); + return std::make_shared(serviceUrl, std::cref(clientConfiguration), + std::cref(auth)); + } else { + LOG_DEBUG("Using Binary Lookup"); + return std::make_shared(serviceUrl, std::ref(pool), + std::cref(clientConfiguration)); + } +} + ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration) + : ClientImpl(serviceUrl, clientConfiguration, &defaultLookupServiceFactory) {} + +ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, + LookupServiceFactory&& lookupServiceFactory) : mutex_(), state_(Open), clientConfiguration_(ClientConfiguration(clientConfiguration) @@ -95,7 +113,8 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& consumerIdGenerator_(0), closingError(ResultOk), useProxy_(false), - lookupCount_(0L) { + lookupCount_(0L), + lookupServiceFactory_(std::move(lookupServiceFactory)) { std::unique_ptr loggerFactory = clientConfiguration_.impl_->takeLogger(); if (loggerFactory) { LogUtils::setLoggerFactory(std::move(loggerFactory)); @@ -106,19 +125,9 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& ClientImpl::~ClientImpl() { shutdown(); } LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) { - LookupServicePtr underlyingLookupServicePtr; - if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) { - LOG_DEBUG("Using HTTP Lookup"); - underlyingLookupServicePtr = std::make_shared( - serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr())); - } else { - LOG_DEBUG("Using Binary Lookup"); - underlyingLookupServicePtr = std::make_shared( - serviceUrl, std::ref(pool_), std::cref(clientConfiguration_)); - } - auto lookupServicePtr = RetryableLookupService::create( - underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_); + lookupServiceFactory_(serviceUrl, clientConfiguration_, pool_, clientConfiguration_.getAuthPtr()), + clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_); return lookupServicePtr; } @@ -665,7 +674,6 @@ void ClientImpl::closeAsync(const CloseCallback& callback) { state_ = Closing; memoryLimitController_.close(); - lookupServicePtr_->close(); for (const auto& it : redirectedClusterLookupServicePtrs_) { it.second->close(); } @@ -767,6 +775,7 @@ void ClientImpl::shutdown() { << " consumers have been shutdown."); } + lookupServicePtr_->close(); if (!pool_.close()) { // pool_ has already been closed. It means shutdown() has been called before. return; diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 000e4433..0b4d5969 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -54,6 +54,8 @@ using ClientConnectionPtr = std::shared_ptr; class LookupService; using LookupServicePtr = std::shared_ptr; +using LookupServiceFactory = std::function; class ProducerImplBase; using ProducerImplBaseWeakPtr = std::weak_ptr; @@ -70,6 +72,11 @@ std::string generateRandomName(); class ClientImpl : public std::enable_shared_from_this { public: ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration); + + // only for tests + ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, + LookupServiceFactory&& lookupServiceFactory); + virtual ~ClientImpl(); /** @@ -205,6 +212,7 @@ class ClientImpl : public std::enable_shared_from_this { std::atomic closingError; std::atomic useProxy_; std::atomic lookupCount_; + LookupServiceFactory lookupServiceFactory_; friend class Client; }; diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h index dfba7eb0..cf4ff1fb 100644 --- a/lib/ResultUtils.h +++ b/lib/ResultUtils.h @@ -49,7 +49,8 @@ inline bool isResultRetryable(Result result) { ResultLookupError, ResultTooManyLookupRequestException, ResultProducerBlockedQuotaExceededException, - ResultProducerBlockedQuotaExceededError}; + ResultProducerBlockedQuotaExceededError, + ResultAlreadyClosed}; return fatalResults.find(static_cast(result)) == fatalResults.cend(); } diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index 8bc40bf3..bbcf4f07 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -18,8 +18,6 @@ */ #pragma once -#include - #include "LookupDataResult.h" #include "LookupService.h" #include "NamespaceName.h" @@ -41,10 +39,10 @@ class RetryableLookupService : public LookupService { : RetryableLookupService(std::forward(args)...) {} void close() override { - lookupCache_->clear(); - partitionLookupCache_->clear(); - namespaceLookupCache_->clear(); - getSchemaCache_->clear(); + lookupCache_->close(); + partitionLookupCache_->close(); + namespaceLookupCache_->close(); + getSchemaCache_->close(); } template @@ -89,7 +87,7 @@ class RetryableLookupService : public LookupService { RetryableLookupService(std::shared_ptr lookupService, TimeDuration timeout, ExecutorServiceProviderPtr executorProvider) - : lookupService_(lookupService), + : lookupService_(std::move(lookupService)), lookupCache_(RetryableOperationCache::create(executorProvider, timeout)), partitionLookupCache_( RetryableOperationCache::create(executorProvider, timeout)), diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h index e42460dd..ef163316 100644 --- a/lib/RetryableOperationCache.h +++ b/lib/RetryableOperationCache.h @@ -58,6 +58,11 @@ class RetryableOperationCache : public std::enable_shared_from_this run(const std::string& key, std::function()>&& func) { std::unique_lock lock{mutex_}; + if (closed_) { + Promise promise; + promise.setFailed(ResultAlreadyClosed); + return promise.getFuture(); + } auto it = operations_.find(key); if (it == operations_.end()) { DeadlineTimerPtr timer; @@ -92,11 +97,12 @@ class RetryableOperationCache : public std::enable_shared_from_this lock{mutex_}; operations.swap(operations_); + closed_ = true; } // cancel() could trigger the listener to erase the key from operations, so we should use a swap way // to release the lock here @@ -110,6 +116,7 @@ class RetryableOperationCache : public std::enable_shared_from_this>> operations_; + bool closed_{false}; mutable std::mutex mutex_; DECLARE_LOG_OBJECT() diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index ff3a7e04..0a222a90 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -36,9 +37,11 @@ #include "lib/Future.h" #include "lib/HTTPLookupService.h" #include "lib/LogUtils.h" +#include "lib/LookupDataResult.h" #include "lib/RetryableLookupService.h" #include "lib/TimeUtils.h" #include "lib/Utils.h" +#include "pulsar/Result.h" DECLARE_LOG_OBJECT() @@ -500,3 +503,62 @@ TEST(LookupServiceTest, testRedirectionLimit) { } } } + +static std::atomic_bool firstTime{true}; + +class MockLookupService : public BinaryProtoLookupService { + public: + using BinaryProtoLookupService::BinaryProtoLookupService; + + Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { + bool expected = true; + if (firstTime.compare_exchange_strong(expected, false)) { + // Trigger the retry + LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally"); + Promise promise; + promise.setFailed(ResultRetryable); + return promise.getFuture(); + } + return BinaryProtoLookupService::getPartitionMetadataAsync(topicName); + } +}; + +TEST(LookupServiceTest, testAfterClientShutdown) { + auto client = std::make_shared("pulsar://localhost:6650", ClientConfiguration{}, + [](const std::string& serviceUrl, const ClientConfiguration&, + ConnectionPool& pool, const AuthenticationPtr&) { + return std::make_shared( + serviceUrl, pool, ClientConfiguration{}); + }); + std::promise promise; + client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub", ConsumerConfiguration{}, + [&promise](Result result, const Consumer&) { promise.set_value(result); }); + client->shutdown(); + EXPECT_EQ(ResultDisconnected, promise.get_future().get()); + + firstTime = true; + std::promise promise2; + client->subscribeAsync("lookup-service-test-retry-after-destroyed", "sub", ConsumerConfiguration{}, + [&promise2](Result result, const Consumer&) { promise2.set_value(result); }); + EXPECT_EQ(ResultAlreadyClosed, promise2.get_future().get()); +} + +TEST(LookupServiceTest, testRetryAfterDestroyed) { + auto executorProvider = std::make_shared(1); + ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), ""); + + auto internalLookupService = + std::make_shared("pulsar://localhost:6650", pool, ClientConfiguration{}); + auto lookupService = + RetryableLookupService::create(internalLookupService, std::chrono::seconds(30), executorProvider); + + // Simulate the race condition that `getPartitionMetadataAsync` is called after `close` is called on the + // lookup service. + lookupService->close(); + std::atomic result{ResultUnknownError}; + lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed")) + .addListener([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; }); + EXPECT_EQ(ResultAlreadyClosed, result.load()); + pool.close(); + executorProvider->close(); +} diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc index 2a6948e3..81035f05 100644 --- a/tests/RetryableOperationCacheTest.cc +++ b/tests/RetryableOperationCacheTest.cc @@ -124,7 +124,7 @@ TEST_F(RetryableOperationCacheTest, testClear) { futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100})); } ASSERT_EQ(getSize(*cache), 10); - cache->clear(); + cache->close(); for (auto&& future : futures_) { int value; // All cancelled futures complete with ResultDisconnected and the default int value From f04b6162f73a480dd458a6372394a053bcb15a76 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 5 Nov 2025 20:57:08 +0800 Subject: [PATCH 2/5] remove unused includes --- tests/LookupServiceTest.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 0a222a90..6d5223f9 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -37,11 +37,9 @@ #include "lib/Future.h" #include "lib/HTTPLookupService.h" #include "lib/LogUtils.h" -#include "lib/LookupDataResult.h" #include "lib/RetryableLookupService.h" #include "lib/TimeUtils.h" #include "lib/Utils.h" -#include "pulsar/Result.h" DECLARE_LOG_OBJECT() From 3433821e9dfd977338919c87096fb80dbc2c9068 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 5 Nov 2025 21:06:29 +0800 Subject: [PATCH 3/5] Restore close in closeAsync --- lib/ClientImpl.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 49b02492..eec3b34a 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -674,6 +674,7 @@ void ClientImpl::closeAsync(const CloseCallback& callback) { state_ = Closing; memoryLimitController_.close(); + lookupServicePtr_->close(); for (const auto& it : redirectedClusterLookupServicePtrs_) { it.second->close(); } From c593959dc5ab9fe7a9e275002bf5c37a2ab44227 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 6 Nov 2025 10:51:22 +0800 Subject: [PATCH 4/5] Make close idempotent and improve tests --- lib/RetryableOperationCache.h | 3 +++ tests/LookupServiceTest.cc | 24 +++++++++++++----------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h index ef163316..f2d390dc 100644 --- a/lib/RetryableOperationCache.h +++ b/lib/RetryableOperationCache.h @@ -101,6 +101,9 @@ class RetryableOperationCache : public std::enable_shared_from_this lock{mutex_}; + if (closed_) { + return; + } operations.swap(operations_); closed_ = true; } diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 6d5223f9..92aa8204 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -21,7 +21,6 @@ #include #include -#include #include #include #include @@ -502,15 +501,13 @@ TEST(LookupServiceTest, testRedirectionLimit) { } } -static std::atomic_bool firstTime{true}; - class MockLookupService : public BinaryProtoLookupService { public: using BinaryProtoLookupService::BinaryProtoLookupService; Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { bool expected = true; - if (firstTime.compare_exchange_strong(expected, false)) { + if (firstTime_.compare_exchange_strong(expected, false)) { // Trigger the retry LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally"); Promise promise; @@ -519,6 +516,9 @@ class MockLookupService : public BinaryProtoLookupService { } return BinaryProtoLookupService::getPartitionMetadataAsync(topicName); } + + private: + std::atomic_bool firstTime_{true}; }; TEST(LookupServiceTest, testAfterClientShutdown) { @@ -531,14 +531,16 @@ TEST(LookupServiceTest, testAfterClientShutdown) { std::promise promise; client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub", ConsumerConfiguration{}, [&promise](Result result, const Consumer&) { promise.set_value(result); }); + // When shutdown is called, there is a pending lookup request due to the 1st lookup is failed in + // MockLookupService. Verify shutdown will cancel it and return ResultDisconnected. client->shutdown(); EXPECT_EQ(ResultDisconnected, promise.get_future().get()); - firstTime = true; - std::promise promise2; + // A new subscribeAsync call will fail immediately in the current thread + Result result = ResultOk; client->subscribeAsync("lookup-service-test-retry-after-destroyed", "sub", ConsumerConfiguration{}, - [&promise2](Result result, const Consumer&) { promise2.set_value(result); }); - EXPECT_EQ(ResultAlreadyClosed, promise2.get_future().get()); + [&result](Result innerResult, const Consumer&) { result = innerResult; }); + EXPECT_EQ(ResultAlreadyClosed, result); } TEST(LookupServiceTest, testRetryAfterDestroyed) { @@ -551,12 +553,12 @@ TEST(LookupServiceTest, testRetryAfterDestroyed) { RetryableLookupService::create(internalLookupService, std::chrono::seconds(30), executorProvider); // Simulate the race condition that `getPartitionMetadataAsync` is called after `close` is called on the - // lookup service. + // lookup service. It's expected the request fails immediately with ResultAlreadyClosed. lookupService->close(); - std::atomic result{ResultUnknownError}; + Result result = ResultOk; lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed")) .addListener([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; }); - EXPECT_EQ(ResultAlreadyClosed, result.load()); + EXPECT_EQ(ResultAlreadyClosed, result); pool.close(); executorProvider->close(); } From 75d65980a3c51caeede097e00a4fabc22c3c83af Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 6 Nov 2025 10:56:29 +0800 Subject: [PATCH 5/5] Rename test --- tests/RetryableOperationCacheTest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc index 81035f05..c9b8a1d7 100644 --- a/tests/RetryableOperationCacheTest.cc +++ b/tests/RetryableOperationCacheTest.cc @@ -118,7 +118,7 @@ TEST_F(RetryableOperationCacheTest, testTimeout) { } } -TEST_F(RetryableOperationCacheTest, testClear) { +TEST_F(RetryableOperationCacheTest, testClose) { auto cache = RetryableOperationCache::create(provider_, std::chrono::seconds(30)); for (int i = 0; i < 10; i++) { futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100}));