diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index aa75128b..eec3b34a 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -78,7 +78,25 @@ typedef std::unique_lock Lock; typedef std::vector StringList; +static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl, + const ClientConfiguration& clientConfiguration, + ConnectionPool& pool, const AuthenticationPtr& auth) { + if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) { + LOG_DEBUG("Using HTTP Lookup"); + return std::make_shared(serviceUrl, std::cref(clientConfiguration), + std::cref(auth)); + } else { + LOG_DEBUG("Using Binary Lookup"); + return std::make_shared(serviceUrl, std::ref(pool), + std::cref(clientConfiguration)); + } +} + ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration) + : ClientImpl(serviceUrl, clientConfiguration, &defaultLookupServiceFactory) {} + +ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, + LookupServiceFactory&& lookupServiceFactory) : mutex_(), state_(Open), clientConfiguration_(ClientConfiguration(clientConfiguration) @@ -95,7 +113,8 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& consumerIdGenerator_(0), closingError(ResultOk), useProxy_(false), - lookupCount_(0L) { + lookupCount_(0L), + lookupServiceFactory_(std::move(lookupServiceFactory)) { std::unique_ptr loggerFactory = clientConfiguration_.impl_->takeLogger(); if (loggerFactory) { LogUtils::setLoggerFactory(std::move(loggerFactory)); @@ -106,19 +125,9 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& ClientImpl::~ClientImpl() { shutdown(); } LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) { - LookupServicePtr underlyingLookupServicePtr; - if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) { - LOG_DEBUG("Using HTTP Lookup"); - underlyingLookupServicePtr = std::make_shared( - serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr())); - } else { - LOG_DEBUG("Using Binary Lookup"); - underlyingLookupServicePtr = std::make_shared( - serviceUrl, std::ref(pool_), std::cref(clientConfiguration_)); - } - auto lookupServicePtr = RetryableLookupService::create( - underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_); + lookupServiceFactory_(serviceUrl, clientConfiguration_, pool_, clientConfiguration_.getAuthPtr()), + clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_); return lookupServicePtr; } @@ -767,6 +776,7 @@ void ClientImpl::shutdown() { << " consumers have been shutdown."); } + lookupServicePtr_->close(); if (!pool_.close()) { // pool_ has already been closed. It means shutdown() has been called before. return; diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 000e4433..0b4d5969 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -54,6 +54,8 @@ using ClientConnectionPtr = std::shared_ptr; class LookupService; using LookupServicePtr = std::shared_ptr; +using LookupServiceFactory = std::function; class ProducerImplBase; using ProducerImplBaseWeakPtr = std::weak_ptr; @@ -70,6 +72,11 @@ std::string generateRandomName(); class ClientImpl : public std::enable_shared_from_this { public: ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration); + + // only for tests + ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, + LookupServiceFactory&& lookupServiceFactory); + virtual ~ClientImpl(); /** @@ -205,6 +212,7 @@ class ClientImpl : public std::enable_shared_from_this { std::atomic closingError; std::atomic useProxy_; std::atomic lookupCount_; + LookupServiceFactory lookupServiceFactory_; friend class Client; }; diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h index dfba7eb0..cf4ff1fb 100644 --- a/lib/ResultUtils.h +++ b/lib/ResultUtils.h @@ -49,7 +49,8 @@ inline bool isResultRetryable(Result result) { ResultLookupError, ResultTooManyLookupRequestException, ResultProducerBlockedQuotaExceededException, - ResultProducerBlockedQuotaExceededError}; + ResultProducerBlockedQuotaExceededError, + ResultAlreadyClosed}; return fatalResults.find(static_cast(result)) == fatalResults.cend(); } diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index 8bc40bf3..bbcf4f07 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -18,8 +18,6 @@ */ #pragma once -#include - #include "LookupDataResult.h" #include "LookupService.h" #include "NamespaceName.h" @@ -41,10 +39,10 @@ class RetryableLookupService : public LookupService { : RetryableLookupService(std::forward(args)...) {} void close() override { - lookupCache_->clear(); - partitionLookupCache_->clear(); - namespaceLookupCache_->clear(); - getSchemaCache_->clear(); + lookupCache_->close(); + partitionLookupCache_->close(); + namespaceLookupCache_->close(); + getSchemaCache_->close(); } template @@ -89,7 +87,7 @@ class RetryableLookupService : public LookupService { RetryableLookupService(std::shared_ptr lookupService, TimeDuration timeout, ExecutorServiceProviderPtr executorProvider) - : lookupService_(lookupService), + : lookupService_(std::move(lookupService)), lookupCache_(RetryableOperationCache::create(executorProvider, timeout)), partitionLookupCache_( RetryableOperationCache::create(executorProvider, timeout)), diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h index e42460dd..f2d390dc 100644 --- a/lib/RetryableOperationCache.h +++ b/lib/RetryableOperationCache.h @@ -58,6 +58,11 @@ class RetryableOperationCache : public std::enable_shared_from_this run(const std::string& key, std::function()>&& func) { std::unique_lock lock{mutex_}; + if (closed_) { + Promise promise; + promise.setFailed(ResultAlreadyClosed); + return promise.getFuture(); + } auto it = operations_.find(key); if (it == operations_.end()) { DeadlineTimerPtr timer; @@ -92,11 +97,15 @@ class RetryableOperationCache : public std::enable_shared_from_this lock{mutex_}; + if (closed_) { + return; + } operations.swap(operations_); + closed_ = true; } // cancel() could trigger the listener to erase the key from operations, so we should use a swap way // to release the lock here @@ -110,6 +119,7 @@ class RetryableOperationCache : public std::enable_shared_from_this>> operations_; + bool closed_{false}; mutable std::mutex mutex_; DECLARE_LOG_OBJECT() diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index ff3a7e04..92aa8204 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -500,3 +500,65 @@ TEST(LookupServiceTest, testRedirectionLimit) { } } } + +class MockLookupService : public BinaryProtoLookupService { + public: + using BinaryProtoLookupService::BinaryProtoLookupService; + + Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { + bool expected = true; + if (firstTime_.compare_exchange_strong(expected, false)) { + // Trigger the retry + LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally"); + Promise promise; + promise.setFailed(ResultRetryable); + return promise.getFuture(); + } + return BinaryProtoLookupService::getPartitionMetadataAsync(topicName); + } + + private: + std::atomic_bool firstTime_{true}; +}; + +TEST(LookupServiceTest, testAfterClientShutdown) { + auto client = std::make_shared("pulsar://localhost:6650", ClientConfiguration{}, + [](const std::string& serviceUrl, const ClientConfiguration&, + ConnectionPool& pool, const AuthenticationPtr&) { + return std::make_shared( + serviceUrl, pool, ClientConfiguration{}); + }); + std::promise promise; + client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub", ConsumerConfiguration{}, + [&promise](Result result, const Consumer&) { promise.set_value(result); }); + // When shutdown is called, there is a pending lookup request due to the 1st lookup is failed in + // MockLookupService. Verify shutdown will cancel it and return ResultDisconnected. + client->shutdown(); + EXPECT_EQ(ResultDisconnected, promise.get_future().get()); + + // A new subscribeAsync call will fail immediately in the current thread + Result result = ResultOk; + client->subscribeAsync("lookup-service-test-retry-after-destroyed", "sub", ConsumerConfiguration{}, + [&result](Result innerResult, const Consumer&) { result = innerResult; }); + EXPECT_EQ(ResultAlreadyClosed, result); +} + +TEST(LookupServiceTest, testRetryAfterDestroyed) { + auto executorProvider = std::make_shared(1); + ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), ""); + + auto internalLookupService = + std::make_shared("pulsar://localhost:6650", pool, ClientConfiguration{}); + auto lookupService = + RetryableLookupService::create(internalLookupService, std::chrono::seconds(30), executorProvider); + + // Simulate the race condition that `getPartitionMetadataAsync` is called after `close` is called on the + // lookup service. It's expected the request fails immediately with ResultAlreadyClosed. + lookupService->close(); + Result result = ResultOk; + lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed")) + .addListener([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; }); + EXPECT_EQ(ResultAlreadyClosed, result); + pool.close(); + executorProvider->close(); +} diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc index 2a6948e3..c9b8a1d7 100644 --- a/tests/RetryableOperationCacheTest.cc +++ b/tests/RetryableOperationCacheTest.cc @@ -118,13 +118,13 @@ TEST_F(RetryableOperationCacheTest, testTimeout) { } } -TEST_F(RetryableOperationCacheTest, testClear) { +TEST_F(RetryableOperationCacheTest, testClose) { auto cache = RetryableOperationCache::create(provider_, std::chrono::seconds(30)); for (int i = 0; i < 10; i++) { futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100})); } ASSERT_EQ(getSize(*cache), 10); - cache->clear(); + cache->close(); for (auto&& future : futures_) { int value; // All cancelled futures complete with ResultDisconnected and the default int value