Fixed race condition while triggering message redelivery after an ack-timeout event #5276

Merged

merlimat
Contributor

Motivation

There is a race condition when processing a request for redeliveries after an ack-timeout event. The issue is in how pendingAcksMap is updated in the Consumer handler.

When we request to re-deliver all messages, we do:

  1. trigger re-delivery
    i. go through pendingAcksMap and read some of the entries
    ii. when an entry is read, add the id to pendingAcksMap and then dispatch
  2. from the original thread:
    i. go through pendingAcksMap to update stats and then clear the map

If reading the entries to be redelivered is very fast (e.g. they are immediately replayed from cache), the clearing might happen after the dispatching thread has already re-added the pending entries.
As a result, at the next iteration the broker won't re-deliver these messages, which will stay in the backlog.
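
The interleaving described above can be replayed deterministically in a small sketch. This is not the Pulsar code: the class `BuggyRedeliverySketch`, its methods, and the `"ledgerId:entryId"` string keys are hypothetical stand-ins, and a plain concurrent set replaces pendingAcksMap. It only assumes that a cache hit makes the replay finish before the original thread reaches its clear step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the consumer's pending-acks bookkeeping; not Pulsar code.
class BuggyRedeliverySketch {
    // Keys are "ledgerId:entryId" strings, standing in for pendingAcksMap.
    final Set<String> pendingAcks = ConcurrentHashMap.newKeySet();

    // Step 1: replay entries. Assumed to complete immediately (cache hit),
    // so every id is re-added to pendingAcks before this method returns.
    void triggerRedelivery(List<String> toReplay) {
        for (String id : toReplay) {
            pendingAcks.add(id); // re-added by the dispatching side
        }
    }

    // The problematic ordering: trigger first, then clear the whole map.
    void redeliverAllBuggy() {
        List<String> snapshot = new ArrayList<>(pendingAcks);
        triggerRedelivery(snapshot);
        pendingAcks.clear(); // step 2: wipes the entries that were just re-added
    }

    public static void main(String[] args) {
        BuggyRedeliverySketch consumer = new BuggyRedeliverySketch();
        consumer.pendingAcks.add("7:0");
        consumer.pendingAcks.add("7:1");
        consumer.redeliverAllBuggy();
        // The messages were dispatched again, but nothing tracks them anymore.
        System.out.println("tracked after redelivery: " + consumer.pendingAcks.size());
    }
}
```

In this sketch the set ends up empty even though both messages were dispatched again, which matches the symptom of messages staying in the backlog at the next ack timeout.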

Modifications

Ensure that pendingAcksMap is updated without a full clear and, in any case, before any dispatching takes place.
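
A minimal sketch of that ordering, under the same assumptions as the description: the class `FixedRedeliverySketch`, its methods, and the string keys are hypothetical, and a concurrent set stands in for pendingAcksMap. It is meant to show the idea (remove only the snapshotted positions, and do so before triggering the replay), not the actual change in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the consumer's pending-acks bookkeeping; not Pulsar code.
class FixedRedeliverySketch {
    final Set<String> pendingAcks = ConcurrentHashMap.newKeySet();

    // Assumed to complete immediately (cache hit), re-adding each id.
    void triggerRedelivery(List<String> toReplay) {
        for (String id : toReplay) {
            pendingAcks.add(id);
        }
    }

    void redeliverAllFixed() {
        // Snapshot the positions that are pending right now.
        List<String> pendingPositions = new ArrayList<>(pendingAcks);
        // Remove exactly those positions; there is no full clear.
        for (String p : pendingPositions) {
            pendingAcks.remove(p);
        }
        // Only now trigger the replay, so re-added entries are never wiped.
        triggerRedelivery(pendingPositions);
    }

    public static void main(String[] args) {
        FixedRedeliverySketch consumer = new FixedRedeliverySketch();
        consumer.pendingAcks.add("7:0");
        consumer.pendingAcks.add("7:1");
        consumer.redeliverAllFixed();
        System.out.println("tracked after redelivery: " + consumer.pendingAcks.size());
    }
}
```

Because the cleanup finishes before the replay starts, the re-dispatched messages remain tracked and are eligible for redelivery again on a later ack timeout.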

@merlimat merlimat added the type/bug The PR fixed a bug or issue reported a bug label Sep 25, 2019
@merlimat merlimat added this to the 2.4.2 milestone Sep 25, 2019
@merlimat merlimat self-assigned this Sep 25, 2019
@massakam
Contributor

rerun java8 tests
rerun integration tests

});

for (PositionImpl p : pendingPositions) {
    pendingAcks.remove(p.getLedgerId(), p.getEntryId());
Contributor

pendingPositions will have the same number of positions as pendingAcks, so why can't we just clear pendingAcks here? In any case, subscription::redeliverUnacknowledgedMessages happens later.

Contributor Author

It's to avoid missing any insertion into pendingAcks that might be happening from a different thread (e.g. if the dispatcher thread was actually sending messages at the same time).
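
The difference between the two cleanup strategies can be illustrated with a small sketch. Everything here is hypothetical (the class `ConcurrentInsertSketch`, the `cleanup` method, the string keys, and the concurrent set standing in for pendingAcks). The second thread is joined right after it starts so that its insertion lands deterministically between the snapshot and the cleanup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration; not Pulsar code.
class ConcurrentInsertSketch {
    // Returns what is still tracked after cleanup; useClear selects the strategy.
    static Set<String> cleanup(boolean useClear) {
        Set<String> pendingAcks = ConcurrentHashMap.newKeySet();
        pendingAcks.add("7:0");
        pendingAcks.add("7:1");

        // Snapshot taken by the thread handling the redelivery request.
        List<String> pendingPositions = new ArrayList<>(pendingAcks);

        // A different thread (standing in for the dispatcher) sends one more message.
        Thread dispatcher = new Thread(() -> pendingAcks.add("7:2"));
        dispatcher.start();
        try {
            dispatcher.join(); // force the insertion to happen before the cleanup
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        if (useClear) {
            pendingAcks.clear(); // also drops "7:2", which was never in the snapshot
        } else {
            for (String p : pendingPositions) {
                pendingAcks.remove(p); // removes only what was snapshotted
            }
        }
        return pendingAcks;
    }

    public static void main(String[] args) {
        System.out.println("clear():          " + cleanup(true));
        System.out.println("targeted removal: " + cleanup(false));
    }
}
```

With the full clear the late insertion is lost, while removing only the snapshotted positions leaves it tracked.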

@rdhabalia
Contributor

rerun java8 tests

@jiazhai
Member

jiazhai commented Oct 7, 2019

rerun java8 tests

@merlimat merlimat merged commit e840375 into apache:master Oct 7, 2019
wolfstudy pushed a commit that referenced this pull request Nov 20, 2019
Labels
type/bug The PR fixed a bug or issue reported a bug
5 participants