Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partition
// override
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed, msg.getMessageId());
if (callback) {
callback(ResultAlreadyClosed, msg.getMessageId());
}
return;
}

Expand All @@ -209,7 +211,9 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
// change me: abort or notify failure in callback?
// change to appropriate error if callback
callback(ResultUnknownError, msg.getMessageId());
if (callback) {
callback(ResultUnknownError, msg.getMessageId());
}
return;
}
// find a producer for that partition, index should start from 0
Expand All @@ -223,7 +227,19 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
producersLock.unlock();

// send message on that partition
producer->sendAsync(msg, callback);
if (!conf_.getLazyStartPartitionedProducers() || producer->ready()) {
producer->sendAsync(msg, std::move(callback));
} else {
// Wrapping the callback into a lambda has overhead, so we check if the producer is ready first
producer->getProducerCreatedFuture().addListener(
[msg, callback](Result result, ProducerImplBaseWeakPtr weakProducer) {
if (result == ResultOk) {
weakProducer.lock()->sendAsync(msg, std::move(callback));
} else if (callback) {
callback(result, {});
}
});
}
}

// override
Expand Down
8 changes: 2 additions & 6 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,16 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
userProvidedProducerName_(false),
producerStr_("[" + topic() + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
batchTimer_(executor_->createDeadlineTimer()),
lastSequenceIdPublished_(conf.getInitialSequenceId()),
msgSequenceGenerator_(lastSequenceIdPublished_ + 1),
sendTimer_(executor_->createDeadlineTimer()),
dataKeyRefreshTask_(*executor_, 4 * 60 * 60 * 1000),
memoryLimitController_(client->getMemoryLimitController()),
chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()),
interceptors_(interceptors) {
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic()
<< " id: " << producerId_);

int64_t initialSequenceId = conf.getInitialSequenceId();
lastSequenceIdPublished_ = initialSequenceId;
msgSequenceGenerator_ = initialSequenceId + 1;

if (!producerName_.empty()) {
userProvidedProducerName_ = true;
}
Expand Down
7 changes: 5 additions & 2 deletions lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef LIB_PRODUCERIMPL_H_
#define LIB_PRODUCERIMPL_H_

#include <atomic>
#include <boost/optional.hpp>
#include <list>
#include <memory>
Expand Down Expand Up @@ -103,6 +104,8 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {

ProducerImplWeakPtr weak_from_this() noexcept { return shared_from_this(); }

bool ready() const { return producerCreatedPromise_.isComplete(); }

protected:
ProducerStatsBasePtr producerStatsBasePtr_;

Expand Down Expand Up @@ -169,13 +172,13 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {
bool userProvidedProducerName_;
std::string producerStr_;
uint64_t producerId_;
int64_t msgSequenceGenerator_;

std::unique_ptr<BatchMessageContainerBase> batchMessageContainer_;
DeadlineTimerPtr batchTimer_;
PendingFailures batchMessageAndSend(const FlushCallback& flushCallback = nullptr);

volatile int64_t lastSequenceIdPublished_;
std::atomic<int64_t> lastSequenceIdPublished_;
std::atomic<int64_t> msgSequenceGenerator_;
std::string schemaVersion_;

DeadlineTimerPtr sendTimer_;
Expand Down
2 changes: 1 addition & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,5 @@ add_executable(Oauth2Test oauth2/Oauth2Test.cc)
target_compile_options(Oauth2Test PRIVATE -DTEST_CONF_DIR="${TEST_CONF_DIR}")
target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})

add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc)
add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc HttpHelper.cc)
target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
26 changes: 26 additions & 0 deletions tests/chunkdedup/ChunkDedupTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <time.h>

#include "../HttpHelper.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"

Expand Down Expand Up @@ -47,6 +49,30 @@ TEST(ChunkDedupTest, testSendChunks) {
client.close();
}

TEST(ChunkDedupTest, testLazyPartitionedProducer) {
std::string topic = "test-lazy-partitioned-producer-" + std::to_string(time(nullptr));
Client client{"pulsar://localhost:6650"};
ProducerConfiguration conf;
conf.setLazyStartPartitionedProducers(true);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));

constexpr int numPartitions = 3;
int res =
makePutRequest("http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/partitions",
std::to_string(numPartitions));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

for (int i = 0; i < 10; i++) {
const auto key = std::to_string(i % numPartitions);
MessageId msgId;
producer.send(MessageBuilder().setPartitionKey(key).setContent("msg-" + std::to_string(i)).build(),
msgId);
ASSERT_TRUE(msgId.ledgerId() >= 0 && msgId.entryId() >= 0) << "i: " << i << ", msgId: " << msgId;
}
client.close();
}

int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
Expand Down