Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-1257 don't leak kafka messages on failed send to broker #811

Closed
wants to merge 1 commit into from

Conversation

szaszm
Copy link
Member

@szaszm szaszm commented Jun 12, 2020

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit?

For code changes:

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file?
  • If applicable, have you updated the NOTICE file?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

if (err) {
// the message only takes ownership of the headers in case of success
RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
// in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback
// in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with this processor, could we have a scenario where the messageDeliveryCallback is not eventually called?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand the API documentation, in case this function returns no error, the callback should be called.

Copy link
Contributor

@arpadboda arpadboda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, tests show no leak even in very high throughput case.

@arpadboda arpadboda closed this in bd707e4 Jun 15, 2020
@szaszm szaszm deleted the MINIFICPP-1257 branch June 15, 2020 19:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants