Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pulsar-client-cpp/include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class PULSAR_PUBLIC Client {
* configuration.
*
* @param serviceUrl the Pulsar endpoint to use (eg: pulsar://localhost:6650)
* @throw std::invalid_argument if `serviceUrl` is invalid
*/
Client(const std::string& serviceUrl);

Expand All @@ -60,6 +61,7 @@ class PULSAR_PUBLIC Client {
* @param serviceUrl the Pulsar endpoint to use (eg:
* http://brokerv2-pdev.messaging.corp.gq1.yahoo.com:4080 for Sandbox access)
* @param clientConfiguration the client configuration to use
* @throw std::invalid_argument if `serviceUrl` is invalid
*/
Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);

Expand Down
138 changes: 61 additions & 77 deletions pulsar-client-cpp/lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,61 @@ DECLARE_LOG_OBJECT()

namespace pulsar {

/*
* @param lookupUrl service url to do lookup
* Constructor
*/
BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& lookupUrl)
: serviceUrl_(lookupUrl), cnxPool_(cnxPool) {}

BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& lookupUrl,
const std::string& listenerName)
: serviceUrl_(lookupUrl), listenerName_(listenerName), cnxPool_(cnxPool) {}
auto BinaryProtoLookupService::getBroker(const TopicName& topicName) -> LookupResultFuture {
return findBroker(serviceNameResolver_.resolveHost(), false, topicName.toString());
}

/*
* @param topicName topic name to get broker for
*
* Looks up the owner broker for the given topic name
*/
Future<Result, LookupDataResultPtr> BinaryProtoLookupService::lookupAsync(const std::string& topic) {
TopicNamePtr topicName = TopicName::get(topic);
if (!topicName) {
LOG_ERROR("Unable to parse topic - " << topic);
LookupDataResultPromisePtr promise = std::make_shared<LookupDataResultPromise>();
promise->setFailed(ResultInvalidTopicName);
return promise->getFuture();
}
std::string lookupName = topicName->toString();
LookupDataResultPromisePtr promise = std::make_shared<LookupDataResultPromise>();
Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
future.addListener(std::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, lookupName, false,
listenerName_, std::placeholders::_1, std::placeholders::_2, promise));
auto BinaryProtoLookupService::findBroker(const std::string& address, bool authoritative,
const std::string& topic) -> LookupResultFuture {
LOG_DEBUG("find broker from " << address << ", authoritative: " << authoritative << ", topic: " << topic);
auto promise = std::make_shared<Promise<Result, LookupResult>>();
// NOTE: we can use move capture for topic since C++14
cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address](
Result result,
const ClientConnectionWeakPtr& weakCnx) {
if (result != ResultOk) {
promise->setFailed(result);
return;
}
auto cnx = weakCnx.lock();
if (!cnx) {
LOG_ERROR("Connection to " << address << " is expired before lookup");
promise->setFailed(ResultNotConnected);
return;
}
auto lookupPromise = std::make_shared<LookupDataResultPromise>();
cnx->newTopicLookup(topic, false, listenerName_, newRequestId(), lookupPromise);
lookupPromise->getFuture().addListener([this, cnx, promise, topic, address](
Result result, const LookupDataResultPtr& data) {
if (result != ResultOk || !data) {
LOG_ERROR("Lookup failed for " << topic << ", result " << result);
promise->setFailed(result);
return;
}

const auto responseBrokerAddress =
(serviceNameResolver_.useTls() ? data->getBrokerUrlTls() : data->getBrokerUrl());
if (data->isRedirect()) {
LOG_DEBUG("Lookup request is for " << topic << " redirected to " << responseBrokerAddress);
findBroker(responseBrokerAddress, data->isAuthoritative(), topic)
.addListener([promise](Result result, const LookupResult& value) {
if (result == ResultOk) {
promise->setValue(value);
} else {
promise->setFailed(result);
}
});
} else {
LOG_DEBUG("Lookup response for " << topic << ", lookup-broker-url " << data->getBrokerUrl());
if (data->shouldProxyThroughServiceUrl()) {
// logicalAddress is the proxy's address, we should still connect through proxy
promise->setValue({responseBrokerAddress, address});
} else {
promise->setValue({responseBrokerAddress, responseBrokerAddress});
}
}
});
});
return promise->getFuture();
}

Expand All @@ -73,55 +99,13 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada
return promise->getFuture();
}
std::string lookupName = topicName->toString();
Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
future.addListener(std::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this,
lookupName, std::placeholders::_1, std::placeholders::_2, promise));
const auto address = serviceNameResolver_.resolveHost();
cnxPool_.getConnectionAsync(address, address)
.addListener(std::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this,
lookupName, std::placeholders::_1, std::placeholders::_2, promise));
return promise->getFuture();
}

void BinaryProtoLookupService::sendTopicLookupRequest(const std::string& topicName, bool authoritative,
const std::string& listenerName, Result result,
const ClientConnectionWeakPtr& clientCnx,
LookupDataResultPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(ResultConnectError);
return;
}
LookupDataResultPromisePtr lookupPromise = std::make_shared<LookupDataResultPromise>();
ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
conn->newTopicLookup(topicName, authoritative, listenerName, requestId, lookupPromise);
lookupPromise->getFuture().addListener(std::bind(&BinaryProtoLookupService::handleLookup, this, topicName,
std::placeholders::_1, std::placeholders::_2, clientCnx,
promise));
}

void BinaryProtoLookupService::handleLookup(const std::string& topicName, Result result,
LookupDataResultPtr data,
const ClientConnectionWeakPtr& clientCnx,
LookupDataResultPromisePtr promise) {
if (data) {
if (data->isRedirect()) {
LOG_DEBUG("Lookup request is for " << topicName << " redirected to " << data->getBrokerUrl());

const std::string& logicalAddress = data->getBrokerUrl();
const std::string& physicalAddress =
data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress;
Future<Result, ClientConnectionWeakPtr> future =
cnxPool_.getConnectionAsync(logicalAddress, physicalAddress);
future.addListener(std::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, topicName,
data->isAuthoritative(), listenerName_, std::placeholders::_1,
std::placeholders::_2, promise));
} else {
LOG_DEBUG("Lookup response for " << topicName << ", lookup-broker-url " << data->getBrokerUrl());
promise->setValue(data);
}
} else {
LOG_DEBUG("Lookup failed for " << topicName << ", result " << result);
promise->setFailed(result);
}
}

void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::string& topicName, Result result,
const ClientConnectionWeakPtr& clientCnx,
LookupDataResultPromisePtr promise) {
Expand Down Expand Up @@ -166,9 +150,9 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
return promise->getFuture();
}
std::string namespaceName = nsName->toString();
Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
future.addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this,
namespaceName, std::placeholders::_1, std::placeholders::_2, promise));
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
.addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this,
namespaceName, std::placeholders::_1, std::placeholders::_2, promise));
return promise->getFuture();
}

Expand Down
29 changes: 11 additions & 18 deletions pulsar-client-cpp/lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,33 @@
#include "Backoff.h"
#include <lib/LookupService.h>
#include <mutex>
#include "ServiceNameResolver.h"

namespace pulsar {
class LookupDataResult;

class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
/*
* constructor
*/
BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& serviceUrl);
BinaryProtoLookupService(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool,
const std::string& listenerName)
: serviceNameResolver_(serviceNameResolver), cnxPool_(pool), listenerName_(listenerName) {}

BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& serviceUrl,
const std::string& listenerName);
LookupResultFuture getBroker(const TopicName& topicName) override;

Future<Result, LookupDataResultPtr> lookupAsync(const std::string& topicName);
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;

Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName);

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName);
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override;

private:
std::mutex mutex_;
uint64_t requestIdGenerator_ = 0;

std::string serviceUrl_;
std::string listenerName_;
ServiceNameResolver& serviceNameResolver_;
ConnectionPool& cnxPool_;
std::string listenerName_;

void sendTopicLookupRequest(const std::string& topicName, bool authoritative,
const std::string& listenerName, Result result,
const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise);

void handleLookup(const std::string& topicName, Result result, LookupDataResultPtr data,
const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise);
// TODO: limit the redirect count, see https://github.com/apache/pulsar/pull/7096
LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic);

void sendPartitionMetadataLookupRequest(const std::string& topicName, Result result,
const ClientConnectionWeakPtr& clientCnx,
Expand Down
80 changes: 32 additions & 48 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,12 @@ typedef std::unique_lock<std::mutex> Lock;

typedef std::vector<std::string> StringList;

static const std::string https("https");
static const std::string pulsarSsl("pulsar+ssl");

static const ClientConfiguration detectTls(const std::string& serviceUrl,
const ClientConfiguration& clientConfiguration) {
ClientConfiguration conf(clientConfiguration);
if (serviceUrl.compare(0, https.size(), https) == 0 ||
serviceUrl.compare(0, pulsarSsl.size(), pulsarSsl) == 0) {
conf.setUseTls(true);
}
return conf;
}

ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
bool poolConnections)
: mutex_(),
state_(Open),
serviceUrl_(serviceUrl),
clientConfiguration_(detectTls(serviceUrl, clientConfiguration)),
serviceNameResolver_(serviceUrl),
clientConfiguration_(ClientConfiguration(clientConfiguration).setUseTls(serviceNameResolver_.useTls())),
memoryLimitController_(clientConfiguration.getMemoryLimit()),
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
listenerExecutorProvider_(
Expand Down Expand Up @@ -120,15 +107,16 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
}
LogUtils::setLoggerFactory(std::move(loggerFactory));

if (serviceUrl_.compare(0, 4, "http") == 0) {
if (serviceNameResolver_.useHttp()) {
LOG_DEBUG("Using HTTP Lookup");
lookupServicePtr_ =
std::make_shared<HTTPLookupService>(std::cref(serviceUrl_), std::cref(clientConfiguration_),
std::cref(clientConfiguration_.getAuthPtr()));
lookupServicePtr_ = std::make_shared<HTTPLookupService>(std::ref(serviceNameResolver_),
std::cref(clientConfiguration_),
std::cref(clientConfiguration_.getAuthPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
lookupServicePtr_ = std::make_shared<BinaryProtoLookupService>(
std::ref(pool_), std::ref(serviceUrl), std::cref(clientConfiguration_.getListenerName()));
lookupServicePtr_ =
std::make_shared<BinaryProtoLookupService>(std::ref(serviceNameResolver_), std::ref(pool_),
std::cref(clientConfiguration_.getListenerName()));
}
}

Expand Down Expand Up @@ -414,36 +402,32 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co

Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::string& topic) {
Promise<Result, ClientConnectionWeakPtr> promise;
lookupServicePtr_->lookupAsync(topic).addListener(std::bind(&ClientImpl::handleLookup, shared_from_this(),
std::placeholders::_1, std::placeholders::_2,
promise));
return promise.getFuture();
}

void ClientImpl::handleLookup(Result result, LookupDataResultPtr data,
Promise<Result, ClientConnectionWeakPtr> promise) {
if (data) {
const std::string& logicalAddress =
clientConfiguration_.isUseTls() ? data->getBrokerUrlTls() : data->getBrokerUrl();
LOG_DEBUG("Getting connection to broker: " << logicalAddress);
const std::string& physicalAddress =
data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress;
Future<Result, ClientConnectionWeakPtr> future =
pool_.getConnectionAsync(logicalAddress, physicalAddress);
future.addListener(std::bind(&ClientImpl::handleNewConnection, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, promise));
} else {
promise.setFailed(result);
const auto topicNamePtr = TopicName::get(topic);
if (!topicNamePtr) {
LOG_ERROR("Unable to parse topic - " << topic);
promise.setFailed(ResultInvalidTopicName);
return promise.getFuture();
}
}

void ClientImpl::handleNewConnection(Result result, const ClientConnectionWeakPtr& conn,
Promise<Result, ClientConnectionWeakPtr> promise) {
if (result == ResultOk) {
promise.setValue(conn);
} else {
promise.setFailed(ResultConnectError);
}
auto self = shared_from_this();
lookupServicePtr_->getBroker(*topicNamePtr)
.addListener([this, self, promise](Result result, const LookupService::LookupResult& data) {
if (result != ResultOk) {
promise.setFailed(result);
return;
}
pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress)
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
if (result == ResultOk) {
promise.setValue(weakCnx);
} else {
promise.setFailed(result);
}
});
});

return promise.getFuture();
}

void ClientImpl::handleGetPartitions(const Result result, const LookupDataResultPtr partitionMetadata,
Expand Down
7 changes: 2 additions & 5 deletions pulsar-client-cpp/lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "ConsumerImplBase.h"
#include <atomic>
#include <vector>
#include "ServiceNameResolver.h"

namespace pulsar {

Expand Down Expand Up @@ -69,10 +70,6 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);

Future<Result, ClientConnectionWeakPtr> getConnection(const std::string& topic);
void handleLookup(Result result, LookupDataResultPtr data,
Promise<Result, ClientConnectionWeakPtr> promise);
void handleNewConnection(Result result, const ClientConnectionWeakPtr& conn,
Promise<Result, ClientConnectionWeakPtr> promise);

void closeAsync(CloseCallback callback);
void shutdown();
Expand Down Expand Up @@ -134,7 +131,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
std::mutex mutex_;

State state_;
std::string serviceUrl_;
ServiceNameResolver serviceNameResolver_;
ClientConfiguration clientConfiguration_;
MemoryLimitController memoryLimitController_;

Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ConnectionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ class PULSAR_PUBLIC ConnectionPool {
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
const std::string& physicalAddress);

Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& address) {
return getConnectionAsync(address, address);
}

private:
ClientConfiguration clientConfiguration_;
ExecutorServiceProviderPtr executorProvider_;
Expand Down
Loading