ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be cle… #2466

Closed
wants to merge 1 commit into from

4 participants
@onlyMIT
Contributor

commented Dec 17, 2018

Test environment

  1. Open 10,000 MQTT connections (9,000 senders, 1,000 consumers) against a single Artemis server, with the 'cleanSession' property set to true;
  2. MQTT client: paho 1.2.0;
  3. Server: CPU Cores:32, Mem:64G, SSD:250G, HDD:1T

Issue

Issue 1
Artemis broker has the following exception log:
[Thread-0 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=fb358579-feb3-11e8-bc7c-141877a7fd13-1409545055)] 17:27:59,035 WARN [org.apache.activemq.artemis.utils.actors.OrderedExecutor] null: java.lang.NullPointerException
    at org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager.isClientConnected(MQTTProtocolManager.java:182) [:]
    at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager.disconnect(MQTTConnectionManager.java:150) [:]
    at org.apache.activemq.artemis.core.protocol.mqtt.MQTTFailureListener.connectionFailed(MQTTFailureListener.java:37) [:]
    at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection.fail(MQTTConnection.java:147) [:]
    at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.issueFailure(RemotingServiceImpl.java:561) [:]
    at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.connectionDestroyed(RemotingServiceImpl.java:542) [:]
    at org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor$Listener.connectionDestroyed(NettyAcceptor.java:858) [:]
    at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.lambda$channelInactive$0(ActiveMQChannelHandler.java:83) [:]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [:]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [:]
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66) [:]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_101]
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [:]

Issue 2
After closing all client connections, 64 queues were not cleaned up.

Analysis and simulation reproduction

When an MQTT consumer client (with the cleanSession property set to true) reconnects, there is some probability that its queue is not automatically removed and that a NullPointerException is thrown.
This happens because the MQTT consumer client believes its connection has been dropped and triggers a reconnect, while the old MQTT connection is still alive on the Artemis broker. The bug occurs when the Artemis broker starts processing the ‘new MQTT connection’ while it is still closing the ‘old MQTT connection’.
To reproduce: create an MQTT consumer (cleanSession: true, clientID: superConsumer, topic: mit.test) and connect it to the Artemis broker. Create a second MQTT consumer with the same cleanSession, clientID, and topic, and connect it to the Artemis broker as well. Then close both MQTT connections. Repeating this many times reproduces the two problems above with some probability.

Solution

Issue 1

When 'session.getProtocolManager().isClientConnected(clientId, session.getConnection())' is called and the 'MQTTConnection' instance retrieved from 'connectedClients' is 'null', a NullPointerException is thrown. Add a null check to the 'MQTTProtocolManager.isClientConnected' method.
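
A minimal sketch of that null check, using a hypothetical stand-in class rather than the real MQTTProtocolManager. The class name, the Object-typed connection, and the shape of the "unsafe" variant are assumptions made for illustration; only the 'connectedClients' lookup and the null case come from the description above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the broker-side registry of connected MQTT clients.
class ConnectedClientsSketch {
    // clientId -> connection currently registered for that client
    private final Map<String, Object> connectedClients = new ConcurrentHashMap<>();

    void register(String clientId, Object connection) {
        connectedClients.put(clientId, connection);
    }

    void unregister(String clientId) {
        connectedClients.remove(clientId);
    }

    // Assumed shape of the failing code: dereferences the lookup result directly,
    // so it throws NullPointerException once the clientId has been removed.
    boolean isClientConnectedUnsafe(String clientId, Object connection) {
        return connectedClients.get(clientId).equals(connection);
    }

    // Null-safe variant: a missing entry simply means "not connected".
    boolean isClientConnected(String clientId, Object connection) {
        Object registered = connectedClients.get(clientId);
        return registered != null && registered.equals(connection);
    }
}
```

With this shape, a disconnect that races with the removal of the old connection gets 'false' back instead of an exception.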

Issue 2

  1. Remove ‘InterruptedException’ from the signature of ‘MQTTConnectionManager.getSessionState’, because this method can never throw it;
  2. Add 'synchronized' blocks to the 'MQTTConnectionManager.connect' and 'MQTTConnectionManager.disconnect' methods, using the MQTTSessionState instance as the lock. In the Artemis broker, all MQTT connections that use the same clientId share the same MQTTSessionState instance. With this lock in place, 'connect' and 'disconnect' can no longer run concurrently for MQTT connections with the same clientId.
  3. In the MQTT protocol, each queue has exactly one consumer connection, so it makes sense to close the old MQTT consumer before the new MQTT consumer connects.
    The original code could not reliably clean up the 'old consumer' on the queue when the 'new MQTT connection' connected to the Artemis broker. Modify ‘MQTTSubscriptionManager.removeSubscription’ to get the queue's consumer collection from the ‘QueueImpl’ instance and close them.
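
The locking idea in item 2 can be sketched roughly as follows. This is not the Artemis code: SessionStateSketch and ConnectionManagerSketch are hypothetical stand-ins, and the only thing taken from the description is that every connection with the same clientId resolves to one shared session-state object, which is then used as the monitor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for MQTTSessionState: one shared instance per clientId.
class SessionStateSketch {
    final String clientId;
    private final List<String> events = new ArrayList<>(); // guarded by "this"
    private boolean attached;                              // guarded by "this"

    SessionStateSketch(String clientId) {
        this.clientId = clientId;
    }

    // Callers must hold the monitor of this instance.
    void record(String event, boolean nowAttached) {
        events.add(event);
        attached = nowAttached;
    }

    synchronized boolean isAttached() {
        return attached;
    }

    synchronized List<String> snapshot() {
        return new ArrayList<>(events);
    }
}

// Hypothetical stand-in for the connect/disconnect paths of a connection manager.
class ConnectionManagerSketch {
    private static final Map<String, SessionStateSketch> STATES = new ConcurrentHashMap<>();

    // All connections with the same clientId get the same state object.
    static SessionStateSketch getSessionState(String clientId) {
        return STATES.computeIfAbsent(clientId, SessionStateSketch::new);
    }

    static void connect(String clientId) {
        SessionStateSketch state = getSessionState(clientId);
        synchronized (state) { // serializes with disconnect() for this clientId only
            state.record("connect:start", true);
            state.record("connect:end", true);
        }
    }

    static void disconnect(String clientId) {
        SessionStateSketch state = getSessionState(clientId);
        synchronized (state) {
            state.record("disconnect:start", false);
            state.record("disconnect:end", false);
        }
    }
}
```

Because the lock is per clientId, connections with different client IDs do not block one another; only a reconnect that overlaps the teardown of the same client is forced to wait.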
@michaelandrepearce

Contributor

commented Dec 18, 2018

This looks like it needs a JIRA and some automated tests.

@onlyMIT

Contributor Author

commented Dec 18, 2018

@michaelandrepearce A JIRA has been created: https://issues.apache.org/jira/projects/ARTEMIS/issues/ARTEMIS-2206?filter=addedrecently

Regarding automated tests: apart from the reproduction steps I described, do you have any better suggestions?

@onlyMIT onlyMIT changed the title NO-JIRA The MQTT consumer reconnection caused the queue to not be cle… JIRA-2206 The MQTT consumer reconnection caused the queue to not be cle… Dec 18, 2018

@onlyMIT onlyMIT changed the title JIRA-2206 The MQTT consumer reconnection caused the queue to not be cle… ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be cle… Dec 18, 2018

@@ -113,32 +113,7 @@
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.security.impl.SecurityStoreImpl;
import org.apache.activemq.artemis.core.server.ActivateCallback;

@clebertsuconic

clebertsuconic Dec 18, 2018

Contributor

This will fail checkstyle. no * imports

@onlyMIT

onlyMIT Dec 18, 2018

Author Contributor

Thanks, this was caused by my IDE; I will watch out for it.

@clebertsuconic

Contributor

commented Dec 18, 2018

Please... squash your commits, and use the JIRA on the commit.

It's kind of hard for me to review your changes here.

What I saw in the first commit makes this a no-go, but I don't know whether you fixed it in a later commit.

@onlyMIT

Contributor Author

commented Dec 18, 2018

@clebertsuconic This is my first pull request on GitHub. Thank you for your understanding. I will try to use the JIRA when submitting the code; if you find any problems, please correct me.

@clebertsuconic

Contributor

commented Dec 18, 2018

you do this on your branch:

git rebase -i HEAD~4

In the list that opens, mark each of the 3 later commits as squash so they are folded into the first one.

Once you squash, you will be taken to vim (or whatever editor you have configured), where you can change the title.

Make the title reflect the JIRA, then run:

git push origin -f (your branch name)

That will be enough to update this PR.

@clebertsuconic

Contributor

commented Dec 18, 2018

If you don't know git, do some googling about squash and interactive rebase.

git push -f will update the Pull Request.

@onlyMIT onlyMIT force-pushed the onlyMIT:onlyMIT-artemis-2.7.0 branch from 1256d1a to 08c6823 Dec 19, 2018

@onlyMIT onlyMIT closed this Dec 19, 2018

@onlyMIT onlyMIT force-pushed the onlyMIT:onlyMIT-artemis-2.7.0 branch from 08c6823 to 09b3e25 Dec 19, 2018

@onlyMIT

Contributor Author

commented Dec 19, 2018

@clebertsuconic Sorry, I closed the pull request by mistake; it has now been reopened. Thank you very much for your help.

@onlyMIT onlyMIT reopened this Dec 19, 2018

@onlyMIT onlyMIT force-pushed the onlyMIT:onlyMIT-artemis-2.7.0 branch 3 times, most recently from c0164de to a17df85 Dec 19, 2018

@onlyMIT

Contributor Author

commented Dec 21, 2018

@michaelandrepearce @clebertsuconic I added automated tests and also added locks around the sub and unSub operations. Please take a look.

@@ -3604,4 +3605,14 @@ private void deployReloadableConfigFromConfiguration() throws Exception {
return externalComponents;
}

@Override
public Set<Consumer> queueConsumersQuery(SimpleString queueName) {

@jbertram

jbertram Jan 9, 2019

Contributor

This method isn't necessary. You can use the locateQueue method and simply invoke getConsumers on the returned value.

clientProvider.disconnect();
}
clientProviders.clear();
assertTrue(waitForBindings(server, queueName, false, 0, 0, 10000));

@jbertram

jbertram Jan 9, 2019

Contributor

From what I can tell the overridden version of waitForBindings isn't necessary. You could just use something like:

assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) == null));
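
For readers unfamiliar with this style of assertion, the polling pattern behind it looks roughly like the sketch below. WaitSketch is a hypothetical stand-in written against the JDK only; it is not the Artemis Wait utility, and its parameters (timeout and sleep interval in milliseconds) are assumptions.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper: re-checks a condition until it holds or a timeout expires.
class WaitSketch {
    static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long sleepMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true; // condition became true within the timeout
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition holding
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

In a test, the condition would be something like "the queue can no longer be located", which avoids a fixed sleep and lets the assertion pass as soon as the cleanup completes.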

@onlyMIT

onlyMIT Jan 10, 2019

Author Contributor

I pushed an update to address your suggestion, and removed synchronized from some methods (MQTTConnectionManager.connect, MQTTConnectionManager.disconnect, MQTTSubscriptionManager.removeSubscription).

@onlyMIT onlyMIT force-pushed the onlyMIT:onlyMIT-artemis-2.7.0 branch 2 times, most recently from a5f63e7 to d69508d Jan 10, 2019

@jbertram

Contributor

commented Jan 11, 2019

@onlyMIT , this change is causing failures in org.apache.activemq.artemis.tests.integration.mqtt.imported.MqttClusterRemoteSubscribeTest (which is not run by the PR build). Can you take a look at this?

@onlyMIT

Contributor Author

commented Jan 11, 2019

@jbertram Thanks for your review. I will check it tomorrow.

@onlyMIT

Contributor Author

commented Jan 12, 2019

@jbertram I found an interesting issue, which is why my PR fails in org.apache.activemq.artemis.tests.integration.mqtt.imported.MqttClusterRemoteSubscribeTest. The master code in the removeSubscription method does not use locateQueue to look up the queue:

ServerConsumer consumer = consumers.get(address);
consumers.remove(address);
if (consumer != null) {
    consumer.close(false);
    consumerQoSLevels.remove(consumer.getID());
}

Tested on master!
Issue
With MqttClusterRemoteSubscribeTest, the internal queue name equals the address name (the Binding is stored under addressName). This is wrong.
With MQTTTest.testNoMessageReceivedAfterUnsubscribeMQTT, the internal queue name equals "clientId+'.'+addressName" (the Binding is stored under "clientId+'.'+addressName"). This is correct.

I created a JIRA, ARTEMIS-2226.
But I haven’t found out what causes this issue yet. Still investigating.

@onlyMIT

Contributor Author

commented Jan 13, 2019

@jbertram I have a question and hope you can answer it.

  1. Why do Artemis subscriptions for MQTT have both MULTICAST and ANYCAST routing types? I think the MQTT protocol should only need the MULTICAST routing type.
    Perhaps the purpose of this design is to let Artemis' MQTT implementation support both publish/subscribe and shared consumption. But it doesn't seem to support that.
    Two consumers using the same clientId that subscribe to the same topic on separate nodes produce ANYCAST addresses (as in org.apache.activemq.artemis.tests.integration.mqtt.imported.MqttClusterRemoteSubscribeTest). However, when two consumers using the same clientId subscribe to the same address on the same node, the later consumer connection causes the earlier one to be forcibly closed, and no ANYCAST address is created.

Even if none of this is a problem in itself, it exposes another one: the binding is stored under different keys (addressName in one case, "clientId+'.'+addressName" in the other), and we can't be sure that every call to locateQueue(SimpleString queueName) takes this into account (for example, the addSubscription method only calls locateQueue(SimpleString queueName) with "clientId+'.'+addressName" as the key). I think most people are not aware of this detail when calling locateQueue(SimpleString queueName), and even those who are can easily overlook it, which is very unsafe.
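
The key mismatch can be illustrated with a small sketch. BindingKeySketch and its methods are hypothetical stand-ins and not the Artemis API; only the two naming schemes (addressName versus clientId + '.' + addressName) come from the observations above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry that maps a queue name to the address it is bound to.
class BindingKeySketch {
    private final Map<String, String> bindings = new HashMap<>();

    // Stores a binding under whatever queue name the caller chose.
    void bind(String queueName, String address) {
        bindings.put(queueName, address);
    }

    // Finds a queue only if the caller uses the same key that was used to store it.
    boolean locateQueue(String queueName) {
        return bindings.containsKey(queueName);
    }

    // The naming scheme observed in the single-node test: clientId + '.' + addressName.
    static String clientQueueName(String clientId, String address) {
        return clientId + "." + address;
    }
}
```

If a binding is stored under the plain address name, a lookup that builds the key with clientQueueName misses it even though the queue exists, which is the kind of silent miss described above.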

I hope you can help me identify the problem so that I can determine how to solve it.

@clebertsuconic

Contributor

commented Jan 17, 2019

@jbertram / @onlyMIT what's the status here? I was looking at PRs to merge, but this one seems to be failing a test?

@jbertram

Contributor

commented Jan 17, 2019

@clebertsuconic, this is still in progress. I haven't had any time to follow-up yet.

ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be cleared, and caused Artemis broker to throw a NullPointerException.

When an MQTT consumer client (cleanSession property set to true) reconnects, there is some probability that these two bugs occur.
This is because the MQTT consumer client thinks that its connection has been dropped and triggers a reconnect, but the MQTT connection is still alive on the Artemis broker. The bug occurs when the new and old connections operate on the same queue at the same time in an unsafe way.

ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be cleared, and caused Artemis broker to throw a NullPointerException.

checkstyle fix

ARTEMIS-2206 Add automated testing

Sub and unSub also need locking. Also add automated tests.

ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be cleared

Removed "synchronized" from some methods.
Added 'synchronized' to the 'getSessionState' method.
Resolve code conflicts

ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be cleared

address MQTT consumer uses ANYCAST routing type to subscribe topic.

@onlyMIT onlyMIT force-pushed the onlyMIT:onlyMIT-artemis-2.7.0 branch from d69508d to f0b8611 Jan 18, 2019

@onlyMIT

Contributor Author

commented Jan 18, 2019

@clebertsuconic
I have changed this PR to address the failure in org.apache.activemq.artemis.tests.integration.mqtt.imported.MqttClusterRemoteSubscribeTest.
This change handles both ANYCAST and MULTICAST routing. Whether ANYCAST is later kept or removed, it will not cause problems for this PR's code.
Regarding MQTT subscriptions that use the ANYCAST routing type, I am trying to find a suitable way to fix the inconsistent test results between clusters and single nodes, and I will address that in a new PR.
Please help review. Thanks (⁎˃ᴗ˂⁎)!

@onlyMIT

Contributor Author

commented Jan 29, 2019

I used to think that the ARTEMIS-2226 issue was causing the failures in org.apache.activemq.artemis.tests.integration.mqtt.imported.MqttClusterRemoteSubscribeTest.
Actually, I was wrong.
I have created a new PR, #2528, to resolve ARTEMIS-2226.
I think this PR is done.

@michaelandrepearce

Contributor

commented Mar 12, 2019

@onlyMIT I merged the newer PR, could you close this one then?

@clebertsuconic

Contributor

commented Mar 12, 2019

I'm working on this

@asfgit asfgit closed this in c7fa858 Mar 12, 2019

@onlyMIT

Contributor Author

commented Mar 13, 2019

@clebertsuconic @michaelandrepearce
This PR is different from #2528; it solves another issue: the queue cannot be cleared when a cleanSession=true consumer is closed.
Also, I am not sure whether anyone has reviewed the latest version I submitted, because I haven't received any feedback.
