diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index f7957d61..a799d5c8 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -198,7 +198,9 @@ void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partition // override void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) { if (state_ != Ready) { - callback(ResultAlreadyClosed, msg.getMessageId()); + if (callback) { + callback(ResultAlreadyClosed, msg.getMessageId()); + } return; } @@ -209,7 +211,9 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition); // change me: abort or notify failure in callback? // change to appropriate error if callback - callback(ResultUnknownError, msg.getMessageId()); + if (callback) { + callback(ResultUnknownError, msg.getMessageId()); + } return; } // find a producer for that partition, index should start from 0 @@ -223,7 +227,19 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac producersLock.unlock(); // send message on that partition - producer->sendAsync(msg, callback); + if (!conf_.getLazyStartPartitionedProducers() || producer->ready()) { + producer->sendAsync(msg, std::move(callback)); + } else { + // Wrapping the callback into a lambda has overhead, so we check if the producer is ready first + producer->getProducerCreatedFuture().addListener( + [msg, callback](Result result, ProducerImplBaseWeakPtr weakProducer) { + if (result == ResultOk) { + weakProducer.lock()->sendAsync(msg, std::move(callback)); + } else if (callback) { + callback(result, {}); + } + }); + } } // override diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 80ee7358..76a999a3 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -60,8 +60,9 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, userProvidedProducerName_(false), producerStr_("[" + topic() + ", " + producerName_ + "] "), producerId_(client->newProducerId()), - msgSequenceGenerator_(0), batchTimer_(executor_->createDeadlineTimer()), + lastSequenceIdPublished_(conf.getInitialSequenceId()), + msgSequenceGenerator_(lastSequenceIdPublished_ + 1), sendTimer_(executor_->createDeadlineTimer()), dataKeyRefreshTask_(*executor_, 4 * 60 * 60 * 1000), memoryLimitController_(client->getMemoryLimitController()), @@ -69,11 +70,6 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, interceptors_(interceptors) { LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic() << " id: " << producerId_); - - int64_t initialSequenceId = conf.getInitialSequenceId(); - lastSequenceIdPublished_ = initialSequenceId; - msgSequenceGenerator_ = initialSequenceId + 1; - if (!producerName_.empty()) { userProvidedProducerName_ = true; } diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index 91b95443..2fb0b886 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -19,6 +19,7 @@ #ifndef LIB_PRODUCERIMPL_H_ #define LIB_PRODUCERIMPL_H_ +#include #include #include #include @@ -103,6 +104,8 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { ProducerImplWeakPtr weak_from_this() noexcept { return shared_from_this(); } + bool ready() const { return producerCreatedPromise_.isComplete(); } + protected: ProducerStatsBasePtr producerStatsBasePtr_; @@ -169,13 +172,13 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { bool userProvidedProducerName_; std::string producerStr_; uint64_t producerId_; - int64_t msgSequenceGenerator_; std::unique_ptr batchMessageContainer_; DeadlineTimerPtr batchTimer_; PendingFailures batchMessageAndSend(const FlushCallback& flushCallback = nullptr); - volatile int64_t lastSequenceIdPublished_; + std::atomic lastSequenceIdPublished_; + std::atomic msgSequenceGenerator_; std::string schemaVersion_; DeadlineTimerPtr sendTimer_; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 993c2fd0..3bc5a156 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -72,5 +72,5 @@ add_executable(Oauth2Test oauth2/Oauth2Test.cc) target_compile_options(Oauth2Test PRIVATE -DTEST_CONF_DIR="${TEST_CONF_DIR}") target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH}) -add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc) +add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc HttpHelper.cc) target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH}) diff --git a/tests/chunkdedup/ChunkDedupTest.cc b/tests/chunkdedup/ChunkDedupTest.cc index 609511f2..4c9c28e7 100644 --- a/tests/chunkdedup/ChunkDedupTest.cc +++ b/tests/chunkdedup/ChunkDedupTest.cc @@ -18,7 +18,9 @@ */ #include #include +#include +#include "../HttpHelper.h" #include "lib/Latch.h" #include "lib/LogUtils.h" @@ -47,6 +49,30 @@ TEST(ChunkDedupTest, testSendChunks) { client.close(); } +TEST(ChunkDedupTest, testLazyPartitionedProducer) { + std::string topic = "test-lazy-partitioned-producer-" + std::to_string(time(nullptr)); + Client client{"pulsar://localhost:6650"}; + ProducerConfiguration conf; + conf.setLazyStartPartitionedProducers(true); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer)); + + constexpr int numPartitions = 3; + int res = + makePutRequest("http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/partitions", + std::to_string(numPartitions)); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + for (int i = 0; i < 10; i++) { + const auto key = std::to_string(i % numPartitions); + MessageId msgId; + producer.send(MessageBuilder().setPartitionKey(key).setContent("msg-" + std::to_string(i)).build(), + msgId); + ASSERT_TRUE(msgId.ledgerId() >= 0 && msgId.entryId() >= 0) << "i: " << i << ", msgId: " << msgId; + } + client.close(); +} + int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();