diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index 41140754..63a25159 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -36,7 +36,9 @@ #include #include +#include #include +#include namespace pulsar { typedef std::function CreateProducerCallback; @@ -47,6 +49,21 @@ typedef std::function&)> GetPartitio typedef std::function CloseCallback; using CreateProducerV2Callback = std::function)>; +using CreateConsumerV2Callback = std::function)>; +using SubscribeV2Callback = CreateConsumerV2Callback; +using ReaderV2Callback = std::function)>; +using TableViewV2Callback = std::function)>; + +/** + * Use TopicRegex with subscribeV2/subscribeAsyncV2 to distinguish a regex pattern from a single topic name. + */ +struct TopicRegex { + explicit TopicRegex(std::string pattern) : pattern(std::move(pattern)) {} + + std::string pattern; +}; + +using SubscribeTopics = std::variant, TopicRegex>; class ClientImpl; class PulsarFriend; @@ -188,6 +205,13 @@ class PULSAR_PUBLIC Client { void subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); + void subscribeAsyncV2(const SubscribeTopics& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeV2Callback callback); + + std::variant subscribeV2(const SubscribeTopics& topics, + const std::string& subscriptionName, + const ConsumerConfiguration& conf); + /** * Subscribe to multiple topics under the same namespace. * @@ -332,6 +356,12 @@ class PULSAR_PUBLIC Client { void createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback); + void createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, ReaderV2Callback callback); + + std::variant createReaderV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf); + /** * Create a table view with given {@code TableViewConfiguration} for specified topic. * @@ -362,6 +392,12 @@ class PULSAR_PUBLIC Client { void createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, const TableViewCallback& callBack); + void createTableViewAsyncV2(const std::string& topic, const TableViewConfiguration& conf, + TableViewV2Callback callback); + + std::variant createTableViewV2(const std::string& topic, + const TableViewConfiguration& conf); + /** * Get the list of partitions for a given topic. * diff --git a/lib/Client.cc b/lib/Client.cc index 32ab87c0..11dfe267 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -26,15 +26,35 @@ #include "ClientImpl.h" #include "Int64SerDes.h" -#include "LogUtils.h" #include "LookupService.h" #include "TopicName.h" #include "Utils.h" -DECLARE_LOG_OBJECT() - namespace pulsar { +namespace { + +template +void setPromiseValue(std::promise>& promise, const std::variant& value) { + if (const auto* error = std::get_if(&value)) { + promise.set_value(*error); + } else { + promise.set_value(std::get(value)); + } +} + +template +void invokeLegacyCallback(const std::function& callback, + const std::variant& value) { + if (const auto* error = std::get_if(&value)) { + callback(error->result, T()); + } else { + callback(ResultOk, std::get(value)); + } +} + +} // namespace + Client::Client(const std::shared_ptr& impl) : impl_(impl) { impl_->initialize(); } Client::Client(const std::string& serviceUrl) : Client(serviceUrl, ClientConfiguration()) {} @@ -83,13 +103,8 @@ void Client::createProducerAsyncV2(const std::string& topic, const ProducerConfi std::variant Client::createProducerV2(const std::string& topic, const ProducerConfiguration& conf) { std::promise> promise; - createProducerAsyncV2(topic, conf, [&promise](const auto& v) mutable { - if (const auto* error = std::get_if(&v)) { - promise.set_value(*error); - } else { - promise.set_value(std::get(v)); - } - }); + createProducerAsyncV2(topic, conf, + [&promise](const auto& v) mutable { setPromiseValue(promise, v); }); return promise.get_future().get(); } @@ -113,8 +128,22 @@ void Client::subscribeAsync(const std::string& topic, const std::string& subscri void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { - LOG_INFO("Subscribing on Topic :" << topic); - impl_->subscribeAsync(topic, subscriptionName, conf, callback); + subscribeAsyncV2(topic, subscriptionName, conf, + [callback](const auto& value) { invokeLegacyCallback(callback, value); }); +} + +void Client::subscribeAsyncV2(const SubscribeTopics& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeV2Callback callback) { + impl_->subscribeAsyncV2(topics, subscriptionName, conf, std::move(callback)); +} + +std::variant Client::subscribeV2(const SubscribeTopics& topics, + const std::string& subscriptionName, + const ConsumerConfiguration& conf) { + std::promise> promise; + subscribeAsyncV2(topics, subscriptionName, conf, + [&promise](const auto& v) mutable { setPromiseValue(promise, v); }); + return promise.get_future().get(); } Result Client::subscribe(const std::vector& topics, const std::string& subscriptionName, @@ -138,7 +167,8 @@ void Client::subscribeAsync(const std::vector& topics, const std::s void Client::subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { - impl_->subscribeAsync(topics, subscriptionName, conf, callback); + subscribeAsyncV2(topics, subscriptionName, conf, + [callback](const auto& value) { invokeLegacyCallback(callback, value); }); } Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName, @@ -162,7 +192,8 @@ void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std: void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { - impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, callback); + subscribeAsyncV2(TopicRegex{regexPattern}, subscriptionName, conf, + [callback](const auto& value) { invokeLegacyCallback(callback, value); }); } Result Client::createReader(const std::string& topic, const MessageId& startMessageId, @@ -176,7 +207,21 @@ Result Client::createReader(const std::string& topic, const MessageId& startMess void Client::createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback) { - impl_->createReaderAsync(topic, startMessageId, conf, callback); + createReaderAsyncV2(topic, startMessageId, conf, + [callback](const auto& value) { invokeLegacyCallback(callback, value); }); +} + +void Client::createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, ReaderV2Callback callback) { + impl_->createReaderAsyncV2(topic, startMessageId, conf, std::move(callback)); +} + +std::variant Client::createReaderV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf) { + std::promise> promise; + createReaderAsyncV2(topic, startMessageId, conf, + [&promise](const auto& v) mutable { setPromiseValue(promise, v); }); + return promise.get_future().get(); } Result Client::createTableView(const std::string& topic, const TableViewConfiguration& conf, @@ -190,7 +235,21 @@ Result Client::createTableView(const std::string& topic, const TableViewConfigur void Client::createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, const TableViewCallback& callback) { - impl_->createTableViewAsync(topic, conf, callback); + createTableViewAsyncV2( + topic, conf, [callback](const auto& value) { invokeLegacyCallback(callback, value); }); +} + +void Client::createTableViewAsyncV2(const std::string& topic, const TableViewConfiguration& conf, + TableViewV2Callback callback) { + impl_->createTableViewAsyncV2(topic, conf, std::move(callback)); +} + +std::variant Client::createTableViewV2(const std::string& topic, + const TableViewConfiguration& conf) { + std::promise> promise; + createTableViewAsyncV2(topic, conf, + [&promise](const auto& v) mutable { setPromiseValue(promise, v); }); + return promise.get_future().get(); } Result Client::getPartitionsForTopic(const std::string& topic, std::vector& partitions) { diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 1e4f0223..dc79fce0 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -62,6 +62,20 @@ DECLARE_LOG_OBJECT() namespace pulsar { +namespace { + +template +void invokeLegacyCallback(const std::function& callback, + const std::variant& value) { + if (const auto* error = std::get_if(&value)) { + callback(error->result, T()); + } else { + callback(ResultOk, std::get(value)); + } +} + +} // namespace + static const char hexDigits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; static std::uniform_int_distribution<> hexDigitsDist(0, sizeof(hexDigits) - 1); @@ -286,72 +300,93 @@ void ClientImpl::handleProducerCreated(const Error& error, const ProducerImplBas void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback) { + createReaderAsyncV2(topic, startMessageId, conf, + [callback](const auto& value) { invokeLegacyCallback(callback, value); }); +} + +void ClientImpl::createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, ReaderV2Callback callback) { TopicNamePtr topicName; { std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Reader()); + callback(Error{ResultAlreadyClosed, ""}); return; } else if (!(topicName = TopicName::get(topic))) { lock.unlock(); - callback(ResultInvalidTopicName, Reader()); + callback(Error{ResultInvalidTopicName, ""}); return; } } - getPartitionMetadataAsync(topicName).addListener([this, self{shared_from_this()}, topicName, - startMessageId, conf, - callback](const auto& error, const auto& metadata) { - handleReaderMetadataLookup(error.result, metadata, topicName, startMessageId, conf, callback); - }); + getPartitionMetadataAsync(topicName).addListener( + [this, self{shared_from_this()}, topicName, startMessageId, conf, callback{std::move(callback)}]( + const auto& error, const auto& metadata) { + handleReaderMetadataLookup(error, metadata, topicName, startMessageId, conf, callback); + }); } void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, const TableViewCallback& callback) { + createTableViewAsyncV2( + topic, conf, [callback](const auto& value) { invokeLegacyCallback(callback, value); }); +} + +void ClientImpl::createTableViewAsyncV2(const std::string& topic, const TableViewConfiguration& conf, + TableViewV2Callback callback) { TopicNamePtr topicName; { std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, TableView()); + callback(Error{ResultAlreadyClosed, ""}); return; } else if (!(topicName = TopicName::get(topic))) { lock.unlock(); - callback(ResultInvalidTopicName, TableView()); + callback(Error{ResultInvalidTopicName, ""}); return; } } TableViewImplPtr tableViewPtr = std::make_shared(shared_from_this(), topicName->toString(), conf); - tableViewPtr->start().addListener([callback](Result result, const TableViewImplPtr& tableViewImplPtr) { - if (result == ResultOk) { - callback(result, TableView{tableViewImplPtr}); - } else { - callback(result, {}); - } - }); + tableViewPtr->start().addListener( + [callback{std::move(callback)}](const Error& error, const TableViewImplPtr& tableViewImplPtr) { + if (error.result == ResultOk) { + callback(TableView{tableViewImplPtr}); + } else { + callback(error); + } + }); } -void ClientImpl::handleReaderMetadataLookup(Result result, const LookupDataResultPtr& partitionMetadata, +void ClientImpl::handleReaderMetadataLookup(const Error& error, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const MessageId& startMessageId, - const ReaderConfiguration& conf, const ReaderCallback& callback) { - if (result != ResultOk) { + const ReaderConfiguration& conf, + const ReaderV2Callback& callback) { + if (error.result != ResultOk) { LOG_ERROR("Error Checking/Getting Partition Metadata while creating readeron " - << topicName->toString() << " -- " << result); - callback(result, Reader()); + << topicName->toString() << " -- " << error.result); + callback(error); return; } ReaderImplPtr reader; try { + ReaderCallback readerCreatedCallback = [callback](Result result, const Reader& reader) { + if (result == ResultOk) { + callback(reader); + } else { + callback(Error{result, ""}); + } + }; reader.reset(new ReaderImpl(shared_from_this(), topicName->toString(), partitionMetadata->getPartitions(), conf, - getListenerExecutorProvider()->get(), callback)); + getListenerExecutorProvider()->get(), readerCreatedCallback)); } catch (const std::runtime_error& e) { LOG_ERROR("Failed to create reader: " << e.what()); - callback(ResultConnectError, {}); + callback(Error{ResultConnectError, e.what()}); return; } ConsumerImplBasePtr consumer = reader->getConsumer(); @@ -375,18 +410,34 @@ void ClientImpl::handleReaderMetadataLookup(Result result, const LookupDataResul void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { + subscribeAsyncV2(TopicRegex{regexPattern}, subscriptionName, conf, + [callback](const auto& value) { invokeLegacyCallback(callback, value); }); +} + +void ClientImpl::subscribeAsyncV2(const SubscribeTopics& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeV2Callback callback) { + std::visit( + [this, &subscriptionName, &conf, callback{std::move(callback)}](const auto& topicSpec) mutable { + subscribeToTopicsAsyncV2(topicSpec, subscriptionName, conf, std::move(callback)); + }, + topics); +} + +void ClientImpl::subscribeToTopicsAsyncV2(const TopicRegex& topicRegex, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeV2Callback callback) { + const auto regexPattern = topicRegex.pattern; TopicNamePtr topicNamePtr = TopicName::get(regexPattern); std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Consumer()); + callback(Error{ResultAlreadyClosed, ""}); return; } else { lock.unlock(); if (!topicNamePtr) { LOG_ERROR("Topic pattern not valid: " << regexPattern); - callback(ResultInvalidTopicName, Consumer()); + callback(Error{ResultInvalidTopicName, ""}); return; } } @@ -411,14 +462,16 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const break; default: LOG_ERROR("RegexSubscriptionMode not valid: " << regexSubscriptionMode); - callback(ResultInvalidConfiguration, Consumer()); + callback(Error{ResultInvalidConfiguration, ""}); return; } getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) - .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), - std::placeholders::_1, std::placeholders::_2, regexPattern, mode, - subscriptionName, conf, callback)); + .addListener([self{shared_from_this()}, regexPattern, mode, subscriptionName, conf, + callback{std::move(callback)}](Result result, const NamespaceTopicsPtr& topics) { + self->createPatternMultiTopicsConsumer(result, topics, regexPattern, mode, subscriptionName, conf, + callback); + }); } void ClientImpl::createPatternMultiTopicsConsumer(Result result, const NamespaceTopicsPtr& topics, @@ -426,7 +479,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace CommandGetTopicsOfNamespace_Mode mode, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const SubscribeCallback& callback) { + SubscribeV2Callback callback) { if (result == ResultOk) { ConsumerImplBasePtr consumer; @@ -441,18 +494,27 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace shared_from_this(), regexPattern, mode, *matchTopics, subscriptionName, conf, interceptors); consumer->getConsumerCreatedFuture().addListener( - std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, callback, consumer)); + [self{shared_from_this()}, callback{std::move(callback)}, consumer]( + Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr) { + self->handleConsumerCreated(result, consumerImplBaseWeakPtr, callback, consumer); + }); consumer->start(); } else { LOG_ERROR("Error Getting topicsOfNameSpace while createPatternMultiTopicsConsumer: " << result); - callback(result, Consumer()); + callback(Error{result, ""}); } } void ClientImpl::subscribeAsync(const std::vector& originalTopics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { + subscribeAsyncV2(originalTopics, subscriptionName, conf, + [callback](const auto& value) { invokeLegacyCallback(callback, value); }); +} + +void ClientImpl::subscribeToTopicsAsyncV2(const std::vector& originalTopics, + const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeV2Callback callback) { TopicNamePtr topicNamePtr; // Remove duplicates from the list of topics @@ -464,12 +526,12 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Consumer()); + callback(Error{ResultAlreadyClosed, ""}); return; } else { if (!topics.empty() && !(topicNamePtr = MultiTopicsConsumerImpl::topicNamesValid(topics))) { lock.unlock(); - callback(ResultInvalidTopicName, Consumer()); + callback(Error{ResultInvalidTopicName, ""}); return; } } @@ -487,45 +549,54 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, ConsumerImplBasePtr consumer = std::make_shared( shared_from_this(), topics, subscriptionName, topicNamePtr, conf, interceptors); - consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated, - shared_from_this(), std::placeholders::_1, - std::placeholders::_2, callback, consumer)); + consumer->getConsumerCreatedFuture().addListener( + [self{shared_from_this()}, callback{std::move(callback)}, consumer]( + Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr) { + self->handleConsumerCreated(result, consumerImplBaseWeakPtr, callback, consumer); + }); consumer->start(); } void ClientImpl::subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { + subscribeAsyncV2(topic, subscriptionName, conf, + [callback](const auto& value) { invokeLegacyCallback(callback, value); }); +} + +void ClientImpl::subscribeToTopicsAsyncV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeV2Callback callback) { + LOG_INFO("Subscribing on Topic :" << topic); TopicNamePtr topicName; { std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Consumer()); + callback(Error{ResultAlreadyClosed, ""}); return; } else if (!(topicName = TopicName::get(topic))) { lock.unlock(); - callback(ResultInvalidTopicName, Consumer()); + callback(Error{ResultInvalidTopicName, ""}); return; } else if (conf.isReadCompacted() && (topicName->getDomain().compare("persistent") != 0 || (conf.getConsumerType() != ConsumerExclusive && conf.getConsumerType() != ConsumerFailover))) { lock.unlock(); - callback(ResultInvalidConfiguration, Consumer()); + callback(Error{ResultInvalidConfiguration, ""}); return; } } - getPartitionMetadataAsync(topicName).addListener([this, self{shared_from_this()}, topicName, - subscriptionName, conf, - callback](const auto& error, const auto& metadata) { - handleSubscribe(error.result, metadata, topicName, subscriptionName, conf, callback); - }); + getPartitionMetadataAsync(topicName).addListener( + [this, self{shared_from_this()}, topicName, subscriptionName, conf, callback{std::move(callback)}]( + const auto& error, const auto& metadata) { + handleSubscribe(error, metadata, topicName, subscriptionName, conf, callback); + }); } -void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& partitionMetadata, +void ClientImpl::handleSubscribe(const Error& error, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const std::string& subscriptionName, - ConsumerConfiguration conf, const SubscribeCallback& callback) { - if (result == ResultOk) { + ConsumerConfiguration conf, SubscribeV2Callback callback) { + if (error.result == ResultOk) { // generate random name if not supplied by the customer. if (conf.getConsumerName().empty()) { conf.setConsumerName(generateRandomName()); @@ -537,7 +608,7 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti if (partitionMetadata->getPartitions() > 0) { if (conf.getReceiverQueueSize() == 0) { LOG_ERROR("Can't use partitioned topic if the queue size is 0."); - callback(ResultInvalidConfiguration, Consumer()); + callback(Error{ResultInvalidConfiguration, ""}); return; } consumer = std::make_shared(shared_from_this(), topicName, @@ -552,22 +623,24 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti } } catch (const std::runtime_error& e) { LOG_ERROR("Failed to create consumer: " << e.what()); - callback(ResultConnectError, {}); + callback(Error{ResultConnectError, e.what()}); return; } consumer->getConsumerCreatedFuture().addListener( - std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, callback, consumer)); + [self{shared_from_this()}, callback{std::move(callback)}, consumer]( + Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr) { + self->handleConsumerCreated(result, consumerImplBaseWeakPtr, callback, consumer); + }); consumer->start(); } else { - LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing on " << topicName->toString() - << " -- " << result); - callback(result, Consumer()); + LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing on " + << topicName->toString() << " -- " << error.result); + callback(error); } } void ClientImpl::handleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr, - const SubscribeCallback& callback, + const SubscribeV2Callback& callback, const ConsumerImplBasePtr& consumer) { if (result == ResultOk) { auto address = consumer.get(); @@ -576,18 +649,18 @@ void ClientImpl::handleConsumerCreated(Result result, const ConsumerImplBaseWeak auto consumer = existingConsumer.value().lock(); LOG_ERROR("Unexpected existing consumer at the same address: " << address << ", consumer: " << (consumer ? consumer->getName() : "(null)")); - callback(ResultUnknownError, {}); + callback(Error{ResultUnknownError, ""}); return; } - callback(result, Consumer(consumer)); + callback(Consumer(consumer)); } else { // In order to be compatible with the current broker error code confusion. // https://github.com/apache/pulsar/blob/cd2aa550d0fe4e72b5ff88c4f6c1c2795b3ff2cd/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java#L240-L241 if (result == ResultProducerBusy) { LOG_ERROR("Failed to create consumer: SubscriptionName cannot be empty."); - callback(ResultInvalidConfiguration, {}); + callback(Error{ResultInvalidConfiguration, "SubscriptionName cannot be empty."}); } else { - callback(result, {}); + callback(Error{result, ""}); } } } diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index c046d8e9..cdabf1fc 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -98,6 +98,9 @@ class ClientImpl : public std::enable_shared_from_this { void subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); + void subscribeAsyncV2(const SubscribeTopics& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeV2Callback callback); + void subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); @@ -107,9 +110,15 @@ class ClientImpl : public std::enable_shared_from_this { void createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback); + void createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, ReaderV2Callback callback); + void createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, const TableViewCallback& callback); + void createTableViewAsyncV2(const std::string& topic, const TableViewConfiguration& conf, + TableViewV2Callback callback); + void getPartitionsForTopicAsync(const std::string& topic, const GetPartitionsCallback& callback); // Use virtual method to test @@ -175,13 +184,22 @@ class ClientImpl : public std::enable_shared_from_this { const TopicNamePtr& topicName, const ProducerConfiguration& conf, CreateProducerV2Callback callback); - void handleSubscribe(Result result, const LookupDataResultPtr& partitionMetadata, + void handleSubscribe(const Error& error, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const std::string& consumerName, - ConsumerConfiguration conf, const SubscribeCallback& callback); + ConsumerConfiguration conf, SubscribeV2Callback callback); + + void subscribeToTopicsAsyncV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeV2Callback callback); + + void subscribeToTopicsAsyncV2(const std::vector& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeV2Callback callback); + + void subscribeToTopicsAsyncV2(const TopicRegex& topicRegex, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeV2Callback callback); - void handleReaderMetadataLookup(Result result, const LookupDataResultPtr& partitionMetadata, + void handleReaderMetadataLookup(const Error& error, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const MessageId& startMessageId, - const ReaderConfiguration& conf, const ReaderCallback& callback); + const ReaderConfiguration& conf, const ReaderV2Callback& callback); void handleGetPartitions(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const GetPartitionsCallback& callback); @@ -189,7 +207,7 @@ class ClientImpl : public std::enable_shared_from_this { void handleProducerCreated(const Error& error, const ProducerImplBaseWeakPtr& producerWeakPtr, const CreateProducerV2Callback& callback, const ProducerImplBasePtr& producer); void handleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr& consumerWeakPtr, - const SubscribeCallback& callback, const ConsumerImplBasePtr& consumer); + const SubscribeV2Callback& callback, const ConsumerImplBasePtr& consumer); typedef std::shared_ptr SharedInt; @@ -199,7 +217,7 @@ class ClientImpl : public std::enable_shared_from_this { const std::string& regexPattern, CommandGetTopicsOfNamespace_Mode mode, const std::string& consumerName, const ConsumerConfiguration& conf, - const SubscribeCallback& callback); + SubscribeV2Callback callback); const std::string& getPhysicalAddress(const std::string& redirectedClusterURI, const std::string& logicalAddress); diff --git a/lib/TableViewImpl.cc b/lib/TableViewImpl.cc index f634fa52..686cd555 100644 --- a/lib/TableViewImpl.cc +++ b/lib/TableViewImpl.cc @@ -32,23 +32,23 @@ TableViewImpl::TableViewImpl(const ClientImplPtr& client, const std::string& top const TableViewConfiguration& conf) : client_(client), topic_(topic), conf_(conf) {} -Future TableViewImpl::start() { - Promise promise; +Future TableViewImpl::start() { + Promise promise; ReaderConfiguration readerConfiguration; readerConfiguration.setSchema(conf_.schemaInfo); readerConfiguration.setReadCompacted(true); readerConfiguration.setInternalSubscriptionName(conf_.subscriptionName); TableViewImplPtr self = shared_from_this(); - ReaderCallback readerCallback = [self, promise](Result res, const Reader& reader) { - if (res == ResultOk) { - self->reader_ = reader.impl_; + ReaderV2Callback readerCallback = [self, promise](const std::variant& result) { + if (const auto* reader = std::get_if(&result)) { + self->reader_ = reader->impl_; self->readAllExistingMessages(promise, TimeUtils::currentTimeMillis(), 0); } else { - promise.setFailed(res); + promise.setFailed(std::get(result)); } }; - client_->createReaderAsync(topic_, MessageId::earliest(), readerConfiguration, readerCallback); + client_->createReaderAsyncV2(topic_, MessageId::earliest(), readerConfiguration, readerCallback); return promise.getFuture(); } @@ -118,14 +118,14 @@ void TableViewImpl::handleMessage(const Message& msg) { } } -void TableViewImpl::readAllExistingMessages(const Promise& promise, long startTime, +void TableViewImpl::readAllExistingMessages(const Promise& promise, long startTime, long messagesRead) { auto weakSelf = weak_from_this(); reader_->hasMessageAvailableAsync( [weakSelf, promise, startTime, messagesRead](Result result, bool hasMessage) { auto self = weakSelf.lock(); if (!self || result != ResultOk) { - promise.setFailed(result); + promise.setFailed(Error{result, ""}); return; } if (hasMessage) { @@ -135,7 +135,7 @@ void TableViewImpl::readAllExistingMessages(const Promise { ~TableViewImpl(){}; - Future start(); + Future start(); bool retrieveValue(const std::string& key, std::string& value); @@ -77,7 +77,7 @@ class TableViewImpl : public std::enable_shared_from_this { SynchronizedHashMap data_; void handleMessage(const Message& msg); - void readAllExistingMessages(const Promise& promise, long startTime, + void readAllExistingMessages(const Promise& promise, long startTime, long messagesRead); void readTailMessage(); }; diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc index d38e62c7..e2c4c16c 100644 --- a/tests/AuthTokenTest.cc +++ b/tests/AuthTokenTest.cc @@ -54,6 +54,15 @@ std::string getToken() { return str; } +template +void assertV2Error(const std::variant& result, Result expectedResult, + const std::string& expectedMessage) { + const auto* error = std::get_if(&result); + ASSERT_NE(nullptr, error); + EXPECT_EQ(expectedResult, error->result); + EXPECT_EQ(expectedMessage, error->message); +} + TEST(AuthPluginToken, testToken) { ClientConfiguration config = ClientConfiguration(); std::string token = getToken(); @@ -181,17 +190,17 @@ TEST(AuthPluginToken, testNoAuth) { std::string topicName = "persistent://private/auth/test-token"; std::string subName = "subscription-name"; + const std::string expectedErrorMessage = "Client is not authorized to Get Partition Metadata"; Producer producer; Result result = client.createProducer(topicName, producer); ASSERT_EQ(ResultAuthorizationError, result); - std::visit(overloaded{[](Error&& error) { - ASSERT_EQ(ResultAuthorizationError, error.result); - ASSERT_EQ("Client is not authorized to Get Partition Metadata", error.message); - }, - [](auto&&) { FAIL(); }}, - client.createProducerV2(topicName, {})); + assertV2Error(client.createProducerV2(topicName, {}), ResultAuthorizationError, expectedErrorMessage); + assertV2Error(client.subscribeV2(topicName, subName, {}), ResultAuthorizationError, expectedErrorMessage); + assertV2Error(client.createReaderV2(topicName, MessageId::earliest(), {}), ResultAuthorizationError, + expectedErrorMessage); + assertV2Error(client.createTableViewV2(topicName, {}), ResultAuthorizationError, expectedErrorMessage); Consumer consumer; result = client.subscribe(topicName, subName, consumer);