[fix][broker] Can't consume messages for a long time due to Entry Filter #17782

Open
poorbarcode wants to merge 10 commits into apache:master from poorbarcode:flaky/FilterEntryTest.java

poorbarcode (Contributor) commented Sep 21, 2022

Fixes:

Motivation

When a subscription has two consumers, users can control which messages each consumer receives by using an Entry Filter:

  • case: consumer_1 can consume 60% of the messages, consumer_2 can consume 60% of the messages, and there is a 10% intersection between consumer_1 and consumer_2

If the filter returns FilterResult.RESCHEDULE for more than 10% of the messages, the following can happen: a message that only consumer_1 can consume keeps being redelivered to consumer_2, and a message that only consumer_2 can consume keeps being redelivered to consumer_1. The result:

  • These messages cannot be consumed for a long time
  • The redelivery count of these messages keeps increasing (redelivery by Entry Filter); see the code below (line: 141):

} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
    entriesToRedeliver.add((PositionImpl) entry.getPosition());
    entries.set(i, null);
    // FilterResult is always `ACCEPTED` when there is no filter,
    // so there is no need to check whether `hasFilter` is true.
    this.filterRescheduledMsgs.add(entryMsgCnt);
    entry.release();
    continue;
}

You can reproduce the problem by running FilterEntryTest.testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscription 20 times.

Modifications

When the same consumer causes a message to be redelivered more than 3 times, that consumer is paused from receiving this message for 1 second.
Because tracking the consumption of every message would cost too much memory, only the earliest message is tracked.
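
The rule above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: the class name, the method names, and the use of plain consumer names and ledger/entry ids are all hypothetical, and only the thresholds (more than 3 redeliveries, a 1 second pause) come from the description.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: names and structure are illustrative, not the PR's actual classes.
final class EarliestEntryPauseTracker {
    static final int MAX_REDELIVERIES = 3;   // "more than 3 times" from the description
    static final long PAUSE_MILLIS = 1000L;  // "1 second" from the description

    private long trackedLedgerId = -1L;      // -1 means nothing is tracked yet
    private long trackedEntryId = -1L;
    private final Map<String, Integer> rescheduleCounts = new HashMap<>();
    private final Map<String, Long> pausedAt = new HashMap<>();

    /** Records that the filter rescheduled the given entry for the given consumer. */
    void onReschedule(String consumer, long ledgerId, long entryId, long nowMillis) {
        if (trackedLedgerId < 0 || isEarlier(ledgerId, entryId)) {
            // A new earliest entry: forget everything about the previous one.
            trackedLedgerId = ledgerId;
            trackedEntryId = entryId;
            rescheduleCounts.clear();
            pausedAt.clear();
        }
        if (ledgerId != trackedLedgerId || entryId != trackedEntryId) {
            return; // only the earliest entry is tracked, to bound memory use
        }
        int count = rescheduleCounts.merge(consumer, 1, Integer::sum);
        if (count > MAX_REDELIVERIES) {
            pausedAt.put(consumer, nowMillis);
        }
    }

    /** True while the consumer's pause for the tracked entry has not yet expired. */
    boolean isPaused(String consumer, long nowMillis) {
        Long since = pausedAt.get(consumer);
        return since != null && nowMillis - since < PAUSE_MILLIS;
    }

    private boolean isEarlier(long ledgerId, long entryId) {
        return ledgerId < trackedLedgerId
                || (ledgerId == trackedLedgerId && entryId < trackedEntryId);
    }
}
```

The sketch leaves out how the tracked entry is released once it is finally consumed; it only shows why tracking a single entry keeps memory use constant per subscription.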

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

Matching PR in forked repository

PR in forked repository:

@poorbarcode poorbarcode changed the title [ci-run][fix][broker]Can't consume messages for a long time due to Entry Filter [fix][broker]Can't consume messages for a long time due to Entry Filter Sep 21, 2022
@github-actions

@poorbarcode Please provide a correct documentation label for your PR.
For instructions, see the Pulsar Documentation Label Guide.

@poorbarcode
Contributor Author

This PR should merge into the following branches:

  • master
  • branch-2.11

if (!redeliveryTracker.hasRedeliveredEntry(entries)) {
    return super.getNextConsumer();
}
if (redeliveryTracker instanceof InMemoryAndPreventCycleFilterRedeliveryTracker tracker) {

Contributor

we should not use "instanceof"
we should leverage polymorphism
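
One way to read this suggestion, as a minimal sketch: the base type declares the question the dispatcher needs answered, and each tracker answers it for itself. All names below are hypothetical and are not Pulsar's RedeliveryTracker API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical interface for illustration; not Pulsar's actual RedeliveryTracker API.
interface TrackerSketch {
    // Default: trackers without cycle prevention never pause any consumer.
    default boolean isPaused(String consumerName) {
        return false;
    }
}

// A tracker that does not need the feature inherits the default behavior.
final class PlainTrackerSketch implements TrackerSketch {
}

// A tracker that supports pausing overrides the default.
final class CyclePreventingTrackerSketch implements TrackerSketch {
    private final Set<String> paused = new HashSet<>();

    void pause(String consumerName) {
        paused.add(consumerName);
    }

    @Override
    public boolean isPaused(String consumerName) {
        return paused.contains(consumerName);
    }
}

final class DispatcherSketch {
    // The caller needs no instanceof check: each tracker answers for itself.
    static boolean canDispatch(TrackerSketch tracker, String consumerName) {
        return !tracker.isPaused(consumerName);
    }
}
```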

Contributor

this comment seems unresolved

Contributor Author

Already fixed

}
if (redeliveryTracker instanceof InMemoryAndPreventCycleFilterRedeliveryTracker tracker) {
    if (tracker.pausedConsumerCount() == consumerSet.size()) {
        log.warn("No consumers are currently able to consume the first redelivery entry {}",

Contributor

I think this will be logged many times, and it is not actually a problem.

I am testing under heavy load so that I can say more.

Contributor Author

already fixed.

    return super.getNextConsumer();
} else {
    Consumer nextConsumer = null;
    while (true) {

Contributor

this looks like an endless loop in some cases.
we cannot keep this thread busy forever
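
A bounded alternative might look like the sketch below, which tries each consumer at most once and gives up when all of them are paused. The helper and its names are hypothetical, and the dispatcher's real selection logic is more involved than plain round-robin.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper for illustration; the dispatcher's real selection logic differs.
final class BoundedConsumerPicker {
    private int nextIndex = 0;

    /** Returns the next non-paused consumer, or null once every consumer has been tried. */
    String pick(List<String> consumers, Predicate<String> isPaused) {
        // At most one full pass over the list, so the loop always terminates.
        for (int attempts = 0; attempts < consumers.size(); attempts++) {
            String candidate = consumers.get(nextIndex);
            nextIndex = (nextIndex + 1) % consumers.size();
            if (!isPaused.test(candidate)) {
                return candidate;
            }
        }
        return null; // all paused: the caller can retry later instead of spinning
    }
}
```

Returning null hands the decision back to the caller, which can reschedule the read after the pause expires rather than holding the thread.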

Contributor Author

already fixed.

    pausedConsumers.clear();
}

private static int comparePosition(Position pos1, Position pos2) {

Contributor

this method should go somewhere in Position or in some utility class,
e.g. as comparePositionByEntryId

please check whether the code already has something like that
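
For illustration only, a standalone comparison utility could be shaped like this. It assumes a position is identified by a ledger id and an entry id; `PositionSketch` is a hypothetical stand-in and does not use Pulsar's own Position type.

```java
import java.util.Comparator;

// Hypothetical stand-in for a position; Pulsar's own Position type is not used here.
final class PositionSketch {
    final long ledgerId;
    final long entryId;

    PositionSketch(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

final class PositionComparators {
    // Order by ledger first, then by entry within the same ledger.
    static final Comparator<PositionSketch> BY_LEDGER_THEN_ENTRY =
            Comparator.<PositionSketch>comparingLong(p -> p.ledgerId)
                    .thenComparingLong(p -> p.entryId);

    static int compare(PositionSketch a, PositionSketch b) {
        return BY_LEDGER_THEN_ENTRY.compare(a, b);
    }
}
```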

Contributor Author

Already fixed. I didn't find a suitable utility method, so I added a new one.

Contributor

it seems you are not using the new utility method you introduced here

Contributor Author

@poorbarcode poorbarcode Oct 4, 2022

Already fixed.

Contributor

this method is not used, we should drop it

Contributor Author

already removed

    return false;
}
// Automatically becomes invalid after 1s, because users may use time to filter the Entry.
return System.currentTimeMillis() - pauseTime < 1000;

Contributor

this should be configurable, maybe next to ServiceConfiguration#dispatcherEntryFilterRescheduledMessageDelay?
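
As a rough sketch of that idea, the hard-coded 1000 could become a constructor argument that the broker fills in from its configuration. The class and parameter names here are assumptions for illustration, and no specific configuration key is implied.

```java
// Hypothetical sketch; the class, field and parameter names are assumptions, not the PR's code.
final class ConsumerPauseSketch {
    private final long pauseDurationMillis; // would come from broker configuration
    private long pauseTime = -1L;           // -1 means the consumer is not paused

    ConsumerPauseSketch(long pauseDurationMillis) {
        this.pauseDurationMillis = pauseDurationMillis;
    }

    void pause(long nowMillis) {
        pauseTime = nowMillis;
    }

    // The pause expires on its own, since a filter may depend on the current time.
    boolean isPaused(long nowMillis) {
        if (pauseTime < 0) {
            return false;
        }
        return nowMillis - pauseTime < pauseDurationMillis;
    }
}
```

Passing the clock value in as `nowMillis` instead of calling `System.currentTimeMillis()` inside the method also makes the expiry easy to test.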

Contributor Author

Already fixed. Could you please help me check whether the doc is suitable?


void clear();

default boolean hasRedeliveredEntry(List<Entry> entries) {

Contributor

maybe the default implementation should be a no-op; this way we don't add load for users who don't need this feature

Contributor Author

I feel that since this method exists, it should provide the correct logic, so it should not be no-op

Contributor

@codelipenghui what's your take on this?

Contributor Author

This method has already been moved into InMemoryAndPreventCycleFilterRedeliveryTracker, see #17782 (comment)

poorbarcode force-pushed the flaky/FilterEntryTest.java branch from 2bb0869 to 8d3986c on October 4, 2022 07:35
Labels

doc-not-needed (Your PR changes do not impact docs), release/4.0.10, Stale
