From e4790f374d1036be43d1c091baae95b017ecd47c Mon Sep 17 00:00:00 2001 From: Easyfan Zheng Date: Mon, 5 Aug 2019 14:41:42 +0800 Subject: [PATCH] Merge Request for #4808: TYPO in C++ client producer method for processing failure case, and add corresponding unit test case. (#4873) Definitely, this is a typo. This method is dealing with the Failed Message with the GIVEN result, but not a CERTAIN result. Contribution Checklist #4808 : TYPO in C++ client producer method for processing failure case Add c++ client producer failure message unit test case. UT passed: BatchMessageTest (cherry picked from commit b90b4ea1d1397e3707e6f51151492feff5a75ba6) --- pulsar-client-cpp/include/pulsar/Producer.h | 3 ++ pulsar-client-cpp/lib/Producer.cc | 7 +++++ pulsar-client-cpp/lib/ProducerImpl.cc | 2 +- pulsar-client-cpp/lib/ProducerImpl.h | 4 +++ pulsar-client-cpp/tests/BatchMessageTest.cc | 34 +++++++++++++++++++++ pulsar-client-cpp/tests/PulsarFriend.h | 4 +++ 6 files changed, 53 insertions(+), 1 deletion(-) diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h index c3f25ef3d420d..ae55093367dc6 100644 --- a/pulsar-client-cpp/include/pulsar/Producer.h +++ b/pulsar-client-cpp/include/pulsar/Producer.h @@ -147,6 +147,9 @@ class PULSAR_PUBLIC Producer { friend class PulsarWrapper; ProducerImplBasePtr impl_; + + // For unit test case BatchMessageTest::producerFailureResult only + void producerFailMessages(Result result); }; } // namespace pulsar diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc index 1659a21e7878e..e02f1d8a5bb1f 100644 --- a/pulsar-client-cpp/lib/Producer.cc +++ b/pulsar-client-cpp/lib/Producer.cc @@ -97,4 +97,11 @@ void Producer::flushAsync(FlushCallback callback) { impl_->flushAsync(callback); } + +void Producer::producerFailMessages(Result result) { + if (impl_) { + ProducerImpl* producerImpl = static_cast(impl_.get()); + producerImpl->failPendingMessages(result); + } +} } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index bf9e3ac6e4f42..666df3b27f184 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -256,7 +256,7 @@ void ProducerImpl::failPendingMessages(Result result) { } // this function can handle null pointer - BatchMessageContainer::batchMessageCallBack(ResultTimeout, messageContainerListPtr, NULL); + BatchMessageContainer::batchMessageCallBack(result, messageContainerListPtr, NULL); } void ProducerImpl::resendMessages(ClientConnectionPtr cnx) { diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index d7ac60342ae08..cb2a8a6e8a990 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -43,6 +43,8 @@ typedef std::shared_ptr MessageCryptoPtr; class PulsarFriend; +class Producer; + struct OpSendMsg { Message msg_; SendCallback sendCallback_; @@ -110,6 +112,8 @@ class ProducerImpl : public HandlerBase, friend class PulsarFriend; + friend class Producer; + friend class BatchMessageContainer; virtual void connectionOpened(const ClientConnectionPtr& connection); diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc index c1bfe60895f92..62f68cfea6193 100644 --- a/pulsar-client-cpp/tests/BatchMessageTest.cc +++ b/pulsar-client-cpp/tests/BatchMessageTest.cc @@ -30,6 +30,8 @@ #include #include "LogUtils.h" #include "PulsarFriend.h" +#include +#include #include "ConsumerTest.h" #include "HttpHelper.h" DECLARE_LOG_OBJECT(); @@ -55,6 +57,8 @@ static void sendCallBack(Result r, const Message& msg) { LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString()); } +static void sendFailCallBack(Result r, Result expect_result) { EXPECT_EQ(r, expect_result); } + static int globalPublishCountSuccess = 0; static int globalPublishCountQueueFull = 0; @@ -914,3 +918,33 @@ TEST(BatchMessageTest, testPartitionedTopics) { // Number of messages consumed ASSERT_EQ(i, numOfMessages - globalPublishCountQueueFull); } + +TEST(BatchMessageTest, producerFailureResult) { + std::string testName = std::to_string(epochTime) + "testCumulativeAck"; + + ClientConfiguration clientConfig; + clientConfig.setStatsIntervalInSeconds(100); + + Client client(lookupUrl, clientConfig); + std::string topicName = "persistent://public/default/" + testName; + std::string subName = "subscription-name"; + Producer producer; + + int batchSize = 100; + int numOfMessages = 10000; + ProducerConfiguration conf; + + conf.setCompressionType(CompressionZLib); + conf.setBatchingMaxMessages(batchSize); + conf.setBatchingEnabled(true); + conf.setBatchingMaxPublishDelayMs(50000); + conf.setBlockIfQueueFull(false); + conf.setMaxPendingMessages(10); + + Result res = Result::ResultBrokerMetadataError; + + client.createProducer(topicName, conf, producer); + Message msg = MessageBuilder().setContent("test").build(); + producer.sendAsync(msg, std::bind(&sendFailCallBack, std::placeholders::_1, res)); + PulsarFriend::producerFailMessages(producer, res); +} diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h index 95c49f885f0e0..a50bd678c2493 100644 --- a/pulsar-client-cpp/tests/PulsarFriend.h +++ b/pulsar-client-cpp/tests/PulsarFriend.h @@ -56,6 +56,10 @@ class PulsarFriend { return *producerImpl; } + static void producerFailMessages(Producer producer, Result result) { + producer.producerFailMessages(result); + } + static ConsumerImpl& getConsumerImpl(Consumer consumer) { ConsumerImpl* consumerImpl = static_cast(consumer.impl_.get()); return *consumerImpl;