Recover cursor with correct readPosition and replay unackedMessages #446
 * @return
 */
@Override
public Set<? extends Position> getNotDeletedMessages() {
I don't think we need to go down this route.
When reading, the managed cursor is already skipping all the individually deleted messages. We should just set the read position to be always on (mark-delete + 1) when recovering.
I think the issue is that skipping over the acked messages takes a lot of time, since the message stream over the individually deleted messages covers millions of messages.
> When reading, the managed cursor is already skipping all the individually deleted messages.
Yes, that's correct: the cursor reads messages in chunks of 100, filters out the already-deleted messages, and keeps going.
However, we recently had an issue on one subscription where a large number of messages sat between markDeletePosition and the last acked message in individuallyDeletedMessages, and only a few of them were unacked. So the cursor kept reading from markDeletePosition in chunks of 100 and filtering out most of the messages. The readPosition kept advancing, but the consumer did not receive any messages for a long time.
Example: a snapshot of the cursor stats, where individuallyDeletedMessages was large:
{
"markDeletePosition" : "402212931:36356",
"readPosition" : "403263473:272",
"waitingReadOp" : false,
"pendingReadOps" : 1,
"messagesConsumedCounter" : -385138,
"cursorLedger" : -1,
"cursorLedgerLastEntry" : -1,
"individuallyDeletedMessages" : "[(402212931:36357‥402258887:15270], (402258887:15271‥402312619:6246], (402312619:6247‥402343507:21490], .......
.......
.......
.......
(405361786:172156‥405361786:172253], (405361786:172265‥405361786:172272]]",
OK. In this case then, we should skip the read position to the next available message.
E.g.:
- Mark-delete: 1:0
- Individually deleted messages: [(1:1..1:10]]
In this case 1:1 was not acked.
If I try to read 5 messages, I should get a list with 1:1, and the readPosition should be moved to 1:11.
This could be implemented such that, if I read and some of the messages were filtered, we check for the next unacked message to move the read position for next time.
My point here is that this issue should be handled at the ManagedLedger layer. By returning the unacked messages to the application we're kind of leaking back the logic to the broker.
Also, it might not only happen for the shared subscription.
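To make the example above concrete, here is a minimal sketch of the skip logic. It is not the ManagedLedger code: positions are simplified to plain entry ids within a single ledger, and the class and method names (`ReadPositionSketch`, `addDeletedRange`, `nextAvailablePosition`) are hypothetical. Ranges follow the `(lower..upper]` notation from the cursor stats and are assumed to be disjoint.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Simplified sketch: positions are plain entry ids within one ledger.
 * Real cursor positions are (ledgerId:entryId) pairs; that is not modeled here.
 */
final class ReadPositionSketch {
    // Individually deleted ranges stored as lower (exclusive) -> upper (inclusive),
    // mirroring the (a..b] notation. Ranges are assumed to be disjoint.
    private final TreeMap<Long, Long> deletedRanges = new TreeMap<>();

    void addDeletedRange(long lowerExclusive, long upperInclusive) {
        deletedRanges.put(lowerExclusive, upperInclusive);
    }

    /** Returns the first position >= candidate that is not individually deleted. */
    long nextAvailablePosition(long candidate) {
        long position = candidate;
        // Loop so that adjacent ranges such as (1..10] and (10..20] are skipped in one call.
        while (true) {
            Map.Entry<Long, Long> range = deletedRanges.lowerEntry(position);
            if (range == null || position > range.getValue()) {
                return position;
            }
            position = range.getValue() + 1;
        }
    }

    public static void main(String[] args) {
        ReadPositionSketch cursor = new ReadPositionSketch();
        cursor.addDeletedRange(1, 10); // (1..10]: entries 2 through 10 are acked
        long markDelete = 0;
        long first = cursor.nextAvailablePosition(markDelete + 1);
        long next = cursor.nextAvailablePosition(first + 1);
        // prints "first readable entry = 1, next read position = 11"
        System.out.println("first readable entry = " + first + ", next read position = " + next);
    }
}
```

With mark-delete at 0 and `(1..10]` deleted, the first read returns entry 1 and the read position lands on 11, which matches the 1:1 / 1:11 example.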
> My point here is that this issue should be handled at the ManagedLedger layer. By returning the unacked messages to the application we're kind of leaking back the logic to the broker.
Yes, that's correct; it should be handled in the managed-ledger layer only. Fixed it.
@@ -94,6 +94,8 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
        this.readBatchSize = MaxReadBatchSize;
        this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
                .getMaxUnackedMessagesPerSubscription();
        this.cursor.getNotDeletedMessages().forEach(position -> this.messagesToReplay
Can we change it to getNotDeletedMessages(this.messagesToReplay) to reduce garbage from PositionImpl objects?
I.e., directly populate messagesToReplay instead of creating a temporary Set that will eventually be GC'ed.
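A rough sketch of the "populate the caller's collection" shape suggested here, under simplifying assumptions: entry ids are plain longs in one ledger, the deleted ranges arrive sorted and disjoint as `{lowerExclusive, upperInclusive}` pairs, and `collectNotDeleted` is a hypothetical name, not the signature used in the PR.

```java
import java.util.Collection;
import java.util.TreeSet;

final class NotDeletedSketch {
    /**
     * Adds every unacked entry id between markDelete and the end of the last
     * deleted range directly into the caller-supplied target, so no temporary
     * Set is allocated. Ranges must be sorted and disjoint (assumption).
     */
    static void collectNotDeleted(long markDelete, long[][] deletedRanges, Collection<Long> target) {
        long cursor = markDelete + 1;
        for (long[] range : deletedRanges) {
            // range[0] is exclusive, so the entry at range[0] itself is still unacked.
            for (long id = cursor; id <= range[0]; id++) {
                target.add(id);
            }
            cursor = range[1] + 1;
        }
    }

    public static void main(String[] args) {
        // Stand-in for the dispatcher's messagesToReplay collection.
        TreeSet<Long> messagesToReplay = new TreeSet<>();
        collectNotDeleted(0, new long[][] {{1, 10}, {12, 20}}, messagesToReplay);
        System.out.println(messagesToReplay); // prints [1, 11, 12]
    }
}
```

The caller owns the collection, so the method only adds elements and never allocates an intermediate result.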
Change LGTM. Just a couple of minor details.
 * @param position
 * @return next availablePosition
 */
private PositionImpl getNextAvailablePosition(PositionImpl position) {
Can this method be combined with the cursor.getNextAvailablePosition()?
if (log.isDebugEnabled()) {
    log.debug("[{}][{}] Read entries succeeded batch_size={} cumulative_size={} requested_count={}",
            cursor.ledger.getName(), cursor.getName(), returnedEntries.size(), entries.size(), count);
}
List<Entry> filteredEntries = cursor.filterReadEntries(returnedEntries);
entries.addAll(filteredEntries);

final Position nexReadPosition = getNextAvailablePosition(lastPosition);
We should try to avoid checking the range set if there are no filtered entries. There are some objects allocated in there when checking the ranges (which we should address at some point, though outside the scope of this change).
👍
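The guard suggested in the comment above could look roughly like this. It is only a sketch with hypothetical names (`SkipRangeCheckSketch`, `nextReadPosition`, `rangeLookup`) and plain long positions; the range-set lookup is passed in as a function so it is visibly skipped when no entry of the batch was filtered out.

```java
import java.util.function.LongUnaryOperator;

final class SkipRangeCheckSketch {
    /**
     * Picks the next read position after a batch.
     * returnedCount: entries read from storage; filteredCount: entries left after
     * dropping individually deleted ones; rangeLookup: the (allocating) range-set check.
     */
    static long nextReadPosition(int returnedCount, int filteredCount,
                                 long lastPosition, LongUnaryOperator rangeLookup) {
        long candidate = lastPosition + 1;
        if (filteredCount == returnedCount) {
            // Nothing was filtered, so skip the range-set lookup entirely.
            // If a deleted range starts right at candidate, the next read filters it.
            return candidate;
        }
        return rangeLookup.applyAsLong(candidate);
    }

    public static void main(String[] args) {
        // Toy lookup standing in for the range-set check: pretends (41..50] is deleted.
        LongUnaryOperator lookup = p -> (p > 41 && p <= 50) ? 51 : p;
        System.out.println(nextReadPosition(5, 5, 41, lookup)); // lookup not consulted
        System.out.println(nextReadPosition(5, 2, 41, lookup)); // lookup consulted
    }
}
```

The trade-off is that an unfiltered batch ending just before a deleted range is not skipped immediately; the following read picks it up instead.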
Motivation
Right now, the broker recovers the cursor's stored individualDeletedMessages when the cursor is initialized. However, it also sets the cursor's readPosition to markDeletePosition, which can block message dispatching for some time if the distance between readPosition and the last acked message in individualDeletedMessages is very large: the dispatcher reads messages starting from readPosition and filters the deleted messages out of each batch, which can take a significant amount of time. Also, the unacked messages are no longer part of the consumer's unackMessage list, so redelivery will not deliver them either.
Therefore, on cursor recovery the broker should
Modifications
Result
This will fix the message delivery delay that occurs when a bundle is loaded at the broker. I think it should also fix #380.