[pulsar-broker] Optimize message replay for large backlog consumer #3732
Conversation
}

@Override
public <T> Set<T> items(int numberOfItems, BiFunction<Long, Long, T> longPairConverter) {
If we use some kind of a specialized interface, we can avoid the two long -> Long conversions:
interface LongPairFunction<T> {
T apply(long a, long b);
}
Yes, I wanted to create a custom interface to reduce the number of object creations, but then I missed it. Good catch, I will fix it.
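The boxing concern above can be illustrated with a minimal standalone sketch. `LongPairDemo`, `itemsBoxed`, and `itemsPrimitive` are hypothetical names for illustration, not Pulsar code; only the `LongPairFunction` shape comes from the review comment:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;

public class LongPairDemo {

    // Specialized interface as suggested in the review: it takes two
    // primitive longs, so no Long boxing happens per element.
    public interface LongPairFunction<T> {
        T apply(long a, long b);
    }

    // With BiFunction<Long, Long, T>, each call autoboxes both longs.
    public static <T> Set<T> itemsBoxed(long[][] pairs, BiFunction<Long, Long, T> f) {
        Set<T> out = new HashSet<>();
        for (long[] p : pairs) {
            out.add(f.apply(p[0], p[1])); // p[0] and p[1] are boxed to Long here
        }
        return out;
    }

    // With the specialized interface, the longs stay primitive end to end.
    public static <T> Set<T> itemsPrimitive(long[][] pairs, LongPairFunction<T> f) {
        Set<T> out = new HashSet<>();
        for (long[] p : pairs) {
            out.add(f.apply(p[0], p[1])); // no boxing
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] pairs = {{1L, 2L}, {3L, 4L}};
        Set<String> a = itemsBoxed(pairs, (x, y) -> x + ":" + y);
        Set<String> b = itemsPrimitive(pairs, (x, y) -> x + ":" + y);
        System.out.println(a.equals(b)); // same results, fewer allocations
    }
}
```

Both methods produce the same set; the difference is only the two `Long` allocations per element that the `BiFunction` variant incurs.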
@Override
public <T> Set<T> items(int numberOfItems, BiFunction<Long, Long, T> longPairConverter) {
    Set<T> items = new HashSet<>();
Is it necessary to return a set? Typically the iteration methods are used to go through the items inline, without an intermediary collection.
Yes, this method is used by PersistentDispatcherMultipleConsumers to prepare the sub-set of replay messages, and it then passes this set to cursor.asyncReplayEntries to read the entries. So, this method is not used for inline iteration.
@@ -288,8 +287,8 @@ public void readMoreEntries() {
             return;
         }

-        Set<PositionImpl> messagesToReplayNow = messagesToReplay.items(messagesToRead).stream()
-                .map(pair -> new PositionImpl(pair.first, pair.second)).collect(toSet());
+        Set<PositionImpl> messagesToReplayNow = messagesToReplay.items(messagesToRead,
Instead of having messagesToReplay be sorted, wouldn't it be easier to just make messagesToReplayNow a SortedSet? In that case, we don't care about keeping messagesToReplay always sorted; we only care when there is a redelivery.
> wouldn't it be easier to just make messagesToReplayNow a SortedSet?

No, because messagesToReplayNow is a very small sub-set (100 msgs) of messagesToReplay (> 1M msgs), and the main issue is that messagesToReplay can contain more than a million random messages while we want to read messages from the same ledger. Making only messagesToReplayNow a sorted set will not help, because the 100 messages read from messagesToReplay might still come from different ledgers, and the broker would still perform random reads across multiple ledgers.
Oh that's true.
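The point about sorting the whole backlog rather than just the batch can be sketched as follows. `Position` and `ReplaySortDemo` here are hypothetical stand-ins for illustration (the real class is Pulsar's `PositionImpl`); the sketch just shows that taking the first N entries from a fully sorted set yields positions clustered by ledger:

```java
import java.util.Comparator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class ReplaySortDemo {

    // Hypothetical stand-in for a (ledgerId, entryId) message position.
    public record Position(long ledgerId, long entryId) {}

    // Backlog ordered by ledger first, then entry, like a sorted pair set.
    public static NavigableSet<Position> newBacklog() {
        return new TreeSet<>(Comparator.comparingLong(Position::ledgerId)
                .thenComparingLong(Position::entryId));
    }

    // Take the first n positions from the fully sorted backlog: entries
    // from the same ledger come out together, so the subsequent bookie
    // reads are sequential rather than random across ledgers.
    public static List<Position> firstN(NavigableSet<Position> backlog, int n) {
        return backlog.stream().limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        NavigableSet<Position> backlog = newBacklog();
        // Insertion order is random across ledgers...
        backlog.add(new Position(7, 1));
        backlog.add(new Position(3, 9));
        backlog.add(new Position(3, 2));
        backlog.add(new Position(7, 0));
        // ...but the first batch still clusters within ledger 3.
        System.out.println(firstN(backlog, 2));
    }
}
```

If only the small batch were sorted (as the SortedSet suggestion would do), the 100 picked positions could already span many ledgers before sorting, which is exactly the random-read pattern the PR avoids.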
rerun java8 tests
👍
add tests fix items arg with logPairFunction interface
rerun integration tests
Motivation

It addresses #3731.

Modification

- Added ConcurrentSortedLongPairSet, which helps the broker avoid random reads across multiple managed ledgers. It still uses ConcurrentLongPairSet internally to avoid object allocation for message-ids.
- Added a <T> Set<T> items(int numberOfItems, BiFunction<Long, Long, T> longPairConverter) method in LongPairSet to avoid creating temporary LongPair objects.

Note

I have done performance testing on GrowablePriorityLongPairQueue, which I had introduced some time back, but its insert/remove is super slow and CPU-intensive, so we can't use it in this scenario.
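The data-structure idea behind ConcurrentSortedLongPairSet can be sketched roughly as below. This is a minimal, non-concurrent illustration under assumptions (the real class is concurrent and avoids the boxing that BiFunction incurs, as discussed in the review); it only shows the bucketing-by-ledger shape, not the actual Pulsar implementation:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiFunction;

// Sketch: one sorted bucket of entry-ids per ledger-id, so items(n)
// drains the lowest ledger first and replay reads stay clustered per ledger.
public class SortedLongPairSetSketch {

    private final NavigableMap<Long, NavigableSet<Long>> buckets = new TreeMap<>();

    public void add(long ledgerId, long entryId) {
        buckets.computeIfAbsent(ledgerId, k -> new TreeSet<>()).add(entryId);
    }

    // Convert up to numberOfItems smallest pairs directly into the caller's
    // type (e.g. a position object), without intermediate pair objects.
    public <T> Set<T> items(int numberOfItems, BiFunction<Long, Long, T> converter) {
        Set<T> out = new HashSet<>();
        outer:
        for (Map.Entry<Long, NavigableSet<Long>> e : buckets.entrySet()) {
            for (long entryId : e.getValue()) {
                if (out.size() >= numberOfItems) {
                    break outer;
                }
                out.add(converter.apply(e.getKey(), entryId));
            }
        }
        return out;
    }
}
```

A dispatcher-style caller would then do something like `set.items(100, (ledger, entry) -> new PositionImpl(ledger, entry))` and always get the 100 positions from the lowest ledgers, keeping each replay batch within as few ledgers as possible.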