Skip to content

Commit

Permalink
Merge Request for #4808: TYPO in C++ client producer method for proce…
Browse files Browse the repository at this point in the history
…ssing failure case, and add corresponding unit test case. (#4873)

Definitely, this is a typo. This method is dealing with the Failed Message with the GIVEN result, but not a CERTAIN result.

Contribution Checklist
#4808 : TYPO in C++ client producer method for processing failure case
Add c++ client producer failure message unit test case.

UT passed:

BatchMessageTest
(cherry picked from commit b90b4ea)
  • Loading branch information
easyfan authored and jiazhai committed Aug 28, 2019
1 parent ade9542 commit e4790f3
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pulsar-client-cpp/include/pulsar/Producer.h
Expand Up @@ -147,6 +147,9 @@ class PULSAR_PUBLIC Producer {
friend class PulsarWrapper;

ProducerImplBasePtr impl_;

// For unit test case BatchMessageTest::producerFailureResult only
void producerFailMessages(Result result);
};
} // namespace pulsar

Expand Down
7 changes: 7 additions & 0 deletions pulsar-client-cpp/lib/Producer.cc
Expand Up @@ -97,4 +97,11 @@ void Producer::flushAsync(FlushCallback callback) {

impl_->flushAsync(callback);
}

void Producer::producerFailMessages(Result result) {
if (impl_) {
ProducerImpl* producerImpl = static_cast<ProducerImpl*>(impl_.get());
producerImpl->failPendingMessages(result);
}
}
} // namespace pulsar
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ProducerImpl.cc
Expand Up @@ -256,7 +256,7 @@ void ProducerImpl::failPendingMessages(Result result) {
}

// this function can handle null pointer
BatchMessageContainer::batchMessageCallBack(ResultTimeout, messageContainerListPtr, NULL);
BatchMessageContainer::batchMessageCallBack(result, messageContainerListPtr, NULL);
}

void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ProducerImpl.h
Expand Up @@ -43,6 +43,8 @@ typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;

class PulsarFriend;

class Producer;

struct OpSendMsg {
Message msg_;
SendCallback sendCallback_;
Expand Down Expand Up @@ -110,6 +112,8 @@ class ProducerImpl : public HandlerBase,

friend class PulsarFriend;

friend class Producer;

friend class BatchMessageContainer;

virtual void connectionOpened(const ClientConnectionPtr& connection);
Expand Down
34 changes: 34 additions & 0 deletions pulsar-client-cpp/tests/BatchMessageTest.cc
Expand Up @@ -30,6 +30,8 @@
#include <thread>
#include "LogUtils.h"
#include "PulsarFriend.h"
#include <unistd.h>
#include <functional>
#include "ConsumerTest.h"
#include "HttpHelper.h"
DECLARE_LOG_OBJECT();
Expand All @@ -55,6 +57,8 @@ static void sendCallBack(Result r, const Message& msg) {
LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString());
}

static void sendFailCallBack(Result r, Result expect_result) { EXPECT_EQ(r, expect_result); }

static int globalPublishCountSuccess = 0;
static int globalPublishCountQueueFull = 0;

Expand Down Expand Up @@ -914,3 +918,33 @@ TEST(BatchMessageTest, testPartitionedTopics) {
// Number of messages consumed
ASSERT_EQ(i, numOfMessages - globalPublishCountQueueFull);
}

TEST(BatchMessageTest, producerFailureResult) {
std::string testName = std::to_string(epochTime) + "testCumulativeAck";

ClientConfiguration clientConfig;
clientConfig.setStatsIntervalInSeconds(100);

Client client(lookupUrl, clientConfig);
std::string topicName = "persistent://public/default/" + testName;
std::string subName = "subscription-name";
Producer producer;

int batchSize = 100;
int numOfMessages = 10000;
ProducerConfiguration conf;

conf.setCompressionType(CompressionZLib);
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingEnabled(true);
conf.setBatchingMaxPublishDelayMs(50000);
conf.setBlockIfQueueFull(false);
conf.setMaxPendingMessages(10);

Result res = Result::ResultBrokerMetadataError;

client.createProducer(topicName, conf, producer);
Message msg = MessageBuilder().setContent("test").build();
producer.sendAsync(msg, std::bind(&sendFailCallBack, std::placeholders::_1, res));
PulsarFriend::producerFailMessages(producer, res);
}
4 changes: 4 additions & 0 deletions pulsar-client-cpp/tests/PulsarFriend.h
Expand Up @@ -56,6 +56,10 @@ class PulsarFriend {
return *producerImpl;
}

static void producerFailMessages(Producer producer, Result result) {
producer.producerFailMessages(result);
}

static ConsumerImpl& getConsumerImpl(Consumer consumer) {
ConsumerImpl* consumerImpl = static_cast<ConsumerImpl*>(consumer.impl_.get());
return *consumerImpl;
Expand Down

0 comments on commit e4790f3

Please sign in to comment.