Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-1257 don't leak kafka messages on failed send to broker #811

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 10 additions & 6 deletions extensions/librdkafka/PublishKafka.h
Expand Up @@ -217,10 +217,10 @@ class PublishKafka : public core::Processor {
}

rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
const auto messages_copy = this->messages_;
const std::shared_ptr<Messages> messages_ptr_copy = this->messages_;
const auto flow_file_index_copy = this->flow_file_index_;
const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
const auto produce_callback = [messages_ptr_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
messages_ptr_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
auto &message = flow_file.messages.at(segment_num);
message.err_code = rkmessage->err;
message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
Expand All @@ -233,9 +233,13 @@ class PublishKafka : public core::Processor {

const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get());
const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
if (err) {
// the message only takes ownership of the headers in case of success
RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback
// in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with this processor, could we have a scenario where the messageDeliveryCallback is not eventually called?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand the API documentation, in case this function returns no error, the callback should be called.

(void)callback_ptr.release();
} else {
// in case of failure, rd_kafka_producev doesn't take ownership of the headers, so we need to delete them
rd_kafka_headers_destroy(hdrs_copy);
}
return err;
Expand Down