-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[pulsar-storm] Fix NPE while emitting next tuple #3991
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some small suggestions - else LGTM
|
||
while ((msg = failedMessages.peek()) != null) { | ||
MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId()); | ||
if (messageRetries != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this if statement to a separate function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't much worry about it because this piece of code is not being reused and nextTuple
anyway is calling emitNextAvailableTuple
and I don't think we want to do another method call for this small logic.
|
||
// messageRetries is null because messageRetries is already acked and removed from pendingMessageRetries | ||
// then remove it from failed message queue as well. | ||
failedMessages.remove(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a debug log here, in case people complain that their messages are not delivered or redelivered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes..let me add it.
run integration tests |
Motivation
[PulsarSpout] removes messages from pendingMessageRetries but it doesn't remove from the
failedMessages
queue because of that PulsarSpout throws NPE while emitting next tuple