Skip to content

Commit

Permalink
add support in cpp client for 1 partitioned topic
Browse files Browse the repository at this point in the history
  • Loading branch information
jiazhai committed Aug 22, 2019
1 parent aa77572 commit e028c6d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 6 deletions.
8 changes: 4 additions & 4 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
CreateProducerCallback callback) {
if (!result) {
ProducerImplBasePtr producer;
if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
producer = std::make_shared<PartitionedProducerImpl>(shared_from_this(), topicName,
partitionMetadata->getPartitions(), conf);
} else {
Expand Down Expand Up @@ -221,7 +221,7 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
return;
}

if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
LOG_ERROR("Topic reader cannot be created on a partitioned topic: " << topicName->toString());
callback(ResultOperationNotSupported, Reader());
return;
Expand Down Expand Up @@ -360,7 +360,7 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
conf.setConsumerName(generateRandomName());
}
ConsumerImplBasePtr consumer;
if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
if (conf.getReceiverQueueSize() == 0) {
LOG_ERROR("Can't use partitioned topic if the queue size is 0.");
callback(ResultInvalidConfiguration, Consumer());
Expand Down Expand Up @@ -435,7 +435,7 @@ void ClientImpl::handleGetPartitions(const Result result, const LookupDataResult

StringList partitions;

if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
for (unsigned int i = 0; i < partitionMetadata->getPartitions(); i++) {
partitions.push_back(topicName->getTopicPartitionName(i));
}
Expand Down
5 changes: 3 additions & 2 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));

int numPartitions = partitionMetadata->getPartitions() >= 1 ? partitionMetadata->getPartitions() : 1;
int numPartitions = partitionMetadata->getPartitions();
// Apply total limit of receiver queue size across partitions
config.setReceiverQueueSize(
std::min(conf_.getReceiverQueueSize(),
Expand All @@ -176,7 +176,8 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
std::shared_ptr<std::atomic<int>> partitionsNeedCreate =
std::make_shared<std::atomic<int>>(numPartitions);

if (numPartitions == 1) {
// non-partitioned topic
if (numPartitions == 0) {
// We don't have to add partition-n suffix
consumer = std::make_shared<ConsumerImpl>(client_, topicName->toString(), subscriptionName_, config,
internalListenerExecutor, NonPartitioned);
Expand Down
67 changes: 67 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2978,3 +2978,70 @@ TEST(BasicEndToEndTest, testRegexTopicsWithMessageListener) {
timeWaited += 500;
}
}

TEST(BasicEndToEndTest, testPartitionedTopicWithOnePartition) {
ClientConfiguration config;
Client client(lookupUrl);
std::string topicName = "testPartitionedTopicWithOnePartition";
std::string subsName = topicName + "-sub-";

// call admin api to make 1 partition
std::string url =
adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
int putRes = makePutRequest(url, "1");
LOG_INFO("res = " << putRes);
ASSERT_FALSE(putRes != 204 && putRes != 409);

Consumer consumer1;
ConsumerConfiguration conf;
Result result = client.subscribe(topicName, subsName + "1", consumer1);
ASSERT_EQ(ResultOk, result);

Consumer consumer2;
result = client.subscribe(topicName + "-partition-0", subsName + "2", consumer2);
ASSERT_EQ(ResultOk, result);

LOG_INFO("created 2 consumer");

Producer producer1;
ProducerConfiguration producerConf;
producerConf.setBatchingEnabled(false);
result = client.createProducer(topicName, producerConf, producer1);
ASSERT_EQ(ResultOk, result);

Producer producer2;
result = client.createProducer(topicName + "-partition-0", producerConf, producer2);
ASSERT_EQ(ResultOk, result);

LOG_INFO("created 2 producer");

// create messages
int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
Message msg = MessageBuilder().setContent("test-" + topicName + std::to_string(i)).build();
producer1.send(msg);
producer2.send(msg);
}

// produced 10 messages by each producer.
// expected receive 20 messages by each consumer.
for (int i = 0; i < numMessages * 2; i++) {
Message msg;
Result res = consumer1.receive(msg, 100);
ASSERT_EQ(ResultOk, res);
consumer1.acknowledge(msg);

res = consumer2.receive(msg, 100);
ASSERT_EQ(ResultOk, res);
consumer2.acknowledge(msg);
}

// No more messages expected
Message msg;
Result res = consumer1.receive(msg, 100);
ASSERT_EQ(ResultTimeout, res);

res = consumer2.receive(msg, 100);
ASSERT_EQ(ResultTimeout, res);
client.shutdown();
}

0 comments on commit e028c6d

Please sign in to comment.