Skip to content

Commit

Permalink
ARTEMIS-2174 Broker reconnect cause OOM with scale down
Browse files Browse the repository at this point in the history
When a node tries to reconnect to another node in a scale-down cluster,
the reconnect request gets denied by the other node and keeps being retried,
which causes tasks in the ordered executor to accumulate and eventually OOM.

The fix is to change the ActiveMQPacketHandler#handleCheckForFailover
to allow reconnect if the scale down node is the node itself.

(cherry picked from commit 6e89b22)
  • Loading branch information
howardgao authored and clebertsuconic committed Nov 14, 2018
1 parent 6f62113 commit 173b21e
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 1 deletion.
Expand Up @@ -120,7 +120,7 @@ public void handlePacket(final Packet packet) {

/**
 * Replies to a client's failover check.
 *
 * Failover is allowed when no node ID is given, when the requesting node is
 * this server itself (a node reconnecting after its own scale-down restart
 * must not be rejected — see ARTEMIS-2174), or when this server either
 * cannot scale down or has already scaled down the requesting node.
 *
 * @param failoverMessage the incoming check request carrying the node ID
 */
private void handleCheckForFailover(CheckFailoverMessage failoverMessage) {
   String nodeID = failoverMessage.getNodeID();
   // Accept our own node ID unconditionally; otherwise deny only while a
   // scale-down of that node is possible but has not yet completed.
   boolean okToFailover = nodeID == null || server.getNodeID().toString().equals(nodeID) || !(server.getHAPolicy().canScaleDown() && !server.hasScaledDown(new SimpleString(nodeID)));
   channel1.send(new CheckFailoverReplyMessage(okToFailover));
}

Expand Down
Expand Up @@ -311,6 +311,10 @@ public void setReplicatedClusterName(String replicatedClusterName) {
this.replicatedClusterName = new SimpleString(replicatedClusterName);
}

/**
 * Returns the server locators held by this controller, keyed by name.
 * The returned map is the live internal collection, not a copy.
 */
public Map<SimpleString, ServerLocatorInternal> getLocators() {
   return locators;
}

/**
* a handler for handling packets sent between the cluster.
*/
Expand Down
Expand Up @@ -16,9 +16,15 @@
*/
package org.apache.activemq.artemis.tests.integration.server;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
Expand All @@ -29,13 +35,18 @@
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
Expand Down Expand Up @@ -146,6 +157,76 @@ public void testBasicScaleDown() throws Exception {
removeConsumer(0);
}


@Test
public void testScaleDownNodeReconnect() throws Exception {

   try {
      // Grab server0's cluster controller and its (single) locator so we can
      // observe the cluster connection back to server1.
      ClusterController controller = servers[0].getClusterManager().getClusterController();

      Map<SimpleString, ServerLocatorInternal> locatorsMap = controller.getLocators();
      Iterator<Map.Entry<SimpleString, ServerLocatorInternal>> iter = locatorsMap.entrySet().iterator();
      assertTrue(iter.hasNext());
      Map.Entry<SimpleString, ServerLocatorInternal> entry = iter.next();
      ServerLocatorImpl locator = (ServerLocatorImpl) entry.getValue();

      // Wait until the cluster topology is populated before bouncing server1.
      waitForClusterConnected(locator);

      servers[1].stop();

      servers[1].start();

      //by this moment server0 is trying to reconnect to server1
      //In the normal case server1 checks whether the reconnecting node's
      //scale-down target has already been scaled down before granting the
      //connection. But if the scale-down target is server1 itself, it should
      //grant the connection without checking its own scale-down state.
      //Otherwise the connection will never be established, and worse,
      //the repetitive reconnect attempts will cause
      //ClientSessionFactory's closeExecutor to be filled with
      //tasks that keep growing until OOM (ARTEMIS-2174).
      checkClusterConnectionExecutorNotBlocking(locator);
   } finally {
      servers[1].stop();
      servers[0].stop();
   }
}

/**
 * Asserts that the locator's single session factory still has a responsive
 * {@code closeExecutor}: a task submitted to it must run within 10 seconds.
 * If reconnect-retry tasks had piled up (the ARTEMIS-2174 failure mode),
 * the probe task would never run and the assertion would fail.
 *
 * Uses reflection on the private {@code factories} and {@code closeExecutor}
 * fields, so it is coupled to those field names.
 *
 * @param locator the cluster connection's server locator to inspect
 * @throws NoSuchFieldException   if the expected private fields are missing
 * @throws IllegalAccessException if reflective access is denied
 */
private void checkClusterConnectionExecutorNotBlocking(ServerLocatorImpl locator) throws NoSuchFieldException, IllegalAccessException {
   Field factoriesField = locator.getClass().getDeclaredField("factories");
   factoriesField.setAccessible(true);
   // Use a wildcard instead of a raw type; elements are only counted and cast.
   Set<?> factories = (Set<?>) factoriesField.get(locator);
   assertEquals(1, factories.size());

   ClientSessionFactoryImpl factory = (ClientSessionFactoryImpl) factories.iterator().next();

   Field executorField = factory.getClass().getDeclaredField("closeExecutor");
   executorField.setAccessible(true);
   Executor pool = (Executor) executorField.get(factory);
   final CountDownLatch latch = new CountDownLatch(1);
   // Probe task: if the executor is not backed up, this runs promptly.
   pool.execute(latch::countDown);
   boolean result = false;
   try {
      result = latch.await(10, TimeUnit.SECONDS);
   } catch (InterruptedException e) {
      // Restore the interrupt flag instead of swallowing it; the assertion
      // below still fails because the latch never counted down.
      Thread.currentThread().interrupt();
   }
   assertTrue("executor got blocked.", result);
}

/**
 * Blocks until the locator's topology contains at least one member,
 * failing the test if that does not happen within 5 seconds.
 *
 * @param locator the server locator whose topology is polled
 * @throws Exception if the wait condition itself throws
 */
private void waitForClusterConnected(ServerLocatorImpl locator) throws Exception {
   // Wait.Condition has a single abstract method, so a lambda suffices
   // in place of an anonymous inner class.
   boolean connected = Wait.waitFor(() -> !locator.getTopology().isEmpty(), 5000);

   assertTrue("topology should not be empty", connected);
}

@Test
public void testStoreAndForward() throws Exception {
final int TEST_SIZE = 50;
Expand Down

0 comments on commit 173b21e

Please sign in to comment.