Negative acknowledgement doesn't remove the message id from UnAckedMessageTracker when message id is instance of BatchMessageIdImpl #6869

Closed
lhotari opened this issue May 4, 2020 · 11 comments
Labels
area/client lifecycle/stale type/bug The PR fixed a bug or issue reported a bug


@lhotari
Member

lhotari commented May 4, 2020

Describe the bug

The actual symptom was that when using the DLQ feature, the redelivery counts were not consistent in a use case where negative acknowledgements are used. Messages would get redelivered more times than the configured maxRedeliverCount on the DeadLetterPolicy.

I observed log messages like this in the log output:

14:20:07.080 [pulsar-timer-4-1] WARN  o.a.p.c.impl.UnAckedMessageTracker - [ConsumerBase{subscription='Test-Subscriber', consumerName='f194e', topic='test-topic'}] 5 messages have timed-out

By debugging, I noticed that calling org.apache.pulsar.client.api.Consumer#negativeAcknowledge(org.apache.pulsar.client.api.MessageId) doesn't remove the message id from UnAckedMessageTracker when the message id is an instance of BatchMessageIdImpl.

Expected behavior

org.apache.pulsar.client.impl.UnAckedMessageTracker implementation should encapsulate the fact that the message id must be MessageIdImpl and not BatchMessageIdImpl.

Currently the logic to first convert a BatchMessageIdImpl is done on the calling side. Two examples:

    if (messageId instanceof BatchMessageIdImpl) {
        BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
        stats.incrementNumAcksSent(batchMessageId.getBatchSize());
        unAckedMessageTracker.remove(new MessageIdImpl(batchMessageId.getLedgerId(),
                batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
        if (possibleSendToDeadLetterTopicMessages != null) {
            possibleSendToDeadLetterTopicMessages.remove(new MessageIdImpl(batchMessageId.getLedgerId(),
                    batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
        }
    } else {

and

    if (id instanceof BatchMessageIdImpl) {
        // do not add each item in batch message into tracker
        id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
    }
    if (hasParentConsumer) {
        // we should no longer track this message, TopicsConsumer will take care from now onwards
        unAckedMessageTracker.remove(id);
    } else {
        unAckedMessageTracker.add(id);
    }

Since the caller has to convert the message id before calling the UnAckedMessageTracker add or remove methods, the UnAckedMessageTracker class is error-prone to use. Currently the conversion to MessageIdImpl is missing in the negative acknowledgement method:

    // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
    unAckedMessageTracker.remove(messageId);
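
A minimal sketch of what encapsulating the conversion inside the tracker could look like. This is not the Pulsar implementation: SketchMessageId, SketchBatchMessageId and SketchUnAckedTracker are hypothetical stand-ins for MessageIdImpl, BatchMessageIdImpl and UnAckedMessageTracker, and their equality rules are simplified so the example is self-contained. The point is only that add and remove normalize the id themselves, so no caller can forget the conversion.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for MessageIdImpl: identifies one entry in a ledger.
class SketchMessageId {
    final long ledgerId;
    final long entryId;
    final int partitionIndex;

    SketchMessageId(long ledgerId, long entryId, int partitionIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partitionIndex = partitionIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        // Simplification: a batch id never equals a plain id.
        if (o == null || getClass() != o.getClass()) return false;
        SketchMessageId other = (SketchMessageId) o;
        return ledgerId == other.ledgerId
                && entryId == other.entryId
                && partitionIndex == other.partitionIndex;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId, partitionIndex);
    }
}

// Hypothetical stand-in for BatchMessageIdImpl: an entry plus an index within the batch.
class SketchBatchMessageId extends SketchMessageId {
    final int batchIndex;

    SketchBatchMessageId(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
        super(ledgerId, entryId, partitionIndex);
        this.batchIndex = batchIndex;
    }

    @Override
    public boolean equals(Object o) {
        return super.equals(o) && batchIndex == ((SketchBatchMessageId) o).batchIndex;
    }

    @Override
    public int hashCode() {
        return 31 * super.hashCode() + batchIndex;
    }
}

// Hypothetical tracker that hides the batch-id conversion from its callers.
class SketchUnAckedTracker {
    private final Set<SketchMessageId> tracked = new HashSet<>();

    // Collapse a batch id to the id of the entry that contains it.
    private static SketchMessageId normalize(SketchMessageId id) {
        if (id instanceof SketchBatchMessageId) {
            return new SketchMessageId(id.ledgerId, id.entryId, id.partitionIndex);
        }
        return id;
    }

    boolean add(SketchMessageId id) {
        return tracked.add(normalize(id));
    }

    boolean remove(SketchMessageId id) {
        return tracked.remove(normalize(id));
    }

    int size() {
        return tracked.size();
    }
}
```

With this shape, removing by any batch index of an entry clears the tracked entry, which is the behavior the negative acknowledgement path needs.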

Work around

One workaround is to convert a possible BatchMessageIdImpl to MessageIdImpl before calling the negativeAcknowledge method. Something like this:

   ...
   consumer.negativeAcknowledge(convertMessageIdForNack(message.getMessageId()));
   ...

    // workaround Pulsar bug regarding negative acknowledgements
    private static MessageId convertMessageIdForNack(MessageId messageId) {
        if (messageId instanceof BatchMessageIdImpl) {
            // use similar logic as there is in org.apache.pulsar.client.impl.NegativeAcksTracker#add
            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
            return new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
                    batchMessageId.getPartitionIndex());
        } else {
            return messageId;
        }
    }
@lhotari lhotari added the type/bug The PR fixed a bug or issue reported a bug label May 4, 2020
@lhotari
Member Author

lhotari commented May 4, 2020

I wonder if issue #5969 is related.

@codelipenghui
Contributor

@lhotari Yes, the problem is related to batch index acknowledgment. We will advance merge #6052. I think this issue can be fixed by enabling batch index acknowledgment.

@lhotari
Member Author

lhotari commented Jun 2, 2020

> @lhotari Yes, the problem is related to batch index acknowledgment. We will advance merge #6052. I think this issue can be fixed by enabling batch index acknowledgment.

@codelipenghui It seems that #6052 was merged to master, but there doesn't seem to be any new logic to properly handle negative acknowledgements when the message id is an instance of BatchMessageIdImpl. Perhaps I missed it when I looked for the fix.

@codelipenghui
Contributor

@lhotari Thanks for your feedback. It looks like #6052 only prevents already-acked messages in a batch message from being delivered to the consumer again. We still need to find a way to handle negative acknowledgment. We need to remove the message ID from the unack message tracker and send the redelivery request to the broker once all batch indexes of the batch message are processed (ack or negative ack).
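
To make the "until all batch indexes are processed" idea concrete, here is a rough sketch of the bookkeeping it implies. It is only an illustration under assumptions: BatchProgressSketch and markProcessed are hypothetical names, the string key is a shortcut for a proper entry id, and this is not a description of how any Pulsar release handles it. Each batch entry gets a BitSet of outstanding indexes, and the caller learns when the last index has been acked or negatively acked.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks which indexes of each batch entry are still unprocessed.
class BatchProgressSketch {
    // Key "ledgerId:entryId:partitionIndex" -> bits set for indexes not yet processed.
    private final Map<String, BitSet> pending = new HashMap<>();

    /**
     * Marks one batch index as processed (acked or negatively acked).
     * Returns true only when every index of that batch entry has been processed,
     * which is the point where the entry could be dropped from the unacked tracker.
     */
    boolean markProcessed(long ledgerId, long entryId, int partitionIndex,
                          int batchIndex, int batchSize) {
        String key = ledgerId + ":" + entryId + ":" + partitionIndex;
        BitSet outstanding = pending.computeIfAbsent(key, k -> {
            BitSet bits = new BitSet(batchSize);
            bits.set(0, batchSize); // all indexes start as outstanding
            return bits;
        });
        outstanding.clear(batchIndex);
        if (outstanding.isEmpty()) {
            pending.remove(key); // whole batch handled, stop tracking it
            return true;
        }
        return false;
    }

    int trackedBatches() {
        return pending.size();
    }
}
```

Marking the same index twice is harmless here, since clearing an already-cleared bit changes nothing.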

@lhotari
Member Author

lhotari commented Jun 8, 2020

Hi @codelipenghui, I saw on Twitter that the 2.6.0 release will be cut soon. Would it be possible to include a fix in 2.6.0 for this nack & BatchMessageIdImpl issue?

@wolfstudy
Member

Move the task to 2.6.2

@codelipenghui
Contributor

@lhotari Sorry, I missed your last comment. We will try to fix it in 2.6.2.

@wolfstudy
Member

Move this issue to 2.7.0.

@sijie sijie removed this from the 2.7.0 milestone Nov 12, 2020
@devinbost
Contributor

Any updates on this?

@Technoboy-
Contributor

Hi @lhotari, I think this issue has been fixed by #9440.

@lhotari
Member Author

lhotari commented May 5, 2023

fixed by #9440

@lhotari lhotari closed this as completed May 5, 2023