From f2ddba82de7a1b9d35b699984b561026080bd566 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 10 Aug 2022 15:11:48 +0800 Subject: [PATCH] [feat][cpp] Support multiple brokers in service URL ### Motivation It's a catchup for https://github.com/apache/pulsar/pull/3249. Currently C++ client doesn't have strict validation on the service URL. If multiple brokers are specified, only the 1st broker will be used. ### Modifications - Add `ServiceURI` to support configuring multiple brokers in the service URL. See `ServiceURITest` for how to configure it. - Add `ServiceNameResolver` whose `resolveHost` method selects the next broker in a round robin way. - Since the broker's address for topic lookup is no longer the service URL itself, to handle the case when proxy is enabled, a `getBroker` method (like the same method in Java's `LookupService`), which returns the future of a pair of logical and physical addresses, is added to replace `lookupAsync`. - Apply `ServiceNameResolver` into `ClientImpl` and the `LookupService` implementations. - Rename `BinaryLookupServiceTest` to `LookupServiceTest` and add `testMultiAddresses` to test both `BinaryProtoLookupService` and `HTTPLookupService` that all available brokers will be accessed in a round robin way. ### TODO This is the 1st part to support multiple brokers. Even with this patch, if one of these hosts is not available, the topic lookup will fail immediately instead of trying other broker. Since this patch already includes many code changes, I will push another patch to fix it. --- pulsar-client-cpp/include/pulsar/Client.h | 2 + .../lib/BinaryProtoLookupService.cc | 138 ++++++++---------- .../lib/BinaryProtoLookupService.h | 29 ++-- pulsar-client-cpp/lib/ClientImpl.cc | 80 ++++------ pulsar-client-cpp/lib/ClientImpl.h | 7 +- pulsar-client-cpp/lib/ConnectionPool.h | 4 + pulsar-client-cpp/lib/HTTPLookupService.cc | 71 ++++----- pulsar-client-cpp/lib/HTTPLookupService.h | 11 +- pulsar-client-cpp/lib/LookupService.h | 23 ++- pulsar-client-cpp/lib/PulsarScheme.h | 82 +++++++++++ pulsar-client-cpp/lib/ServiceNameResolver.h | 57 ++++++++ pulsar-client-cpp/lib/ServiceURI.cc | 101 +++++++++++++ pulsar-client-cpp/lib/ServiceURI.h | 50 +++++++ pulsar-client-cpp/lib/TopicName.cc | 12 +- pulsar-client-cpp/lib/TopicName.h | 12 +- pulsar-client-cpp/python/pulsar_test.py | 6 +- pulsar-client-cpp/run-unit-tests.sh | 4 +- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 28 +--- ...kupServiceTest.cc => LookupServiceTest.cc} | 78 +++++++++- pulsar-client-cpp/tests/ServiceURITest.cc | 75 ++++++++++ 20 files changed, 631 insertions(+), 239 deletions(-) create mode 100644 pulsar-client-cpp/lib/PulsarScheme.h create mode 100644 pulsar-client-cpp/lib/ServiceNameResolver.h create mode 100644 pulsar-client-cpp/lib/ServiceURI.cc create mode 100644 pulsar-client-cpp/lib/ServiceURI.h rename pulsar-client-cpp/tests/{BinaryLookupServiceTest.cc => LookupServiceTest.cc} (61%) create mode 100644 pulsar-client-cpp/tests/ServiceURITest.cc diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h index 49dd011963b74..3edb03b5bad75 100644 --- a/pulsar-client-cpp/include/pulsar/Client.h +++ b/pulsar-client-cpp/include/pulsar/Client.h @@ -50,6 +50,7 @@ class PULSAR_PUBLIC Client { * configuration. * * @param serviceUrl the Pulsar endpoint to use (eg: pulsar://localhost:6650) + * @throw std::invalid_argument if `serviceUrl` is invalid */ Client(const std::string& serviceUrl); @@ -60,6 +61,7 @@ class PULSAR_PUBLIC Client { * @param serviceUrl the Pulsar endpoint to use (eg: * http://brokerv2-pdev.messaging.corp.gq1.yahoo.com:4080 for Sandbox access) * @param clientConfiguration the client configuration to use + * @throw std::invalid_argument if `serviceUrl` is invalid */ Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration); diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc index 716ff91c58a27..bff29237c7164 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc @@ -29,35 +29,61 @@ DECLARE_LOG_OBJECT() namespace pulsar { -/* - * @param lookupUrl service url to do lookup - * Constructor - */ -BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& lookupUrl) - : serviceUrl_(lookupUrl), cnxPool_(cnxPool) {} - -BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& lookupUrl, - const std::string& listenerName) - : serviceUrl_(lookupUrl), listenerName_(listenerName), cnxPool_(cnxPool) {} +auto BinaryProtoLookupService::getBroker(const TopicName& topicName) -> LookupResultFuture { + return findBroker(serviceNameResolver_.resolveHost(), false, topicName.toString()); +} -/* - * @param topicName topic name to get broker for - * - * Looks up the owner broker for the given topic name - */ -Future BinaryProtoLookupService::lookupAsync(const std::string& topic) { - TopicNamePtr topicName = TopicName::get(topic); - if (!topicName) { - LOG_ERROR("Unable to parse topic - " << topic); - LookupDataResultPromisePtr promise = std::make_shared(); - promise->setFailed(ResultInvalidTopicName); - return promise->getFuture(); - } - std::string lookupName = topicName->toString(); - LookupDataResultPromisePtr promise = std::make_shared(); - Future future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_); - future.addListener(std::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, lookupName, false, - listenerName_, std::placeholders::_1, std::placeholders::_2, promise)); +auto BinaryProtoLookupService::findBroker(const std::string& address, bool authoritative, + const std::string& topic) -> LookupResultFuture { + LOG_DEBUG("find broker from " << address << ", authoritative: " << authoritative << ", topic: " << topic); + auto promise = std::make_shared>(); + // NOTE: we can use move capture for topic since C++14 + cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address]( + Result result, + const ClientConnectionWeakPtr& weakCnx) { + if (result != ResultOk) { + promise->setFailed(result); + return; + } + auto cnx = weakCnx.lock(); + if (!cnx) { + LOG_ERROR("Connection to " << address << " is expired before lookup"); + promise->setFailed(ResultNotConnected); + return; + } + auto lookupPromise = std::make_shared(); + cnx->newTopicLookup(topic, false, listenerName_, newRequestId(), lookupPromise); + lookupPromise->getFuture().addListener([this, cnx, promise, topic, address]( + Result result, const LookupDataResultPtr& data) { + if (result != ResultOk || !data) { + LOG_ERROR("Lookup failed for " << topic << ", result " << result); + promise->setFailed(result); + return; + } + + const auto responseBrokerAddress = + (serviceNameResolver_.useTls() ? data->getBrokerUrlTls() : data->getBrokerUrl()); + if (data->isRedirect()) { + LOG_DEBUG("Lookup request is for " << topic << " redirected to " << responseBrokerAddress); + findBroker(responseBrokerAddress, data->isAuthoritative(), topic) + .addListener([promise](Result result, const LookupResult& value) { + if (result == ResultOk) { + promise->setValue(value); + } else { + promise->setFailed(result); + } + }); + } else { + LOG_DEBUG("Lookup response for " << topic << ", lookup-broker-url " << data->getBrokerUrl()); + if (data->shouldProxyThroughServiceUrl()) { + // logicalAddress is the proxy's address, we should still connect through proxy + promise->setValue({responseBrokerAddress, address}); + } else { + promise->setValue({responseBrokerAddress, responseBrokerAddress}); + } + } + }); + }); return promise->getFuture(); } @@ -73,55 +99,13 @@ Future BinaryProtoLookupService::getPartitionMetada return promise->getFuture(); } std::string lookupName = topicName->toString(); - Future future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_); - future.addListener(std::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this, - lookupName, std::placeholders::_1, std::placeholders::_2, promise)); + const auto address = serviceNameResolver_.resolveHost(); + cnxPool_.getConnectionAsync(address, address) + .addListener(std::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this, + lookupName, std::placeholders::_1, std::placeholders::_2, promise)); return promise->getFuture(); } -void BinaryProtoLookupService::sendTopicLookupRequest(const std::string& topicName, bool authoritative, - const std::string& listenerName, Result result, - const ClientConnectionWeakPtr& clientCnx, - LookupDataResultPromisePtr promise) { - if (result != ResultOk) { - promise->setFailed(ResultConnectError); - return; - } - LookupDataResultPromisePtr lookupPromise = std::make_shared(); - ClientConnectionPtr conn = clientCnx.lock(); - uint64_t requestId = newRequestId(); - conn->newTopicLookup(topicName, authoritative, listenerName, requestId, lookupPromise); - lookupPromise->getFuture().addListener(std::bind(&BinaryProtoLookupService::handleLookup, this, topicName, - std::placeholders::_1, std::placeholders::_2, clientCnx, - promise)); -} - -void BinaryProtoLookupService::handleLookup(const std::string& topicName, Result result, - LookupDataResultPtr data, - const ClientConnectionWeakPtr& clientCnx, - LookupDataResultPromisePtr promise) { - if (data) { - if (data->isRedirect()) { - LOG_DEBUG("Lookup request is for " << topicName << " redirected to " << data->getBrokerUrl()); - - const std::string& logicalAddress = data->getBrokerUrl(); - const std::string& physicalAddress = - data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress; - Future future = - cnxPool_.getConnectionAsync(logicalAddress, physicalAddress); - future.addListener(std::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, topicName, - data->isAuthoritative(), listenerName_, std::placeholders::_1, - std::placeholders::_2, promise)); - } else { - LOG_DEBUG("Lookup response for " << topicName << ", lookup-broker-url " << data->getBrokerUrl()); - promise->setValue(data); - } - } else { - LOG_DEBUG("Lookup failed for " << topicName << ", result " << result); - promise->setFailed(result); - } -} - void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::string& topicName, Result result, const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise) { @@ -166,9 +150,9 @@ Future BinaryProtoLookupService::getTopicsOfNamespac return promise->getFuture(); } std::string namespaceName = nsName->toString(); - Future future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_); - future.addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this, - namespaceName, std::placeholders::_1, std::placeholders::_2, promise)); + cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost()) + .addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this, + namespaceName, std::placeholders::_1, std::placeholders::_2, promise)); return promise->getFuture(); } diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h b/pulsar-client-cpp/lib/BinaryProtoLookupService.h index bbf763259561b..d068c3d0e646e 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h @@ -26,40 +26,33 @@ #include "Backoff.h" #include #include +#include "ServiceNameResolver.h" namespace pulsar { class LookupDataResult; class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { public: - /* - * constructor - */ - BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& serviceUrl); + BinaryProtoLookupService(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool, + const std::string& listenerName) + : serviceNameResolver_(serviceNameResolver), cnxPool_(pool), listenerName_(listenerName) {} - BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& serviceUrl, - const std::string& listenerName); + LookupResultFuture getBroker(const TopicName& topicName) override; - Future lookupAsync(const std::string& topicName); + Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override; - Future getPartitionMetadataAsync(const TopicNamePtr& topicName); - - Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName); + Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override; private: std::mutex mutex_; uint64_t requestIdGenerator_ = 0; - std::string serviceUrl_; - std::string listenerName_; + ServiceNameResolver& serviceNameResolver_; ConnectionPool& cnxPool_; + std::string listenerName_; - void sendTopicLookupRequest(const std::string& topicName, bool authoritative, - const std::string& listenerName, Result result, - const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise); - - void handleLookup(const std::string& topicName, Result result, LookupDataResultPtr data, - const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise); + // TODO: limit the redirect count, see https://github.com/apache/pulsar/pull/7096 + LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic); void sendPartitionMetadataLookupRequest(const std::string& topicName, Result result, const ClientConnectionWeakPtr& clientCnx, diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index d15e247347a6f..c68d6a951b45e 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -73,25 +73,12 @@ typedef std::unique_lock Lock; typedef std::vector StringList; -static const std::string https("https"); -static const std::string pulsarSsl("pulsar+ssl"); - -static const ClientConfiguration detectTls(const std::string& serviceUrl, - const ClientConfiguration& clientConfiguration) { - ClientConfiguration conf(clientConfiguration); - if (serviceUrl.compare(0, https.size(), https) == 0 || - serviceUrl.compare(0, pulsarSsl.size(), pulsarSsl) == 0) { - conf.setUseTls(true); - } - return conf; -} - ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, bool poolConnections) : mutex_(), state_(Open), - serviceUrl_(serviceUrl), - clientConfiguration_(detectTls(serviceUrl, clientConfiguration)), + serviceNameResolver_(serviceUrl), + clientConfiguration_(ClientConfiguration(clientConfiguration).setUseTls(serviceNameResolver_.useTls())), memoryLimitController_(clientConfiguration.getMemoryLimit()), ioExecutorProvider_(std::make_shared(clientConfiguration_.getIOThreads())), listenerExecutorProvider_( @@ -120,15 +107,16 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& } LogUtils::setLoggerFactory(std::move(loggerFactory)); - if (serviceUrl_.compare(0, 4, "http") == 0) { + if (serviceNameResolver_.useHttp()) { LOG_DEBUG("Using HTTP Lookup"); - lookupServicePtr_ = - std::make_shared(std::cref(serviceUrl_), std::cref(clientConfiguration_), - std::cref(clientConfiguration_.getAuthPtr())); + lookupServicePtr_ = std::make_shared(std::ref(serviceNameResolver_), + std::cref(clientConfiguration_), + std::cref(clientConfiguration_.getAuthPtr())); } else { LOG_DEBUG("Using Binary Lookup"); - lookupServicePtr_ = std::make_shared( - std::ref(pool_), std::ref(serviceUrl), std::cref(clientConfiguration_.getListenerName())); + lookupServicePtr_ = + std::make_shared(std::ref(serviceNameResolver_), std::ref(pool_), + std::cref(clientConfiguration_.getListenerName())); } } @@ -414,36 +402,32 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co Future ClientImpl::getConnection(const std::string& topic) { Promise promise; - lookupServicePtr_->lookupAsync(topic).addListener(std::bind(&ClientImpl::handleLookup, shared_from_this(), - std::placeholders::_1, std::placeholders::_2, - promise)); - return promise.getFuture(); -} -void ClientImpl::handleLookup(Result result, LookupDataResultPtr data, - Promise promise) { - if (data) { - const std::string& logicalAddress = - clientConfiguration_.isUseTls() ? data->getBrokerUrlTls() : data->getBrokerUrl(); - LOG_DEBUG("Getting connection to broker: " << logicalAddress); - const std::string& physicalAddress = - data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress; - Future future = - pool_.getConnectionAsync(logicalAddress, physicalAddress); - future.addListener(std::bind(&ClientImpl::handleNewConnection, shared_from_this(), - std::placeholders::_1, std::placeholders::_2, promise)); - } else { - promise.setFailed(result); + const auto topicNamePtr = TopicName::get(topic); + if (!topicNamePtr) { + LOG_ERROR("Unable to parse topic - " << topic); + promise.setFailed(ResultInvalidTopicName); + return promise.getFuture(); } -} -void ClientImpl::handleNewConnection(Result result, const ClientConnectionWeakPtr& conn, - Promise promise) { - if (result == ResultOk) { - promise.setValue(conn); - } else { - promise.setFailed(ResultConnectError); - } + auto self = shared_from_this(); + lookupServicePtr_->getBroker(*topicNamePtr) + .addListener([this, self, promise](Result result, const LookupService::LookupResult& data) { + if (result != ResultOk) { + promise.setFailed(result); + return; + } + pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress) + .addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) { + if (result == ResultOk) { + promise.setValue(weakCnx); + } else { + promise.setFailed(result); + } + }); + }); + + return promise.getFuture(); } void ClientImpl::handleGetPartitions(const Result result, const LookupDataResultPtr partitionMetadata, diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h index 847872abfd29b..1f046e2427c04 100644 --- a/pulsar-client-cpp/lib/ClientImpl.h +++ b/pulsar-client-cpp/lib/ClientImpl.h @@ -31,6 +31,7 @@ #include "ConsumerImplBase.h" #include #include +#include "ServiceNameResolver.h" namespace pulsar { @@ -69,10 +70,6 @@ class ClientImpl : public std::enable_shared_from_this { void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback); Future getConnection(const std::string& topic); - void handleLookup(Result result, LookupDataResultPtr data, - Promise promise); - void handleNewConnection(Result result, const ClientConnectionWeakPtr& conn, - Promise promise); void closeAsync(CloseCallback callback); void shutdown(); @@ -134,7 +131,7 @@ class ClientImpl : public std::enable_shared_from_this { std::mutex mutex_; State state_; - std::string serviceUrl_; + ServiceNameResolver serviceNameResolver_; ClientConfiguration clientConfiguration_; MemoryLimitController memoryLimitController_; diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h index 5032e809d55a1..21d439e732683 100644 --- a/pulsar-client-cpp/lib/ConnectionPool.h +++ b/pulsar-client-cpp/lib/ConnectionPool.h @@ -63,6 +63,10 @@ class PULSAR_PUBLIC ConnectionPool { Future getConnectionAsync(const std::string& logicalAddress, const std::string& physicalAddress); + Future getConnectionAsync(const std::string& address) { + return getConnectionAsync(address, address); + } + private: ClientConfiguration clientConfiguration_; ExecutorServiceProviderPtr executorProvider_; diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc index 340f67c050e53..5b98663239201 100644 --- a/pulsar-client-cpp/lib/HTTPLookupService.cc +++ b/pulsar-client-cpp/lib/HTTPLookupService.cc @@ -48,47 +48,47 @@ HTTPLookupService::CurlInitializer::~CurlInitializer() { curl_global_cleanup(); HTTPLookupService::CurlInitializer HTTPLookupService::curlInitializer; -HTTPLookupService::HTTPLookupService(const std::string &lookupUrl, +HTTPLookupService::HTTPLookupService(ServiceNameResolver &serviceNameResolver, const ClientConfiguration &clientConfiguration, const AuthenticationPtr &authData) : executorProvider_(std::make_shared(NUMBER_OF_LOOKUP_THREADS)), + serviceNameResolver_(serviceNameResolver), authenticationPtr_(authData), lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()), tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()), isUseTls_(clientConfiguration.isUseTls()), tlsAllowInsecure_(clientConfiguration.isTlsAllowInsecureConnection()), - tlsValidateHostname_(clientConfiguration.isValidateHostName()) { - if (lookupUrl[lookupUrl.length() - 1] == '/') { - // Remove trailing '/' - adminUrl_ = lookupUrl.substr(0, lookupUrl.length() - 1); - } else { - adminUrl_ = lookupUrl; - } -} + tlsValidateHostname_(clientConfiguration.isValidateHostName()) {} -Future HTTPLookupService::lookupAsync(const std::string &topic) { - LookupPromise promise; - std::shared_ptr topicName = TopicName::get(topic); - if (!topicName) { - LOG_ERROR("Unable to parse topic - " << topic); - promise.setFailed(ResultInvalidTopicName); - return promise.getFuture(); - } +auto HTTPLookupService::getBroker(const TopicName &topicName) -> LookupResultFuture { + LookupResultPromise promise; + const auto &url = serviceNameResolver_.resolveHost(); std::stringstream completeUrlStream; - if (topicName->isV2Topic()) { - completeUrlStream << adminUrl_ << V2_PATH << topicName->getDomain() << "/" << topicName->getProperty() - << '/' << topicName->getNamespacePortion() << '/' - << topicName->getEncodedLocalName(); + if (topicName.isV2Topic()) { + completeUrlStream << url << V2_PATH << topicName.getDomain() << "/" << topicName.getProperty() << '/' + << topicName.getNamespacePortion() << '/' << topicName.getEncodedLocalName(); } else { - completeUrlStream << adminUrl_ << V1_PATH << topicName->getDomain() << "/" << topicName->getProperty() - << '/' << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/' - << topicName->getEncodedLocalName(); + completeUrlStream << url << V1_PATH << topicName.getDomain() << "/" << topicName.getProperty() << '/' + << topicName.getCluster() << '/' << topicName.getNamespacePortion() << '/' + << topicName.getEncodedLocalName(); } - executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest, - shared_from_this(), promise, completeUrlStream.str(), - Lookup)); + const auto completeUrl = completeUrlStream.str(); + auto self = shared_from_this(); + executorProvider_->get()->postWork([this, self, promise, completeUrl] { + std::string responseData; + Result result = sendHTTPRequest(completeUrl, responseData); + + if (result != ResultOk) { + promise.setFailed(result); + } else { + const auto lookupDataResultPtr = parseLookupData(responseData); + const auto brokerAddress = (serviceNameResolver_.useTls() ? lookupDataResultPtr->getBrokerUrlTls() + : lookupDataResultPtr->getBrokerUrl()); + promise.setValue({brokerAddress, brokerAddress}); + } + }); return promise.getFuture(); } @@ -97,15 +97,15 @@ Future HTTPLookupService::getPartitionMetadataAsync LookupPromise promise; std::stringstream completeUrlStream; + const auto &url = serviceNameResolver_.resolveHost(); if (topicName->isV2Topic()) { - completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << topicName->getDomain() << '/' - << topicName->getProperty() << '/' << topicName->getNamespacePortion() << '/' + completeUrlStream << url << ADMIN_PATH_V2 << topicName->getDomain() << '/' << topicName->getProperty() + << '/' << topicName->getNamespacePortion() << '/' << topicName->getEncodedLocalName() << '/' << PARTITION_METHOD_NAME; } else { - completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << topicName->getDomain() << '/' - << topicName->getProperty() << '/' << topicName->getCluster() << '/' - << topicName->getNamespacePortion() << '/' << topicName->getEncodedLocalName() - << '/' << PARTITION_METHOD_NAME; + completeUrlStream << url << ADMIN_PATH_V1 << topicName->getDomain() << '/' << topicName->getProperty() + << '/' << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/' + << topicName->getEncodedLocalName() << '/' << PARTITION_METHOD_NAME; } completeUrlStream << "?checkAllowAutoCreation=true"; @@ -120,11 +120,12 @@ Future HTTPLookupService::getTopicsOfNamespaceAsync( NamespaceTopicsPromise promise; std::stringstream completeUrlStream; + const auto &url = serviceNameResolver_.resolveHost(); if (nsName->isV2()) { - completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << "namespaces" << '/' << nsName->toString() << '/' + completeUrlStream << url << ADMIN_PATH_V2 << "namespaces" << '/' << nsName->toString() << '/' << "topics"; } else { - completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/' + completeUrlStream << url << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/' << "destinations"; } diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h b/pulsar-client-cpp/lib/HTTPLookupService.h index 3d0d39ee90a97..f401e879a5b3d 100644 --- a/pulsar-client-cpp/lib/HTTPLookupService.h +++ b/pulsar-client-cpp/lib/HTTPLookupService.h @@ -23,6 +23,7 @@ #include #include #include +#include namespace pulsar { class HTTPLookupService : public LookupService, public std::enable_shared_from_this { @@ -42,7 +43,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t typedef Promise LookupPromise; ExecutorServiceProviderPtr executorProvider_; - std::string adminUrl_; + ServiceNameResolver& serviceNameResolver_; AuthenticationPtr authenticationPtr_; int lookupTimeoutInSeconds_; std::string tlsTrustCertsFilePath_; @@ -60,13 +61,13 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t Result sendHTTPRequest(std::string completeUrl, std::string& responseData); public: - HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&); + HTTPLookupService(ServiceNameResolver&, const ClientConfiguration&, const AuthenticationPtr&); - Future lookupAsync(const std::string&); + LookupResultFuture getBroker(const TopicName& topicName) override; - Future getPartitionMetadataAsync(const TopicNamePtr&); + Future getPartitionMetadataAsync(const TopicNamePtr&) override; - Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName); + Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override; }; } // namespace pulsar diff --git a/pulsar-client-cpp/lib/LookupService.h b/pulsar-client-cpp/lib/LookupService.h index 2fdb51a2946ff..50f2d84f87659 100644 --- a/pulsar-client-cpp/lib/LookupService.h +++ b/pulsar-client-cpp/lib/LookupService.h @@ -25,6 +25,7 @@ #include #include +#include #include namespace pulsar { @@ -34,12 +35,26 @@ typedef std::shared_ptr> NamespaceTopicsProm class LookupService { public: - /* - * @param topicName - topic name + struct LookupResult { + std::string logicalAddress; + std::string physicalAddress; + + friend std::ostream& operator<<(std::ostream& os, const LookupResult& lookupResult) { + return os << "logical address: " << lookupResult.logicalAddress + << ", physical address: " << lookupResult.physicalAddress; + } + }; + using LookupResultFuture = Future; + using LookupResultPromise = Promise; + + /** + * Call broker lookup-api to get broker which serves namespace bundle that contains the given topic. * - * Looks up the owner broker for the given topic name + * @param topicName the topic name + * @return a pair of addresses, representing the logical and physical addresses of the broker that serves + * the topic */ - virtual Future lookupAsync(const std::string& topicName) = 0; + virtual LookupResultFuture getBroker(const TopicName& topicName) = 0; /* * @param topicName - pointer to topic name diff --git a/pulsar-client-cpp/lib/PulsarScheme.h b/pulsar-client-cpp/lib/PulsarScheme.h new file mode 100644 index 0000000000000..e292687275fbb --- /dev/null +++ b/pulsar-client-cpp/lib/PulsarScheme.h @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include + +namespace pulsar { + +enum PulsarScheme +{ + PULSAR, + PULSAR_SSL, + HTTP, + HTTPS +}; + +namespace scheme { + +inline PulsarScheme toScheme(const std::string& scheme) { + if (scheme == "pulsar") { + return PulsarScheme::PULSAR; + } else if (scheme == "pulsar+ssl") { + return PulsarScheme::PULSAR_SSL; + } else if (scheme == "http") { + return PulsarScheme::HTTP; + } else if (scheme == "https") { + return PulsarScheme::HTTPS; + } else { + throw std::invalid_argument("Invalid scheme: " + scheme); + } +} + +inline const char* getSchemeString(PulsarScheme scheme) { + switch (scheme) { + case PulsarScheme::PULSAR: + return "pulsar://"; + case PulsarScheme::PULSAR_SSL: + return "pulsar+ssl://"; + case PulsarScheme::HTTP: + return "http://"; + case PulsarScheme::HTTPS: + return "https://"; + default: + return "unknown://"; + } +} + +inline short getDefaultPort(PulsarScheme scheme) { + switch (scheme) { + case PulsarScheme::PULSAR: + return 6650; + case PulsarScheme::PULSAR_SSL: + return 6651; + case PulsarScheme::HTTP: + return 8080; + case PulsarScheme::HTTPS: + return 8081; + default: + throw std::invalid_argument("Unexpected scheme: " + std::to_string(scheme)); + } +} + +} // namespace scheme + +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/ServiceNameResolver.h b/pulsar-client-cpp/lib/ServiceNameResolver.h new file mode 100644 index 0000000000000..60351d8037fa3 --- /dev/null +++ b/pulsar-client-cpp/lib/ServiceNameResolver.h @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include +#include "ServiceURI.h" + +namespace pulsar { + +class ServiceNameResolver { + public: + ServiceNameResolver(const std::string& uriString) + : serviceUri_(uriString), numAddresses_(serviceUri_.getServiceHosts().size()) { + assert(numAddresses_ > 0); // the validation has been done in ServiceURI + } + + ServiceNameResolver(const ServiceNameResolver&) = delete; + ServiceNameResolver& operator=(const ServiceNameResolver&) = delete; + + bool useTls() const noexcept { + return serviceUri_.getScheme() == PulsarScheme::PULSAR_SSL || + serviceUri_.getScheme() == PulsarScheme::HTTPS; + } + + bool useHttp() const noexcept { + return serviceUri_.getScheme() == PulsarScheme::HTTP || + serviceUri_.getScheme() == PulsarScheme::HTTPS; + } + + const std::string& resolveHost() { + return serviceUri_.getServiceHosts()[(numAddresses_ == 1) ? 0 : (index_++ % numAddresses_)]; + } + + private: + const ServiceURI serviceUri_; + const size_t numAddresses_; + std::atomic_size_t index_{0}; +}; + +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/ServiceURI.cc b/pulsar-client-cpp/lib/ServiceURI.cc new file mode 100644 index 0000000000000..ec515b2444a39 --- /dev/null +++ b/pulsar-client-cpp/lib/ServiceURI.cc @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "ServiceURI.h" +#include + +namespace pulsar { + +static void processAddress(std::string& address, PulsarScheme scheme) { + const auto posOfSlash = address.find('/'); + if (posOfSlash != std::string::npos) { + // ignore the path part + address.erase(posOfSlash); + } + auto fail = [&address] { throw std::invalid_argument("invalid hostname: " + address); }; + + const auto posOfColon = address.find(':'); + if (posOfColon != std::string::npos) { + if (address.find(':', posOfColon + 1) != std::string::npos) { + fail(); + } + try { + const auto port = std::stoi(address.substr(posOfColon + 1)); + if (port < 0 || port > 65535) { + throw std::invalid_argument(""); + } + } catch (const std::invalid_argument& ignored) { + fail(); + } + } else { + address = address + ":" + std::to_string(scheme::getDefaultPort(scheme)); + } + if (!address.empty()) { + address = scheme::getSchemeString(scheme) + address; + } +} + +auto ServiceURI::parse(const std::string& uriString) -> DataType { + size_t pos = uriString.find("://"); + if (pos == std::string::npos) { + throw std::invalid_argument("The scheme part is missing: " + uriString); + } + if (pos == 0) { + throw std::invalid_argument("Expected scheme name at index 0: " + uriString); + } + const auto scheme = scheme::toScheme(uriString.substr(0, pos)); + + pos += 3; // now it points to the end of "://" + if (pos < uriString.size() && uriString[pos] == '/') { + throw std::invalid_argument("authority component is missing in service uri: " + uriString); + } + + std::vector addresses; + while (pos < uriString.size()) { + const size_t endPos = uriString.find(',', pos); + if (endPos == std::string::npos) { + addresses.emplace_back(uriString.substr(pos, endPos - pos)); + break; + } + addresses.emplace_back(uriString.substr(pos, endPos - pos)); + pos = endPos + 1; + } + + bool hasEmptyAddress = false; + for (auto& address : addresses) { + processAddress(address, scheme); + if (address.empty()) { + hasEmptyAddress = true; + } + } + if (hasEmptyAddress) { + auto originalAddresses = addresses; + addresses.clear(); + for (const auto& address : originalAddresses) { + if (!address.empty()) { + addresses.emplace_back(address); + } + } + } + if (addresses.empty()) { + throw std::invalid_argument("No service url is provided yet"); + } + return std::make_pair(scheme, addresses); +} + +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/ServiceURI.h b/pulsar-client-cpp/lib/ServiceURI.h new file mode 100644 index 0000000000000..4f459d987f600 --- /dev/null +++ b/pulsar-client-cpp/lib/ServiceURI.h @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include +#include +#include "PulsarScheme.h" + +namespace pulsar { + +class ServiceURI { + public: + /** + * @param uriString the URL string that is used to create a pulsar::Client object + * @throws std::invalid_argument if `uriString` is invalid + */ + ServiceURI(const std::string& uriString) : data_(parse(uriString)) {} + + PulsarScheme getScheme() const noexcept { return data_.first; } + + const std::vector& getServiceHosts() const noexcept { return data_.second; } + + private: + // The 2 elements of the pair are: + // 1. The Scheme of the lookup protocol + // 2. The available addresses, each item is like "pulsar://localhost:6650" + using DataType = std::pair>; + const DataType data_; + + static DataType parse(const std::string& uriString); +}; + +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/TopicName.cc b/pulsar-client-cpp/lib/TopicName.cc index 2e6232cf001de..70b7b7e507242 100644 --- a/pulsar-client-cpp/lib/TopicName.cc +++ b/pulsar-client-cpp/lib/TopicName.cc @@ -150,19 +150,19 @@ std::string TopicName::getEncodedName(const std::string& nameBeforeEncoding) { return nameAfterEncoding; } -bool TopicName::isV2Topic() { return isV2Topic_; } +bool TopicName::isV2Topic() const { return isV2Topic_; } -std::string TopicName::getDomain() { return domain_; } +std::string TopicName::getDomain() const { return domain_; } -std::string TopicName::getProperty() { return property_; } +std::string TopicName::getProperty() const { return property_; } -std::string TopicName::getCluster() { return cluster_; } +std::string TopicName::getCluster() const { return cluster_; } -std::string TopicName::getNamespacePortion() { return namespacePortion_; } +std::string TopicName::getNamespacePortion() const { return namespacePortion_; } std::string TopicName::getLocalName() { return localName_; } -std::string TopicName::getEncodedLocalName() { return getEncodedName(localName_); } +std::string TopicName::getEncodedLocalName() const { return getEncodedName(localName_); } bool TopicName::operator==(const TopicName& other) { return (this->topicName_.compare(other.topicName_) == 0); diff --git a/pulsar-client-cpp/lib/TopicName.h b/pulsar-client-cpp/lib/TopicName.h index 1d5deab553dca..d8620ea1fee76 100644 --- a/pulsar-client-cpp/lib/TopicName.h +++ b/pulsar-client-cpp/lib/TopicName.h @@ -47,14 +47,14 @@ class PULSAR_PUBLIC TopicName : public ServiceUnitId { int partition_ = -1; public: - bool isV2Topic(); + bool isV2Topic() const; std::string getLookupName(); - std::string getDomain(); - std::string getProperty(); - std::string getCluster(); - std::string getNamespacePortion(); + std::string getDomain() const; + std::string getProperty() const; + std::string getCluster() const; + std::string getNamespacePortion() const; std::string getLocalName(); - std::string getEncodedLocalName(); + std::string getEncodedLocalName() const; std::string toString() const; bool isPersistent() const; NamespaceNamePtr getNamespaceName(); diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 127ecc4247cca..43c7f4d3f3fad 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -118,10 +118,8 @@ def test_consumer_config(self): self.assertEqual(conf.replicate_subscription_state_enabled(), True) def test_connect_error(self): - with self.assertRaises(pulsar.ConnectError): - client = Client("fakeServiceUrl") - client.create_producer("connect-error-topic") - client.close() + with self.assertRaises(ValueError): + Client("fakeServiceUrl") def test_exception_inheritance(self): assert issubclass(pulsar.ConnectError, pulsar.PulsarException) diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh index cd5e28814e6b0..d4e7ec7284d67 100755 --- a/pulsar-client-cpp/run-unit-tests.sh +++ b/pulsar-client-cpp/run-unit-tests.sh @@ -47,7 +47,9 @@ if [ -f /gtest-parallel/gtest-parallel ]; then fi python3 /gtest-parallel/gtest-parallel $tests --dump_json_test_results=/tmp/gtest_parallel_results.json \ --workers=$gtest_workers --retry_failed=$RETRY_FAILED -d /tmp \ - ./main + ./main --gtest_filter='-CustomLoggerTest*' + # The customized logger might affect other tests + ./main --gtest_filter='CustomLoggerTest*' RES=$? else ./main diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index da5c60952dd18..6bb31d0490d6c 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -496,28 +497,11 @@ TEST(BasicEndToEndTest, testSubscribeCloseUnsubscribeSherpaScenario) { } TEST(BasicEndToEndTest, testInvalidUrlPassed) { - Client client("localhost:4080"); - std::string topicName = "testInvalidUrlPassed"; - std::string subName = "test-sub"; - Producer producer; - Result result = client.createProducer(topicName, producer); - ASSERT_EQ(ResultConnectError, result); - - Client client1("test://localhost"); - result = client1.createProducer(topicName, producer); - ASSERT_EQ(ResultConnectError, result); - - Client client2("test://:4080"); - result = client2.createProducer(topicName, producer); - ASSERT_EQ(ResultConnectError, result); - - Client client3(""); - result = client3.createProducer(topicName, producer); - ASSERT_EQ(ResultConnectError, result); - - Client client4("Dream of the day when this will be a valid URL"); - result = client4.createProducer(topicName, producer); - ASSERT_EQ(ResultConnectError, result); + EXPECT_THROW({ Client{"localhost:4080"}; }, std::invalid_argument); + EXPECT_THROW({ Client{"test://localhost"}; }, std::invalid_argument); + EXPECT_THROW({ Client{"test://:4080"}; }, std::invalid_argument); + EXPECT_THROW({ Client{""}; }, std::invalid_argument); + EXPECT_THROW({ Client{"Dream of the day when this will be a valid URL"}; }, std::invalid_argument); } void testPartitionedProducerConsumer(bool lazyStartPartitionedProducers, std::string topicName) { diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc b/pulsar-client-cpp/tests/LookupServiceTest.cc similarity index 61% rename from pulsar-client-cpp/tests/BinaryLookupServiceTest.cc rename to pulsar-client-cpp/tests/LookupServiceTest.cc index b880df3b29856..14c5695a82866 100644 --- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc +++ b/pulsar-client-cpp/tests/LookupServiceTest.cc @@ -17,6 +17,7 @@ * under the License. */ #include +#include #include #include @@ -26,17 +27,23 @@ #include "HttpHelper.h" #include #include +#include "LogUtils.h" + +#include using namespace pulsar; -TEST(BinaryLookupServiceTest, basicLookup) { +DECLARE_LOG_OBJECT() + +TEST(LookupServiceTest, basicLookup) { ExecutorServiceProviderPtr service = std::make_shared(1); AuthenticationPtr authData = AuthFactory::Disabled(); std::string url = "pulsar://localhost:6650"; ClientConfiguration conf; ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared(1)); ConnectionPool pool_(conf, ioExecutorProvider_, authData, true); - BinaryProtoLookupService lookupService(pool_, url); + ServiceNameResolver serviceNameResolver(url); + BinaryProtoLookupService lookupService(serviceNameResolver, pool_, ""); TopicNamePtr topicName = TopicName::get("topic"); @@ -46,15 +53,17 @@ TEST(BinaryLookupServiceTest, basicLookup) { ASSERT_TRUE(lookupData != NULL); ASSERT_EQ(0, lookupData->getPartitions()); - Future future = lookupService.lookupAsync("topic"); - result = future.get(lookupData); + const auto topicNamePtr = TopicName::get("topic"); + auto future = lookupService.getBroker(*topicNamePtr); + LookupService::LookupResult lookupResult; + result = future.get(lookupResult); ASSERT_EQ(ResultOk, result); - ASSERT_TRUE(lookupData != NULL); - ASSERT_EQ(url, lookupData->getBrokerUrl()); + ASSERT_EQ(url, lookupResult.logicalAddress); + ASSERT_EQ(url, lookupResult.physicalAddress); } -TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) { +TEST(LookupServiceTest, basicGetNamespaceTopics) { std::string url = "pulsar://localhost:6650"; std::string adminUrl = "http://localhost:8080/"; Result result; @@ -98,7 +107,8 @@ TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) { ClientConfiguration conf; ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared(1)); ConnectionPool pool_(conf, ioExecutorProvider_, authData, true); - BinaryProtoLookupService lookupService(pool_, url); + ServiceNameResolver serviceNameResolver(url); + BinaryProtoLookupService lookupService(serviceNameResolver, pool_, ""); TopicNamePtr topicName = TopicName::get(topicName1); NamespaceNamePtr nsName = topicName->getNamespaceName(); @@ -117,3 +127,55 @@ TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) { client.shutdown(); } + +static void testMultiAddresses(LookupService& lookupService) { + std::vector results; + constexpr int numRequests = 6; + + auto verifySuccessCount = [&results] { + // Only half of them succeeded + ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultOk), numRequests / 2); + ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultConnectError), numRequests / 2); + }; + + for (int i = 0; i < numRequests; i++) { + const auto topicNamePtr = TopicName::get("topic"); + LookupService::LookupResult lookupResult; + const auto result = lookupService.getBroker(*topicNamePtr).get(lookupResult); + LOG_INFO("getBroker [" << i << "] " << result << ", " << lookupResult); + results.emplace_back(result); + } + verifySuccessCount(); + + results.clear(); + for (int i = 0; i < numRequests; i++) { + LookupDataResultPtr data; + const auto result = lookupService.getPartitionMetadataAsync(TopicName::get("topic")).get(data); + LOG_INFO("getPartitionMetadataAsync [" << i << "] " << result); + results.emplace_back(result); + } + verifySuccessCount(); + + results.clear(); + for (int i = 0; i < numRequests; i++) { + NamespaceTopicsPtr data; + const auto result = + lookupService.getTopicsOfNamespaceAsync(TopicName::get("topic")->getNamespaceName()).get(data); + LOG_INFO("getTopicsOfNamespaceAsync [" << i << "] " << result); + results.emplace_back(result); + } + verifySuccessCount(); +} + +TEST(LookupServiceTest, testMultiAddresses) { + ConnectionPool pool({}, std::make_shared(1), AuthFactory::Disabled(), true); + ServiceNameResolver serviceNameResolver("pulsar://localhost,localhost:9999"); + BinaryProtoLookupService binaryLookupService(serviceNameResolver, pool, ""); + testMultiAddresses(binaryLookupService); + + // HTTPLookupService calls shared_from_this() internally, we must create a shared pointer to test + ServiceNameResolver serviceNameResolverForHttp("http://localhost,localhost:9999"); + auto httpLookupServicePtr = std::make_shared( + std::ref(serviceNameResolverForHttp), ClientConfiguration{}, AuthFactory::Disabled()); + testMultiAddresses(*httpLookupServicePtr); +} diff --git a/pulsar-client-cpp/tests/ServiceURITest.cc b/pulsar-client-cpp/tests/ServiceURITest.cc new file mode 100644 index 0000000000000..9d4c88fc4972e --- /dev/null +++ b/pulsar-client-cpp/tests/ServiceURITest.cc @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include "lib/ServiceURI.h" + +using namespace pulsar; + +static void verifyServiceURIFailure(const std::string& uriString, const std::string& errorMsg) { + try { + ServiceURI uri{uriString}; + std::cerr << uriString << " should be invalid" << std::endl; + FAIL(); + } catch (const std::invalid_argument& e) { + EXPECT_EQ(errorMsg, e.what()); + } +} + +static void verifyServiceURI(const std::string& uriString, PulsarScheme expectedScheme, + const std::vector& expectedServiceHosts) { + ServiceURI uri{uriString}; + EXPECT_EQ(uri.getScheme(), expectedScheme); + EXPECT_EQ(uri.getServiceHosts(), expectedServiceHosts); +} + +TEST(ServiceURITest, testInvalidServiceUris) { + verifyServiceURIFailure("localhost:6650", "The scheme part is missing: localhost:6650"); + verifyServiceURIFailure("unknown://localhost:6650", "Invalid scheme: unknown"); + verifyServiceURIFailure("://localhost:6650", "Expected scheme name at index 0: ://localhost:6650"); + verifyServiceURIFailure("pulsar:///", "authority component is missing in service uri: pulsar:///"); + verifyServiceURIFailure("pulsar://localhost:6650:6651", "invalid hostname: localhost:6650:6651"); + verifyServiceURIFailure("pulsar://localhost:xyz/", "invalid hostname: localhost:xyz"); + verifyServiceURIFailure("pulsar://localhost:-6650/", "invalid hostname: localhost:-6650"); +} + +TEST(ServiceURITest, testPathIgnored) { + verifyServiceURI("pulsar://localhost:6650", PulsarScheme::PULSAR, {"pulsar://localhost:6650"}); + verifyServiceURI("pulsar://localhost:6650/", PulsarScheme::PULSAR, {"pulsar://localhost:6650"}); +} + +TEST(ServiceURITest, testMultipleHostsComma) { + verifyServiceURI("pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace", PulsarScheme::PULSAR, + {"pulsar://host1:6650", "pulsar://host2:6650", "pulsar://host3:6650"}); +} + +TEST(ServiceURITest, testMultipleHostsWithoutPulsarPorts) { + verifyServiceURI("pulsar://host1,host2,host3/path/to/namespace", PulsarScheme::PULSAR, + {"pulsar://host1:6650", "pulsar://host2:6650", "pulsar://host3:6650"}); + verifyServiceURI("pulsar+ssl://host1,host2,host3/path/to/namespace", PulsarScheme::PULSAR_SSL, + {"pulsar+ssl://host1:6651", "pulsar+ssl://host2:6651", "pulsar+ssl://host3:6651"}); + verifyServiceURI("http://host1,host2,host3/path/to/namespace", PulsarScheme::HTTP, + {"http://host1:8080", "http://host2:8080", "http://host3:8080"}); + verifyServiceURI("https://host1,host2,host3/path/to/namespace", PulsarScheme::HTTPS, + {"https://host1:8081", "https://host2:8081", "https://host3:8081"}); +} + +TEST(ServiceURITest, testMultipleHostsMixed) { + verifyServiceURI("pulsar://host1:6640,host2,host3:6660/path/to/namespace", PulsarScheme::PULSAR, + {"pulsar://host1:6640", "pulsar://host2:6650", "pulsar://host3:6660"}); +}