& keyNames, const CryptoKeyReaderPtr& keyReader);
/*
* Remove a key Remove the key identified by the keyName from the list of keys.
@@ -84,7 +84,7 @@ class MessageCrypto {
*
* @return true if success
*/
- bool encrypt(const std::set& encKeys, const CryptoKeyReaderPtr keyReader,
+ bool encrypt(const std::set& encKeys, const CryptoKeyReaderPtr& keyReader,
proto::MessageMetadata& msgMetadata, SharedBuffer& payload, SharedBuffer& encryptedPayload);
/*
@@ -98,7 +98,7 @@ class MessageCrypto {
* @return true if success
*/
bool decrypt(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
- const CryptoKeyReaderPtr keyReader, SharedBuffer& decryptedPayload);
+ const CryptoKeyReaderPtr& keyReader, SharedBuffer& decryptedPayload);
private:
typedef std::unique_lock Lock;
@@ -131,7 +131,7 @@ class MessageCrypto {
unsigned char keyDigest[], unsigned int& digestLen);
void removeExpiredDataKey();
- Result addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader);
+ Result addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr& keyReader);
bool decryptDataKey(const proto::EncryptionKeys& encKeys, const CryptoKeyReader& keyReader);
bool decryptData(const std::string& dataKeySecret, const proto::MessageMetadata& msgMetadata,
diff --git a/lib/MessageImpl.cc b/lib/MessageImpl.cc
index 4650b99e..3974c4c2 100644
--- a/lib/MessageImpl.cc
+++ b/lib/MessageImpl.cc
@@ -122,7 +122,7 @@ void MessageImpl::convertPayloadToKeyValue(const pulsar::SchemaInfo& schemaInfo)
getKeyValueEncodingType(schemaInfo));
}
-KeyValueEncodingType MessageImpl::getKeyValueEncodingType(SchemaInfo schemaInfo) {
+KeyValueEncodingType MessageImpl::getKeyValueEncodingType(const SchemaInfo& schemaInfo) {
if (schemaInfo.getSchemaType() != KEY_VALUE) {
throw std::invalid_argument("Schema not key value type.");
}
diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h
index 55b9612e..6467b359 100644
--- a/lib/MessageImpl.h
+++ b/lib/MessageImpl.h
@@ -76,7 +76,7 @@ class MessageImpl {
void setSchemaVersion(const std::string& value);
void convertKeyValueToPayload(const SchemaInfo& schemaInfo);
void convertPayloadToKeyValue(const SchemaInfo& schemaInfo);
- KeyValueEncodingType getKeyValueEncodingType(SchemaInfo schemaInfo);
+ KeyValueEncodingType getKeyValueEncodingType(const SchemaInfo& schemaInfo);
friend class PulsarWrapper;
friend class MessageBuilder;
diff --git a/lib/MultiTopicsBrokerConsumerStatsImpl.cc b/lib/MultiTopicsBrokerConsumerStatsImpl.cc
index 4f969222..159043bc 100644
--- a/lib/MultiTopicsBrokerConsumerStatsImpl.cc
+++ b/lib/MultiTopicsBrokerConsumerStatsImpl.cc
@@ -152,7 +152,7 @@ BrokerConsumerStats MultiTopicsBrokerConsumerStatsImpl::getBrokerConsumerStats(i
return statsList_[index];
}
-void MultiTopicsBrokerConsumerStatsImpl::add(BrokerConsumerStats stats, int index) {
+void MultiTopicsBrokerConsumerStatsImpl::add(const BrokerConsumerStats& stats, int index) {
statsList_[index] = stats;
}
diff --git a/lib/MultiTopicsBrokerConsumerStatsImpl.h b/lib/MultiTopicsBrokerConsumerStatsImpl.h
index 481318e5..942d3a0d 100644
--- a/lib/MultiTopicsBrokerConsumerStatsImpl.h
+++ b/lib/MultiTopicsBrokerConsumerStatsImpl.h
@@ -76,7 +76,7 @@ class PULSAR_PUBLIC MultiTopicsBrokerConsumerStatsImpl : public BrokerConsumerSt
/** Returns the BrokerConsumerStatsImpl at of ith partition */
BrokerConsumerStats getBrokerConsumerStats(int index);
- void add(BrokerConsumerStats stats, int index);
+ void add(const BrokerConsumerStats &stats, int index);
void clear();
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 8a431738..050b0b10 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -40,25 +40,23 @@ using namespace pulsar;
using std::chrono::milliseconds;
using std::chrono::seconds;
-MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName,
+MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName,
int numPartitions, const std::string& subscriptionName,
const ConsumerConfiguration& conf,
- LookupServicePtr lookupServicePtr,
+ const LookupServicePtr& lookupServicePtr,
const ConsumerInterceptorsPtr& interceptors,
- const Commands::SubscriptionMode subscriptionMode,
- boost::optional startMessageId)
+ Commands::SubscriptionMode subscriptionMode,
+ const boost::optional& startMessageId)
: MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
lookupServicePtr, interceptors, subscriptionMode, startMessageId) {
topicsPartitions_[topicName->toString()] = numPartitions;
}
-MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector& topics,
- const std::string& subscriptionName, TopicNamePtr topicName,
- const ConsumerConfiguration& conf,
- LookupServicePtr lookupServicePtr,
- const ConsumerInterceptorsPtr& interceptors,
- const Commands::SubscriptionMode subscriptionMode,
- boost::optional startMessageId)
+MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
+ const ClientImplPtr& client, const std::vector& topics, const std::string& subscriptionName,
+ const TopicNamePtr& topicName, const ConsumerConfiguration& conf,
+ const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors,
+ Commands::SubscriptionMode subscriptionMode, const boost::optional& startMessageId)
: ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics",
Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
client->getListenerExecutorProvider()->get()),
@@ -120,7 +118,7 @@ void MultiTopicsConsumerImpl::start() {
// subscribe for each passed in topic
auto weakSelf = weak_from_this();
for (std::vector::const_iterator itr = topics_.begin(); itr != topics_.end(); itr++) {
- auto topic = *itr;
+ const auto& topic = *itr;
subscribeOneTopicAsync(topic).addListener(
[this, weakSelf, topic, topicsNeedCreate](Result result, const Consumer& consumer) {
auto self = weakSelf.lock();
@@ -131,9 +129,9 @@ void MultiTopicsConsumerImpl::start() {
}
}
-void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer,
- const std::string& topic,
- std::shared_ptr> topicsNeedCreate) {
+void MultiTopicsConsumerImpl::handleOneTopicSubscribed(
+ Result result, const Consumer& consumer, const std::string& topic,
+ const std::shared_ptr>& topicsNeedCreate) {
if (result != ResultOk) {
state_ = Failed;
// Use the first failed result
@@ -205,9 +203,9 @@ Future MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
return topicPromise->getFuture();
}
-void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName,
- const std::string& consumerName,
- ConsumerSubResultPromisePtr topicSubResultPromise) {
+void MultiTopicsConsumerImpl::subscribeTopicPartitions(
+ int numPartitions, const TopicNamePtr& topicName, const std::string& consumerName,
+ const ConsumerSubResultPromisePtr& topicSubResultPromise) {
std::shared_ptr consumer;
ConsumerConfiguration config = conf_.clone();
// Pause messageListener until all child topics are subscribed.
@@ -223,7 +221,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
auto weakSelf = weak_from_this();
- config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
+ config.setMessageListener([this, weakSelf](const Consumer& consumer, const Message& msg) {
auto self = weakSelf.lock();
if (self) {
messageReceived(consumer, msg);
@@ -295,9 +293,9 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
}
void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
- Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
- std::shared_ptr> partitionsNeedCreate,
- ConsumerSubResultPromisePtr topicSubResultPromise) {
+ Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr,
+ const std::shared_ptr>& partitionsNeedCreate,
+ const ConsumerSubResultPromisePtr& topicSubResultPromise) {
if (state_ == Failed) {
// one of the consumer creation failed, and we are cleaning up
topicSubResultPromise->setFailed(ResultAlreadyClosed);
@@ -325,12 +323,12 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
}
}
-void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
+void MultiTopicsConsumerImpl::unsubscribeAsync(const ResultCallback& originalCallback) {
LOG_INFO("[ Topics Consumer " << topic() << "," << subscriptionName_ << "] Unsubscribing");
auto callback = [this, originalCallback](Result result) {
if (result == ResultOk) {
- shutdown();
+ internalShutdown();
LOG_INFO(getName() << "Unsubscribed successfully");
} else {
state_ = Ready;
@@ -350,7 +348,7 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback)
auto self = get_shared_this_ptr();
consumers_.forEachValue(
- [this, self, callback](const ConsumerImplPtr& consumer, SharedFuture future) {
+ [this, self, callback](const ConsumerImplPtr& consumer, const SharedFuture& future) {
consumer->unsubscribeAsync([this, self, callback, future](Result result) {
if (result != ResultOk) {
state_ = Failed;
@@ -367,7 +365,8 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback)
[callback] { callback(ResultOk); });
}
-void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) {
+void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
+ const ResultCallback& callback) {
Lock lock(mutex_);
std::map::iterator it = topicsPartitions_.find(topic);
if (it == topicsPartitions_.end()) {
@@ -412,8 +411,8 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
}
void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
- Result result, std::shared_ptr> consumerUnsubed, int numberPartitions,
- TopicNamePtr topicNamePtr, std::string& topicPartitionName, ResultCallback callback) {
+ Result result, const std::shared_ptr>& consumerUnsubed, int numberPartitions,
+ const TopicNamePtr& topicNamePtr, const std::string& topicPartitionName, const ResultCallback& callback) {
(*consumerUnsubed)++;
if (result != ResultOk) {
@@ -448,12 +447,12 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
}
}
-void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
+void MultiTopicsConsumerImpl::closeAsync(const ResultCallback& originalCallback) {
std::weak_ptr weakSelf{get_shared_this_ptr()};
auto callback = [weakSelf, originalCallback](Result result) {
auto self = weakSelf.lock();
if (self) {
- self->shutdown();
+ self->internalShutdown();
if (result != ResultOk) {
LOG_WARN(self->getName() << "Failed to close consumer: " << result);
if (result != ResultAlreadyClosed) {
@@ -511,7 +510,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
batchReceiveTimer_->cancel();
}
-void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
+void MultiTopicsConsumerImpl::messageReceived(const Consumer& consumer, const Message& msg) {
if (PULSAR_UNLIKELY(duringSeek_.load(std::memory_order_acquire))) {
return;
}
@@ -555,7 +554,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
}
}
-void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
+void MultiTopicsConsumerImpl::internalListener(const Consumer& consumer) {
Message m;
incomingMessages_.pop(m);
try {
@@ -603,7 +602,7 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
}
}
-void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback callback) {
+void MultiTopicsConsumerImpl::receiveAsync(const ReceiveCallback& callback) {
Message msg;
// fail the callback if consumer is closing or closed
@@ -650,7 +649,7 @@ void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, const
callback(result, msg);
}
-void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, const ResultCallback& callback) {
if (state_ != Ready) {
interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultAlreadyClosed, msgId);
callback(ResultAlreadyClosed);
@@ -674,7 +673,8 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
}
}
-void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList,
+ const ResultCallback& callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
return;
@@ -682,7 +682,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
std::unordered_map topicToMessageId;
for (const MessageId& messageId : messageIdList) {
- auto topicName = messageId.getTopicName();
+ const auto& topicName = messageId.getTopicName();
if (topicName.empty()) {
LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
callback(ResultOperationNotSupported);
@@ -716,7 +716,8 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
}
}
-void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
+void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
+ const ResultCallback& callback) {
msgId.getTopicName();
auto optConsumer = consumers_.find(msgId.getTopicName());
if (optConsumer) {
@@ -734,7 +735,7 @@ void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
}
}
-MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() { shutdown(); }
+MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() { internalShutdown(); }
Future MultiTopicsConsumerImpl::getConsumerCreatedFuture() {
return multiTopicsConsumerCreatedPromise_.getFuture();
@@ -745,7 +746,9 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic(); }
const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
-void MultiTopicsConsumerImpl::shutdown() {
+void MultiTopicsConsumerImpl::shutdown() { internalShutdown(); }
+
+void MultiTopicsConsumerImpl::internalShutdown() {
cancelTimers();
incomingMessages_.clear();
topicsPartitions_.clear();
@@ -812,7 +815,7 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set> topicToMessageId;
for (const MessageId& messageId : messageIds) {
- auto topicName = messageId.getTopicName();
+ const auto& topicName = messageId.getTopicName();
topicToMessageId[topicName].emplace(messageId);
}
@@ -828,7 +831,7 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::setgetBrokerConsumerStatsAsync(
- [this, weakSelf, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) {
- auto self = weakSelf.lock();
- if (self) {
- handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
- }
- });
+ consumer->getBrokerConsumerStatsAsync([this, weakSelf, latchPtr, statsPtr, index, callback](
+ Result result, const BrokerConsumerStats& stats) {
+ auto self = weakSelf.lock();
+ if (self) {
+ handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
+ }
+ });
});
}
-void MultiTopicsConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) {
+void MultiTopicsConsumerImpl::getLastMessageIdAsync(const BrokerGetLastMessageIdCallback& callback) {
callback(ResultOperationNotSupported, GetLastMessageIdResponse());
}
-void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats,
- LatchPtr latchPtr,
- MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index,
- BrokerConsumerStatsCallback callback) {
+void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res,
+ const BrokerConsumerStats& brokerConsumerStats,
+ const LatchPtr& latchPtr,
+ const MultiTopicsBrokerConsumerStatsPtr& statsPtr,
+ size_t index,
+ const BrokerConsumerStatsCallback& callback) {
Lock lock(mutex_);
if (res == ResultOk) {
latchPtr->countdown();
@@ -907,7 +912,7 @@ void MultiTopicsConsumerImpl::afterSeek() {
});
}
-void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
+void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, const ResultCallback& callback) {
if (msgId == MessageId::earliest() || msgId == MessageId::latest()) {
return seekAllAsync(msgId, callback);
}
@@ -933,7 +938,7 @@ void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c
});
}
-void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
+void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) {
seekAllAsync(timestamp, callback);
}
@@ -992,7 +997,7 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
});
}
}
-void MultiTopicsConsumerImpl::handleGetPartitions(TopicNamePtr topicName, Result result,
+void MultiTopicsConsumerImpl::handleGetPartitions(const TopicNamePtr& topicName, Result result,
const LookupDataResultPtr& lookupDataResult,
int currentNumPartitions) {
if (state_ != Ready) {
@@ -1024,9 +1029,9 @@ void MultiTopicsConsumerImpl::handleGetPartitions(TopicNamePtr topicName, Result
}
void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
- int numPartitions, TopicNamePtr topicName, int partitionIndex,
- ConsumerSubResultPromisePtr topicSubResultPromise,
- std::shared_ptr> partitionsNeedCreate) {
+ int numPartitions, const TopicNamePtr& topicName, int partitionIndex,
+ const ConsumerSubResultPromisePtr& topicSubResultPromise,
+ const std::shared_ptr>& partitionsNeedCreate) {
ConsumerConfiguration config = conf_.clone();
auto client = client_.lock();
if (!client) {
@@ -1035,7 +1040,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
}
ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
auto weakSelf = weak_from_this();
- config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
+ config.setMessageListener([this, weakSelf](const Consumer& consumer, const Message& msg) {
auto self = weakSelf.lock();
if (self) {
messageReceived(consumer, msg);
@@ -1120,7 +1125,7 @@ void MultiTopicsConsumerImpl::cancelTimers() noexcept {
}
}
-void MultiTopicsConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
+void MultiTopicsConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& callback) {
if (incomingMessagesSize_ > 0) {
callback(ResultOk, true);
return;
@@ -1130,24 +1135,25 @@ void MultiTopicsConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallba
auto needCallBack = std::make_shared>(consumers_.size());
auto self = get_shared_this_ptr();
- consumers_.forEachValue([self, needCallBack, callback, hasMessageAvailable](ConsumerImplPtr consumer) {
- consumer->hasMessageAvailableAsync(
- [self, needCallBack, callback, hasMessageAvailable](Result result, bool hasMsg) {
- if (result != ResultOk) {
- LOG_ERROR("Filed when acknowledge list: " << result);
- // set needCallBack is -1 to avoid repeated callback.
- needCallBack->store(-1);
- callback(result, false);
- return;
- }
-
- if (hasMsg) {
- hasMessageAvailable->store(hasMsg);
- }
-
- if (--(*needCallBack) == 0) {
- callback(result, hasMessageAvailable->load() || self->incomingMessagesSize_ > 0);
- }
- });
- });
+ consumers_.forEachValue(
+ [self, needCallBack, callback, hasMessageAvailable](const ConsumerImplPtr& consumer) {
+ consumer->hasMessageAvailableAsync(
+ [self, needCallBack, callback, hasMessageAvailable](Result result, bool hasMsg) {
+ if (result != ResultOk) {
+ LOG_ERROR("Filed when acknowledge list: " << result);
+ // set needCallBack is -1 to avoid repeated callback.
+ needCallBack->store(-1);
+ callback(result, false);
+ return;
+ }
+
+ if (hasMsg) {
+ hasMessageAvailable->store(hasMsg);
+ }
+
+ if (--(*needCallBack) == 0) {
+ callback(result, hasMessageAvailable->load() || self->incomingMessagesSize_ > 0);
+ }
+ });
+ });
}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 6763942f..e92ec0ec 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -53,18 +53,19 @@ using LookupServicePtr = std::shared_ptr;
class MultiTopicsConsumerImpl;
class MultiTopicsConsumerImpl : public ConsumerImplBase {
public:
- MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions,
+ MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName, int numPartitions,
const std::string& subscriptionName, const ConsumerConfiguration& conf,
- LookupServicePtr lookupServicePtr, const ConsumerInterceptorsPtr& interceptors,
+ const LookupServicePtr& lookupServicePtr,
+ const ConsumerInterceptorsPtr& interceptors,
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
- boost::optional startMessageId = boost::none);
+ const boost::optional& startMessageId = boost::none);
- MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector& topics,
- const std::string& subscriptionName, TopicNamePtr topicName,
- const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_,
+ MultiTopicsConsumerImpl(const ClientImplPtr& client, const std::vector& topics,
+ const std::string& subscriptionName, const TopicNamePtr& topicName,
+ const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_,
const ConsumerInterceptorsPtr& interceptors,
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
- boost::optional startMessageId = boost::none);
+ const boost::optional& startMessageId = boost::none);
~MultiTopicsConsumerImpl();
// overrided methods from ConsumerImplBase
@@ -73,14 +74,15 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
const std::string& getTopic() const override;
Result receive(Message& msg) override;
Result receive(Message& msg, int timeout) override;
- void receiveAsync(ReceiveCallback callback) override;
- void unsubscribeAsync(ResultCallback callback) override;
- void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override;
- void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) override;
- void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) override;
- void closeAsync(ResultCallback callback) override;
+ void receiveAsync(const ReceiveCallback& callback) override;
+ void unsubscribeAsync(const ResultCallback& callback) override;
+ void acknowledgeAsync(const MessageId& msgId, const ResultCallback& callback) override;
+ void acknowledgeAsync(const MessageIdList& messageIdList, const ResultCallback& callback) override;
+ void acknowledgeCumulativeAsync(const MessageId& msgId, const ResultCallback& callback) override;
+ void closeAsync(const ResultCallback& callback) override;
void start() override;
void shutdown() override;
+ void internalShutdown();
bool isClosed() override;
bool isOpen() override;
Result pauseMessageListener() override;
@@ -89,20 +91,21 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
void redeliverUnacknowledgedMessages(const std::set& messageIds) override;
const std::string& getName() const override;
int getNumOfPrefetchedMessages() const override;
- void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) override;
- void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) override;
- void seekAsync(const MessageId& msgId, ResultCallback callback) override;
- void seekAsync(uint64_t timestamp, ResultCallback callback) override;
+ void getBrokerConsumerStatsAsync(const BrokerConsumerStatsCallback& callback) override;
+ void getLastMessageIdAsync(const BrokerGetLastMessageIdCallback& callback) override;
+ void seekAsync(const MessageId& msgId, const ResultCallback& callback) override;
+ void seekAsync(uint64_t timestamp, const ResultCallback& callback) override;
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;
- void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;
+ void hasMessageAvailableAsync(const HasMessageAvailableCallback& callback) override;
- void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
- size_t, BrokerConsumerStatsCallback);
+ void handleGetConsumerStats(Result, const BrokerConsumerStats&, const LatchPtr&,
+ const MultiTopicsBrokerConsumerStatsPtr&, size_t,
+ const BrokerConsumerStatsCallback&);
// return first topic name when all topics name valid, or return null pointer
static std::shared_ptr topicNamesValid(const std::vector& topics);
- void unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback);
+ void unsubscribeOneTopicAsync(const std::string& topic, const ResultCallback& callback);
Future subscribeOneTopicAsync(const std::string& topic);
protected:
@@ -135,32 +138,35 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
/* methods */
void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
unsigned int partitionIndex);
- void notifyResult(CloseCallback closeCallback);
- void messageReceived(Consumer consumer, const Message& msg);
+ void notifyResult(const CloseCallback& closeCallback);
+ void messageReceived(const Consumer& consumer, const Message& msg);
void messageProcessed(Message& msg);
- void internalListener(Consumer consumer);
+ void internalListener(const Consumer& consumer);
void receiveMessages();
void failPendingReceiveCallback();
void notifyPendingReceivedCallback(Result result, const Message& message,
const ReceiveCallback& callback);
- void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
- std::shared_ptr> topicsNeedCreate);
- void subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName, const std::string& consumerName,
- ConsumerSubResultPromisePtr topicSubResultPromise);
- void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
- std::shared_ptr> partitionsNeedCreate,
- ConsumerSubResultPromisePtr topicSubResultPromise);
- void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr> consumerUnsubed,
- int numberPartitions, TopicNamePtr topicNamePtr,
- std::string& topicPartitionName, ResultCallback callback);
+ void handleOneTopicSubscribed(Result result, const Consumer& consumer, const std::string& topic,
+ const std::shared_ptr>& topicsNeedCreate);
+ void subscribeTopicPartitions(int numPartitions, const TopicNamePtr& topicName,
+ const std::string& consumerName,
+ const ConsumerSubResultPromisePtr& topicSubResultPromise);
+ void handleSingleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr,
+ const std::shared_ptr>& partitionsNeedCreate,
+ const ConsumerSubResultPromisePtr& topicSubResultPromise);
+ void handleOneTopicUnsubscribedAsync(Result result,
+ const std::shared_ptr>& consumerUnsubed,
+ int numberPartitions, const TopicNamePtr& topicNamePtr,
+ const std::string& topicPartitionName,
+ const ResultCallback& callback);
void runPartitionUpdateTask();
void topicPartitionUpdate();
- void handleGetPartitions(TopicNamePtr topicName, Result result,
+ void handleGetPartitions(const TopicNamePtr& topicName, Result result,
const LookupDataResultPtr& lookupDataResult, int currentNumPartitions);
- void subscribeSingleNewConsumer(int numPartitions, TopicNamePtr topicName, int partitionIndex,
- ConsumerSubResultPromisePtr topicSubResultPromise,
- std::shared_ptr> partitionsNeedCreate);
+ void subscribeSingleNewConsumer(int numPartitions, const TopicNamePtr& topicName, int partitionIndex,
+ const ConsumerSubResultPromisePtr& topicSubResultPromise,
+ const std::shared_ptr>& partitionsNeedCreate);
// impl consumer base virtual method
bool hasEnoughMessagesForBatchReceive() const override;
void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
@@ -181,7 +187,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
requires std::convertible_to ||
std::same_as>, MessageId>
#endif
- void seekAllAsync(const SeekArg& seekArg, ResultCallback callback);
+ void seekAllAsync(const SeekArg& seekArg, const ResultCallback& callback);
void beforeSeek();
void afterSeek();
@@ -200,7 +206,8 @@ template
requires std::convertible_to ||
std::same_as>, MessageId>
#endif
- inline void MultiTopicsConsumerImpl::seekAllAsync(const SeekArg& seekArg, ResultCallback callback) {
+ inline void MultiTopicsConsumerImpl::seekAllAsync(const SeekArg& seekArg,
+ const ResultCallback& callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
return;
@@ -209,7 +216,8 @@ template
auto weakSelf = weak_from_this();
auto failed = std::make_shared(false);
consumers_.forEachValue(
- [this, weakSelf, &seekArg, callback, failed](const ConsumerImplPtr& consumer, SharedFuture future) {
+ [this, weakSelf, &seekArg, callback, failed](const ConsumerImplPtr& consumer,
+ const SharedFuture& future) {
consumer->seekAsync(seekArg, [this, weakSelf, callback, failed, future](Result result) {
auto self = weakSelf.lock();
if (!self || failed->load(std::memory_order_acquire)) {
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index e443496d..7116b1c5 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -31,7 +31,7 @@ DECLARE_LOG_OBJECT()
namespace pulsar {
-NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &consumer,
+NegativeAcksTracker::NegativeAcksTracker(const ClientImplPtr &client, ConsumerImpl &consumer,
const ConsumerConfiguration &conf)
: consumer_(consumer),
timerInterval_(0),
diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h
index 472e9763..bf1d9318 100644
--- a/lib/NegativeAcksTracker.h
+++ b/lib/NegativeAcksTracker.h
@@ -42,7 +42,8 @@ using ExecutorServicePtr = std::shared_ptr;
class NegativeAcksTracker : public std::enable_shared_from_this {
public:
- NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &consumer, const ConsumerConfiguration &conf);
+ NegativeAcksTracker(const ClientImplPtr &client, ConsumerImpl &consumer,
+ const ConsumerConfiguration &conf);
NegativeAcksTracker(const NegativeAcksTracker &) = delete;
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 4178096c..9b2f5c62 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -36,8 +36,8 @@ namespace pulsar {
const std::string PartitionedProducerImpl::PARTITION_NAME_SUFFIX = "-partition-";
-PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const TopicNamePtr topicName,
- const unsigned int numPartitions,
+PartitionedProducerImpl::PartitionedProducerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName,
+ unsigned int numPartitions,
const ProducerConfiguration& config,
const ProducerInterceptorsPtr& interceptors)
: client_(client),
@@ -79,7 +79,7 @@ MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
}
}
-PartitionedProducerImpl::~PartitionedProducerImpl() { shutdown(); }
+PartitionedProducerImpl::~PartitionedProducerImpl() { internalShutdown(); }
// override
const std::string& PartitionedProducerImpl::getTopic() const { return topic_; }
@@ -144,9 +144,8 @@ void PartitionedProducerImpl::start() {
}
}
-void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result,
- ProducerImplBaseWeakPtr producerWeakPtr,
- unsigned int partitionIndex) {
+void PartitionedProducerImpl::handleSinglePartitionProducerCreated(
+ Result result, const ProducerImplBaseWeakPtr& producerWeakPtr, unsigned int partitionIndex) {
// to indicate, we are doing cleanup using closeAsync after producer create
// has failed and the invocation of closeAsync is not from client
const auto numPartitions = getNumPartitionsWithLock();
@@ -234,9 +233,9 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
} else {
// Wrapping the callback into a lambda has overhead, so we check if the producer is ready first
producer->getProducerCreatedFuture().addListener(
- [msg, callback](Result result, ProducerImplBaseWeakPtr weakProducer) {
+ [msg, callback](Result result, const ProducerImplBaseWeakPtr& weakProducer) {
if (result == ResultOk) {
- weakProducer.lock()->sendAsync(msg, std::move(callback));
+ weakProducer.lock()->sendAsync(msg, callback);
} else if (callback) {
callback(result, {});
}
@@ -245,7 +244,9 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
}
// override
-void PartitionedProducerImpl::shutdown() {
+void PartitionedProducerImpl::shutdown() { internalShutdown(); }
+
+void PartitionedProducerImpl::internalShutdown() {
cancelTimers();
interceptors_->close();
auto client = client_.lock();
@@ -285,7 +286,7 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
void PartitionedProducerImpl::closeAsync(CloseCallback originalCallback) {
auto closeCallback = [this, originalCallback](Result result) {
if (result == ResultOk) {
- shutdown();
+ internalShutdown();
}
if (originalCallback) {
originalCallback(result);
@@ -331,9 +332,8 @@ void PartitionedProducerImpl::closeAsync(CloseCallback originalCallback) {
}
// `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()`
-void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
- const unsigned int partitionIndex,
- CloseCallback callback) {
+void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result, unsigned int partitionIndex,
+ const CloseCallback& callback) {
if (state_ == Failed) {
// we should have already notified the client by callback
return;
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 610c74ed..40f2d34d 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -20,6 +20,7 @@
#include
#include
+#include
#include
#include
#include
@@ -47,7 +48,7 @@ using TopicNamePtr = std::shared_ptr;
class PartitionedProducerImpl : public ProducerImplBase,
public std::enable_shared_from_this {
public:
- enum State
+ enum State : uint8_t
{
Pending,
Ready,
@@ -59,8 +60,9 @@ class PartitionedProducerImpl : public ProducerImplBase,
typedef std::unique_lock Lock;
- PartitionedProducerImpl(ClientImplPtr ptr, const TopicNamePtr topicName, const unsigned int numPartitions,
- const ProducerConfiguration& config, const ProducerInterceptorsPtr& interceptors);
+ PartitionedProducerImpl(const ClientImplPtr& ptr, const TopicNamePtr& topicName,
+ unsigned int numPartitions, const ProducerConfiguration& config,
+ const ProducerInterceptorsPtr& interceptors);
virtual ~PartitionedProducerImpl();
// overrided methods from ProducerImplBase
@@ -75,6 +77,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
void closeAsync(CloseCallback callback) override;
void start() override;
void shutdown() override;
+ void internalShutdown();
bool isClosed() override;
const std::string& getTopic() const override;
Future getProducerCreatedFuture() override;
@@ -82,11 +85,12 @@ class PartitionedProducerImpl : public ProducerImplBase,
void flushAsync(FlushCallback callback) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedProducer() override;
- void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
+ void handleSinglePartitionProducerCreated(Result result,
+ const ProducerImplBaseWeakPtr& producerBaseWeakPtr,
const unsigned int partitionIndex);
void createLazyPartitionProducer(const unsigned int partitionIndex);
- void handleSinglePartitionProducerClose(Result result, const unsigned int partitionIndex,
- CloseCallback callback);
+ void handleSinglePartitionProducerClose(Result result, unsigned int partitionIndex,
+ const CloseCallback& callback);
void notifyResult(CloseCallback closeCallback);
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc
index 73aa874a..9f3fbb9c 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -30,10 +30,10 @@ using namespace pulsar;
using std::chrono::seconds;
PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(
- ClientImplPtr client, const std::string pattern, CommandGetTopicsOfNamespace_Mode getTopicsMode,
+ const ClientImplPtr& client, const std::string& pattern, CommandGetTopicsOfNamespace_Mode getTopicsMode,
const std::vector& topics, const std::string& subscriptionName,
- const ConsumerConfiguration& conf, const LookupServicePtr lookupServicePtr_,
- const ConsumerInterceptorsPtr interceptors)
+ const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_,
+ const ConsumerInterceptorsPtr& interceptors)
: MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf,
lookupServicePtr_, interceptors),
patternString_(pattern),
@@ -89,8 +89,8 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR& er
std::placeholders::_1, std::placeholders::_2));
}
-void PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace(const Result result,
- const NamespaceTopicsPtr topics) {
+void PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace(Result result,
+ const NamespaceTopicsPtr& topics) {
if (result != ResultOk) {
LOG_ERROR("Error in Getting topicsOfNameSpace. result: " << result);
resetAutoDiscoveryTimer();
@@ -132,7 +132,8 @@ void PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace(const Result resu
onTopicsAdded(topicsAdded, topicsAddedCallback);
}
-void PatternMultiTopicsConsumerImpl::onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback) {
+void PatternMultiTopicsConsumerImpl::onTopicsAdded(const NamespaceTopicsPtr& addedTopics,
+ const ResultCallback& callback) {
// start call subscribeOneTopicAsync for each single topic
if (addedTopics->empty()) {
@@ -152,9 +153,9 @@ void PatternMultiTopicsConsumerImpl::onTopicsAdded(NamespaceTopicsPtr addedTopic
}
}
-void PatternMultiTopicsConsumerImpl::handleOneTopicAdded(const Result result, const std::string& topic,
- std::shared_ptr> topicsNeedCreate,
- ResultCallback callback) {
+void PatternMultiTopicsConsumerImpl::handleOneTopicAdded(
+ Result result, const std::string& topic, const std::shared_ptr>& topicsNeedCreate,
+ const ResultCallback& callback) {
(*topicsNeedCreate)--;
if (result != ResultOk) {
@@ -169,8 +170,8 @@ void PatternMultiTopicsConsumerImpl::handleOneTopicAdded(const Result result, co
}
}
-void PatternMultiTopicsConsumerImpl::onTopicsRemoved(NamespaceTopicsPtr removedTopics,
- ResultCallback callback) {
+void PatternMultiTopicsConsumerImpl::onTopicsRemoved(const NamespaceTopicsPtr& removedTopics,
+ const ResultCallback& callback) {
// start call subscribeOneTopicAsync for each single topic
if (removedTopics->empty()) {
LOG_DEBUG("no topics need unsubscribe");
@@ -208,7 +209,7 @@ NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsPatternFilter(
for (const auto& topicStr : topics) {
auto topic = TopicName::removeDomain(topicStr);
if (PULSAR_REGEX_NAMESPACE::regex_match(topic, pattern)) {
- topicsResultPtr->push_back(std::move(topicStr));
+ topicsResultPtr->push_back(topicStr);
}
}
return topicsResultPtr;
@@ -241,12 +242,12 @@ void PatternMultiTopicsConsumerImpl::start() {
}
}
-void PatternMultiTopicsConsumerImpl::shutdown() {
+PatternMultiTopicsConsumerImpl::~PatternMultiTopicsConsumerImpl() {
cancelTimers();
- MultiTopicsConsumerImpl::shutdown();
+ internalShutdown();
}
-void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+void PatternMultiTopicsConsumerImpl::closeAsync(const ResultCallback& callback) {
cancelTimers();
MultiTopicsConsumerImpl::closeAsync(callback);
}
diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h
index f272df22..63527965 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -48,12 +48,13 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
// which only contains after namespace part.
// when subscribe, client will first get all topics that match given pattern.
// `topics` contains the topics that match `patternString`.
- PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string patternString,
+ PatternMultiTopicsConsumerImpl(const ClientImplPtr& client, const std::string& patternString,
CommandGetTopicsOfNamespace_Mode getTopicsMode,
const std::vector& topics,
const std::string& subscriptionName, const ConsumerConfiguration& conf,
- const LookupServicePtr lookupServicePtr_,
- const ConsumerInterceptorsPtr interceptors);
+ const LookupServicePtr& lookupServicePtr_,
+ const ConsumerInterceptorsPtr& interceptors);
+ ~PatternMultiTopicsConsumerImpl() override;
const PULSAR_REGEX_NAMESPACE::regex getPattern();
@@ -67,9 +68,8 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
static NamespaceTopicsPtr topicsListsMinus(std::vector& list1,
std::vector& list2);
- virtual void closeAsync(ResultCallback callback);
- virtual void start();
- virtual void shutdown();
+ void closeAsync(const ResultCallback& callback) override;
+ void start() override;
private:
const std::string patternString_;
@@ -82,11 +82,12 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
void cancelTimers() noexcept;
void resetAutoDiscoveryTimer();
- void timerGetTopicsOfNamespace(const Result result, const NamespaceTopicsPtr topics);
- void onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback);
- void onTopicsRemoved(NamespaceTopicsPtr removedTopics, ResultCallback callback);
- void handleOneTopicAdded(const Result result, const std::string& topic,
- std::shared_ptr> topicsNeedCreate, ResultCallback callback);
+ void timerGetTopicsOfNamespace(Result result, const NamespaceTopicsPtr& topics);
+ void onTopicsAdded(const NamespaceTopicsPtr& addedTopics, const ResultCallback& callback);
+ void onTopicsRemoved(const NamespaceTopicsPtr& removedTopics, const ResultCallback& callback);
+ void handleOneTopicAdded(Result result, const std::string& topic,
+ const std::shared_ptr>& topicsNeedCreate,
+ const ResultCallback& callback);
std::weak_ptr weak_from_this() noexcept {
return std::static_pointer_cast(shared_from_this());
diff --git a/lib/Producer.cc b/lib/Producer.cc
index fc588ba4..1adf23c4 100644
--- a/lib/Producer.cc
+++ b/lib/Producer.cc
@@ -28,7 +28,7 @@ static const std::string EMPTY_STRING;
Producer::Producer() : impl_() {}
-Producer::Producer(ProducerImplBasePtr impl) : impl_(impl) {}
+Producer::Producer(const ProducerImplBasePtr& impl) : impl_(impl) {}
const std::string& Producer::getTopic() const { return impl_ != NULL ? impl_->getTopic() : EMPTY_STRING; }
@@ -58,7 +58,7 @@ Result Producer::send(const Message& msg, MessageId& messageId) {
return promise.getFuture().get(messageId);
}
-void Producer::sendAsync(const Message& msg, SendCallback callback) {
+void Producer::sendAsync(const Message& msg, const SendCallback& callback) {
if (!impl_) {
callback(ResultProducerNotInitialized, msg.getMessageId());
return;
@@ -82,7 +82,7 @@ Result Producer::close() {
return result;
}
-void Producer::closeAsync(CloseCallback callback) {
+void Producer::closeAsync(const CloseCallback& callback) {
if (!impl_) {
callback(ResultProducerNotInitialized);
return;
@@ -100,7 +100,7 @@ Result Producer::flush() {
return result;
}
-void Producer::flushAsync(FlushCallback callback) {
+void Producer::flushAsync(const FlushCallback& callback) {
if (!impl_) {
callback(ResultProducerNotInitialized);
return;
diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc
index 0e141ae1..278871c4 100644
--- a/lib/ProducerConfiguration.cc
+++ b/lib/ProducerConfiguration.cc
@@ -180,7 +180,7 @@ ProducerConfiguration::BatchingType ProducerConfiguration::getBatchingType() con
const CryptoKeyReaderPtr ProducerConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; }
ProducerConfiguration& ProducerConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) {
- impl_->cryptoKeyReader = cryptoKeyReader;
+ impl_->cryptoKeyReader = std::move(cryptoKeyReader);
return *this;
}
@@ -201,7 +201,7 @@ bool ProducerConfiguration::isEncryptionEnabled() const {
return (!impl_->encryptionKeys.empty() && (impl_->cryptoKeyReader != NULL));
}
-ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key) {
+ProducerConfiguration& ProducerConfiguration::addEncryptionKey(const std::string& key) {
impl_->encryptionKeys.insert(key);
return *this;
}
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 4399ce5f..cc8f3204 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -48,7 +48,7 @@ DECLARE_LOG_OBJECT()
using std::chrono::milliseconds;
-ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
+ProducerImpl::ProducerImpl(const ClientImplPtr& client, const TopicName& topicName,
const ProducerConfiguration& conf, const ProducerInterceptorsPtr& interceptors,
int32_t partition, bool retryOnCreationError)
: HandlerBase(client, (partition < 0) ? topicName.toString() : topicName.getTopicPartitionName(partition),
@@ -114,11 +114,11 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
}
ProducerImpl::~ProducerImpl() {
- LOG_DEBUG(getName() << "~ProducerImpl");
- shutdown();
+ LOG_DEBUG(producerStr_ << "~ProducerImpl");
+ internalShutdown();
printStats();
if (state_ == Ready || state_ == Pending) {
- LOG_WARN(getName() << "Destroyed producer which was not properly closed");
+ LOG_WARN(producerStr_ << "Destroyed producer which was not properly closed");
}
}
@@ -352,7 +352,7 @@ void ProducerImpl::failPendingMessages(Result result, bool withLock) {
}
}
-void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
+void ProducerImpl::resendMessages(const ClientConnectionPtr& cnx) {
if (pendingMessagesQueue_.empty()) {
return;
}
@@ -879,7 +879,7 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
return true;
}
- std::unique_ptr op{std::move(pendingMessagesQueue_.front().release())};
+ std::unique_ptr op{pendingMessagesQueue_.front().release()};
uint64_t expectedSequenceId = op->sendArgs->sequenceId;
if (sequenceId > expectedSequenceId) {
LOG_WARN(getName() << "Got ack failure for msg " << sequenceId //
@@ -993,7 +993,9 @@ void ProducerImpl::start() {
}
}
-void ProducerImpl::shutdown() {
+void ProducerImpl::shutdown() { internalShutdown(); }
+
+void ProducerImpl::internalShutdown() {
resetCnx();
interceptors_->close();
auto client = client_.lock();
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index f650b1f9..77bd6d1a 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -71,7 +71,7 @@ class MessageMetadata;
class ProducerImpl : public HandlerBase, public ProducerImplBase {
public:
- ProducerImpl(ClientImplPtr client, const TopicName& topic,
+ ProducerImpl(const ClientImplPtr& client, const TopicName& topic,
const ProducerConfiguration& producerConfiguration,
const ProducerInterceptorsPtr& interceptors, int32_t partition = -1,
bool retryOnCreationError = false);
@@ -85,6 +85,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {
void closeAsync(CloseCallback callback) override;
void start() override;
void shutdown() override;
+ void internalShutdown();
bool isClosed() override;
const std::string& getTopic() const override;
Future getProducerCreatedFuture() override;
@@ -143,7 +144,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {
Result handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData);
- void resendMessages(ClientConnectionPtr cnx);
+ void resendMessages(const ClientConnectionPtr& cnx);
void refreshEncryptionKey(const ASIO_ERROR& ec);
bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
diff --git a/lib/ProducerImplBase.h b/lib/ProducerImplBase.h
index 27cfced1..25a12c8a 100644
--- a/lib/ProducerImplBase.h
+++ b/lib/ProducerImplBase.h
@@ -41,8 +41,8 @@ class ProducerImplBase {
virtual void sendAsync(const Message& msg, SendCallback callback) = 0;
virtual void closeAsync(CloseCallback callback) = 0;
virtual void start() = 0;
- virtual void shutdown() = 0;
virtual bool isClosed() = 0;
+ virtual void shutdown() = 0;
virtual const std::string& getTopic() const = 0;
virtual Future getProducerCreatedFuture() = 0;
virtual void triggerFlush() = 0;
diff --git a/lib/ProtobufNativeSchema.cc b/lib/ProtobufNativeSchema.cc
index 5cddf74f..632943da 100644
--- a/lib/ProtobufNativeSchema.cc
+++ b/lib/ProtobufNativeSchema.cc
@@ -39,8 +39,8 @@ SchemaInfo createProtobufNativeSchema(const google::protobuf::Descriptor* descri
}
const auto fileDescriptor = descriptor->file();
- const std::string rootMessageTypeName = descriptor->full_name();
- const std::string rootFileDescriptorName = fileDescriptor->name();
+ const std::string& rootMessageTypeName = descriptor->full_name();
+ const std::string& rootFileDescriptorName = fileDescriptor->name();
FileDescriptorSet fileDescriptorSet;
internalCollectFileDescriptors(fileDescriptor, fileDescriptorSet);
diff --git a/lib/Reader.cc b/lib/Reader.cc
index c02fb2e5..e7d907a3 100644
--- a/lib/Reader.cc
+++ b/lib/Reader.cc
@@ -29,7 +29,7 @@ static const std::string EMPTY_STRING;
Reader::Reader() : impl_() {}
-Reader::Reader(ReaderImplPtr impl) : impl_(impl) {}
+Reader::Reader(const ReaderImplPtr& impl) : impl_(impl) {}
const std::string& Reader::getTopic() const { return impl_ != NULL ? impl_->getTopic() : EMPTY_STRING; }
@@ -49,7 +49,7 @@ Result Reader::readNext(Message& msg, int timeoutMs) {
return impl_->readNext(msg, timeoutMs);
}
-void Reader::readNextAsync(ReadNextCallback callback) {
+void Reader::readNextAsync(const ReadNextCallback& callback) {
if (!impl_) {
return callback(ResultConsumerNotInitialized, {});
}
@@ -66,7 +66,7 @@ Result Reader::close() {
return result;
}
-void Reader::closeAsync(ResultCallback callback) {
+void Reader::closeAsync(const ResultCallback& callback) {
if (!impl_) {
callback(ResultConsumerNotInitialized);
return;
@@ -75,7 +75,7 @@ void Reader::closeAsync(ResultCallback callback) {
impl_->closeAsync(callback);
}
-void Reader::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
+void Reader::hasMessageAvailableAsync(const HasMessageAvailableCallback& callback) {
if (!impl_) {
callback(ResultConsumerNotInitialized, false);
return;
@@ -91,7 +91,7 @@ Result Reader::hasMessageAvailable(bool& hasMessageAvailable) {
return promise.getFuture().get(hasMessageAvailable);
}
-void Reader::seekAsync(const MessageId& msgId, ResultCallback callback) {
+void Reader::seekAsync(const MessageId& msgId, const ResultCallback& callback) {
if (!impl_) {
callback(ResultConsumerNotInitialized);
return;
@@ -99,7 +99,7 @@ void Reader::seekAsync(const MessageId& msgId, ResultCallback callback) {
impl_->seekAsync(msgId, callback);
}
-void Reader::seekAsync(uint64_t timestamp, ResultCallback callback) {
+void Reader::seekAsync(uint64_t timestamp, const ResultCallback& callback) {
if (!impl_) {
callback(ResultConsumerNotInitialized);
return;
@@ -125,7 +125,7 @@ Result Reader::seek(uint64_t timestamp) {
bool Reader::isConnected() const { return impl_ && impl_->isConnected(); }
-void Reader::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
+void Reader::getLastMessageIdAsync(const GetLastMessageIdCallback& callback) {
if (!impl_) {
callback(ResultConsumerNotInitialized, MessageId());
return;
diff --git a/lib/ReaderConfiguration.cc b/lib/ReaderConfiguration.cc
index f1dba5eb..054010f8 100644
--- a/lib/ReaderConfiguration.cc
+++ b/lib/ReaderConfiguration.cc
@@ -41,7 +41,7 @@ ReaderConfiguration& ReaderConfiguration::setSchema(const SchemaInfo& schemaInfo
const SchemaInfo& ReaderConfiguration::getSchema() const { return impl_->schemaInfo; }
ReaderConfiguration& ReaderConfiguration::setReaderListener(ReaderListener readerListener) {
- impl_->readerListener = readerListener;
+ impl_->readerListener = std::move(readerListener);
impl_->hasReaderListener = true;
return *this;
}
@@ -71,7 +71,7 @@ bool ReaderConfiguration::isReadCompacted() const { return impl_->readCompacted;
void ReaderConfiguration::setReadCompacted(bool compacted) { impl_->readCompacted = compacted; }
void ReaderConfiguration::setInternalSubscriptionName(std::string internalSubscriptionName) {
- impl_->internalSubscriptionName = internalSubscriptionName;
+ impl_->internalSubscriptionName = std::move(internalSubscriptionName);
}
const std::string& ReaderConfiguration::getInternalSubscriptionName() const {
@@ -107,7 +107,7 @@ bool ReaderConfiguration::isEncryptionEnabled() const { return impl_->cryptoKeyR
const CryptoKeyReaderPtr ReaderConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; }
ReaderConfiguration& ReaderConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) {
- impl_->cryptoKeyReader = cryptoKeyReader;
+ impl_->cryptoKeyReader = std::move(cryptoKeyReader);
return *this;
}
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index f41106e4..7fa7e8b9 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -36,9 +36,9 @@ ConsumerConfiguration consumerConfigOfReader;
static ResultCallback emptyCallback;
-ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
- const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
- ReaderCallback readerCreatedCallback)
+ReaderImpl::ReaderImpl(const ClientImplPtr& client, const std::string& topic, int partitions,
+ const ReaderConfiguration& conf, const ExecutorServicePtr& listenerExecutor,
+ const ReaderCallback& readerCreatedCallback)
: topic_(topic),
partitions_(partitions),
client_(client),
@@ -46,7 +46,7 @@ ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, int
readerCreatedCallback_(readerCreatedCallback) {}
void ReaderImpl::start(const MessageId& startMessageId,
- std::function callback) {
+ const std::function& callback) {
ConsumerConfiguration consumerConf;
consumerConf.setConsumerType(ConsumerExclusive);
consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
@@ -130,7 +130,7 @@ Result ReaderImpl::readNext(Message& msg, int timeoutMs) {
return res;
}
-void ReaderImpl::readNextAsync(ReceiveCallback callback) {
+void ReaderImpl::readNextAsync(const ReceiveCallback& callback) {
auto self = shared_from_this();
consumer_->receiveAsync([self, callback](Result result, const Message& message) {
self->acknowledgeIfNecessary(result, message);
@@ -138,7 +138,7 @@ void ReaderImpl::readNextAsync(ReceiveCallback callback) {
});
}
-void ReaderImpl::messageListener(Consumer consumer, const Message& msg) {
+void ReaderImpl::messageListener(const Consumer& consumer, const Message& msg) {
readerListener_(Reader(shared_from_this()), msg);
acknowledgeIfNecessary(ResultOk, msg);
}
@@ -156,20 +156,20 @@ void ReaderImpl::acknowledgeIfNecessary(Result result, const Message& msg) {
}
}
-void ReaderImpl::closeAsync(ResultCallback callback) { consumer_->closeAsync(callback); }
+void ReaderImpl::closeAsync(const ResultCallback& callback) { consumer_->closeAsync(callback); }
-void ReaderImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
+void ReaderImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& callback) {
consumer_->hasMessageAvailableAsync(callback);
}
-void ReaderImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
+void ReaderImpl::seekAsync(const MessageId& msgId, const ResultCallback& callback) {
consumer_->seekAsync(msgId, callback);
}
-void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
+void ReaderImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) {
consumer_->seekAsync(timestamp, callback);
}
-void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
+void ReaderImpl::getLastMessageIdAsync(const GetLastMessageIdCallback& callback) {
consumer_->getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {
callback(result, response.getLastMessageId());
});
diff --git a/lib/ReaderImpl.h b/lib/ReaderImpl.h
index 3731990d..020a5037 100644
--- a/lib/ReaderImpl.h
+++ b/lib/ReaderImpl.h
@@ -58,35 +58,36 @@ extern PULSAR_PUBLIC ConsumerConfiguration consumerConfigOfReader;
class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this {
public:
- ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
- const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
- ReaderCallback readerCreatedCallback);
+ ReaderImpl(const ClientImplPtr& client, const std::string& topic, int partitions,
+ const ReaderConfiguration& conf, const ExecutorServicePtr& listenerExecutor,
+ const ReaderCallback& readerCreatedCallback);
- void start(const MessageId& startMessageId, std::function callback);
+ void start(const MessageId& startMessageId,
+ const std::function& callback);
const std::string& getTopic() const;
Result readNext(Message& msg);
Result readNext(Message& msg, int timeoutMs);
- void readNextAsync(ReceiveCallback callback);
+ void readNextAsync(const ReceiveCallback& callback);
- void closeAsync(ResultCallback callback);
+ void closeAsync(const ResultCallback& callback);
Future getReaderCreatedFuture();
ConsumerImplBasePtr getConsumer() const noexcept { return consumer_; }
- void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
+ void hasMessageAvailableAsync(const HasMessageAvailableCallback& callback);
- void seekAsync(const MessageId& msgId, ResultCallback callback);
- void seekAsync(uint64_t timestamp, ResultCallback callback);
+ void seekAsync(const MessageId& msgId, const ResultCallback& callback);
+ void seekAsync(uint64_t timestamp, const ResultCallback& callback);
- void getLastMessageIdAsync(GetLastMessageIdCallback callback);
+ void getLastMessageIdAsync(const GetLastMessageIdCallback& callback);
bool isConnected() const;
private:
- void messageListener(Consumer consumer, const Message& msg);
+ void messageListener(const Consumer& consumer, const Message& msg);
void acknowledgeIfNecessary(Result result, const Message& msg);
diff --git a/lib/Schema.cc b/lib/Schema.cc
index c8f8b4d1..47777763 100644
--- a/lib/Schema.cc
+++ b/lib/Schema.cc
@@ -53,7 +53,7 @@ PULSAR_PUBLIC const char *strEncodingType(KeyValueEncodingType encodingType) {
return "UnknownSchemaType";
}
-PULSAR_PUBLIC KeyValueEncodingType enumEncodingType(std::string encodingTypeStr) {
+PULSAR_PUBLIC KeyValueEncodingType enumEncodingType(const std::string &encodingTypeStr) {
if (encodingTypeStr == "INLINE") {
return KeyValueEncodingType::INLINE;
} else if (encodingTypeStr == "SEPARATED") {
@@ -104,7 +104,7 @@ PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType) {
return "UnknownSchemaType";
}
-PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr) {
+PULSAR_PUBLIC SchemaType enumSchemaType(const std::string &schemaTypeStr) {
if (schemaTypeStr == "NONE") {
return NONE;
} else if (schemaTypeStr == "STRING") {
@@ -181,8 +181,8 @@ SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo &valueSchem
properties.emplace(VALUE_SCHEMA_PROPS, writeJson(valueSchema.getProperties()));
properties.emplace(KV_ENCODING_TYPE, strEncodingType(keyValueEncodingType));
- std::string keySchemaStr = keySchema.getSchema();
- std::string valueSchemaStr = valueSchema.getSchema();
+ const auto &keySchemaStr = keySchema.getSchema();
+ const auto &valueSchemaStr = valueSchema.getSchema();
impl_ = std::make_shared(KEY_VALUE, "KeyValue",
mergeKeyValueSchema(keySchemaStr, valueSchemaStr), properties);
}
diff --git a/lib/SharedBuffer.h b/lib/SharedBuffer.h
index 26fc59ed..033802fc 100644
--- a/lib/SharedBuffer.h
+++ b/lib/SharedBuffer.h
@@ -46,8 +46,8 @@ class SharedBuffer {
SharedBuffer& operator=(const SharedBuffer&) = default;
// Move constructor.
- SharedBuffer(SharedBuffer&& right) { *this = std::move(right); }
- SharedBuffer& operator=(SharedBuffer&& right) {
+ SharedBuffer(SharedBuffer&& right) noexcept { *this = std::move(right); }
+ SharedBuffer& operator=(SharedBuffer&& right) noexcept {
this->data_ = std::move(right.data_);
this->ptr_ = right.ptr_;
diff --git a/lib/TableView.cc b/lib/TableView.cc
index 2ff6e621..fc632315 100644
--- a/lib/TableView.cc
+++ b/lib/TableView.cc
@@ -25,7 +25,7 @@ namespace pulsar {
TableView::TableView() {}
-TableView::TableView(TableViewImplPtr impl) : impl_(impl) {}
+TableView::TableView(TableViewImplPtr impl) : impl_(std::move(impl)) {}
bool TableView::retrieveValue(const std::string& key, std::string& value) {
if (impl_) {
@@ -62,19 +62,19 @@ std::size_t TableView::size() const {
return 0;
}
-void TableView::forEach(TableViewAction action) {
+void TableView::forEach(const TableViewAction& action) {
if (impl_) {
impl_->forEach(action);
}
}
-void TableView::forEachAndListen(TableViewAction action) {
+void TableView::forEachAndListen(const TableViewAction& action) {
if (impl_) {
impl_->forEachAndListen(action);
}
}
-void TableView::closeAsync(ResultCallback callback) {
+void TableView::closeAsync(const ResultCallback& callback) {
if (!impl_) {
callback(ResultConsumerNotInitialized);
return;
diff --git a/lib/TableViewImpl.cc b/lib/TableViewImpl.cc
index e434e60c..e283a6fc 100644
--- a/lib/TableViewImpl.cc
+++ b/lib/TableViewImpl.cc
@@ -28,7 +28,7 @@ namespace pulsar {
DECLARE_LOG_OBJECT()
-TableViewImpl::TableViewImpl(ClientImplPtr client, const std::string& topic,
+TableViewImpl::TableViewImpl(const ClientImplPtr& client, const std::string& topic,
const TableViewConfiguration& conf)
: client_(client), topic_(topic), conf_(conf) {}
@@ -40,7 +40,7 @@ Future TableViewImpl::start() {
readerConfiguration.setInternalSubscriptionName(conf_.subscriptionName);
TableViewImplPtr self = shared_from_this();
- ReaderCallback readerCallback = [self, promise](Result res, Reader reader) {
+ ReaderCallback readerCallback = [self, promise](Result res, const Reader& reader) {
if (res == ResultOk) {
self->reader_ = reader.impl_;
self->readAllExistingMessages(promise, TimeUtils::currentTimeMillis(), 0);
@@ -76,15 +76,15 @@ std::unordered_map TableViewImpl::snapshot() { return
std::size_t TableViewImpl::size() const { return data_.size(); }
-void TableViewImpl::forEach(TableViewAction action) { data_.forEach(action); }
+void TableViewImpl::forEach(const TableViewAction& action) { data_.forEach(action); }
-void TableViewImpl::forEachAndListen(TableViewAction action) {
+void TableViewImpl::forEachAndListen(const TableViewAction& action) {
data_.forEach(action);
Lock lock(listenersMutex_);
listeners_.emplace_back(action);
}
-void TableViewImpl::closeAsync(ResultCallback callback) {
+void TableViewImpl::closeAsync(const ResultCallback& callback) {
if (reader_) {
reader_->closeAsync([callback, this](Result result) {
reader_.reset();
@@ -118,7 +118,7 @@ void TableViewImpl::handleMessage(const Message& msg) {
}
}
-void TableViewImpl::readAllExistingMessages(Promise promise, long startTime,
+void TableViewImpl::readAllExistingMessages(const Promise& promise, long startTime,
long messagesRead) {
std::weak_ptr weakSelf{shared_from_this()};
reader_->hasMessageAvailableAsync(
diff --git a/lib/TableViewImpl.h b/lib/TableViewImpl.h
index 3ca3f754..eda5247d 100644
--- a/lib/TableViewImpl.h
+++ b/lib/TableViewImpl.h
@@ -41,7 +41,7 @@ typedef std::shared_ptr