Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@

#include <memory>
#include <string>
#include <utility>
#include <variant>
#include <vector>

namespace pulsar {
typedef std::function<void(Result, Producer)> CreateProducerCallback;
Expand All @@ -47,6 +49,21 @@ typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitio
typedef std::function<void(Result)> CloseCallback;

using CreateProducerV2Callback = std::function<void(std::variant<Error, Producer>)>;
using CreateConsumerV2Callback = std::function<void(std::variant<Error, Consumer>)>;
using SubscribeV2Callback = CreateConsumerV2Callback;
using ReaderV2Callback = std::function<void(std::variant<Error, Reader>)>;
using TableViewV2Callback = std::function<void(std::variant<Error, TableView>)>;

/**
* Use TopicRegex with subscribeV2/subscribeAsyncV2 to distinguish a regex pattern from a single topic name.
*/
struct TopicRegex {
explicit TopicRegex(std::string pattern) : pattern(std::move(pattern)) {}

std::string pattern;
};

using SubscribeTopics = std::variant<std::string, std::vector<std::string>, TopicRegex>;

class ClientImpl;
class PulsarFriend;
Expand Down Expand Up @@ -188,6 +205,13 @@ class PULSAR_PUBLIC Client {
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallback& callback);

void subscribeAsyncV2(const SubscribeTopics& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeV2Callback callback);

std::variant<Error, Consumer> subscribeV2(const SubscribeTopics& topics,
const std::string& subscriptionName,
const ConsumerConfiguration& conf);

/**
* Subscribe to multiple topics under the same namespace.
*
Expand Down Expand Up @@ -332,6 +356,12 @@ class PULSAR_PUBLIC Client {
void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, const ReaderCallback& callback);

void createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, ReaderV2Callback callback);

std::variant<Error, Reader> createReaderV2(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf);

/**
* Create a table view with given {@code TableViewConfiguration} for specified topic.
*
Expand Down Expand Up @@ -362,6 +392,12 @@ class PULSAR_PUBLIC Client {
void createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf,
const TableViewCallback& callBack);

void createTableViewAsyncV2(const std::string& topic, const TableViewConfiguration& conf,
TableViewV2Callback callback);

std::variant<Error, TableView> createTableViewV2(const std::string& topic,
const TableViewConfiguration& conf);

/**
* Get the list of partitions for a given topic.
*
Expand Down
91 changes: 75 additions & 16 deletions lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,35 @@

#include "ClientImpl.h"
#include "Int64SerDes.h"
#include "LogUtils.h"
#include "LookupService.h"
#include "TopicName.h"
#include "Utils.h"

DECLARE_LOG_OBJECT()

namespace pulsar {

namespace {

template <typename T>
void setPromiseValue(std::promise<std::variant<Error, T>>& promise, const std::variant<Error, T>& value) {
if (const auto* error = std::get_if<Error>(&value)) {
promise.set_value(*error);
} else {
promise.set_value(std::get<T>(value));
}
}

template <typename T>
void invokeLegacyCallback(const std::function<void(Result, T)>& callback,
const std::variant<Error, T>& value) {
if (const auto* error = std::get_if<Error>(&value)) {
callback(error->result, T());
} else {
callback(ResultOk, std::get<T>(value));
}
}

} // namespace

Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) { impl_->initialize(); }

Client::Client(const std::string& serviceUrl) : Client(serviceUrl, ClientConfiguration()) {}
Expand Down Expand Up @@ -83,13 +103,8 @@ void Client::createProducerAsyncV2(const std::string& topic, const ProducerConfi
std::variant<Error, Producer> Client::createProducerV2(const std::string& topic,
const ProducerConfiguration& conf) {
std::promise<std::variant<Error, Producer>> promise;
createProducerAsyncV2(topic, conf, [&promise](const auto& v) mutable {
if (const auto* error = std::get_if<Error>(&v)) {
promise.set_value(*error);
} else {
promise.set_value(std::get<Producer>(v));
}
});
createProducerAsyncV2(topic, conf,
[&promise](const auto& v) mutable { setPromiseValue<Producer>(promise, v); });
return promise.get_future().get();
}

Expand All @@ -113,8 +128,22 @@ void Client::subscribeAsync(const std::string& topic, const std::string& subscri

void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
LOG_INFO("Subscribing on Topic :" << topic);
impl_->subscribeAsync(topic, subscriptionName, conf, callback);
subscribeAsyncV2(topic, subscriptionName, conf,
[callback](const auto& value) { invokeLegacyCallback<Consumer>(callback, value); });
}

void Client::subscribeAsyncV2(const SubscribeTopics& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeV2Callback callback) {
impl_->subscribeAsyncV2(topics, subscriptionName, conf, std::move(callback));
}

std::variant<Error, Consumer> Client::subscribeV2(const SubscribeTopics& topics,
const std::string& subscriptionName,
const ConsumerConfiguration& conf) {
std::promise<std::variant<Error, Consumer>> promise;
subscribeAsyncV2(topics, subscriptionName, conf,
[&promise](const auto& v) mutable { setPromiseValue<Consumer>(promise, v); });
return promise.get_future().get();
}

Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
Expand All @@ -138,7 +167,8 @@ void Client::subscribeAsync(const std::vector<std::string>& topics, const std::s

void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
impl_->subscribeAsync(topics, subscriptionName, conf, callback);
subscribeAsyncV2(topics, subscriptionName, conf,
[callback](const auto& value) { invokeLegacyCallback<Consumer>(callback, value); });
}

Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName,
Expand All @@ -162,7 +192,8 @@ void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std:

void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, callback);
subscribeAsyncV2(TopicRegex{regexPattern}, subscriptionName, conf,
[callback](const auto& value) { invokeLegacyCallback<Consumer>(callback, value); });
}

Result Client::createReader(const std::string& topic, const MessageId& startMessageId,
Expand All @@ -176,7 +207,21 @@ Result Client::createReader(const std::string& topic, const MessageId& startMess

void Client::createReaderAsync(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, const ReaderCallback& callback) {
impl_->createReaderAsync(topic, startMessageId, conf, callback);
createReaderAsyncV2(topic, startMessageId, conf,
[callback](const auto& value) { invokeLegacyCallback<Reader>(callback, value); });
}

void Client::createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, ReaderV2Callback callback) {
impl_->createReaderAsyncV2(topic, startMessageId, conf, std::move(callback));
}

std::variant<Error, Reader> Client::createReaderV2(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf) {
std::promise<std::variant<Error, Reader>> promise;
createReaderAsyncV2(topic, startMessageId, conf,
[&promise](const auto& v) mutable { setPromiseValue<Reader>(promise, v); });
return promise.get_future().get();
}

Result Client::createTableView(const std::string& topic, const TableViewConfiguration& conf,
Expand All @@ -190,7 +235,21 @@ Result Client::createTableView(const std::string& topic, const TableViewConfigur

void Client::createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf,
const TableViewCallback& callback) {
impl_->createTableViewAsync(topic, conf, callback);
createTableViewAsyncV2(
topic, conf, [callback](const auto& value) { invokeLegacyCallback<TableView>(callback, value); });
}

void Client::createTableViewAsyncV2(const std::string& topic, const TableViewConfiguration& conf,
TableViewV2Callback callback) {
impl_->createTableViewAsyncV2(topic, conf, std::move(callback));
}

std::variant<Error, TableView> Client::createTableViewV2(const std::string& topic,
const TableViewConfiguration& conf) {
std::promise<std::variant<Error, TableView>> promise;
createTableViewAsyncV2(topic, conf,
[&promise](const auto& v) mutable { setPromiseValue<TableView>(promise, v); });
return promise.get_future().get();
}

Result Client::getPartitionsForTopic(const std::string& topic, std::vector<std::string>& partitions) {
Expand Down
Loading
Loading