[fix] [broker] Make the new exclusive consumer instead the inactive one faster #21183
Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (actConsumer != null) {
    actConsumer.cnx().checkConnectionLiveness();
}
But the new consumer will not retry to connect to the topic, right? Do we need to wait for the connection liveness check to be done?
But the new consumer will not retry to connect to the topic, right? Do we need to wait for the connection liveness check to be done?
Sure, I made this improvement.
(Highlight) I have a concern:
Background: PR #20026 changed the method dispatcher.addConsumer to an asynchronous method, which broke the lock of synchronized(dispatcher.this). This change only affects releases later than 3.0.0.
Concern: The improvement "wait for the connection liveness check to be done" relies on the asynchronous method dispatcher.addConsumer. I am considering whether to keep the patch #20026 and fix the broken lock (which would make the logic more complex), or to revert that patch to keep the logic simple.
I'd like to hear your advice on this concern.
After talking with @codelipenghui @BewareMyPower @RobertIndie @gaoran10 @Technoboy-, I will write a new PR to fix the lock that was broken by PR #20026.
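For readers following this thread, here is a minimal sketch of the idea being discussed: when a new exclusive consumer arrives while another is registered, wait for the old consumer's liveness check to complete before deciding whether to reject the newcomer. Everything in it is a simplified stand-in written for illustration. The names ExclusiveConsumerSketch, Connection, ExclusiveSlot and checkLiveness are hypothetical and are not the Pulsar classes touched by this PR, and the sketch does not attempt to reproduce the locking discussed above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ExclusiveConsumerSketch {
    /** Hypothetical stand-in for a consumer's connection; not the Pulsar API. */
    interface Connection {
        /** Completes with true if the peer is still reachable. */
        CompletableFuture<Boolean> checkLiveness();
    }

    /** Simplified model of a dispatcher that allows one exclusive consumer. */
    static class ExclusiveSlot {
        private final AtomicReference<Connection> active = new AtomicReference<>();

        CompletableFuture<Void> addConsumer(Connection candidate) {
            Connection current = active.get();
            if (current == null) {
                // Slot looks free: claim it, or retry if another caller won the race.
                return active.compareAndSet(null, candidate)
                        ? CompletableFuture.completedFuture(null)
                        : addConsumer(candidate);
            }
            // Slot is taken: wait for the liveness result before deciding.
            return current.checkLiveness().thenCompose(alive -> {
                if (alive) {
                    return CompletableFuture.failedFuture(
                            new IllegalStateException("Exclusive consumer is already connected"));
                }
                // The old connection is dead: release the slot only if it is
                // still held by that same connection, then try again.
                active.compareAndSet(current, null);
                return addConsumer(candidate);
            });
        }
    }

    public static void main(String[] args) {
        Connection dead = () -> CompletableFuture.completedFuture(false);
        Connection live = () -> CompletableFuture.completedFuture(true);
        ExclusiveSlot slot = new ExclusiveSlot();
        slot.addConsumer(dead).join();   // first consumer takes the slot
        slot.addConsumer(live).join();   // stale consumer is replaced
        System.out.println("third attempt rejected: "
                + slot.addConsumer(dead).isCompletedExceptionally());
    }
}
```

The sketch relies on compare-and-set rather than a synchronized block, purely to stay short; whether that is appropriate for the real dispatcher is exactly the open question raised in this thread.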
caf5428 to 653d031
rebase master
The client assumed the connection was inactive, but the Broker assumed the connection was fine.
In addition, could you explain in which scenario the case described in the PR could happen?
    pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(5000);
}

private AtomicBoolean startChannelMonitorToHandleUserTask() {
This method is hard to understand. Maybe it's better to add some comments.
I removed AtomicBoolean channel1MonitorStopped = startChannelMonitorToHandleUserTask(); and channel1MonitorStopped.set(true); and the tests still passed.
The channel is an EmbeddedChannel. When we call channel.execute(runnable), there is no background thread to run the task.
So starting a background thread that triggers the tasks in the queue makes the test more stable.
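The point above can be illustrated with a small JDK-only sketch: a channel-like object whose execute method only queues tasks, plus a monitor thread that drains the queue until a flag is set. This is not the helper from the PR and does not use Netty. QueueOnlyChannel, startMonitor and runsQueuedTask are hypothetical names, and the 10 ms polling interval is an arbitrary assumption.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TaskPumpSketch {
    /** Hypothetical stand-in for a channel whose executor only queues tasks. */
    static class QueueOnlyChannel {
        private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

        void execute(Runnable task) {
            pending.add(task); // nothing runs until someone drains the queue
        }

        void runPendingTasks() {
            Runnable task;
            while ((task = pending.poll()) != null) {
                task.run();
            }
        }
    }

    /** Starts a daemon thread that drains the queue until the returned flag is set to true. */
    static AtomicBoolean startMonitor(QueueOnlyChannel channel) {
        AtomicBoolean stopped = new AtomicBoolean(false);
        Thread monitor = new Thread(() -> {
            while (!stopped.get()) {
                channel.runPendingTasks();
                try {
                    Thread.sleep(10); // arbitrary polling interval
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        monitor.setDaemon(true);
        monitor.start();
        return stopped;
    }

    /** Queues one task and reports whether the monitor ran it within five seconds. */
    static boolean runsQueuedTask() {
        QueueOnlyChannel channel = new QueueOnlyChannel();
        AtomicBoolean stopped = startMonitor(channel);
        CountDownLatch ran = new CountDownLatch(1);
        channel.execute(ran::countDown);
        try {
            return ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            stopped.set(true); // stop the monitor, as the test does at the end
        }
    }

    public static void main(String[] args) {
        System.out.println("task executed: " + runsQueuedTask());
    }
}
```

Without the monitor thread, the queued task in this model would never run, which mirrors why a test that depends on queued tasks can be flaky when nothing drains the queue.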
Added a comment for this method
I removed AtomicBoolean channel1MonitorStopped = startChannelMonitorToHandleUserTask(); and channel1MonitorStopped.set(true); and the tests still passed.
- If you run the test testHandleConsumerAfterClientChannelInactive, it has a high probability of failure.
- If you run the test testHandleConsumerAfterClientChannelInactiveWhenDisabledFeatureConnectionLivenessCheckTimeoutMillis, it will always pass, because it only confirms that this fix is not affected when the feature connectionLivenessCheckTimeoutMillis is disabled.
653d031 to 34ab245
…ne faster (#21183)
### Motivation
There is an issue similar to the #21155 fixed one. The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`
### Modifications
- Check the connection of the old consumer is available when the new one tries to subscribe
(cherry picked from commit 29db8f8)
…ne faster (apache#21183) There is an issue similar to the apache#21155 fixed one. The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected` - Check the connection of the old consumer is available when the new one tries to subscribe (cherry picked from commit 29db8f8) (cherry picked from commit b796f56)
Motivation
There is an issue similar to the one fixed by #21155.
The client assumed the connection was inactive, but the broker assumed the connection was fine. The client tried to use a new connection to reconnect an exclusive consumer and got the error `Exclusive consumer is already connected`.
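Modifications
- Check whether the connection of the old consumer is available when the new one tries to subscribe.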
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x