Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 11493] Fix #11493. Simple implementation of getting number of references from C++ client #11535

Merged
14 changes: 14 additions & 0 deletions pulsar-client-cpp/include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,20 @@ class PULSAR_PUBLIC Client {
*/
void shutdown();

/**
* @brief Get the number of alive producers on the current client.
*
* @return The number of alive producers on the current client.
*/
uint64_t getNumberOfProducers();

/**
* @brief Get the number of alive consumers on the current client.
*
* @return The number of alive consumers on the current client.
*/
uint64_t getNumberOfConsumers();

private:
Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
bool poolConnections);
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,7 @@ Result Client::close() {
void Client::closeAsync(CloseCallback callback) { impl_->closeAsync(callback); }

void Client::shutdown() { impl_->shutdown(); }

uint64_t Client::getNumberOfProducers() { return impl_->getNumberOfProducers(); }
uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); }
} // namespace pulsar
24 changes: 24 additions & 0 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,30 @@ uint64_t ClientImpl::newRequestId() {
return requestIdGenerator_++;
}

uint64_t ClientImpl::getNumberOfProducers() {
Lock lock(mutex_);
uint64_t numberOfAliveProducers = 0;
for (const auto& producer : producers_) {
const auto& producerImpl = producer.lock();
if (producerImpl) {
numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer();
}
}
return numberOfAliveProducers;
}

uint64_t ClientImpl::getNumberOfConsumers() {
Lock lock(mutex_);
uint64_t numberOfAliveConsumers = 0;
for (const auto& consumer : consumers_) {
const auto consumerImpl = consumer.lock();
if (consumerImpl) {
numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer();
}
}
return numberOfAliveConsumers;
}

const ClientConfiguration& ClientImpl::getClientConfig() const { return clientConfiguration_; }

} /* namespace pulsar */
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
uint64_t newConsumerId();
uint64_t newRequestId();

uint64_t getNumberOfProducers();
uint64_t getNumberOfConsumers();

const ClientConfiguration& getClientConfig() const;

const ClientConfiguration& conf() const;
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1233,4 +1233,6 @@ bool ConsumerImpl::isConnected() const {
return !getCnx().expired() && state_ == Ready;
}

uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }

} /* namespace pulsar */
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class ConsumerImpl : public ConsumerImplBase,
void seekAsync(uint64_t timestamp, ResultCallback callback) override;
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;

virtual void disconnectConsumer();
Result fetchSingleMessageFromBroker(Message& msg);
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class ConsumerImplBase {
virtual void seekAsync(uint64_t timestamp, ResultCallback callback) = 0;
virtual void negativeAcknowledge(const MessageId& msgId) = 0;
virtual bool isConnected() const = 0;
virtual uint64_t getNumberOfConnectedConsumer() = 0;

private:
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -749,3 +749,16 @@ bool MultiTopicsConsumerImpl::isConnected() const {
}
return true;
}

uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
Lock lock(mutex_);
uint64_t numberOfConnectedConsumer = 0;
const auto consumers = consumers_;
Sunny-Island marked this conversation as resolved.
Show resolved Hide resolved
lock.unlock();
for (const auto& topicAndConsumer : consumers) {
if (topicAndConsumer.second->isConnected()) {
numberOfConnectedConsumer++;
}
}
return numberOfConnectedConsumer;
}
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void seekAsync(uint64_t timestamp, ResultCallback callback) override;
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;

void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
size_t, BrokerConsumerStatsCallback);
// return first topic name when all topics name valid, or return null pointer
Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -635,4 +635,17 @@ bool PartitionedConsumerImpl::isConnected() const {
return true;
}

uint64_t PartitionedConsumerImpl::getNumberOfConnectedConsumer() {
uint64_t numberOfConnectedConsumer = 0;
Lock consumersLock(consumersMutex_);
const auto consumers = consumers_;
consumersLock.unlock();
for (const auto& consumer : consumers) {
if (consumer->isConnected()) {
numberOfConnectedConsumer++;
}
}
return numberOfConnectedConsumer;
}

} // namespace pulsar
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
void seekAsync(uint64_t timestamp, ResultCallback callback) override;
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;

void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, PartitionedBrokerConsumerStatsPtr,
size_t, BrokerConsumerStatsCallback);
Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -387,4 +387,17 @@ bool PartitionedProducerImpl::isConnected() const {
return true;
}

uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
uint64_t numberOfConnectedProducer = 0;
Lock producersLock(producersMutex_);
const auto producers = producers_;
producersLock.unlock();
for (const auto& producer : producers) {
if (producer->isConnected()) {
numberOfConnectedProducer++;
}
}
return numberOfConnectedProducer;
}

} // namespace pulsar
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
void triggerFlush() override;
void flushAsync(FlushCallback callback) override;
bool isConnected() const override;

uint64_t getNumberOfConnectedProducer() override;
void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
const unsigned int partitionIndex);

Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -826,5 +826,7 @@ bool ProducerImpl::isConnected() const {
return !getCnx().expired() && state_ == Ready;
}

uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; }

} // namespace pulsar
/* namespace pulsar */
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class ProducerImpl : public HandlerBase,
void triggerFlush() override;
void flushAsync(FlushCallback callback) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedProducer() override;

bool removeCorruptMessage(uint64_t sequenceId);

Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ProducerImplBase {
virtual void triggerFlush() = 0;
virtual void flushAsync(FlushCallback callback) = 0;
virtual bool isConnected() const = 0;
virtual uint64_t getNumberOfConnectedProducer() = 0;
};
} // namespace pulsar
#endif // PULSAR_PRODUCER_IMPL_BASE_HEADER
61 changes: 61 additions & 0 deletions pulsar-client-cpp/tests/ClientTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
#include <gtest/gtest.h>

#include "HttpHelper.h"

#include <future>
#include <pulsar/Client.h>
#include "../lib/checksum/ChecksumProvider.h"
Expand Down Expand Up @@ -114,3 +116,62 @@ TEST(ClientTest, testConnectTimeout) {
clientLow.close();
clientDefault.close();
}

TEST(ClientTest, testGetNumberOfReferences) {
Client client("pulsar://localhost:6650");

// Producer test
uint64_t numberOfProducers = 0;
const std::string nonPartitionedTopic =
"testGetNumberOfReferencesNonPartitionedTopic" + std::to_string(time(nullptr));

const std::string partitionedTopic =
"testGetNumberOfReferencesPartitionedTopic" + std::to_string(time(nullptr));
Producer producer;
client.createProducer(nonPartitionedTopic, producer);
numberOfProducers = 1;
ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());

producer.close();
numberOfProducers = 0;
ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());

// PartitionedProducer
int res = makePutRequest(
"http://localhost:8080/admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "2");
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

client.createProducer(partitionedTopic, producer);
numberOfProducers = 2;
ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());
Sunny-Island marked this conversation as resolved.
Show resolved Hide resolved
producer.close();
numberOfProducers = 0;
ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());

// Consumer test
uint64_t numberOfConsumers = 0;

Consumer consumer1;
client.subscribe(nonPartitionedTopic, "consumer-1", consumer1);
numberOfConsumers = 1;
ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());

consumer1.close();
numberOfConsumers = 0;
ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());

Consumer consumer2;
Consumer consumer3;
client.subscribe(partitionedTopic, "consumer-2", consumer2);
numberOfConsumers = 2;
ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());
client.subscribe(nonPartitionedTopic, "consumer-3", consumer3);
numberOfConsumers = 3;
ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());
consumer2.close();
consumer3.close();
numberOfConsumers = 0;
ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());

client.close();
}