-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] The offline consumer is not removed from the consumerList #14970
base: master
Are you sure you want to change the base?
Conversation
#13787 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. I wonder if it would be better to prevent adding duplicates to the list and doing a check in addConsumer? I think that would be a better solution to the problem.
I also have a suggestion to use removeIf
there isn't a way to prevent duplicates getting added to the list.
@Override | ||
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { | ||
// decrement unack-message count for removed consumer | ||
addUnAckedMessages(-consumer.getUnackedMessages()); | ||
if (consumerSet.removeAll(consumer) == 1) { | ||
consumerList.remove(consumer); | ||
removeAllConsumer(consumer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The removeAllConsumer
doesn't look great. I wonder if using .removeIf
would be better:
removeAllConsumer(consumer); | |
consumerList.removeIf(consumer::equals); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with this sentiment. I wonder about the data structure itself, too. Why is it a list and not a set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to @lhotari 's suggestion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree to @lhotari 's suggestion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed:
1.do a check in addConsumer;
2.use removeIf;
PTAL,thanks! @lhotari @eolivelli
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lordcheng10 there's a minor checkstyle issue in the recent changes. please check that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
PTAL,thanks! @lhotari
In fact, the problem here may be the problem of consumer equality, similar to the previous producer problem. When there are duplicate consumers in the consumerList, it is possible to remove the actually successfully registered consumers when removing them. In our scenario, we also found a problem: when the consumer reconnects, the broker no longer pushes messages to these consumers. Through the stats command, we see these The consumer's permit is 0, and it is a consumer that has been removed from the consumerSet and consumers. The wrong consumer could be removed . So the fundamental problem here is actually the definition of consumer equality. I think we might need to do something like #12846 on the definition of equality for consumers. |
I think we also need to implement a method similar to isSuccessorTo to determine whether the consumer can be overridden, what do you think? @lhotari @michaeljmarshall @eolivelli like this:
|
@lordcheng10 - that is a great finding. I think it seems very reasonable to adjust the consumer logic, but I still haven't researched the consumer remove logic, so I am not authoritative on the subject. I wonder if we'd be able to add a test that shows the |
2.use removeIf;
log.warn("[{}] Consumer with the same id is already created:" | ||
+ " consumerId={}, consumer={}", | ||
consumer.cnx().clientAddress(), consumer.consumerId(), consumer); | ||
throw new BrokerServiceException("Consumer with the same id is already created!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about using ConsumerBusyException ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
+ " consumerId={}, consumer={}", | ||
consumer.cnx().clientAddress(), consumer.consumerId(), consumer); | ||
throw new BrokerServiceException("Consumer with the same id is already created!"); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove useless newline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. PTAL,thanks! @eolivelli
2.remove useless newline;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
although adding a test will help in understanding how to reproduce the problem
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
@@ -153,6 +153,13 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce | |||
throw new ConsumerBusyException("Subscription reached max consumers limit"); | |||
} | |||
|
|||
if (consumerList.contains(consumer)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide the case of duplicated consumer?
I wonder if we should replace the old consumer with the new one. The older consumer maybe somewhat expired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes so I think we should write a method like org.apache.pulsar.broker.service.Producer#isSuccessorTo to judge whether the previous consumer object should be overwritten by consumerEpoch,like this:
public boolean isSuccessorTo(Consumer other){
return Objects.equals(cnx.clientAddress(), other.cnx.clientAddress()) && consumerId == other.consumerId
&& other.consumerEpoch < consumerEpoch;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide the case of duplicated consumer?
OK, I will try to find duplicate cases.
/pulsarbot run-failure-checks |
The pr had no activity for 30 days, mark with Stale label. |
The pr had no activity for 30 days, mark with Stale label. |
Motivation
We found a problem: when the consumer was removed, it was not removed from the consumerList, but it was removed from the consumerSet and consumers:
The log to remove the consumer is as follows:
10:31:40.404 [BookKeeperClientWorker-OrderedExecutor-30-0] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://teg_onion_onion_logcenter_data_gz/teg_onion_onion_logcenter_data_gz/teg_onion_onion_logcenter_data_gz-partition-51, name=t_teg_onion_b_teg_onion_onion_logcenter_data_gz_cg_consumer_001}, consumerId=51, consumerName=ff7f4103a3, address=/11.181.202.252:55902} with pending 0 acks
Documentation
Check the box below or label this PR directly (if you have committer privilege).
Need to update docs?
doc-required
(If you need help on updating docs, create a doc issue)
no-need-doc
(Please explain why)
doc
(If this PR contains doc changes)