Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support in cpp client for 1 partitioned topic #5016

Merged
merged 2 commits into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
CreateProducerCallback callback) {
if (!result) {
ProducerImplBasePtr producer;
if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
producer = std::make_shared<PartitionedProducerImpl>(shared_from_this(), topicName,
partitionMetadata->getPartitions(), conf);
} else {
Expand Down Expand Up @@ -221,7 +221,7 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
return;
}

if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
LOG_ERROR("Topic reader cannot be created on a partitioned topic: " << topicName->toString());
callback(ResultOperationNotSupported, Reader());
return;
Expand Down Expand Up @@ -360,7 +360,7 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
conf.setConsumerName(generateRandomName());
}
ConsumerImplBasePtr consumer;
if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
if (conf.getReceiverQueueSize() == 0) {
LOG_ERROR("Can't use partitioned topic if the queue size is 0.");
callback(ResultInvalidConfiguration, Consumer());
Expand Down Expand Up @@ -435,7 +435,7 @@ void ClientImpl::handleGetPartitions(const Result result, const LookupDataResult

StringList partitions;

if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
for (unsigned int i = 0; i < partitionMetadata->getPartitions(); i++) {
partitions.push_back(topicName->getTopicPartitionName(i));
}
Expand Down
16 changes: 9 additions & 7 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,23 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));

int numPartitions = partitionMetadata->getPartitions() >= 1 ? partitionMetadata->getPartitions() : 1;
int numPartitions = partitionMetadata->getPartitions();
int partitions = numPartitions == 0 ? 1 : numPartitions;

// Apply total limit of receiver queue size across partitions
config.setReceiverQueueSize(
std::min(conf_.getReceiverQueueSize(),
(int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions)));
(int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / partitions)));

Lock lock(mutex_);
topicsPartitions_.insert(std::make_pair(topicName->toString(), numPartitions));
topicsPartitions_.insert(std::make_pair(topicName->toString(), partitions));
lock.unlock();
numberTopicPartitions_->fetch_add(numPartitions);
numberTopicPartitions_->fetch_add(partitions);

std::shared_ptr<std::atomic<int>> partitionsNeedCreate =
std::make_shared<std::atomic<int>>(numPartitions);
std::shared_ptr<std::atomic<int>> partitionsNeedCreate = std::make_shared<std::atomic<int>>(partitions);

if (numPartitions == 1) {
// non-partitioned topic
if (numPartitions == 0) {
// We don't have to add partition-n suffix
consumer = std::make_shared<ConsumerImpl>(client_, topicName->toString(), subscriptionName_, config,
internalListenerExecutor, NonPartitioned);
Expand Down
69 changes: 69 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2978,3 +2978,72 @@ TEST(BasicEndToEndTest, testRegexTopicsWithMessageListener) {
timeWaited += 500;
}
}

TEST(BasicEndToEndTest, testPartitionedTopicWithOnePartition) {
ClientConfiguration config;
Client client(lookupUrl);
std::string topicName = "testPartitionedTopicWithOnePartition";
std::string subsName = topicName + "-sub-";

// call admin api to make 1 partition
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
int putRes = makePutRequest(url, "1");
LOG_INFO("res = " << putRes);
ASSERT_FALSE(putRes != 204 && putRes != 409);

Consumer consumer1;
ConsumerConfiguration conf;
Result result = client.subscribe(topicName, subsName + "1", consumer1);
ASSERT_EQ(ResultOk, result);

Consumer consumer2;
result = client.subscribe(topicName + "-partition-0", subsName + "2", consumer2);
ASSERT_EQ(ResultOk, result);

LOG_INFO("created 2 consumer");

Producer producer1;
ProducerConfiguration producerConf;
producerConf.setBatchingEnabled(false);
result = client.createProducer(topicName, producerConf, producer1);
ASSERT_EQ(ResultOk, result);

Producer producer2;
result = client.createProducer(topicName + "-partition-0", producerConf, producer2);
ASSERT_EQ(ResultOk, result);

LOG_INFO("created 2 producer");

// create messages
int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
Message msg = MessageBuilder().setContent("test-producer1-" + topicName + std::to_string(i)).build();
producer1.send(msg);
msg = MessageBuilder().setContent("test-producer2-" + topicName + std::to_string(i)).build();
producer2.send(msg);
}

// produced 10 messages by each producer.
// expected receive 20 messages by each consumer.
for (int i = 0; i < numMessages * 2; i++) {
LOG_INFO("begin to receive message " << i);

Message msg;
Result res = consumer1.receive(msg, 100);
ASSERT_EQ(ResultOk, res);
consumer1.acknowledge(msg);

res = consumer2.receive(msg, 100);
ASSERT_EQ(ResultOk, res);
consumer2.acknowledge(msg);
}

// No more messages expected
Message msg;
Result res = consumer1.receive(msg, 100);
ASSERT_EQ(ResultTimeout, res);

res = consumer2.receive(msg, 100);
ASSERT_EQ(ResultTimeout, res);
client.shutdown();
}